
Your use of this download is governed by Stonebranch’s Terms of Use, which are available at https://www.stonebranch.com/integration-hub/Terms-and-Privacy/Terms-of-Use/

Overview

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. This Universal Extension is responsible for monitoring events (messages) from topics in Kafka.


Apache Airflow is an open-source workflow management platform created to programmatically author, schedule, and monitor workflows.

This Universal Extension provides the capability to integrate with Apache Airflow and use it as part of your end-to-end Universal Controller workflow, allowing high-level visibility and orchestration of data-oriented jobs or pipelines.

Version Information

Template Name | Extension Name | Extension Version
Apache Airflow | ue-airflow | 2.0.0


Note

Version 2.0.0 does not support Universal Agent/Controller 7.0.0.0. See the detailed Software Requirements below.


Software Requirements

This integration requires a Universal Agent and a Python runtime to execute the Universal Task.

Software Requirements for Universal Template and Universal Task

Requires Python 3.7.0 or higher. Tested with the Universal Agent bundled Python distribution.

Software Requirements for Universal Agent

Both Windows and Linux agents are supported:

  • Universal Agent for Windows x64 Version 7.1.0.0 and later, with the Python options installed.

  • Universal Agent for Linux Version 7.1.0.0 and later, with the Python options installed.

Software Requirements for Universal Controller

Universal Controller Version 7.1.0.0 and later.

Software Requirements in Cases of SSL Verification

  • If Python 3.7 Distribution for Universal Agent is installed, the path location of the Certificate Authority file must be provided.

  • If Python is installed on the local system but not with the Universal Agent Distribution:

    • If the SSL certificate is signed by a publicly trusted certificate authority (CA), it is not required to provide the certificate.

    • If the SSL certificate is self-signed, either the path location of the certificate file must be provided, or the certificate must be stored at the machine's default location for installed certificates (in this case it will be trusted for all applications).
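For orientation, the following minimal sketch shows how these SSL and SASL inputs typically map onto a Kafka consumer configuration, assuming the kafka-python client (the broker address, group, credentials, and CA path below are hypothetical examples, not values from this integration):

from kafka import KafkaConsumer

# Minimal sketch, assuming the kafka-python client; all values shown are
# hypothetical examples.
consumer = KafkaConsumer(
    "testtopic",
    bootstrap_servers="broker.example.com:9093",
    group_id="my-group1",
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_username="kafka-user",
    sasl_plain_password="********",
    ssl_check_hostname=True,       # SSL Hostname Check
    ssl_cafile="/path/to/ca.pem",  # SSL Certificate Path; omit to use the system CA store
)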

Supported Apache Kafka Versions

This Universal Extension supports Apache Kafka versions 2.0.0 and above. Because the Kafka server protocol is backwards compatible, the extension is expected to work with versions 0.8.0 and above (except the 0.8.2-beta release); however, this has not been tested.

Key Features

This Universal Extension provides the following main features:

  • Support for consuming messages from a specific topic through a consumer group subscription, until a specific condition is met. Filtering is based on the value of the message. When a matching Kafka message is detected, the Universal Task finishes and publishes information related to the matched message in the extension output. Number, String, and JSON filter patterns are supported.

  • Support for authenticating to Kafka through PLAINTEXT or SASL_SSL SCRAM security protocol.

Typically, this extension can be used to monitor events from Kafka and, upon successful execution, to trigger workflows or other tasks, or just to pass information related to the Kafka event within UAC.
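The monitoring pattern can be pictured with a short, hedged sketch (not the extension's actual code), assuming the kafka-python client, a "String" Value Deserializer, and a "Contains" Value Filter:

from kafka import KafkaConsumer

# Hedged sketch of the monitoring loop; topic, group, and broker are examples.
consumer = KafkaConsumer(
    "testtopic",
    bootstrap_servers="ue-kafka-30.stonebranch.org:9092",
    group_id="my-group1",
    value_deserializer=lambda raw: raw.decode("utf-8"),  # "String" Value Deserializer
)

for message in consumer:
    # "Contains" Value Filter: finish on the first event whose value contains "test".
    if "test" in message.value:
        print(message.partition, message.offset, message.value)
        break

When the loop breaks, the extension would publish the matched event's payload and metadata as Extension Output, as shown in the Task Examples below.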

Import the Universal Template

To use this downloadable Universal Template, you first must perform the following steps:

  1. This Universal Task requires the Resolvable Credentials feature. Check that the Resolvable Credentials Permitted system property has been set to true.

  2. Download the provided ZIP file.

  3. In the Universal Controller UI, select Administration > Configuration > Universal Templates to display the current list of Universal Templates.

  4. Click Import Template.

  5. Select the template ZIP file and Import.

When the template has been imported successfully, the Universal Template will appear on the list. Refresh your Navigation Tree to see these tasks in the Automation Center Menu.

Configure Universal Task

For the new Universal Task type, create a new task, and enter the task-specific details that were created in the Universal Template.

Input Fields

The input fields for this Universal Extension are described below.

Field: Action
Input type: Required
Default value: Monitor for events
Type: Choice
Description: The action performed upon the task execution.

Field: Security Protocol
Input type: Required
Default value: PLAINTEXT
Type: Choice
Description: The security protocol used to communicate with Kafka brokers. Valid values are:
  • PLAINTEXT
  • SASL_SSL

Field: Bootstrap Servers
Input type: Required
Default value: -
Type: Text
Description: 'host[:port]' string (or list of 'host[:port]' strings, separated by commas) that the consumer should contact to bootstrap initial cluster metadata. This does not have to be the full node list; it just needs at least one broker that will respond to a Metadata API request.

Field: Consumer Type
Input type: Required
Default value: Consumer Group
Type: Choice
Description: Type of consumer used to get messages from Kafka. Available option:
  • Consumer Group: the consumer is part of a consumer group.

Field: Consumer Group
Input type: Required
Default value: -
Type: Text
Description: The unique name of the consumer group to join for dynamic partition assignment and to use for fetching and committing offsets.

Field: Topic
Input type: Required
Default value: -
Type: Dynamic Choice
Description: Dynamically fetched list of topics to subscribe the consumer to. The user can select the required topic from a drop-down list.

Field: SASL Mechanism
Input type: Optional
Default value: SCRAM-SHA-256
Type: Choice
Description: The authentication mechanism used when Security Protocol is configured for SASL_SSL. Valid values are:
  • SCRAM-SHA-256: credentials are hashed with the SHA-256 algorithm.
  • SCRAM-SHA-512: credentials are hashed with the SHA-512 algorithm.
Required when Security Protocol is "SASL_SSL".

Field: SASL User Credentials
Input type: Optional
Default value: -
Type: Credentials
Description: The credentials for SASL SCRAM authentication, comprised of:
  • username
  • password
Required when Security Protocol is configured for "SASL_SSL".

Field: SSL Hostname Check
Input type: Optional
Default value: True
Type: Boolean
Description: Flag that configures whether the SSL handshake should verify that the certificate matches the broker's hostname.
Required when Security Protocol is configured for "SASL_SSL".

Field: SSL Certificate Path
Input type: Optional
Default value: -
Type: Text
Description: Path and file name of the Certificate Authority (CA) file to use in certificate verification. Used when it is required to locate the CA file and Security Protocol is configured for "SASL_SSL".

Field: Client Id
Input type: Optional
Default value: -
Type: Text
Description: This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client. The constructed client id seen on the server side is Client Id + "task instance id". If Client Id is not populated, the default value "ue-kafka-monitor-#" is used.
Example: "ue-kafka-monitor-#1635348654595881042JAYE7BKNPYVG3"

Field: Start from
Input type: Required
Default value: Consumer Group Offset
Type: Choice
Description: Controls from which point consuming starts. Available option:
  • Consumer Group Offset: start consuming from the committed offset of the group.

Field: Key Deserializer
Input type: Required
Default value: String
Type: Choice
Description: Type of key deserialization. Available options are:
  • Integer
  • String

Field: Value Deserializer
Input type: Required
Default value: String
Type: Choice
Description: Type of value deserialization (see the sketch after this field). Available options are:
  • Integer
  • Float
  • String
  • JSON
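To make the deserializer choices concrete, here is a hedged sketch of what each option implies for the raw message bytes. The mapping and the assumed byte formats (big-endian integers and 8-byte doubles) are illustrative only and are not taken from the extension's code:

import json
import struct

# Illustrative only: hypothetical helpers, not the extension's actual code.
DESERIALIZERS = {
    "Integer": lambda raw: int.from_bytes(raw, byteorder="big", signed=True),
    "Float": lambda raw: struct.unpack(">d", raw)[0],  # assumes an 8-byte double
    "String": lambda raw: raw.decode("utf-8"),
    "JSON": lambda raw: json.loads(raw.decode("utf-8")),
}

print(DESERIALIZERS["JSON"](b'{"firstName": "John"}')["firstName"])  # John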

Field: Value Filter
Input type: Optional
Default value: None
Type: Choice
Description: Value operator that specifies the criteria used to match records and stop consuming messages.
If Value Deserializer is configured for "Integer" or "Float", the available options are:
  • None (all messages are matched)
  • >= (greater than or equal to)
  • <= (less than or equal to)
  • = (equal to)
  • != (not equal to)
If Value Deserializer is configured for "String", the available options are:
  • None (all messages are matched)
  • Contains
  • Does Not Contain
  • Equals
  • Is Blank
  • Is Not Blank
If Value Deserializer is configured for "JSON", the available options are all of those that apply to the "Integer", "Float", and "String" Value Deserializers.

Field: Value
Input type: Optional
Default value: -
Type: Text
Description: The value that the Value Filter applies to.

Field: Value JSON Path
Input type: Optional
Default value: -
Type: Text
Description: The JSON path used to locate the value, in case Value Deserializer is configured for "JSON". The JSON path needs to resolve either to a number or to a string. If the JSON path results in a list of numbers or a list of strings, a Kafka message is matched if at least one element from the list matches the Value. JSON Path syntax is based on the jsonpath-ng Python library; for examples, please refer to the official web site and to the brief sketch below.
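A brief jsonpath-ng illustration, using the sample event value from the JSON matching example later in this document:

from jsonpath_ng import parse

event_value = {
    "phoneNumbers": [
        {"type": "iPhone", "number": "0123-4567-8888"},
        {"type": "home", "number": "0123-4567-8910"},
    ]
}

# "$.phoneNumbers[1].type" resolves to "home", so an Equals filter with
# Value "home" would match this event.
matches = [m.value for m in parse("$.phoneNumbers[1].type").find(event_value)]
print(matches)  # ['home']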

Field: Show Advanced Settings
Input type: Required
Default value: False
Type: Boolean
Description: Checking this field makes more fields available for advanced configuration. The advanced fields are: Partition Assignment Strategy, Session Timeout (ms), Auto Offset Reset, Request Timeout (ms), Heartbeat Interval (ms), and Max Partition Fetch Bytes.

Field: Partition Assignment Strategy
Input type: Optional
Default value: Range
Type: Choice
Description: Partition assignment policy used to distribute partition ownership amongst consumer instances when group management is used. Available options are:
  • Range
  • Round Robin

Field: Session Timeout (ms)
Input type: Optional
Default value: 10000
Type: Integer
Description: Controls the time it takes to detect a crashed consumer that has stopped sending heartbeats. If more than Session Timeout milliseconds pass without the consumer sending a heartbeat to the group coordinator, the consumer is considered dead, and the group coordinator triggers a rebalance of the consumer group to allocate partitions from the dead consumer to the other consumers in the group.

Field: Auto Offset Reset
Input type: Optional
Default value: Latest
Type: Choice
Description: Controls the behavior of the consumer when it starts reading a partition for which it does not have a committed offset, or for which the committed offset is invalid (usually because the consumer was down for so long that the record with that offset was already aged out of the broker). Available options are:
  • Earliest: move to the oldest available message.
  • Latest: move to the most recent message.

Field: Request Timeout (ms)
Input type: Optional
Default value: 305000
Type: Integer
Description: Controls the maximum amount of time the client will wait for the response to a request.

Field: Heartbeat Interval (ms)
Input type: Optional
Default value: 3000
Type: Integer
Description: The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka's group management facilities. The value must be set lower than Session Timeout (ms).

Field: Max Partition Fetch Bytes
Input type: Optional
Default value: 1048576
Type: Integer
Description: Controls the maximum number of bytes the server will return per partition. This size must be at least as large as the maximum message size the server allows; otherwise, it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition. Default is 1 MB.
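For orientation, a hedged sketch of how these advanced settings map onto consumer configuration, assuming a kafka-python style client (the parameter names mirror the invocation fields shown in the Extension Output example later in this document; the values shown are the documented defaults):

from kafka import KafkaConsumer

# Illustrative only; values are the documented defaults.
consumer = KafkaConsumer(
    "testtopic",
    bootstrap_servers="ue-kafka-30.stonebranch.org:9092",
    group_id="my-group1",
    session_timeout_ms=10000,           # Session Timeout (ms)
    heartbeat_interval_ms=3000,         # Heartbeat Interval (ms); must stay below the session timeout
    request_timeout_ms=305000,          # Request Timeout (ms)
    auto_offset_reset="latest",         # Auto Offset Reset
    max_partition_fetch_bytes=1048576,  # Max Partition Fetch Bytes (1 MB)
)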

Task Examples

PLAINTEXT Security Protocol Task Configuration

Example of Universal Task for "PLAINTEXT" Security Protocol.

SASL_SSL Security Protocol Task Configuration

Example of Universal Task for "SASL_SSL" Security Protocol.

Matching a Kafka Event with String Value Deserializer

In this example, the Task monitors for events with a value that contains the string "test".

In the extension output result, the matched event's details are printed (only the result JSON element is shown below):

{
    "result": {
        "event": {
            "value": "new test",
            "headers": [
                {
                    "key": "test_key",
                    "value": "1"
                }
            ],
            "partition": 0,
            "offset": 34,
            "timestamp": "2021-11-01T18:50:30.031000+00:00"
        }
    }
}

The Task has successfully matched the value "test", because the value inside the event element is "new test", which contains the string "test".

Supported Apache Airflow Versions

This integration is tested on Apache Airflow v2.2.3 and v2.2.5. It should be compatible with newer versions of Airflow as long as Airflow backward compatibility is preserved.

Key Features

This Universal Extension provides the following key features:

  • Actions
    • Trigger a DAG Run and optionally wait until the DAG Run has reached a "success" or "failure" state (see the sketch after this list).
    • Information retrieval of a specific DAG Run.
    • Information retrieval for a task that is part of a specific DAG Run.
  • Authentication
    • Basic authentication for Stand Alone Airflow Server.
    • Service Account Private Key for Google Cloud Composer.
  • Other
    • Capability to use HTTP or HTTPS proxy instead of direct communication to Stand Alone Airflow Server.
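As a hedged illustration of what the "Trigger DAG Run" action does under the hood, the following sketch calls Airflow's stable REST API endpoint /dags/{dag_id}/dagRuns, which this document references for the Configuration Parameters field. The host, credentials, and DAG id are placeholders:

import requests

# POST /api/v1/dags/{dag_id}/dagRuns triggers a new DAG Run; the optional
# "conf" attribute carries the Configuration Parameters (JSON) field.
response = requests.post(
    "http://airflow-host:8080/api/v1/dags/example_bash_operator/dagRuns",
    auth=("airflow-user", "********"),  # Basic authentication (standalone server)
    json={"conf": {"test": "parameter"}},
)
response.raise_for_status()
print(response.json()["dag_run_id"], response.json()["state"])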

Import Universal Template

To use the Universal Template, you first must perform the following steps.

  1. This Universal Task requires the Resolvable Credentials feature. Check that the Resolvable Credentials Permitted system property has been set to true.

  2. To import the Universal Template into your Controller, follow the instructions here.

  3. When the files have been imported successfully, refresh the Universal Templates list; the Universal Template will appear on the list.

Modifications of this integration, applied by users or customers, before or after import, might affect the supportability of this integration. For more information, refer to Integration Modifications.

Configure Universal Task

For a new Universal Task, create a new task, and enter the required input fields.

Input Fields

The input fields for this Universal Extension are described below.

Field: Connect To (introduced in version 2.0.0)
Input type: Required
Default value: Standalone Airflow Server
Type: Choice
Description: Airflow service provider. Available options are:
  • Standalone Airflow Server
  • Google Cloud Composer

Field: Action
Input type: Required
Default value: Trigger DAG Run
Type: Choice
Description: The action performed upon the task execution. Supported options are:
  • Trigger DAG Run
  • Read DAG Run Information
  • Read Task Instance Information

Field: Airflow Base URL
Input type: Required
Default value: -
Type: Text
Description: The base URL of the Airflow server.

Field: Airflow Credentials (optional since version 2.0.0)
Input type: -
Default value: -
Type: Credentials
Description: The Apache Airflow account credentials. The Credentials definition should be as follows:
  • Airflow username as "Runtime username".
  • Airflow password as "Runtime password".
Required for Connect To "Standalone Airflow Server".

Field: Credentials Type (introduced in version 2.0.0)
Input type: -
Default value: Service Account Private Key
Type: Choice
Description: The authentication method for Google Cloud Composer. Available option:
  • Service Account Private Key
Required for Connect To "Google Cloud Composer".

Field: Service Account Key (introduced in version 2.0.0)
Input type: Optional
Default value: -
Type: Credentials
Description: The Credentials definition should be as follows:
  • Google Cloud Composer Service Account Key as "Token", in JSON format.
Required for Credentials Type "Service Account Private Key".

Field: DAG ID
Input type: Required
Default value: -
Type: Dynamic Choice
Description: Dynamic Choice field populated by getting a list of active DAGs from the server.

Field: Configuration Parameters (JSON) (introduced in version 2.0.0)
Input type: Optional
Default value: -
Type: Large Text Field
Description: Configuration parameters, mapped to the "conf" payload attribute of the Airflow /dags/{dag_id}/dagRuns API. The value should be in JSON format (see the example below).
Optional for Action "Trigger DAG Run".
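For example, the following field value (borrowed from the Trigger DAG Run output example later in this document) passes a single parameter to the triggered DAG Run:

{
    "test": "parameter"
}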
Field: DAG Run ID
Input type: Optional
Default value: -
Type: Text
Description: ID of a specific DAG Run.
Required for Action "Read DAG Run Information" or "Read Task Instance Information".

Field: Task ID
Input type: Optional
Default value: -
Type: Dynamic Choice
Description: Dynamic Choice field populated by getting a list of Task IDs for a specific DAG ID.
Required for Action "Read Task Instance Information".

Field: SSL Options
Input type: Required
Default value: False
Type: Boolean
Description: Specifies whether the SSL protocol should be used for the communication with the foreign API.
Optional for Connect To "Standalone Airflow Server".

Field: CA Bundle Path
Input type: Optional
Default value: -
Type: Text
Description: Path and file name of the trusted certificate or CA bundle to use in certificate verification. The file must be in PEM format.
Required for Connect To "Standalone Airflow Server" when SSL Options is checked.

Field: SSL Certificate Verification
Input type: Optional
Default value: True
Type: Boolean
Description: Determines whether the host name of the certificate should be verified against the hostname in the URL.
Required for Connect To "Standalone Airflow Server" when SSL Options is checked.

Field: Use Proxy
Input type: Required
Default value: False
Type: Boolean
Description: Flag that allows Proxy configuration, enabling connection to Apache Airflow through a proxy.
Optional for Connect To "Standalone Airflow Server".

Field: Proxy Servers
Input type: Optional
Default value: -
Type: Text
Description: Proxy server and port. Valid formats: http://proxyserver:port or https://proxyserver:port.
Required for Connect To "Standalone Airflow Server" when Use Proxy is checked.

Field: Wait for Success or Failure (introduced in version 1.1.0)
Input type: Required
Default value: False
Type: Boolean
Description: If selected, the task will continue running until the DAG Run reaches the "success" or "failed" state.
Required for Action "Trigger DAG Run".

Field: Polling Interval (introduced in version 1.1.0)
Input type: Required
Default value: 1
Type: Integer
Description: The polling interval in seconds between checks of the DAG Run execution state.
Required for Action "Trigger DAG Run".

Cancellation and Re-Run

For the "Trigger DAG Run" Action, the executing task immediately populates the output-only field DAG Run ID asynchronously, both for information purposes and to keep the state of execution across "Cancel" and "Re-run". On "Re-run", the executing task checks whether the DAG Run ID output field is populated; if it is not, it triggers the DAG Run.

Note

Canceling the task execution for Action "Trigger DAG Run" before the DAG Run ID output-only field is populated can lead to undesirable behavior on task "Re-run".

Task Examples

Trigger DAG Run (GCC)

Example of Universal Task for triggering a new DAG Run with Google Cloud Composer.


Trigger DAG Run (Standalone)

Example of Universal Task for triggering a new DAG Run on a Standalone Airflow Server and waiting for a "success" or "failure" state as the result. Configuration Parameters are also used.


Read Airflow Task Instance Information

Example of Universal Task for getting information on a Standalone Airflow Task Instance. HTTPS Proxy, CA Bundle, and SSL Verification are used.


Task Output

Output Only Fields

The output fields for this Universal Extension are described below.

Field: DAG Run ID
Type: Text
Description: The field is populated when Action is one of the following:
  • Trigger DAG Run
  • Read DAG Run Information

Field: DAG Run State
Type: Text
Description: The field represents the state of the DAG Run and is populated when Action is one of the following:
  • Trigger DAG Run
  • Read DAG Run Information
The available values are:
  • queued
  • running
  • success
  • failed

Exit Codes

The exit codes for this Universal Extension are described in the following table.

Exit Code | Status Classification Code | Status Classification Description | Status Description
0 | SUCCESS | Successful Execution | SUCCESS: Successful Task Execution
1 | FAIL | Failed Execution | FAIL: <Error Description>
2 | AUTHENTICATION_ERROR | Authentication Error | AUTHENTICATION_ERROR: <Error Description>
3 | CONNECTION_ERROR | Connection Error | CONNECTION_ERROR: <Error Description>
20 | DATA_VALIDATION_ERROR | Input Fields Validation Error | DATA_VALIDATION_ERROR: <Error Description>
21 | REQUEST_FAILURE | HTTP Request Error | REQUEST_FAILED: <Error Description>
22 | FAIL | Failed Execution | FAIL: DAG Run was triggered, but the status of DAG Run is 'Failed'.

Extension Output

In the context of a workflow, subsequent tasks can rely on the information provided by this integration as Extension Output.

The Extension Output contains the attribute result. The result attribute, as displayed in the examples below, is based on the response of the related Airflow REST APIs for the respective actions and versions 2.2.3 or 2.2.5. Other versions of Airflow may produce different information as part of the result attribute.

The attribute changed is populated as follows:

  • true, in case a DAG Run is successfully triggered.
  • false, in case a DAG Run is not triggered, or the Action is "Read DAG Run Information" or "Read Task Instance Information".

The result section includes attributes as described in the Airflow API official documentation.
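As a hedged sketch of how a "Wait for Success or Failure" style poll over the Airflow REST API might look (the host, credentials, and identifiers are placeholders; the endpoint is Airflow's stable REST API):

import time
import requests

dag_id = "example_bash_operator"
dag_run_id = "manual__2023-02-09T20:20:39.820239+00:00"
url = f"http://airflow-host:8080/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}"

# Poll GET /dags/{dag_id}/dagRuns/{dag_run_id} until a terminal state is reached.
while True:
    state = requests.get(url, auth=("airflow-user", "********")).json()["state"]
    if state in ("success", "failed"):
        break
    time.sleep(1)  # Polling Interval of 1 second
print(state)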

The Extension Output for this Universal Extension is in JSON format as described below.

For Action

  • Trigger DAG Run
  • Read DAG Run Information

The Extension Output below refers to the Trigger DAG Run (Standalone) task example.

{
    "exit_code": 22,
    "status_description": "FAIL: DAG Run was triggered, but the status of DAG Run is 'Failed'.",
    "changed": true,
    "invocation": {
        "extension": "ue-airflow",
        "version": "2.0.0",
        "fields": {
            "connect_to": "standalone_airflow_server",
            "credentials_type_google": null,
            "service_account_key": null,
            "base_url": "http://airflow_url:8080/api/v1",
            "credentials_user": "****",
            "credentials_password": "****",
            "use_ssl": false,
            "ssl_verify": false,
            "trusted_certificate_file": null,
            "ssl_hostname_check": false,
            "private_key_certificate": null,
            "public_key_certificate": null,
            "use_proxy": false,
            "proxies": null,
            "action": "trigger_dag_run",
            "dag_id": "example_bash_operator",
            "dag_run_id": "manual__2023-02-09T20:20:39.820239+00:00",
            "task_id": null,
            "wait_for_success_or_failure": true,
            "polling_interval": 1,
            "configuration_parameters": {
                "conf": {
                    "test": "parameter"
                }
            },
            "dag_run_id_output": null
        }
    },
    "result": {
        "conf": {
            "test": "parameter"
        },
        "dag_id": "example_bash_operator",
        "dag_run_id": "manual__2023-02-09T20:20:39.820239+00:00",
        "end_date": "2023-02-09T20:20:54.717256+00:00",
        "execution_date": "2023-02-09T20:20:39.820239+00:00",
        "external_trigger": true,
        "logical_date": "2023-02-09T20:20:39.820239+00:00",
        "start_date": "2023-02-09T20:20:40.022415+00:00",
        "state": "failed"
    }
}

Matching a Kafka Event with JSON Value Deserializer

In this example, the task monitors for events whose value is in JSON format, has a list of "phoneNumbers", and where the second element of that list has a "type" attribute equal to "home".

In the extension output result, the matched event's details are printed (only the result JSON element is shown below):

{
    "result": {
        "client_id": "ue-kafka-monitor-#163578238979483604275PR4VPKK89D1",
        "event": {
            "key": 1,
            "value": {
                "firstName": "John",
                "lastName": "doe",
                "age": 26,
                "address": {
                    "streetAddress": "naist street",
                    "city": "Nara",
                    "postalCode": "630-0192"
                },
                "phoneNumbers": [
                    {
                        "type": "iPhone",
                        "number": "0123-4567-8888"
                    },
                    {
                        "type": "home",
                        "number": "0123-4567-8910"
                    }
                ]
            },
            "headers": [],
            "partition": 0,
            "offset": 20,
            "timestamp": "2021-11-01T18:31:11.415000+00:00"
        }
    }
}

The Task has successfully matched the value "home", because the value inside the event element contains a phoneNumbers list whose second element's type is "home".

Matching a Kafka Event with JSON Value Filter None

In this example, the task monitors for events and matches any Kafka event, as the selected Value Filter is "None".

For more information about the Extension Output Result, please refer to the Extension Output section.

Task Output

Exit Codes

The exit codes for this Universal Extension are described below.

Exit Code | Status Classification Code | Status Classification Description | Status Description
0 | SUCCESS | Successful Execution | SUCCESS: Successful Task execution
1 | FAIL | Failed Execution | FAIL: <Error Description>
10 | CONNECTION_ERROR | Bad connection data | CONNECTION_ERROR: <Error Description>
11 | CONNECTION_TIMEOUT | Connection timed out | CONNECTION_TIMEOUT: <Error Description>
20 | DATA_VALIDATION_ERROR | Bad input fields validation | DATA_VALIDATION_ERROR: Some of the input fields cannot be validated. See STDERR for more details.

Extension Output

Upon the Task's completion, the extension always produces an Extension Output. This Universal Extension runs continuously until a Kafka event with a matching value is detected, according to the filter criteria. When an event is matched, the related information is published as Extension Output.

An example of Extension Output for this Universal Extension is below:

{
    "exit_code": 0,
    "status_description": "SUCCESS: Successful Task execution",
    "changed": false,
    "invocation": {
        "extension": "ue-kafka-monitor",
        "version": "1.0.0",
        "fields": {
            "action": "Monitor for events",
            "auto_offset_reset": "Latest",
            "bootstrap_servers": [
                "ue-kafka-30.stonebranch.org:9092"
            ],
            "client_id": "ue-kafka-monitor-#1635511436609765042WY8M6GWOTJB1P",
            "consumer_type": "Consumer Group",
            "consumer_group": "my-group1",
            "heartbeat_interval_ms": 3000,
            "key_deserializer": "Integer",
            "partition_assign_strategy": "RangePartitionAssignor",
            "request_timeout_ms": 305000,
            "start_from": "Consumer Group Offset",
            "security_protocol": "PLAINTEXT",
            "sasl_user_credentials_user": null,
            "sasl_user_credentials_password": null,
            "sasl_mechanism": "SCRAM-SHA-256",
            "ssl_check_hostname": true,
            "ssl_cafile": null,
            "session_timeout_ms": 10000,
            "topic": "testtopic",
            "value": null,
            "value_deserializer": "JSON",
            "value_filter": "Is Not Blank",
            "value_json_path": "$.phoneNumbers[1].type"
        }
    },
    "result": {
        "client_id": "ue-kafka-monitor-#1635511436609765042WY8M6GWOTJB1P",
        "event": {
            "key": 1,
            "value": {
                "firstName": "John",
                "lastName": "Doe",
                "age": 26,
                "address": {
                    "streetAddress": "Naist Street",
                    "city": "Nara",
                    "postalCode": "630-0192"
                },
                "phoneNumbers": [
                    {
                        "type": "iPhone",
                        "number": "0123-4567-8888"
                    },
                    {
                        "type": "home",
                        "number": "0123-4567-8910"
                    }
                ]
            },
            "headers": [],
            "partition": 0,
            "offset": 206,
            "timestamp": "2021-10-29T14:01:03.027000+00:00"
        }
    }
}

result.event includes information about the matched (based on the filter conditions) Kafka event, providing the event payload and metadata.
For Action
  • Read Task Instance Information

The Extension Output below refers to the Read Airflow Task Instance Information task example.

{
    "exit_code": 0,
    "status_description": "SUCCESS: Successful Task Execution!",
    "changed": false,
    "invocation": {
        "extension": "ue-airflow",
        "version": "2.0.0",
        "fields": {
            "connect_to": "standalone_airflow_server",
            "credentials_type_google": null,
            "service_account_key": null,
            "base_url": "http://airflow_url:8080/api/v1",
            "credentials_user": "****",
            "credentials_password": "****",
            "use_ssl": true,
            "ssl_verify": true,
            "trusted_certificate_file": "****",
            "ssl_hostname_check": false,
            "private_key_certificate": null,
            "public_key_certificate": null,
            "use_proxy": true,
            "proxies": "https://ue-proxy-dev-noauth-ssl.stonebranch.org:3128",
            "action": "get_task_instance",
            "dag_id": "example_bash_operator",
            "dag_run_id": "manual__2023-02-09T15:42:00.467617+00:00",
            "task_id": "run_this_last",
            "wait_for_success_or_failure": false,
            "polling_interval": 1,
            "configuration_parameters": {
                "conf": {}
            },
            "dag_run_id_output": null
        }
    },
    "result": {
        "dag_id": "example_bash_operator",
        "duration": 0.0,
        "end_date": "2023-01-10T09:26:55.661975+00:00",
        "execution_date": "2023-01-09T14:16:48.856712+00:00",
        "executor_config": "{}",
        "hostname": "",
        "max_tries": 0,
        "operator": "DummyOperator",
        "pid": null,
        "pool": "default_pool",
        "pool_slots": 1,
        "priority_weight": 1,
        "queue": "default",
        "queued_when": null,
        "sla_miss": null,
        "start_date": "2023-01-10T09:26:55.661975+00:00",
        "state": "upstream_failed",
        "task_id": "run_this_last",
        "try_number": 0,
        "unixname": "****"
    }
}

The UAC provides Output Functions that can be used to resolve the JSON Extension Output for further use (for example, by a sibling task), as sketched below.
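For example, a sibling task could pull the matched type out of the Kafka output above by passing a JSON path such as the following to the Task Instance Output By JsonPath function (illustrative only; see the Document References below for the exact function usage):

$.result.event.value.phoneNumbers[1].type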

Document References

This document references the following documents:

Name | Location | Description
Universal Templates | https://docs.stonebranch.com/confluence/display/UC71x/Universal+Templates | User documentation for creating Universal Templates in the Universal Controller user interface.
Universal Tasks | https://docs.stonebranch.com/confluence/display/UC71x/Universal+Tasks | User documentation for creating Universal Tasks in the Universal Controller user interface.
Task Instance Output By JsonPath | https://docs.stonebranch.com/confluence/display/UC71x/Functions#Functions-TaskInstanceOutputByJsonPathTaskInstanceOutputByJsonPath | How Output Functions can be used to extract information from extension output.
JSON Path Syntax | https://github.com/h2non/jsonpath-ng#jsonpath-syntax | JSON Path syntax supported by the jsonpath-ng Python library.

STDOUT and STDERR

STDOUT and STDERR provide additional information to the user. The populated content can be changed in future versions of this extension without notice. Backward compatibility is not guaranteed.


Integration Modifications

Modifications applied by users or customers, before or after import, might affect the supportability of this integration. To retain the support level applied to this integration, the following modifications are discouraged:

  • Python code modifications should not be done.
  • Template Modifications
    • General Section
      • "Name", "Extension", "Variable Prefix", "Icon" should not be changed.
    • Universal Template Details Section
      • "Template Type", "Agent Type", "Send Extension Variables", "Always Cancel on Force Finish" should not be changed.
    • Result Processing Defaults Section
      • Success and Failure Exit codes should not be changed.
      • Success and Failure Output processing should not be changed.
    • Fields Restriction Section
      The setup of the template does not impose any restrictions. However, with respect to the "Exit Code Processing Fields" section:
      1. Success/Failure exit codes need to be respected.
      2. In principle, as STDERR and STDOUT outputs can change in follow-up releases of this integration, they should not be considered as a reliable source for determining success or failure of a task.

Users and customers are encouraged to report defects or feature requests at the Stonebranch Support Desk.

Document References

This document references the following documents.

Document Link | Description
Universal Templates | User documentation for creating, working with, and understanding Universal Templates and Integrations.
Universal Tasks | User documentation for creating Universal Tasks in the Universal Controller user interface.
Credentials | User documentation for creating and working with credentials.
Resolvable Credentials Permitted Property | User documentation for the Resolvable Credentials Permitted property.
Apache Airflow Documentation | User documentation for Apache Airflow.
Apache Airflow API Documentation | User documentation for the Airflow REST API.
Google Cloud Composer Airflow | User documentation for Google Cloud Composer.

Changelog

ue-airflow-2.0.0 (2023-02-24)

Enhancements

  • Added: Support for Google Cloud Composer Airflow
  • Added: Support for passing JSON configuration parameters on Action "Trigger DAG Run"

Deprecations and Breaking Changes

  • Breaking Change: Stopped supporting Universal Agent/Controller 7.0.0.0. Universal Agent/Controller 7.1.0 or higher is supported.

ue-airflow-1.1.0 (2022-06-10)

Enhancements

  • Added: Support for triggering a DAG Run and waiting for the "success" or "failed" state.

ue-airflow-1.0.0 (2022-03-03)

  • Initial version.