001/*
002 * Copyright (c) 2009 The openGion Project.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
013 * either express or implied. See the License for the specific language
014 * governing permissions and limitations under the License.
015 */
016package org.opengion.plugin.daemon;
017
018import java.io.File;
019import java.util.Date;
020import java.util.Locale;                                                                                        // 7.2.9.4 (2020/11/20)
021
022import javax.jms.JMSException;
023import javax.jms.Message;
024import javax.jms.MessageListener;
025import javax.jms.TextMessage;
026
027// import org.opengion.fukurou.util.BizUtil;
028import org.opengion.fukurou.business.BizUtil;
029import org.opengion.fukurou.queue.QueueInfo;
030import org.opengion.fukurou.queue.QueueReceive;
031import org.opengion.fukurou.queue.QueueReceiveFactory;
032import org.opengion.fukurou.util.HybsTimerTask;
033import org.opengion.fukurou.util.StringUtil;
034import org.opengion.hayabusa.common.HybsSystem;
035import org.opengion.hayabusa.common.HybsSystemException;
036import org.opengion.hayabusa.queue.DBAccessQueue;
037
038/**
039 * メッセージキュー受信 メッセージキューの受信処理を行います。
040 *
041 * @og.group メッセージ連携
042 *
043 * @og.rev 5.10.15.2 (2019/09/20) 新規作成
044 *
045 * @version 5.0
046 * @author oota
047 * @since JDK7
048 *
049 */
050public class Daemon_QueueReceive extends HybsTimerTask {
051        /** このプログラムのVERSION文字列を設定します。   {@value} */
052        private static final String VERSION = "7.2.9.4 (2020/11/20)" ;
053
054        private int loopCnt ;
055        private QueueReceive queueReceive ;
056
057        private static final int LOOP_COUNTER = 24;
058        private static final char FPSC = File.pathSeparatorChar ;                                       // 7.2.9.4 (2020/11/20) システムに依存するパス区切り文字
059
060        private final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys("CLOUD_SQS_ACCESS_KEY");
061        private final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys("CLOUD_SQS_SECRET_KEY");
062        private final String MQ_QUEUE_TYPE;
063        private final String MQ_QURUE_SERVER_URL = HybsSystem.sys("MQ_QUEUE_SERVER_URL");
064        private final String MQ_QUEUE_RECEIVE_LISTENER =HybsSystem.sys("MQ_QUEUE_RECEIVE_LISTENER");
065
066        private final String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID");
067        private final String USER_ID   = "CYYYYY";
068        private final String PG_ID;
069        private final String DMN_NAME  = "QueueReceiveDMN";
070        private final DBAccessQueue dbAccessQueue;
071
072        private final String REAL_PATH = HybsSystem.sys("REAL_PATH");                           // 7.2.9.4 (2020/11/20)
073
074        /**
075         * コンストラクター
076         * 初期処理を行います。
077         *
078         * @og.rev 7.2.9.4 (2020/11/20) spotbugs:呼び出したメソッドの Locale パラメータの使用を検討する
079         */
080        public Daemon_QueueReceive() {
081                super();
082
083                // パラメータの設定
084                // 7.2.9.4 (2020/11/20) PMD:Avoid if (x != y) ..; else ..;
085                if(StringUtil.isNull(HybsSystem.sys("MQ_QUEUE_TYPE"))) {
086                        throw new RuntimeException("システムリソースにMQ_QUEUE_TYPEを登録して下さい");
087                }else {
088//                      MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase();
089                        MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase( Locale.JAPAN );    // 7.2.9.4 (2020/11/20)
090                        PG_ID = StringUtil.cut("QueRec" + MQ_QUEUE_TYPE, 10);
091                }
092
093                dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME);
094
095//              // パラメータの設定
096//              if(!StringUtil.isNull(HybsSystem.sys("MQ_QUEUE_TYPE"))) {
097////                    MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase();
098//                      MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase( Locale.JAPAN );    // 7.2.9.4 (2020/11/20)
099//                      PG_ID = StringUtil.cut("QueRec" + MQ_QUEUE_TYPE, 10);
100//              }else {
101//                      throw new RuntimeException("システムリソースにMQ_QUEUE_TYPEを登録して下さい");
102//              }
103//
104//              dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME);
105        }
106
107        /**
108         * 初期処理 MQサーバに接続します。
109         *
110         * @og.rev 7.2.9.4 (2020/11/20) spotbugs:呼び出したメソッドの Locale パラメータの使用を検討する
111         */
112        @Override
113        public void initDaemon() {
114                // 開始ログO
115                final StringBuilder errMsg = new StringBuilder();
116                if (MQ_QUEUE_TYPE == null) {
117                        errMsg.append("MQ_QUEUE_TYPE");
118                }
119                if (MQ_QURUE_SERVER_URL == null) {
120                        errMsg.append(" MQ_QUEUE_SERVER_URL");
121                }
122
123                if (errMsg.length() > 0) {
124                        errMsg.append(" キュータイプを特定するために、左記のシステムリソースを登録して下さい。");
125                        throw new HybsSystemException(errMsg.toString());
126                }
127
128//              final String queueType = MQ_QUEUE_TYPE.toUpperCase();
129                final String queueType = MQ_QUEUE_TYPE.toUpperCase( Locale.JAPAN );     // 7.2.9.4 (2020/11/20)
130
131                // 開始ログ
132                System.out.println("MQキュータイプ:" + queueType);
133                System.out.println("MQサーバーURL:" + MQ_QURUE_SERVER_URL);
134
135                queueReceive = QueueReceiveFactory.newQueueReceive(queueType);
136
137                queueReceive.connect(MQ_QURUE_SERVER_URL, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY);
138        }
139
140        /**
141         * 開始処理 タイマータスクのデーモン処理の開始ポイントです。
142         */
143        @Override
144        protected void startDaemon() {
145                if (loopCnt % LOOP_COUNTER == 0) {
146                        loopCnt = 1;
147                        System.out.println();
148                        System.out.print(toString() + " " + new Date() + " ");
149                } else {
150                        // 対象 キュー名(グループ名)とbizlogic名の取得処理
151                        final String[][] ge67vals = dbAccessQueue.setlectGE67();
152                        // キュー情報登録チェック
153                        if (ge67vals.length == 0) {
154                                final String errMsg = "GE67にキュー情報が登録されていません。";
155                                throw new RuntimeException(errMsg);
156                        }
157                        // MQとSQSで処理を分岐
158                        // MQ:指定キューIDからキューメッセージを取得
159                        // SQS:キューメッセージを取得してからキューID(グループID)を取得
160                        switch (MQ_QUEUE_TYPE) {
161                                case "MQ":
162                                        processMq(ge67vals);
163                                        break;
164                                case "SQS":
165                                        processSqs(ge67vals);
166                                        break;
167                                default:
168                                        final String errMsg = "リソース(MQ_QUEUE_TYPE)の値が不正です。:" + MQ_QUEUE_TYPE;
169                                        throw new RuntimeException(errMsg);
170                        }
171
172                        loopCnt++;
173                }
174        }
175
176        /**
177         * MQ用の処理
178         * GE67に登録されているキューIDの、
179         * メッセージキューを取得して処理を行います。
180         *
181         * @param ge67vals GE67の配列データ
182         */
183        private void processMq(final String[][] ge67vals) {
184                boolean listenerMode = false;
185
186                if("true".equals(MQ_QUEUE_RECEIVE_LISTENER)) {
187                        listenerMode = true;
188                }
189
190                if(listenerMode) {
191                        // リスナーの初期化
192                        queueReceive.closeListener();
193                }
194
195                // ge67のキューリスト分繰り返します
196                for (int row = 0; row < ge67vals.length; row++) {
197                        final String queueId = ge67vals[row][0];
198                        final String bizLogicId = ge67vals[row][1];
199
200                        if(listenerMode) {
201                                // リスナーを設定して、動的な受信処理(MQ専用)
202                                final QueueReceiveListener listener = new QueueReceiveListener(queueId, bizLogicId);
203                                queueReceive.setListener(queueId, listener);
204                        }else {
205                                // 1件の受信処理
206                                final QueueInfo queueInfo = queueReceive.receive(queueId);
207                                if (queueInfo != null) {
208                                        processMessage(queueId, bizLogicId, queueInfo.getMessage());
209                                        // 1件処理を行ったら処理を終了します。
210                                        break;
211                                }
212                        }
213                }
214        }
215
216        /**
217         * SQS用の処理
218         * SQSはグループIDを指定して、キューを取得することはできず、
219         * 任意のキューを1つ取得してから、
220         * 判定処理を行います。
221         * GE67に登録されていないグループIDのキューが取得された場合は、
222         * GE68にエラーレコードを登録します。
223         *
224         * @param ge67vals GE67の配列データ
225         */
226        private void processSqs(final String[][] ge67vals) {
227                // 下記はSQSの場合(キューを1件取得して処理)
228                final QueueInfo queueInfo = queueReceive.receive(null);
229
230                // キューが未取得の場合
231                if(queueInfo == null) {
232                        return;
233                }
234
235                // 受信したキューを処理
236                final String groupId = queueInfo.getSqsFifoGroupId();
237                Boolean existsFlg = false;
238                // valsにグループIDのレコードが存在するか検索
239                for (int row = 0; row < ge67vals.length; row++) {
240                        final String queueId = ge67vals[row][0];
241
242                        if (groupId != null && groupId.equals(queueId)) {
243                                // 該当レコードあり
244                                final String bizLogicId = ge67vals[row][1];
245                                processMessage(queueId, bizLogicId, queueInfo.getMessage());
246
247                                existsFlg = true;
248                                break;
249                        }
250                }
251
252                if (!existsFlg) {
253                        // 該当groupIdの未登録エラー
254                        // 処理番号生成
255                        final String syoriNo = dbAccessQueue.generateSyoriNo();
256                        dbAccessQueue.insertGE68(groupId, syoriNo, null, queueInfo.getMessage());
257                        dbAccessQueue.updateGE68Error(syoriNo, "SQSキューに設定されているグループIDが、GE67に未登録です。");
258                }
259        }
260
261        /**
262         * キャンセル処理
263         * タイマータスクのデーモン処理の終了ポイントです。
264         *
265         * @return キャンセルできれば、true
266         */
267        @Override
268        public boolean cancel() {
269                if (queueReceive != null) {
270                        queueReceive.close();
271                }
272
273                return super.cancel();
274        }
275
276        /**
277         * メッセージの処理
278         *  受信したメッセージをbizLogicに渡して、
279         *  処理を実行します。
280         *
281         * @param queueId キューID
282         * @param bizLogicId ビズロジックID
283         * @param msgText 受信メッセージ
284         */
285        private void processMessage(final String queueId, final String bizLogicId, final String msgText) {
286                String syoriNo = "";
287                try {
288                        // 処理番号生成
289                        syoriNo = dbAccessQueue.generateSyoriNo();
290
291                        // 管理テーブル登録
292                        dbAccessQueue.insertGE68(queueId, syoriNo, bizLogicId, msgText);
293
294                        // bizLogicの処理を実行
295                        callActBizLogic(SYSTEM_ID, bizLogicId, msgText);
296
297                        // 管理テーブル更新(完了)
298                        dbAccessQueue.updateGE68(syoriNo, DBAccessQueue.FGKAN_END);
299
300                } catch (Throwable te) {
301                        // bizLogicでのエラーはログの未出力して、処理を継続します。
302                        // bizLogicのエラー情報はCauseに格納されているため、Causeから取得します。
303                        String errMessage = null;
304                        if (te.getCause() != null) {
305                                // causeが設定されている場合のエラー情報
306                                errMessage = te.getCause().getMessage();
307                        } else {
308                                // causeが未設定の場合のエラー情報
309                                errMessage = te.getMessage();
310                        }
311                        System.out.println(errMessage);
312                        try {
313                                // エラーテーブルに登録
314                                dbAccessQueue.updateGE68Error(syoriNo, errMessage);
315                        //      8.0.0.0 (2021/07/31) Avoid catching generic exceptions such as Exception in try-catch block
316//                      } catch (Exception e) {
317//                              // ここでのエラーはスルーします。
318//                              System.out.println("管理テーブル登録エラー:" + e.getMessage());
319                        } catch( final Throwable th ) {
320                                // ここでのエラーはスルーします。
321                                System.out.println("管理テーブル登録エラー:" + th.getMessage());
322                        }
323                }
324        }
325
326        /**
327         * bizLogic処理の呼び出し
328         * 必要なパス情報をリソースから取得して、
329         * BizUtil.actBizLogicにパス情報を渡すことで、
330         * bizLogicの処理を行います。
331         *
332         * @og.rev 7.2.9.4 (2020/11/20) spotbugs:null になっている可能性があるメソッドの戻り値を利用している
333         *
334         * @param systemId  システムID
335         * @param logicName ロジックファイル名
336         * @param msgText   メッセージ
337         * @throws Throwable エラー情報
338         */
339        private void callActBizLogic(final String systemId, final String logicName, final String msgText) throws Throwable {
340                // 対象 クラスパスの生成
341                // HotDeploy機能を使用する場合に、Javaクラスをコンパイルするためのクラスパスを設定します。
342                // 対象となるクラスパスは、WEB-INF/classes 及び WEB-INF/lib/*.jar です。
343                // bizLogicTag.javaのコードを移植
344                final String classDir = REAL_PATH + HybsSystem.sys( "BIZLOGIC_CLASS_PATH" );            // bizの下のパス
345                final String webIinf  = REAL_PATH + "WEB-INF" + File.separator ;
346
347                final StringBuilder sb = new StringBuilder().append('.').append(FPSC);
348
349                final File lib = new File( webIinf + "lib");
350                final File[] libFiles = lib.listFiles();
351                if( libFiles != null ) {
352                        // 7.2.9.4 (2020/11/20) PMD:This for loop can be replaced by a foreach loop
353                        for( final File file : libFiles ) {
354                                sb.append( file.getAbsolutePath() ).append(FPSC);
355                        }
356//                      for (int i = 0; i < libFiles.length; i++) {
357//                              sb.append( libFiles[i].getAbsolutePath() ).append(FPSC);
358//                      }
359                }
360
361                // 上記で生成したクラスパスをclassPathに格納
362                final String classPath =
363                        sb.append( webIinf ).append( "classes" ).append(FPSC)
364                          .append( classDir ).append(FPSC)              // bizの下のパス
365                          .toString();
366
367                // ソースパス情報の生成
368                final String  srcDir        = REAL_PATH + HybsSystem.sys( "BIZLOGIC_SRC_PATH" );
369                final boolean isAutoCompile = HybsSystem.sysBool( "BIZLOGIC_AUTO_COMPILE" );
370                final boolean isHotDeploy   = HybsSystem.sysBool( "BIZLOGIC_HOT_DEPLOY" );
371
372                // bizLogicに渡すパラメータ
373                final String[] keys = new String[] { "message" };
374                final String[] vals = new String[] { msgText };
375
376                // bizLogic処理の実行
377                BizUtil.actBizLogic( srcDir, classDir, isAutoCompile, isHotDeploy, classPath, systemId, logicName, keys, vals );
378        }
379
380//      7.2.9.4 (2020/11/20) spotbugs:null になっている可能性があるメソッドの戻り値を利用している
381//      private void callActBizLogic(final String systemId, final String logicName, final String msgText) throws Throwable {
382//              // 対象 クラスパスの生成
383//              // HotDeploy機能を使用する場合に、Javaクラスをコンパイルするためのクラスパスを設定します。
384//              // 対象となるクラスパスは、WEB-INF/classes 及び WEB-INF/lib/*.jar です。
385//              // bizLogicTag.javaのコードを移植
386//              final StringBuilder sb = new StringBuilder();
387//              sb.append('.').append(File.pathSeparatorChar);
388//              final File lib = new File(HybsSystem.sys("REAL_PATH") + "WEB-INF" + File.separator + "lib");
389//              final File[] libFiles = lib.listFiles();
390//              for (int i = 0; i < libFiles.length; i++) {
391//                      sb.append(libFiles[i].getAbsolutePath()).append(File.pathSeparatorChar);
392//              }
393//              sb.append(HybsSystem.sys("REAL_PATH") + "WEB-INF" + File.separator + "classes").append(File.pathSeparatorChar);
394//              // bizの下のパス
395//              sb.append(HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_CLASS_PATH")).append(File.pathSeparatorChar);
396//              // 上記で生成したクラスパスをclassPathに格納
397//              final String classPath = sb.toString();
398//
399//              // ソースパス情報の生成
400//              final String srcDir = HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_SRC_PATH");
401//              final String classDir = HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_CLASS_PATH");
402//              final boolean isAutoCompile = HybsSystem.sysBool("BIZLOGIC_AUTO_COMPILE");
403//              final boolean isHotDeploy = HybsSystem.sysBool("BIZLOGIC_HOT_DEPLOY");
404//
405//              // bizLogicに渡すパラメータ
406//              final String[] keys = new String[] { "message" };
407//              final String[] vals = new String[] { msgText };
408//
409//              // bizLogic処理の実行
410//              BizUtil.actBizLogic(srcDir, classDir, isAutoCompile, isHotDeploy, classPath, systemId, logicName, keys, vals);
411//      }
412
413        /**
414         * 受信処理リスナー用のインナークラス
415         * QueueReceiveリスナークラス リスナー用のクラスです。
416         *  MQに設定することで、メッセージが受信されると、
417         * onMessageメソッドが実行されます。
418         *
419         * @og.rev 7.2.9.4 (2020/11/20) private final 追加
420         */
421//      class QueueReceiveListener implements MessageListener {
422        private final class QueueReceiveListener implements MessageListener {
423//              private String queueId = "";
424//              private String bizLogicId = "";
425                private final String queueId ;
426                private final String bizLogicId ;
427
428                /**
429                 * コンストラクター 初期処理を行います。
430                 *
431                 * @param quId  キューID
432                 * @param bizId ビズロジックID
433                 */
434                public QueueReceiveListener(final String quId, final String bizId) {
435                        queueId    = quId;
436                        bizLogicId = bizId;
437                }
438
439                /**
440                 * メッセージ受信処理 MQサーバにメッセージが受信されると、 メソッドの処理が行われます。
441                 *
442                 * @param message 受信メッセージ
443                 */
444                @Override
445                public void onMessage(final Message message) {
446                        // 要求番号 : ここでは使用していません。
447                        final String ykno = "";
448
449                        // メッセージ受信
450                        final TextMessage msg = (TextMessage) message;
451                        String msgText = "";
452
453                        try {
454                                // キューサーバのメッセージを取得
455                                msgText = msg.getText();
456
457                                // メーッセージの受信応答を返します。
458                                msg.acknowledge();
459
460                                processMessage(queueId, bizLogicId, msgText);
461
462                        } catch (JMSException jmse) {
463                                try {
464                                        // 管理テーブル更新
465                                        // 管理テーブル更新(エラー)
466                                        dbAccessQueue.updateGE68(ykno, DBAccessQueue.FGKAN_ERROR);
467                                //      8.0.0.0 (2021/07/31) Avoid catching generic exceptions such as Exception in try-catch block
468//                              } catch (Exception e) {
469//                                      // ここでのエラーはスルーします。
470//                                      System.out.println("管理テーブル登録エラー:" + e.getMessage());
471                                } catch( final Throwable th ) {
472                                        // ここでのエラーはスルーします。
473                                        System.out.println("管理テーブル登録エラー:" + th.getMessage());
474                                }
475
476                                throw new HybsSystemException("bizLogicの処理中にエラーが発生しました。" + jmse.getMessage());
477                        }
478                }
479        }
480}