Apache Kafka: Event Monitor

Disclaimer

Your use of this download is governed by Stonebranch’s Terms of Use.

Version Information

Template Name: Apache Kafka: Event Monitor
Extension Name: ue-kafka-monitor
Version: 1 (Current 1.2.0)
Status: Fixes and new features are introduced.

Refer to Changelog for version history information.

Overview

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. This Universal Extension is responsible for monitoring events (messages) from topics in Kafka, and consuming them based on filtering criteria via consumer group subscription.

Key Features

Typically, this extension can be used to monitor events from Kafka and, upon successful execution, trigger workflows or other tasks, or simply pass information related to the Kafka event within UAC.


Monitor for events

  • Monitor for a Kafka event based on filtering criteria through a consumer group subscription

Authentication

  • PLAINTEXT
  • SASL_SSL (PLAIN, SCRAM)
  • SSL with capability for SSL Client Authentication

Other

  • Capability to fetch topics dynamically from Kafka during task creation
  • Capability to control the partition assignment strategy, as well as session-related timeout values

Requirements

This integration requires a Universal Agent and a Python runtime to execute the Universal Task.

Python Version: Requires Python version 3.11. Tested with the Universal Agent bundled Python distribution (Python versions 3.11.6, 3.11.9, and 3.11.13).

Universal Agent Compatibility:

  • Compatible with Universal Agent for Windows x64, version >= 7.6.0.0.

  • Compatible with Universal Agent for Linux, version >= 7.6.0.0.

Universal Controller Compatibility: Universal Controller version >= 7.6.0.0.

Network and Connectivity: Only in cases of SSL verification:

  • If the Python 3.11 Distribution for Universal Agent is installed, the path location of the Certificate Authority file must be provided.

  • If Python is installed on the local system but not with the Universal Agent Distribution:

    • If the SSL certificate is signed by a publicly trusted certificate authority (CA), it is not required to provide the certificate.

    • If the SSL certificate is self-signed, either the path location of the certificate file must be provided, or the certificate must be stored at the machine's default location for installed certificates (in which case it will be trusted by all applications).

Apache Kafka Compatibility: This integration is tested on Kafka version 3.0. It is expected to work with versions 2.0.0 onwards; however, this has not been tested.

Supported Actions

There is one top-level action, controlled by the Action field:

  • Monitor for events

Action: Monitor for events

This action is responsible for monitoring events from topics in Kafka and consuming them based on filtering criteria via consumer group subscription.
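The task fields described later in this document correspond closely to the consumer configuration of the underlying kafka-python client library (the library named in the STDERR example). A hypothetical sketch of how a subset of those fields could map onto KafkaConsumer keyword arguments; the argument names follow kafka-python's public API, but the field-to-argument pairing is an illustration, not the extension's actual source code:

```python
# Hypothetical mapping of task input fields to kafka-python
# KafkaConsumer keyword arguments (illustrative only).
def consumer_kwargs(fields):
    return {
        "bootstrap_servers": fields["bootstrap_servers"],
        "group_id": fields["consumer_group"],
        "client_id": fields["client_id"],
        "security_protocol": fields["security_protocol"],
        "auto_offset_reset": fields["auto_offset_reset"].lower(),
        "session_timeout_ms": fields["session_timeout_ms"],
        "heartbeat_interval_ms": fields["heartbeat_interval_ms"],
        "request_timeout_ms": fields["request_timeout_ms"],
    }

# Example using values from the Extension Output sample in this document:
kwargs = consumer_kwargs({
    "bootstrap_servers": ["kafka-host.org:9092"],
    "consumer_group": "float_group",
    "client_id": "ue-kafka-monitor-#163578238979483604275PR4VPKK89D1",
    "security_protocol": "PLAINTEXT",
    "auto_offset_reset": "Latest",
    "session_timeout_ms": 10000,
    "heartbeat_interval_ms": 3000,
    "request_timeout_ms": 305000,
})
assert kwargs["group_id"] == "float_group"
```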

Action Output

EXTENSION

In the context of a workflow, subsequent tasks can rely on the information provided by this integration as Extension Output.

The extension output provides the following information:

  • exit_code, status_description: General info regarding the task execution. For more information, refer to the exit code table

  • invocation.fields: The task configuration used for this task execution.

  • result: Information based on the Kafka response after the message is monitored and consumed successfully, based on Action "Monitor for events".

The Extension Output for this Universal Extension is in JSON format as shown in the example.

Extension Output Success
{
    "exit_code": 0,
    "status_description": "SUCCESS: Successful Task execution",
    "changed": false,
    "invocation": {
        "extension": "ue-kafka-monitor",
        "version": "1.1.0",
        "fields": {
            "action": "Monitor for events",
            "auto_offset_reset": "Latest",
            "bootstrap_servers": [
                "kafka-host.org:9092"
            ],
            "client_certificate_path": null,
            "client_id": "ue-kafka-monitor-#163578238979483604275PR4VPKK89D1",
            "client_private_key_path": null,
            "client_private_key_password": null,
            "consumer_type": "Consumer Group",
            "consumer_group": "float_group",
            "heartbeat_interval_ms": 3000,
            "key_deserializer": "String",
            "partition_assign_strategy": "RangePartitionAssignor",
            "request_timeout_ms": 305000,
            "start_from": "Consumer Group Offset",
            "security_protocol": "PLAINTEXT",
            "sasl_user_credentials_user": null,
            "sasl_user_credentials_password": null,
            "sasl_mechanism": "SCRAM-SHA-256",
            "ssl_check_hostname": true,
            "ssl_cafile": null,
            "session_timeout_ms": 10000,
            "topic": "test",
            "value": "3.14",
            "value_deserializer": "Float",
            "value_filter": "<=",
            "value_json_path": null
        }
    },
    "result": {
        "client_id": "ue-kafka-monitor-#163578238979483604275PR4VPKK89D1",
        "event": {
            "key": "float_group_key",
            "value": 3.14,
            "headers": [],
            "partition": 0,
            "offset": 46,
            "timestamp": "2023-05-08T11:38:38.026000+00:00"
        }
    }
}
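Because the Extension Output is plain JSON, downstream tooling can read it with any JSON parser. A minimal sketch (the string below is an abbreviated copy of the success example above):

```python
import json

# Parse an (abbreviated) Extension Output document and pull out the
# fields a downstream task would typically need.
output = json.loads("""{
    "exit_code": 0,
    "status_description": "SUCCESS: Successful Task execution",
    "result": {"event": {"key": "float_group_key", "value": 3.14,
                         "partition": 0, "offset": 46}}
}""")

assert output["exit_code"] == 0
event = output["result"]["event"]
print(event["value"], event["offset"])  # prints: 3.14 46
```

Within UAC itself, the same extraction is normally done with Output Functions such as Task Instance Output By JsonPath (see Document References).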
STDOUT

STDOUT provides additional information to the User. The populated content can be changed in future versions of this extension without notice.

Backward compatibility is not guaranteed.

STDOUT Success Example
Starting task ue-kafka-monitor. PID: 3996995
STDERR

STDERR provides additional information to the User. The populated content can be changed in future versions of this extension without notice.

Backward compatibility is not guaranteed.

STDERR Success Example
2025-10-28 15:15:11,355 - 140319962146560 AsyEvent[EXTENSION_START] - extension.py[94] INFO: Extension Information: ue-kafka-monitor-1.1.0
2025-10-28 15:15:11,356 - 140319962146560 AsyEvent[EXTENSION_START] - extension.py[100] INFO: System Information: Python Version: 3.11.6 (main, Nov 9 2023, 08:01:02) [GCC 8.5.0 20210514 (Red Hat 8.5.0-4)], system: Linux, release: 4.18.0-425.13.1.el8_7.x86_64, version: #1 SMP Tue Feb 21 19:25:54 UTC 2023, machine type: x86_64
2025-10-28 15:15:11,379 - 140319962146560 AsyEvent[EXTENSION_START] - conn.py[395] INFO: <BrokerConnection client_id=kafka-python-2.1.6, node_id=bootstrap-0 host=kafka-host.org:9092 <connecting> [IPv4 ('123.123.100.100', 9092)]>: connecting to kafka-host.org:9092 [('123.123.100.100', 9092) IPv4]
2025-10-28 15:15:11,391 - 140319962146560 AsyEvent[EXTENSION_START] - conn.py[615] INFO: <BrokerConnection client_id=kafka-python-2.1.6, node_id=bootstrap-0 host=kafka-host.org:9092 <checking_api_versions_recv> [IPv4 ('123.123.100.100', 9092)]>: Broker version identified as 2.6
2025-10-28 15:15:11,391 - 140319962146560 AsyEvent[EXTENSION_START] - conn.py[456] INFO: <BrokerConnection client_id=kafka-python-2.1.6, node_id=bootstrap-0 host=kafka-host.org:9092 <connected> [IPv4 ('123.123.100.100', 9092)]>: Connection complete.
2025-10-28 15:15:11,403 - 140319962146560 AsyEvent[EXTENSION_START] - conn.py[395] INFO: <BrokerConnection client_id=kafka-python-2.1.6, node_id=0 host=kafka-host.org:9092 <connecting> [IPv4 ('123.123.100.100', 9092)]>: connecting to kafka-host.org:9092 [('123.123.100.100', 9092) IPv4]
2025-10-28 15:15:11,413 - 140319962146560 AsyEvent[EXTENSION_START] - conn.py[615] INFO: <BrokerConnection client_id=kafka-python-2.1.6, node_id=0 host=kafka-host.org:9092 <checking_api_versions_recv> [IPv4 ('123.123.100.100', 9092)]>: Broker version identified as 2.6
2025-10-28 15:15:11,413 - 140319962146560 AsyEvent[EXTENSION_START] - conn.py[456] INFO: <BrokerConnection client_id=kafka-python-2.1.6, node_id=0 host=kafka-host.org:9092 <connected> [IPv4 ('123.123.100.100', 9092)]>: Connection complete.
2025-10-28 15:15:11,452 - 140319962146560 AsyEvent[EXTENSION_START] - conn.py[395] INFO: <BrokerConnection client_id=kafka-python-2.1.6, node_id=0 host=kafka-host.org:9092 <connecting> [IPv4 ('123.123.100.100', 9092)]>: connecting to kafka-host.org:9092 [('123.123.100.100', 9092) IPv4]
2025-10-28 15:15:11,553 - 140319962146560 AsyEvent[EXTENSION_START] - conn.py[456] INFO: <BrokerConnection client_id=kafka-python-2.1.6, node_id=0 host=kafka-host.org:9092 <connected> [IPv4 ('123.123.100.100', 9092)]>: Connection complete.
2025-10-28 15:15:11,553 - 140319962146560 AsyEvent[EXTENSION_START] - conn.py[936] INFO: <BrokerConnection client_id=kafka-python-2.1.6, node_id=bootstrap-0 host=kafka-host.org:9092 <connected> [IPv4 ('123.123.100.100', 9092)]>: Closing connection. 
2025-10-28 15:15:11,661 - 140319962146560 AsyEvent[EXTENSION_START] - cluster.py[393] INFO: Group coordinator for robottest1 is BrokerMetadata(nodeId='coordinator-0', host='kafka-host.org', port=9092, rack=None)
2025-10-28 15:15:11,661 - 140319962146560 AsyEvent[EXTENSION_START] - base.py[729] INFO: Discovered coordinator coordinator-0 for group robottest1
2025-10-28 15:15:11,661 - 140319962146560 AsyEvent[EXTENSION_START] - base.py[780] INFO: Starting new heartbeat thread
2025-10-28 15:15:11,662 - 140319962146560 AsyEvent[EXTENSION_START] - consumer.py[361] INFO: Revoking previously assigned partitions set() for group robottest1
2025-10-28 15:15:11,680 - 140319962146560 AsyEvent[EXTENSION_START] - conn.py[395] INFO: <BrokerConnection client_id=kafka-python-2.1.6, node_id=coordinator-0 host=kafka-host.org:9092 <connecting> [IPv4 ('123.123.100.100', 9092)]>: connecting to kafka-host.org:9092 [('123.123.100.100', 9092) IPv4]
2025-10-28 15:15:11,783 - 140319962146560 AsyEvent[EXTENSION_START] - conn.py[456] INFO: <BrokerConnection client_id=kafka-python-2.1.6, node_id=coordinator-0 host=kafka-host.org:9092 <connected> [IPv4 ('123.123.100.100', 9092)]>: Connection complete.
2025-10-28 15:15:11,883 - 140319962146560 AsyEvent[EXTENSION_START] - base.py[489] INFO: (Re-)joining group robottest1
2025-10-28 15:15:11,890 - 140319962146560 AsyEvent[EXTENSION_START] - base.py[552] INFO: Elected group leader -- performing partition assignments using range
2025-10-28 15:15:11,897 - 140319962146560 AsyEvent[EXTENSION_START] - base.py[380] INFO: Successfully joined group robottest1 with generation 1
2025-10-28 15:15:11,897 - 140319962146560 AsyEvent[EXTENSION_START] - subscription_state.py[254] INFO: Updated partition assignment: [TopicPartition(topic='StrKeyFloatValueMatch_lin-uapython-7.8', partition=0)]
2025-10-28 15:15:11,898 - 140319962146560 AsyEvent[EXTENSION_START] - consumer.py[252] INFO: Setting newly assigned partitions {TopicPartition(topic='StrKeyFloatValueMatch_lin-uapython-7.8', partition=0)} for group robottest1
2025-10-28 15:15:17,457 - 140319962146560 AsyEvent[EXTENSION_START] - extension.py[192] INFO: New matching message found!
2025-10-28 15:15:17,457 - 140319962146560 AsyEvent[EXTENSION_START] - base.py[794] INFO: Stopping heartbeat thread
2025-10-28 15:15:20,458 - 140319962146560 AsyEvent[EXTENSION_START] - base.py[823] INFO: Leaving consumer group (robottest1).
2025-10-28 15:15:20,465 - 140319962146560 AsyEvent[EXTENSION_START] - conn.py[936] INFO: <BrokerConnection client_id=kafka-python-2.1.6, node_id=0 host=kafka-host.org:9092 <connected> [IPv4 ('123.123.100.100', 9092)]>: Closing connection. 
2025-10-28 15:15:20,466 - 140319962146560 AsyEvent[EXTENSION_START] - conn.py[936] INFO: <BrokerConnection client_id=kafka-python-2.1.6, node_id=coordinator-0 host=kafka-host.org:9092 <connected> [IPv4 ('123.123.100.100', 9092)]>: Closing connection. 

Configuration Examples

Example: PLAINTEXT Security Protocol Task configuration

Example of Universal Task for "PLAINTEXT" Security Protocol:

Example: SASL_SSL Security Protocol Task Configuration

Example of Universal Task for "SASL_SSL" Security Protocol:

Example: SSL Security Protocol Task Configuration

Example of Universal Task for "SSL" Security Protocol. Specifically, Client authentication over SSL takes place with an encrypted private key:
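The SSL-related task fields correspond roughly to the following kafka-python consumer arguments. This is a sketch under the assumption that the extension passes them through unchanged; the argument names are kafka-python's, while the paths and password below are placeholders, not values from an actual configuration:

```python
# Sketch: SSL client-authentication fields expressed as kafka-python
# consumer arguments. All path/password values are placeholders.
ssl_kwargs = {
    "security_protocol": "SSL",
    "ssl_check_hostname": True,              # SSL Hostname Check
    "ssl_cafile": "/path/to/ca-bundle.pem",  # CA Bundle Path
    "ssl_certfile": "/path/to/client.pem",   # Client Certificate Path
    "ssl_keyfile": "/path/to/client.key",    # Client Private Key Path
    "ssl_password": "key-password",          # Client Private Key Password
}
```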

Example: Matching a Kafka Event with String Value Deserializer

In this example, the Task monitors for events with a value that contains the string "Stonebranch":

In the extension output result, the matched event's details are printed:

Extension Output Result
{
   "exit_code": 0,
   "status_description": "SUCCESS: Successful Task execution",
   "changed": false,
   "invocation": {  ... }
   "result":{
      "client_id":"ue-kafka-monitor-#163579072850187904243P79ZKF6CR00",
      "event":{
         "key":1,
         "value":"Stonebranch Inc.",
         "headers":[
            {
               "key":"test_header",
               "value":"1"
            }
         ],
         "partition":0,
         "offset":34,
         "timestamp":"2023-05-01T18:50:30.031000+00:00"
      }
   }
}

Example: Matching a Kafka Event with JSON Value Deserializer

In this example, the task monitors for events with a value in JSON format that has a list of "phoneNumbers", whose second element has an attribute "type" equal to "home":

In the extension output result, the matched event's details are printed (only the result JSON element is shown below):

Extension Output Result
{
   "exit_code": 0,
   "status_description": "SUCCESS: Successful Task execution",
   "changed": false,
   "invocation": {  ... }
   "result":{
      "client_id":"ue-kafka-monitor-#163578238979483604275PR4VPKK89D1",
      "event":{
         "key":1,
         "value":{
            "firstName":"John",
            "lastName":"doe",
            "age":26,
            "address":{
               "streetAddress":"naist street",
               "city":"Nara",
               "postalCode":"630-0192"
            },
            "phoneNumbers":[
               {
                  "type":"iPhone",
                  "number":"0123-4567-8888"
               },
               {
                  "type":"home",
                  "number":"0123-4567-8910"
               }
            ]
         },
         "headers":[],
         "partition":0,
         "offset":20,
         "timestamp":"2021-11-01T18:31:11.415000+00:00"
      }
   }
}

Example: Matching a Kafka Event with Value Filter None

In this example, the task monitors for events and matches any Kafka event as the selected Value Filter is "None":

For more information about the Extension Output Result, refer to the Extension Output section.

Input Fields

The input fields for this Universal Extension are described below.

Field

Input type

Default value

Type

Description

Action

Required

Monitor for events

Choice

The action performed upon the task execution.

Security Protocol

Required

PLAINTEXT

Choice

The security protocol used to communicate with Kafka brokers.

Valid values are:

  • PLAINTEXT

  • SASL_SSL

  • SSL

Bootstrap Servers

Required

-

Text

'host:port' string (or list of 'host:port' strings, separated by commas) that the consumer should contact to bootstrap initial cluster metadata.

This does not have to be the full node list. It just needs to have at least one broker that will respond to a Metadata API Request (more than one can be used, though, in case a server is down).

Example with two servers: 'host1:port1,host2:port2'.
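A comma-separated value like the one above splits into the list form seen in the Extension Output example; a trivial sketch:

```python
# Split a comma-separated Bootstrap Servers value into a list of
# 'host:port' strings (whitespace around entries is stripped).
raw = "host1:port1,host2:port2"
servers = [server.strip() for server in raw.split(",")]
assert servers == ["host1:port1", "host2:port2"]
```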

SASL Mechanism

Optional

SCRAM-SHA-256

Choice

The Authentication mechanism when Security Protocol is configured for SASL_SSL.

Valid values are:

  • PLAIN: credentials are not hashed.

  • SCRAM-SHA-256: credentials are hashed with the SHA-256 algorithm.

  • SCRAM-SHA-512: credentials are hashed with the SHA-512 algorithm.

Required when Security Protocol is "SASL_SSL".

SASL User Credentials

Optional

-

Credentials

Credentials for SCRAM authentication.

They are comprised of:

  • Kafka host's username as "Runtime username".
  • Kafka host's password as "Runtime password".

Required when Security Protocol is "SASL_SSL".

SSL Hostname Check

Optional

true

Boolean

Flag to configure whether SSL handshake should verify that the certificate matches the broker's hostname.

Required when Security Protocol is "SASL_SSL" or "SSL".

CA Bundle Path

Optional

-

Text

Path and file name of the Certificate Authority (CA) file to use in certificate verification.

Used when it is required to locate the CA file if Security Protocol is configured for "SASL_SSL" or "SSL".

Client Certificate Path

Optional

-

Text

Filepath of the Client's Certificate for Client authentication over SSL in PEM format.

Required when Security Protocol is "SSL".

Client Private Key Path

Optional

-

Text

Filepath of the Client's private key for Client authentication over SSL in PEM format.

The private key can be either unencrypted or encrypted. In the case of an encrypted private key, the respective Client Private Key Password should be provided. 

Required when Security Protocol is "SSL".

Client Private Key Password

Optional

-

Credential

The password required for decryption, in case the client's private key in Client Private Key Path is encrypted. The Credentials definition should be as follows:

  • Key Password as "Runtime Password".

Consumer Type

Required

Consumer Group

Choice

Type of Consumer to get messages from Kafka. Available options:

  • Consumer Group: a consumer is part of a consumer group.

Consumer Group

Required

-

Text

The unique name of the consumer group to join for dynamic partition assignment and to use for fetching and committing offsets.

Topic

Required

-

Dynamic Choice

Dynamically fetched list of topics to subscribe the consumer to.

The user can select the required topic from a drop-down list.

Client Id

Optional

-

Text

This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client.

The constructed client id seen on the server side is Client Id + "task instance id".

If Client Id is not populated, the default value used is "ue-kafka-monitor-#". Example: "ue-kafka-monitor-#1635348654595881042JAYE7BKNPYVG3"

Start from

Required

Consumer Group Offset

Choice

Controls from which point the consumption will start.

Available option is:

  • Consumer Group Offset: start consuming from the committed offset of the group.

Key Deserializer

Required

String

Choice

Type of key deserialization.

Available options are:

  • Integer

  • String

Value Deserializer

Required

String

Choice

Type of value deserialization.

Available options are:

  • Integer

  • Float

  • String

  • JSON

Value Filter

Optional

None

Choice

Value operators to specify the criteria used to match records and stop consuming messages.

If Value Deserializer is set to "Integer" or "Float", the available options are:

  • None (all messages are matched)

  • >= (greater than or equal to)

  • <= (less than or equal to)

  • = (equal to)

  • != (not equal to)

If Value Deserializer is set to "String", the available options are:

  • None (all messages are matched)

  • Contains

  • Does Not Contain

  • Equals

  • Is Blank

  • Is Not Blank

If Value Deserializer is set to "JSON", the available options are all that apply to the "Integer", "Float", and "String" Value Deserializer.
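The matching semantics described above can be sketched in plain Python. This is illustrative only; the extension's actual matching code is not published here, and the function name is hypothetical:

```python
# Hypothetical sketch of the Value Filter matching semantics.
def matches(value, value_filter, target=None):
    if value_filter == "None":
        return True                      # every message matches
    if value_filter == ">=":
        return value >= target
    if value_filter == "<=":
        return value <= target
    if value_filter == "=" or value_filter == "Equals":
        return value == target
    if value_filter == "!=":
        return value != target
    if value_filter == "Contains":
        return target in value
    if value_filter == "Does Not Contain":
        return target not in value
    if value_filter == "Is Blank":
        return value.strip() == ""
    if value_filter == "Is Not Blank":
        return value.strip() != ""
    raise ValueError(f"unknown filter: {value_filter}")

# e.g. the Extension Output success example used Value Filter "<=" with
# Value 3.14, and the String example matched on "Contains":
assert matches(3.14, "<=", 3.14)
assert matches("Stonebranch Inc.", "Contains", "Stonebranch")
```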

Value

Optional

-

Text

The value to which the Value Filter is applied.

Value JSON Path

Optional

-

Text

The JSON path to locate the Value, if Value Deserializer is set to "JSON".

The JSON path needs to resolve either to a number or to a string. If the JSON path results in a list of numbers or a list of strings, a Kafka message is matched if at least one element from the list matches the Value.

JSON Path syntax is based on the jsonpath-ng Python library. For examples, please refer to the official website.
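The "list of results" rule above can be illustrated with the JSON example used earlier in this document. Path resolution itself is done by jsonpath-ng inside the extension; here the resolved values are produced directly, for illustration only:

```python
# Illustration of the list-matching rule: when a JSON path resolves to
# a list, a message matches if at least one element matches the Value.
event_value = {
    "phoneNumbers": [
        {"type": "iPhone", "number": "0123-4567-8888"},
        {"type": "home", "number": "0123-4567-8910"},
    ]
}

# A path such as phoneNumbers[*].type would resolve to this list:
resolved = [p["type"] for p in event_value["phoneNumbers"]]
assert resolved == ["iPhone", "home"]

# A filter of Equals "home" matches, because one element equals "home":
assert any(v == "home" for v in resolved)
```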

Show Advanced Settings

Required

False

Boolean

Checking this field makes additional fields available for advanced configuration.

The advanced fields are: Partition Assignment Strategy, Session Timeout (ms), Auto Offset Reset, Request Timeout (ms), Heartbeat Interval (ms), Max Partition Fetch Bytes.

Partition Assignment Strategy

Optional

Range

Choice

Partition Assignment policies to distribute partition ownership amongst consumer instances when group management is used.

Available options are:

  • Range

  • Round Robin

Session Timeout (ms)

Optional

10000

Integer

Controls the time it takes to detect that a consumer has crashed and stopped sending heartbeats.

If more than Session Timeout milliseconds pass without the consumer sending a heartbeat to the group coordinator, it is considered dead, and the group coordinator will trigger a rebalance of the consumer group to allocate partitions from the dead consumer to the other consumers in the group.

Auto Offset Reset

Optional

Latest

Choice

Controls the behavior of the consumer when it starts reading a partition for which it does not have a committed offset or the committed offset is invalid (usually because the consumer was down for so long that the record with that offset was already aged out of the broker).

Available options are:

  • Earliest - Moves to the oldest available message.

  • Latest - Moves to the most recent message.

Request Timeout (ms)

Optional

305000

Integer

Controls the maximum amount of time the client will wait for the response of a request.

Heartbeat Interval (ms)

Optional

3000

Integer

The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka’s group management facilities.

The value must be set lower than Session Timeout (ms).
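This constraint can be expressed as a simple check. Kafka's own guidance is to keep the heartbeat interval at no more than one third of the session timeout; the function below is an illustrative validation, not the extension's code:

```python
# Illustrative check: Heartbeat Interval (ms) must be strictly lower
# than Session Timeout (ms).
def validate_timeouts(heartbeat_interval_ms, session_timeout_ms):
    if heartbeat_interval_ms >= session_timeout_ms:
        raise ValueError("heartbeat interval must be < session timeout")

validate_timeouts(3000, 10000)  # the documented defaults are valid
```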

Max Partition Fetch Bytes

Optional

1048576

Integer

Controls the maximum number of bytes the server will return per partition.

This size must be at least as large as the maximum message size the server allows, or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition.

The default is 1MB.

Environment Variables

Environment Variables can be set from the Environment Variables table in the task definition. The following environment variables can affect the behavior of the extension.


UE_KAFKA_POLL_TIMEOUT_MS

Milliseconds spent waiting in poll if data is not available in the buffer.

Must be an integer greater than 0.

Default value is 1000.

Introduced in 1.2.0
UE_KAFKA_MAX_READ_BUFFER_LENGTH

The maximum number of records to fill the buffer with from a single poll.

Must be an integer greater than 0.

Default value is 10.

Introduced in 1.2.0
UE_KAFKA_API_VERSION_AUTO_TIMEOUT_MS

Milliseconds the Kafka client will wait to automatically detect the broker API version before raising an error.

Default value is 2000.

Introduced in 1.2.0
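These variables resolve to integers with the documented defaults when unset. A sketch of that behavior (illustrative; the extension's actual parsing code is not published here):

```python
import os

# Read an integer environment variable, falling back to its documented
# default, and enforce the "greater than 0" rule.
def env_int(name, default):
    value = int(os.environ.get(name, default))
    if value <= 0:
        raise ValueError(f"{name} must be an integer greater than 0")
    return value

poll_timeout_ms = env_int("UE_KAFKA_POLL_TIMEOUT_MS", 1000)
buffer_length = env_int("UE_KAFKA_MAX_READ_BUFFER_LENGTH", 10)
```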

Exit Codes

The exit codes for this Universal Extension are described below.

Exit Code

Status Classification Code

Status Classification Description

Status Description

0

SUCCESS

Successful Execution

SUCCESS: Successful Task execution

1

FAIL

Failed Execution

FAIL: < Error Description >

10

CONNECTION_ERROR

Bad connection data

CONNECTION_ERROR: < Error Description >

11

CONNECTION_TIMEOUT

Connection timed out

CONNECTION_TIMEOUT: < Error Description >

20

DATA_VALIDATION_ERROR

Bad input fields validation

DATA_VALIDATION_ERROR: Some of the input fields cannot be validated. See STDERR for more details

How To

Import Universal Template

To use the Universal Template, you must first perform the following steps.

  1. This Universal Task requires the Resolvable Credentials feature. Check that the Resolvable Credentials Permitted system property has been set to true.

  2. Import the Universal Template into your Controller:

    1. Extract the ZIP file that you downloaded from the Integration Hub.

    2. In the Controller UI, select the Services > Import Integration Template option.

    3. Browse to the “export” folder under the extracted files, select the template ZIP file (its name will be unv_tmplt_*.zip), and click Import.

    4. When the file is imported successfully, refresh the Universal Templates list; the Universal Template will appear in the list.

Modifications of this integration, applied by users or customers, before or after import, might affect the supportability of this integration. For more information refer to Integration Modifications.

Configure Universal Task

For a new Universal Task, create a new task, and enter the required input fields.

Integration Modifications

Modifications applied by users or customers, before or after import, might affect the supportability of this integration. The following modifications are discouraged to retain the support level as applied for this integration.

  • Python code modifications should not be done.
  • Template Modifications
    • General Section
      • "Name", "Extension", "Variable Prefix", "Icon" should not be changed.
    • Universal Template Details Section
      • "Template Type", "Agent Type", "Send Extension Variables", "Always Cancel on Force Finish" should not be changed.
    • Result Processing Defaults Section
      • Success and Failure Exit codes should not be changed.
      • Success and Failure Output processing should not be changed.
    • Fields Restriction Section
      The setup of the template does not impose any restrictions. However, with respect to the "Exit Code Processing Fields" section:
      1. Success/Failure exit codes need to be respected.
      2. In principle, as STDERR and STDOUT outputs can change in follow-up releases of this integration, they should not be considered as a reliable source for determining success or failure of a task.

Users and customers are encouraged to report defects or feature requests at the Stonebranch Support Desk.

Document References

This document references the following documents:

Name

Description

Universal Templates

User documentation for creating Universal Templates in the Universal Controller user interface.

Universal Tasks

User documentation for creating Universal Tasks in the Universal Controller user interface.

Task Instance Output By JsonPath

How Output Functions can be used to extract information from extension output.

JSON Path Syntax

JSON Path predicate notation.

Changelog 

ue-kafka-monitor-1.2.0 (2025-11-13) 

Enhancements

  •  Upgraded kafka-python to version 2.1.6, bringing major enhancements in stability, performance, and compatibility, along with numerous bug fixes (#50522)
  •  Improved log messages (#50656)
  •  Added support for PLAIN mechanism under SASL authentication (#48743)
  •  Added validation step after initializing a consumer, for improved stability and robustness (#50547)
  •  Added environment variables for configuring Kafka connection and polling parameters (#50621, #50889)
  •  Optimized the message retrieval logic for better efficiency (#50621)
  •  Optimized the pipeline stages flow for better efficiency  (#50621)

Fixes

  • Improved consumer shutdown handling to ensure tasks terminate gracefully without leaving open connections. (#50577)
  • Fixed bug where the last message was not committed when consuming multiple messages. (#50622)

ue-kafka-monitor-1.1.0 (2023-05-10) 

Enhancements

  • Added: Support for Client authentication over SSL (#32826)

Fixes

  • Fixed: Improve robustness of application (#32947) 

ue-kafka-monitor-1.0.2 (2022-03-30) 

Enhancements

  • Fixed: Change of Template Icon

ue-kafka-monitor-1.0.1 (2021-12-10) 

Enhancements

  • Added: Enable Always Cancel On Force Finish in Universal Template (#26607)

Fixes

  • Fixed: Minor bugfixes (#26555, #26574)

ue-kafka-monitor-1.0.0 (2021-11-02) 

  • Added: Initial Release (#25615)