Configuring Apache Kafka brokers

To implement scalable data loading, you must configure at least one Apache Kafka broker.

Apache Kafka is bundled with Log Analysis. The sample configuration files for Apache Kafka are in the <HOME>/IBM®/LogAnalysis/kafka/test-configs/kafka-configs directory.

Create one partition per topic for every two physical processors on the server where the broker is installed. For example, if the server has eight physical processors, create four partitions per topic in the Apache Kafka broker.
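The sizing rule can be sketched as a short shell calculation. Note that `nproc` reports logical processors, which may differ from the physical processor count that the rule refers to:

```shell
# Sketch: derive a suggested num.partitions value from the host's core count,
# following the rule of one partition per two processors.
cores=$(nproc)                  # logical processor count on this server
partitions=$(( cores / 2 ))     # one partition per two processors
if [ "$partitions" -lt 1 ]; then
    partitions=1                # never go below one partition per topic
fi
echo "num.partitions=$partitions"
```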

To implement high availability messaging, you must create multiple brokers on different servers. To set up multiple brokers, update the configuration files as described in step 3. Point each broker to the same Apache ZooKeeper server and specify a broker ID that is unique for each broker.
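For example, two brokers on different servers might use configuration fragments like the following. The host names and port values are illustrative; only the broker ID differs between the two brokers, while the ZooKeeper connection string is the same:

```properties
# server.properties on broker host 1 (illustrative values)
broker.id=0
port=17991
zookeeper.connect=zookeeper-host:17981

# server.properties on broker host 2 (same ZooKeeper server, unique broker ID)
broker.id=1
port=17991
zookeeper.connect=zookeeper-host:17981
```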

  1. Copy the kafka_version_number.tgz file to an appropriate directory on the server where you want to install Apache Kafka, where version_number is the Kafka version number.

    For Kafka version numbers for Log Analysis Version 1.3.6 and its fix packs, see Other supported software.

  2. Extract the kafka_version_number.tgz file.
  3. Update each section of the configuration files as outlined in the following tables:
    Table 1. Server basics and socket server settings
    Parameter Description
    broker.id=0 Specify a unique ID for each broker.
    port=17991 Specify the port that the socket server listens on. For example, 17991.
    #host.name=<local_host> Specify the host name that the broker binds to. If you do not specify a value, the broker binds to all interfaces.
    #advertised.host.name=<hostname_routable_by_clients> Specify the host name that the broker advertises to Apache ZooKeeper for use by clients and consumers. If you do not specify a value, the value from java.net.InetAddress.getCanonicalHostName() is used.
    #advertised.port=<port accessible by clients> Specify the port that the broker publishes to Apache ZooKeeper for clients to connect to. If you do not specify a value, the port that the broker listens on is used.
    num.network.threads=3 Specify the number of threads that are used to process network requests.
    num.io.threads=8 Specify the number of threads that are used for input and output operations.
    socket.send.buffer.bytes=1048576 Specify the send buffer or SO_SNDBUF that is used by the socket server.
    socket.receive.buffer.bytes=1048576 Specify the receive buffer or SO_RCVBUF that is used by the socket server.
    socket.request.max.bytes=104857600 Specify the maximum size for requests that are accepted by the socket server. This setting helps you to protect against memory issues.
    queued.max.requests=16 Specify the maximum number of requests that can be queued before the network threads are blocked.
    fetch.purgatory.purge.interval.requests=100 Specify the interval in number of requests that triggers the purging of the fetch requests.
    producer.purgatory.purge.interval.requests=100 Specify the interval in number of requests that triggers the purging of the producer requests.
    Table 2. Log basics
    Parameter Description
    log.dirs=<Log_dir_path> Specify the directory or directories where you want to store the log files. For example, la/home/logs1, la/home/logs2, la/home/logs3
    num.partitions=4 Specify the number of partitions that are used to process a topic. Specify half the number of physical processors on the server. For example, if you have 8 processors, specify 4.
    num.recovery.threads.per.data.dir=1 Specify the maximum number of threads that are used for log recovery for each data directory.
    log.index.size.max.bytes=154624 Specify the maximum size of the offset index in bytes.
    log.index.interval.bytes=4096 Specify the interval in bytes when an entry is added to the offset index.
    message.max.bytes=1000000 Specify the maximum size of messages that the server can receive.
    auto.create.topics.enable=true Set this parameter to true to enable the automatic creation of topics on the server.
    Table 3. Log flush policy
    Parameter Description
    default.replication.factor=1 Specify the replication factor used for automatically created topics.
    log.flush.interval.messages=100000 Specify the maximum number of messages that can accumulate before they are flushed.
    log.flush.interval.ms=50000 Specify the maximum amount of time that messages can accumulate in the log before they are flushed.
    log.flush.scheduler.interval.ms=2000 Specify an interval after which the log is flushed.
    Table 4. Log retention policy
    Parameter Description
    log.retention.hours=168 Specify the minimum age of a log file before it is eligible for deletion.
    #log.retention.bytes=1073741824 Specify the maximum number of bytes to retain in the log. When the log grows beyond this size, the oldest segments become eligible for deletion.
    log.segment.bytes=1073741824 Specify the maximum size of a segment. When this limit is reached, a new segment is created.
    log.retention.check.interval.ms=300000 Specify the interval that elapses before Apache Kafka deletes the log files according to the rules that are specified in the log retention policies.
    log.cleaner.enable=false The log cleaner is disabled by default. This setting means that the segment is deleted after the retention period that is specified in the log retention policy expires.
    log.roll.hours=168 Specify the maximum time that must elapse before a new log segment is rolled out.
    Table 5. Apache ZooKeeper parameters
    Parameter Description
    zookeeper.connect=<localhost>:17981 Specify the host name and port of the Apache ZooKeeper server.
    zookeeper.connection.timeout.ms=6000 Specify the timeout value in milliseconds after which connections to the Apache ZooKeeper time out.
    zk.sync.time.ms=2000 Specify the interval after which Apache ZooKeeper synchronizes with the connected servers.
    num.replica.fetchers=4 Specify the number of threads that are used to replicate messages from a source broker. Increasing this value can lead to increased parallelism in I/O operations in the broker.
    replica.fetch.max.bytes=1048576 Specify the number of bytes that are used in fetch requests for each partition.
    replica.fetch.wait.max.ms=500 Specify the maximum wait time for each fetcher request that is issued by the follower replicas. This value should not be greater than the one specified in replica.lag.time.max.ms.
    replica.high.watermark.checkpoint.interval.ms=5000 Specify the interval at which the high watermark is saved to disk.
    replica.socket.timeout.ms=30000 Specify the socket timeout value for network requests. The value must be equal to or greater than the value that is specified in replica.fetch.wait.max.ms.
    replica.socket.receive.buffer.bytes=65536 Specify the buffer for the socket receiver.
    replica.lag.time.max.ms=10000 Specify the maximum time that a follower can lag behind the leader before it is removed from the set of in-sync replicas.
    controller.socket.timeout.ms=30000 Specify the socket timeout value for channels used to send messages from controllers to brokers.
    Table 6. Durability and hardening parameters
    Parameter Description
    retries=0 Specify the number of times that a failed message send is retried. If you specify a value that is greater than zero, failed messages are sent again.
    acks=all Specify the number of acknowledgments that the producer requires from the broker before a request is considered complete. The value all means that the leader waits for the full set of in-sync replicas to acknowledge the message.
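Taken together, the broker settings in Tables 1 through 5 might produce a server.properties file similar to the following sketch. The values are the examples from the tables; adjust them for your environment and replace the placeholders with real paths and host names:

```properties
# Illustrative server.properties assembled from the example values above
broker.id=0
port=17991
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=<Log_dir_path>
num.partitions=4
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=<localhost>:17981
zookeeper.connection.timeout.ms=6000
```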
  4. Start Apache ZooKeeper.
    To start it in console mode, enter the following command:
    <Kafka_home>/bin/zookeeper-server-start.sh config/zookeeper.properties 
  5. Start the Apache Kafka broker. Enter the following command:
    <Kafka_home>/bin/kafka-server-start.sh -daemon config/server0.properties 
    To stop the broker, enter the following command:
    <Kafka_home>/bin/kafka-server-stop.sh
    To stop Apache ZooKeeper, enter the following command:
    <Kafka_home>/bin/zookeeper-server-stop.sh
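To verify that the broker is running, you can create and list a test topic with the kafka-topics.sh tool that is included in the Kafka distribution. The topic name and counts here are illustrative, and the --zookeeper option applies to the Kafka versions that are bundled with Log Analysis:

```shell
# Create a test topic on the running broker (illustrative name and counts)
<Kafka_home>/bin/kafka-topics.sh --create --zookeeper <localhost>:17981 \
    --replication-factor 1 --partitions 4 --topic la-test
# List topics to confirm that the broker registered the new topic
<Kafka_home>/bin/kafka-topics.sh --list --zookeeper <localhost>:17981
```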