已阅读5页,还剩11页未读, 继续免费阅读
版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
往队里写消息package com.iss;import java.io.BufferedWriter;import java.io.File;import java.io.FileInputStream;import java.io.FileWriter;import java.io.IOException;import java.sql.Connection;import java.sql.DriverManager;import java.sql.Statement;import java.text.SimpleDateFormat;import java.util.Date;import java.util.HashMap;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.Properties;import org.dom4j.DocumentHelper;import org.dom4j.Element;import com.ibm.mq.MQC;import com.ibm.mq.MQEnvironment;import com.ibm.mq.MQException;import com.ibm.mq.MQGetMessageOptions;import com.ibm.mq.MQMessage;import com.ibm.mq.MQQueue;import com.ibm.mq.MQQueueManager;public class MessageByMQ /定义队列管理器和队列的名称 private static String qmName; private static String qName; private static MQQueueManager qMgr; private static MQQueue queue; private static String getMessageQueueFlag;/队列: Y 表示常连接 N 表示非常连接用完就关闭 private static String getMessageQueueManagerFlag;/队列管理器: Y 表示常连接 N 表示非常连接用完就关闭 static try /设置环境: /MQEnvironment中包含控制MQQueueManager对象中的环境的构成的静态变量,MQEnvironment的值的设定会在MQQueueManager的构造函数加载的时候起作用, /load properties FileInputStream fis = new FileInputStream(new File(perties); Properties props = new Properties(); props.load(fis); fis.close(); /因此必须在建立MQQueueManager对象之前设定MQEnvironment中的值. getMessageQueueFlag=props.getProperty(getMessage.queue.queueAlwaysConnected); getMessageQueueManagerFlag=props.getProperty(getMessage.queue.queueManagerAlwaysConnected); MQEnvironment.hostname=props.getProperty(getMessage.queue.host);/MQ服务器的IP地址 MQEnvironment.channel=props.getProperty(getMessage.queue.channel);/服务器连接的通道 MQEnvironment.CCSID=Integer.valueOf(props.getProperty(getMessage.queue.ccsid);/服务器MQ服务使用的编码1381代表GBK、1208代表UTF(Coded Character Set Identifier:CCSID) MQEnvironment.port=Integer.valueOf(props.getProperty(getMessage.queue.port);/MQ端口 qmName = props.getProperty(getMessage.queue.qmname);/MQ的队列管理器名称 qName = props.getProperty(getMessage.queue.qname);/MQ远程队列的名称 /定义并初始化队列管理器对象并连接 /MQQueueManager可以被多线程共享,但是从MQ获取信息的时候是同步的,任何时候只有一个线程可以和MQ通信。 qMgr = new MQQueueManager(qmName); catch (MQException e) / TODO Auto-generated catch block System.out.println(初使化MQ出错); e.printStackTrace(); catch(Exception ex) System.out.println(初使化MQ出错); ex.printStackTrace(); private void getQueueInfo() throws Exception int count = 0; int count_error = 0; Connection con = null; try/ String url = jdbc:oracle:thin:37:1521:ORCL;/ 数据库连接字符串/ String username = cthi;/ 数据库用户名/ String password = cthi;/ 数据库密码/ / Class.forName(oracle.jdbc.driver.OracleDriver);/ con = DriverManager.getConnection(url, username, password); int openOptions = MQC.MQOO_BROWSE | MQC.MQOO_OUTPUT | MQC.MQOO_INQUIRE; MQGetMessageOptions gmo = new MQGetMessageOptions(); gmo.options = MQC.MQGMO_BROWSE_NEXT; / int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT | MQC.MQOO_INQUIRE;/ MQGetMessageOptions gmo = new MQGetMessageOptions(); / gmo.options = gmo.options + MQC.MQGMO_WAIT; / Wait if no messages on the Queue(如果在队列上没有消息则等待) / gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;/ Fail if Qeue Manager Quiescing(如果队列管理器停顿则失败) / gmo.waitInterval = 1 ; / Sets the time limit for the wait.(设置等待的毫秒时间限制) if(qMgr=null | !qMgr.isConnected() qMgr = new MQQueueManager(qmName); if(queue=null | !queue.isOpen() queue = qMgr.accessQueue(qName, openOptions); try String strMessage = ; File fs = new File(c:1abc_hr_error10707.txt); fs.createNewFile(); File f = new File(c:1abc_hr10707.txt); f.createNewFile(); FileWriter fw=null; BufferedWriter bw=null; SimpleDateFormat sdf = new SimpleDateFormat(yyyy-MM-dd HH:mm:ss); String sql = ;/Statement stmt = con.createStatement(); while(true) MQMessage retrieve = new MQMessage(); queue.get(retrieve, gmo); byte iii = new byteretrieve.getMessageLength(); retrieve.readFully(iii); / 取出队列的消息 strMessage = new String(iii,UTF-8);/ if(strMessage.trim().length() != 0)/ try/Element root = DocumentHelper.parseText(strMessage).getRootElement();/Element headElement = root.element(format);/List nodeLists = headElement.elements();/Map elementMap = new HashMap();/elementMap.put(classify, root.attributeValue(classify);/for(Iterator it = nodeLists.iterator();it.hasNext();)/Element element = (Element) it.next();/elementMap.put(element.getName(), element.getText();/sql =insert into MQCHECK (status,user_code,msg_id,insert_date) values(+String.valueOf(elementMap.get(classify)+,+String.valueOf(elementMap.get(owner)+,+String.valueOf(elementMap.get(id)+,sysdate);/stmt.executeUpdate(sql);/ catch(Exception ex)/ / /如果错误的消息格式放下边的文件当中/ count_error+;/ count-;/ FileWriter fws=null; / BufferedWriter bws=null; / fws = new FileWriter(fs, true);/ bws = new BufferedWriter(fws); / bws.write(strMessage); / bws.newLine(); / bws.flush(); / bws.close();/ ex.printStackTrace();/ / fw = new FileWriter(f, true); bw = new BufferedWriter(fw); bw.write(strMessage); bw.newLine(); bw.flush(); bw.close(); count+; catch(MQException ex) /当以MQC.MQOO_BROWSE(只读方式)读取消息时如果队列中消息已经被全部取走 mq 将返回 2033的code编码 可以捕捉此code 不做处理 catch(Exception ex) System.out.println(ServiceManageAction.getQueueInfo error); ex.printStackTrace(); throw ex; finally System.out.println(成功处理了 + count + 条); System.out.println(失败处理了 + count_error + 条); try queue.close(); catch (MQException e) e.printStackTrace(); try qMgr.disconnect(); catch (MQException e) e.printStackTrace(); / con.close(); /* * 从队列中去获取消息,如果队列中没有消息,就会发生异常,不过没有关系,有TRY.CATCH,如果是第三方程序调用方法,如果无返回则说明无消息 * 第三方可以将该方法放于一个无限循环的while(true).之中,不需要设置等待,因为在该方法内部在没有消息的时候会自动等待。 * return */ public static String getMessage() try /设置将要连接的队列属性 / Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface /(except for completion code constants and error code constants). /MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default. /MQOO_OUTPUT:Open the queue to put messages. int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT | MQC.MQOO_INQUIRE; /设置取出消息的属性(默认属性) /Set the put message options.(设置放置消息选项) MQGetMessageOptions gmo = new MQGetMessageOptions(); /gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;/Get messages under sync point control(在同步点控制下获取消息) gmo.options = gmo.options + MQC.MQGMO_WAIT; / Wait if no messages on the Queue(如果在队列上没有消息则等待) gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;/ Fail if Qeue Manager Quiescing(如果队列管理器停顿则失败) gmo.waitInterval = 1 ; / Sets the time limit for the wait.(设置等待的毫秒时间限制) /*关闭了就重新打开*/ if(qMgr=null | !qMgr.isConnected() qMgr = new MQQueueManager(qmName); /*要求为长连接所以保持一直打开状态*/ if(queue=null | !queue.isOpen() System.out.println(队列关闭了重新打开); queue = qMgr.accessQueue(qName, openOptions); / 从队列中取出消息 /先判断队列中是否有消息如果没有返回空字符串 if(getMessageCountByQ() = 0) return ;/队列中没有数据了返回空串 /一次读取队列中所有消息/ while(getMessageCountByQ() != 0)/ / MQMessage retrieve = new MQMessage();/ queue.get(retrieve, gmo);/ byte iii = new byteretrieve.getMessageLength();/ retrieve.readFully(iii); / 取出队列的消息/ messageList.add(new String(iii);/ /每调用一次读取一次队列中的消息 MQMessage retrieve = new MQMessage(); queue.get(retrieve, gmo);/ System.out.println(retrieve.correlationId+rrrrrrrrrrrrrr); byte iii = new byteretrieve.getMessageLength(); retrieve.readFully(iii); / 取出队列的消息 return new String(iii); catch (MQException ex) ex.printStackTrace(); System.out.println(Invoke get message. A WebSphere MQ error occurred : Completion code + pletionCode + Reason code + ex.reasonCode); catch (IOException ex) ex.printStackTrace(); System.out.println(Invoke get message. An error occurred whilst writing to the message buffer: + ex); catch(Exception ex) System.out.println(Invoke get message. An error); ex.printStackTrace(); finally if(getMessageQueueFlag.equalsIgnoreCase(N) try queue.close();/关闭队列 catch (MQException e) System.out.println(调用getMessage 后关闭队列失败!);e.printStackTrace(); if(getMessageQueueManagerFlag.equalsIgnoreCase(N) try qMgr.disconnect(); catch (MQException e) System.out.println(调用getMessage 后关闭队列管理器失败!); e.printStackTrace(); return ; /* * return int 返回消息条数 */ private static int getMessageCountByQ() int messagecount = 0; try messagecount = queue.getCurrentDepth(); catch (MQException e) System.out.println(queue.getCurrentDepth error!); e.printStackTrace(); return messagecount; /* * 关闭队列 * private static MQQueue queue; */ public static int closeQueueConnection() try queue.close(); catch (MQException e) System.out.println(调用 MessageByMQ.closeQueueConnection() 失败); e.printStackTrace(); return 2667;/关闭队列异常 catch(Exception e) System.out.println(调用 MessageByMQ.closeQueueConnection() 失败); e.printStackTrace(); return 2667;/关闭队列异常 return 1;/关闭成功 /* * 关闭队列管理器 * private static MQQueueManager qMgr; */ public static int closeQueueManagerConnection() try qMgr.disconnect(); catch (MQException e) System.out.println(调用 MessageByMQ.closeQueueManagerConnection() 失败); e.printStackTrace(); return 2671;/关闭队列管理器异常 catch(Exception e) System.out.println(调用 MessageByMQ.closeQueueManagerConnection() 失败); e.printStackTrace(); return 2671;/关闭队列管理器异常 return 1;/关闭成功 public static void main(String args) throws Exception /*下面两个方法可同时使用,也可以单独使用*/ System.out.println(MessageByMQ.closeQueueConnection();/ System.out.println(MessageSendMQ.closeQueueConnection();/ System.out.println(MessageProxy.sendMessage(aadsfasdfaf);/ System.out.println(MessageSendMQ.closeQueueConnection(); MessageByMQ messageByMQ = new MessageByMQ(); messageByMQ.getQueueInfo(); / String ss = 1234,5678;/ Element root = DocumentHelper.parseText(ss).getRootElement();/ System.out.println(root.getStringValue(); 取消息package com.iss;import java.io.File;import java.io.FileInputStream;import java.io.IOException;import java.util.List;import java.util.Properties;import com.ibm.mq.MQC;import com.ibm.mq.MQEnvironment;import com.ibm.mq.MQException;import com.ibm.mq.MQMessage;import com.ibm.mq.MQPutMessageOptions;import com.ibm.mq.MQQueue;import com.ibm.mq.MQQueueManager;public class MessageSendMQ /定义队列管理器和队列的名称 private static String qmName; private static String qName; private static MQQueueManager qMgr; private static MQQueue queue; private static String sendMessageQueueFlag;/队列:Y 表示常连接 N 表示非常连接用完就关闭 private static String sendMessageQueueManagerFlag;/队列管理器: Y 表示常连接 N 表示非常连接用完就关闭 static try /设置环境: /MQEnvironment中包含控制MQQueueManager对象中的环境的构成的静态变量,MQEnvironment的值的设定会在MQQueueManager的构造函数加载的时候起作用, /load properties FileInputStream fis = new FileInputStream(new File(perties); Properties props = new Properties(); props.load(fis); fis.close(); /因此必须在建立MQQueueManager对象之前设定MQEnvironment中的值. sendMessageQueueFlag=props.getProperty(sendMessage.queue.queueAlwaysConnected); sendMessageQueueManagerFlag=props.getProperty(sendMessage.queue.queueManagerAlwaysConnected); MQEnvironment.hostname=props.getProperty(sendMessage.queue.host);/MQ服务器的IP地址 MQEnvironment.channel=props.getProperty(sendMessage.queue.channel);/服务器连接的通道 MQEnvironment.CCSID=Integer.valueOf(props.getProperty(sendMessage.queue.ccsid);/服务器MQ服务使用的编码1381代表GBK、1208代表UTF(Coded Character Set Identifier:CCSID) MQEnvironment.port=Integer.valueOf(props.getProperty(sendMessage.queue.port);/MQ端口 qmName = props.getProperty(sendMessage.queue.qmname);/MQ的队列管理器名称 qName = props.getProperty(sendMessage.queue.qname);/MQ远程队列的名称 /定义并初始化队列管理器对象并连接 /MQQueueManager可以被多线程共享,但是从MQ获取信息的时候是同步的,任何时候只有一个线程可以和MQ通信。 qMgr = new MQQueueManager(qmName); catch (MQException e) / TODO Auto-generated catch block System.out.println(初使化MQ出错); e.printStackTrace(); catch(Exception ex) System.out.println(初使化MQ出错); ex.printStackTrace(); /* * 往MQ发送消息 * param message * return */ public static int sendMessage(String message) int result=0;/代表发送失败初始化为0 try /设置将要连接的队列属性 / Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface /(except for completion code constants and error code constants). /MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default. /MQOO_OUTPUT:Open the queue to put messages. /*目标为远程队列,所有这里不可以用MQOO_INPUT_AS_Q_DEF属性*/ /int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; /*以下选项可适合远程队列与本地队列*/ int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING | MQC.MQOO_INQUIRE; /连接队列 /MQQueue provides inquire, set, put and get operations for WebSphere MQ queues. /The inquire and set capabilities are inherited from MQManagedObject. /*关闭了就重新打开*/ if(qMgr=null | !qMgr.isConnected() qMgr = new MQQueueManager(qmName); /*要求为长连接所以保持一直打开状态*/ if(queue=null | !queue.isOpen() queue = qMgr.accessQueue(qName, openOptions); /定义一个简单的消息 MQMessage putMessage = new MQMessage(); /将数据放入消息缓冲区 putMessage.characterSet=1381; putMessage.write(message.getBytes(); /设置写入消息的属性(默认属性) MQPutMessageOptions pmo = new MQPutMessageOptions()
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2026年甘肃省天水市秦州区人力资源和社会保障局招聘城镇公益性岗位15人笔试参考题库及答案解析
- 2026贵州黔东南州施秉淦源医投经营管理有限责任公司招聘2人考试备考题库及答案解析
- 2026中农发贵粱(贵州)农业科技发展有限公司招聘5人考试参考题库及答案解析
- 2026华中师范大学琼中附属中学春季临聘教师招聘9人考试参考题库及答案解析
- 2026贵阳市工业投资有限公司管培生招聘98人考试备考试题及答案解析
- 2026年温州苍南县交通发展集团有限公司公开招聘工作人员9人的笔试模拟试题及答案解析
- 2026河南南阳职业学院招聘考试备考试题及答案解析
- 2026上半年陕西事业单位联考渭南市招聘769人考试备考题库及答案解析
- 2026广东广州黄埔区广钢和苑幼儿园招聘考试参考试题及答案解析
- 2026中陕核(甘肃)现代物理科技有限公司招聘4人(第一批)考试参考试题及答案解析
- 安全生产思想隐患讲解
- 2026年山东交通职业学院单招综合素质考试参考题库附答案详解
- 2025年软装设计师资格考试试题及答案解析
- 兵团护理考试题库及答案解析
- 《机械制图》电子教材
- 2025年自然博物馆招聘面试模拟试题集
- DB32/T 4013-2021第三方社会稳定风险评估规范
- 新教科版(2021年春)小学四年级下册科学全册教案设计
- 腐蚀学 金属的腐蚀腐蚀形态
- 火电环保题库
- 图像修辞视域下高中语文教材插图的功能及教学应用
评论
0/150
提交评论