Processamento de mensagens usando um provedor Kafka

Com um provedor de mensagens Kafka , você pode processar mensagens de saída sequencialmente e mensagens de entrada sequencialmente ou continuamente. Em geral, o processamento de mensagens Kafka é semelhante ao processamento de mensagens JMS.

Processamento de mensagens Kafka

Kafka O processamento de mensagens rastreia mensagens processadas mantendo um valor offset que representa a ordem sequencial em que as mensagens são recebidas por tópicos Kafka . O valor de deslocamento indica a próxima mensagem a ser processada. Depois que uma mensagem é processada em Kafka, o deslocamento agora está além daquela mensagem processada, e a mensagem nunca mais é processada.

A mensagem processada não é excluída da fila. Ela permanece na fila até que o prazo de expiração configurado para a mensagem seja alcançado.

Maximo® Manage fornece um aplicativo para ajudá-lo a reprocessar mensagens que tenham erros. Veja mais informações sobre o processamento de erros.

Limites de tamanho de mensagens

Você configura um limite de tamanho da mensagem no servidor Kafka . Ao configurar esse limite, considere o tamanho de suas transações, incluindo o tamanho dos anexos integrados. Mensagens de tamanho grande podem afetar negativamente o desempenho das ações de leitura e gravação de mensagens na fila.

No Maximo Manage, é possível incluir a propriedade de sistema mxe.kafka.messagesize para o tamanho máximo da mensagem que pode ser processada Configure a propriedade para combinar com o tamanho da mensagem, em bytes, que você configura no servidor Kafka . A configuração padrão é 10 MB.

Kafka timeouts de gravação

No Maximo Manage, Kafka as ações de gravação aguardam confirmação dos brokers do Kafka . É possível controlar o tempo limite de gravação para o produtor Kafka usando a propriedade mxe.kafka.waittimeforack . A unidade de medição está em milissegundos.

Processamento de mensagens sequenciais de saída

A estrutura de integração usa os tópicos Kafka como filas de mensagens de saída, sequenciais. Quando uma mensagem é enviada a partir da estrutura de integração, ela é roteada para o seu provedor Kafka , onde ele é hospedado como uma mensagem JSON em um tópico predefinido. Embora os canais de publicação possam enviar mensagens em formato XML ou JSON, o produto os agrupa como mensagens formatadas JSON e as grava no tópico Kafka . Uma mensagem não enviada usa a estrutura que é mostrada no exemplo de código a seguir:
{
      “interfacetype”:”MAXIMO”,
      “INTERFACE”:”MXPRInterface”,
      “payload”:”<xml/json message>”,
      “SENDER”:”MX”,
      “destination”:”<external system name>”,
      “destjndiname”:”<kafka topic name>”,
      “compressed”:”1”,
      “MEAMessageID”:”<providername>~<topic>~<partition>~<offset>”,
      “mimetype”:”application/xml”
}
A propriedade payload contém a carga útil que é enviada para o terminal. A mensagem inteira Kafka é compactada no momento do armazenamento dentro das partições Kafka . Nas partições Kafka , as mensagens são armazenadas em uma ordem timestamp rigorosa. A posição da mensagem na partição recebe um número de ID sequencial que é conhecido como um deslocamento de mensagem. Embora os tópicos do Kafka possam ser particionados, tópicos processados sequencialmente devem estar em uma única partição.

Depois que a mensagem é escrita para o tópico Kafka , ela é processada pela tarefa cron Kafka associada que você configura no aplicativo Configuração da Tarefa Cron para cada tópico. O processamento ocorre de acordo com o sistema externo e o terminal que estão configurados para o canal.

Processamento sequencial de entrada

O processamento de mensagens de entrada pode ser sequencial ou contínuo. O processamento sequencial de entrada é semelhante ao processamento sequencial de saída e requer os mesmos tipos de configuração, incluindo uma instância da tarefa Cron do Kafka.

A estrutura de uma mensagem de entrada sequencial é mostrada no exemplo a seguir:
{
     “interfacetype”: "MAXIMO",
     “SENDER”: "testkafka",
     “destination”: "testkafka",
     “USER”: "wilson",
     “MEAMessageID”: "….",
     “INTERFACE”: "MXASSETInterface",
     “payload”: "{"assetnum":"AZ163","siteid":"BEDFORD"}",
     “mimetype”: "application/json",
    “destjndiname”: "anamitratestcont"
} 

Configure as filas sequenciais com uma única partição e não mais do que uma instância da tarefa Cron para consumir mensagens da fila.

Consulte mais informações sobre o processamento de erro

Processamento e ajuste de escala da fila contínua de saída e entrada

Em geral, e ao contrário do que ocorre nas filas sequenciais, o processamento de fila contínua não interrompe o processamento quando uma mensagem apresenta um erro. Mensagens com erros são gravadas na tabela de erros e processadas da tabela de erros pela instância da tarefa Cron KAFKAERROR para a respectiva fila.

Uma única fila Kafka pode ser escalonada como multithreaded, ou como ter vários consumidores de mensagens, definindo mais partições para a fila. Cada partição representa um consumidor separado, e cada partição requer sua própria instância de tarefa Cron para suportar o consumo de mensagens da partição. Para fazer o ajuste de escala do processamento, configure várias partições. O número de instâncias de tarefa Cron deve corresponder ao número de partições.

A fila contínua de saída grava mensagens de saída para interfaces, se selecionadas no aplicativo Sistemas Externos do aplicativo Maximo Manage Vários sistemas externos podem usar a mesma fila contínua de saída.

O processamento de mensagens contínuas de entrada com Kafka é semelhante ao processamento de mensagens JMS, no entanto, há diferenças em escalonamento e no processamento de erros.

Consulte mais informações sobre o processamento de erros de mensagens em filas contínuas e configurando um atraso de nova entrega.