
Disclaimer

Your use of this download is governed by Stonebranch’s Terms of Use, which are available at https://www.stonebranch.com/integration-hub/Terms-and-Privacy/Terms-of-Use/

Overview

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. This Universal Extension is responsible for monitoring events (messages) from topics in Kafka.

Software Requirements

This integration requires a Universal Agent and a Python runtime to execute the Universal Task.

Software Requirements for Universal Template and Universal Task

Requires Python 3.7.0 or higher. Tested with the Universal Agent bundled Python distribution.

Software Requirements for Universal Agent

Both Windows and Linux agents are supported:

  • Universal Agent for Windows x64 Version 7.1.0.0 and later.

  • Universal Agent for Linux Version 7.1.0.0 and later.

Software Requirements for Universal Controller

Universal Controller Version 7.0.0.0 and later.

Software Requirements in Cases of SSL Verification

  • If Python 3.7 Distribution for Universal Agent is installed, the path location of the Certificate Authority file must be provided.

  • If Python is installed on the local system but not with the Universal Agent Distribution:

    • If the SSL certificate is signed by a publicly trusted certificate authority (CA), it is not required to provide the certificate.

    • If the SSL certificate is self-signed, either the path location of the certificate file must be provided, or the certificate must be stored at the machine's default location for installed certificates (in this case it will be trusted for all applications).
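The two trust cases above can be illustrated with Python's standard `ssl` module (shown only to clarify the trust-store behaviour; the extension handles this internally):

```python
import ssl

# Default context: verification uses the machine's installed CA
# certificates, so a certificate signed by a publicly trusted CA
# needs no extra configuration.
ctx = ssl.create_default_context()
print(ctx.verify_mode == ssl.CERT_REQUIRED)  # True
print(ctx.check_hostname)                    # True

# Self-signed certificate: pass the CA file explicitly, analogous to
# filling in the "SSL Certificate Path" field. The path below is a
# placeholder, not a real file from this document.
# ctx = ssl.create_default_context(cafile="/path/to/ca.pem")
```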

Supported Apache Kafka Versions

This Universal Extension supports Apache Kafka versions 2.0.0 and above. Because the Kafka server protocol is backwards compatible, the extension is expected to work with versions 0.8.0 and above (except the 0.8.2-beta release); however, this has not been tested.

Key Features

This Universal Extension provides the following main features:

  • Support for consuming messages from a specific topic, via consumer group subscription, until a specific condition is met. Filtering is based on the value of the message. When a matching Kafka message is detected, the Universal Task finishes by publishing information related to the matched message in the extension output. Number, String, and JSON filter patterns are supported.

  • Support for authenticating to Kafka through PLAINTEXT or SASL_SSL SCRAM security protocol.

Typically, this extension can be used to monitor events from Kafka and, upon successful execution, to trigger workflows or other tasks, or just to pass information related to the Kafka event within UAC.
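The monitor-until-match behaviour can be sketched in a few lines of Python. A plain list stands in for the Kafka consumer, and all names are illustrative rather than the extension's internal API:

```python
def monitor(messages, predicate):
    """Consume messages until one matches the filter; return the match."""
    for msg in messages:
        if predicate(msg["value"]):
            return msg  # the extension publishes this as Extension Output
    return None  # a real monitor would keep polling instead

events = [
    {"value": "warm-up", "offset": 33},
    {"value": "new test", "offset": 34},
]
matched = monitor(events, lambda v: "test" in v)
print(matched["offset"])  # 34
```

Upon a match, the task instance completes with the matched event's payload and metadata in its output, which downstream tasks can then consume.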

Import the Universal Template

To use this downloadable Universal Template, you must first perform the following steps:

  1. This Universal Task requires the Resolvable Credentials feature. Check that the Resolvable Credentials Permitted system property has been set to true.

  2. Download the provided ZIP file.

  3. In the Universal Controller UI, select Administration > Configuration > Universal Templates to display the current list of Universal Templates.

  4. Click Import Template.

  5. Select the template ZIP file and click Import.

When the template has been imported successfully, the Universal Template will appear on the list. Refresh your Navigation Tree to see these tasks in the Automation Center Menu.

Configure Universal Task

For the new Universal Task type, create a new task, and enter the task-specific details that were created in the Universal Template.

Input Fields

The input fields for this Universal Extension are described below. Each field is listed with its value type, its input type (required or optional), and its default value where one exists, followed by a description.

Action (Choice, required; default: Monitor for events)

The action performed upon the task execution.

Security Protocol (Choice, required; default: PLAINTEXT)

The Security protocol used to communicate with Kafka brokers.

Valid values are:

  • PLAINTEXT

  • SASL_SSL

Bootstrap Servers (Text, required)

A 'host[:port]' string (or a comma-separated list of 'host[:port]' strings) that the consumer should contact to bootstrap initial cluster metadata.

This does not have to be the full node list; it only needs at least one broker that will respond to a Metadata API request.

Consumer Type (Choice, required; default: Consumer Group)

Type of consumer used to get messages from Kafka. Available option:

  • Consumer Group: the consumer is part of a consumer group.

Consumer Group (Text, required)

The unique name of the consumer group to join for dynamic partition assignment and to use for fetching and committing offsets.

Topic (Dynamic Choice, required)

Dynamically fetched list of topics to subscribe the consumer to. The user can select the required topic from a drop-down list.

SASL Mechanism (Choice, optional; default: SCRAM-SHA-256)

The authentication mechanism used when Security Protocol is configured for SASL_SSL. Valid values are:

  • SCRAM-SHA-256: credentials are hashed with the SHA-256 algorithm (which uses 32-bit words).

  • SCRAM-SHA-512: credentials are hashed with the SHA-512 algorithm (which uses 64-bit words).

Required when Security Protocol is "SASL_SSL".

SASL User Credentials (Credentials, optional)

The credentials for SASL SCRAM authentication.

They consist of:

  • username

  • password

Required when Security Protocol is configured for "SASL_SSL".

SSL Hostname Check (Boolean, optional; default: True)

Flag to configure whether the SSL handshake should verify that the certificate matches the broker's hostname.

Required when Security Protocol is configured for "SASL_SSL".

SSL Certificate Path (Text, optional)

Path and file name of the Certificate Authority (CA) file to use in certificate verification.

Used when it is required to locate the CA file if Security Protocol is configured for "SASL_SSL".

Client Id (Text, optional)

This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client.

The constructed client id seen on the server side is Client Id + "task instance id". If Client Id is not populated, the default value used is "ue-kafka-monitor-#".

Example: "ue-kafka-monitor-#1635348654595881042JAYE7BKNPYVG3"

Start from (Choice, required; default: Consumer Group Offset)

Controls the point from which consuming starts.

Available option:

  • Consumer Group Offset: start consuming from the committed offset of the group.

Key Deserializer (Choice, required; default: String)

Type of key deserialization.

Available options are:

  • Integer

  • String

Value Deserializer (Choice, required; default: String)

Type of value deserialization.

Available options are:

  • Integer

  • Float

  • String

  • JSON

Value Filter (Choice, optional; default: None)

Value operators to specify the criteria used to match records and stop consuming messages.

If Value Deserializer is configured for "Integer" or "Float", the available options are:

  • None (all messages are matched)

  • >= (greater than or equals to)

  • <= (less than or equals to)

  • = (equals to)

  • != (not equals to)

If Value Deserializer is configured for "String", the available options are:

  • None (all messages are matched)

  • Contains

  • Does Not Contain

  • Equals

  • Is Blank

  • Is Not Blank

If Value Deserializer is configured for "JSON", the available options are all that apply to the "Integer", "Float" and "String" Value Deserializer.

Value (Text, optional)

The value to which the Value Filter applies.

Value JSON Path (Text, optional)

The JSON path used to locate the Value when Value Deserializer is configured for "JSON".

The JSON Path needs to resolve either to a number or to a string. If the JSON path results in a list of numbers or a list of strings, a Kafka message is matched if at least one element from the list matches the Value.

JSON Path syntax is based on the jsonpath-ng Python library. For examples, please refer to the official web site.

Show Advanced Settings (Boolean, required; default: False)

Checking this field makes additional fields available for advanced configuration.

The advanced fields are: Partition Assignment Strategy, Session Timeout (ms), Auto Offset Reset, Request Timeout (ms), Heartbeat Interval (ms), Max Partition Fetch Bytes.

Partition Assignment Strategy (Choice, optional; default: Range)

Partition Assignment policies to distribute partition ownership amongst consumer instances when group management is used.

Available options are:

  • Range

  • Round Robin

Session Timeout (ms) (Integer, optional; default: 10000)

Controls how long it takes to detect that a consumer has crashed and stopped sending heartbeats.

If more than Session Timeout milliseconds pass without the consumer sending a heartbeat to the group coordinator, the consumer is considered dead, and the group coordinator triggers a rebalance of the consumer group to allocate the dead consumer's partitions to the other consumers in the group.

Auto Offset Reset (Choice, optional; default: Latest)

Controls the behavior of the consumer when it starts reading a partition for which it does not have a committed offset or the committed offset is invalid (usually because the consumer was down for so long that the record with that offset was already aged out of the broker).

Available options are:

  • Earliest: move to the oldest available message.

  • Latest: move to the most recent message.

Request Timeout (ms) (Integer, optional; default: 305000)

Controls the maximum amount of time the client will wait for the response of a request.

Heartbeat Interval (ms) (Integer, optional; default: 3000)

The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka’s group management facilities.

The value must be set lower than Session Timeout (ms).

Max Partition Fetch Bytes (Integer, optional; default: 1048576)

Controls the maximum number of bytes the server will return per partition.

This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition.

Default is 1MB.
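As a rough mental model, the advanced fields correspond to consumer configuration parameters of the underlying Kafka client library. The parameter names below follow the kafka-python convention and are an assumption about the wiring, not documented behaviour:

```python
# Hypothetical mapping of task fields (defaults from the table above) onto
# kafka-python-style consumer parameters; all names here are assumptions.
consumer_config = {
    "bootstrap_servers": ["localhost:9092"],  # placeholder broker address
    "group_id": "my-group1",                  # Consumer Group
    "auto_offset_reset": "latest",            # Auto Offset Reset
    "session_timeout_ms": 10_000,             # Session Timeout (ms)
    "heartbeat_interval_ms": 3_000,           # Heartbeat Interval (ms)
    "request_timeout_ms": 305_000,            # Request Timeout (ms)
    "max_partition_fetch_bytes": 1_048_576,   # 1 MB
}

# The constraint stated above: heartbeats must fire well within the
# session timeout, or the consumer is declared dead between beats.
assert consumer_config["heartbeat_interval_ms"] < consumer_config["session_timeout_ms"]
```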

Task Examples

PLAINTEXT Security Protocol Task configuration

Example of Universal Task for "PLAINTEXT" Security Protocol:

SASL_SSL Security Protocol Task Configuration

Example of Universal Task for "SASL_SSL" Security Protocol:

Matching a Kafka Event with String Value Deserializer

In this example, the Task monitors for events with a value that contains the string "test":

In the extension output result, the matched event's details are printed (only the result JSON element is shown below):

{
   "result":{
      "client_id":"ue-kafka-monitor-#163579072850187904243P79ZKF6CR00",
      "event":{
         "key":1,
         "value":"new test",
         "headers":[
            {
               "key":"test_key",
               "value":"1"
            }
         ],
         "partition":0,
         "offset":34,
         "timestamp":"2021-11-01T18:50:30.031000+00:00"
      }
   }
}

The Task has successfully matched the value "test" because the value inside the event element, "new test", contains the string "test".
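The String filter operators listed in the Input Fields section can be sketched in plain Python to make the matching semantics concrete. This is an illustration, not the extension's actual implementation, and the whitespace handling in "Is Blank" is an assumption:

```python
def string_filter(op, message_value, filter_value=None):
    """Apply a String value-filter operator to a message value."""
    if op == "None":
        return True  # all messages are matched
    if op == "Contains":
        return filter_value in message_value
    if op == "Does Not Contain":
        return filter_value not in message_value
    if op == "Equals":
        return message_value == filter_value
    if op == "Is Blank":
        return message_value.strip() == ""  # assumption: whitespace counts as blank
    if op == "Is Not Blank":
        return message_value.strip() != ""
    raise ValueError(f"unknown operator: {op}")

# The event above: "new test" contains "test", so it is matched.
print(string_filter("Contains", "new test", "test"))  # True
```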

Matching a Kafka Event with JSON Value Deserializer

In this example, the task monitors for events with a value of JSON format, that has a list of "phoneNumbers" and the second element of the list has an attribute type which is "home":

In the extension output result, the matched event's details are printed (only the result JSON element is shown below):


{
   "result":{
      "client_id":"ue-kafka-monitor-#163578238979483604275PR4VPKK89D1",
      "event":{
         "key":1,
         "value":{
            "firstName":"John",
            "lastName":"doe",
            "age":26,
            "address":{
               "streetAddress":"naist street",
               "city":"Nara",
               "postalCode":"630-0192"
            },
            "phoneNumbers":[
               {
                  "type":"iPhone",
                  "number":"0123-4567-8888"
               },
               {
                  "type":"home",
                  "number":"0123-4567-8910"
               }
            ]
         },
         "headers":[],
         "partition":0,
         "offset":20,
         "timestamp":"2021-11-01T18:31:11.415000+00:00"
      }
   }
}

The Task has successfully matched the value: "home" because the value inside the event element contains a phoneNumbers list whose second element's type is "home".
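The same match can be reproduced with plain dictionary access. The extension itself evaluates the path with jsonpath-ng; the snippet below only illustrates what "$.phoneNumbers[1].type" resolves to, plus the list-matching rule described under Value JSON Path:

```python
event_value = {
    "phoneNumbers": [
        {"type": "iPhone", "number": "0123-4567-8888"},
        {"type": "home", "number": "0123-4567-8910"},
    ]
}

# "$.phoneNumbers[1].type" resolves to the second list element's type field:
print(event_value["phoneNumbers"][1]["type"])  # home

# Had the path resolved to a list (e.g. "$.phoneNumbers[*].type"), the
# message would match if at least one element equals the Value:
types = [p["type"] for p in event_value["phoneNumbers"]]
print(any(t == "home" for t in types))  # True
```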

Matching a Kafka Event with JSON Value Filter None

In this example, the task monitors for events and matches any Kafka event as the selected Value Filter is "None":

For more information about the Extension Output Result, please refer to the Extension Output section.

Task Output

Exit Codes

The exit codes for this Universal Extension are described below.

Exit Code | Status Classification Code | Status Classification Description | Status Description
0  | SUCCESS               | Successful Execution        | SUCCESS: Successful Task execution
1  | FAIL                  | Failed Execution            | FAIL: < Error Description >
10 | CONNECTION_ERROR      | Bad connection data         | CONNECTION_ERROR: < Error Description >
11 | CONNECTION_TIMEOUT    | Connection timed out        | CONNECTION_TIMEOUT: < Error Description >
20 | DATA_VALIDATION_ERROR | Bad input fields validation | DATA_VALIDATION_ERROR: Some of the input fields cannot be validated. See STDERR for more details.

Extension Output

Upon the Task's completion, the extension always produces Extension Output. This Universal Extension runs continuously until a Kafka event whose value matches the filter criteria is detected. When an event is matched, the related information is published as Extension Output.

An example of Extension Output for this Universal Extension is below:

{
   "exit_code":0,
   "status_description":"SUCCESS: Successful Task execution",
   "changed":false,
   "invocation":{
      "extension":"ue-kafka-monitor",
      "version":"1.0.0",
      "fields":{
         "action":"Monitor for events",
         "auto_offset_reset":"Latest",
         "bootstrap_servers":[
            "ue-kafka-30.stonebranch.org:9092"
         ],
         "client_id":"ue-kafka-monitor-#1635511436609765042WY8M6GWOTJB1P",
         "consumer_type":"Consumer Group",
         "consumer_group":"my-group1",
         "heartbeat_interval_ms":3000,
         "key_deserializer":"Integer",
         "partition_assign_strategy":"RangePartitionAssignor",
         "request_timeout_ms":305000,
         "start_from":"Consumer Group Offset",
         "security_protocol":"PLAINTEXT",
         "sasl_user_credentials_user":null,
         "sasl_user_credentials_password":null,
         "sasl_mechanism":"SCRAM-SHA-256",
         "ssl_check_hostname":true,
         "ssl_cafile":null,
         "session_timeout_ms":10000,
         "topic":"testtopic",
         "value":null,
         "value_deserializer":"JSON",
         "value_filter":"Is Not Blank",
         "value_json_path":"$.phoneNumbers[1].type"
      }
   },
   "result":{
      "client_id":"ue-kafka-monitor-#1635511436609765042WY8M6GWOTJB1P",
      "event":{
         "key":1,
         "value":{
            "firstName":"John",
            "lastName":"Doe",
            "age":26,
            "address":{
               "streetAddress":"Naist Street",
               "city":"Nara",
               "postalCode":"630-0192"
            },
            "phoneNumbers":[
               {
                  "type":"iPhone",
                  "number":"0123-4567-8888"
               },
               {
                  "type":"home",
                  "number":"0123-4567-8910"
               }
            ]
         },
         "headers":[],
         "partition":0,
         "offset":206,
         "timestamp":"2021-10-29T14:01:03.027000+00:00"
      }
   }
}

result.event includes information about the matched (based on the filter conditions) Kafka event, covering both the event payload and its metadata.

The UAC provides Output Functions that can be used to resolve the JSON Extension Output for further use (for example, by a sibling task).

Document References

This document references the following documents:

Name | Description
Universal Templates | User documentation for creating Universal Templates in the Universal Controller user interface.
Universal Tasks | User documentation for creating Universal Tasks in the Universal Controller user interface.
Task Instance Output By JsonPath | How Output Functions can be used to extract information from extension output.
JSON Path Syntax | JSON Path predicate notation.




