001/* 002 * Copyright (c) 2009 The openGion Project. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 013 * either express or implied. See the License for the specific language 014 * governing permissions and limitations under the License. 015 */ 016package org.opengion.fukurou.process; 017 018import org.opengion.fukurou.util.Argument; 019import org.opengion.fukurou.util.SystemParameter; 020import org.opengion.fukurou.util.LogWriter; 021 022import org.opengion.fukurou.util.HybsEntry ; 023import org.opengion.fukurou.util.Closer; 024import org.opengion.fukurou.db.ConnectionFactory; 025 026import java.util.Set ; 027import java.util.HashSet ; 028import java.util.Map ; 029import java.util.LinkedHashMap ; 030 031import java.sql.Connection; 032import java.sql.Statement; 033import java.sql.ResultSet; 034import java.sql.SQLException; 035 036/** 037 * Process_BulkQueryは、データベースから読み取った内容を、一括処理するために、 038 * ParamProcess のサブクラス(Process_DBParam)にセットしたり、加工したりする 039 * FirstProcess と、ChainProcess のインターフェースを両方持った、実装クラスです。 040 * 041 * このクラスは、上流から、下流への処理は、1度しか実行されません。 042 * FirstProcess の検索結果は、Set オブジェクトとして、Process_DBParam に渡します。 043 * ChainProcess は、その結果を取り出し、自分自身の処理結果と合せて加工します。 044 * 045 * FirstProcess では、-action は、query のみです。 046 * query は、指定のSQL文を実行し、結果のSetをParamProcessに設定します。 047 * ChainProcess では、-action は、query、bulkSet、minus、intersect が指定できます。 048 * query は、上記と同じです。 049 * minus は、先のSetから、SQL文の実行結果を引き算し、結果Setを再設定します。 050 * intersect は、先のSetから、SQL文の実行結果と重複する結果Setを再設定します。 051 * bulkSet は、先のSetを取り出し、SQL文に加味して処理します。 052 * 流れ的には、query で検索し、minusまたはintersect でSetオブジェクトを加工し、bulkSet で 053 * 利用します。例えば、ORACLEから、ユニークキーのSetを作成し、SQLServerのユニークキーを 054 * minusした結果を、ORACLEからDELETEすれば、不要なデータを削除するなどの処理が実行可能になります。 055 * また、単純に、query だけを、チェインすれば、単発のUPDATE文を実行することが可能です。 056 * 057 * データベース接続先等は、ParamProcess のサブクラス(Process_DBParam)に 058 * 設定された接続(Connection)を使用します。 059 * DBID は、Process_DBParam の -configFile で指定する DBConfig.xml ファイルを使用します。 060 * 061 * 引数文字列中にスペースを含む場合は、ダブルコーテーション("") で括って下さい。 062 * 引数文字列の 『=』の前後には、スペースは挟めません。必ず、-key=value の様に 063 * 繋げてください。 064 * 065 * SQL文には、{@DATE.YMDH}等のシステム変数が使用できます。 066 * 067 * @og.formSample 068 * Process_BulkQuery -action=query -dbid=DBGE -sql="select KEY from TABLE_X" 069 * 070 * -action=処理方法(必須) : 実行する処理方法を指定します 071 * -action=query 単なるSQL文を実行します。 072 * -action=bulkSet 実行したSQL文の結果を、Set<String> オブジェクトに設定します。 073 * -action=minus Set<String> オブジェクトと、ここでの実行結果の差分をとります。 074 * -action=intersect Set<String> オブジェクトと、ここでの実行結果の積分をとります。 075 * [ -dbid=DB接続ID ] : -dbid=DBGE (例: Process_DBParam の -configFile で指定する DBConfig.xml ファイルで規定) 076 * [ -sql=検索SQL文 ] : -sql="select * from GEA08" 077 * [ -sqlFile=検索SQLファイル ] : -sqlFile=select.sql 078 * -sql= を指定しない場合は、ファイルで必ず指定してください。 079 * [ -sql_XXXX=固定値 ] : -sql_SYSTEM_ID=GE 080 * SQL文中の{@XXXX}文字列を指定の固定値で置き換えます。 081 * WHERE SYSTEM_ID='{@SYSTEM_ID}' ⇒ WHERE SYSTEM_ID='GE' 082 * [ -bulkKey=XXXX ] : -bulkKey=XXXX 083 * SQL文中の{@XXXX}文字列をProcess_BulkQuery等で取得した値で置き換えます。 084 * WHERE SYSTEM_ID IN ( {@XXXX} ) ⇒ WHERE SYSTEM_ID IN ( 'AA','BB','CC' ) 085 * [ -bulkType=NUM|STR ] : -bulType=STR 086 * Bulkの値を文字列に変換する場合に、数字型か、文字型を指定します。 087 * 数字型では、AA,BB,CC とし、文字型では、'AA','BB','CC' に変換します(初期値:STR)。 088 * [ -fetchSize=100 ] :フェッチする行数(初期値:100) 089 * [ -display=[false/true] ] :結果を標準出力に表示する(true)かしない(false)か(初期値:false[表示しない]) 090 * [ -debug=[false/true] ] :デバッグ情報を標準出力に表示する(true)かしない(false)か(初期値:false[表示しない]) 091 * 092 * @og.rev 5.3.4.0 (2011/04/01) 新規追加 093 * @version 4.0 094 * @author Kazuhiko Hasegawa 095 * @since JDK5.0, 096 */ 097public class Process_BulkQuery extends AbstractProcess implements FirstProcess , ChainProcess { 098 private static final int MAX_BULK_SET = 500 ; // ORACLE の制約が 1000 なので。 099 100 private static final String ACT_QUERY = "query" ; 101 private static final String ACT_BULKSET = "bulkSet" ; 102 private static final String ACT_MINUS = "minus" ; 103 private static final String ACT_INTERSECT = "intersect" ; 104 105 private static final String[] ACTION_LST = new String[] { ACT_QUERY,ACT_BULKSET,ACT_MINUS,ACT_INTERSECT }; 106 107 private String actionCmd = null; // SQL結果を加工(query:実行、minus:引き算、intersect:重複分) 108 private String dbid = null; // メインDB接続ID 109 110 private String bulkKey = null; 111 private boolean bulkType = true; // true:STR , false:NUM 112 113 private int sqlCount = 0; // SQL文の処理件数 114 private int setCount = 0; // 取り出したSetの件数 115 private int outCount = 0; // マージ後のSetの件数 116 117 private int fetchSize = 100; 118 private boolean display = false; // 表示しない 119 private boolean debug = false; // デバッグ情報 120 private boolean firstTime = true; // 最初の一回目 121 122 private static final Map<String,String> mustProparty ; // [プロパティ]必須チェック用 Map 123 private static final Map<String,String> usableProparty ; // [プロパティ]整合性チェック Map 124 125 static { 126 mustProparty = new LinkedHashMap<String,String>(); 127 mustProparty.put( "action", "実行する処理方法を指定します。(query|minus|intersect)" ); 128 129 usableProparty = new LinkedHashMap<String,String>(); 130 usableProparty.put( "dbid", "Process_DBParam の -configFile で指定する DBConfig.xml ファイルで規定" ); 131 usableProparty.put( "sql", "検索SQL文(sql or sqlFile 必須)例: \"select * from GEA08\"" ); 132 usableProparty.put( "sqlFile", "検索SQLファイル(sql or sqlFile 必須)例: select.sql" ); 133 usableProparty.put( "sql_", "SQL文中の{@XXXX}文字列を指定の固定値で置き換えます。" + 134 CR + "WHERE SYSTEM_ID='{@SYSTEM_ID}' ⇒ WHERE SYSTEM_ID='GE'" ); 135 usableProparty.put( "dbid2", "DB接続ID2 例: Process_DBParam の -configFile で指定する DBConfig.xml ファイルで規定" ); 136 usableProparty.put( "sql2", "検索SQL文2(sql or sqlFile 必須)例: \"select * from GEA08\"" ); 137 usableProparty.put( "sqlFile2", "検索SQLファイル2(sql or sqlFile 必須)例: select.sql" ); 138 usableProparty.put( "sql2_", "SQL文2中の{@XXXX}文字列を指定の固定値で置き換えます。" + 139 CR + "WHERE SYSTEM_ID='{@SYSTEM_ID}' ⇒ WHERE SYSTEM_ID='GE'" ); 140 usableProparty.put( "bulkKey", "SQL文中の{@XXXX}文字列をProcess_BulkQuery等で取得した値で置き換えます。" + 141 CR + "WHERE SYSTEM_ID IN ( {@XXXX} ) ⇒ WHERE SYSTEM_ID IN ( 'AA','BB','CC' )" ); 142 usableProparty.put( "bulkType", "Bulkの値を文字列に変換する場合に、文字型か、数字型を指定します。" + 143 CR + "数字型では、AA,BB,CC とし、文字型では、'AA','BB','CC' に変換します。(初期値:STR)" ); 144 usableProparty.put( "fetchSize","フェッチする行数 (初期値:100)" ); 145 usableProparty.put( "display", "結果を標準出力に表示する(true)かしない(false)か" + 146 CR + "(初期値:false:表示しない)" ); 147 usableProparty.put( "debug", "デバッグ情報を標準出力に表示する(true)かしない(false)か" + 148 CR + "(初期値:false:表示しない)" ); 149 } 150 151 /** 152 * デフォルトコンストラクター。 153 * このクラスは、動的作成されます。デフォルトコンストラクターで、 154 * super クラスに対して、必要な初期化を行っておきます。 155 * 156 */ 157 public Process_BulkQuery() { 158 super( "org.opengion.fukurou.process.Process_BulkQuery",mustProparty,usableProparty ); 159 } 160 161 /** 162 * プロセスの初期化を行います。初めに一度だけ、呼び出されます。 163 * 初期処理(ファイルオープン、DBオープン等)に使用します。 164 * 165 * @og.rev 5.3.9.0 (2011/09/01) 1000件を超えた場合の処理を追加 166 * 167 * @param paramProcess データベースの接続先情報などを持っているオブジェクト 168 */ 169 public void init( final ParamProcess paramProcess ) { 170 Argument arg = getArgument(); 171 172 actionCmd = arg.getProparty("action" , null , ACTION_LST ); 173 174 fetchSize = arg.getProparty("fetchSize",fetchSize); 175 display = arg.getProparty("display",display); 176 debug = arg.getProparty("debug",debug); 177 178 dbid = arg.getProparty("dbid"); 179 String sql = arg.getFileProparty("sql","sqlFile",true); 180 if( debug ) { println( "入力SQL:" + sql ); } 181 182 HybsEntry[] entry =arg.getEntrys( "sql_" ); //配列 183 SystemParameter sysParam = new SystemParameter( sql ); 184 sql = sysParam.replace( entry ); 185 if( debug ) { println( "変換SQL:" + sql ); } 186 187 if( ACT_BULKSET.equalsIgnoreCase( actionCmd ) ) { 188 bulkKey = arg.getProparty("bulkKey"); 189 String bkType = arg.getProparty("bulkType"); 190 if( bkType != null ) { bulkType = "STR".equalsIgnoreCase( bkType ); } // 初期値が true なので、null チャックは外せません。 191 192 Set<String> setData = paramProcess.getBulkData(); 193 if( debug ) { println( setData.toString() ); } 194 setCount = setData.size(); 195 196 if( setCount > 0 ) { 197 // 5.3.9.0 (2011/09/01) 1000件を超えた場合の処理を追加 198 String[] sqls = makeBulkQuery( sql,bulkKey,bulkType,setData ); 199 for( int i=0; i<sqls.length; i++ ) { 200 if( debug ) { println( "BulkSQL:" + sqls[i] ); } 201 createSetData( paramProcess, dbid, sqls[i] ); 202 } 203 } 204 } 205 else if( ACT_QUERY.equalsIgnoreCase( actionCmd ) ) { 206 Set<String> setData2 = createSetData( paramProcess, dbid, sql ); 207 if( debug ) { println( setData2.toString() ); } 208 setCount = setData2.size(); 209 outCount = setCount; 210 paramProcess.setBulkData( setData2 ); 211 } 212 else { 213 Set<String> setData = paramProcess.getBulkData(); 214 Set<String> setData2 = createSetData( paramProcess, dbid, sql ); 215 setCount = setData2.size(); 216 217 if( ACT_MINUS.equalsIgnoreCase( actionCmd ) ) { 218 setData.removeAll( setData2 ); 219 } 220 else if( ACT_INTERSECT.equalsIgnoreCase( actionCmd ) ) { 221 setData.retainAll( setData2 ); 222 } 223 outCount = setData.size(); 224 if( debug ) { println( setData.toString() ); } 225 paramProcess.setBulkData( setData ); 226 } 227 } 228 229 /** 230 * プロセスの終了を行います。最後に一度だけ、呼び出されます。 231 * 終了処理(ファイルクローズ、DBクローズ等)に使用します。 232 * 233 * @param isOK トータルで、OKだったかどうか [true:成功/false:失敗] 234 */ 235 public void end( final boolean isOK ) { 236 // 何もありません。 237 } 238 239 /** 240 * このデータの処理において、次の処理が出来るかどうかを問い合わせます。 241 * この呼び出し1回毎に、次のデータを取得する準備を行います。 242 * 243 * @return 処理できる:true / 処理できない:false 244 */ 245 public boolean next() { 246 return firstTime; 247 } 248 249 /** 250 * 引数の LineModel を処理するメソッドです。 251 * 変換処理後の LineModel を返します。 252 * 後続処理を行わない場合(データのフィルタリングを行う場合)は、 253 * null データを返します。つまり、null データは、後続処理を行わない 254 * フラグの代わりにも使用しています。 255 * なお、変換処理後の LineModel と、オリジナルの LineModel が、 256 * 同一か、コピー(クローン)かは、各処理メソッド内で決めています。 257 * ドキュメントに明記されていない場合は、副作用が問題になる場合は、 258 * 各処理ごとに自分でコピー(クローン)して下さい。 259 * 260 * @param data オリジナルのLineModel 261 * 262 * @return 処理変換後のLineModel 263 */ 264 @SuppressWarnings(value={"unchecked"}) 265 public LineModel action( final LineModel data ) { 266 return data ; 267 } 268 269 /** 270 * 最初に、 行データである LineModel を作成します 271 * FirstProcess は、次々と処理をチェインしていく最初の行データを 272 * 作成して、後続の ChainProcess クラスに処理データを渡します。 273 * 274 * @param rowNo 処理中の行番号 275 * 276 * @return 処理変換後のLineModel 277 */ 278 public LineModel makeLineModel( final int rowNo ) { 279 firstTime = false; // 一度しか処理しないため、false を設定する。 280 281 LineModel model = new LineModel(); 282 283 model.setRowNo( rowNo ); 284 285 return model; 286 } 287 288 /** 289 * 内部で使用する Set オブジェクトを作成します。 290 * Exception 以外では、必ず Set<String> オブジェクトを返します。 291 * 292 * @og.rev 5.3.9.0 (2011/09/01) 1000件を超えた場合の処理を追加 293 * 294 * @param paramProcess データベースの接続先情報などを持っているオブジェクト 295 * @param dbid 接続先ID 296 * @param sql 実行するSQL文(検索系) 297 * 298 * @return 実行結果から取り出した、最初のカラムのみを集めた Setオブジェクト 299 * @throws RuntimeException データベース処理ができなかった場合。 300 */ 301 private Set<String> createSetData( final ParamProcess paramProcess, final String dbid, final String sql ) { 302 Set<String> data = new HashSet<String>(); 303 304 Connection connection = null; 305 Statement stmt = null; 306 ResultSet resultSet = null; 307 308 try { 309 connection = paramProcess.getConnection( dbid ); 310 stmt = connection.createStatement(); 311 if( fetchSize > 0 ) { stmt.setFetchSize( fetchSize ); } 312 if( stmt.execute( sql ) ) { // true:検索系 , false:更新系 313 resultSet = stmt.getResultSet(); 314 while( resultSet.next() ) { 315 sqlCount++ ; 316 String str = resultSet.getString(1); 317 if( display ) { println( str ); } 318 data.add( str ); 319 } 320 } 321 else { 322 sqlCount += stmt.getUpdateCount(); 323 } 324 } 325 catch (SQLException ex) { 326 String errMsg = "SQL を実行できませんでした。" + CR 327 + "errMsg=[" + ex.getMessage() + "]" + CR 328 + "errorCode=[" + ex.getErrorCode() + "] State=[" + ex.getSQLState() + "]" + CR 329 + "DBID=" + dbid + CR 330 + "SQL =" + sql ; 331 332 throw new RuntimeException( errMsg,ex ); 333 } 334 finally { 335 Closer.resultClose( resultSet ); 336 Closer.stmtClose( stmt ); 337 338 ConnectionFactory.remove( connection,dbid ); 339 } 340 return data; 341 } 342 343 /** 344 * 内部で使用する Set オブジェクトを作成します。 345 * Exception 以外では、必ず Set<String[]> オブジェクトを返します。 346 * 347 * @og.rev 5.3.9.0 (2011/09/01) 1000件を超えた場合の処理を追加 348 * 349 * @param sql オリジナルのSQL文 350 * @param bulkKey 一括処理で置き換えるキー文字列 351 * @param bulkType 文字型(true)か、数字型(false)を指定 352 * @param setData 一括処理の元となるSetオブジェクト 353 * 354 * @return オリジナルのSQL文 に 一括処理の文字列と置換したSQL文の配列 355 */ 356 private String[] makeBulkQuery( final String sql, final String bulkKey, final boolean bulkType,final Set<String> setData ) { 357 String[] sqls = new String[ setData.size()/MAX_BULK_SET + 1 ]; 358 int idx = 0; 359 int cnt = 0; 360 361 StringBuilder buf = new StringBuilder(); 362 String bulkVal = null; 363 if( bulkType ) { // 文字列の場合 364 for( String key : setData ) { 365 cnt++; 366 buf.append( ",'" ).append( key ).append( "'" ); 367 if( cnt >= MAX_BULK_SET ) { 368 bulkVal = buf.substring( 1 ); // 先頭のコロンをはずす 369 sqls[idx++] = sql.replace( "{@" + bulkKey + "}" ,bulkVal ); 370 cnt = 0; 371 buf = new StringBuilder(); 372 } 373 } 374 if( cnt > 0 ) { // きっちりで終わらない場合 375 bulkVal = buf.substring( 1 ); // 先頭のコロンをはずす 376 sqls[idx] = sql.replace( "{@" + bulkKey + "}" ,bulkVal ); 377 } 378 } 379 else { // 数字の場合 380 for( String key : setData ) { 381 cnt++; 382 buf.append( "," ).append( key ); 383 if( cnt >= MAX_BULK_SET ) { 384 bulkVal = buf.substring( 1 ); // 先頭のコロンをはずす 385 sqls[idx++] = sql.replace( "{@" + bulkKey + "}" ,bulkVal ); 386 cnt = 0; 387 buf = new StringBuilder(); 388 } 389 } 390 if( cnt > 0 ) { // きっちりで終わらない場合 391 bulkVal = buf.substring( 1 ); // 先頭のコロンをはずす 392 sqls[idx] = sql.replace( "{@" + bulkKey + "}" ,bulkVal ); 393 } 394 } 395 396 return sqls; 397 } 398 399 /** 400 * プロセスの処理結果のレポート表現を返します。 401 * 処理プログラム名、入力件数、出力件数などの情報です。 402 * この文字列をそのまま、標準出力に出すことで、結果レポートと出来るような 403 * 形式で出してください。 404 * 405 * @return 処理結果のレポート 406 */ 407 public String report() { 408 String report = "[" + getClass().getName() + "]" + CR 409 + TAB + "Action : " + actionCmd + CR 410 + TAB + "DBID : " + dbid + CR 411 + TAB + "sqlCount : " + sqlCount + CR 412 + TAB + "setCount : " + setCount + CR 413 + TAB + "outCount : " + outCount ; 414 415 return report ; 416 } 417 418 /** 419 * このクラスの使用方法を返します。 420 * 421 * @return このクラスの使用方法 422 */ 423 public String usage() { 424 StringBuilder buf = new StringBuilder(); 425 426 buf.append( "Process_BulkQueryは、データベースから読み取った内容を、一括処理するために、" ).append( CR ); 427 buf.append( "ParamProcess のサブクラス(Process_DBParam)にセットしたり、加工したりする" ).append( CR ); 428 buf.append( "FirstProcess と、ChainProcess のインターフェースを両方持った、実装クラスです。" ).append( CR ); 429 buf.append( CR ); 430 buf.append( "このクラスは、上流から、下流への処理は、1度しか実行されません。" ).append( CR ); 431 buf.append( "FirstProcess の検索結果は、Set オブジェクトとして、Process_DBParam に渡します。" ).append( CR ); 432 buf.append( "ChainProcess は、その結果を取り出し、自分自身の処理結果と合せて加工します。" ).append( CR ); 433 buf.append( CR ); 434 buf.append( "FirstProcess では、-action は、query のみです。" ).append( CR ); 435 buf.append( " query は、指定のSQL文を実行し、結果のSetをParamProcessに設定します。" ).append( CR ); 436 buf.append( "ChainProcess では、-action は、query、bulkSet、minus、intersect が指定できます。" ).append( CR ); 437 buf.append( " query は、上記と同じです。" ).append( CR ); 438 buf.append( " minus は、先のSetから、SQL文の実行結果を引き算し、結果Setを再設定します。" ).append( CR ); 439 buf.append( " intersect は、先のSetから、SQL文の実行結果と重複する結果Setを再設定します。" ).append( CR ); 440 buf.append( " bulkSet は、先のSetを取り出し、SQL文に加味して処理します。" ).append( CR ); 441 buf.append( CR ); 442 buf.append( "流れ的には、query で検索し、minusまたはintersect でSetオブジェクトを加工し、" ).append( CR ); 443 buf.append( "bulkSet で利用します。例えば、ORACLEから、ユニークキーのSetを作成し、" ).append( CR ); 444 buf.append( "SQLServerのユニークキーをminusした結果を、ORACLEからDELETEすれば、不要な" ).append( CR ); 445 buf.append( "データを削除するなどの処理が実行可能になります。また、単純に、query だけを、" ).append( CR ); 446 buf.append( "チェインすれば、単発のUPDATE文を実行することが可能です。" ).append( CR ); 447 buf.append( CR ); 448 buf.append( "データベース接続先等は、ParamProcess のサブクラス(Process_DBParam)に" ).append( CR ); 449 buf.append( "設定された接続(Connection)を使用します。" ).append( CR ); 450 buf.append( CR ); 451 buf.append( "引数文字列中に空白を含む場合は、ダブルコーテーション(\"\") で括って下さい。" ).append( CR ); 452 buf.append( "引数文字列の 『=』の前後には、空白は挟めません。必ず、-key=value の様に" ).append( CR ); 453 buf.append( "繋げてください。" ).append( CR ); 454 buf.append( CR ); 455 buf.append( "SQL文には、{@DATE.YMDH}等のシステム変数が使用できます。" ).append( CR ); 456 buf.append( CR ).append( CR ); 457 458 buf.append( getArgument().usage() ).append( CR ); 459 460 return buf.toString(); 461 } 462 463 /** 464 * このクラスは、main メソッドから実行できません。 465 * 466 * @param args コマンド引数配列 467 */ 468 public static void main( final String[] args ) { 469 LogWriter.log( new Process_BulkQuery().usage() ); 470 } 471}