001package org.opengion.plugin.daemon; 002 003import java.io.File; 004import java.util.Date; 005import java.util.Locale; // 7.2.9.4 (2020/11/20) 006 007import javax.jms.JMSException; 008import javax.jms.Message; 009import javax.jms.MessageListener; 010import javax.jms.TextMessage; 011 012// import org.opengion.fukurou.util.BizUtil; 013import org.opengion.fukurou.business.BizUtil; 014import org.opengion.fukurou.queue.QueueInfo; 015import org.opengion.fukurou.queue.QueueReceive; 016import org.opengion.fukurou.queue.QueueReceiveFactory; 017import org.opengion.fukurou.util.HybsTimerTask; 018import org.opengion.fukurou.util.StringUtil; 019import org.opengion.hayabusa.common.HybsSystem; 020import org.opengion.hayabusa.common.HybsSystemException; 021import org.opengion.hayabusa.queue.DBAccessQueue; 022 023/** 024 * メッセージキュー受信 メッセージキューの受信処理を行います。 025 * 026 * @og.group メッセージ連携 027 * 028 * @og.rev 5.10.15.2 (2019/09/20) 新規作成 029 * 030 * @version 5.0 031 * @author oota 032 * @since JDK7 033 * 034 */ 035public class Daemon_QueueReceive extends HybsTimerTask { 036 private int loopCnt ; 037 private QueueReceive queueReceive ; 038 039 private static final int LOOP_COUNTER = 24; 040 private static final char FPSC = File.pathSeparatorChar ; // 7.2.9.4 (2020/11/20) システムに依存するパス区切り文字 041 042 private final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys("CLOUD_SQS_ACCESS_KEY"); 043 private final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys("CLOUD_SQS_SECRET_KEY"); 044 private final String MQ_QUEUE_TYPE; 045 private final String MQ_QURUE_SERVER_URL = HybsSystem.sys("MQ_QUEUE_SERVER_URL"); 046 private final String MQ_QUEUE_RECEIVE_LISTENER =HybsSystem.sys("MQ_QUEUE_RECEIVE_LISTENER"); 047 048 private final String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID"); 049 private final String USER_ID = "CYYYYY"; 050 private final String PG_ID; 051 private final String DMN_NAME = "QueueReceiveDMN"; 052 private final DBAccessQueue dbAccessQueue; 053 054 private final String REAL_PATH = HybsSystem.sys("REAL_PATH"); // 7.2.9.4 (2020/11/20) 055 056 /** 057 * コンストラクター 058 * 初期処理を行います。 059 * 060 * @og.rev 7.2.9.4 (2020/11/20) spotbugs:呼び出したメソッドの Locale パラメータの使用を検討する 061 */ 062 public Daemon_QueueReceive() { 063 super(); 064 065 // パラメータの設定 066 // 7.2.9.4 (2020/11/20) PMD:Avoid if (x != y) ..; else ..; 067 if(StringUtil.isNull(HybsSystem.sys("MQ_QUEUE_TYPE"))) { 068 throw new RuntimeException("システムリソースにMQ_QUEUE_TYPEを登録して下さい"); 069 }else { 070// MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase(); 071 MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase( Locale.JAPAN ); // 7.2.9.4 (2020/11/20) 072 PG_ID = StringUtil.cut("QueRec" + MQ_QUEUE_TYPE, 10); 073 } 074 075 dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME); 076 077// // パラメータの設定 078// if(!StringUtil.isNull(HybsSystem.sys("MQ_QUEUE_TYPE"))) { 079//// MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase(); 080// MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase( Locale.JAPAN ); // 7.2.9.4 (2020/11/20) 081// PG_ID = StringUtil.cut("QueRec" + MQ_QUEUE_TYPE, 10); 082// }else { 083// throw new RuntimeException("システムリソースにMQ_QUEUE_TYPEを登録して下さい"); 084// } 085// 086// dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME); 087 } 088 089 /** 090 * 初期処理 MQサーバに接続します。 091 * 092 * @og.rev 7.2.9.4 (2020/11/20) spotbugs:呼び出したメソッドの Locale パラメータの使用を検討する 093 */ 094 @Override 095 public void initDaemon() { 096 // 開始ログO 097 final StringBuilder errMsg = new StringBuilder(); 098 if (MQ_QUEUE_TYPE == null) { 099 errMsg.append("MQ_QUEUE_TYPE"); 100 } 101 if (MQ_QURUE_SERVER_URL == null) { 102 errMsg.append(" MQ_QUEUE_SERVER_URL"); 103 } 104 105 if (errMsg.length() > 0) { 106 errMsg.append(" キュータイプを特定するために、左記のシステムリソースを登録して下さい。"); 107 throw new HybsSystemException(errMsg.toString()); 108 } 109 110// final String queueType = MQ_QUEUE_TYPE.toUpperCase(); 111 final String queueType = MQ_QUEUE_TYPE.toUpperCase( Locale.JAPAN ); // 7.2.9.4 (2020/11/20) 112 113 // 開始ログ 114 System.out.println("MQキュータイプ:" + queueType); 115 System.out.println("MQサーバーURL:" + MQ_QURUE_SERVER_URL); 116 117 queueReceive = QueueReceiveFactory.newQueueReceive(queueType); 118 119 queueReceive.connect(MQ_QURUE_SERVER_URL, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY); 120 } 121 122 /** 123 * 開始処理 タイマータスクのデーモン処理の開始ポイントです。 124 */ 125 @Override 126 protected void startDaemon() { 127 if (loopCnt % LOOP_COUNTER == 0) { 128 loopCnt = 1; 129 System.out.println(); 130 System.out.print(toString() + " " + new Date() + " "); 131 } else { 132 // 対象 キュー名(グループ名)とbizlogic名の取得処理 133 final String[][] ge67vals = dbAccessQueue.setlectGE67(); 134 // キュー情報登録チェック 135 if (ge67vals.length == 0) { 136 final String errMsg = "GE67にキュー情報が登録されていません。"; 137 throw new RuntimeException(errMsg); 138 } 139 // MQとSQSで処理を分岐 140 // MQ:指定キューIDからキューメッセージを取得 141 // SQS:キューメッセージを取得してからキューID(グループID)を取得 142 switch (MQ_QUEUE_TYPE) { 143 case "MQ": 144 processMq(ge67vals); 145 break; 146 case "SQS": 147 processSqs(ge67vals); 148 break; 149 default: 150 final String errMsg = "リソース(MQ_QUEUE_TYPE)の値が不正です。:" + MQ_QUEUE_TYPE; 151 throw new RuntimeException(errMsg); 152 } 153 154 loopCnt++; 155 } 156 } 157 158 /** 159 * MQ用の処理 160 * GE67に登録されているキューIDの、 161 * メッセージキューを取得して処理を行います。 162 * 163 * @param ge67vals GE67の配列データ 164 */ 165 private void processMq(final String[][] ge67vals) { 166 boolean listenerMode = false; 167 168 if("true".equals(MQ_QUEUE_RECEIVE_LISTENER)) { 169 listenerMode = true; 170 } 171 172 if(listenerMode) { 173 // リスナーの初期化 174 queueReceive.closeListener(); 175 } 176 177 // ge67のキューリスト分繰り返します 178 for (int row = 0; row < ge67vals.length; row++) { 179 final String queueId = ge67vals[row][0]; 180 final String bizLogicId = ge67vals[row][1]; 181 182 if(listenerMode) { 183 // リスナーを設定して、動的な受信処理(MQ専用) 184 final QueueReceiveListener listener = new QueueReceiveListener(queueId, bizLogicId); 185 queueReceive.setListener(queueId, listener); 186 }else { 187 // 1件の受信処理 188 final QueueInfo queueInfo = queueReceive.receive(queueId); 189 if (queueInfo != null) { 190 processMessage(queueId, bizLogicId, queueInfo.getMessage()); 191 // 1件処理を行ったら処理を終了します。 192 break; 193 } 194 } 195 } 196 } 197 198 /** 199 * SQS用の処理 200 * SQSはグループIDを指定して、キューを取得することはできず、 201 * 任意のキューを1つ取得してから、 202 * 判定処理を行います。 203 * GE67に登録されていないグループIDのキューが取得された場合は、 204 * GE68にエラーレコードを登録します。 205 * 206 * @param ge67vals GE67の配列データ 207 */ 208 private void processSqs(final String[][] ge67vals) { 209 // 下記はSQSの場合(キューを1件取得して処理) 210 final QueueInfo queueInfo = queueReceive.receive(null); 211 212 // キューが未取得の場合 213 if(queueInfo == null) { 214 return; 215 } 216 217 // 受信したキューを処理 218 final String groupId = queueInfo.getSqsFifoGroupId(); 219 Boolean existsFlg = false; 220 // valsにグループIDのレコードが存在するか検索 221 for (int row = 0; row < ge67vals.length; row++) { 222 final String queueId = ge67vals[row][0]; 223 224 if (groupId != null && groupId.equals(queueId)) { 225 // 該当レコードあり 226 final String bizLogicId = ge67vals[row][1]; 227 processMessage(queueId, bizLogicId, queueInfo.getMessage()); 228 229 existsFlg = true; 230 break; 231 } 232 } 233 234 if (!existsFlg) { 235 // 該当groupIdの未登録エラー 236 // 処理番号生成 237 final String syoriNo = dbAccessQueue.generateSyoriNo(); 238 dbAccessQueue.insertGE68(groupId, syoriNo, null, queueInfo.getMessage()); 239 dbAccessQueue.updateGE68Error(syoriNo, "SQSキューに設定されているグループIDが、GE67に未登録です。"); 240 } 241 } 242 243 /** 244 * キャンセル処理 245 * タイマータスクのデーモン処理の終了ポイントです。 246 * 247 * @return キャンセルできれば、true 248 */ 249 @Override 250 public boolean cancel() { 251 if (queueReceive != null) { 252 queueReceive.close(); 253 } 254 255 return super.cancel(); 256 } 257 258 /** 259 * メッセージの処理 260 * 受信したメッセージをbizLogicに渡して、 261 * 処理を実行します。 262 * 263 * @param queueId キューID 264 * @param bizLogicId ビズロジックID 265 * @param msgText 受信メッセージ 266 */ 267 private void processMessage(final String queueId, final String bizLogicId, final String msgText) { 268 String syoriNo = ""; 269 try { 270 // 処理番号生成 271 syoriNo = dbAccessQueue.generateSyoriNo(); 272 273 // 管理テーブル登録 274 dbAccessQueue.insertGE68(queueId, syoriNo, bizLogicId, msgText); 275 276 // bizLogicの処理を実行 277 callActBizLogic(SYSTEM_ID, bizLogicId, msgText); 278 279 // 管理テーブル更新(完了) 280 dbAccessQueue.updateGE68(syoriNo, DBAccessQueue.FGKAN_END); 281 282 } catch (Throwable te) { 283 // bizLogicでのエラーはログの未出力して、処理を継続します。 284 // bizLogicのエラー情報はCauseに格納されているため、Causeから取得します。 285 String errMessage = null; 286 if (te.getCause() != null) { 287 // causeが設定されている場合のエラー情報 288 errMessage = te.getCause().getMessage(); 289 } else { 290 // causeが未設定の場合のエラー情報 291 errMessage = te.getMessage(); 292 } 293 System.out.println(errMessage); 294 try { 295 // エラーテーブルに登録 296 dbAccessQueue.updateGE68Error(syoriNo, errMessage); 297 } catch (Exception e) { 298 // ここでのエラーはスルーします。 299 System.out.println("管理テーブル登録エラー:" + e.getMessage()); 300 } 301 } 302 } 303 304 /** 305 * bizLogic処理の呼び出し 306 * 必要なパス情報をリソースから取得して、 307 * BizUtil.actBizLogicにパス情報を渡すことで、 308 * bizLogicの処理を行います。 309 * 310 * @og.rev 7.2.9.4 (2020/11/20) spotbugs:null になっている可能性があるメソッドの戻り値を利用している 311 * 312 * @param systemId システムID 313 * @param logicName ロジックファイル名 314 * @param msgText メッセージ 315 * @throws Throwable エラー情報 316 */ 317 private void callActBizLogic(final String systemId, final String logicName, final String msgText) throws Throwable { 318 // 対象 クラスパスの生成 319 // HotDeploy機能を使用する場合に、Javaクラスをコンパイルするためのクラスパスを設定します。 320 // 対象となるクラスパスは、WEB-INF/classes 及び WEB-INF/lib/*.jar です。 321 // bizLogicTag.javaのコードを移植 322 final String classDir = REAL_PATH + HybsSystem.sys( "BIZLOGIC_CLASS_PATH" ); // bizの下のパス 323 final String webIinf = REAL_PATH + "WEB-INF" + File.separator ; 324 325 final StringBuilder sb = new StringBuilder().append('.').append(FPSC); 326 327 final File lib = new File( webIinf + "lib"); 328 final File[] libFiles = lib.listFiles(); 329 if( libFiles != null ) { 330 // 7.2.9.4 (2020/11/20) PMD:This for loop can be replaced by a foreach loop 331 for( final File file : libFiles ) { 332 sb.append( file.getAbsolutePath() ).append(FPSC); 333 } 334// for (int i = 0; i < libFiles.length; i++) { 335// sb.append( libFiles[i].getAbsolutePath() ).append(FPSC); 336// } 337 } 338 339 // 上記で生成したクラスパスをclassPathに格納 340 final String classPath = 341 sb.append( webIinf ).append( "classes" ).append(FPSC) 342 .append( classDir ).append(FPSC) // bizの下のパス 343 .toString(); 344 345 // ソースパス情報の生成 346 final String srcDir = REAL_PATH + HybsSystem.sys( "BIZLOGIC_SRC_PATH" ); 347 final boolean isAutoCompile = HybsSystem.sysBool( "BIZLOGIC_AUTO_COMPILE" ); 348 final boolean isHotDeploy = HybsSystem.sysBool( "BIZLOGIC_HOT_DEPLOY" ); 349 350 // bizLogicに渡すパラメータ 351 final String[] keys = new String[] { "message" }; 352 final String[] vals = new String[] { msgText }; 353 354 // bizLogic処理の実行 355 BizUtil.actBizLogic( srcDir, classDir, isAutoCompile, isHotDeploy, classPath, systemId, logicName, keys, vals ); 356 } 357 358// 7.2.9.4 (2020/11/20) spotbugs:null になっている可能性があるメソッドの戻り値を利用している 359// private void callActBizLogic(final String systemId, final String logicName, final String msgText) throws Throwable { 360// // 対象 クラスパスの生成 361// // HotDeploy機能を使用する場合に、Javaクラスをコンパイルするためのクラスパスを設定します。 362// // 対象となるクラスパスは、WEB-INF/classes 及び WEB-INF/lib/*.jar です。 363// // bizLogicTag.javaのコードを移植 364// final StringBuilder sb = new StringBuilder(); 365// sb.append('.').append(File.pathSeparatorChar); 366// final File lib = new File(HybsSystem.sys("REAL_PATH") + "WEB-INF" + File.separator + "lib"); 367// final File[] libFiles = lib.listFiles(); 368// for (int i = 0; i < libFiles.length; i++) { 369// sb.append(libFiles[i].getAbsolutePath()).append(File.pathSeparatorChar); 370// } 371// sb.append(HybsSystem.sys("REAL_PATH") + "WEB-INF" + File.separator + "classes").append(File.pathSeparatorChar); 372// // bizの下のパス 373// sb.append(HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_CLASS_PATH")).append(File.pathSeparatorChar); 374// // 上記で生成したクラスパスをclassPathに格納 375// final String classPath = sb.toString(); 376// 377// // ソースパス情報の生成 378// final String srcDir = HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_SRC_PATH"); 379// final String classDir = HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_CLASS_PATH"); 380// final boolean isAutoCompile = HybsSystem.sysBool("BIZLOGIC_AUTO_COMPILE"); 381// final boolean isHotDeploy = HybsSystem.sysBool("BIZLOGIC_HOT_DEPLOY"); 382// 383// // bizLogicに渡すパラメータ 384// final String[] keys = new String[] { "message" }; 385// final String[] vals = new String[] { msgText }; 386// 387// // bizLogic処理の実行 388// BizUtil.actBizLogic(srcDir, classDir, isAutoCompile, isHotDeploy, classPath, systemId, logicName, keys, vals); 389// } 390 391 /** 392 * 受信処理リスナー用のインナークラス 393 * QueueReceiveリスナークラス リスナー用のクラスです。 394 * MQに設定することで、メッセージが受信されると、 395 * onMessageメソッドが実行されます。 396 * 397 * @og.rev 7.2.9.4 (2020/11/20) private final 追加 398 */ 399// class QueueReceiveListener implements MessageListener { 400 private final class QueueReceiveListener implements MessageListener { 401// private String queueId = ""; 402// private String bizLogicId = ""; 403 private final String queueId ; 404 private final String bizLogicId ; 405 406 /** 407 * コンストラクター 初期処理を行います。 408 * 409 * @param quId キューID 410 * @param bizId ビズロジックID 411 */ 412 public QueueReceiveListener(final String quId, final String bizId) { 413 queueId = quId; 414 bizLogicId = bizId; 415 } 416 417 /** 418 * メッセージ受信処理 MQサーバにメッセージが受信されると、 メソッドの処理が行われます。 419 * 420 * @param message 受信メッセージ 421 */ 422 @Override 423 public void onMessage(final Message message) { 424 // 要求番号 : ここでは使用していません。 425 final String ykno = ""; 426 427 // メッセージ受信 428 final TextMessage msg = (TextMessage) message; 429 String msgText = ""; 430 431 try { 432 // キューサーバのメッセージを取得 433 msgText = msg.getText(); 434 435 // メーッセージの受信応答を返します。 436 msg.acknowledge(); 437 438 processMessage(queueId, bizLogicId, msgText); 439 440 } catch (JMSException jmse) { 441 try { 442 // 管理テーブル更新 443 // 管理テーブル更新(エラー) 444 dbAccessQueue.updateGE68(ykno, DBAccessQueue.FGKAN_ERROR); 445 } catch (Exception e) { 446 // ここでのエラーはスルーします。 447 System.out.println("管理テーブル登録エラー:" + e.getMessage()); 448 } 449 450 throw new HybsSystemException("bizLogicの処理中にエラーが発生しました。" + jmse.getMessage()); 451 } 452 } 453 } 454}