Kafka output parameters

OMEGAMON® Data Connect Kafka output parameters specify whether to publish data in JSON format to an Apache Kafka topic.

Figure 1. OMEGAMON Data Connect configuration: Kafka output
OMEGAMON Data Connect sends data in JSON format over TCP to Kafka

In this context, OMEGAMON Data Connect is a Kafka client. More specifically, OMEGAMON Data Connect is a Kafka producer.

connect:
  output:
    kafka:
      enabled: <true|false> # Default at this level: false
      servers: <string>
      # All of the following parameters are optional
      topic: <topic_name> # Default: per-table topics
      topic-prefix: <topic_prefix> # Default: odp
      filter: # Output-level filter
        <Filter parameters>
      properties:
        <Kafka producer configuration properties, such as SSL>
enabled
Whether this function is enabled. Allowed values: true, false. This key is optional. Default: false.

To enable this function, you must specify enabled: true.

Specifying enabled: false has the same effect as commenting-out the parent key of this enabled key and all descendants of that parent key.

servers
A string containing one or more host/port pairs to use for establishing the initial connection to the Kafka cluster. Use a comma to separate host/port pairs:
servers: <host>:<port>

or

servers: <host1>:<port1>,<host2>:<port2>,...

The value of the servers key is a string, not a YAML sequence.

topic
Optional Kafka topic name.

If you omit the topic key, then OMEGAMON Data Connect sends data for each table to a separate topic.

The per-table topic names have the following pattern:

<topic_prefix>.<product>.<table_name>

where <topic_prefix> is the value of the topic-prefix key.

Example per-table topic name:

odp.km5.ascpuutil

topic-prefix
Optional prefix for per-table Kafka topic names. Default: odp.

If you specify a topic key, then the topic-prefix key is ignored.

filter
Optional filter to restrict what data to send.

This output-level filter applies only to Kafka output, replacing any global-level filter (connect.filter).

properties
Optional Kafka producer configuration properties.

For example:

"[reconnect.backoff.max.ms]": 30000
Important: Enclose Kafka producer property names in square brackets, and then in double quotes.

You can specify any Kafka producer configuration properties under this key. For comprehensive details on Kafka producer configuration properties, see the Apache Kafka documentation.

To configure a secure connection between OMEGAMON Data Connect and the Kafka cluster, specify SSL properties. The following listing shows a typical subset of SSL properties:

"[security.protocol]": SSL

# Server certificates
"[ssl.truststore.location]": <file_path>
"[ssl.truststore.password]": <string>
"[ssl.truststore.type]": <JKS|PKCS12>

# Client certificate
# (only required if the Kafka server requires client authentication)
"[ssl.keystore.location]": <file_path>
"[ssl.keystore.password]": <string>
"[ssl.keystore.type]": <JKS|PKCS12>

SSL using RACF key rings

If you run OMEGAMON Data Connect on z/OS®, and you want to use RACF® key rings to configure SSL, then you need to use the SSL engine provided with OMEGAMON Data Provider instead of the default SSL engine for Kafka producers. The default SSL engine does not support RACF key rings as a store type.

To use the OMEGAMON Data Provider SSL engine, in addition to setting the security.protocol property as you would for the default SSL engine, set the ssl.engine.factory.class property:

"[security.protocol]": SSL
"[ssl.engine.factory.class]": com.rocketsoft.odp.server.output.kafka.KafkaSslEngineFactory

If you set the ssl.engine.factory.class property to the OMEGAMON Data Provider SSL engine, then OMEGAMON Data Connect does not use any of the other ssl.* properties.

Instead, the OMEGAMON Data Provider SSL engine has its own set of properties, with the prefix odp.ssl.*:

"[odp.ssl.cipher.suites]": <ciphers_list>
"[odp.ssl.enabled.protocols]": <protocols_list>
"[odp.ssl.protocol]": <protocol>
"[odp.ssl.key.alias]": <string>
"[odp.ssl.keystore.location]": <string>
"[odp.ssl.keystore.password]": <string>
"[odp.ssl.keystore.type]": <JKS|PKCS12|JCERACFKS>
"[odp.ssl.truststore.location]": <string>
"[odp.ssl.truststore.password]": <string>
"[odp.ssl.truststore.type]": <JKS|PKCS12|JCERACFKS>
Note: The default SSL engine supports Privacy-Enhanced Mail (PEM) as a store type, but the OMEGAMON Data Provider SSL engine does not. Typically, security providers in the Java™ runtime environment (JRE) provide support for store types. However, in this case, the default SSL engine contains its own built-in support for PEM. The OMEGAMON Data Provider SSL engine does not contain built-in support for PEM.
odp.ssl.cipher.suites
A list of candidate ciphers for the connection, in one of the following formats:
  • OpenSSL cipher list
  • A comma-separated list of ciphers using the standard OpenSSL cipher names or the standard JSSE cipher names

