Apache Kafka: Event Monitor
Disclaimer
Your use of this download is governed by Stonebranch’s Terms of Use, which are available at Stonebranch Integration Hub - Terms of Use.
Overview
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. This Universal Extension is responsible for monitoring events (messages) from topics in Kafka, and consuming them based on filtering criteria via consumer group subscription.
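The consumer-group monitoring described above can be approximated with the following sketch. This is illustrative only and is not the extension's actual code; it assumes the third-party kafka-python library, and the broker address, topic, and group name are placeholders.

```python
def contains(substring):
    """Build a predicate matching events whose value contains substring."""
    return lambda value: substring in value

def monitor(topic, predicate, bootstrap_servers="localhost:9092",
            group_id="demo-group"):
    """Consume via a consumer-group subscription until an event matches the
    filtering predicate, then return that event."""
    from kafka import KafkaConsumer  # requires the kafka-python package

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,  # consumer-group subscription
        value_deserializer=lambda raw: raw.decode("utf-8"),
        enable_auto_commit=False,
    )
    try:
        for message in consumer:
            if predicate(message.value):
                consumer.commit()  # commit the offset of the matched event
                return message
    finally:
        consumer.close()

# Usage (needs a reachable broker):
# event = monitor("test", contains("Stonebranch"))
```

On a successful match, the extension ends the task and exposes the matched event as Extension Output, so downstream tasks in a workflow can consume it.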
Version Information
Template Name | Extension Name | Extension Version |
---|---|---|
Apache Kafka: Event Monitor | ue-kafka-monitor | 1.1.0 |
Refer to Change Log for version history information.
Software Requirements
This integration requires a Universal Agent and a Python runtime to execute the Universal Task.
Software Requirements for Universal Template and Universal Task
Requires Python version 3.7. Tested with the Universal Agent bundled Python distribution (Python version 3.7.6).
Software Requirements for Universal Agent
Both Windows and Linux agents are supported:
Universal Agent for Windows x64 Version 7.1.0.0 and later.
Universal Agent for Linux Version 7.1.0.0 and later.
Software Requirements for Universal Controller
Universal Controller Version 7.1.0.0 and later.
Software Requirements in Cases of SSL Verification
If Python 3.7 Distribution for Universal Agent is installed, the path location of the Certificate Authority file must be provided.
If Python is installed on the local system but not with the Universal Agent Distribution:
If the SSL certificate is signed by a publicly trusted certificate authority (CA), it is not required to provide the certificate.
If the SSL certificate is self-signed, either the path location of the certificate file must be provided, or the certificate must be stored at the machine's default location for installed certificates (in this case it will be trusted for all applications).
Supported Apache Kafka versions
This integration is tested on Kafka version 3.0. It is expected to work with versions 2.0.0 onwards; however, this has not been tested.
Key Features
This Universal Extension provides the following main features:
- Actions
  - Monitor for a Kafka event based on filtering criteria through a consumer group subscription
- Authentication
  - PLAINTEXT
  - SASL_SSL SCRAM
  - SSL with capability for SSL Client Authentication
- Other
  - Capability to fetch topics dynamically from Kafka during task creation
  - Capability to control the partition assignment strategy, as well as session-related timeout values
Typically, this extension can be used to monitor events from Kafka and, upon successful execution, to trigger workflows or other tasks, or just to pass information related to the Kafka event within UAC.
Import the Universal Template
To use the Universal Template, you must first perform the following steps.
This Universal Task requires the Resolvable Credentials feature. Check that the Resolvable Credentials Permitted system property has been set to true.
To import the Universal Template into your Controller, follow these instructions.
When the files have been imported successfully, refresh the Universal Templates list; the Universal Template will appear on the list.
Modifications of this integration, applied by users or customers, before or after import, might affect the supportability of this integration. For more information refer to Integration Modifications.
Configure Universal Task
For the new Universal Task type, create a new task, and enter the task-specific details that were created in the Universal Template.
Input Fields
The input fields for this Universal Extension are described below.
Field | Input type | Default value | Type | Description |
---|---|---|---|---|
Action | Required | Monitor for events | Choice | The action performed upon the task execution. |
Security Protocol | Required | PLAINTEXT | Choice | The security protocol used to communicate with Kafka brokers. Valid values are "PLAINTEXT", "SASL_SSL", and "SSL". |
Bootstrap Servers | Required | - | Text | 'host:port' string (or list of 'host:port' strings, separated by a comma) that the consumer should contact to bootstrap initial cluster metadata. This does not have to be the full node list. It just needs to have at least one broker that will respond to a Metadata API Request (more than one can be used, though, in case a server is down). Example with two servers: 'host1:port1,host2:port2'. |
SASL Mechanism | Optional | SCRAM-SHA-256 | Choice | The authentication mechanism used when Security Protocol is configured for "SASL_SSL". Required when Security Protocol is "SASL_SSL". |
SASL User Credentials | Optional | - | Credentials | Credentials for SCRAM authentication, comprising the username and password. Required when Security Protocol is "SASL_SSL". |
SSL Hostname Check | Optional | true | Boolean | Flag to configure whether SSL handshake should verify that the certificate matches the broker's hostname. Required when Security Protocol is "SASL_SSL" or "SSL". |
CA Bundle Path | Optional | - | Text | Path and file name of the Certificate Authority (CA) file to use in certificate verification. Used when it is required to locate the CA file if Security Protocol is configured for "SASL_SSL" or "SSL". |
Client Certificate Path | Optional | - | Text | Filepath of the Client's Certificate for Client authentication over SSL in PEM format. Required when Security Protocol is "SSL". |
Client Private Key Path | Optional | - | Text | Filepath of the Client's private key for Client authentication over SSL in PEM format. The private key can be either unencrypted or encrypted. In the case of an encrypted private key, the respective Client Private Key Password should be provided. Required when Security Protocol is "SSL". |
Client Private Key Password | Optional | - | Credential | The credential holding the key required for decryption, in case the client's private key in Client Private Key Path is encrypted. |
Consumer Type | Required | Consumer Group | Choice | Type of consumer used to get messages from Kafka. |
Consumer Group | Required | - | Text | The unique name of the consumer group to join for dynamic partition assignment and to use for fetching and committing offsets. |
Topic | Required | - | Dynamic Choice | Dynamic fetched list of topics to subscribe the consumer to. The user can select the required topic from a drop-down list. |
Client Id | Optional | - | Text | This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client. The constructed client id seen on the server side is Client Id + "task instance id". If Client Id is not populated, the default value used is "ue-kafka-monitor-#". Example: "ue-kafka-monitor-#1635348654595881042JAYE7BKNPYVG3" |
Start from | Required | Consumer Group Offset | Choice | Controls from which point the consumption will start. Available option is "Consumer Group Offset". |
Key Deserializer | Required | String | Choice | Type of key deserialization. |
Value Deserializer | Required | String | Choice | Type of value deserialization. Available options are "String", "Integer", "Float", and "JSON". |
Value Filter | Optional | None | Choice | Value operators specifying the criteria used to match records and stop consuming messages. If Value Deserializer is configured for "Integer" or "Float", the available options are numeric comparison operators (for example, "<="). If Value Deserializer is configured for "String", the available options are string matching operators (for example, a "contains" check). If Value Deserializer is configured for "JSON", the available options are all of those that apply to the "Integer", "Float", and "String" Value Deserializers. |
Value | Optional | - | Text | The value to which the Value Filter applies. |
Value JSON Path | Optional | - | Text | The JSON path to locate the Value, in case Value Deserializer is configured to "JSON". The JSON Path needs to resolve either to a number or to a string. If the JSON path results in a list of numbers or a list of strings, a Kafka message is matched if at least one element from the list matches the Value. JSON Path syntax is based on jsonpath-ng python library. For examples, please refer to the official web site. |
Show Advanced Settings | Required | False | Boolean | By checking this field, more fields are available for advanced configuration. The advanced fields are: Partition Assignment Strategy, Session Timeout (ms), Auto Offset Reset, Request Timeout (ms), Heartbeat Interval (ms), Max Partition Fetch Bytes. |
Partition Assignment Strategy | Optional | Range | Choice | Partition assignment policy used to distribute partition ownership amongst consumer instances when group management is used. |
Session Timeout (ms) | Optional | 10000 | Integer | Controls the time it takes to detect that a consumer has crashed and stopped sending heartbeats. If more than Session Timeout milliseconds pass without the consumer sending a heartbeat to the group coordinator, it is considered dead and the group coordinator triggers a rebalance of the consumer group to allocate the dead consumer's partitions to the other consumers in the group. |
Auto Offset Reset | Optional | Latest | Choice | Controls the behavior of the consumer when it starts reading a partition for which it does not have a committed offset, or for which the committed offset is invalid (usually because the consumer was down for so long that the record with that offset was already aged out of the broker). |
Request Timeout (ms) | Optional | 305000 | Integer | Controls the maximum amount of time the client will wait for the response of a request. |
Heartbeat Interval (ms) | Optional | 3000 | Integer | The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka’s group management facilities. The value must be set lower than Session Timeout (ms). |
Max Partition Fetch Bytes | Optional | 1048576 | Integer | Controls the maximum number of bytes the server will return per partition. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition. The default is 1MB. |
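The advanced settings above are interdependent: as noted for Heartbeat Interval (ms), its value must stay below Session Timeout (ms). A minimal sketch of that constraint (illustrative only; the function name is made up and is not part of the extension):

```python
# Heartbeats must fire well within the session timeout, otherwise the group
# coordinator may consider the consumer dead between two heartbeats and
# trigger an unnecessary rebalance.
def timeouts_consistent(session_timeout_ms=10000, heartbeat_interval_ms=3000):
    return heartbeat_interval_ms < session_timeout_ms

assert timeouts_consistent()                                 # defaults are fine
assert not timeouts_consistent(heartbeat_interval_ms=20000)  # heartbeat too slow
```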
Task Examples
PLAINTEXT Security Protocol Task configuration
Example of Universal Task for "PLAINTEXT" Security Protocol:
SASL_SSL Security Protocol Task Configuration
Example of Universal Task for "SASL_SSL" Security Protocol:
SSL Security Protocol Task Configuration
Example of Universal Task for "SSL" Security Protocol. Specifically, client authentication over SSL takes place, with an encrypted private key:
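The relevant field values for such a configuration can be summarized as follows. This is a hypothetical sketch: the server address and file paths are placeholders, not defaults shipped with the integration.

```python
# Hypothetical field values for an "SSL" task with client authentication
# and an encrypted private key (all paths and hostnames are placeholders).
ssl_task_fields = {
    "security_protocol": "SSL",
    "bootstrap_servers": "kafka.ssl.example.com:9093",
    "ssl_check_hostname": True,                          # SSL Hostname Check
    "ssl_cafile": "/opt/certs/ca-bundle.pem",            # CA Bundle Path
    "client_certificate_path": "/opt/certs/client.pem",  # PEM client cert
    "client_private_key_path": "/opt/certs/client.key",  # encrypted key
    # "Client Private Key Password" is supplied via a Credentials
    # definition in the Controller, never as plain text in the task.
}
```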
Matching a Kafka Event with String Value Deserializer
In this example, the Task monitors for events with a value that contains the string "Stonebranch":
In the extension output result, the matched event's details are printed:
{
  "exit_code": 0,
  "status_description": "SUCCESS: Successful Task execution",
  "changed": false,
  "invocation": { ... },
  "result": {
    "client_id": "ue-kafka-monitor-#163579072850187904243P79ZKF6CR00",
    "event": {
      "key": 1,
      "value": "Stonebranch Inc.",
      "headers": [
        { "key": "test_header", "value": "1" }
      ],
      "partition": 0,
      "offset": 34,
      "timestamp": "2023-05-01T18:50:30.031000+00:00"
    }
  }
}
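The match in this example can be sketched as follows (illustrative only, assuming a contains-style Value Filter): with Value Deserializer "String", the raw message bytes are decoded to text and the event matches when the configured Value is a substring of the event value.

```python
# Sketch of string-based matching: decode the raw bytes (the "String"
# deserialization step), then apply a contains-style Value Filter.
def event_matches(raw_bytes, value="Stonebranch"):
    event_value = raw_bytes.decode("utf-8")  # String deserialization
    return value in event_value              # "contains" filter

assert event_matches(b"Stonebranch Inc.")
assert not event_matches(b"Acme Corp.")
```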
Matching a Kafka Event with JSON Value Deserializer
In this example, the task monitors for events whose value is in JSON format, where the value has a list of "phoneNumbers" and the second element of that list has an attribute "type" equal to "home":
In the extension output result, the matched event's details are printed (only the result JSON element is shown below):
{
  "exit_code": 0,
  "status_description": "SUCCESS: Successful Task execution",
  "changed": false,
  "invocation": { ... },
  "result": {
    "client_id": "ue-kafka-monitor-#163578238979483604275PR4VPKK89D1",
    "event": {
      "key": 1,
      "value": {
        "firstName": "John",
        "lastName": "doe",
        "age": 26,
        "address": {
          "streetAddress": "naist street",
          "city": "Nara",
          "postalCode": "630-0192"
        },
        "phoneNumbers": [
          { "type": "iPhone", "number": "0123-4567-8888" },
          { "type": "home", "number": "0123-4567-8910" }
        ]
      },
      "headers": [],
      "partition": 0,
      "offset": 20,
      "timestamp": "2021-11-01T18:31:11.415000+00:00"
    }
  }
}
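A JSON Path such as phoneNumbers[1].type (syntax per the jsonpath-ng library the extension relies on) resolves to the second phone number's type, which is then compared against the configured Value. The resolution for this example can be emulated with plain dictionary access, without the jsonpath-ng dependency:

```python
# Emulate what the JSON Path phoneNumbers[1].type yields for this event,
# using plain dict/list indexing (illustrative; the extension itself uses
# jsonpath-ng for the general case).
event_value = {
    "firstName": "John",
    "phoneNumbers": [
        {"type": "iPhone", "number": "0123-4567-8888"},
        {"type": "home", "number": "0123-4567-8910"},
    ],
}

resolved = event_value["phoneNumbers"][1]["type"]  # what the path resolves to
assert resolved == "home"                          # event matches Value "home"
```

As noted in the Value JSON Path field description, if the path resolves to a list, the event matches when at least one element of the list matches the Value.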
Matching a Kafka Event with Value Filter None
In this example, the task monitors for events and matches any Kafka event as the selected Value Filter is "None":
For more information about the Extension Output Result, please refer to the Extension Output section.
Task Output
Exit Codes
The exit codes for this Universal Extension are described below.
Exit Code | Status Classification Code | Status Classification Description | Status Description |
---|---|---|---|
0 | SUCCESS | Successful Execution | SUCCESS: Successful Task execution |
1 | FAIL | Failed Execution | FAIL: < Error Description > |
10 | CONNECTION_ERROR | Bad connection data | CONNECTION_ERROR: < Error Description > |
11 | CONNECTION_TIMEOUT | Connection timed out | CONNECTION_TIMEOUT: < Error Description > |
20 | DATA_VALIDATION_ERROR | Bad input fields validation | DATA_VALIDATION_ERROR: Some of the input fields cannot be validated. See STDERR for more details |
Extension Output
In the context of a workflow, subsequent tasks can rely on the information provided by this integration as Extension Output.
The Extension Output contains the attribute result. The result attribute, as displayed below, is based on the Kafka response after the message is monitored and consumed successfully with Action "Monitor for events".
The Extension Output for this Universal Extension is in JSON format as described below.
{
  "exit_code": 0,
  "status_description": "SUCCESS: Successful Task execution",
  "changed": false,
  "invocation": {
    "extension": "ue-kafka-monitor",
    "version": "1.1.0",
    "fields": {
      "action": "Monitor for events",
      "auto_offset_reset": "Latest",
      "bootstrap_servers": [
        "kafka.plain.com:9092"
      ],
      "client_certificate_path": null,
      "client_id": "ue-kafka-monitor-#163578238979483604275PR4VPKK89D1",
      "client_private_key_path": null,
      "client_private_key_password": null,
      "consumer_type": "Consumer Group",
      "consumer_group": "float_group",
      "heartbeat_interval_ms": 3000,
      "key_deserializer": "String",
      "partition_assign_strategy": "RangePartitionAssignor",
      "request_timeout_ms": 305000,
      "start_from": "Consumer Group Offset",
      "security_protocol": "PLAINTEXT",
      "sasl_user_credentials_user": null,
      "sasl_user_credentials_password": null,
      "sasl_mechanism": "SCRAM-SHA-256",
      "ssl_check_hostname": true,
      "ssl_cafile": null,
      "session_timeout_ms": 10000,
      "topic": "test",
      "value": "3.14",
      "value_deserializer": "Float",
      "value_filter": "<=",
      "value_json_path": null
    }
  },
  "result": {
    "client_id": "ue-kafka-monitor-#163578238979483604275PR4VPKK89D1",
    "event": {
      "key": "float_group_key",
      "value": 3.14,
      "headers": [],
      "partition": 0,
      "offset": 46,
      "timestamp": "2023-05-08T11:38:38.026000+00:00"
    }
  }
}
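A downstream workflow task can read the matched event out of this output with standard JSON parsing. The sketch below abbreviates the output to the fields it uses; in the Controller you would typically extract these values with Output Functions rather than hand-written code.

```python
import json

# Abbreviated copy of the Extension Output shown above.
extension_output = json.loads("""
{
  "exit_code": 0,
  "result": {
    "client_id": "ue-kafka-monitor-#163578238979483604275PR4VPKK89D1",
    "event": {"key": "float_group_key", "value": 3.14, "partition": 0,
              "offset": 46, "timestamp": "2023-05-08T11:38:38.026000+00:00"}
  }
}
""")

# Pull the matched event's details out of the result attribute.
event = extension_output["result"]["event"]
assert extension_output["exit_code"] == 0
assert event["value"] == 3.14 and event["offset"] == 46
```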
STDOUT and STDERR
STDOUT and STDERR provide additional information to the user. The populated content can be changed in future versions of this extension without notice. Backward compatibility is not guaranteed.
Integration Modifications
Modifications applied by users or customers, before or after import, might affect the supportability of this integration. The following modifications are discouraged, in order to retain the support level of this integration.
- Python code modifications should not be done.
- Template Modifications
- General Section
- "Name", "Extension", "Variable Prefix", "Icon" should not be changed.
- Universal Template Details Section
- "Template Type", "Agent Type", "Send Extension Variables", "Always Cancel on Force Finish" should not be changed.
- Result Processing Defaults Section
- Success and Failure Exit codes should not be changed.
- Success and Failure Output processing should not be changed.
- Fields Restriction Section
The setup of the template does not impose any restrictions. However, with respect to the "Exit Code Processing Fields" section:
- Success/Failure exit codes need to be respected.
- In principle, as STDERR and STDOUT outputs can change in follow-up releases of this integration, they should not be considered a reliable source for determining the success or failure of a task.
Users and customers are encouraged to report defects, or feature requests at Stonebranch Support Desk.
Document References
This document references the following documents:
Name | Description |
---|---|
Universal Templates | User documentation for creating Universal Templates in the Universal Controller user interface. |
Universal Tasks | User documentation for creating Universal Tasks in the Universal Controller user interface. |
Output Functions | How Output Functions can be used to extract information from extension output. |
jsonpath-ng | JSON Path predicate notation. |
Change Log
ue-kafka-monitor-1.1.0 (2023-05-10)
Enhancements
- Added: Support for Client authentication over SSL (#32826)
Fixes
- Fixed: Improve robustness of application (#32947)
ue-kafka-monitor-1.0.2 (2022-03-30)
Enhancements
- Fixed: Change of Template Icon
ue-kafka-monitor-1.0.1 (2021-12-10)
Enhancements
- Added: Enable Always Cancel On Force Finish in Universal Template (#26607)
Fixes
- Fixed: Minor bugfixes (#26555, #26574)
ue-kafka-monitor-1.0.0 (2021-11-02)
- Added: Initial Release (#25615)