IBM Support

JMS MQRFH2 header processing with WebSphere MQ Connector - sending a message

How To


Summary

This article demonstrates how you can use WebSphere MQ Connector in DataStage parallel jobs to send messages in MQRFH2 format. The MQRFH2 format is used by the JMS implementation in WebSphere MQ to represent JMS messages.

Objective

The topic is divided into two parts:

  • The first part describes a DataStage job. This job uses the WebSphere MQ connector to assemble an MQRFH2 message and send it to a WebSphere MQ queue. The message is then received by a Java program that uses the JMS API to read it from the queue.
  • The second part describes a Java program. This program uses the JMS API to prepare a message and send it to a WebSphere MQ queue. The message is then received by the DataStage WebSphere MQ connector and processed to obtain its details, including the sections of the MQRFH2 header.
This document is for demonstration purposes only. It shows only one possible way to process MQRFH2 messages. Depending on the environment and specific requirements it might be possible to design a job which is simpler, more efficient or which uses different stages than the one described in this document. 

Environment

Required prerequisites: You must have a WebSphere MQ server installed. A WebSphere MQ client installation can also be used, but the queue configuration is different; refer to the WebSphere MQ Knowledge Center for MQ client configuration details. The following environment description is common to sending and receiving JMS messages with the WebSphere MQ connector.

WebSphere MQ queues configuration

  1. Create an MQ queue manager called QMNAME and two queues, QUEUE1 and QUEUE2. If the queues already exist, make sure to remove any messages from them before proceeding.
    crtmqm QMNAME
    strmqm QMNAME
    runmqsc QMNAME
      define qlocal(QUEUE1)
      define qlocal(QUEUE2)
      end

  2. Use the MQ JMS administration tools to configure an MQQueueConnectionFactory object named ivtQCF for the queue manager QMNAME and an MQQueue object named ivtQ for the queue QUEUE1. For example, to use the default configuration file (JMSAdmin.config) and the file-based service provider, complete these steps:

    • Open the JMSAdmin.config file located in $MQ_HOME/java/bin.

    • Make sure the INITIAL_CONTEXT_FACTORY is defined as:
      INITIAL_CONTEXT_FACTORY=com.sun.jndi.fscontext.RefFSContextFactory

    • Make sure the PROVIDER_URL is defined as:
      PROVIDER_URL=file:///home/user/JNDI-Directory
      Replace /home/user/JNDI-Directory with the desired location. The folder must exist. WebSphere MQ uses this folder to store the .bindings file for the JMS objects. Note that the same path must be specified in the Java programs SendJMSMessage.java and ReceiveJMSMessage.java in the following line of code:
      public static final String PROVIDER_URL = "file:///home/user/JNDI-Directory";

    • Start the JMSAdmin tool located in the $MQ_HOME/java/bin folder and run the following commands:
      InitCtx> define qcf(ivtQCF) qmanager(QMNAME)
      InitCtx> define q(ivtQ) qmanager(QMNAME) queue(QUEUE1)
      InitCtx> end

  3. Compile the Java programs SendJMSMessage.java and ReceiveJMSMessage.java.
    When the WebSphere MQ Java packages are installed with MQ server, you can use the following commands:
    javac -cp $MQ_HOME/java/lib/com.ibm.mqjms.jar SendJMSMessage.java
    javac -cp $MQ_HOME/java/lib/com.ibm.mqjms.jar ReceiveJMSMessage.java

  4. Import and compile the DataStage jobs. The new jobs ReceiveJMSMessage and SendJMSMessage are stored in the new folder MQRFH2_JMS under the Jobs folder in the Repository View. Both jobs have a Sequential File stage. The SendJMSMessage job uses the in.txt file located in the /tmp folder as its input. The ReceiveJMSMessage job writes its results to the /tmp/out.txt file. If necessary, update the location of the files in the stage properties.
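
The JNDI settings from step 2 have to match on the Java side. The following is a minimal sketch of how a program might carry the same two values into a JNDI environment; the class name JndiSettings is hypothetical, and the way the attached SendJMSMessage.java and ReceiveJMSMessage.java actually do this is not shown in this document. The lookup itself is left as a comment because it needs the WebSphere MQ JMS and file system context JARs on the classpath.

```java
import java.util.Hashtable;
import javax.naming.Context;

/** Sketch only: JNDI settings that mirror the JMSAdmin.config values from step 2. */
final class JndiSettings {
    // Same values as in JMSAdmin.config; replace the path with your own directory.
    static final String INITIAL_CONTEXT_FACTORY = "com.sun.jndi.fscontext.RefFSContextFactory";
    static final String PROVIDER_URL = "file:///home/user/JNDI-Directory";

    /** Builds the environment that could be passed to new InitialContext(env). */
    static Hashtable<String, String> environment() {
        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
        env.put(Context.PROVIDER_URL, PROVIDER_URL);
        return env;
    }

    public static void main(String[] args) {
        // With the MQ JMS and fscontext JARs available, a program could continue with:
        //   Context ctx = new javax.naming.InitialContext(environment());
        //   Object qcf = ctx.lookup("ivtQCF");  // connection factory defined in JMSAdmin
        //   Object q   = ctx.lookup("ivtQ");    // queue object bound to QUEUE1
        environment().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

If the PROVIDER_URL in the program differs from the one in JMSAdmin.config, the lookup reads a different .bindings file and the ivtQCF and ivtQ objects are not found.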

Steps

The job SendJMSMessage is designed to assemble and send a JMS message with the following characteristics:

  • It is a TextMessage JMS message.
  • It has Color string user property with value Blue.
  • It has Size integer user property with value 1.
  • It has CorrelationId JMS property with value CID000000000000000000001.
  • It has the text payload Circle.

The following figure presents the layout of the job:

image

Each stage in the job flow is responsible for part of the message assembling and sending:

  • The InputData stage reads input data from a file.
  • The PrepareNameValue stage prepares the data for the name-value length/data pairs (the variable portion of the MQRFH2 structure). This variable portion contains the <mcd>, <jms> and <usr> folders.
  • The AdjustOffsets stage aligns the name value pair lengths to be multiples of 4 (the requirement of the MQRFH2 structure specification).
  • The AssembleMQRFH2 stage prepares the values for the fixed MQRFH2 structure fields.
  • The ExportColumns stage concatenates the columns that represent individual MQRFH2 fields into a single column that represents the complete MQRFH2 structure.
  • The OutputMessage stage stores the assembled message on the queue.
  • The RejectData stage accepts all the records that could not be put to the message queue due to an error.

InputData stage

The InputData stage reads data from the input file in.txt. The content of this file is shown below:

"CorrelId","Color","Size","Payload"
"CID000000000000000000001","Blue","1","Circle"

The first line contains the field headers, so the reading stage must be configured to skip it. To do this, the configuration property "First Line is Column Names" is set to True.

image

The record and field delimiter have the default values ("end" and "comma"). Double quotes are used to enclose the field values. These settings match the layout of data in the input file.

image

The stage defines four columns. The first column (CorrelId) carries the message correlation identifier. The second and third columns (Color and Size) carry values for the user-defined JMS message properties Color and Size. The fourth column (Payload) carries the message payload data.

image
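
To make the file layout concrete, the following sketch reads the same two lines outside DataStage. It is an illustration of the format settings only (comma delimiter, double quotes, first line skipped), not of how the Sequential File stage is implemented; the class and method names are hypothetical, and escaped quotes inside values are not handled.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: splits the quoted, comma-delimited records of in.txt. */
final class InputDataSketch {
    /** Splits one line of comma-separated, double-quoted fields. */
    static List<String> parseLine(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (char c : line.toCharArray()) {
            if (c == '"') {
                inQuotes = !inQuotes;            // quotes enclose the value, they are not part of it
            } else if (c == ',' && !inQuotes) {
                fields.add(current.toString());  // a comma outside quotes ends the field
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());          // last field has no trailing comma
        return fields;
    }

    public static void main(String[] args) {
        String[] lines = {
            "\"CorrelId\",\"Color\",\"Size\",\"Payload\"",
            "\"CID000000000000000000001\",\"Blue\",\"1\",\"Circle\""
        };
        // Start at index 1: the first line holds column names, not data.
        for (int i = 1; i < lines.length; i++) {
            List<String> record = parseLine(lines[i]);
            System.out.println("CorrelId=" + record.get(0) + " Color=" + record.get(1)
                    + " Size=" + record.get(2) + " Payload=" + record.get(3));
        }
    }
}
```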

PrepareNameValue stage

The PrepareNameValue stage is used to prepare the variable portions of the MQRFH2 structure, namely the <mcd>, <jms> and <usr> folders. 

image

The output columns and their derivation rules configured in this stage are:

  • DSLink17.CorrelId = DSLink3.CorrelId
    The correlation id value is passed from input to output.
  • DSLink17.mcd = "<mcd><Msd>jms_text</Msd></mcd>"
    The <mcd> folder is an XML document that describes the format of the message. The <Msd> element specifies the type of the JMS message. Because a message of the TextMessage JMS type is used, the value of this element must be set to jms_text.
  • DSLink17.jms = "<jms><Dst>queue://QMNAME/QUEUE1</Dst><Tms>" : AsInteger(SecondsSinceFromTimestamp(CurrentTimestamp(), "1970-01-01 00:00:00")) + 5 * 60 * 60 : "000" : "</Tms><Cid>" : DSLink3.CorrelId : "</Cid><Dlv>2</Dlv></jms>"
    The <jms> folder is used to transport JMS header fields. In this case, we pass the following JMS header fields:
    • <Dst> specifies the message destination. In this case it is the queue QUEUE1 on queue manager QMNAME.
    • <Tms> specifies the message timestamp in milliseconds since the start of the Epoch. The timestamp is constructed with DataStage functions: CurrentTimestamp() gives the current timestamp, and SecondsSinceFromTimestamp() gives the difference in seconds between two timestamps. "1970-01-01 00:00:00" is used as the second argument to obtain the number of seconds since the start of the Epoch. To express the result in Greenwich Mean Time, 5 hours are added as 5 * 60 * 60 seconds (this assumes that the job runs in US Eastern Time). Because the value must be expressed in milliseconds, three zero characters are then appended to the number of seconds. This calculation is only approximate: the milliseconds are always 000, and the value is calculated in a stage that runs before the stage in which the message is actually stored on the queue. The timestamp (<Tms> element) is not mandatory and can be omitted from the <jms> folder.
    • <Cid> specifies the message correlation id. This value is available on the input in the CorrelId column, so it is reused.
    • <Dlv> specifies the delivery mode. The value 2 corresponds to persistent delivery in JMS.
  • DSLink17.usr = "<usr><Size dt='i4'>" : DSLink3.Size : "</Size><Color>" : DSLink3.Color : "</Color></usr>"
    The <usr> folder is used to pass user-specified properties. Two properties are defined in the example message: Color and Size. The values for these properties come from the input columns Color and Size, so these columns are referenced in the expression. The dt='i4' attribute marks Size as a four-byte integer; Color has no dt attribute and is therefore treated as a string.
  • DSLink17.Payload = DSLink3.Payload
    The message payload value is passed from input to output.
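
The derivations above are string concatenations, so they can be sketched in a few lines of Java. This is an illustration under stated assumptions, not the job's implementation: the class and method names are hypothetical, the <Msd> value is written as jms_text (the spelling WebSphere MQ uses for text messages), the destination is written in the queue://queue-manager/queue URI form, and no XML escaping of the values is attempted.

```java
/** Sketch only: builds the three name-value folders as plain strings. */
final class NameValueFolders {
    /** <mcd> folder: declares the message as a JMS TextMessage. */
    static String mcd() {
        return "<mcd><Msd>jms_text</Msd></mcd>";
    }

    /** <jms> folder: destination, timestamp in milliseconds, correlation id, delivery mode. */
    static String jms(String destination, long epochMillis, String correlId) {
        return "<jms><Dst>" + destination + "</Dst><Tms>" + epochMillis
                + "</Tms><Cid>" + correlId + "</Cid><Dlv>2</Dlv></jms>";
    }

    /** <usr> folder: Size is typed as a four-byte integer, Color is left as a string. */
    static String usr(int size, String color) {
        return "<usr><Size dt='i4'>" + size + "</Size><Color>" + color + "</Color></usr>";
    }

    public static void main(String[] args) {
        // System.currentTimeMillis() already counts from the Epoch in UTC,
        // so no time-zone offset has to be added here.
        long now = System.currentTimeMillis();
        System.out.println(mcd());
        System.out.println(jms("queue://QMNAME/QUEUE1", now, "CID000000000000000000001"));
        System.out.println(usr(1, "Blue"));
    }
}
```

Unlike the DataStage expression, the Java clock call returns real milliseconds, so the sketch needs neither the 5-hour correction nor the appended "000".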

AdjustOffsets stage

The AdjustOffsets stage aligns the lengths of variable JMS portions to be multiples of 4 which is the requirement of MQRFH2 structure. It also provides the lengths of the variable portions as new columns on the output link. The CorrelId and Payload columns are passed through this stage.

image

To understand how the alignment and length calculation is performed, consider the rules for the mcd_aligned and mcd_aligned_len output columns:

DSLink19.mcd_aligned = PadString(DSLink17.mcd, " ", 4 - Mod(Len(DSLink17.mcd),4))
DSLink19.mcd_aligned_len = Len(DSLink17.mcd) + 4 - Mod(Len(DSLink17.mcd), 4)

The expression 4 - Mod(Len(DSLink17.mcd),4) calculates the number of spaces that need to be appended to the DSLink17.mcd value so that the new length of the value is a multiple of 4. The function PadString() performs the actual padding. Note that if the length of the original DSLink17.mcd value is already a multiple of 4, the expression still appends 4 space characters, which are not necessary. However, since they do not cause any problem and they keep the expressions simpler, they can be left in the formula.
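
The padding rule can be tried out in isolation. The following sketch, with hypothetical class and method names, shows the formula used in the job next to a variant that appends nothing when the value is already aligned. It counts characters, which equals the byte count only for single-byte (for example ASCII) content such as the folders in this example.

```java
/** Sketch only: two ways to pad a value to a multiple of 4 characters. */
final class AlignTo4 {
    private static String appendSpaces(String value, int count) {
        StringBuilder padded = new StringBuilder(value);
        for (int i = 0; i < count; i++) {
            padded.append(' ');
        }
        return padded.toString();
    }

    /** Mirrors the job's formula 4 - Mod(Len(x), 4): always appends 1 to 4 spaces. */
    static String padAsInJob(String value) {
        return appendSpaces(value, 4 - value.length() % 4);
    }

    /** Variant that appends 0 to 3 spaces and leaves aligned values unchanged. */
    static String padMinimal(String value) {
        return appendSpaces(value, (4 - value.length() % 4) % 4);
    }

    public static void main(String[] args) {
        String[] samples = { "<mcd><Msd>jms_text</Msd></mcd>", "abcd" };
        for (String sample : samples) {
            System.out.println(sample.length() + " characters: job formula gives "
                    + padAsInJob(sample).length() + ", minimal variant gives "
                    + padMinimal(sample).length());
        }
    }
}
```

For the 30-character <mcd> folder both variants produce 32 characters; they differ only for values whose length is already a multiple of 4, where the job's formula adds a further 4 spaces.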

AssembleMQRFH2 stage

The AssembleMQRFH2 stage prepares the remaining fixed MQRFH2 structure fields.

image

The output columns and their derivation rules configured in this stage are:

  • DSLink4.CorrelId = StringToRaw(DSLink19.CorrelId)
    The stage passes the correlation id intact, except it changes the type to raw binary. This is done because the WSMQ.CORRELID data element used by the WebSphere MQ connector requires this data element column to be defined as Binary(24).
  • DSLink4.StrucId = "RFH "
    The four-character MQRFH2 structure identifier.
  • DSLink4.Version = 2
    The four-byte integer representing the MQRFH2 structure version.
  • DSLink4.StrucLength = 10*4 + 1*8 + DSLink19.mcd_aligned_len + DSLink19.jms_aligned_len + DSLink19.usr_aligned_len
    The four-byte integer representing the MQRFH2 structure size. There are 10 four-byte fields, 1 eight-byte field and 3 variable portions. These lengths are all added to produce the size of the complete structure.
  • DSLink4.Encoding = 546
    The numerical encoding to use for the numerical values in the data following the MQRFH2 structure (in this example it doesn’t really matter since the data following the MQRFH2 structure doesn’t have any numerical data). The value 546 is specified, which is the default numerical encoding for queue managers on Windows operating system.
  • DSLink4.CodedCharSetId = 1208
    The character set encoding for the character data following the MQRFH2 structure. The value 1208 is specified, which is the CCSID for UTF-8 character set encoding.
  • DSLink4.Format = "MQSTR   "
    The eight-character format name of the data following the MQRFH2 structure. No further format headers follow the MQRFH2 format header, only the text payload (the value "Circle"), so the string message format is specified.
  • DSLink4.Flags = 0
    The flags to use. No flags are specified, so 0 is passed.
  • DSLink4.NameValueCCSID = 1208
    The character set encoding for the character values in the name-value variable portions of the MQRFH2 structure. Again CCSID 1208 is specified for UTF-8 character set encoding.
  • DSLink4.mcd_aligned_len = DSLink19.mcd_aligned_len
    The length of the first name-value pair (<mcd> folder) is passed intact.
  • DSLink4.mcd_aligned = DSLink19.mcd_aligned
    The content of the first name-value pair (<mcd> folder) is passed intact.
  • DSLink4.jms_aligned_len = DSLink19.jms_aligned_len
    The length of the second name-value pair (<jms> folder) is passed intact.
  • DSLink4.jms_aligned = DSLink19.jms_aligned
    The content of the second name-value pair (<jms> folder) is passed intact.
  • DSLink4.usr_aligned_len = DSLink19.usr_aligned_len
    The length of the third name-value pair (<usr> folder) is passed intact.
  • DSLink4.usr_aligned = DSLink19.usr_aligned
    The content of the third name-value pair (<usr> folder) is passed intact.
  • DSLink4.Payload = DSLink19.Payload
    The message payload is passed intact.
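
The field layout listed above can be sketched as a byte buffer. The following is a minimal illustration, not the job's implementation: the class and method names are hypothetical, the byte order is a parameter (little-endian is assumed in the example, as on x86 platforms), and folders are padded with 0 to 3 spaces rather than the 1 to 4 spaces that the job's formula appends.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Sketch only: lays out the MQRFH2 fields in the order described above. */
final class Rfh2Assembler {
    /** Pads a folder with spaces so that its UTF-8 length is a multiple of 4. */
    static byte[] aligned(String folder) {
        byte[] raw = folder.getBytes(StandardCharsets.UTF_8);
        byte[] out = new byte[(raw.length + 3) / 4 * 4];
        Arrays.fill(out, (byte) ' ');
        System.arraycopy(raw, 0, out, 0, raw.length);
        return out;
    }

    /** Builds the header: fixed fields followed by one (length, data) pair per folder. */
    static byte[] assemble(ByteOrder order, String... folders) {
        byte[][] data = new byte[folders.length][];
        int strucLength = 7 * 4 + 8;              // seven 4-byte fixed fields plus the 8-byte Format
        for (int i = 0; i < folders.length; i++) {
            data[i] = aligned(folders[i]);
            strucLength += 4 + data[i].length;    // 4-byte length field plus the folder data
        }
        ByteBuffer buf = ByteBuffer.allocate(strucLength).order(order);
        buf.put("RFH ".getBytes(StandardCharsets.US_ASCII));      // StrucId
        buf.putInt(2);                                            // Version
        buf.putInt(strucLength);                                  // StrucLength
        buf.putInt(546);                                          // Encoding of the data that follows
        buf.putInt(1208);                                         // CodedCharSetId (UTF-8)
        buf.put("MQSTR   ".getBytes(StandardCharsets.US_ASCII));  // Format, 8 characters
        buf.putInt(0);                                            // Flags
        buf.putInt(1208);                                         // NameValueCCSID (UTF-8)
        for (byte[] folder : data) {
            buf.putInt(folder.length);                            // length of the name-value data
            buf.put(folder);                                      // the name-value data itself
        }
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] header = assemble(ByteOrder.LITTLE_ENDIAN,
                "<mcd><Msd>jms_text</Msd></mcd>",
                "<jms><Cid>CID000000000000000000001</Cid><Dlv>2</Dlv></jms>",
                "<usr><Size dt='i4'>1</Size><Color>Blue</Color></usr>");
        System.out.println("StrucLength = " + header.length);
    }
}
```

With three folders, the fixed part plus the three length fields come to 7 * 4 + 8 + 3 * 4 = 48 bytes, which is the same 10*4 + 1*8 term that appears in the StrucLength derivation.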

ExportColumns stage

The ExportColumns stage passes the CorrelId and Payload fields intact and concatenates the fields of all the remaining columns into a single FormatHeaders column on the output. The FormatHeaders column represents the whole MQRFH2 structure.

The following figure shows the columns on the input link:

image

The following figure shows the properties of the stage. Note that all columns from the input link other than CorrelId and Payload appear in the Input section – these are the columns that will be merged. The column to be exported is the new FormatHeaders column. Since the MQRFH2 structure is a binary structure (contains both text and binary integer fields), the input data needs to be converted to binary format, which is why the Export Column Type property is set to Binary.

image

On the Format tab it is specified not to use any record and field delimiters and no quotes for the field values. This is required in order for the output MQ message to be constructed properly, according to the WebSphere MQ message format header specification (meaning the payload follows the MQRFH2 format header, without any delimiters or quotes).

image

The following figure shows the columns on the output link:

image

The following figure shows the mapping used to construct the new FormatHeaders column, and to pass CorrelId and Payload columns intact:

image

OutputMessage stage

The OutputMessage stage is a WebSphere MQ Connector stage that puts the actual message on the queue. Note that the queue manager is set to QMNAME and the queue is set to QUEUE1. The Format property is set to MQRFH2 to specify the format of the output message. The Encoding is set to -1 and the Coded character set ID is set to 0. This means that the numeric fields in the MQRFH2 format header use the NATIVE encoding for the current platform, and the text fields use the DEFAULT character set for the current platform.

image

The following figure shows the columns on the input link. Note that the CorrelId has Data element attribute set to WSMQ.CORRELID. This tells the connector to set this value as the correlation identifier on the output message. The FormatHeaders column has Data element attribute set to WSMQ.FORMATHEADERS. This tells the connector to put this data before the message payload in the message body, since this data contains the format headers data.

image

The stage also has reject link defined (DSLink22). This link is used to accept any records (messages) that cannot be stored on the target queue due to any reason selected in the left pane of the dialog shown here:

image

In this case the stage is configured to send a message to the reject link if it cannot be sent to the target queue for any reason. Note that all the checkboxes in the right pane are selected to add all the available information about the failure to the rejected record.

RejectData stage

The RejectData stage is a WebSphere MQ stage that accepts rejected data from the previous stage (OutputMessage). The stage is configured to work with queue manager QMNAME and queue QUEUE2. Because the job generates MQRFH2 messages, the Format property is set to MQRFH2 to preserve the message format in the rejected messages.

image

The following figure shows the columns on the reject link. The link includes all the columns from the input link of the OutputMessage stage (link DSLink11) plus five reject-specific columns that describe the reason for rejecting the message (they correspond to the five selected checkboxes in the right pane of the reject link dialog shown before).

image

Job execution

After the job is run, a message is stored on the queue QUEUE1. Use the WebSphere MQ Explorer to view the message.

Notice that its correlation identifier is set to the value from the input file:

image

In the Data section, notice that the message format is MQRFH2, and that the message body contains MQRFH2 structure followed by the message payload (the value Circle):

image
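
The same layout can be checked programmatically. The following sketch walks a message body of this shape: it reads StrucLength, lists the folders and returns the payload that follows the header. The class and method names are hypothetical, the sample body is built in the sketch itself rather than read from a queue, and the byte order and UTF-8 character set are assumptions that match the values used in this example.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Sketch only: reads the folders and the payload from an MQRFH2 message body. */
final class Rfh2Reader {
    private static final int FIXED_PART = 36;  // seven 4-byte fields plus the 8-byte Format

    /** Returns the name-value folders, with the alignment padding trimmed. */
    static List<String> folders(byte[] body, ByteOrder order) {
        ByteBuffer buf = ByteBuffer.wrap(body).order(order);
        int strucLength = buf.getInt(8);       // StrucLength is the third 4-byte field
        List<String> result = new ArrayList<>();
        int pos = FIXED_PART;
        while (pos < strucLength) {
            int length = buf.getInt(pos);      // 4-byte length precedes each folder
            result.add(new String(body, pos + 4, length, StandardCharsets.UTF_8).trim());
            pos += 4 + length;
        }
        return result;
    }

    /** Returns everything after the MQRFH2 structure as text. */
    static String payload(byte[] body, ByteOrder order) {
        int strucLength = ByteBuffer.wrap(body).order(order).getInt(8);
        return new String(body, strucLength, body.length - strucLength, StandardCharsets.UTF_8);
    }

    /** Builds a small body with one folder and the payload Circle, for demonstration. */
    static byte[] sampleBody(ByteOrder order) {
        byte[] folder = "<usr><Color>Blue</Color></usr>  ".getBytes(StandardCharsets.UTF_8);
        byte[] text = "Circle".getBytes(StandardCharsets.UTF_8);
        int strucLength = FIXED_PART + 4 + folder.length;
        ByteBuffer buf = ByteBuffer.allocate(strucLength + text.length).order(order);
        buf.put("RFH ".getBytes(StandardCharsets.US_ASCII));
        buf.putInt(2).putInt(strucLength).putInt(546).putInt(1208);
        buf.put("MQSTR   ".getBytes(StandardCharsets.US_ASCII));
        buf.putInt(0).putInt(1208);
        buf.putInt(folder.length).put(folder).put(text);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] body = sampleBody(ByteOrder.LITTLE_ENDIAN);
        System.out.println("Folders: " + folders(body, ByteOrder.LITTLE_ENDIAN));
        System.out.println("Payload: " + payload(body, ByteOrder.LITTLE_ENDIAN));
    }
}
```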

To read the message from the queue, use the ReceiveJMSMessage.java program. Once executed, it reads and removes the message from queue QUEUE1 and finally displays the following results:

>java -cp .:/opt/mqm/java/lib/com.ibm.mqjms.jar ReceiveJMSMessage
Receiving text message
Message received successfully
The value of the Size int property is: 1
The value of the Color string property is: Blue
The correlation id is: CID000000000000000000001
The message text is: Circle

This shows that the Java JMS application was able to read and process the JMS message that was stored as an MQRFH2 message by the WebSphere MQ Connector in the DataStage job.

Finally, to demonstrate the reject functionality for MQRFH2 messages, disable the put operation on the queue QUEUE1. This can be done from the WebSphere MQ Explorer by setting the Put message property on QUEUE1 to Inhibited:

image

Alternatively, it can be done from the command line with the following MQSC commands:

runmqsc QMNAME
alter qlocal(QUEUE1) put(disabled)
end

After the DataStage job is run again, the job log contains entries like these:

OutputMessage,0: [IIS-CONN-WSMQ-000009] MQPUT call executed with completion code 2  (MQCC_FAILED), reason code 2051  (MQRC_PUT_INHIBITED)
OutputMessage,0: [IIS-CONN-WSMQ-000082] Rows rejected successfully

Inspection of the queue QUEUE2, which is used as the reject queue, shows a single message. Notice that the message has the correlation id of the original message, which could not be stored on QUEUE1:

image

The data of the message preserves the structure and content of the original message (it is of MQRFH2 format):

image

Document Location

Worldwide

Attachment

Archive with files described in this article: MQRFH2_messages_with_MQ_Connector_0.zip
  1. MQRFH2_JMS.dsx - DataStage jobs - SendJMSMessage, ReceiveJMSMessage - described and used in this document.
  2. SendJMSMessage.java - Java program that sends message by using JMS API.
  3. ReceiveJMSMessage.java - Java program that receives message by using JMS API.
  4. in.txt - input file used by SendJMSMessage job.

Product: IBM InfoSphere Information Server. Component: DataStage; WebSphere MQ Connector. Platform: AIX, Linux, Windows. Version: 11.5; 11.7.

Document Information

Modified date:
28 March 2019

UID

ibm10878613