Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Reverted from v. 24

...

Your use of this download is governed by Stonebranch’s Terms of Use, which are available at https://www.stonebranch.com/integration-hub/Terms-and-Privacy/Terms-of-Use/

Overview

Apache Airflow is an open source platform created to programmatically author, schedule, and monitor workflows.

This Universal Extension provides the capability to integrate with Apache Airflow and use it as part of your end-to-end Universal Controller workflow, allowing high-level visibility and orchestration of data-oriented jobs or pipelines.

Software Requirements

This integration requires a Universal Agent and a Python runtime to execute the Universal Task.

Software Requirements for /wiki/spaces/UC71x/pages/5178050 and /wiki/spaces/UC71x/pages/5180675

Requires Python 3.7.0 or higher. Tested with the Universal Agent bundled Python distribution.

Software Requirements for Universal Agent

Both

Version Information

Template NameExtension NameExtension Version
Apache Airflowue-airflow2.0.0


Note

Version 2.0.0, does not support Universal Agent/Controller 7.0.0.0. Detailed Software Requirements below.


Software Requirements

This integration requires a Universal Agent and a Python runtime to execute the Universal Task.

Software Requirements for Universal Template and Universal Task

Requires Python 3.7.0 or higher. Tested with the Universal Agent bundled Python distribution.

Software Requirements for Universal Agent

Both Windows and Linux agents are supported:.

  • Universal Agent for Windows x64 Version 7.01.0.0 and later with python options installed.

  • Universal Agent for Linux Version 7.01.0.0 and later with python options installed.

...

Universal Controller Version 7.01.0.0 and later. This Universal Task requires that Universal Controller property /wiki/spaces/UC71x/pages/5177877 (uc.web_service.response.content.default) is set to JSON.

Supported Apache Airflow Versions

This

Supported Apache Airflow Versions

This integration is tested on Apache airflow V2v2.2.3 .

Key Features

This extension provides support for:

...

Triggering a new DAG run.

and v2.2.5. It should be compatible with newer versions of Airflow as long as Airflow backward compatibility is preserved.

Key Features

This Universal Extension provides the following key features:

  • Actions
    • Trigger a DAG run and optionally wait until the DAG was reached "success" or "failure".
    • Information retrieval of a specific DAG

...

    • Run.
    • Information retrieval for a task that is part of a specific DAG

...

    • Run.
  • Authentication
    • Basic authentication

...

Using a proxy between Universal Controller and Apache Airflow server.

...

    • for Stand Alone Airflow Server.
    • Service Account Private Key for Google Cloud Composer.
  • Other
    • Capability to use HTTP or HTTPS proxy instead of direct communication to Stand Alone Airflow Server.

Import Universal Template

To use this downloadable the Universal Template, you first must perform the following steps:.

  1. This Universal Task requires

...

  1. the Resolvable Credentials feature. Check that the

...

  1. Resolvable Credentials Permitted system property has been set to true.

...

Download the provided ZIP file.

...

In the Universal Controller UI, select Administration >Configuration > Universal Templates to display the current list of Universal Templates.

...

Click Import Template.

...

Select the template ZIP file and Import.

...

  1. To import the Universal Template into your Controller, follow the instructions here.

  2. When the files have been imported successfully, refresh the Universal Templates list; the Universal Template will appear on the list.

...

Modifications of this integration, applied by users or customers, before or after import, might affect the supportability of this integration. For more information refer to Integration Modifications.

Configure Universal Task

For the a new Universal Task type, create a new task, and enter the task-specific details that were created in the Universal Templaterequired input fields.

Input Fields

The input fields for this Universal Extension are described in the following table.

The Apache Airflow account credentials.

