001/*
002 * Copyright (c) 2009 The openGion Project.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
013 * either express or implied. See the License for the specific language
014 * governing permissions and limitations under the License.
015 */
016package org.opengion.fukurou.queue;
017
018import java.util.ArrayList;
019import java.util.List;
020
021import javax.jms.JMSException;
022// import javax.jms.Message;
023import javax.jms.MessageListener;
024import javax.jms.Queue;
025import javax.jms.QueueConnection;
026import javax.jms.QueueConnectionFactory;
027import javax.jms.QueueReceiver;
028import javax.jms.QueueSession;
029import javax.jms.TextMessage;
030import javax.naming.Context;
031import javax.naming.InitialContext;
032
033import org.apache.activemq.ActiveMQConnectionFactory;
034
035/**
036 * MQメッセージ受信用クラス。
037 *
038 * @og.group メッセージ連携
039 *
040 * @og.rev 5.10.15.2 (2019/09/20) 新規作成
041 *
042 * @version 5
043 * @author oota
044 * @since JDK7
045 */
046public class QueueReceive_MQ implements QueueReceive{
047
048        private QueueConnection connection ;
049        private QueueSession session ;
050        private QueueReceiver receiver ;
051//      List<QueueReceiver> listReceiver ;
052        private List<QueueReceiver> listReceiver ;              // 7.2.9.4 (2020/11/20)
053        private boolean batch ;
054
055        /**
056         * 接続処理
057         * メッセージキューサーバに接続します。
058         *
059         *  @param jmsServer jsmサーバ
060         *  @param sqsAccessKey sqs用awsアクセスキー(MQでは利用しません)
061         *  @param sqsSecretKey sqs用awsシークレットキー(MQでは利用しません)
062         */
063        public void connect(final String jmsServer, final String sqsAccessKey, final String sqsSecretKey) {
064                connect(jmsServer);
065        }
066
067        /**
068         * 接続処理
069         * jmsServerに接続します。
070         * MQの場合は、受信リスナーを設定して、随時メッセージ受信処理を行います。
071         * SQSの場合は最大受信件数の10件の処理を行います。
072         *
073         * @param jmsServer 接続先情報 MQ:jndi接続先 SQS:URL
074         */
075        private void connect(final String jmsServer) {
076                try {
077                        if(batch) {
078                                // バッチ用
079                                final String mqUserId = System.getProperty("mqUserId");
080                                final String mqPassword = System.getProperty("mqPassword");
081                                final QueueConnectionFactory factory = new ActiveMQConnectionFactory(jmsServer);
082                                connection = factory.createQueueConnection(mqUserId,  mqPassword);
083                        }else {
084                                // jndi接続用
085                                final Context ctx = new InitialContext();
086                                final QueueConnectionFactory factory = (QueueConnectionFactory)ctx.lookup("java:comp/env/" + jmsServer);
087                                connection = factory.createQueueConnection();
088                        }
089
090                        connection.start();
091
092                        // Receiveの作成
093                        session = connection.createQueueSession(false, QueueSession.CLIENT_ACKNOWLEDGE);
094
095                        // 初期化
096                        listReceiver = new ArrayList<QueueReceiver>();
097//              }catch(Exception e) {                                           // 8.0.0.0 (2021/07/31)
098//                      throw new RuntimeException("MQサーバの接続に失敗しました。:" + e.getMessage());
099                } catch( final Throwable th ) {
100                        throw new RuntimeException("MQサーバの接続に失敗しました。:",th );
101                }
102        }
103
104        /**
105         * 受信処理
106         * メッセージキューの受信の処理を行います。
107         *
108         * @param queueName キューの名前
109         * @return キュー情報格納クラス
110         */
111        @Override
112        public QueueInfo receive(final String queueName) {
113                QueueInfo queueInfo = null;
114
115                try {
116                        final Queue queue = session.createQueue(queueName);
117                        receiver = session.createReceiver(queue);
118
119                        final TextMessage msg = (TextMessage)receiver.receive(1000);
120
121                        if(msg != null) {
122                                // メッセージ受信の確認応答
123                                msg.acknowledge();
124
125                                // メッセージの設定
126                                queueInfo = new QueueInfo();
127                                queueInfo.setMessage(msg.getText());
128                        }
129//              }catch(Exception e) {                                                           // 8.0.0.0 (2021/07/31)
130//                      throw new RuntimeException(e.getMessage());
131                } catch( final Throwable th ) {
132                        throw new RuntimeException( th );
133                }finally {
134                        try {
135                                receiver.close();
136//                      }catch(Exception e) { ; }                                               // 8.0.0.0 (2021/07/31)
137                        } catch( final Throwable th ) {
138                                System.out.println("receiverのクローズに失敗しました。");
139                        }
140                }
141
142                return queueInfo;
143        }
144
145        /**
146         * リスナーの起動
147         * 指定したキュー名に対して、
148         * MessageListenerのリスナーを設定します。
149         *
150         * @param queueName キュー名
151         * @param listener MessageListerを実装したクラス
152         */
153        @Override
154        public void setListener(final String queueName, final MessageListener listener) {
155                QueueReceiver receiver = null;
156                try {
157                        final Queue queue = session.createQueue(queueName);
158                        receiver = session.createReceiver(queue);
159                        receiver.setMessageListener(listener);
160
161                        // リスナーの起動
162                        listReceiver.add(receiver);
163                }catch(JMSException ex) {
164//                      throw new RuntimeException("リスナーの起動に失敗しました。" + e.getMessage());
165                        throw new RuntimeException("リスナーの起動に失敗しました。" , ex);             // 8.0.0.0 (2021/07/31) original stack trace may be lost
166                }
167        }
168
169        /**
170         * クローズリスナー
171         * レシーバーをクローズすることで、
172         * リスナーの処理を終了します。
173         */
174        public void closeListener() {
175                for(final QueueReceiver receiver: listReceiver) {
176                        try {
177                                receiver.close();
178//                      }catch(Exception e) { ; }                                               // 8.0.0.0 (2021/07/31)
179                        } catch( final Throwable th ) {
180                                System.out.println("receiverのクローズに失敗しました。");
181                        }
182                }
183
184                // 初期化
185                listReceiver = null;
186                listReceiver = new ArrayList<QueueReceiver>();
187        }
188
189        /**
190         * クローズ処理
191         * クローズ処理を行います。
192         *
193         * @og.rev 8.0.0.0 (2021/07/31) Avoid catching generic exceptions such as Exception in try-catch block
194         */
195        @Override
196        public void close() {
197                if(receiver != null) {
198                        try {
199                                receiver.close();
200//                      }catch(Exception e) { ; }                                               // 8.0.0.0 (2021/07/31)
201                        } catch( final Throwable th ) {
202                                System.out.println("receiverのクローズに失敗しました。");
203                        }
204                }
205                if(session != null) {
206                        try {
207                                session.close();
208//                      }catch(Exception e) { ; }                                               // 8.0.0.0 (2021/07/31)
209                        } catch( final Throwable th ) {
210                                System.out.println("sessionのクローズに失敗しました。");
211                        }
212                }
213                if(connection != null) {
214                        try {
215                                connection.close();
216//                      }catch(Exception e) { ; }                                               // 8.0.0.0 (2021/07/31)
217                        } catch( final Throwable th ) {
218                                System.out.println("connectionのクローズに失敗しました。");
219                        }
220                }
221        }
222
223        /**
224         * バッチ処理判定フラグを設定します。
225         *
226         * @param batchFlg バッチ処理判定フラグ
227         */
228        public void setBatchFlg(final Boolean batchFlg) {
229                batch = batchFlg;
230        }
231
232        /**
233         * 検証用メソッド
234         * テスト用のメソッドです。
235         *
236         * @param args 引数
237         */
238        public static void main(final String[] args) {
239                final QueueReceive receive = new QueueReceive_MQ();
240                final String jmsServer = "tcp://localhost:61616";
241
242                // バッチフラグにtrueを設定
243                // 未設定の場合は、tomcatのjndi接続処理が実行されます。
244                receive.setBatchFlg(true);
245
246                // 認証情報の設定
247                System.setProperty("mqUserId", "admin");
248                System.setProperty("mqPassword", "admin");
249
250                // 接続
251                receive.connect(jmsServer, null, null);
252
253                // 処理対象のキュー名
254                final String queueName = "queue01";
255
256                // ** 1件受信する場合
257                final QueueInfo queueInfo = receive.receive(queueName);
258                if(queueInfo != null) {
259                        System.out.println("message:" + queueInfo.getMessage());
260                }else {
261                        System.out.println("キューが登録されていません。");
262                }
263
264//              // ** リスナーを設定して、受信を検知すると処理を実行します。(MQのみ)
265//              // MessageListerを実装した、QueueReceiveListenerクラスを作成します。
266//              MessageListener listener = new QueueReceiveListener();
267//              receive.setListener(queueName, listener);
268//              // 複数のキューにリスナーを設定することも可能です。
269//              receive.setListener("queue02", listener);
270//
271//              try {
272//                      // 1分間リスナーを起動しておく場合の、プロセス待機処理
273//                      Thread.sleep(60 * 1000);
274//              }catch(InterruptedException e) {
275//                      throw new RuntimeException(e.getMessage());
276//              }
277
278                // リスナー利用時は、closeListenerを実行して、解放してください。
279                receive.closeListener();
280
281                // 終了処理
282                receive.close();
283        }
284
285//      /**
286//       * QueueReceiveリスナークラス
287//       * リスナー用のクラスです。
288//       * MQに設定することで、メッセージが受信されると、
289//       * 自動的にonMessageメソッドが実行されます。
290//       *
291//       */
292//      static class QueueReceiveListener implements MessageListener {
293//              /**
294//               * メッセージ受信処理
295//               * MQサーバにメッセージが受信されると、
296//               * メソッドの処理が行われます。
297//               *
298//               * @param message 受信メッセージ
299//               */
300//              @Override
301//              public void onMessage(final Message message) {
302//
303//                      // メッセージ受信
304//                      TextMessage msg = (TextMessage) message;
305//                      String msgText = "";
306//
307//                      try {
308//                              // キューサーバのメッセージを取得
309//                              msgText = msg.getText();
310//                              // メーッセージの受信応答を返します。
311//                              msg.acknowledge();
312//
313//                              System.out.println("message:" + msgText);
314//
315//                      } catch (JMSException e) {
316//                              throw new RuntimeException(e.getMessage());
317//                      }
318//              }
319//      }
320
321}