001package org.opengion.plugin.daemon;
002
003import java.util.Date;
004import java.util.Locale;                                                                                        // 7.2.9.4 (2020/11/20)
005
006import javax.jms.QueueSession;
007
008// import org.hsqldb.lib.StringUtil;
009import org.opengion.fukurou.util.StringUtil;                                            // 7.0.6.0 (2019/10/07)
010import org.opengion.fukurou.queue.QueueInfo;
011import org.opengion.fukurou.queue.QueueSend;
012import org.opengion.fukurou.queue.QueueSendFactory;
013import org.opengion.fukurou.util.HybsTimerTask;
014import org.opengion.hayabusa.common.HybsSystem;
015import org.opengion.hayabusa.queue.DBAccessQueue;
016
017/**
018 * メッセージキュー送信
019 * メッセージキュー送信テーブルを監視して、
020 * 送信処理を行います。
021 *
022 * @og.group メッセージ連携
023 *
024 * @og.rev 5.10.15.0 (2019/08/30) 新規作成
025 * @og.rev 5.10.15.2 (2019/09/20) DB登録の実装をhayabusa.queueに移動
026 *
027 * @version 5.0
028 * @author oota
029 * @since JDK7
030 *
031 */
032public class Daemon_QueueSend extends HybsTimerTask {
033        private static final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys( "CLOUD_SQS_ACCESS_KEY" );
034        private static final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys( "CLOUD_SQS_SECRET_KEY" );
035        private static final int    LOOP_COUNTER         = 24;
036
037//      private String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID");
038        private final String SYSTEM_ID  = HybsSystem.sys("SYSTEM_ID");
039        private final String USER_ID    = "CYYYYY";
040        private final String PG_ID              = "DMN_QueSnd";
041        private final String DMN_NAME   = "QueueReceiveDMN";
042        private final DBAccessQueue dbAccessQueue;
043
044        private int loopCnt ;
045        private QueueSend queueSend;
046
047        /**
048         * コンストラクター
049         * 初期処理を行います。
050         */
051        public Daemon_QueueSend(){
052                super();                                                                // 7.2.9.4 (2020/11/20) PMD:It is a good practice to call super() in a constructor
053                dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME);
054        }
055        /**
056         * 開始処理
057         * タイマータスクのデーモン処理の開始ポイントです。
058         *
059         * @og.rev 7.2.9.4 (2020/11/20) spotbugs:呼び出したメソッドの Locale パラメータの使用を検討する
060         */
061        @Override
062        protected void startDaemon() {
063                if (loopCnt % LOOP_COUNTER == 0) {
064                        loopCnt = 1;
065                        System.out.println();
066//                      System.out.println(toString() + " " + new Date() + "");
067                        System.out.println(toString() + " " + new Date() );                             // 7.2.9.4 (2020/11/20) PMD:Do not add empty strings
068                } else {
069                        // メッセージキュー送信管理テーブルから、送信対象のレコードを取得
070                        final String[][] vals = dbAccessQueue.selectGE65();
071
072                        // 取得データ分の繰り返し処理を実行する
073                        for(int i = 0; i  < vals.length; i++) {
074                                final String[] record = vals[i];
075
076                                // GE65から取得した値を変数に格納
077                                final String ykno =  record[0];
078                                final String queueId = record[1];
079                                final String message = record[2];
080                                final String dedupliId = record[3];
081                                final String queSyu = record[4];
082                                final String jmsUrl = record[5];
083
084//                              final String queueType = queSyu.toUpperCase();
085                                final String queueType = queSyu.toUpperCase( Locale.JAPAN );    // 7.2.9.4 (2020/11/20)
086                                queueSend = QueueSendFactory.newQueueSend(queueType);
087
088                                // 接続処理
089                                queueSend.connect(jmsUrl, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY);
090
091                                // メッセージ送信管理テーブルから取得したデータを送信実装予定
092                                final QueueInfo queueInfo = new QueueInfo();
093
094                                // 応答確認種別
095                                if("MQ".equals(queueType)){
096                                        // MQメッセージサーバ指定時
097                                        queueInfo.setMqTransacted(false);
098                                        queueInfo.setMqAcknowledgeMode(QueueSession.AUTO_ACKNOWLEDGE);
099                                        // キュー名
100                                        queueInfo.setMqQueueName(queueId);
101                                }else if("SQS".equals(queueType)){
102                                        // SQSメッセージサーバ指定時
103                                        // グループID
104                                        queueInfo.setSqsFifoGroupId(queueId);
105                                        if(!StringUtil.isEmpty(dedupliId)) {
106                                                // 重複排除ID
107                                                // コンテンツに基づく重複排除が有効時は、未設定でも可(メッセージによる重複判定が行われる)
108                                                queueInfo.setSqsFifoDedupliId(dedupliId);
109                                        }
110                                }
111
112                                // メッセージ
113                                queueInfo.setMessage(message);
114
115                                // 完了フラグを処理中:2に更新
116                                dbAccessQueue.updateGE66(ykno, DBAccessQueue.FGKAN_PROCESS);
117
118                                // メッセージ送信処理
119                                try{
120                                        queueSend.sendMessage(queueInfo);
121
122                                        // 完了フラグを完了:3に更新
123                                        dbAccessQueue.updateGE66(ykno, DBAccessQueue.FGKAN_END);
124
125                                }catch(Exception e) {
126                                        // 完了フラグをエラー:4に更新して、エラー情報を登録
127                                        dbAccessQueue.updateGE66Error(ykno, e.getMessage());
128                                }
129                        }
130
131                        // クローズ処理
132                        queueSend.close();
133
134                        loopCnt++;
135                }
136        }
137}