Apache Kafka: Publish Event
Disclaimer
Your use of this download is governed by Stonebranch’s Terms of Use.
Version Information
| Template Name | Extension Name | Extension Version | Status |
|---|---|---|---|
| Apache Kafka: Publish event | ue-kafka-producer | 2 (current: 2.0.0) | Fixes and new features are introduced |
| Apache Kafka: Publish event | ue-kafka-producer | 1 | Hot Fixes Only (Until UAC 7.5 is End of Support) |
Refer to Changelog for version history information.
Breaking changes
Apache Kafka: Publish event 2.0.0 is a major release that introduces breaking changes which might affect some customers, depending on their setup. Administrators are strongly advised to refer to the Changelog for more information on the changes introduced, if updating from a version earlier than 2.0.0.
Overview
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. This Universal Extension is responsible for publishing events (messages) to topics in Kafka.
Key Features
| Feature | Description |
|---|---|
| Publish Kafka event | Send a message to a Kafka topic, with the capability to select the target topic and partition. |
| Authentication | Support different authentication mechanisms. |
| Serialization | Capability to automatically select the serialization method depending on the key/value message data types. |
| Message acknowledgment | Capability to control the transport of the messages by configuring the message acknowledgment strategy and the request timeout. |
Requirements
| Area | Details |
|---|---|
| Python Version | Supports Python 3.11. Tested with the Agent-bundled Python distribution. |
| Universal Agent Compatibility | Universal Agent Version >= 7.6.0.0. |
| Universal Controller Compatibility | Universal Controller Version >= 7.6.0.0. |
| Network and Connectivity | Requires network connectivity between the Universal Agent and the Kafka servers. |
| Southbound System | This integration is tested on Kafka version 3.0. It is expected to work with versions from 2.0.0 onwards; however, this has not been tested. |
| SSL Certificate Verification | Server certificate verification (via CA Bundle Path) and broker hostname verification (via SSL Hostname Check) apply when Security Protocol is "SASL_SSL" or "SSL". |
Supported Actions
Action: Send a message
Publishes a message to a specified Kafka topic. Additional options like the topic partition or the message acknowledgment can be tuned.
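For orientation, the action can be sketched with the confluent-kafka client that version 2.0.0 of this extension is built on (per the Changelog). This is a minimal sketch, not the extension's actual code; the broker address, topic, and client id are illustrative placeholders.

```python
# Hedged sketch of the "Send a message" action using the confluent-kafka
# client; broker address, topic, and client id are illustrative placeholders.
try:
    from confluent_kafka import Producer
except ImportError:               # client not installed: config-only dry run
    Producer = None

conf = {
    "bootstrap.servers": "host1:9092",        # at least one reachable broker
    "client.id": "ue-kafka-producer-demo",    # shows up in server-side logs
}

def send_message(topic, key, value):
    """Publish one message and wait for outstanding deliveries."""
    producer = Producer(conf)
    producer.produce(topic, key=key, value=value)
    producer.flush(30)            # wait up to 30 s for delivery reports

# Usage (requires a running broker):
# send_message("demo-topic", key="1", value="hello")
```

The task's input fields (Bootstrap Servers, Client Id, Topic, Message Payload) supply the equivalents of the placeholder values above.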
Configuration Examples
Send Kafka Message with plaintext authentication
Plaintext authentication is performed by choosing Security Protocol = PLAINTEXT and populating the needed fields.
Send Kafka Message with SASL SSL authentication
SASL_SSL authentication is performed by choosing Security Protocol = SASL_SSL and populating the related CA bundle file path and credential field.
Send Kafka Message with SSL authentication
Client authentication is performed by choosing Security Protocol = SSL and populating the related certificate and key file path fields. In case the client's private key is encrypted, the respective password for decrypting the key is also needed.
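As a rough guide, the three Security Protocol choices map onto client configuration as sketched below. The property names are real librdkafka/confluent-kafka keys, but every host, path, and credential is a placeholder; the extension builds the equivalent configuration internally from its input fields.

```python
# Illustrative mapping of the three Security Protocol choices onto
# confluent-kafka (librdkafka) producer properties; all hosts, paths,
# and credentials below are placeholders.
plaintext_conf = {
    "bootstrap.servers": "host1:9092,host2:9092",
    "security.protocol": "PLAINTEXT",            # no encryption, no auth
}

sasl_ssl_conf = {
    "bootstrap.servers": "host1:9093",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-256",           # SASL Mechanism field
    "sasl.username": "kafka-user",               # SASL User Credentials
    "sasl.password": "kafka-password",
    "ssl.ca.location": "/etc/ssl/certs/ca-bundle.pem",   # CA Bundle Path
}

ssl_conf = {
    "bootstrap.servers": "host1:9094",
    "security.protocol": "SSL",
    "ssl.ca.location": "/etc/ssl/certs/ca-bundle.pem",       # CA Bundle Path
    "ssl.certificate.location": "/etc/ssl/client/cert.pem",  # Client Certificate Path
    "ssl.key.location": "/etc/ssl/client/key.pem",           # Client Private Key Path
    "ssl.key.password": "key-passphrase",  # only if the private key is encrypted
}
```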
Message Payloads
Depending on the key and value data types of the YAML message, the extension handles their respective serialization before sending the message to Kafka. Examples of message payloads with different data types for key/value, and how they are serialized, follow.
| Key Value Combination | Example Configuration |
|---|---|
| Message without a Value. | `key: "1"` |
| Integer as a Key and an Integer as a Value. Both key and value will be serialized as integers. | `key: 1`<br>`value: 256` |
| String as a Key and an Integer as a Value. The key will be serialized as a string (an integer in single or double quotes is interpreted as a string) and the value will be serialized as an integer. | `key: "1"`<br>`value: 256` |
| Integer as a Key and a Parameter as a Value. The value will be serialized according to the resolved variable's data type. | `key: 1`<br>`value: ${variable}` (illustrative variable reference) |
| Integer as a Key and an Object as a Value. The value, in this case an object, will be serialized as a JSON string. | `key: 1`<br>`value: {employee: {position: developer, details: {name: John, surname: Smith, age: 30}}}` |
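To make the table concrete, here is one plausible serialization routine, not the extension's actual code: integers packed as 4-byte big-endian (the convention of confluent-kafka's IntegerSerializer), quoted scalars kept as UTF-8 strings, and mappings rendered as JSON strings.

```python
import json
import struct

def serialize(part):
    """Choose a wire format from the YAML-parsed Python type (sketch only)."""
    if part is None:
        return None                              # message without a value
    if isinstance(part, int) and not isinstance(part, bool):
        return struct.pack(">i", part)           # 4-byte big-endian integer
    if isinstance(part, str):
        return part.encode("utf-8")              # quoted scalars stay strings
    if isinstance(part, (dict, list)):
        return json.dumps(part).encode("utf-8")  # objects become JSON strings
    raise TypeError(f"unsupported type: {type(part).__name__}")

serialize("1")    # b'1' -- "1" in quotes is treated as a string
serialize(256)    # b'\x00\x00\x01\x00' -- unquoted 256 is an integer
serialize({"employee": {"position": "developer"}})  # JSON-encoded bytes
```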
Action Output
| Output Type | Description | Examples |
|---|---|---|
| EXTENSION | The extension output provides information about the message published, upon successful task instance execution. | |
Input Fields
| Name | Type | Description | Version Information |
|---|---|---|---|
| Action | Choice | The action performed upon the task execution. | Introduced in 1.0.0 |
| Bootstrap Servers | Text | Comma-separated list of 'host:port' broker addresses that the producer contacts to bootstrap initial cluster metadata. This does not have to be the full node list; it just needs to have at least one broker that will respond to a Metadata API request (more than one can be used, though, in case a server is down). Example with two servers: 'host1:port1,host2:port2'. | Introduced in 1.0.0 |
| Security Protocol | Choice | The security protocol used to communicate with Kafka brokers. Valid values are: "PLAINTEXT", "SASL_SSL", "SSL". | Introduced in 1.0.0 |
| SASL Mechanism | Choice | The authentication mechanism when Security Protocol is configured for SASL_SSL. Valid values are: "SCRAM-SHA-256", "SCRAM-SHA-512", "PLAIN". Required when Security Protocol is "SASL_SSL". | Introduced in 1.0.0. Option "PLAIN" is introduced in 2.0.0 |
| SASL User Credentials | Credentials | Credentials for SASL authentication, comprised of a username (Runtime User) and a password (Runtime Password). Required when Security Protocol is "SASL_SSL". | Introduced in 1.0.0 |
| SSL Hostname Check | Checkbox | Flag to configure whether the SSL handshake should verify that the certificate matches the broker's hostname. Required when Security Protocol is "SASL_SSL" or "SSL". Default setting is checked. | Introduced in 1.0.0 |
| CA Bundle Path | Text | Path and file name of the Certificate Authority (CA) file to use in certificate verification. Used when the CA file needs to be located and Security Protocol is configured for "SASL_SSL" or "SSL". | Introduced in 1.0.0 |
| Client Certificate Path | Text | File path of the client's certificate for client authentication over SSL, in PEM format. Required when Security Protocol is "SSL". | Introduced in 1.0.0 |
| Client Private Key Path | Text | File path of the client's private key for client authentication over SSL, in PEM format. The private key can be either unencrypted or encrypted; for an encrypted private key, the respective Client Private Key Password should be provided. Required when Security Protocol is "SSL". | Introduced in 1.0.0 |
| Client Private Key Password | Credential | The password for decrypting the client's private key, required in case the key in Client Private Key Path is encrypted. The password should be stored in the Runtime Password field of the Credentials definition. | Introduced in 1.0.0 |
| Topic | Dynamic Choice | Dynamically fetched list of topics to which the message can be published. The user can select the required topic from a drop-down list. | Introduced in 1.0.0 |
| Partition | Choice | The partition to which the message will be assigned. The user can select the required partition from a drop-down list; a topic should first be selected from the respective drop-down list. If no partition is specified when producing a record, Kafka assigns messages with the same key to the same partition; when no key is specified either, the message is delivered to a random partition. | Introduced in 1.0.0 |
| Client Id | Text | This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client. The constructed client id seen on the server side is Client Id + "task instance id". If Client Id is not populated, a default value is used. | Introduced in 1.0.0 |
| Message Payload | Large Text | The message payload in YAML format, comprised of a key and, optionally, a value (see Message Payloads for serialization examples). | Introduced in 1.0.0 |
| Show Advanced Settings | Checkbox | By checking this field, three more fields are available for advanced configuration: Acks, Headers, and Request Timeout (ms). Default setting is unchecked. | Introduced in 1.0.0 |
| Acks | Choice | The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following options are available: "0" (the producer does not wait for any acknowledgment), "1" (the leader writes the record locally and responds without awaiting the followers), "all" (the leader waits for the full set of in-sync replicas to acknowledge the record). | Introduced in 1.0.0 |
| Headers | Array | A list of header key/value pairs, in case more metadata about the Kafka message has to be added. | Introduced in 1.0.0 |
| Request Timeout (ms) | Integer | Controls the maximum amount of time the client will wait for the response of a request. | Introduced in 1.0.0 |
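For illustration, the three advanced settings correspond to standard producer configuration. The property names below are real librdkafka keys; the values and header contents are examples only, not defaults of the extension.

```python
# Illustrative mapping of the advanced settings onto librdkafka producer
# properties; the values and header contents are examples only.
advanced_conf = {
    "bootstrap.servers": "host1:9092",
    "acks": "all",                 # wait for the full in-sync replica set
    "request.timeout.ms": 30000,   # max wait for a broker response
}

# Headers: optional key/value metadata attached to the message.
headers = [("source", b"uac"), ("trace-id", b"abc123")]
# With confluent-kafka these would be passed as:
# producer.produce(topic, value=b"...", headers=headers)
```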
Exit Codes
| Exit Code | Status Classification Description | Status Description |
|---|---|---|
| 0 | Success | SUCCESS: Task executed successfully. |
| 1 | Failure | Unexpected Exception: < exception message > |
| 2 | Failure | AUTHENTICATION_ERROR: Incorrect username or password. |
| 10 | Failure | CONNECTION_ERROR: No brokers available for the specified Bootstrap Servers or Credentials. Execute in Debug mode for more details. |
| 11 | Failure | CONNECTION_TIMEOUT: Kafka Timeout Error. Connection timed out while waiting for a response from Kafka. |
| 20 | Failure | DATA_VALIDATION_ERROR: Some of the input fields cannot be validated. See STDOUT for more details. |
| 21 | Failure | MESSAGE_ACKS_ERROR: Not enough replicas acknowledged the message. |
STDOUT and STDERR
STDOUT of this integration is empty, while STDERR provides additional information to the user; its verbosity is controlled by the Log Level field of the task definition.
Backward compatibility is not guaranteed for the content of STDOUT/STDERR, which can change in future versions without notice.
How To
Import Universal Template
- This Universal Task requires the Resolvable Credentials feature. Check that the Resolvable Credentials Permitted system property has been set to true.
- Import the Universal Template into your Controller:
  1. Extract the ZIP file you downloaded from the Integration Hub.
  2. In the Controller UI, select the Services > Import Integration Template option.
  3. Browse to the "export" folder under the extracted files (the file name will be unv_tmplt_*.zip) and click Import.
  4. When the file is imported successfully, refresh the Universal Templates list; the Universal Template will appear in the list.
Modifications of this integration, applied by users or customers before or after import, might affect its supportability. For more information, refer to the Integration Modifications section.
Configure Universal Task
For a new Universal Task, create a new task, and enter the required input fields.
Integration Modifications
Modifications applied by users or customers, before or after import, might affect the supportability of this integration. The following modifications are discouraged to retain the support level as applied for this integration.
Python code modifications should not be done.
Template Modifications
General Section
"Name", "Extension", "Variable Prefix", and "Icon" should not be changed.
Universal Template Details Section
"Template Type", "Agent Type", "Send Extension Variables", and "Always Cancel on Force Finish" should not be changed.
Result Processing Defaults Section
Success and Failure Exit codes should not be changed.
Success and Failure Output processing should not be changed.
Fields Restriction Section
The setup of the template does not impose any restrictions. However, concerning the "Exit Code Processing Fields" section, the Success/Failure exit codes need to be respected.
In principle, as STDERR and STDOUT outputs can change in follow-up releases of this integration, they should not be considered as a reliable source for determining the success or failure of a task.
Event Template configuration related to “Metric Label Attributes” & “Optional Metric Labels” is allowed. However, administrators should be cautious of high cardinality scenarios that might occur.
Users and customers are encouraged to report defects, or feature requests at Stonebranch Support Desk.
Document References
This document references the following documents:
| Name | Description |
|---|---|
| Universal Templates | User documentation for creating, working with and understanding Universal Templates and Integrations. |
| Universal Tasks | User documentation for creating Universal Tasks in the Universal Controller user interface. |
Change Log
ue-kafka-producer-2.0.0 (2025-08-01)
Enhancements
- Added: Update dependencies and code to utilize the native Confluent Python Library (#35482)
- Added: Authentication mechanism with security_protocol: "SASL_SSL" and sasl_mechanism: "PLAIN" (#47839)
- Added: More debug log messages (#47654)
Breaking Changes
- Dropped compatibility for python 3.7 and therefore Universal Agent Version 7.5. Bundled libraries include C Bindings. Compatibility is set from Universal Agent Version 7.6 onwards.
ue-kafka-producer-1.2.0 (2023-04-28)
Enhancements
- Added: Support for SSL Security Protocol and Client authentication over SSL
- Added: Extension Output result upon successful task instance execution
Fixes
- Fixed: Improve robustness of application (#32501)
- Fixed: Debug mode execution enhancement (#32733)
- Fixed: Data validation error due to missing partition-dependent fields (#32500)
ue-kafka-producer-1.1.2 (2022-03-30)
Fixes
- Fixed: Change of Template Icon
ue-kafka-producer-1.1.1 (2021-12-09)
Fixes
- Fixed: Minor bugfixes
ue-kafka-producer-1.1.0 (2021-10-26)
Enhancements
- Added: Authentication enhancements (SASL SCRAM over SSL)
ue-kafka-producer-1.0.0 (2021-09-27)
- Added: Initial release and support for PLAINTEXT