
This Universal Extension provides the capability to integrate with Apache Airflow and use it as part of your end-to-end Universal Controller workflow, allowing high-level visibility and orchestration of data-oriented jobs or pipelines.

Version Information

Template Name | Extension Name | Extension Version
Apache Airflow | ue-airflow | 1.1.0

Software Requirements

This integration requires a Universal Agent and a Python runtime to execute the Universal Task.

Both Windows and Linux agents are supported:

  • Universal Agent for Windows x64 Version 7.0.0.0 and later with Python options installed.

  • Universal Agent for Linux Version 7.0.0.0 and later with Python options installed.

Universal Controller Version 7.0.0.0 and later. This Universal Task requires that the Universal Controller property Web Service Default Response Content (uc.web_service.response.content.default) be set to JSON.

Supported Apache Airflow Versions

This integration has been tested on Apache Airflow v2.2.3. It should be compatible with newer versions of Airflow as long as Airflow preserves backward compatibility.

Key Features

The key features of this integration are the following:

  • Triggering a new DAG run.

  • Triggering a DAG run and waiting until it reaches state "success" or "failed".

  • Retrieving information about a specific DAG run.

  • Retrieving information about a task that is part of a specific DAG run.

  • Basic authentication (username/password) and the SSL protocol.

  • Using a proxy between Universal Controller and the Apache Airflow server.
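To make these features concrete, the sketch below shows how each Action could map onto an HTTP request against the Airflow 2.x stable REST API, with Basic authentication, optional SSL verification and an optional proxy. It assumes, without confirmation from this document, that the extension calls POST /dags/{dag_id}/dagRuns, GET /dags/{dag_id}/dagRuns/{dag_run_id} and GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id} relative to the Airflow Base URL (the sample outputs show a base URL ending in /api/v1). The functions build_request and build_opener are hypothetical illustrations, not the extension's code; the request is only built here, never sent.

```python
import base64
import ssl
import urllib.parse
import urllib.request
from typing import Optional


def build_request(base_url: str, username: str, password: str, action: str,
                  dag_id: str, dag_run_id: Optional[str] = None,
                  task_id: Optional[str] = None) -> urllib.request.Request:
    """Build (but do not send) the HTTP request for one Action (hypothetical)."""
    base = base_url.rstrip("/")
    dag = urllib.parse.quote(dag_id, safe="")
    if action == "Trigger DAG Run":
        # Assumed endpoint: create a new DAG run with an empty "conf".
        url, method, body = f"{base}/dags/{dag}/dagRuns", "POST", b'{"conf": {}}'
    elif action == "Read DAG Run Information":
        run = urllib.parse.quote(dag_run_id or "", safe="")
        url, method, body = f"{base}/dags/{dag}/dagRuns/{run}", "GET", None
    elif action == "Read Task Instance Information":
        run = urllib.parse.quote(dag_run_id or "", safe="")
        task = urllib.parse.quote(task_id or "", safe="")
        url = f"{base}/dags/{dag}/dagRuns/{run}/taskInstances/{task}"
        method, body = "GET", None
    else:
        raise ValueError(f"unsupported action: {action}")
    # Basic authentication header: base64("username:password").
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    headers = {"Authorization": f"Basic {token}", "Accept": "application/json"}
    if body is not None:
        headers["Content-Type"] = "application/json"
    return urllib.request.Request(url, data=body, headers=headers, method=method)


def build_opener(use_ssl: bool, ssl_hostname_check: bool = True,
                 ca_file: Optional[str] = None,
                 proxy: Optional[str] = None) -> urllib.request.OpenerDirector:
    """Combine the optional proxy and SSL settings into one opener (hypothetical)."""
    handlers = []
    if proxy:
        # Route both schemes through the configured proxy server.
        handlers.append(urllib.request.ProxyHandler({"http": proxy, "https": proxy}))
    if use_ssl:
        # Verify the server certificate against a PEM CA bundle (or system defaults).
        ctx = ssl.create_default_context(cafile=ca_file)
        ctx.check_hostname = ssl_hostname_check
        handlers.append(urllib.request.HTTPSHandler(context=ctx))
    return urllib.request.build_opener(*handlers)
```

A request built this way would then be sent with something like `build_opener(True).open(req, timeout=30)`; keeping construction separate from sending makes the mapping easy to inspect without a live Airflow server.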


To use this downloadable Universal Template, you first must perform the following steps:

  1. This Universal Task requires the Resolvable Credentials feature. Check that the Resolvable Credentials Permitted system property has been set to true.

  2. Download the provided ZIP file.

  3. In the Universal Controller UI, select Administration > Configuration > Universal Templates to display the current list of Universal Templates.

  4. Click Import Template.

  5. Select the template ZIP file and click Import.

  6. When the files have been imported successfully, refresh the Universal Templates list; the Universal Template will appear on the list.

Configure Universal Task

For the new Universal Task type, create a new task and enter the task-specific details that were created in the Universal Template.


Field | Input type | Default value | Type | Description
Airflow Base URL | Required | - | Text | The base URL of the Airflow server.
Airflow Credentials | Required | - | Credentials | The Apache Airflow account credentials, consisting of a Runtime username and a Runtime password. Define the Credentials with the Airflow username as "Runtime username" and the Airflow password as "Runtime password".
Action | Required | Trigger DAG Run | Choice | The action performed upon task execution. Valid values are: Trigger DAG Run, Read DAG Run Information, Read Task Instance Information.
DAG Id | Required | - | Dynamic Choice | Dynamic Choice field populated with the list of active DAGs retrieved from the server.
DAG Run Id | Optional | - | Text | Id of a specific DAG Run. Required for Actions "Read DAG Run Information" and "Read Task Instance Information".
Task Id | Optional | - | Text | Dynamic Choice field populated with the list of Task Ids for the selected DAG Id. Required for Action "Read Task Instance Information".
Use Proxy | Required | False | Boolean | Flag to enable proxy configuration. Required when the connection to Apache Airflow goes through a proxy.
Proxy Servers | Optional | - | Text | Proxy server and port. Valid format is http://proxyserver:port or https://proxyserver:port. Required when Use Proxy is True.
Use SSL | Required | False | Boolean | Specifies whether the SSL protocol should be used for communication with the foreign API.
SSL Hostname Check | Optional | True | Boolean | Determines whether the host name of the certificate should be verified against the hostname in the URL. Required when Use SSL is checked.
SSL Certificate Path | Optional | - | Text | Path and file name of the trusted certificate or CA bundle to use in certificate verification. The file must be in PEM format.
Wait for Success or Failure | Required | False | Boolean | Introduced in version 1.1.0. If selected, the task continues running until the DAG Run reaches the "success" or "failed" state. Required for Action "Trigger DAG Run".
Polling Interval | Required | 1 | Integer | Introduced in version 1.1.0. The polling interval, in seconds, between checks of the DAG Run execution state. Required for Action "Trigger DAG Run".
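Several fields in the table above are only conditionally required. The sketch below restates those cross-field rules as a single check, assuming the Action values are compared by their display names; validate_fields is a hypothetical helper, and it is only an assumption that the extension reports such failures with exit code 20 (DATA_VALIDATION_ERROR).

```python
import re
from typing import List, Optional

# "http://proxyserver:port" or "https://proxyserver:port", per the Proxy Servers row.
PROXY_FORMAT = re.compile(r"https?://[^\s:/]+:\d{1,5}")


def validate_fields(action: str, dag_run_id: Optional[str], task_id: Optional[str],
                    use_proxy: bool, proxy_servers: Optional[str]) -> List[str]:
    """Return validation errors; an empty list means the input passes these rules."""
    errors: List[str] = []
    # DAG Run Id is required for both read actions.
    if action in ("Read DAG Run Information", "Read Task Instance Information") and not dag_run_id:
        errors.append(f'DAG Run Id is required for Action "{action}".')
    # Task Id is required only when reading a task instance.
    if action == "Read Task Instance Information" and not task_id:
        errors.append('Task Id is required for Action "Read Task Instance Information".')
    # Proxy Servers is required, and must be well-formed, when Use Proxy is True.
    if use_proxy:
        if not proxy_servers:
            errors.append("Proxy Servers is required when Use Proxy is True.")
        elif not PROXY_FORMAT.fullmatch(proxy_servers):
            errors.append("Proxy Servers must be http://proxyserver:port or https://proxyserver:port.")
    return errors
```

Collecting every error instead of stopping at the first lets a user fix all misconfigured fields in one pass.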

Cancellation and Re-Run

For Action "Trigger DAG Run", the executing task immediately and asynchronously populates the output-only field DAG Run Id, both for information purposes and to preserve the state of execution in case of "Cancel" and "Re-run". On "Re-run", the executing task checks whether the DAG Run Id output field is populated; if it is not, the task triggers the DAG run.

⚠ Canceling the task execution for Action "Trigger DAG Run" before the DAG Run Id output-only field is populated can lead to undesirable behavior on task "Re-run".
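The re-run check described above can be sketched as follows. The callables trigger and publish_dag_run_id are hypothetical stand-ins for the Airflow API call and for writing the output-only field; what the extension does when the field is already populated is not stated here, so reusing the existing id is an assumption. The sketch also suggests one plausible reason for the warning: a cancel that lands after the DAG run is triggered but before its id is published leaves the field empty.

```python
from typing import Callable, Optional


def trigger_or_resume(existing_dag_run_id: Optional[str],
                      trigger: Callable[[], str],
                      publish_dag_run_id: Callable[[str], None]) -> str:
    """Return the DAG Run Id this execution should track (hypothetical helper)."""
    if existing_dag_run_id:
        # Assumption: a populated DAG Run Id means a previous execution already
        # triggered the run, so it is reused rather than triggered again.
        return existing_dag_run_id
    # No DAG Run Id yet: trigger a new DAG run.
    dag_run_id = trigger()
    # Publish the id right away so a later Cancel/Re-run can find it.
    # A cancel between trigger() and this call leaves the field empty,
    # so a re-run would trigger a second DAG run.
    publish_dag_run_id(dag_run_id)
    return dag_run_id
```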

Task Examples

Trigger a new DAG Run

Example of a Universal Task that triggers a new DAG Run. In this example, both the SSL protocol and a proxy are used.

Extension Output Sample
    {
        "exit_code": 0,
        "status_description": "SUCCESS: Successful Task Execution",
        "changed": true,
        "invocation": {
            "extension": "ue-airflow",
            "version": "1.0.0",
            "fields": {
                "credentials_user": "****",
                "credentials_password": "****",
                "base_url": "https://foo_airflow_base_url.com:8443/api/v1",
                "use_ssl": true,
                "ssl_verify": true,
                "trusted_certificate_file": "C:\\Users\\****\\temp\\some_certificate.cer",
                "ssl_hostname_check": false,
                "private_key_certificate": null,
                "public_key_certificate": null,
                "use_proxy": true,
                "proxies": "http://foo.proxy.com:8080",
                "action": "trigger_dag_run",
                "dag_id": "example_bash_operator",
                "dag_run_id": null,
                "task_id": null
            }
        },
        "result": {
            "conf": {},
            "dag_id": "example_bash_operator",
            "dag_run_id": "manual__2022-02-09T13:09:48.136996+00:00",
            "end_date": null,
            "execution_date": "2022-02-09T13:09:48.136996+00:00",
            "external_trigger": true,
            "logical_date": "2022-02-09T13:09:48.136996+00:00",
            "start_date": null,
            "state": "queued"
        }
    }

Read DAG Run Information

Example of a Universal Task that retrieves DAG Run information.

Read Airflow Task Instance Information

Example of a Universal Task that retrieves information on an Airflow Task instance.

Trigger a DAG run and wait until it reaches state "success" or "failed"

Example of a Universal Task that triggers a new DAG Run and waits for the "success" or "failed" state.

Task Output

Output Only Fields

The output fields for this Universal Extension are described below.

Field | Type | Description
DAG Run Id | Text | Populated when Action is "Trigger DAG Run" or "Read DAG Run Information".
DAG Run State | Text | The state of the DAG Run; populated when Action is "Trigger DAG Run" or "Read DAG Run Information". The available values are: queued, running, success, failed.
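When Wait for Success or Failure is selected, the task keeps checking until DAG Run State reaches one of the two terminal values listed above, sleeping Polling Interval seconds between checks. A minimal sketch of such a loop, assuming a hypothetical fetch_state callable that returns the "state" attribute of the DAG run (for example from the Airflow REST API DAG run endpoint); the injectable sleep parameter only exists so the loop can be exercised without waiting, and a real implementation would also need to honor task cancellation.

```python
import time
from typing import Callable

# Terminal values of the DAG Run State output field.
TERMINAL_STATES = {"success", "failed"}


def wait_for_dag_run(fetch_state: Callable[[], str],
                     polling_interval: float = 1,
                     sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll until the DAG Run is "success" or "failed" and return that state."""
    while True:
        state = fetch_state()
        if state in TERMINAL_STATES:
            return state
        # Still "queued" or "running": wait Polling Interval seconds, then re-check.
        sleep(polling_interval)
```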

Exit Codes

The exit codes for this Universal Extension are described in the following table.

Exit Code | Status Classification Code | Status Classification Description | Status Description
0 | SUCCESS | Successful Execution | SUCCESS: Successful Task Execution
1 | FAIL | Failed Execution | FAIL: <Error Description>
2 | AUTHENTICATION_ERROR | Authentication Error | AUTHENTICATION_ERROR: <Error Description>
3 | CONNECTION_ERROR | Connection Error | CONNECTION_ERROR: <Error Description>
20 | DATA_VALIDATION_ERROR | Input fields Validation Error | DATA_VALIDATION_ERROR: <Error Description>
21 | REQUEST_FAILURE | HTTP request error | REQUEST_FAILED: <Error Description>
22 | FAIL | Failed Execution | FAIL: DAG Run was triggered, but the status of DAG Run is 'Failed'.
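Assuming Wait for Success or Failure is selected, the final DAG Run state decides between exit code 0 and exit code 22. The sketch below restates the table as a lookup and adds that mapping; exit_code_for_dag_state is a hypothetical helper, and how the extension behaves for any other state (for example if polling is interrupted) is not covered by this document.

```python
# Exit code -> Status Classification Code, as listed in the table above.
EXIT_CODES = {
    0: "SUCCESS",
    1: "FAIL",
    2: "AUTHENTICATION_ERROR",
    3: "CONNECTION_ERROR",
    20: "DATA_VALIDATION_ERROR",
    21: "REQUEST_FAILURE",
    22: "FAIL",
}


def exit_code_for_dag_state(state: str) -> int:
    """Map the final state of a waited-for DAG Run to an exit code."""
    if state == "success":
        return 0
    if state == "failed":
        # FAIL: DAG Run was triggered, but the status of DAG Run is 'Failed'.
        return 22
    raise ValueError(f"not a terminal DAG Run state: {state!r}")
```

Note that codes 1 and 22 share the FAIL classification, so a workflow that needs to tell "DAG run failed in Airflow" apart from other failures should check the exit code itself rather than the classification.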

Extension Output

The Extension Output for this Universal Extension is in JSON format, as described below.

{
   "exit_code": 0,
   "status_description": "SUCCESS: Successful Task Execution!",
   "changed": true,
   "invocation": {
      "extension": "ue-airflow",
      "version": "1.1.0",
      "fields": {
         "action": "trigger_dag_run",
         "base_url": "https://foo-airflow-base-url:8443/api/v1",
         "credentials_user": "****",
         "credentials_password": "****",
         "use_ssl": true,
         "trusted_certificate_file": "/home/****/temp/airflow_cert.cer",
         "ssl_verify": false,
         "private_key_certificate": null,
         "public_key_certificate": null,
         "dag_id": "example_sla_dag",
         "dag_run_id": null,
         "task_id": null,
         "use_proxy": false,
         "proxies": null,
         "wait_for_success_or_failure": false,
         "polling_interval": 1,
         "dag_run_id_output": null
      }
   },
   "result": {
      "conf": {},
      "dag_id": "example_sla_dag",
      "dag_run_id": "manual__2022-06-06T10:48:37.891324+00:00",
      "end_date": null,
      "execution_date": "2022-06-06T10:48:37.891324+00:00",
      "external_trigger": true,
      "logical_date": "2022-06-06T10:48:37.891324+00:00",
      "start_date": null,
      "state": "queued"
   }
}

The attribute changed is populated as follows:

  • true, when a DAG Run is successfully triggered.
  • false, when a DAG Run is not triggered, or when the Action is "Read DAG Run Information" or "Read Task Instance Information".

Subsequent tasks in a workflow that depend on the output of this Universal Extension should rely on the Extension Output, which contains the attribute result. The result section, as displayed above, is based on the response of the Airflow REST API related to each action, as of Airflow version 2.2.3. Other Airflow versions may return different information in the result attribute.
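For a downstream task that has the Extension Output as a JSON string (how it obtains that string is outside the scope of this sketch), the attributes above can be read defensively. summarize_extension_output is a hypothetical helper; it reads result keys with get() because, as noted, their content depends on the Airflow version.

```python
import json
from typing import Any, Dict


def summarize_extension_output(raw: str) -> Dict[str, Any]:
    """Pick the attributes a downstream task typically needs from Extension Output."""
    output = json.loads(raw)
    # "result" mirrors the Airflow REST API response (tested against 2.2.3),
    # so missing keys yield None instead of raising.
    result = output.get("result") or {}
    return {
        "exit_code": output.get("exit_code"),
        "changed": output.get("changed", False),
        "dag_run_id": result.get("dag_run_id"),
        "state": result.get("state"),
    }
```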

STDOUT and STDERR

STDOUT and STDERR provide additional information to the user. Their content may change in future versions of this extension without notice.

Document References

This document references the following documents:

Name | Location | Description
Universal Templates | https://docs.stonebranch.com/confluence/display/UC72x/Universal+Templates | User documentation for creating Universal Templates in the Universal Controller user interface.
Universal Tasks | https://docs.stonebranch.com/confluence/display/UC72x/Universal+Tasks | User documentation for creating Universal Tasks in the Universal Controller user interface.
Apache Airflow Documentation | Apache Airflow Documentation | User documentation for Apache Airflow.
Apache Airflow API Documentation | Airflow REST API | User documentation for the Airflow REST API.

Changelog

ue-airflow-1.1.0 (2022-06-10)

Enhancements

  • Added: Support for triggering a DAG run and waiting for state "success" or "failed".

ue-airflow-1.0.0 (2022-03-03)

  • Initial version.