KafkaConsumer node
Use the KafkaConsumer node to connect to the Kafka messaging system and to receive messages that are published on a Kafka topic. IBM® App Connect Enterprise can then propagate these messages in a message flow.
This topic contains the following sections:
- Purpose
- Using the KafkaConsumer node in a message flow
- Terminals and properties
- Local environment overrides
- Kafka custom header properties
Purpose
You can use a KafkaConsumer node in a message flow to subscribe to a specified topic on a Kafka server. The KafkaConsumer then receives messages published on the Kafka topic as input to the message flow. Each KafkaConsumer node consumes messages from a single topic; however, if the topic is defined to have multiple partitions, the KafkaConsumer node can receive messages from any of the partitions. For more information, see Processing Kafka messages.
For information about the supported versions of Kafka, see IBM App Connect Enterprise system requirements. For more information about Kafka version compatibility, see the Apache Kafka documentation.
The KafkaConsumer node handles messages in the following message domains:
- DFDL
- XMLNSC
- JSON
- BLOB
- MIME
- XMLNS
- MRM
The KafkaConsumer node is contained in the Kafka drawer of the palette in the IBM App Connect Enterprise Toolkit.
Using the KafkaConsumer node in a message flow
Use the KafkaConsumer node in a message flow to receive messages that are published to topics that are hosted on a Kafka server. The received messages can then be processed in a message flow. For information about how to use the node, see Consuming messages from Kafka topics.
For information about reading individual messages from a Kafka topic, see KafkaRead node.
Terminals and properties
The KafkaConsumer node terminals are described in the following table.
Terminal | Description |
---|---|
Failure | The output terminal to which the message is routed if a failure is detected during processing in the node. |
Out | The output terminal to which the message is routed if it represents successful completion of the Kafka request, and if further processing is required within this message flow. |
Catch | The output terminal to which the message is routed if an exception is thrown downstream and caught by this node. Exceptions are caught only if this terminal is attached. |
The following tables describe the node properties. The column headed M indicates whether the property is mandatory (marked with an asterisk on the panel if you must enter a value when no default is defined); the column headed C indicates whether the property is configurable (you can change the value when you add the message flow to the BAR file to deploy it).
The KafkaConsumer node Description properties are described in the following table.
Property | M | C | Default | Description |
---|---|---|---|---|
Node name | No | No | The node type, KafkaConsumer | The name of the node. |
Short description | No | No | | A brief description of the node. |
Long description | No | No | | Text that describes the purpose of the node in the message flow. |
The KafkaConsumer node Basic properties are described in the following table.
Property | M | C | Default | Description | mqsiapplybaroverride command property |
---|---|---|---|---|---|
Topic name | Yes | Yes | | The name of the Kafka topic to which you want to subscribe. Only one topic name can be specified. The name can be up to 255 characters in length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), and - (dash). | topicName |
Bootstrap servers | Yes | Yes | | A list of host/port pairs, separated by commas, to use for establishing the initial connection to the Kafka cluster. | bootstrapServers |
Consumer group ID | Yes | Yes | | A string that identifies the consumer group to which this consumer belongs. This value can be up to 255 characters in length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), and - (dash). | groupId |
Default message offset | Yes | Yes | Latest | The message offset for the consumer if no message offset exists, such as the first time the consumer starts, or if the message offset is not valid. Possible values are Earliest and Latest. | initialOffset |
Commit message offset in Kafka | No | Yes | Yes | Automatically saves the current message offset in Kafka, which allows messages to be consumed from the saved position when the consumer is restarted. | enableAutoCommit |
Wait for message offset commit to complete | No | Yes | Yes | If this property is selected, the KafkaConsumer node waits for the message offset to be committed to Kafka before delivering the message to the message flow, which provides an at-most-once level of message reliability. If this option is not selected, the KafkaConsumer node does not wait for the response from the commit operation to be received before delivering the message to the flow. As a result, message throughput is increased in exchange for a reduction in message reliability, as messages can be redelivered to the message flow if the request to commit the consumer offset subsequently fails. | useSyncCommit |
Client ID | No | Yes | | The client name to be used when connecting to Kafka. This value can be up to 255 characters in length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), and - (dash). | clientId |
Add server name suffix to client ID | No | Yes | Yes | If selected, this property suffixes the provided client ID with either -integrationServerName (for an independent integration server) or -integrationNodeName-integrationServerName (for an integration server that is managed by an integration node). This option is selected by default. | useClientIdSuffix |
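The naming rules and client ID suffixing described in the table above can be sketched in a few lines of Python. This is an illustrative sketch only, not part of the product; the function names are hypothetical.

```python
import re
from typing import Optional

# Rule stated above for topic names, consumer group IDs, and client IDs:
# up to 255 characters from a-z, A-Z, 0-9, . (dot), _ (underscore), - (dash).
_KAFKA_NAME = re.compile(r"[A-Za-z0-9._-]{1,255}")


def is_valid_kafka_name(name: str) -> bool:
    """Check a topic name, group ID, or client ID against the stated rules."""
    return _KAFKA_NAME.fullmatch(name) is not None


def effective_client_id(client_id: str,
                        server_name: str,
                        node_name: Optional[str] = None) -> str:
    """Client ID after 'Add server name suffix to client ID' is applied.

    An independent integration server appends -integrationServerName; a
    server managed by an integration node appends
    -integrationNodeName-integrationServerName.
    """
    if node_name is None:
        return f"{client_id}-{server_name}"
    return f"{client_id}-{node_name}-{server_name}"
```

For example, a client ID of `myClient` on an integration server `server1` managed by node `node1` becomes `myClient-node1-server1`.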
The KafkaConsumer node Advanced properties are described in the following table.
Property | M | C | Default | Description | mqsiapplybaroverride command property |
---|---|---|---|---|---|
Connection timeout (sec) | Yes | Yes | 15 | The maximum time (in seconds) that the KafkaConsumer node waits to establish a connection with the Kafka server. The Connection timeout value must be greater than the Session timeout value. | connectionTimeout |
Session timeout (sec) | Yes | Yes | 10 | The timeout used to detect KafkaConsumer node failures when using Kafka's group management facility. The KafkaConsumer node sends periodic heartbeats to indicate its liveness to the Kafka server. If no heartbeats are received by the Kafka server before the expiration of this session timeout, the Kafka server removes this Kafka consumer from the group and initiates a rebalance. The minimum valid value for this property is 10 seconds, which ensures that the session timeout is greater than the length of time between heartbeats. The Session timeout value must be less than the Connection timeout value. | sessionTimeout |
Receive batch size | Yes | Yes | 1 | The number of records that are received from the Kafka server in a single batch. This property is used for the max.poll.records value specified by the KafkaConsumer node when receiving messages from the Kafka server. | receiveBatchSize |
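The timeout constraints described above (a 10-second minimum session timeout, and a connection timeout strictly greater than the session timeout) can be captured in a small validation sketch. This is illustrative only; the function name is hypothetical.

```python
def validate_consumer_timeouts(connection_timeout_sec: int,
                               session_timeout_sec: int) -> None:
    """Enforce the timeout constraints described in the table above.

    The session timeout has a minimum of 10 seconds, and the connection
    timeout must be greater than the session timeout.
    """
    if session_timeout_sec < 10:
        raise ValueError("Session timeout must be at least 10 seconds")
    if connection_timeout_sec <= session_timeout_sec:
        raise ValueError(
            "Connection timeout must be greater than the session timeout")
```

The node defaults (connection timeout 15, session timeout 10) satisfy both constraints.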
The remaining KafkaConsumer node Advanced properties are described in the following table.
Property | M | C | Default | Description | mqsiapplybaroverride command property |
---|---|---|---|---|---|
Properties file | No | Yes | null | Full path name of a kafka.properties file to set Kafka client properties. You can use a properties file to set properties on the Kafka consumer that cannot be set through the KafkaConsumer node properties. If the properties file specifies a property that is also set on the KafkaConsumer node or by a Kafka policy, the value set in the properties file takes precedence. For information about the complete list of properties that are available, see the Apache Kafka documentation. | propertiesFilePath |
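For illustration, a kafka.properties file might set standard Apache Kafka consumer properties that have no corresponding node property. The property names below are standard Kafka consumer configuration keys; the values are examples only, not recommendations.

```properties
# Standard Apache Kafka consumer properties (values are illustrative)
max.poll.interval.ms=300000
fetch.min.bytes=1
fetch.max.wait.ms=500
```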
The KafkaConsumer node Security properties are described in the following table.
Property | M | C | Default | Description | mqsiapplybaroverride command property |
---|---|---|---|---|---|
Security identifier | No | Yes | null | Identifier that is used to look up the user ID and password from the App Connect Enterprise vault, which was defined by using the mqsicredentials command. If this property is left blank, the default security identity is used. | securityIdentity |
Security protocol | Yes | Yes | PLAINTEXT | The protocol to be used when communicating with the Kafka server. Valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. If you are using IBM Event Streams, this property must be set to SASL_SSL. If either SASL_PLAINTEXT or SASL_SSL is selected, you must configure the user ID and password that will be used to authenticate with the Kafka server. You can configure these credentials by using the mqsicredentials command. Alternatively, you can use the mqsisetdbparms command. For more information, see Configuring security credentials for connecting to Kafka. | securityProtocol |
SSL protocol | Yes | Yes | TLSv1_2 | The SSL protocol that is used when the selected value for the Security protocol property is either SSL or SASL_SSL. You can select a value from the editable list, or you can specify an alternative value. | sslProtocol |
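The relationship between the Security protocol value and the configuration it requires, as described above, can be summarized in a short sketch. This is illustrative only; the function and key names are hypothetical, not part of the product.

```python
# The four Security protocol values described above.
VALID_PROTOCOLS = {"PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"}


def connection_requirements(security_protocol: str) -> dict:
    """Summarize what each Security protocol value requires.

    SSL and SASL_SSL use the SSL protocol setting; SASL_PLAINTEXT and
    SASL_SSL require a user ID and password to be configured.
    """
    if security_protocol not in VALID_PROTOCOLS:
        raise ValueError(f"Unknown security protocol: {security_protocol}")
    return {
        "uses_ssl_protocol": security_protocol in {"SSL", "SASL_SSL"},
        "needs_credentials": security_protocol in {"SASL_PLAINTEXT", "SASL_SSL"},
    }
```

For example, the SASL_SSL setting required by IBM Event Streams needs both TLS configuration and credentials.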
The KafkaConsumer node Policy properties are described in the following table.
Property | M | C | Default | Description | mqsiapplybaroverride command property |
---|---|---|---|---|---|
Policy | No | Yes | null | Name of a Kafka policy to be used to override node properties; for example, {PolicyProject}:kafkaPolicy | policyUrl |
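The mqsiapplybaroverride command property column in the tables above gives the name to use when overriding a node property in a BAR file. For example, the topic name could be overridden as follows; the BAR file, application, and flow names here are placeholders, and the node label assumes the default node name KafkaConsumer.

```shell
mqsiapplybaroverride -b kafkaApp.bar -k MyApplication \
  -m MyFlow#KafkaConsumer.topicName=orders.v1
```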
The KafkaConsumer node Input Message Parsing properties are described in the following table.
Property | M | C | Default | Description | mqsiapplybaroverride command property |
---|---|---|---|---|---|
Message domain | No | No | BLOB | The parser domain that is used to parse the input message. If the field is blank, the default is BLOB. | |
Message model | No | No | Cleared | The name or location of the message model schema file in which the message is defined. When you click Browse, you see a list of available message model schema files for the selected Message domain. | |
Message | No | No | Cleared | The name or location of the message root within your message model schema file. This list is populated with all available messages that are defined in the Message model that you selected. | |
Physical format | No | No | Cleared | The name of the physical format of the message. If you are using the MRM or IDOC parser, select the physical format of the incoming message from the list. This list includes all the physical formats that you have defined for the selected message model. If you set the Message domain property to DataObject, you can set this property to XML or SAP ALE IDoc. Set this property to SAP ALE IDoc when you have to parse a bit stream from an external source and generate a message tree. | |
The KafkaConsumer node Parser Options properties are described in the following table.
Property | M | C | Default | Description |
---|---|---|---|---|
Parse timing | No | No | On Demand | This property controls when an input message is parsed. Valid values are On Demand, Immediate, and Complete. For a full description of this property, see Parsing on demand. |
Build tree using XML schema data types | No | No | Cleared | This property controls whether the syntax elements in the message tree have data types taken from the XML schema. |
Use XMLNSC compact parser for XMLNS domain | No | No | Cleared | This property specifies whether the XMLNSC Compact Parser is used for messages in the XMLNS domain. If you set this property, the message data is displayed under XMLNSC in nodes that are connected to the output terminal when the input MQRFH2 header is XMLNS. |
Retain mixed content | No | No | Cleared | This property controls whether the XMLNSC parser creates elements in the message tree for mixed text in an input message. If you select the check box, elements are created for mixed text. If you clear the check box, mixed text is ignored and no elements are created. |
Retain comments | No | No | Cleared | This property specifies whether the XMLNSC parser creates elements in the message tree for comments in an input message. If you select the check box, elements are created for comments. If you clear the check box, comments are ignored and no elements are created. |
Retain processing instructions | No | No | Cleared | This property controls whether the XMLNSC parser creates elements in the message tree for processing instructions in an input message. If you select the check box, elements are created for processing instructions. If you clear the check box, processing instructions are ignored and no elements are created. |
Opaque elements | No | No | Blank | This property is used to specify a list of elements in the input message that are to be opaquely parsed by the XMLNSC parser. Opaque parsing is performed only if validation is not enabled (that is, if Validate is None); entries that are specified in Opaque Elements are ignored if validation is enabled. |
The KafkaConsumer node Validation properties are described in the following table.
For a full description of these properties, see Validation properties.
Property | M | C | Default | Description | mqsiapplybaroverride command property |
---|---|---|---|---|---|
Validate | No | Yes | None | This property controls whether validation takes place. Valid values are None, Content and Value, and Content. | validateMaster |
Validation failure action | No | Yes | Exception | This property controls what happens if validation fails. You can set this property only if you set Validate to Content or Content and Value. Valid values are User Trace, Local Error Log, Exception, and Exception List. | validateFailureAction |
The KafkaConsumer node Instances properties are described in the following table.
Property | M | C | Default | Description | mqsiapplybaroverride command property |
---|---|---|---|---|---|
Additional instances pool | No | Yes | Use Pool Associated with Message Flow | The pool from which additional instances are obtained. | componentLevel |
Additional instances | No | Yes | 0 | The number of additional instances that the node can start if the Additional instances pool property is set to Use Pool Associated with Node. By default, no additional instances are given to the node. | additionalInstances |
The KafkaConsumer node Monitoring properties are described in the following table.
Property | M | C | Default | Description |
---|---|---|---|---|
Events | No | No | None | Events that you have defined for the node are displayed on this tab. By default, no monitoring events are defined on any node in a message flow. Use Add, Edit, and Delete to create, change, or delete monitoring events for the node. You can enable and disable events that are shown here by selecting or clearing the Enabled check box. |
Local environment overrides
You can view values for Kafka topics that have been received by the KafkaConsumer node. For more information, see Using local environment variables with Kafka nodes.
Kafka custom header properties
You can use Kafka custom header properties to add metadata to the Kafka message for use during message processing. For more information, see Setting and retrieving Kafka custom header properties.