001/* 002 * Copyright (c) 2009 The openGion Project. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 013 * either express or implied. See the License for the specific language 014 * governing permissions and limitations under the License. 015 */ 016package org.opengion.plugin.daemon; 017 018import java.util.Date; 019import java.util.Locale; // 7.2.9.4 (2020/11/20) 020 021import javax.jms.QueueSession; 022 023// import org.hsqldb.lib.StringUtil; 024import org.opengion.fukurou.util.StringUtil; // 7.0.6.0 (2019/10/07) 025import org.opengion.fukurou.queue.QueueInfo; 026import org.opengion.fukurou.queue.QueueSend; 027import org.opengion.fukurou.queue.QueueSendFactory; 028import org.opengion.fukurou.util.HybsTimerTask; 029import org.opengion.hayabusa.common.HybsSystem; 030import org.opengion.hayabusa.queue.DBAccessQueue; 031 032/** 033 * メッセージキュー送信 034 * メッセージキュー送信テーブルを監視して、 035 * 送信処理を行います。 036 * 037 * @og.group メッセージ連携 038 * 039 * @og.rev 5.10.15.0 (2019/08/30) 新規作成 040 * @og.rev 5.10.15.2 (2019/09/20) DB登録の実装をhayabusa.queueに移動 041 * 042 * @version 5.0 043 * @author oota 044 * @since JDK7 045 * 046 */ 047public class Daemon_QueueSend extends HybsTimerTask { 048 /** このプログラムのVERSION文字列を設定します。 {@value} */ 049 private static final String VERSION = "7.2.9.4 (2020/11/20)" ; 050 051 private static final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys( "CLOUD_SQS_ACCESS_KEY" ); 052 private static final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys( "CLOUD_SQS_SECRET_KEY" ); 053 private static final int LOOP_COUNTER = 24; 054 055// private String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID"); 056 private final String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID"); 057 private final String USER_ID = "CYYYYY"; 058 private final String PG_ID = "DMN_QueSnd"; 059 private final String DMN_NAME = "QueueReceiveDMN"; 060 private final DBAccessQueue dbAccessQueue; 061 062 private int loopCnt ; 063 private QueueSend queueSend; 064 065 /** 066 * コンストラクター 067 * 初期処理を行います。 068 */ 069 public Daemon_QueueSend(){ 070 super(); // 7.2.9.4 (2020/11/20) PMD:It is a good practice to call super() in a constructor 071 dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME); 072 } 073 /** 074 * 開始処理 075 * タイマータスクのデーモン処理の開始ポイントです。 076 * 077 * @og.rev 7.2.9.4 (2020/11/20) spotbugs:呼び出したメソッドの Locale パラメータの使用を検討する 078 */ 079 @Override 080 protected void startDaemon() { 081 if (loopCnt % LOOP_COUNTER == 0) { 082 loopCnt = 1; 083 System.out.println(); 084// System.out.println(toString() + " " + new Date() + ""); 085 System.out.println(toString() + " " + new Date() ); // 7.2.9.4 (2020/11/20) PMD:Do not add empty strings 086 } else { 087 // メッセージキュー送信管理テーブルから、送信対象のレコードを取得 088 final String[][] vals = dbAccessQueue.selectGE65(); 089 090 // 取得データ分の繰り返し処理を実行する 091 for(int i = 0; i < vals.length; i++) { 092 final String[] record = vals[i]; 093 094 // GE65から取得した値を変数に格納 095 final String ykno = record[0]; 096 final String queueId = record[1]; 097 final String message = record[2]; 098 final String dedupliId = record[3]; 099 final String queSyu = record[4]; 100 final String jmsUrl = record[5]; 101 102// final String queueType = queSyu.toUpperCase(); 103 final String queueType = queSyu.toUpperCase( Locale.JAPAN ); // 7.2.9.4 (2020/11/20) 104 queueSend = QueueSendFactory.newQueueSend(queueType); 105 106 // 接続処理 107 queueSend.connect(jmsUrl, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY); 108 109 // メッセージ送信管理テーブルから取得したデータを送信実装予定 110 final QueueInfo queueInfo = new QueueInfo(); 111 112 // 応答確認種別 113 if("MQ".equals(queueType)){ 114 // MQメッセージサーバ指定時 115 queueInfo.setMqTransacted(false); 116 queueInfo.setMqAcknowledgeMode(QueueSession.AUTO_ACKNOWLEDGE); 117 // キュー名 118 queueInfo.setMqQueueName(queueId); 119 }else if("SQS".equals(queueType)){ 120 // SQSメッセージサーバ指定時 121 // グループID 122 queueInfo.setSqsFifoGroupId(queueId); 123 if(!StringUtil.isEmpty(dedupliId)) { 124 // 重複排除ID 125 // コンテンツに基づく重複排除が有効時は、未設定でも可(メッセージによる重複判定が行われる) 126 queueInfo.setSqsFifoDedupliId(dedupliId); 127 } 128 } 129 130 // メッセージ 131 queueInfo.setMessage(message); 132 133 // 完了フラグを処理中:2に更新 134 dbAccessQueue.updateGE66(ykno, DBAccessQueue.FGKAN_PROCESS); 135 136 // メッセージ送信処理 137 try{ 138 queueSend.sendMessage(queueInfo); 139 140 // 完了フラグを完了:3に更新 141 dbAccessQueue.updateGE66(ykno, DBAccessQueue.FGKAN_END); 142 143// }catch(Exception e) { // 8.0.0.0 (2021/07/31) Avoid catching generic exceptions such as Exception in try-catch block 144// // 完了フラグをエラー:4に更新して、エラー情報を登録 145// dbAccessQueue.updateGE66Error(ykno, e.getMessage()); 146 } catch( final Throwable th ) { 147 // 完了フラグをエラー:4に更新して、エラー情報を登録 148 dbAccessQueue.updateGE66Error(ykno, th.getMessage()); 149 } 150 } 151 152 // クローズ処理 153 queueSend.close(); 154 155 loopCnt++; 156 } 157 } 158}