They are comprised of:

  • Runtime username

  • Runtime passwordValid values are: Dag DAG Id
    FieldInput typeDefault valueTypeDescription

    Airflow Base URL

    Required

    -

    Text

    The Base URL of the Airflow server.

    Airflow Credentials

    Required

    -

    Credentials

    Connect To

    Introduced in version 2.0.0
    RequiredStandalone Airflow ServerChoiceAirflow Service Provider.
    • Standalone Airflow Server
    • Google Cloud Composer
    ActionRequiredTrigger DAG RunChoiceThe action performed upon the task execution. Supported options are the following.

    • Trigger
    • DAG Run
    • Read DAG Run Information
    • Read Task Instance Information
    Airflow Base URLRequired-

    Dynamic Choice

    Dynamic Choice field populated by getting a list of active DAG’s from the server.

    DAG Run Id

    Optional

    -

    Text

    Id of a specific DAG Run. Required for Action "Read DAG Run Information"/"Read Task Instance Information".

    Task Id

    Optional

    -

    Text

    Dynamic Choice field populated by getting a list of Task Ids for a specific DAG ID.

    Required for Action "Read Task Instance Information"

    Use Proxy

    Required

    False

    Boolean

    Flag to allow Proxy configuration.

    Required when connection to Apache Airflow is through Proxy.

    Proxy Servers

    Optional

    -

    Text

    Proxy server and port. Valid format: http://proxyserver:port or https://proxyserver:port.

    Required when Use Proxy is True.

    Use SSL

    Required

    False

    Boolean

    Specifies if SSL protocol should be used for the communication with the foreign API.

    SSL Hostname Check

    Optional

    True

    Boolean

    Determines if the host name of the certificate should be verified against the hostname in the URL.

    Required when Use SSL is checked.

    SSL Certificate Path

    Optional

    -

    Text

    Path and file name of the trusted certificate or CA bundle to use in certificate verification.

    The file must be in PEM format.

    Task Examples

    Trigger a new DAG Run

    Example of Universal Task for triggering a new DAG Run. In this specific example SSL protocol, as well as a proxy, is used.

    Image Removed

    Extension Output Sample

    ...

    languagetext
    linenumberstrue

    ...

    TextThe Base URL of the Airflow server.
    Airflow Credentials

    Optional since version 2.0.0
    --CredentialsThe Apache Airflow account credentials. The Credentials definition should be as follows.
    • Airflow username as "Runtime username".
    • Airflow password as "Runtime password".
    Required for Connect To "Standalone Airflow Server".
    Credentials Type

    Introduced in version 2.0.0
    -Service Account Private KeyChoiceThe authentication method for Google Cloud Composer.
    • Service Account Private Key.
    Required for Connect To "Google Cloud Composer".
    Service Account Key

    Introduced in version 2.0.0
    Optional-CredentialsThe Credentials definition should be as follows.
    • Google Cloud Composer Service Account Key as "Token" in JSON format.
    Required for Credential Type "Service Account Private Key".
    DAG IDRequired-Dynamic ChoiceDynamic Choice field populated by getting a list of active DAG’s from the server.
    Configuration Parameters (JSON)

    Introduced in version 2.0.0
    Optional-Large Text FieldConfiguration parameters, mapped to "conf" payload attribute of Airflow /dags/{dag_id}/dagRuns API. It should be in JSON format.

    Optional for Action "Trigger DAG Run".
    DAG Run IDOptional-TextID of a specific DAG Run. Required for Action "Read DAG Run Information" / "Read Task Instance Information".
    Task IDOptional-TextDynamic Choice field populated by getting a list of Task IDs for a specific DAG ID. Required for Action "Read Task Instance Information."
    SSL OptionsRequiredFalseBooleanSpecifies if SSL protocol should be used for the communication with the foreign API. Optional for Connect To= "Standalone Airflow Server".
    CA Bundle PathOptional-TextPath and file name of the trusted certificate or CA bundle to use in certificate verification. The file must be in PEM format.Required for Connect To= "Standalone Airflow Server" and SSL Options is checked.
    SSL Certificate VerificationOptionalTrueBooleanDetermines if the host name of the certificate should be verified against the hostname in the URL.
    Required for Connect To="Standalone Airflow Server" and SSL Options is checked.
    Use ProxyRequiredFalseBooleanFlag to allow Proxy configuration to allow connection to Apache Airflow through Proxy. Optional for Connect To= "Standalone Airflow Server".
    Proxy ServersOptional-TextProxy server and port. Valid format. http://proxyserver:port or https://proxyserver:port.
    Required for Connect To="Standalone Airflow Server" and Use Proxy is checked.
    Wait for success or Failure

    Introduced in version 1.1.0
    RequiredFalseBooleanIf selected, the task will continue running until DAG Run reaches the "success" or "failed" state.
    Required for Action "Trigger DAG Run".
    Polling Interval

    Introduced in version 1.1.0
    Required1IntegerThe polling interval in seconds between checking for the status of the DAG Run execution state.
    Required for Action "Trigger DAG Run".

    Cancellation and Re-Run

    In the case of "Trigger DAG Run" Action, the executing task is immediatelly populating output only field DAG Run Id asynchronously for information purposes and to keep the state of execution in case of "Cancel" and "Re-run". In case of "Re-run", the executing task checks if the DAG Run ID output field is populated and in the case it is not, it will trigger the DAG run.

    Note

    Canceling the task execution for Action "Trigger DAG Run" before the DAG Run ID output only field is populated, can lead to undesirable behavior on task "Re-run".

    Task Examples

    Trigger DAG Run (GCC)

    Example of Universal Task for triggering a new DAG Run with Google Cloud Composer .

    Image Added

    Trigger DAG Run (Standalone)

    Example of Universal Task for triggering a new DAG Run on Standalone Airflow Server and waiting for "success" or "failure" state as result. Configurations Parameters are also used.

    Image Added

    Read Airflow Task Instance Information

    Example of Universal Task for getting information on Standalone Airflow Task instance. HTTPS Proxy , CA Bundle , SSL Verification are used.

    Image Added

    Task Output

    Output Only Fields

    The output fields for this Universal Extension are described below.

    FieldTypeDescription
    DAG Run IdTextThe field is populated when Action is one of the following.
    • Trigger DAG Run
    • Read DAG Run Information
    DAG Run StateTextThe field represents the state of the DAG Run and it is populated when Action is one of the following.
    • Trigger DAG Run
    • Read DAG Run Information

    The available values are

    • queued
    • running
    • success
    • failed

    Exit Codes

    The exit codes for this Universal Extension are described in the following table.

    Exit CodeStatus Classification CodeStatus Classification DescriptionStatus Description
    0SUCCESSSuccessful ExecutionSUCCESS: Successful Task Execution
    1FAILFailed ExecutionFAIL: <Error Description>
    2AUTHENTICATION_ERRORAuthentication ErrorAUTHENTICATION_ERROR: <Error Description>
    3CONNECTION_ERRORConnection ErrorCONNECTION_ERROR: <Error Description>
    20DATA_VALIDATION_ERRORInput fields Validation ErrorDATA_VALIDATION_ERROR: <Error Description>
    21REQUEST_FAILUREHTTP request errorREQUEST_FAILED: <Error Description>
    22FAILFailed ExecutionFAIL: DAG Run was triggered, but the status of DAG Run is 'Failed'.

    Extension Output

    In the context of a workflow, subsequent tasks can rely on the information provided by this integration as Extension Output.

    The Extension output contains Attribute result. The result Attribute, as displayed above, is based on the response of the related Airflow REST APIs for the respective actions and version 2.2.3 or 2.2.5. Other versions of Airflow may produce different information as part of the result attribute.

    Attribute changed is populated as follows.

    • true, in case when a DAG Run is successfully triggered.
    • false, in case when DAG Run is not triggered or the Action is "Read DAG Run Information" or "Read Task Instance Information".

    result section includes attributes as described in Airflow API Official documentation.

    The Extension Output for this Universal Extension is in JSON format as described below.

    For Action

    • Trigger DAG Run
    • Read DAG Run Information

    The Extension Output below refers to Trigger DAG Run (Standalone) task example.

    Panel
    {
        "exit_code": 22,
        "status_description": "FAIL: DAG Run was triggered, but the status of DAG Run is 'Failed'.",
        "changed": true,
        "invocation": {
            "extension": "ue-airflow",
            "version": "2.0.0",
            "fields": {
                "connect_to": "standalone_airflow_server",
                "credentials_type_google": null,
                "service_account_key": null,
                "base_url": "http://airlfow_url:8080/api/v1",
                "credentials_user": "****",
                "credentials_password": "****",
                "use_ssl": false,
                "ssl_verify": false,
                "trusted_certificate_file": null,
                "ssl_hostname_check": false,
                "private_key_certificate": null,
                "public_key_certificate": null,
                "use_proxy": false,
                "proxies": null,
                "action": "trigger_dag_run",
                "
    task
    dag_id": 
    null
    "example_bash_operator",
            
    }
        
    },
    "dag_run_id": "manual__2023-02-09T20:20:39.820239+00:00",
      
    "result":
     
    {
             "
    conf
    task_id": 
    {}
    null,
            
    "dag_id
        "wait_for_success_or_failure": 
    "example_bash_operator",
    true,
                "
    dag
    polling_
    run_id
    interval": 
    "manual__2022-02-09T13:09:48.136996+00:00",
    1,
                "
    end
    configuration_
    date
    parameters": 
    null,
    {
                    "
    execution_date
    conf":
    "2022-02-09T13:09:48.136996+00:00",
     {
              
    "external_trigger":
     
    true,
             "
    logical_date
    test": "
    2022-02-09T13:09:48.136996+00:00",
    parameter"
                
    "start_date":
     
    null,
       }
         
    "state": "queued"
           }
    }

    Read DAG Run information

    Example of Universal Task for getting DAG Run information

    Image Removed

    Extension Output Sample

    Code Block
    languagetext
    linenumberstrue
    {
    ,
                "
    exit_code
    dag_run_id_output": 
    0,
    null
        
    "status_description":
     
    "SUCCESS:
     
    Successful
     
    Task
     
    Execution",
    }
        
    "changed": false
    },
        "
    invocation
    result": {
            "
    extension
    conf": 
    "ue-airflow",
    {
                "
    version
    test": "
    1.0.0",
    parameter"
      
    "fields":
     
    {
         },
            "
    credentials
    dag_
    user
    id": "
    ****
    example_bash_operator",
    
            "
    credentials
    dag_run_
    password
    id": "
    ****
    manual__2023-02-09T20:20:39.820239+00:00",
            
    "base_url": "https://foo_airflow_base_url.com:8443/api/v1",
    "end_date": "2023-02-09T20:20:54.717256+00:00",
            "
    use
    execution_
    ssl
    date": 
    true,
    "2023-02-09T20:20:39.820239+00:00",
            "
    ssl
    external_
    verify
    trigger":
    false,
     true,
            
    "
    trusted
    logical_
    certificate_file
    date": "
    /home/****/temp/airflow_cert.cer
    2023-02-09T20:20:39.820239+00:00",
            
    "ssl_hostname_check": false
    "start_date": "2023-02-09T20:20:40.022415+00:00",
            
    "private_key_certificate
    "state": 
    null,
    "failed"
        }
    
    }
    For Action
    • Read Task Instance Information

    The Extension Output below refers to Read Airflow Task Instance information task example.

    Panel
    {
        "
    public
    exit_
    key_certificate
    code": 
    null
    0,
        "status_description": "SUCCESS: Successful Task 
    Execution!",
        "
    use_proxy
    changed": false,
        "invocation": {
            "
    proxies
    extension": 
    null,
    "ue-airflow",
            "
    action
    version": "
    get_dag_run
    2.0.0",
    
            "
    dag_id
    fields": 
    "example_bash_operator",
    {
                "
    dag
    connect_
    run_id
    to": "
    manual
    standalone_
    _2022-02-08T09:17:54.600827+00:00
    airflow_server",
                "
    task
    credentials_type_
    id
    google": null,
            
    }
        
    }
    "service_account_key": null,
        
    "result": {
            "base_url": "http://airflow_url:8080/api/v1",
      
    "conf":
     
    {},
             "
    dag
    credentials_
    id
    user": "
    example_bash_operator
    ****",
                "
    dag
    credentials_
    run_id
    password": "
    manual__2022-02-08T09:17:54.600827+00:00",
    ****",
                "
    end
    use_
    date
    ssl": 
    null
    true,
                "
    execution
    ssl_
    date
    verify": 
    "2022-02-08T09:17:54.600827+00:00",
    true,
                "
    external
    trusted_certificate_
    trigger
    file": 
    true,
    "****",
                "
    logical
    ssl_hostname_
    date
    check": 
    "2022-02-08T09:17:54.600827+00:00"
    false,
            
    "start_date
        "private_key_certificate": null,
                "
    state
    public_key_certificate":
    "queued"
     null,
          
    }
     
    }

    Reading Airflow Task Instance information

    Example of Universal Task for getting information on Airflow Task instance.

    Image Removed

    Extension Output Sample

    Code Block
    languagetext
    linenumberstrue
    {
         "
    exit
    use_
    code
    proxy": 
    0
    true,
        
    "status_description":
     
    "SUCCESS:
     
    Successful
     
    Task Execution",
         "
    changed
    proxies": 
    false,
    "https://ue-proxy-dev-noauth-ssl.stonebranch.org:3128",
           
    "invocation":
     
    {
        "action": "get_task_instance",
       
    "extension":
     
    "ue-airflow",
            "
    version
    dag_id": "
    1.0.0
    example_bash_operator",
                "
    fields
    dag_run_id": 
    {
    "manual__2023-02-09T15:42:00.467617+00:00",
                "
    credentials
    task_
    user
    id": "
    ****
    run_this_last",
                "
    credentials_password
    wait_for_success_or_failure": 
    "****"
    false,
                "
    base
    polling_
    url
    interval": 
    "https://foo_airflow_base_url.com:8443/api/v1"
    1,
                "
    use
    configuration_
    ssl
    parameters": {
       
    true,
                 "
    ssl_verify
    conf": 
    true,
    {}
                
    "trusted_certificate_file": "/home/****/temp/some_certificate.cer"
    },
                "
    ssl
    dag_run_
    hostname
    id_
    check
    output": 
    false,
    null
            }
       
    "private_key_certificate": null
     },
        "result": {
            "dag_id": "
    public
    example_
    key
    bash_
    certificate
    operator"
    : null
    ,
    
            
    "
    use_proxy
    duration": 
    false,
    0.0,
            "
    proxies
    end_date": 
    null,
    "2023-01-10T09:26:55.661975+00:00",
            "
    action
    execution_date": "
    get_task_instance",
    2023-01-09T14:16:48.856712+00:00",
            
    "
    dag
    executor_
    id
    config": "
    example_bash_operator
    {}",
    
            "
    dag_run_id
    hostname": "
    manual__2022-02-08T12:45:08.835386+00:00
    ",
            
    "
    task
    max_
    id
    tries": 
    "run_this_last"
    0,
            
    }
    "operator": "DummyOperator",
       
    },
         "
    result
    pid": 
    {
    null,
            "
    dag_id
    pool": "
    example
    default_
    bash_operator
    pool",
            "
    duration
    pool_slots": 
    null
    1,
            "
    end
    priority_
    date
    weight": 
    null
    1,
            "
    execution_date
    queue": "
    2022-02-08T12:45:08.835386+00:00"
    default",
            "queued_when": null,
            "
    executor
    sla_
    config
    miss": 
    "{}"
    null,
            "
    hostname
    start_date": "2023-01-10T09:26:55.661975+00:00",
            "
    max_tries
    state": 
    0
    "upstream_failed",
            "
    operator
    task_id": "
    DummyOperator
    run_this_last",
            "
    pid
    try_number": 
    null
    0,
            "
    pool
    unixname": "
    default_pool",
    ****"
        }
    
    "pool_slots": 1, "priority_weight": 1, "queue": "default", "queued_when": null, "sla_miss": null, "start_date": null, "state": null, "task_id": "run_this_last", "try_number": 0, "unixname": "****" } }

    Task Output

    Exit Codes

    The exit codes for this Universal Extension are described in the following table.

    ...

    Exit Code

    ...

    Status Classification Code

    ...

    Status Classification Description

    ...

    Status Description

    ...

    0

    ...

    SUCCESS

    ...

    Successful Execution

    ...

    SUCCESS: Successful Task Execution

    ...

    1

    ...

    FAIL

    ...

    Failed Execution

    ...

    FAIL: <Error Description>

    ...

    2

    ...

    AUTHENTICATION_ERROR

    ...

    Authentication Error

    ...

    AUTHENTICATION_ERROR: <Error Description>

    ...

    3

    ...

    CONNECTION_ERROR

    ...

    Connection Error

    ...

    CONNECTION_ERROR: <Error Description>

    ...

    20

    ...

    DATA_VALIDATION_ERROR

    ...

    Input fields Validation Error

    ...

    DATA_VALIDATION_ERROR: <Error Description>

    ...

    21

    ...

    REQUEST_FAILURE

    ...

    HTTP request error

    ...

    REQUEST_FAILED: <Error Description>

    Extension Output

    The Extension Output samples are available on the task examples above.

    Document References

    This document references the following documents:

    ...

    Name

    ...

    Location

    ...

    Description

    ...

    Universal Templates

    ...

    https://docs.stonebranch.com/confluence/display/UC71/Universal+Templates

    ...

    User documentation for creating Universal Templates in the Universal Controller user interface.

    ...

    Universal Tasks

    ...

    https://docs.stonebranch.com/confluence/display/UC71/Universal+Tasks

    ...

    User documentation for creating Universal Tasks in the Universal Controller user interface.

    ...

    Apache Airflow Documentation

    ...

    Apache Airflow Documentation

    ...

    User documentation for Apache Airflow.

    ...

    Apache Airflow API Documentation

    ...

    Airflow REST API

    ...

    }
    

    STDOUT and STDERR

    STDOUT and STDERR provide additional information to User. The populated content can be changed in future versions of this extension without notice. Backward compatibility is not guaranteed.

    Anchor
    Integration Modification
    Integration Modification

    Integration Modifications

    Modifications applied by users or customers, before or after import, might affect the supportability of this integration. The following modifications are discouraged to retain the support level as applied for this integration.

    • Python code modifications should not be done.
    • Template Modifications
      • General Section
        • "Name", "Extension", "Variable Prefix", "Icon" should not be changed.
      • Universal Template Details Section
        • "Template Type", "Agent Type", "Send Extension Variables", "Always Cancel on Force Finish" should not be changed.
      • Result Processing Defaults Section
        • Success and Failure Exit codes should not be changed.
        • Success and Failure Output processing should not be changed.
      • Fields Restriction Section
        The setup of the template does not impose any restrictions, However with respect to "Exit Code Processing Fields" section.
        1. Success/Failure exit codes need to be respected.
        2. In principle, as STDERR and STDOUT outputs can change in follow-up releases of this integration, they should not be considered as a reliable source for determining success or failure of a task.

    Users and customers are encouraged to report defects, or feature requests at Stonebranch Support Desk.

    Document References

    This document references the following documents.

    Document LinkDescription
    Universal TemplatesUser documentation for creating, working with and understanding Universal Templates and Integrations.
    Universal TasksUser documentation for creating Universal Tasks in the Universal Controller user interface.
    CredentialsUser documentation for creating and working with credentials.
    Resolvable Credentials Permitted PropertyUser documentation for Resolvable Credentials Permitted Property.
    Apache Airflow DocumentationUser documentation for Apache Airflow.
    Apache Airflow API DocumentationUser Documentation for Airflow REST API.
    Google Cloud Composer AirflowUser Documentation for Google Cloud Composer.

    Changelog

    ue-airflow-2.0.0 (2023-02-24)

    Enhancements

    • Added: Support for Google Cloud Composer Airflow
    • Added: Support for passing JSON configuration parameters on Action "Trigger Dag Run"

    Deprecations and Breaking Changes

    • Breaking Change: Stop supporting Universal Agent/Controller 7.0.0.0. Support Universal Agent/Controller 7.1.0 or higher.

    ue-airflow-1.1.0 (2022-06-10)

    Enhancements

    • Added: Support for Trigger a DAG run and wait for state "success" or "failed".

    ue-airflow-1.0.0 (2022-03-03)

    • Initial version.