2台测试机,195发送消息 196接收消息
1. 195mq相关操作
crtmqm QM_00000000
strmqm QM_00000000
runmqsc QM_00000000
DEFINE QLOCAL (LQ_00000000)
DEFINE QLOCAL(XQ_88888888) USAGE(XMITQ)
DEFINE QREMOTE(RQ_88888888) rname(LQ_88888888) rqmname(QM_88888888) xmitq(XQ_88888888)
DEFINE CHANNEL(00000000.88888888) CHLTYPE(SDR) CONNAME('192.168.40.196(1414)') XMITQ(XQ_88888888) REPLACE
DEFINE CHANNEL(88888888.00000000) CHLTYPE(RCVR) REPLACE
DEFINE CHANNEL(DC.SVRCONN) CHLTYPE(SVRCONN) REPLACE
防止:AMQ9557: Queue Manager User ID initialization failed for 'mqm'.
ALTER AUTHINFO(SYSTEM.DEFAULT.AUTHINFO.IDPWOS) AUTHTYPE(IDPWOS) CHCKCLNT(OPTIONAL)
REFRESH SECURITY TYPE(CONNAUTH)
ALTER QMGR CHLAUTH(DISABLED)
后台启动监听
su - mqm
runmqlsr -t tcp -m QM_00000000 -p 1414 &
验证监听
netstat -an |grep 1414 |grep LISTEN
dis lsstatus(*)
43 : dis lsstatus(*)
AMQ8631: Display listener status details.
LISTENER(SYSTEM.LISTENER.TCP.1) STATUS(RUNNING)
PID(2337)
等196 启动机器、启动队列管理器、启动mq监听后
执行启动SDR 发送通道
runmqchl -c 00000000.88888888 -m QM_00000000 &
验证通道状态 dis chs(00000000.88888888)
结果不正常
AMQ8417: Display Channel Status details.
CHANNEL(00000000.88888888) CHLTYPE(SDR)
CONNAME(192.168.40.196(1414)) CURRENT
RQMNAME( ) STATUS(RETRYING)
SUBSTATE( ) XMITQ(XQ_88888888)
结果正常:
dis chs(00000000.88888888)
42 : dis chs(00000000.88888888)
AMQ8417: Display Channel Status details.
CHANNEL(00000000.88888888) CHLTYPE(SDR)
CONNAME(192.168.40.196(1414)) CURRENT
RQMNAME(QM_88888888) STATUS(RUNNING)
SUBSTATE(MQGET) XMITQ(XQ_88888888)
1.1. 196 mq相关操作
一行一行执行
crtmqm QM_88888888
strmqm QM_88888888
runmqsc QM_88888888
DEFINE QLOCAL (LQ_88888888)
DEFINE QLOCAL(XQ_00000000) USAGE(XMITQ)
DEFINE QREMOTE(RQ_00000000) rname(LQ_00000000) rqmname(QM_00000000) xmitq(XQ_00000000)
DEFINE CHANNEL(88888888.00000000) CHLTYPE(SDR) CONNAME('192.168.40.195(1414)') XMITQ(XQ_00000000) REPLACE
DEFINE CHANNEL(00000000.88888888) CHLTYPE(RCVR) REPLACE
DEFINE CHANNEL(DC.SVRCONN) CHLTYPE(SVRCONN) REPLACE
单独执行
runmqlsr -t tcp -m QM_88888888 -p 1414 &
验证监听
netstat -an |grep 1414 |grep LISTEN
回到195 --执行启动SDR发送通道
196执行
设定队列管理器的死信队列
ALTER QMGR DEADQ(SYSTEM.DEAD.LETTER.QUEUE)
验证死信队列
dis qmgr DEADQ
死信队列处理
196
DEFINE QLOCAL (LQ_DLQ)
[mqm@ibmmq-single196 ~]$pwd
/var/mqm
[mqm@ibmmq-single ~]$more SEND.rules.table
******=====Retry every 3 seconds, and WAIT for messages =======*****
RETRYINT(3) WAIT(YES)
******* For APP1, forward messages to APP1.ERR.DLQ ******
DESTQ(LQ_88888888) ACTION(FWD) FWDQ(LQ_DLQ) HEADER(YES)
***** For reason queue full and put disabled *******
**** retry 1 times to put the message on the original queue ****
REASON(MQRC_Q_FULL) ACTION(RETRY) RETRY(1)
REASON(MQRC_PUT_INHIBITED) ACTION(RETRY) RETRY(1)
**** For all other dlq messages, move those to designated queue *****
ACTION(FWD) FWDQ(SYSTEM.DEAD.LETTER.QUEUE) HEADER(YES)
[mqm@ibmmq-single ~]$
定义服务
DEFINE SERVICE(dlqhandler) +
CONTROL(QMGR) DESCR('DLQ Handler Service') +
SERVTYPE(SERVER) STARTCMD('/bin/sh') +
STARTARG('-c "+MQ_INSTALL_PATH+bin/runmqdlq SYSTEM.DEAD.LETTER.QUEUE QM_88888888 < +MQ_DATA_PATH+/SEND.rules.table"') +
STDOUT('+MQ_DATA_PATH+/dlq.+QMNAME+.log') +
STDERR('+MQ_DATA_PATH+/dlq.+QMNAME+.err') +
STOPCMD('+MQ_INSTALL_PATH+bin/amqsstop') +
STOPARG('-m +QMNAME+ -p +MQ_SERVER_PID+') +
REPLACE
启动 死信队列 dlqhandler 服务
START SERVICE(dlqhandler)
编写 Send.java 发送消息程序
4.
195 执行发送程序
[mqm@ibmmq-single test]$/var/mqm/jdk1.7.0_80/bin/java -Djava.ext.dirs=/opt/mqm/java/lib Send RQ_88888888 5000
The message has been Sussesfully put/n/n#########
[mqm@ibmmq-single test]$
再次发送10条
[mqm@ibmmq-single test]$/var/mqm/jdk1.7.0_80/bin/java -Djava.ext.dirs=/opt/mqm/java/lib Send RQ_88888888 10
The message has been Sussesfully put/n/n#########
[mqm@ibmmq-single test]$
5. 此时看到10条死信被转移到LQ_DLQ队列
dis qs(LQ_DLQ) CURDEPTH
18 : dis qs(LQ_DLQ) CURDEPTH
AMQ8450: Display queue status details.
QUEUE(LQ_DLQ) TYPE(QUEUE)
CURDEPTH(10)
代码 Send.java
import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
/**
* 发送消息程序
*/
public class Send {
public static void main(String[] args) {
try {
int b=Integer.parseInt(args[1]);
for (int i=0;i<b;i++){
// 主机名称
String hostName = "192.168.40.195";
// 端口(缺省 1414)
int port = 1414;
// 通道名称(缺省)
String channel = "DC.SVRCONN";
// 队列管理器名称
String qManager = "QM_00000000";
// 队列名称 (远程队列名)
String qName = args[0];
// Set up the MQEnvironment properties for Client Connections.
// 建立MQEnvironment 属性以便客户机连接.
MQEnvironment.hostname = hostName;
MQEnvironment.port = port;
MQEnvironment.channel = channel;
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES);
MQEnvironment.CCSID = 1381;
// Connection To the Queue Manager.
// 连接到队列管理器.
MQQueueManager qMgr = new MQQueueManager(qManager);
/**
* Set up the open options to open the queue for out put
* and additionally we have set the option to fail if the queue manager
* is quiescing.
*
* 建立打开选项以便打开用于输出的队列,进一步而言,如果队列管理器是
* 停顿的话,我们也已设置了选项去应对不成功情况.
*/
int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
// Open the queue.
// 打开队列.
MQQueue queue = qMgr.accessQueue(qName, openOptions, null, null,
null);
// Set the put message options , we will use the default setting.
// 设置放置消息选项我们将使用默认设置.
MQPutMessageOptions pmo = new MQPutMessageOptions();
/**
* Next we Build a message The MQMessage class encapsulates the data
* buffer that contains the actual message data, together with all the MQMD
* parameters that describe the message.
*
* 下一步我们建立消息,MQMessage类压缩了包含实际消息数据的数据缓冲区,
* 和描述消息的所有MQMD 参数.
*
* To Build a new message, create a new instance of MQMessage class
* and use writxxx (we will be using writeString method).
* The put() method of MQQueue also takes an instance of the
* MQPutMessageOptions class as a parameter.
*
* 欲建立新消息,创建MQMessage类新实例以及使用writxxx(我们将使用writeString 方法.).
* MQQueue 的put()方法也可作为参数MQPutMessageOptions 类的实例.
*/
// Create The message buffer.
// 创建消息缓冲区.
MQMessage outMsg = new MQMessage();
// Set the MQMD format field.
// 设置MQMD 格式字段.
outMsg.format = MQC.MQFMT_STRING;
// Prepare message with user data.
// 准备用户数据消息.
String msgString = "TAIL";
// Now we put The message on the Queue.
// 现在我们在队列上放置消息.
outMsg.writeString(msgString);
// Commit the transaction.
// 提交事务处理.
queue.put(outMsg, pmo);
qMgr.commit();
// Close the the Queue and Queue manager objects.
// 关闭队列和队列管理器对象.
queue.close();
qMgr.disconnect();
}
System.out.println(" The message has been Sussesfully put/n/n#########");
} catch (MQException ex) {
System.out.println("An MQ Error Occurred: Completion Code is :/t" +
ex.completionCode + "/n/n The Reason Code is :/t" + ex.reasonCode);
ex.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




