001/*
002 * Copyright (c) 2009 The openGion Project.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
013 * either express or implied. See the License for the specific language
014 * governing permissions and limitations under the License.
015 */
016package org.opengion.plugin.daemon;
017
018import java.util.Date;
019import java.util.Locale;                                                                                        // 7.2.9.4 (2020/11/20)
020
021import javax.jms.QueueSession;
022
023// import org.hsqldb.lib.StringUtil;
024import org.opengion.fukurou.util.StringUtil;                                            // 7.0.6.0 (2019/10/07)
025import org.opengion.fukurou.queue.QueueInfo;
026import org.opengion.fukurou.queue.QueueSend;
027import org.opengion.fukurou.queue.QueueSendFactory;
028import org.opengion.fukurou.util.HybsTimerTask;
029import org.opengion.hayabusa.common.HybsSystem;
030import org.opengion.hayabusa.queue.DBAccessQueue;
031
032/**
033 * メッセージキュー送信
034 * メッセージキュー送信テーブルを監視して、
035 * 送信処理を行います。
036 *
037 * @og.group メッセージ連携
038 *
039 * @og.rev 5.10.15.0 (2019/08/30) 新規作成
040 * @og.rev 5.10.15.2 (2019/09/20) DB登録の実装をhayabusa.queueに移動
041 *
042 * @version 5.0
043 * @author oota
044 * @since JDK7
045 *
046 */
047public class Daemon_QueueSend extends HybsTimerTask {
048        /** このプログラムのVERSION文字列を設定します。   {@value} */
049        private static final String VERSION = "7.2.9.4 (2020/11/20)" ;
050
051        private static final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys( "CLOUD_SQS_ACCESS_KEY" );
052        private static final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys( "CLOUD_SQS_SECRET_KEY" );
053        private static final int    LOOP_COUNTER         = 24;
054
055//      private String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID");
056        private final String SYSTEM_ID  = HybsSystem.sys("SYSTEM_ID");
057        private final String USER_ID    = "CYYYYY";
058        private final String PG_ID              = "DMN_QueSnd";
059        private final String DMN_NAME   = "QueueReceiveDMN";
060        private final DBAccessQueue dbAccessQueue;
061
062        private int loopCnt ;
063        private QueueSend queueSend;
064
065        /**
066         * コンストラクター
067         * 初期処理を行います。
068         */
069        public Daemon_QueueSend(){
070                super();                                                                // 7.2.9.4 (2020/11/20) PMD:It is a good practice to call super() in a constructor
071                dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME);
072        }
073        /**
074         * 開始処理
075         * タイマータスクのデーモン処理の開始ポイントです。
076         *
077         * @og.rev 7.2.9.4 (2020/11/20) spotbugs:呼び出したメソッドの Locale パラメータの使用を検討する
078         */
079        @Override
080        protected void startDaemon() {
081                if (loopCnt % LOOP_COUNTER == 0) {
082                        loopCnt = 1;
083                        System.out.println();
084//                      System.out.println(toString() + " " + new Date() + "");
085                        System.out.println(toString() + " " + new Date() );                             // 7.2.9.4 (2020/11/20) PMD:Do not add empty strings
086                } else {
087                        // メッセージキュー送信管理テーブルから、送信対象のレコードを取得
088                        final String[][] vals = dbAccessQueue.selectGE65();
089
090                        // 取得データ分の繰り返し処理を実行する
091                        for(int i = 0; i  < vals.length; i++) {
092                                final String[] record = vals[i];
093
094                                // GE65から取得した値を変数に格納
095                                final String ykno =  record[0];
096                                final String queueId = record[1];
097                                final String message = record[2];
098                                final String dedupliId = record[3];
099                                final String queSyu = record[4];
100                                final String jmsUrl = record[5];
101
102//                              final String queueType = queSyu.toUpperCase();
103                                final String queueType = queSyu.toUpperCase( Locale.JAPAN );    // 7.2.9.4 (2020/11/20)
104                                queueSend = QueueSendFactory.newQueueSend(queueType);
105
106                                // 接続処理
107                                queueSend.connect(jmsUrl, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY);
108
109                                // メッセージ送信管理テーブルから取得したデータを送信実装予定
110                                final QueueInfo queueInfo = new QueueInfo();
111
112                                // 応答確認種別
113                                if("MQ".equals(queueType)){
114                                        // MQメッセージサーバ指定時
115                                        queueInfo.setMqTransacted(false);
116                                        queueInfo.setMqAcknowledgeMode(QueueSession.AUTO_ACKNOWLEDGE);
117                                        // キュー名
118                                        queueInfo.setMqQueueName(queueId);
119                                }else if("SQS".equals(queueType)){
120                                        // SQSメッセージサーバ指定時
121                                        // グループID
122                                        queueInfo.setSqsFifoGroupId(queueId);
123                                        if(!StringUtil.isEmpty(dedupliId)) {
124                                                // 重複排除ID
125                                                // コンテンツに基づく重複排除が有効時は、未設定でも可(メッセージによる重複判定が行われる)
126                                                queueInfo.setSqsFifoDedupliId(dedupliId);
127                                        }
128                                }
129
130                                // メッセージ
131                                queueInfo.setMessage(message);
132
133                                // 完了フラグを処理中:2に更新
134                                dbAccessQueue.updateGE66(ykno, DBAccessQueue.FGKAN_PROCESS);
135
136                                // メッセージ送信処理
137                                try{
138                                        queueSend.sendMessage(queueInfo);
139
140                                        // 完了フラグを完了:3に更新
141                                        dbAccessQueue.updateGE66(ykno, DBAccessQueue.FGKAN_END);
142
143//                              }catch(Exception e) {                           // 8.0.0.0 (2021/07/31) Avoid catching generic exceptions such as Exception in try-catch block
144//                                      // 完了フラグをエラー:4に更新して、エラー情報を登録
145//                                      dbAccessQueue.updateGE66Error(ykno, e.getMessage());
146                                } catch( final Throwable th ) {
147                                        // 完了フラグをエラー:4に更新して、エラー情報を登録
148                                        dbAccessQueue.updateGE66Error(ykno, th.getMessage());
149                                }
150                        }
151
152                        // クローズ処理
153                        queueSend.close();
154
155                        loopCnt++;
156                }
157        }
158}