Configuring the Kafka connector as a source

To configure a Kafka Connector stage to read messages from Kafka topics, you must specify the Kafka server host name and the topic or topics from which to read messages.

Procedure

  1. From the job design canvas, double-click the Kafka Connector stage.
  2. The Stage properties open by default. Enter the connection properties, including the Kafka server host name.
  3. Click the Properties tab and, in the Usage section, specify the settings for the read operation.
  4. Enter values for the following required properties:
    • Topic Name - The topic or topics from which messages are read for further processing.
    • Consumer Group - A string that uniquely identifies the group of consumer processes to which this consumer belongs. Processes that set the same group ID are all part of the same consumer group.
    • Max Poll Records - The maximum number of records returned in a single call to poll().
    • Max Messages - The maximum number of messages that each player process consumes from (or, when writing, produces to) the topic. Set this value to a multiple of Max Poll Records.
    • Reset Policy - Determines what to do when there is no initial offset in Kafka or when the current offset no longer exists on the server (for example, because the data has been deleted). Set it to earliest to reset the offset automatically to the earliest offset, or to latest to reset it automatically to the latest offset.
  5. Open the Columns page and define a column of type VarChar with a length of 255, or a length large enough for the longest messages that you expect in the topics you are consuming from.
  6. Click OK, and then save the job.
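
The required properties correspond closely to standard Apache Kafka consumer settings. The following Python sketch assumes that correspondence (bootstrap.servers, group.id, max.poll.records, and auto.offset.reset); the helper name build_consumer_config is hypothetical and is not part of the connector. Max Messages has no standard consumer key, so the sketch only validates it, and Topic Name is left out because a Kafka consumer receives topics through its subscription rather than through its configuration.

```python
# Hypothetical helper (not part of the Kafka Connector): maps the stage's read
# settings onto standard Apache Kafka consumer configuration keys.
VALID_RESET_POLICIES = ("earliest", "latest")


def build_consumer_config(host: str, consumer_group: str, max_poll_records: int,
                          max_messages: int, reset_policy: str) -> dict:
    if reset_policy not in VALID_RESET_POLICIES:
        raise ValueError("reset_policy must be 'earliest' or 'latest'")
    if max_poll_records < 1 or max_messages < 1:
        raise ValueError("max_poll_records and max_messages must be positive")
    # Documented guideline: Max Messages should be a multiple of Max Poll Records.
    if max_messages % max_poll_records != 0:
        raise ValueError("max_messages should be a multiple of max_poll_records")
    # Max Messages has no standard consumer key; the reading loop enforces it.
    return {
        "bootstrap.servers": host,             # Kafka server host name (host:port)
        "group.id": consumer_group,            # Consumer Group
        "max.poll.records": max_poll_records,  # Max Poll Records
        "auto.offset.reset": reset_policy,     # Reset Policy
    }


if __name__ == "__main__":
    # Illustrative values only: the host name and group name are made up.
    config = build_consumer_config("kafka-host:9092", "ds_readers", 5, 10, "earliest")
    for key, value in config.items():
        print(f"{key}={value}")
```

Rejecting a Max Messages value that is not a multiple of Max Poll Records is a design choice of this sketch; the connector itself may only recommend it.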

Example

For example, to read 10 messages per player process from the topic test_topic, starting from the last committed offset (or from the earliest available offset if the consumer group has no committed offset), set Topic Name to test_topic, Max Messages to 10, and Reset Policy to earliest.
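
To illustrate how Max Messages and Max Poll Records interact in this example, the following Python sketch models a single player process as a loop of poll() calls. It is a simplified model, not the connector's implementation: the topic is a plain list, poll() is simulated by slicing, and read_batches is a hypothetical name. In this model, when Max Messages is a multiple of Max Poll Records, every poll returns a full batch and the process stops exactly at the limit; otherwise, the last poll returns more records than the remaining quota.

```python
# Simplified model of one player process (an assumption, not the connector's
# actual code): call a simulated poll() repeatedly until Max Messages records
# have been consumed or the topic has no more records.
from typing import List


def read_batches(topic: List[str], max_poll_records: int, max_messages: int) -> List[List[str]]:
    batches: List[List[str]] = []
    position = 0  # next offset to read
    consumed = 0
    while consumed < max_messages and position < len(topic):
        # Simulated poll(): returns at most max_poll_records records.
        batch = topic[position:position + max_poll_records]
        position += len(batch)
        consumed += len(batch)
        batches.append(batch)
    return batches


if __name__ == "__main__":
    topic = [f"msg-{i}" for i in range(25)]
    # Max Messages = 10, Max Poll Records = 5: two full polls, exactly 10 records.
    print([len(b) for b in read_batches(topic, 5, 10)])  # [5, 5]
    # Max Messages = 10, Max Poll Records = 4: the third poll overshoots to 12.
    print([len(b) for b in read_batches(topic, 4, 10)])  # [4, 4, 4]
```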

To read only the new messages that are written to the topic after the consumer starts, set Reset Policy to latest. As with earliest, the policy takes effect only when the consumer group has no valid committed offset.
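
The Reset Policy decision can be modeled for a single partition as follows. This Python sketch assumes standard Kafka semantics: a committed offset that is still within the retained log is used as-is, and the reset policy applies only when that offset is missing or out of range. The helper starting_offset is hypothetical, and the offsets in the usage lines are illustrative.

```python
# Simplified model (an assumption based on standard Kafka consumer behavior) of
# where a consumer group starts reading one partition.
from typing import Optional


def starting_offset(committed: Optional[int], log_start: int, log_end: int,
                    reset_policy: str) -> int:
    """committed: last committed offset for the group, or None if there is none.
    log_start: earliest offset still retained on the broker.
    log_end:   offset that the next newly written message will receive.
    """
    # A committed offset that still lies within the retained log is reused.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # No committed offset, or it no longer exists: apply the reset policy.
    if reset_policy == "earliest":
        return log_start
    if reset_policy == "latest":
        return log_end
    raise ValueError("reset_policy must be 'earliest' or 'latest'")


if __name__ == "__main__":
    # Partition retains offsets 100..249; the next new message gets offset 250.
    print(starting_offset(None, 100, 250, "earliest"))  # 100: read from the beginning
    print(starting_offset(None, 100, 250, "latest"))    # 250: read only new messages
    print(starting_offset(180, 100, 250, "latest"))     # 180: resume from committed offset
    print(starting_offset(40, 100, 250, "earliest"))    # 100: committed offset was deleted
```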