Apache Kafka: Publish Event

Disclaimer

Your use of this download is governed by Stonebranch’s Terms of Use.

Version Information

Template Name               | Extension Name    | Extension Version | Status
Apache Kafka: Publish event | ue-kafka-producer | 2 (Current 2.0.0) | Fixes and New Features are introduced
Apache Kafka: Publish event | ue-kafka-producer | 1                 | Hot Fixes Only (Until UAC 7.5 is End of Support)

Refer to Changelog for version history information.

Breaking changes

Apache Kafka: Publish event 2.0.0 is a major release and introduces breaking changes that might affect some customers, depending on their setup. Administrators are strongly advised to refer to the Changelog for more information on the changes introduced when updating from a version earlier than 2.0.0.

Overview

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. This Universal Extension is responsible for publishing events (messages) to topics in Kafka.
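Under the hood, publishing a message can be sketched with the confluent-kafka Python client, which this extension uses from version 2.0.0 onwards (see Changelog). The server, topic, and client id below are placeholders, and the configuration is a minimal illustrative subset, not the extension's actual code:

```python
from typing import Dict


def build_producer_config(bootstrap_servers: str, client_id: str) -> Dict[str, str]:
    """Minimal producer configuration (PLAINTEXT, no authentication)."""
    return {
        "bootstrap.servers": bootstrap_servers,  # e.g. "host1:port1,host2:port2"
        "client.id": client_id,                  # shows up in server-side logs
        "security.protocol": "PLAINTEXT",
    }


def publish_once(config: Dict[str, str], topic: str, key: bytes, value: bytes) -> None:
    """Send a single message and wait for delivery (requires confluent-kafka)."""
    from confluent_kafka import Producer

    producer = Producer(config)
    producer.produce(topic, key=key, value=value)  # asynchronous send
    producer.flush(timeout=30)                     # block until delivered or timed out


config = build_producer_config("kafka.example.com:9092", "ue-kafka-producer-#demo")
# publish_once(config, "test", key=b"1", value=b"hello")  # needs a reachable broker
```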

Key Features

Feature                | Description
Publish Kafka event    | Send a message to Kafka, with the capability to select the topic, the partition, the message key, the message value, and message metadata.
Authentication         | Support for different authentication mechanisms.
Serialization          | Capability to automatically select the serialization method depending on the key/value message data types.
Message acknowledgment | Capability to control the transport of messages by configuring the message acknowledgment strategy and the request timeout.

Requirements

Python Version: Supports Python 3.11. Tested with the Agent-bundled Python.

Universal Agent Compatibility:

  • Compatible with Universal Agent for Windows x64, version >= 7.6.0.0.

  • Compatible with Universal Agent for Linux, version >= 7.6.0.0.

Universal Controller Compatibility: Universal Controller version >= 7.6.0.0.

Network and Connectivity: Requires network connectivity between the Universal Agent and the Kafka servers.

Southbound System: This integration is tested on Kafka version 3.0. The integration is expected to work with versions 2.0.0 onwards; however, this has not been tested.

SSL Certificate Verification:

  • If the Python Distribution for Universal Agent is installed, the path location of the Certificate Authority file must be provided.

  • If Python is installed on the local system but not with the Universal Agent Distribution:

    • If the SSL certificate is signed by a publicly trusted certificate authority (CA), it is not required to provide the certificate.

    • If the SSL certificate is self-signed, either the path location of the certificate file must be provided, or the certificate must be stored at the machine's default location for installed certificates (in this case it will be trusted for all applications).


Supported Actions

Action: Send a message

Publishes a message to a specified Kafka topic. Additional options like the topic partition or the message acknowledgment can be tuned. 

Configuration Examples

Send Kafka Message with plaintext authentication 

Plaintext authentication is performed by choosing Security Protocol = PLAINTEXT and populating the needed fields.


Send Kafka Message with SASL SSL authentication 

SASL_SSL authentication is performed by choosing Security Protocol = SASL_SSL and populating the related CA bundle file path and credential field.


Send Kafka Message with SSL authentication

Client authentication is performed by choosing Security Protocol = SSL and populating the related certificate and key file path fields. If the Client Private Key is encrypted, the respective password for decrypting the key is needed.
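In client terms, the three examples above correspond roughly to the producer properties below (librdkafka-style names). How the extension maps its fields internally is an assumption; this is only a sketch:

```python
from typing import Dict, Optional


def security_properties(
    protocol: str,
    ca_file: Optional[str] = None,
    username: Optional[str] = None,
    password: Optional[str] = None,
    cert_file: Optional[str] = None,
    key_file: Optional[str] = None,
    key_password: Optional[str] = None,
) -> Dict[str, object]:
    """Sketch of the security-related producer properties per protocol choice."""
    props: Dict[str, object] = {"security.protocol": protocol}
    if protocol == "SASL_SSL":
        props.update({
            "sasl.mechanism": "SCRAM-SHA-256",  # or SCRAM-SHA-512 / PLAIN
            "sasl.username": username,
            "sasl.password": password,
            "ssl.ca.location": ca_file,         # CA Bundle Path
        })
    elif protocol == "SSL":
        props.update({
            "ssl.ca.location": ca_file,             # CA Bundle Path
            "ssl.certificate.location": cert_file,  # Client Certificate Path
            "ssl.key.location": key_file,           # Client Private Key Path
            "ssl.key.password": key_password,       # only if the key is encrypted
        })
    return props


ssl_props = security_properties(
    "SSL",
    ca_file="/path/to/ca-cert.pem",
    cert_file="/path/to/client/cert.pem",
    key_file="/path/to/client_private_key.pem",
    key_password="secret",
)
```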


Message Payloads

According to the key and value datatypes of the YAML message, the extension handles their respective serialization before sending them to Kafka. Examples of message payloads with different datatypes for key/value, and how they are serialized, follow below.

Message without a Value.

YAML Snippet:
key: "1"

Integer as a Key and an Integer as a Value. Both key and value will be serialized as integers.

YAML Snippet:
key: 1
value: 256

String as a Key and an Integer as a Value. The key will be serialized as a string (an integer in single or double quotes is interpreted as a string) and the value will be serialized as an integer.

YAML Snippet:
key: "1"
value: 256

Integer as a Key and a Parameter as a Value. The value will be serialized according to the variable datatype.

YAML Snippet:
key: 1
value: 256

Integer as a Key and an Object as a Value. The value, which in this case is an object, will be serialized as a JSON string.

YAML Snippet:
key: 1
value:
  employee:
    position: developer
    details:
      name: John
      surname: Smith
      age: 30
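The type-driven serialization illustrated above can be sketched as follows. The object-to-JSON behavior is stated by this document; the exact byte encodings for integers and strings are illustrative assumptions, not the extension's confirmed behavior:

```python
import json
from typing import Optional


def serialize(part: object) -> Optional[bytes]:
    """Pick a serialization for a YAML-parsed key or value (sketch)."""
    if part is None:
        return None  # absent key or value
    if isinstance(part, (dict, list)):
        return json.dumps(part).encode("utf-8")  # object -> JSON string
    if isinstance(part, bool):
        raise TypeError("booleans are not a supported payload datatype")
    if isinstance(part, (int, float, str)):
        return str(part).encode("utf-8")  # scalar -> its textual form (assumed)
    raise TypeError(f"unsupported payload datatype: {type(part).__name__}")


# A quoted YAML scalar parses as str, an unquoted one as int, so
# `key: "1"` and `key: 1` take different branches here.
```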


Action Output

Output Type: EXTENSION

The extension output provides the following information:

  • exit_code, status_description: General info regarding the task execution. For more information users can refer to the exit code table.

  • invocation.fields: The task configuration used for this task execution.

  • result.topic: The Kafka Topic where the message is sent.

  • result.partition: The partition of the selected topic to which the message was sent.

  • result.offset: The offset of the Kafka message.

  • result.timestamp: The timestamp when the message was sent to the Kafka topic.
  • result.errors: List of errors that might have occurred during execution.

Successful Execution
{
    "exit_code": 0,
    "status_description": "SUCCESS: Task executed successfully.",
    "status": "SUCCESS",
    "invocation": {
        "extension": "ue-kafka-producer",
        "version": "2.0.0",
        "fields": {
            "action": "Send a message",
            "security_protocol": "SSL",
            "ssl_check_hostname": true,
            "sasl_mechanism": "SCRAM-SHA-256",
            "ssl_cafile": "/path/to/ca/cert.pem",
            "sasl_username": null,
            "sasl_password": null,
            "client_certificate_path": "/path/to/client/cert.pem",
            "client_private_key_path": "/path/to/client_private_key.pem",
            "client_private_key_password": "****",
            "bootstrap_servers": "kafka.bootstrap.com:9092",
            "topic": "test",
            "partition": null,
            "message_payload": {
                "key": 1,
                "value": "test"
            },
            "client_id": "ue-kafka-producer-#1682197200191106591I0HGMM9JD6IUW",
            "request_timeout_ms": 30000,
            "acks": "Leader",
            "headers": []
        }
    },
    "result": {
        "topic": "test",
        "partition": 0,
        "offset": 221,
        "timestamp": "1682328194041"
    }
}
Failed Execution
{
    "exit_code": 11,
    "status_description": "CONNECTION_TIMEOUT_ERROR: Kafka Connection Error. Something went wrong while connecting to Kafka. Execute in Debug mode and see more details in STDOUT.",
    "invocation": {
        "extension": "ue-kafka-producer",
        "version": "2.0.0",
        "fields": {
            "ssl_check_hostname": true,
            "bootstrap_servers": "kafka.clientauth.org:9092",
            "client_certificate_path": "/path/to/ue-cert.pem",
            "security_protocol": "SSL",
            "request_timeout_ms": 30000,
            "partition_sasl": null,
            "ops_task_id": null,
            "acks": "Leader",
            "partition_ssl": "0",
            "topic_sasl": null,
            "topic_ssl": "topic_ssl",
            "sasl_mechanism": "SCRAM-SHA-256",
            "message_payload": {
                "key": 1,
                "value": "hello World"
            },
            "headers": null,
            "partition": null,
            "action": "Send a message",
            "client_id": null,
            "show_advanced_settings": true,
            "topic": null,
            "ssl_cafile": "/path/to/ca-cert.pem",
            "client_private_key_path": "/path/to/client_key_encrypted2.key",
            "client_private_key_****": {
                "user": "a_user",
                "password": "****",
                "key_location": null,
                "passphrase": null,
                "token": null
            }
        }
    },
    "result": {
        "errors": [
            "CONNECTION_TIMEOUT_ERROR: Kafka Connection Error. Something went wrong while connecting to Kafka. Execute in Debug mode and see more details in STDOUT."
        ]
    }
}

Input Fields

Name | Type | Description | Version Information

Action

Choice

The action performed upon the task execution.

Introduced in 1.0.0
Bootstrap Servers

Text

'host:port' string (or list of 'host:port' strings, separated by a comma) that the producer should contact to bootstrap initial cluster metadata.

This does not have to be the full node list. It just needs to have at least one broker that will respond to a Metadata API Request (more than one can be used, though, in case a server is down).

Example with two servers: 'host1:port1,host2:port2'.

Introduced in 1.0.0

Security Protocol

Choice

The Security protocol used to communicate with Kafka brokers.

Valid values are:

  • PLAINTEXT (default)

  • SASL_SSL

  • SSL
Introduced in 1.0.0

SASL Mechanism

Choice

The Authentication mechanism when Security Protocol is configured for SASL_SSL.

Valid values are:

  • SCRAM-SHA-256: credentials are hashed with the SHA-256 algorithm. (default)

  • SCRAM-SHA-512: credentials are hashed with the SHA-512 algorithm.

  • PLAIN

Required when Security Protocol is "SASL_SSL".

Introduced in 1.0.0

Option "PLAIN" is introduced in 2.0.0 

SASL User Credentials

Credentials

Credentials for SCRAM authentication.

They are comprised of:

  • User as "Runtime User".
  • User Password as "Runtime Password".

Required when Security Protocol is "SASL_SSL".

Introduced in 1.0.0

SSL Hostname Check

Checkbox

Flag to configure whether SSL handshake should verify that the certificate matches the broker's hostname.

Required when Security Protocol is "SASL_SSL" or "SSL".


Default setting is checked.

Introduced in 1.0.0

CA Bundle Path

Text

Path and file name of the Certificate Authority (CA) file to use in certificate verification.

Used when it is required to locate the CA file if Security Protocol is configured for "SASL_SSL" or "SSL".

Introduced in 1.0.0
Client Certificate Path

Text

File path of the Client's Certificate for Client authentication over SSL in PEM format.

Required when Security Protocol is "SSL".

Introduced in 1.0.0
Client Private Key Path

Text

File path of the Client's private key for Client authentication over SSL in PEM format.

The private key can be either unencrypted or encrypted. In the case of an encrypted private key, the respective Client Private Key Password should be provided. 

Required when Security Protocol is "SSL".

Introduced in 1.0.0
Client Private Key Password

Credential

The password required for decrypting the client's private key, in case the key in Client Private Key Path is encrypted. The Credentials definition should be as follows:

  • Key Password as "Runtime Password".
Introduced in 1.0.0

Topic

Dynamic Choice

Dynamically fetched list of Kafka topics to which the message can be published.

The user can select the required topic from a drop-down list.

Introduced in 1.0.0

Partition

Choice

The partition to which the message will be assigned.

The user can select the required partition from a drop-down list. A topic must first be selected from the respective drop-down list.

If no partition is specified when producing a record, Kafka will assign messages with the same key to the same partition; when no key is specified either, the message is delivered to a random partition.

Introduced in 1.0.0
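The behavior described above can be illustrated with a simplified partitioner sketch. Real Kafka clients hash keys with murmur2; the crc32 below is only a stand-in to show that equal keys always map to the same partition:

```python
import random
import zlib
from typing import Optional


def choose_partition(
    key: Optional[bytes],
    num_partitions: int,
    explicit_partition: Optional[int] = None,
) -> int:
    """Simplified default-partitioner sketch (real clients use murmur2)."""
    if explicit_partition is not None:
        return explicit_partition                # the Partition field was set
    if key is None:
        return random.randrange(num_partitions)  # no key: random partition
    return zlib.crc32(key) % num_partitions      # same key -> same partition
```

For example, `choose_partition(b"order-42", 6)` returns the same partition on every call with that key.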

Client Id

Text

This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client.

The constructed client id seen on the server side is Client Id + "task instance id".

If Client Id is not populated, the default value used is "ue-kafka-producer-#".

Example: "ue-kafka-producer-#1635348654595881042JAYE7BKNPYVG3"

Introduced in 1.0.0

Message Payload

Large Text

The Message Payload is provided in YAML and is comprised of:

  • key: (optional) – A key to associate with the message. Must be of type integer or string. Can be used to determine which partition to send the message to.
    If Partition is not specified, then messages with the same key will be delivered to the same partition (but if key is absent or empty, the partition is chosen randomly).

  • value: (optional) – The message value. Must be of type integer, float, string, or object.
    If value is absent or empty, the key is required and the message acts as a ‘delete’.

Introduced in 1.0.0
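The rules above (both fields optional, restricted datatypes, and a value-less message requiring a key) can be expressed as a small validation sketch over the parsed YAML mapping; the function is hypothetical and only mirrors the documented constraints:

```python
from typing import Any, Dict


def validate_payload(payload: Dict[str, Any]) -> None:
    """Check a parsed Message Payload against the documented rules (sketch)."""
    key = payload.get("key")
    value = payload.get("value")
    if key is not None and not isinstance(key, (int, str)):
        raise ValueError("key must be of type integer or string")
    if value is not None and not isinstance(value, (int, float, str, dict)):
        raise ValueError("value must be of type integer, float, string, or object")
    if value is None and key is None:
        # A message without a value acts as a 'delete' and requires a key.
        raise ValueError("a message without a value requires a key")


validate_payload({"key": "1"})              # value-less 'delete' message: OK
validate_payload({"key": 1, "value": 256})
```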

Show Advanced Settings

Checkbox

Checking this field makes three more fields available for advanced configuration:

  • "Acks"

  • "Request Timeout (ms)"

  • "Headers"

Default setting is unchecked.

Introduced in 1.0.0

Acks

Choice

The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent.

The following options are available:

  • None: Producer will not wait for acknowledgment from the server.

  • Leader: Wait for leader to write the record to its local log only. (default)

  • All: Wait for the full set of in-sync replicas to write the record.

Introduced in 1.0.0
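The three choices correspond to the standard Kafka producer `acks` values (0, 1, and "all"); the mapping below follows those standard semantics, though the extension's internal naming is an assumption:

```python
# Standard producer `acks` semantics: 0 = fire-and-forget,
# 1 = leader's local write only, "all" = full in-sync replica set.
ACKS_MAPPING = {
    "None": 0,
    "Leader": 1,   # default
    "All": "all",
}


def producer_acks(choice: str = "Leader"):
    """Translate the task's Acks choice into the producer property (sketch)."""
    return ACKS_MAPPING[choice]
```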

Headers

Array

A list of header key/value pairs, in case more metadata about the Kafka message has to be added.

Introduced in 1.0.0

Request Timeout (ms)

Integer

The configuration controls the maximum amount of time the client will wait for the response of a request.

Introduced in 1.0.0


Exit Codes

Exit Code | Status Classification | Status Description
0         | Success               | SUCCESS: Task executed successfully.
1         | Failure               | Unexpected Exception: <exception message>
2         | Failure               | AUTHENTICATION_ERROR: Incorrect username or password.
10        | Failure               | CONNECTION_ERROR: No brokers available for the specified Bootstrap Server or Credentials. Execute in Debug mode for more details.
11        | Failure               | CONNECTION_TIMEOUT: Kafka Timeout Error. Connection timed out while waiting for a response from Kafka.
20        | Failure               | DATA_VALIDATION_ERROR: Some of the input fields cannot be validated. See STDOUT for more details.
21        | Failure               | MESSAGE_ACKS_ERROR: Not enough replicas acknowledged the message.

STDOUT and STDERR

STDOUT of this integration is empty, and STDERR provides additional information to the user, the verbosity of which is controlled by the Log Level field of the task definition.

Backward compatibility is not guaranteed for the content of STDOUT/STDERR, which can change in future versions without notice.

How To

Import Universal Template

  1. This Universal Task requires the Resolvable Credentials feature. Check that the Resolvable Credentials Permitted system property has been set to true.
  2. Import the Universal Template into your Controller:
    1. Extract the zip file you downloaded from the Integration Hub.

    2. In the Controller UI, select the Services > Import Integration Template option.

    3. Browse to the "export" folder of the extracted files, select the ZIP file (its name will be unv_tmplt_*.zip), and click Import.

    4. When the file is imported successfully, refresh the Universal Templates list; the Universal Template will appear on the list.

Modifications of this integration, applied by users or customers before or after import, might affect the supportability of this integration. For more information, refer to the Integration Modifications section.

Configure Universal Task

For a new Universal Task, create a new task, and enter the required input fields.


Integration Modifications

Modifications applied by users or customers, before or after import, might affect the supportability of this integration. The following modifications are discouraged in order to retain the support level of this integration.

  • Python code modifications should not be done.

  • Template Modifications

    • General Section

      • "Name", "Extension", "Variable Prefix", and "Icon" should not be changed.

    • Universal Template Details Section

      • "Template Type", "Agent Type", "Send Extension Variables", and "Always Cancel on Force Finish" should not be changed.

    • Result Processing Defaults Section

      • Success and Failure Exit codes should not be changed.

      • Success and Failure Output processing should not be changed.

    • Fields Restriction Section
      The setup of the template does not impose any restrictions. However, concerning the "Exit Code Processing Fields" section:

      1. Success/Failure exit codes need to be respected.

      2. In principle, as STDERR and STDOUT outputs can change in follow-up releases of this integration, they should not be considered as a reliable source for determining the success or failure of a task.

Event Template configuration related to “Metric Label Attributes” & “Optional Metric Labels” is allowed. However, administrators should be cautious of high cardinality scenarios that might occur.

Users and customers are encouraged to report defects or feature requests at the Stonebranch Support Desk.


Document References

This document references the following documents:

Name                | Description
Universal Templates | User documentation for creating, working with, and understanding Universal Templates and Integrations.
Universal Tasks     | User documentation for creating Universal Tasks in the Universal Controller user interface.

Change Log

ue-kafka-producer-2.0.0 (2025-08-01)

Enhancements

  • Added: Update dependencies and code to utilize the native Confluent Python Library (#35482)
  • Added: Authentication mechanism with security_protocol: "SASL_SSL" and sasl_mechanism: "PLAIN" (#47839)
  • Added: More debug log messages (#47654)

Breaking Changes

  • Dropped compatibility for Python 3.7 and therefore Universal Agent version 7.5. Bundled libraries include C bindings. Compatibility is set from Universal Agent version 7.6 onwards.

ue-kafka-producer-1.2.0 (2023-04-28)

Enhancements

  • Added: Support for SSL Security Protocol and Client authentication over SSL.
  • Added: Extension Output result upon successful task instance execution.

Fixes

  • Fixed: Improve robustness of application (#32501)
  • Fixed: Debug mode execution enhancement (#32733)
  • Fixed: Data validation error due to missing partition-dependent fields (#32500)

ue-kafka-producer-1.1.2 (2022-03-30)

Fixes

  • Fixed: Change of Template Icon

ue-kafka-producer-1.1.1 (2021-12-09)

Fixes

  • Fixed: Minor bugfixes

ue-kafka-producer-1.1.0 (2021-10-26)

Enhancements

  • Added: Authentication enhancements (SASL SCRAM over SSL)

ue-kafka-producer-1.0.0 (2021-09-27)

  • Added: Initial Release and Support for PLAINTEXT