This key is optional. Example, in OpenSSL cipher list format:

HIGH:!aNULL:!eNULL:!EXPORT:!DES:!RC4:!MD5:!kRSA

odp.ssl.enabled.protocols
List of protocols to enable.

This key is optional. Example:

TLSv1.3,TLSv1.2

odp.ssl.protocol
Protocol to use.

If this protocol is not supported by both ends of the connection, then the connection can fall back (downgrade) to one of the other enabled protocols.

This key is optional. Default in Java 17: TLSv1.3.

odp.ssl.key.alias
Alias of the client private key and associated client certificate in the keystore. On z/OS, also known as the certificate label.

This key is optional. Default: the default certificate in the keystore.

odp.ssl.keystore.location
Location of the keystore that contains the client certificate.

A keystore is required only if the server requires client authentication.

The location format depends on the keystore type:

JKS
Keystore file path. Example:

/u/my/security/certs/keystore.jks

PKCS12
Keystore file path. Example:

/u/my/security/certs/keystore.p12

JCERACFKS
Only valid if OMEGAMON Data Connect runs on z/OS.

RACF key ring, in the following format:

safkeyring://<owner_user_id>/<key_ring_name>

Note: In this specific context, follow safkeyring: with two (2) consecutive slashes.

where <owner_user_id> is the RACF user ID that owns the key ring and <key_ring_name> is the RACF key ring name.

odp.ssl.keystore.password
Password to access the keystore.

If the keystore type is JCERACFKS, then specify the fixed value:

password

RACF does not use this value for authentication; this value is required only for compatibility with the JCE requirement for a password.

odp.ssl.keystore.type
Keystore type. Supported types depend on the security providers in the JRE. Examples:
JKS
Java keystore.
PKCS12
Public-Key Cryptography Standards (PKCS) #12.
JCERACFKS
Java Cryptography Standards (JCE) RACF keystore (key ring). Only available if OMEGAMON Data Connect is running on z/OS and the IBMZSecurity provider is available in the JRE.
odp.ssl.truststore.location
Location of the truststore that contains trusted server certificates. See the list of values for odp.ssl.keystore.location.
odp.ssl.truststore.password
Password to access the truststore.

If the truststore type is JCERACFKS, then specify the fixed value:

password

RACF does not use this value for authentication; this value is required only for compatibility with the JCE requirement for a password.

odp.ssl.truststore.type
Truststore type. See the list of values for odp.ssl.keystore.type.
Tip: Conversely, if you specify odp.ssl.* properties, but the default SSL engine is selected, then the OMEGAMON Data Connect log contains information messages that report the odp.ssl.* properties are supplied but are not used yet. In any case, the presence of properties for the other SSL engine is benign, and does not affect the behavior of the selected SSL engine.

Example: Per-table topics

The following example sends data to per-table topics in Kafka.

connect:
  input:
    tcp:
      enabled: true
      hostname: 0.0.0.0
      port: 15379

  output:
    kafka:
      enabled: true
      servers: kafka.example.com:9095

The topic names match the pattern:

odp.<product>.<table_name>

Example: Per-table topics with a custom topic name prefix

The following example sends data to per-table topics in Kafka. The only difference to the previous example is the prefix of the topic names.

connect:
  input:
    ...

  output:
    kafka:
      enabled: true
      servers: kafka.example.com:9095
      topic-prefix: omegamon

In this example, the topic names match the pattern:

omegamon.<product>.<table_name>

Example: Send fields from all tables to a single topic

The following example sends all data to a single Kafka topic named omegamon-json.

connect:
  input:
    ...

  output:
    kafka:
      enabled: true
      servers: kafka.example.com:9095
      topic: omegamon-json

Example: Connection with TLS using PKCS12

The following example sends data to Kafka over a secure connection using the default SSL engine.

connect:
  input:
    ...

  output:
    kafka:
      enabled: true
      servers: kafka1.example.com:9095,kafka2.example.com:9095
      properties:
        "[reconnect.backoff.max.ms]": 30000

        "[security.protocol]": SSL

        # Server certificates
        "[ssl.truststore.location]": /u/my/security/certs/omdp-kafka-server.p12
        "[ssl.truststore.password]": Pa$$w0rdTS
        "[ssl.truststore.type]": PKCS12

        # Client certificate
        # (only required if the Kafka server requires client authentication)
        "[ssl.keystore.location]": /u/my/security/certs/omdp-connect.p12
        "[ssl.keystore.password]": Pa$$w0rdKS
        "[ssl.keystore.type]": PKCS12

