001package org.opengion.fukurou.queue; 002 003import java.util.ArrayList; 004import java.util.List; 005 006import javax.jms.JMSException; 007// import javax.jms.Message; 008import javax.jms.MessageListener; 009import javax.jms.Queue; 010import javax.jms.QueueConnection; 011import javax.jms.QueueConnectionFactory; 012import javax.jms.QueueReceiver; 013import javax.jms.QueueSession; 014import javax.jms.TextMessage; 015import javax.naming.Context; 016import javax.naming.InitialContext; 017 018import org.apache.activemq.ActiveMQConnectionFactory; 019 020/** 021 * MQメッセージ受信用クラス。 022 * 023 * @og.group メッセージ連携 024 * 025 * @og.rev 5.10.15.2 (2019/09/20) 新規作成 026 * 027 * @version 5 028 * @author oota 029 * @since JDK7 030 */ 031public class QueueReceive_MQ implements QueueReceive{ 032 033 private QueueConnection connection = null; 034 private QueueSession session = null; 035 private QueueReceiver receiver = null; 036 List<QueueReceiver> listReceiver = null; 037 private boolean batch = false; 038 039 /** 040 * 接続処理 041 * メッセージキューサーバに接続します。 042 * 043 * @param jmsServer jsmサーバ 044 * @param sqsAccessKey sqs用awsアクセスキー(MQでは利用しません) 045 * @param sqsSecretKey sqs用awsシークレットキー(MQでは利用しません) 046 */ 047 public void connect(final String jmsServer, final String sqsAccessKey, final String sqsSecretKey) { 048 connect(jmsServer); 049 } 050 051 /** 052 * 接続処理 053 * jmsServerに接続します。 054 * MQの場合は、受信リスナーを設定して、随時メッセージ受信処理を行います。 055 * SQSの場合は最大受信件数の10件の処理を行います。 056 * 057 * @param jmsServer 接続先情報 MQ:jndi接続先 SQS:URL 058 */ 059 private void connect(final String jmsServer) { 060 try { 061 if(batch) { 062 // バッチ用 063 final String mqUserId = System.getProperty("mqUserId"); 064 final String mqPassword = System.getProperty("mqPassword"); 065 final QueueConnectionFactory factory = new ActiveMQConnectionFactory(jmsServer); 066 connection = factory.createQueueConnection(mqUserId, mqPassword); 067 }else { 068 // jndi接続用 069 final Context ctx = new InitialContext(); 070 final QueueConnectionFactory factory = (QueueConnectionFactory)ctx.lookup("java:comp/env/" + jmsServer); 071 connection = factory.createQueueConnection(); 072 } 073 074 connection.start(); 075 076 // Receiveの作成 077 session = connection.createQueueSession(false, QueueSession.CLIENT_ACKNOWLEDGE); 078 079 // 初期化 080 listReceiver = new ArrayList<QueueReceiver>(); 081 }catch(Exception e) { 082 throw new RuntimeException("MQサーバの接続に失敗しました。:" + e.getMessage()); 083 } 084 } 085 086 /** 087 * 受信処理 088 * メッセージキューの受信の処理を行います。 089 * 090 * @param queueName キューの名前 091 * @return キュー情報格納クラス 092 */ 093 @Override 094 public QueueInfo receive(final String queueName) { 095 QueueInfo queueInfo = null; 096 097 try { 098 final Queue queue = session.createQueue(queueName); 099 receiver = session.createReceiver(queue); 100 101 final TextMessage msg = (TextMessage)receiver.receive(1000); 102 103 if(msg != null) { 104 // メッセージ受信の確認応答 105 msg.acknowledge(); 106 107 // メッセージの設定 108 queueInfo = new QueueInfo(); 109 queueInfo.setMessage(msg.getText()); 110 } 111 }catch(Exception e) { 112 throw new RuntimeException(e.getMessage()); 113 }finally { 114 try { 115 receiver.close(); 116 }catch(Exception e) {} 117 } 118 119 return queueInfo; 120 } 121 122 /** 123 * リスナーの起動 124 * 指定したキュー名に対して、 125 * MessageListenerのリスナーを設定します。 126 * 127 * @param queueName キュー名 128 * @param listener MessageListerを実装したクラス 129 */ 130 @Override 131 public void setListener(final String queueName, final MessageListener listener) { 132 QueueReceiver receiver = null; 133 try { 134 final Queue queue = session.createQueue(queueName); 135 receiver = session.createReceiver(queue); 136 receiver.setMessageListener(listener); 137 138 // リスナーの起動 139 listReceiver.add(receiver); 140 }catch(JMSException e) { 141 throw new RuntimeException("リスナーの起動に失敗しました。" + e.getMessage()); 142 } 143 } 144 145 /** 146 * クローズリスナー 147 * レシーバーをクローズすることで、 148 * リスナーの処理を終了します。 149 */ 150 public void closeListener() { 151 for(final QueueReceiver receiver: listReceiver) { 152 try { 153 receiver.close(); 154 }catch(Exception e) { 155 156 } 157 } 158 159 // 初期化 160 listReceiver = null; 161 listReceiver = new ArrayList<QueueReceiver>(); 162 } 163 164 /** 165 * クローズ処理 166 * クローズ処理を行います。 167 */ 168 @Override 169 public void close() { 170 if(receiver != null) { 171 try { 172 receiver.close(); 173 }catch(Exception e) { 174 175 } 176 } 177 if(session != null) { 178 try { 179 session.close(); 180 }catch(Exception e) { 181 182 } 183 } 184 if(connection != null) { 185 try { 186 connection.close(); 187 }catch(Exception e) { 188 189 } 190 } 191 } 192 193 /** 194 * バッチ処理判定フラグを設定します。 195 * 196 * @param batchFlg バッチ処理判定フラグ 197 */ 198 public void setBatchFlg(final Boolean batchFlg) { 199 batch = batchFlg; 200 } 201 202 /** 203 * 検証用メソッド 204 * テスト用のメソッドです。 205 * 206 * @param args 引数 207 */ 208 public static void main(final String[] args) { 209 final QueueReceive receive = new QueueReceive_MQ(); 210 final String jmsServer = "tcp://localhost:61616"; 211 212 // バッチフラグにtrueを設定 213 // 未設定の場合は、tomcatのjndi接続処理が実行されます。 214 receive.setBatchFlg(true); 215 216 // 認証情報の設定 217 System.setProperty("mqUserId", "admin"); 218 System.setProperty("mqPassword", "admin"); 219 220 // 接続 221 receive.connect(jmsServer, null, null); 222 223 // 処理対象のキュー名 224 final String queueName = "queue01"; 225 226 227 // ** 1件受信する場合 228 final QueueInfo queueInfo = receive.receive(queueName); 229 if(queueInfo != null) { 230 System.out.println("message:" + queueInfo.getMessage()); 231 }else { 232 System.out.println("キューが登録されていません。"); 233 } 234 235// // ** リスナーを設定して、受信を検知すると処理を実行します。(MQのみ) 236// // MessageListerを実装した、QueueReceiveListenerクラスを作成します。 237// MessageListener listener = new QueueReceiveListener(); 238// receive.setListener(queueName, listener); 239// // 複数のキューにリスナーを設定することも可能です。 240// receive.setListener("queue02", listener); 241// 242// try { 243// // 1分間リスナーを起動しておく場合の、プロセス待機処理 244// Thread.sleep(60 * 1000); 245// }catch(InterruptedException e) { 246// throw new RuntimeException(e.getMessage()); 247// } 248 249 // リスナー利用時は、closeListenerを実行して、解放してください。 250 receive.closeListener(); 251 252 // 終了処理 253 receive.close(); 254 } 255 256// /** 257// * QueueReceiveリスナークラス 258// * リスナー用のクラスです。 259// * MQに設定することで、メッセージが受信されると、 260// * 自動的にonMessageメソッドが実行されます。 261// * 262// */ 263// static class QueueReceiveListener implements MessageListener { 264// /** 265// * メッセージ受信処理 266// * MQサーバにメッセージが受信されると、 267// * メソッドの処理が行われます。 268// * 269// * @param message 受信メッセージ 270// */ 271// @Override 272// public void onMessage(final Message message) { 273// 274// // メッセージ受信 275// TextMessage msg = (TextMessage) message; 276// String msgText = ""; 277// 278// try { 279// // キューサーバのメッセージを取得 280// msgText = msg.getText(); 281// // メーッセージの受信応答を返します。 282// msg.acknowledge(); 283// 284// System.out.println("message:" + msgText); 285// 286// } catch (JMSException e) { 287// throw new RuntimeException(e.getMessage()); 288// } 289// } 290// } 291 292}