How To
Summary
This article demonstrates how you can use WebSphere MQ Connector in DataStage parallel jobs to send messages in MQRFH2 format. The MQRFH2 format is used by the JMS implementation in WebSphere MQ to represent JMS messages.
Objective
The topic is divided into two parts:
- The first part describes a DataStage job. This job uses the WebSphere MQ connector to assemble an MQRFH2 message and send it to a WebSphere MQ queue. The message is then received by a Java program that uses the JMS API to obtain the message from the WebSphere MQ queue.
- The second part describes a Java program. This program uses the JMS API to prepare a message and send it to a WebSphere MQ queue. The message is then received by the DataStage WebSphere MQ connector and processed to obtain its details, including the sections of the MQRFH2 header.
Environment
Required prerequisites: You must have a WebSphere MQ server installed. A WebSphere MQ client installation can also be used, but the queue configuration is different; refer to the WebSphere MQ Knowledge Center for MQ client configuration details. The following environment description is common to sending and receiving JMS messages with the WebSphere MQ connector.
WebSphere MQ queues configuration
- Create an MQ queue manager called QMNAME and two queues, QUEUE1 and QUEUE2. If the queues already exist, make sure to remove any messages from them before proceeding.
  crtmqm QMNAME
  strmqm QMNAME
  runmqsc QMNAME
  define qlocal(QUEUE1)
  define qlocal(QUEUE2)
  end
- Use the MQ JMS administration tools to configure an MQQueueConnectionFactory object named ivtQCF for the queue manager QMNAME and an MQQueue object named ivtQ for the queue QUEUE1. For example, to use the default configuration file JMSAdmin.config and the file-based service provider, complete these steps:
  - Open the JMSAdmin.config file located in $MQ_HOME/java/bin.
  - Make sure that INITIAL_CONTEXT_FACTORY is defined as:
    INITIAL_CONTEXT_FACTORY=com.sun.jndi.fscontext.RefFSContextFactory
  - Make sure that PROVIDER_URL is defined as:
    PROVIDER_URL=file:///home/user/JNDI-Directory
    Replace /home/user/JNDI-Directory with the desired location. The folder must exist. WebSphere MQ uses this folder to store the .bindings file for JMS objects. Note that the same path must be specified in the Java programs SendJMSMessage.java and ReceiveJMSMessage.java in the following line of code:
    public static final String PROVIDER_URL = "file:///home/user/JNDI-Directory";
  - Start the JMSAdmin tool located in the $MQ_HOME/java/bin folder and run the following commands:
    InitCtx> define qcf(ivtQCF) qmanager(QMNAME)
    InitCtx> define q(ivtQ) qmanager(QMNAME) queue(QUEUE1)
    InitCtx> end
- Compile the Java programs SendJMSMessage.java and ReceiveJMSMessage.java. When the WebSphere MQ Java packages are installed with the MQ server, you can use the following commands:
  javac -cp $MQ_HOME/java/lib/com.ibm.mqjms.jar SendJMSMessage.java
  javac -cp $MQ_HOME/java/lib/com.ibm.mqjms.jar ReceiveJMSMessage.java
- Import and compile the DataStage jobs. The new jobs ReceiveJMSMessage and SendJMSMessage are stored in the new folder MQRFH2_JMS under the Jobs folder in the Repository view. Both jobs have a Sequential File stage. The SendJMSMessage job uses the in.txt file located in the /tmp folder as input. The ReceiveJMSMessage job writes its results to the /tmp/out.txt file. If necessary, update the location of the files in the stage properties.
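The JNDI settings in JMSAdmin.config and in the Java programs have to agree. The sources of SendJMSMessage.java and ReceiveJMSMessage.java are not reproduced in this article, so the following is only a sketch of how such a program might collect its JNDI environment. The class name JndiSettings and the method environment() are hypothetical; the two constant values are the ones configured above.

```java
import java.util.Hashtable;
import javax.naming.Context;

// Hypothetical helper: keeps the JNDI settings that must match JMSAdmin.config.
final class JndiSettings {
    // Same values as INITIAL_CONTEXT_FACTORY and PROVIDER_URL in JMSAdmin.config.
    static final String INITIAL_CONTEXT_FACTORY = "com.sun.jndi.fscontext.RefFSContextFactory";
    static final String PROVIDER_URL = "file:///home/user/JNDI-Directory";

    // Builds the environment that a program would pass to new InitialContext(env).
    static Hashtable<String, String> environment() {
        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
        env.put(Context.PROVIDER_URL, PROVIDER_URL);
        return env;
    }

    public static void main(String[] args) {
        // Creating the InitialContext itself needs the file-system JNDI provider
        // and the MQ JMS jars on the classpath, so this sketch only prints the settings.
        environment().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

A program that creates a context from this environment would then look up the ivtQCF and ivtQ objects by the names defined with JMSAdmin.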
Steps
The job SendJMSMessage is designed to assemble and send a JMS message with the following characteristics:
- It is a TextMessage JMS message.
- It has Color string user property with value Blue.
- It has Size integer user property with value 1.
- It has CorrelationId JMS property with value CID000000000000000000001.
- It has the text payload Circle.
The following figure presents the layout of the job:
Each stage in the job flow is responsible for part of the message assembling and sending:
- The InputData stage reads input data from a file.
- The PrepareNameValue stage prepares the name-value length/data pairs (the variable portion of the MQRFH2 structure). This variable portion contains the <mcd>, <jms> and <usr> folders.
- The AdjustOffsets stage aligns the name-value pair lengths to be multiples of 4 (a requirement of the MQRFH2 structure specification).
- The AssembleMQRFH2 stage prepares the values for the fixed MQRFH2 structure fields.
- The ExportColumns stage concatenates the columns that represent individual MQRFH2 fields into a single column that represents the complete MQRFH2 structure.
- The OutputMessage stage stores the assembled message on the queue.
- The RejectData stage accepts all the records that could not be put to the message queue due to an error.
InputData stage
The InputData stage reads data from the input file in.txt. The content of this file is shown below:
"CorrelId","Color","Size","Payload"
"CID000000000000000000001","Blue","1","Circle"
The first line contains the field headers, so the reading stage must be configured to ignore it. For this reason, the configuration property "First Line is Column Names" is set to True.
The record and field delimiter have the default values ("end" and "comma"). Double quotes are used to enclose the field values. These settings match the layout of data in the input file.
The stage defines four columns. The first column (CorrelId) carries the message correlation identifier. The second and third columns (Color and Size) carry values for the user-defined JMS message properties Color and Size. The fourth column (Payload) carries the message payload data.
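To make the file layout concrete, the following Java sketch reads a line of in.txt the way the stage settings describe it: a comma as the field delimiter and double quotes around each value. It is not part of the job; the class name InputLineParser is hypothetical, and the parser is deliberately minimal (it does not handle escaped quotes).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mimics the InputData stage format settings.
final class InputLineParser {
    // Splits one line on commas that are outside double quotes and drops the quotes.
    static List<String> parse(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (char c : line.toCharArray()) {
            if (c == '"') {
                inQuotes = !inQuotes;            // a quote toggles the state and is not kept
            } else if (c == ',' && !inQuotes) {
                fields.add(current.toString());  // field delimiter outside quotes
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());          // last field has no trailing comma
        return fields;
    }

    public static void main(String[] args) {
        String header = "\"CorrelId\",\"Color\",\"Size\",\"Payload\"";
        String data = "\"CID000000000000000000001\",\"Blue\",\"1\",\"Circle\"";
        List<String> names = parse(header);
        List<String> values = parse(data);
        for (int i = 0; i < names.size(); i++) {
            System.out.println(names.get(i) + " = " + values.get(i));
        }
    }
}
```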
PrepareNameValue stage
The PrepareNameValue stage is used to prepare the variable portions of the MQRFH2 structure, namely the <mcd>, <jms> and <usr> folders.
The output columns and their derivation rules configured in this stage are:
DSLink17.CorrelId = DSLink3.CorrelId
The correlation id value is passed from input to output.
DSLink17.mcd = "<mcd><Msd>jms_text</Msd></mcd>"
The <mcd> folder is an XML document that describes the format of the message. The <Msd> element specifies the type of the JMS message. Since a message of the TextMessage JMS message type is used, the value of this element must be set to jms_text.
DSLink17.jms = "<jms><Dst>queue://QMNAME.QUEUE1</Dst><Tms>" : AsInteger(SecondsSinceFromTimestamp(CurrentTimestamp(), "1970-01-01 00:00:00")) + 5 * 60 * 60 : "000" : "</Tms><Cid>" : DSLink3.CorrelId : "</Cid><Dlv>2</Dlv></jms>"
The <jms> folder is used to transport JMS header fields. In this case, the following JMS header fields are passed:
- <Dst> specifies the message destination. In this case it is the queue QUEUE1 on queue manager QMNAME.
- <Tms> specifies the message timestamp. The timestamp is constructed with DataStage functions. CurrentTimestamp() gives the current timestamp. SecondsSinceFromTimestamp() gives the difference in seconds between two timestamps; "1970-01-01 00:00:00" is used as the second argument to obtain the number of seconds since the start of the Epoch. To obtain the result in Greenwich Mean Time, 5 hours are added (assuming the job runs in US Eastern Time), expressed in seconds as 5 * 60 * 60. Because the result must be expressed in milliseconds, three zero characters are appended for the fractions of a second. Note that this calculation is only approximate, since the milliseconds are always specified as 000 and the value is calculated in a stage prior to the one in which the message is actually stored on the queue. The timestamp (<Tms> element) is not mandatory and can be excluded from the <jms> folder.
- <Cid> specifies the message correlation id. This value is present on the input in the CorrelId column, so it can be reused.
- <Dlv> specifies the delivery mode. The value 2 stands for persistent delivery.
DSLink17.usr = "<usr><Size dt='i4'>" : DSLink3.Size : "</Size><Color>" : DSLink3.Color : "</Color></usr>"
The <usr> folder is used to pass user-specified properties. Two properties are defined in the example message: Color and Size. The values for these properties come from the input columns Color and Size, so these column names are concatenated into the expression.
DSLink17.Payload = DSLink3.Payload
The message payload value is passed from input to output.
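For comparison with the transformer derivations, here is a sketch of the same three folders built in Java. It is not taken from the job; the class Rfh2Folders and its method names are hypothetical. Two choices are assumptions of this sketch: the <Msd> value is written as jms_text, which is the value WebSphere MQ documents for JMS text messages, and the timestamp comes from System.currentTimeMillis(), which already counts milliseconds since the Epoch in UTC, so no fixed time zone offset is added. The values are inserted without XML escaping.

```java
// Hypothetical helper that builds the variable portions of an MQRFH2 header.
final class Rfh2Folders {
    // <mcd> folder for a JMS TextMessage.
    static String mcd() {
        return "<mcd><Msd>jms_text</Msd></mcd>";
    }

    // <jms> folder; timestampMillis is milliseconds since 1970-01-01 00:00:00 UTC.
    // <Dlv>2</Dlv> requests persistent delivery, as in the job.
    static String jms(String destination, long timestampMillis, String correlId) {
        return "<jms><Dst>" + destination + "</Dst><Tms>" + timestampMillis
                + "</Tms><Cid>" + correlId + "</Cid><Dlv>2</Dlv></jms>";
    }

    // <usr> folder with the Size (four-byte integer) and Color (string) properties.
    static String usr(int size, String color) {
        return "<usr><Size dt='i4'>" + size + "</Size><Color>" + color + "</Color></usr>";
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis(); // UTC-based, millisecond precision
        System.out.println(mcd());
        System.out.println(jms("queue://QMNAME.QUEUE1", now, "CID000000000000000000001"));
        System.out.println(usr(1, "Blue"));
    }
}
```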
AdjustOffsets stage
The AdjustOffsets stage aligns the lengths of the variable JMS portions to multiples of 4, as required by the MQRFH2 structure. It also provides the lengths of the variable portions as new columns on the output link. The CorrelId and Payload columns are passed through this stage unchanged.
To understand how the alignment and length calculation is performed, consider the rules for the mcd_aligned and mcd_aligned_len output columns:
DSLink19.mcd_aligned = PadString(DSLink17.mcd, " ", 4 - Mod(Len(DSLink17.mcd),4))
DSLink19.mcd_aligned_len = Len(DSLink17.mcd) + 4 - Mod(Len(DSLink17.mcd), 4)
The expression 4 - Mod(Len(DSLink17.mcd),4) calculates the number of spaces to append to the DSLink17.mcd value so that the new length of the value is a multiple of 4. The function PadString() performs the actual padding. Note that if the length of the original DSLink17.mcd value is already a multiple of 4, the expression still appends 4 space characters, which are not necessary. However, since they do not cause any problem and keep the expressions simpler, they can be left in the formula.
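The padding rule can be checked with a few lines of Java. The sketch below is only an illustration with hypothetical names (Rfh2Padding, padLikeJob, padMinimal): the first method mirrors the job's formula and always appends 1 to 4 spaces, while the second is a variant that appends nothing when the length is already a multiple of 4. It assumes ASCII content, so that the character count equals the byte count.

```java
// Hypothetical helper that pads a folder string to a multiple of 4 characters.
final class Rfh2Padding {
    // Appends the given number of spaces to the value.
    static String appendSpaces(String value, int count) {
        StringBuilder sb = new StringBuilder(value);
        for (int i = 0; i < count; i++) {
            sb.append(' ');
        }
        return sb.toString();
    }

    // Mirrors the job's rule 4 - Mod(Len(x), 4), which is always between 1 and 4.
    static String padLikeJob(String folder) {
        return appendSpaces(folder, 4 - folder.length() % 4);
    }

    // Variant that adds 0 to 3 spaces, so aligned input is left unchanged.
    static String padMinimal(String folder) {
        return appendSpaces(folder, (4 - folder.length() % 4) % 4);
    }

    public static void main(String[] args) {
        String folder = "<usr></usr> "; // 12 characters, already aligned
        System.out.println("job rule:     " + folder.length() + " -> " + padLikeJob(folder).length());
        System.out.println("minimal rule: " + folder.length() + " -> " + padMinimal(folder).length());
    }
}
```

Both variants produce a length that is a multiple of 4; they differ only for input that is already aligned.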
AssembleMQRFH2 stage
The AssembleMQRFH2 stage prepares the remaining fixed MQRFH2 structure fields.
DSLink4.CorrelId = StringToRaw(DSLink19.CorrelId)
The stage passes the correlation id intact, except that it changes the type to raw binary. This is done because the WSMQ.CORRELID data element used by the WebSphere MQ connector requires the column to be defined as Binary(24).
DSLink4.StrucId = "RFH "
The four-character MQRFH2 structure identifier.
DSLink4.Version = 2
The four-byte integer representing the MQRFH2 structure version.
DSLink4.StrucLength = 10*4 + 1*8 + DSLink19.mcd_aligned_len + DSLink19.jms_aligned_len + DSLink19.usr_aligned_len
The four-byte integer representing the MQRFH2 structure size. There are 10 four-byte fields, 1 eight-byte field and 3 variable portions. These lengths are all added to produce the size of the complete structure.
DSLink4.Encoding = 546
The numerical encoding to use for the numerical values in the data following the MQRFH2 structure. In this example it does not really matter, since the data following the MQRFH2 structure does not contain any numerical data. The value 546 is specified, which is the default numerical encoding for queue managers on the Windows operating system.
DSLink4.CodedCharSetId = 1208
The character set encoding for the character data following the MQRFH2 structure. The value 1208 is specified, which is the CCSID for the UTF-8 character set encoding.
DSLink4.Format = "MQSTR   "
The eight-character format of the data following the MQRFH2 structure. No additional format headers follow the initial MQRFH2 format header, only the text payload (the value "Circle"), so the string message format is specified.
DSLink4.Flags = 0
The flags to use. No flags are specified, so 0 is passed.
DSLink4.NameValueCCSID = 1208
The character set encoding for the character values in the name-value variable portions of the MQRFH2 structure. Again CCSID 1208 is specified for the UTF-8 character set encoding.
DSLink4.mcd_aligned_len = DSLink19.mcd_aligned_len
The length of the first name-value pair (<mcd> folder) is passed intact.
DSLink4.mcd_aligned = DSLink19.mcd_aligned
The content of the first name-value pair (<mcd> folder) is passed intact.
DSLink4.jms_aligned_len = DSLink19.jms_aligned_len
The length of the second name-value pair (<jms> folder) is passed intact.
DSLink4.jms_aligned = DSLink19.jms_aligned
The content of the second name-value pair (<jms> folder) is passed intact.
DSLink4.usr_aligned_len = DSLink19.usr_aligned_len
The length of the third name-value pair (<usr> folder) is passed intact.
DSLink4.usr_aligned = DSLink19.usr_aligned
The content of the third name-value pair (<usr> folder) is passed intact.
DSLink4.Payload = DSLink19.Payload
The message payload is passed intact.
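The field list above maps directly onto a byte layout. As a rough illustration only (the job itself does this with the transformer and Column Export stages), the following Java sketch writes the same fields into a ByteBuffer. The class name Rfh2Assembler is hypothetical, and the byte order of the integer fields is a parameter because it depends on the platform encoding; the folders are assumed to be padded to a multiple of 4 bytes already.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that lays out an MQRFH2 header in memory.
final class Rfh2Assembler {
    // Seven four-byte fields plus the eight-byte Format field.
    static final int FIXED_LENGTH = 7 * 4 + 8;

    static byte[] assemble(ByteOrder order, String... alignedFolders) {
        int strucLength = FIXED_LENGTH;
        byte[][] data = new byte[alignedFolders.length][];
        for (int i = 0; i < alignedFolders.length; i++) {
            data[i] = alignedFolders[i].getBytes(StandardCharsets.UTF_8);
            if (data[i].length % 4 != 0) {
                throw new IllegalArgumentException("folder " + i + " is not aligned to 4 bytes");
            }
            strucLength += 4 + data[i].length;   // length field plus folder data
        }
        ByteBuffer buf = ByteBuffer.allocate(strucLength).order(order);
        buf.put("RFH ".getBytes(StandardCharsets.US_ASCII));      // StrucId
        buf.putInt(2);                                            // Version
        buf.putInt(strucLength);                                  // StrucLength
        buf.putInt(546);                                          // Encoding
        buf.putInt(1208);                                         // CodedCharSetId (UTF-8)
        buf.put("MQSTR   ".getBytes(StandardCharsets.US_ASCII));  // Format, 8 characters
        buf.putInt(0);                                            // Flags
        buf.putInt(1208);                                         // NameValueCCSID (UTF-8)
        for (byte[] folder : data) {
            buf.putInt(folder.length);                            // NameValueLength
            buf.put(folder);                                      // NameValueData
        }
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] header = assemble(ByteOrder.LITTLE_ENDIAN,
                "<mcd><Msd>jms_text</Msd></mcd>  ",
                "<usr><Color>Blue</Color></usr>  ");
        System.out.println("MQRFH2 header length: " + header.length);
    }
}
```

With three folders the computed length is 36 + 3 * 4 plus the folder data, which agrees with the StrucLength rule of 10*4 + 1*8 plus the three aligned lengths.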
ExportColumns stage
The following figure shows the columns on the input link:
The following figure shows the properties of the stage. Note that all columns from the input link other than CorrelId and Payload appear in the Input section – these are the columns that will be merged. The column to be exported is the new FormatHeaders column. Since the MQRFH2 structure is a binary structure (contains both text and binary integer fields), the input data needs to be converted to binary format, which is why the Export Column Type property is set to Binary.
On the Format tab it is specified not to use any record and field delimiters and no quotes for the field values. This is required in order for the output MQ message to be constructed properly, according to the WebSphere MQ message format header specification (meaning the payload follows the MQRFH2 format header, without any delimiters or quotes).
The following figure shows the columns on the output link:
The following figure shows the mapping used to construct the new FormatHeaders column, and to pass CorrelId and Payload columns intact:
OutputMessage stage
The OutputMessage stage is a WebSphere MQ Connector stage that puts the actual message on the queue. Note that the queue manager is set to QMNAME and the queue is set to QUEUE1. The Format property is set to MQRFH2 to specify the format of the output message. The Encoding is set to -1 and the Coded character set ID is set to 0. This means that the numeric fields in the MQRFH2 format header use the NATIVE encoding for the current platform, and the text fields use the DEFAULT character set for the current platform.
The following figure shows the columns on the input link. Note that the CorrelId column has its Data element attribute set to WSMQ.CORRELID. This tells the connector to set this value as the correlation identifier of the output message. The FormatHeaders column has its Data element attribute set to WSMQ.FORMATHEADERS. This tells the connector to put this data before the message payload in the message body, since this data contains the format headers.
The stage also has a reject link defined (DSLink22). This link accepts any records (messages) that cannot be stored on the target queue for any of the reasons selected in the left pane of the dialog shown here:
In this case the stage is configured to send messages to the reject link if they cannot be sent to the target queue for any reason. Note that all the checkboxes in the right pane are selected, so that all available information about the failure is added to the rejected record.
RejectData stage
The RejectData stage is a WebSphere MQ stage that accepts the rejected data from the previous stage (OutputMessage). The stage is configured to work with queue manager QMNAME and queue QUEUE2. Since the job generates MQRFH2 messages, the Format property is set to MQRFH2 to preserve the message format in rejected messages.
The following figure shows the columns on the reject link. The link includes all the columns from the input link of the OutputMessage stage (link DSLink11) plus five reject-specific columns that describe the reason for rejecting the message. They correspond to the five selected checkboxes in the right pane of the reject link dialog shown before.
Job execution
After the job runs, a message is stored on the queue QUEUE1. Use the WebSphere MQ Explorer to view the message.
Notice that its correlation identifier is set to the value from the input file:
In the Data section, notice that the message format is MQRFH2, and that the message body contains MQRFH2 structure followed by the message payload (the value Circle):
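The layout seen in the Data section (an MQRFH2 structure followed by the payload) can also be taken apart programmatically. The following Java sketch is an illustration only and is not one of the programs shipped with this article; the class Rfh2Parser, its Parsed result type and the sample() method are hypothetical. It assumes that the name-value data and the payload are UTF-8 (CCSID 1208, as in the job) and that the caller knows the byte order of the integer fields.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that splits a message body into MQRFH2 folders and payload.
final class Rfh2Parser {
    static final class Parsed {
        final int version;
        final String format;
        final List<String> folders;
        final String payload;

        Parsed(int version, String format, List<String> folders, String payload) {
            this.version = version;
            this.format = format;
            this.folders = folders;
            this.payload = payload;
        }
    }

    static Parsed parse(byte[] body, ByteOrder order) {
        ByteBuffer buf = ByteBuffer.wrap(body).order(order);
        byte[] id = new byte[4];
        buf.get(id);
        if (!"RFH ".equals(new String(id, StandardCharsets.US_ASCII))) {
            throw new IllegalArgumentException("body does not start with an MQRFH2 header");
        }
        int version = buf.getInt();
        int strucLength = buf.getInt();
        buf.getInt();                            // Encoding (not used here)
        buf.getInt();                            // CodedCharSetId (not used here)
        byte[] format = new byte[8];
        buf.get(format);
        buf.getInt();                            // Flags
        buf.getInt();                            // NameValueCCSID
        List<String> folders = new ArrayList<>();
        while (buf.position() < strucLength) {   // length/data pairs up to StrucLength
            byte[] folder = new byte[buf.getInt()];
            buf.get(folder);
            folders.add(new String(folder, StandardCharsets.UTF_8).trim());
        }
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return new Parsed(version, new String(format, StandardCharsets.US_ASCII).trim(),
                folders, new String(payload, StandardCharsets.UTF_8));
    }

    // Builds a small sample body: one padded folder followed by the payload "Circle".
    static byte[] sample(ByteOrder order) {
        byte[] folder = "<usr></usr> ".getBytes(StandardCharsets.UTF_8);
        byte[] payload = "Circle".getBytes(StandardCharsets.UTF_8);
        int strucLength = 36 + 4 + folder.length;
        ByteBuffer buf = ByteBuffer.allocate(strucLength + payload.length).order(order);
        buf.put("RFH ".getBytes(StandardCharsets.US_ASCII)).putInt(2).putInt(strucLength);
        buf.putInt(546).putInt(1208).put("MQSTR   ".getBytes(StandardCharsets.US_ASCII));
        buf.putInt(0).putInt(1208).putInt(folder.length).put(folder).put(payload);
        return buf.array();
    }

    public static void main(String[] args) {
        Parsed parsed = parse(sample(ByteOrder.LITTLE_ENDIAN), ByteOrder.LITTLE_ENDIAN);
        System.out.println("Folders: " + parsed.folders);
        System.out.println("Payload: " + parsed.payload);
    }
}
```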
To read the message from the queue, use the ReceiveJMSMessage.java program. When executed, it reads and removes the message from queue QUEUE1 and displays the following results:
>java -cp .:/opt/mqm/java/lib/com.ibm.mqjms.jar ReceiveJMSMessage
Receiving text message
Message received successfully
The value of the Size int property is: 1
The value of the Color string property is: Blue
The correlation id is: CID000000000000000000001
The message text is: Circle
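To demonstrate the reject link, inhibit put operations on the queue QUEUE1. This can be done from the WebSphere MQ Explorer, by setting the Put message property on QUEUE1 to Inhibited, or with the following commands:
runmqsc QMNAME
alter qlocal(QUEUE1) put(disabled)
end
When the SendJMSMessage job runs while put operations are inhibited, the job log reports the failed MQPUT call and the rejected row:
OutputMessage,0: [IIS-CONN-WSMQ-000009] MQPUT call executed with completion code 2 (MQCC_FAILED), reason code 2051 (MQRC_PUT_INHIBITED)
OutputMessage,0: [IIS-CONN-WSMQ-000082] Rows rejected successfully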
Inspection of the queue QUEUE2, which is used as the reject queue, shows a single message. Notice that the message has the correlation id of the original message, which could not be stored on QUEUE1:
The data of the message preserves the structure and content of the original message (it is of MQRFH2 format):
Document Location
Worldwide
Document Information
Modified date: 28 March 2019
UID: ibm10878613