Example: Connection with TLS using a RACF key ring

The following example sends data to Kafka over a secure connection using the OMEGAMON Data Provider SSL engine to refer to a RACF key ring. In this example, the same key ring is used as both a truststore and a keystore.

connect:
  input:
    ...

  output:
    kafka:
      enabled: true
      servers: kafka1.example.com:9095,kafka2.example.com:9095
      properties:
        "[reconnect.backoff.max.ms]": 30000
        "[security.protocol]": SSL
        "[ssl.engine.factory.class]":
          com.rocketsoft.odp.server.output.kafka.KafkaSslEngineFactory
        "[odp.ssl.truststore.location]": safkeyring://STCOMDP/OMDPring
        "[odp.ssl.truststore.password]": password
        "[odp.ssl.truststore.type]": JCERACFKS
        "[odp.ssl.keystore.location]": safkeyring://STCOMDP/OMDPring
        "[odp.ssl.keystore.password]": password
        "[odp.ssl.keystore.type]": JCERACFKS

In this example, the keystore and truststore passwords are the literal string password; this is the actual value, not a placeholder.

Example: Connection with TLS using a RACF key ring for the keystore and a PKCS12 file for the truststore

The following example sends data to Kafka over a secure connection using the OMEGAMON Data Provider SSL engine to refer to a RACF key ring for the keystore and a PKCS12 file for the truststore.

connect:
  input:
    ...

  output:
    kafka:
      enabled: true
      servers: kafka1.example.com:9095,kafka2.example.com:9095
      properties:
        "[security.protocol]": SSL
        "[ssl.engine.factory.class]":
          com.rocketsoft.odp.server.output.kafka.KafkaSslEngineFactory
        "[odp.ssl.truststore.location]": /u/my/security/certs/omdp-kafka-server.p12
        "[odp.ssl.truststore.password]": Pa$$w0rdTS
        "[odp.ssl.truststore.type]": PKCS12
        "[odp.ssl.keystore.location]": safkeyring://STCOMDP/OMDPring
        "[odp.ssl.keystore.password]": password
        "[odp.ssl.keystore.type]": JCERACFKS

Example: Connection with TLS using a RACF key ring for the truststore

The following example sends data to Kafka over a secure connection using the OMEGAMON Data Provider SSL engine to refer to a RACF key ring for the truststore. In this example, the Kafka servers do not require client authentication, so there is no need for a keystore.

connect:
  input:
    ...

  output:
    kafka:
      enabled: true
      servers: kafka1.example.com:9095,kafka2.example.com:9095
      properties:
        "[security.protocol]": SSL
        "[ssl.engine.factory.class]":
          com.rocketsoft.odp.server.output.kafka.KafkaSslEngineFactory
        "[odp.ssl.truststore.location]": safkeyring://STCOMDP/OMDPring
        "[odp.ssl.truststore.password]": password
        "[odp.ssl.truststore.type]": JCERACFKS

Example: ssl.* properties present but unused

The following example has exactly the same effect as the previous example.

connect:
  input:
    ...

  output:
    kafka:
      enabled: true
      servers: kafka1.example.com:9095,kafka2.example.com:9095
      properties:
        "[security.protocol]": SSL
        "[ssl.engine.factory.class]":
          com.rocketsoft.odp.server.output.kafka.KafkaSslEngineFactory
        "[odp.ssl.truststore.location]": safkeyring://STCOMDP/OMDPring
        "[odp.ssl.truststore.password]": password
        "[odp.ssl.truststore.type]": JCERACFKS
        # The remaining properties have no effect: the ODP SSL engine does not use them
        "[ssl.truststore.location]": /u/my/security/certs/omdp-kafka-server.p12
        "[ssl.truststore.password]": Pa$$w0rdTS
        "[ssl.truststore.type]": PKCS12
        "[ssl.keystore.location]": /u/my/security/certs/omdp-connect.p12
        "[ssl.keystore.password]": Pa$$w0rdKS
        "[ssl.keystore.type]": PKCS12

This example demonstrates that, if ssl.engine.factory.class selects the OMEGAMON Data Provider SSL engine, then OMEGAMON Data Connect does not use any other ssl.* properties.