Panel | |
---|---|
|
Disclaimer
Your use of this download is governed by Stonebranch’s Terms of Use, which are available at Stonebranch Integration Hub - Terms of Use.
Overview
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. This Universal Extension is responsible for publishing events (messages) to topics in Kafka.
...
Template Name | Extension Name | Extension Version |
---|---|---|
Apache Kafka: Publish event | ue-kafka-producer | 1.2.0 |
Apache Kafka: Publish event | ue-kafka-producer | 1.1.2 |
Apache Kafka: Publish event | ue-kafka-producer | 1.1.1 |
Apache Kafka: Publish event | ue-kafka-producer | 1.1.0 |
Apache Kafka: Publish event | ue-kafka-producer | 1.0.0 |
Refer to Changelog for version history information.
Software Requirements
This integration requires a Universal Agent and a Python runtime to execute the Universal Task.
...
- Actions
- Send a message to Kafka with the capability to select
,the topic, the partition, the message key, the message value, and message metadata
- Send a message to Kafka with the capability to select
- Authentication
- PLAINTEXT
- SASL_SSL SCRAM
- SSL with capability for SSL Client Authentication
- Other
- Capability to fetch topics and partitions dynamically from Kafka during task creation
- Capability to automatically select the serialization method depending on the key/value message data types.
- Capability to control the transport of the messages by configuring the message acknowledgment strategy and the request timeout.
...
Field | Input type | Default value | Type | Description |
---|---|---|---|---|
Action | Required | Send a message | Choice | The action performed upon the task execution. |
Bootstrap Servers | Required | - | Text | 'host:port' string (or list of 'host:port' strings, separated by a comma) that the producer should contact to bootstrap initial cluster metadata. This does not have to be the full node list. It just needs to have at least one broker that will respond to a Metadata API Request (more than one can be used, though, in case a server is down). Example with two servers: 'host1:port1,host2:port2'. |
Security Protocol | Required | PLAINTEXT | Choice | The Security protocol used to communicate with Kafka brokers. Valid values are:
|
SASL Mechanism | Optional | SCRAM–SHA–256 | Choice | The Authentication mechanism when Security Protocol is configured for SASL_SSL. Valid values are:
Required when Security Protocol is "SASL_SSL". |
SASL User Credentials | Optional | - | Credentials | Credentials for SCRAM authentication. They are comprised of:
Required when Security Protocol is "SASL_SSL". |
SSL Hostname Check | Optional | true | Boolean | Flag to configure whether SSL handshake should verify that the certificate matches the broker's hostname. Required when Security Protocol is "SASL_SSL" or "SSL". |
CA Bundle Path | Optional | - | Text | Path and file name of the Certificate Authority (CA) file to use in certificate verification. Used when it is required to locate the CA file if Security Protocol is configured for "SASL_SSL" or "SSL". |
Client Certificate Path | Optional | - | Text | Filepath of the Client's Certificate for Client authentication over SSL in PEM format. Required when Security Protocol is "SSL". |
Client Private Key Path | Optional | - | Text | Filepath of the Client's private key for Client authentication over SSL in PEM format. The private key can be either unencrypted or encrypted. In the case of an encrypted private key, the respective Client Private Key Password should be provided. Required when Security Protocol is "SSL". |
Client Private Key Password | Optional | - | Credential | In case the client's private key in Client Private Key Path is encrypted, the key required for decryption. The Credentials definition should be as follows.
|
Topic | Required | - | Dynamic Choice | Dynamic fetched list of topics to subscribe the consumer to. The user can select the required topic from a drop-down list. |
Partition | Optional | - | Choice | The partition to which the message will be assigned to. The user can select the required partition from a drop-down list. A topic should be selected from the respective drop-down list. If no partition key is specified when producing a record, Kafka will assign messages with the same key to the same partition. When no key is specified, the message is delivered to a random partition. |
Client Id | Optional | - | Text | This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client. The constructed client id seen on the server side is Client Id + "task instance id". If Client Id is not populated, the default value used is "ue-kafka-monitor-#". Example: "ue-kafka-monitor-#1635348654595881042JAYE7BKNPYVG3" |
Message Payload | Required | - | Large Text | YAML type that the Message Payload is comprised of:
|
Show Advanced Settings | Required | False | Boolean | By checking this field, three more fields are available for advanced configuration:
|
Acks | Required | Leader | Choice | The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following options are available:
|
Headers | Optional | - | Array | A list of header key/value pairs, in case more metadata about the Kafka message have to be added. |
Request Timeout (ms) | Optional | 30000 | Integer | The configuration controls the maximum amount of time the client will wait for the response of a request. |
...
The Extension Output contains Attribute result
. The result
Attribute, as displayed below, is based on the Kafka response of the related message as it is produced based on action "Send Message".
The Extension Output for this Universal Extension is in JSON format as described below.
Code Block | ||||
---|---|---|---|---|
| ||||
{ "exit_code": 0, "status_description": "SUCCESS: Task executed successfully.", "status": "SUCCESS", "invocation": { "extension": "ue-kafka-producer", "version": "1.2.0", "fields": { "action": "Send a message", "security_protocol": "SSL", "ssl_check_hostname": true, "sasl_mechanism": "SCRAM-SHA-256", "ssl_cafile": "/path/to/ca/cert.pem", "sasl_username": null, "sasl_password": null, "client_certificate_path": "/path/to/client/cert.pem", "client_private_key_path": "/path/to/client_private_key.pem", "client_private_key_password": "****", "bootstrap_servers": "kafka.bootstrap.com:9092", "topic": "test", "partition": null, "message_payload": { "key": 1, "value": "test" }, "client_id": "ue-kafka-producer-#1682197200191106591I0HGMM9JD6IUW", "request_timeout_ms": 30000, "acks": "Leader", "headers": [] } }, "result": { "topic": "test", "partition": 0, "offset": 221, "timestamp": "1682328194041" } } |
...