The KafkaConsumer, KafkaRead, and KafkaProducer nodes support local environment message tree variables, which you can use to dynamically alter the node properties.
The following table shows the elements in the LocalEnvironment.Destination.Kafka.Output message tree, which can be used to override the Topic name property in the KafkaProducer node and to supply a key for the message. The table includes an example of how to set the values by using ESQL; however, you can also set them by using transformation nodes, such as the Mapping node. To access the LocalEnvironment from a Mapping node, see Customizing a message map to include a message assembly component.
Table 1. Input local environment properties
Element name | Type | Description |
topicName | string | The name of the topic where the message will be published. This environment variable overrides the Topic name property on the node. For example: SET OutputLocalEnvironment.Destination.Kafka.Output.topicName = 'customers'; |
key | string | A string value to associate with the message. When the message is being published to a topic with multiple partitions, a hash of the key value is used to select the partition on which the message is stored. If no key is provided, messages are distributed across the topic partitions by Kafka. All messages published using the same key value are sent to the same partition. |
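As a minimal sketch, the two elements in Table 1 could be set together in a Compute node placed before the KafkaProducer node. The module name SetKafkaDestination and the values 'customers' and 'customer-42' are illustrative assumptions, and the sketch assumes that the Compute mode property of the Compute node includes the LocalEnvironment so that the changes are propagated.

```esql
-- Hypothetical module name; place this Compute node before the KafkaProducer node.
CREATE COMPUTE MODULE SetKafkaDestination
  CREATE FUNCTION Main() RETURNS BOOLEAN
  BEGIN
    -- Carry the incoming message and local environment forward unchanged
    SET OutputRoot = InputRoot;
    SET OutputLocalEnvironment = InputLocalEnvironment;

    -- Override the Topic name property of the KafkaProducer node
    SET OutputLocalEnvironment.Destination.Kafka.Output.topicName = 'customers';

    -- Illustrative key: messages that share this key go to the same partition
    SET OutputLocalEnvironment.Destination.Kafka.Output.key = 'customer-42';

    RETURN TRUE;
  END;
END MODULE;
```

If the Compute mode is left at a setting that excludes the LocalEnvironment, the overrides are not passed to the next node and the KafkaProducer node uses its configured Topic name.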
The following table shows the data that is written by the
KafkaProducer node to the LocalEnvironment when propagating an
output message:
Table 2. Output local environment properties generated by the KafkaProducer node
Element | Type | Description |
LocalEnvironment.WrittenDestination.Kafka.partition | string | The partition (in the topic) that contains the message. |
LocalEnvironment.WrittenDestination.Kafka.topicName | string | The topic where the message was published. |
LocalEnvironment.WrittenDestination.Kafka.offset | integer | The offset number in the partition, for the message that was published. |
LocalEnvironment.WrittenDestination.Kafka.checksum | integer | The checksum of the message. |
LocalEnvironment.WrittenDestination.Kafka.key | string | The key value that was provided in the input local environment. |
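The values in Table 2 can be read by a node that is wired after the KafkaProducer node. The following sketch, which is an illustration rather than a prescribed pattern, copies the topic, partition, and offset into the Environment tree for later use, for example in an audit record. The module name RecordKafkaWrite and the Environment.Variables.LastKafkaWrite folder are assumptions made for this example.

```esql
-- Hypothetical module name; wire this Compute node after the KafkaProducer node.
CREATE COMPUTE MODULE RecordKafkaWrite
  CREATE FUNCTION Main() RETURNS BOOLEAN
  BEGIN
    SET OutputRoot = InputRoot;

    -- Keep a record of where the message was written (assumed folder name)
    SET Environment.Variables.LastKafkaWrite.topicName =
        InputLocalEnvironment.WrittenDestination.Kafka.topicName;
    SET Environment.Variables.LastKafkaWrite.partition =
        InputLocalEnvironment.WrittenDestination.Kafka.partition;
    SET Environment.Variables.LastKafkaWrite.offset =
        InputLocalEnvironment.WrittenDestination.Kafka.offset;

    RETURN TRUE;
  END;
END MODULE;
```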
After a KafkaConsumer node has read a message, it writes the properties that are listed in Table 3 to the LocalEnvironment. The KafkaRead node can take these values as input to re-read the message, for example when the KafkaRead node is used during catch processing in a failure scenario.
The following table shows the data that is written by the
KafkaConsumer node to the LocalEnvironment when propagating an
output message:
Table 3. Output local environment properties generated by the KafkaConsumer node
Element | Type | Description |
LocalEnvironment.Kafka.Input.partition | string | The partition (in the topic) that contains the message. |
LocalEnvironment.Kafka.Input.topicName | string | The topic where the message was published. |
LocalEnvironment.Kafka.Input.offset | integer | The offset number in the partition, for the message that was received. |
LocalEnvironment.Kafka.Input.checksum | integer | The checksum of the message. |
LocalEnvironment.Kafka.Input.key | string | The key (if any) that was associated with the received message. This field exists only if a key was associated with the message when it was published. The KafkaConsumer node receives the key value as a string. |
The following table shows elements that can be used to override
the properties in the
KafkaRead node:
Table 4. Input local environment properties that can be used by the KafkaRead node
Element name | Type | Description |
LocalEnvironment.Destination.Kafka.Read.partitionNumber | string | The partition (in the topic) that contains the message to be read. |
LocalEnvironment.Destination.Kafka.Read.topicName | string | The topic where the message was published. |
LocalEnvironment.Destination.Kafka.Read.offset | integer | The offset number in the partition of the message to be read. |
LocalEnvironment.Destination.Kafka.Read.timeoutInterval | integer | The timeout used for reading the message from the Kafka topic. |
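The re-read scenario can be sketched as a Compute node that copies the values written by the KafkaConsumer node (Table 3) into the override elements for the KafkaRead node (Table 4). This is a sketch under stated assumptions: the module name PrepareKafkaReRead is hypothetical, the Compute mode of the node is assumed to include the LocalEnvironment, and the value 5000 for timeoutInterval is a placeholder because the unit of the timeout is not stated here.

```esql
-- Hypothetical module name; place this Compute node before the KafkaRead node.
CREATE COMPUTE MODULE PrepareKafkaReRead
  CREATE FUNCTION Main() RETURNS BOOLEAN
  BEGIN
    SET OutputRoot = InputRoot;
    SET OutputLocalEnvironment = InputLocalEnvironment;

    -- Point the KafkaRead node at the message that the KafkaConsumer node received
    SET OutputLocalEnvironment.Destination.Kafka.Read.topicName =
        InputLocalEnvironment.Kafka.Input.topicName;
    SET OutputLocalEnvironment.Destination.Kafka.Read.partitionNumber =
        InputLocalEnvironment.Kafka.Input.partition;
    SET OutputLocalEnvironment.Destination.Kafka.Read.offset =
        InputLocalEnvironment.Kafka.Input.offset;

    -- Placeholder value; check the KafkaRead node documentation for the unit
    SET OutputLocalEnvironment.Destination.Kafka.Read.timeoutInterval = 5000;

    RETURN TRUE;
  END;
END MODULE;
```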
The following table shows the data that is written by the
KafkaRead node to the LocalEnvironment after reading a
message:
Table 5. Data written by the KafkaRead node to the LocalEnvironment
Element name | Type | Description |
LocalEnvironment.Kafka.Read.partitionNumber | string | The partition (in the topic) that contains the message. |
LocalEnvironment.Kafka.Read.topicName | string | The topic where the message was published. |
LocalEnvironment.Kafka.Read.offset | integer | The offset number in the partition for the message that was received. |
LocalEnvironment.Kafka.Read.checksum | integer | The checksum of the message. |
LocalEnvironment.Kafka.Read.key | string | The key (if any) that was associated with the received message. This field exists only if a key was associated with the message when it was published. The KafkaRead node receives the key value as a string. |
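Because the key element exists only when the message was published with a key, downstream logic might test for it before using it. The following sketch assumes a Compute node wired after the KafkaRead node; the module name CheckKafkaReadKey and the Environment.Variables.KafkaRead folder are illustrative assumptions.

```esql
-- Hypothetical module name; wire this Compute node after the KafkaRead node.
CREATE COMPUTE MODULE CheckKafkaReadKey
  CREATE FUNCTION Main() RETURNS BOOLEAN
  BEGIN
    SET OutputRoot = InputRoot;

    -- A reference to a field that does not exist evaluates to NULL in ESQL
    IF InputLocalEnvironment.Kafka.Read.key IS NOT NULL THEN
      SET Environment.Variables.KafkaRead.key = InputLocalEnvironment.Kafka.Read.key;
      SET Environment.Variables.KafkaRead.hasKey = TRUE;
    ELSE
      SET Environment.Variables.KafkaRead.hasKey = FALSE;
    END IF;

    RETURN TRUE;
  END;
END MODULE;
```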
You can use Kafka custom header properties to add metadata to Kafka messages for use during message processing. These properties are set in the LocalEnvironment, in a folder called KafkaHeader. For more information, see Setting and retrieving Kafka custom header properties.