
Disclaimer

Your use of this download is governed by Stonebranch’s Terms of Use, which are available at https://www.stonebranch.com/integration-hub/Terms-and-Privacy/Terms-of-Use/

Overview

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. This Universal Extension publishes events (messages) to topics in Kafka.

Software Requirements

This integration requires a Universal Agent and a Python runtime to execute the Universal Task.

Software Requirements for Universal Template and Universal Task

Requires Python 3.7.0 or higher. Tested with the Universal Agent bundled Python distribution.

Software Requirements for Universal Agent

Both Windows and Linux agents are supported:

  • Universal Agent for Windows x64 Version 7.0.0.0 and later.

  • Universal Agent for Linux Version 7.0.0.0 and later.

Software Requirements for Universal Controller

Universal Controller Version 7.0.0.0 and later.

Software Requirements in Cases of SSL Verification

  • If the Python 3.7 distribution bundled with Universal Agent is installed, the path of the Certificate Authority (CA) file must be provided.

  • If Python is installed on the local system, but not from the Universal Agent distribution:

    • If the SSL certificate is signed by a publicly trusted certificate authority (CA), the certificate does not need to be provided.

    • If the SSL certificate is self-signed, either the path of the certificate file must be provided, or the certificate must be stored in the machine's default location for installed certificates (in which case it is trusted by all applications).
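
As an illustration of these two cases, here is a minimal Python sketch using the standard ssl module (the CA path is a placeholder):

Python (illustrative):

import ssl

# Explicit CA file: required with the Agent-bundled Python distribution,
# and for self-signed certificates when a path is provided.
context = ssl.create_default_context(cafile="/path/to/ca.pem")

# No CA file: Python falls back to the machine's default certificate store.
# This succeeds when the broker certificate is signed by a publicly trusted CA,
# or when a self-signed certificate has been installed in that store.
default_context = ssl.create_default_context()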

Supported Apache Kafka Versions

This Universal Extension supports Apache Kafka versions 2.0.0 and above. Because the Kafka server protocol is backwards compatible, the extension is expected to work with versions 0.8.0 and above (except the 0.8.2-beta release); however, this has not been tested.

Key Features

This Universal Extension supports the following main features:

  • Authentication to Kafka using the PLAINTEXT or SASL_SSL (SCRAM) security protocol.

  • Sending a message to Kafka, with the capability to select the topic, the partition, the message key, the message value, and the message metadata.

  • Controlling the transport of messages by configuring the message acknowledgment strategy and the request timeout.

  • Fetching topics and partitions dynamically from Kafka for selection during task creation.

  • Automatically selecting the serialization method depending on the key/value message datatypes.

Import the Universal Template

To use this downloadable Universal Template, you must first perform the following steps:

  1. This Universal Task requires the Resolvable Credentials feature. Check that the Resolvable Credentials Permitted system property has been set to true.

  2. Download the provided ZIP file.

  3. In the Universal Controller UI, select Administration > Configuration > Universal Templates to display the current list of Universal Templates.

  4. Click Import Template.

  5. Select the template ZIP file and click Import.

When the template has been imported successfully, the Universal Template will appear in the list. Refresh your Navigation Tree to see the new task type in the Automation Center menu.

Configure Universal Task

For the new Universal Task type, create a new task, and enter the task-specific details that were created in the Universal Template.

Input Fields

The input fields for this Universal Extension are described below.

Each field is listed with its input type (required or optional), default value, field type, and description.

Action
  • Input type: Required
  • Default value: Send a message
  • Type: Choice
  • Description: The action performed upon task execution.

Security Protocol
  • Input type: Required
  • Default value: PLAINTEXT
  • Type: Choice
  • Description: The security protocol used to communicate with Kafka brokers. Valid values are PLAINTEXT and SASL_SSL.

SASL Mechanism
  • Input type: Optional
  • Default value: SCRAM-SHA-256
  • Type: Choice
  • Description: The authentication mechanism used when Security Protocol is configured for SASL_SSL. Valid values are SCRAM-SHA-256 (credentials are hashed with the SHA-256 algorithm) and SCRAM-SHA-512 (credentials are hashed with the SHA-512 algorithm). Required when Security Protocol is "SASL_SSL".

SASL User Credentials
  • Input type: Optional
  • Default value: -
  • Type: Credentials
  • Description: Credentials for SCRAM authentication, comprised of a username and a password. Required when Security Protocol is "SASL_SSL".

SSL Hostname Check
  • Input type: Optional
  • Default value: true
  • Type: Boolean
  • Description: Flag that configures whether the SSL handshake should verify that the certificate matches the broker's hostname. Required when Security Protocol is "SASL_SSL".

SSL Certificate Path
  • Input type: Optional
  • Default value: -
  • Type: Text
  • Description: Path and file name of the Certificate Authority (CA) file to use in certificate verification. Used when the CA file must be located explicitly and Security Protocol is configured for "SASL_SSL".

Bootstrap Servers
  • Input type: Required
  • Default value: -
  • Type: Text
  • Description: 'host:port' string (or comma-separated list of 'host:port' strings) that the producer should contact to bootstrap initial cluster metadata. This does not have to be the full node list; it just needs at least one broker that will respond to a Metadata API request (more than one can be provided in case a server is down). Example with two servers: 'host1:port1,host2:port2'.

Topic
  • Input type: Required
  • Default value: -
  • Type: Dynamic Choice
  • Description: Dynamically fetched list of topics to which the message can be published. The user can select the required topic from a drop-down list.

Partition
  • Input type: Optional
  • Default value: -
  • Type: Choice
  • Description: The partition to which the message will be assigned. The user can select the required partition from a drop-down list; a topic must first be selected from the respective drop-down list. If no partition is specified when producing a record, Kafka assigns messages with the same key to the same partition. When no key is specified either, the message is delivered to a random partition.

Client Id
  • Input type: Optional
  • Default value: -
  • Type: Text
  • Description: This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client. The constructed client id seen on the server side is Client Id + "task instance id". If Client Id is not populated, the default value "ue-kafka-monitor-#" is used. Example: "ue-kafka-monitor-#1635348654595881042JAYE7BKNPYVG3".

Message Payload
  • Input type: Required
  • Default value: -
  • Type: Large Text
  • Description: The message payload, in YAML format, comprised of:
    • key (optional): A key to associate with the message. Must be of type integer or string. Can be used to determine which partition to send the message to. If Partition is not specified, messages with the same key are delivered to the same partition (if the key is absent or empty, the partition is chosen randomly).
    • value (optional): The message value. Must be of type integer, float, string, or object. If value is absent or empty, key is required and the message acts as a 'delete' for that key.

Show Advanced Settings
  • Input type: Required
  • Default value: False
  • Type: Boolean
  • Description: When checked, three more fields are available for advanced configuration: "Acks", "Request Timeout (ms)", and "Headers".

Acks
  • Input type: Required
  • Default value: Leader
  • Type: Choice
  • Description: The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following options are available:
    • None: The producer will not wait for acknowledgment from the server.
    • Leader: Wait for the leader to write the record to its local log only.
    • All: Wait for the full set of in-sync replicas to write the record.

Headers
  • Input type: Optional
  • Default value: -
  • Type: Array
  • Description: A list of header key/value pairs, in case more metadata about the Kafka message has to be added.

Request Timeout (ms)
  • Input type: Optional
  • Default value: 30000
  • Type: Integer
  • Description: Controls the maximum amount of time the client will wait for the response of a request.
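
For orientation, the following is a minimal sketch of how these fields map onto a Kafka producer in Python, assuming the kafka-python client library; the extension's actual implementation may differ, and all values shown are placeholders:

Python (illustrative):

from kafka import KafkaProducer

# Connection and security settings.
producer = KafkaProducer(
    bootstrap_servers=["host1:9092", "host2:9092"],  # Bootstrap Servers
    security_protocol="SASL_SSL",                    # Security Protocol
    sasl_mechanism="SCRAM-SHA-256",                  # SASL Mechanism
    sasl_plain_username="user",                      # SASL User Credentials
    sasl_plain_password="secret",
    ssl_check_hostname=True,                         # SSL Hostname Check
    ssl_cafile="/path/to/ca.pem",                    # SSL Certificate Path
    client_id="my-client-id",                        # Client Id
    acks=1,                                          # Acks: None -> 0, Leader -> 1, All -> "all"
    request_timeout_ms=30000,                        # Request Timeout (ms)
)

# Topic, Partition, Message Payload (key/value), and Headers map onto send().
future = producer.send(
    "my-topic",                               # Topic
    partition=0,                              # Partition
    key=b"1",                                 # Message Payload: key, serialized to bytes
    value=b'{"hello": "world"}',              # Message Payload: value, serialized to bytes
    headers=[("origin", b"universal-task")],  # Headers
)
future.get(timeout=30)  # Block until the broker acknowledges per the Acks setting.
producer.flush()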

Task Output

Exit Codes

The exit codes for this Universal Extension are described below.

Exit Code | Status Classification Code | Status Classification Description | Status Description
0 | SUCCESS | Successful Execution | SUCCESS: Task executed successfully.
1 | FAIL | Failed Execution | Unexpected Exception: < exception message >
2 | AUTHENTICATION_ERROR | Bad Authentication data | AUTHENTICATION_ERROR: Incorrect username or password.
10 | CONNECTION_ERROR | Bad connection data | CONNECTION_ERROR: No brokers available for the specified Bootstrap Server or Credentials. Execute in Debug mode for more details.
11 | CONNECTION_TIMEOUT | Connection timed out | CONNECTION_TIMEOUT: Kafka Timeout Error. Connection timed out while waiting for a response from Kafka.
20 | DATA_VALIDATION_ERROR | Bad input fields validation | DATA_VALIDATION_ERROR: Some of the input fields cannot be validated. See STDOUT for more details.
21 | MESSAGE_ACKS_ERROR | Acknowledgement error | MESSAGE_ACKS_ERROR: Not enough replicas acknowledged the message.

Task Examples

  1. Give the task a name.

  2. Select an active agent of version 7.0.0.0 or higher for the task to run on.

  3. Enter the Bootstrap Server(s), separated by commas.

  4. Click the Topic field search icon to display the Topic names list. In the dialog that opens, click the Submit button and select your Topic from the drop-down list.

  5. Click the Partition field search icon to display the Partition names list. In the dialog that opens, click the Submit button and select your Partition key from the drop-down list.

  6. Select Acks if a value other than the default one is required.

  7. Enter your message in YAML format in the Message Payload field; see the example after this list.

  8. Add key/value pairs to the Headers field if necessary.

  9. Manually enter an integer value for the Request Timeout field if the default is not the required one.
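
Example of a message with an integer as a key and an object as a value (the same payload appears in the Message Payload Serialization section below):

YAML:

key: 1
value:
  employee:
    position: developer
    details:
      name: John
      surname: Smith
      age: 30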

Message without Value

Example of a message without a value. As described in Message Payload above, the key is required in this case and the message acts as a 'delete' for that key:
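
YAML:

key: 1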

Message Payload Serialization

The extension serializes the message key and value according to their datatypes in the YAML payload before sending them to Kafka.

Examples of message payloads with different datatypes for key/value and how they are serialized:

Integer as a Key and an Integer as a Value

Both key and value will be serialized as integers:

YAML:

key: 1
value: 256

String as a Key and an Integer as a Value

The key will be serialized as a string (an integer in single or double quotes is interpreted as a string) and the value will be serialized as an integer:

YAML:

key: "1"
value: 256

Integer as a Key and a Parameter as a Value

The value will be serialized according to the datatype of the resolved variable; here ${my_variable} is a stand-in for any resolvable Universal Controller variable:

YAML:

key: 1
value: ${my_variable}

Integer as a Key and an Object as a Value

The value, which in this case is an object, will be serialized as a JSON string:

YAML:

key: 1
value:
  employee:
    position: developer
    details:
      name: John
      surname: Smith
      age: 30
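
To make the selection concrete, here is a minimal Python sketch of this kind of type-based serializer dispatch; the numeric byte encodings are assumptions, as the extension's exact wire format is not documented here:

Python (illustrative):

import json
import struct

def serialize(datum):
    if datum is None:
        return None                           # Absent value: 'delete' (tombstone) message.
    if isinstance(datum, bool):
        raise TypeError("unsupported type")   # bool is an int subclass in Python; exclude it.
    if isinstance(datum, int):
        return struct.pack(">q", datum)       # Integer: 8-byte big-endian (assumption).
    if isinstance(datum, float):
        return struct.pack(">d", datum)       # Float: IEEE 754 double (assumption).
    if isinstance(datum, str):
        return datum.encode("utf-8")          # String: UTF-8 bytes.
    return json.dumps(datum).encode("utf-8")  # Object: JSON string, then UTF-8 bytes.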

Document References

This document references the following documents:

  • Universal Templates: User documentation for creating Universal Templates in the Universal Controller user interface.

  • Universal Tasks: User documentation for creating Universal Tasks in the Universal Controller user interface.


