001/* 002 * Copyright (c) 2009 The openGion Project. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 013 * either express or implied. See the License for the specific language 014 * governing permissions and limitations under the License. 015 */ 016package org.opengion.fukurou.queue; 017 018import javax.jms.Connection; 019import javax.jms.JMSException; 020import javax.jms.MessageProducer; 021import javax.jms.Queue; 022import javax.jms.QueueConnectionFactory; 023import javax.jms.QueueSession; 024import javax.jms.Session; 025import javax.jms.TextMessage; 026import javax.naming.Context; 027import javax.naming.InitialContext; 028import javax.naming.NamingException; 029 030import org.apache.activemq.ActiveMQConnection; 031import org.apache.activemq.ActiveMQConnectionFactory; 032// import org.opengion.hayabusa.common.HybsSystemException; 033 034// import com.sun.star.uno.RuntimeException; 035 036/** 037 * MQサーバへのメッセージキュー送信用クラス 038 * 039 * MQサーバへのメッセージキュー送信用のクラスです。 040 * Apache ActiveMQとAmazonMQへの送信が可能です。 041 * tomcatからの送信(JNDI利用)と、 042 * バッチ処理(urlを指定し接続)の2通りが可能です。 043 * 044 * ※Apache ActiveMQとAmazonMQの切り替えは、 045 * jmsServerの接続先URLを変更するのみで接続の変更が可能です。 046 * (proxy環境からAmazonMqへの接続は行えない場合があります) 047 * 048 * @og.group メッセージ連携 049 * 050 * @og.rev 5.10.14.0 (2019/08/01) 新規作成 051 * 052 * @version 5 053 * @author oota 054 * @since JDK7 055 * 056 */ 057public class QueueSend_MQ implements QueueSend { 058 private Connection connection; 059 private Session session; 060 private MessageProducer sender; 061 private Context ctx; 062 // バッチ用フィールド 063 private boolean batch; 064 private String mqUserId = ""; 065 private String mqPassword = ""; 066 067 /** 068 * 接続処理 069 * MQサーバに接続を行います。 070 * 071 * @param jmsServer jmsサーバ接続名(バッチの場合はurl) 072 */ 073 public void connect(final String jmsServer) { 074 try { 075 ctx = new InitialContext(); 076 // 1. Connectionの作成s 077// QueueConnectionFactory factory = null; 078 final QueueConnectionFactory factory; 079 if (batch) { 080 // バッチ処理の場合。URL指定で、ユーザIDとパスワードを指定して接続。 081 mqUserId = System.getProperty("mqUserId"); 082 mqPassword = System.getProperty("mqPassword"); 083 factory = new ActiveMQConnectionFactory(jmsServer); 084 connection = (ActiveMQConnection)factory.createConnection(mqUserId, mqPassword); 085 } else { 086 // tomcat接続の場合。JNDIを利用して接続。 087 factory = (QueueConnectionFactory) ctx.lookup("java:comp/env/" + jmsServer); 088 connection = (ActiveMQConnection)factory.createConnection(); 089 } 090 091 // 2. Connectioの開始 092 connection.start(); 093 094 } catch (final JMSException jmse) { 095 throwErrMsg("MQサーバーの接続に失敗しました。" + jmse.getMessage()); 096 } catch (final NamingException ne) { 097 throwErrMsg("名前解決に失敗しました。" + ne.getMessage()); 098 } 099 } 100 101 /** 102 * 接続処理 103 * MQサーバに接続します。 104 * connect(String jmsServer)と同じ処理になります。 105 * 106 * @og.rev 5.10.15.0 (2019/08/30) 引数追加対応 107 * 108 * @param jmsServer jmsサーバ情報 109 * @param sqsAccessKey アクセスキー(MQサーバでは未使用) 110 * @param sqsSecretKey シークレットキー(MQサーバでは未使用) 111 */ 112 @Override 113 public void connect(final String jmsServer, final String sqsAccessKey, final String sqsSecretKey) { 114 // MQではsqsAccessKeyとsqsSecretKeyは利用しません。 115 connect(jmsServer); 116 } 117 118 /** 119 * エラーメッセージ送信。 120 * 121 * @og.rev 5.10.15.0 (2019/08/30) Hybs除外 122 * 123 * @param errMsg エラーメッセージ 124 */ 125 public void throwErrMsg(final String errMsg) { 126 throw new RuntimeException( errMsg ); 127// if (batch) { 128// // バッチ用エラー 129// throw new RuntimeException(errMsg); 130// } else { 131// // 画面用エラー 132// throw new HybsSystemException(errMsg); 133// } 134 } 135 136 /** 137 * メッセージ送信 138 * MQサーバにメッセージを送信します。 139 * 140 * @param queueInfo 送信キュー情報 141 */ 142 @Override 143 public void sendMessage(final QueueInfo queueInfo) { 144 try { 145 // 初期チェック 146 if (connection == null) { 147 throwErrMsg("MQサーバに接続されていません。"); 148 } 149 150 // 1. QueueSessionの作成 151 session = connection.createSession(queueInfo.isMqTransacted(), queueInfo.getMqAcknowledgeMode()); 152 if (session == null) { 153 throwErrMsg("キューセッションの生成に失敗しました。"); 154 } 155 156 // 2. Queueの作成 157// Queue queue = null; 158// queue = session.createQueue(queueInfo.getMqQueueName()); 159 final Queue queue = session.createQueue(queueInfo.getMqQueueName()); 160 sender = session.createProducer(queue); 161 162 // 3. テキストメッセージの作成 163 final TextMessage msg = session.createTextMessage(queueInfo.getMessage()); 164 165 // 4. 送信処理 166 sender.send(msg); 167 168 } catch (JMSException e) { 169 throwErrMsg("キューの送信処理に失敗しました。" + e.getMessage()); 170 } 171 } 172 173 /** 174 * クローズ処理 175 * MQサーバとの接続をクローズします。 176 * 177 * @og.rev 8.0.0.0 (2021/07/31) Avoid catching generic exceptions such as Exception in try-catch block 178 */ 179 @Override 180 public void close() { 181 if (ctx != null) { 182 try { 183 ctx.close(); 184// } catch (Exception e) { // 8.0.0.0 (2021/07/31) 185 } catch( final Throwable th ) { 186 System.out.println("ctxのクローズに失敗しました。"); 187 } 188 } 189 // 1. sender,session,connectionのクローズ処理 190 if (sender != null) { 191 try { 192 sender.close(); 193// } catch (Exception e) { // 8.0.0.0 (2021/07/31) 194 } catch( final Throwable th ) { 195 System.out.println("senderのクローズに失敗しました。"); 196 } 197 } 198 if (session != null) { 199 try { 200 session.close(); 201// } catch (Exception e) { // 8.0.0.0 (2021/07/31) 202 } catch( final Throwable th ) { 203 System.out.println("sessionのクローズに失敗しました。"); 204 } 205 } 206 if (connection != null) { 207 try { 208 connection.close(); 209// } catch (Exception e) { // 8.0.0.0 (2021/07/31) 210 } catch( final Throwable th ) { 211 System.out.println("connectionのクローズに失敗しました。"); 212 } 213 } 214 } 215 216 /** 217 * バッチ処理判定フラグを設定します。 218 * バッチ処理の場合は引数で接続先情報を与えます。 219 * それ以外の場合(Tomcat)ではJNDIより情報を取得します。 220 * 221 * @param batchFlg バッチ処理判定フラグ 222 */ 223 @Override 224 public void setBatchFlg(final Boolean batchFlg) { 225 batch = batchFlg; 226 } 227 228 /** 229 * テスト用メソッド 230 * テスト実行用です。 231 * 232 * @param args 引数 233 */ 234 public static void main(final String[] args) { 235 System.out.println("main start"); 236 // 送信情報の設定 237 final String url = "tcp://localhost:61616"; 238 final String queueName = "test01"; 239 final String msg = "送信メッセージ"; 240 241 final QueueInfo queueInfo = new QueueInfo(); 242 queueInfo.setMqQueueName(queueName); 243 queueInfo.setMqTransacted(false); 244 queueInfo.setMqAcknowledgeMode(QueueSession.AUTO_ACKNOWLEDGE); 245 queueInfo.setMessage(msg); 246 247 final QueueSend queueSend = new QueueSend_MQ(); 248 queueSend.setBatchFlg(true); 249 250 try { 251 queueSend.connect(url,null,null); 252// queueSend.connect(url); 253 queueSend.sendMessage(queueInfo); 254// } catch (final Exception e) { // 8.0.0.0 (2021/07/31) Avoid catching generic exceptions such as Exception in try-catch block 255// System.out.println(e.getMessage()); 256 } catch( final Throwable th ) { 257 System.out.println(th.getMessage()); 258 } finally { 259 queueSend.close(); 260 } 261 262 System.out.println("main end"); 263 } 264}