View Javadoc

1   /*
2    * Copyright 2005-2007 The Kuali Foundation
3    *
4    *
5    * Licensed under the Educational Community License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.opensource.org/licenses/ecl2.php
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package com.rsmart.kuali.tools.ant.tasks;
18  
19  import java.io.PrintStream;
20  import java.io.Reader;
21  
22  import java.lang.reflect.Field;
23  
24  import java.sql.Blob;
25  import java.sql.Clob;
26  import java.sql.Connection;
27  import java.sql.DriverManager;
28  import java.sql.DatabaseMetaData;
29  import java.sql.PreparedStatement;
30  import java.sql.SQLException;
31  import java.sql.ResultSet;
32  import java.sql.ResultSetMetaData;
33  import java.sql.Statement;
34  import java.sql.Types;
35  
36  import java.util.ArrayList;
37  import java.util.Collection;
38  import java.util.HashMap;
39  import java.util.Map;
40  import java.util.Observable;
41  import java.util.Observer;
42  
43  import org.apache.tools.ant.BuildException;
44  import org.apache.tools.ant.DirectoryScanner;
45  import org.apache.tools.ant.Main;
46  import org.apache.tools.ant.Task;
47  import org.apache.tools.ant.types.FileSet;
48  
49  import oracle.jdbc.OraclePreparedStatement;
50  import oracle.jdbc.pool.OracleDataSource;
51  
52  import static org.apache.tools.ant.Project.MSG_DEBUG;
53  
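/**
 * Ant task that copies table data from a source database to a target database.
 * Both databases are looked up as project references (see {@code setSource} and
 * {@code setTarget}); tables are copied row by row, partly on background threads,
 * while a progress indicator is printed.
 *
 * @author Leo Przybylski (przybyls@arizona.edu)
 */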
58  public class MigrateData extends Task {
59      
60      private static final String[] carr = new String[] {"|", "\\", "-", "/"};
61      private static final String RECORD_COUNT_QUERY = "select count(*) as \"COUNT\" from %s";
62      private static final String SELECT_ALL_QUERY   = "select * from %s";
63      private static final String INSERT_STATEMENT   = "insert into %s (%s) values (%s)";
64      private static final String DATE_CONVERSION    = "TO_DATE('%s', 'YYYYMMDDHH24MISS')";
65      private static final String COUNT_FIELD        = "COUNT";
66      private static final String LIQUIBASE_TABLE    = "DATABASECHANGELOG";
67      private static final int[]  QUOTED_TYPES       = 
68          new int[] {Types.CHAR, Types.VARCHAR, Types.TIME, Types.LONGVARCHAR, Types.DATE, Types.TIMESTAMP};
69  
70      private static final String HSQLDB_PUBLIC      = "PUBLIC";
71      private static final int    MAX_THREADS        = 3;
72  
73      
74      private String source;
75      private String target;
76      private int threadCount;
77  
78      public MigrateData() { 
79          int threadCount = 1;
80      }
81  
82      
83      public void setSource(String refid) {
84          this.source = refid;
85      }
86      
87      public String getSource() {
88          return this.source;
89      }
90  
91      public void setTarget(String refid) {
92          this.target = refid;
93      }
94  
95      public String getTarget() {
96          return this.target;
97      }
98      
99      public void execute() {
100         final RdbmsConfig source = (RdbmsConfig) getProject().getReference(getSource());
101         final RdbmsConfig target = (RdbmsConfig) getProject().getReference(getTarget());
102 
103         log("Migrating data from " + source.getUrl() + " to " + target.getUrl());
104 
105         final Incrementor recordCountIncrementor = new Incrementor();
106         final Map<String, Integer> tableData = getTableData(source, target, recordCountIncrementor);
107 
108         log("Copying " + tableData.size() + " tables");
109 
110         float recordVisitor = 0;
111         final ProgressObserver progressObserver = new ProgressObserver(recordCountIncrementor.getValue(),
112                                                                        48f, 48f/100,
113                                                                        "\r|%s[%s] %3d%% (%d/%d) records");
114         final ProgressObservable observable = new ProgressObservable();
115         observable.addObserver(progressObserver);
116 
117         final ThreadGroup tgroup = new ThreadGroup("Migration Threads");
118 
119         for (final String tableName : tableData.keySet()) {
120             debug("Migrating table " + tableName + " with " + tableData.get(tableName) + " records");
121             if (tgroup.activeCount() < MAX_THREADS) {
122                 new Thread(tgroup, new Runnable() {
123                         public void run() {
124                             migrate(source, target, tableName, observable);
125                         }
126                     }).start();
127             }
128             else {
129                 final Map<String,Integer> columns = new HashMap<String, Integer>();
130                 migrate(source, target, tableName, observable);
131             }
132         }
133 
134         // Wait for other threads to finish
135         try {
136             while(tgroup.activeCount() > 0) {
137                 Thread.sleep(5000);
138             }
139         }
140         catch (InterruptedException e) {
141         }
142 
143         try {
144             final Connection targetDb = openConnection(target);
145             if (targetDb.getMetaData().getDriverName().toLowerCase().contains("hsqldb")) {
146                 Statement st = targetDb.createStatement();
147                 st.execute("CHECKPOINT"); 
148                 st.close();
149             }
150             targetDb.close();
151         }
152         catch (Exception e) {
153             throw new BuildException(e);
154         }
155     }
156 
157     protected void migrate(final RdbmsConfig source, 
158                            final RdbmsConfig target, 
159                            final String tableName, 
160                            final ProgressObservable observable) {
161         final Connection sourceDb = openConnection(source);
162         final Connection targetDb = openConnection(target);
163         source.setConnection(sourceDb);
164         target.setConnection(targetDb);
165         final Map<String, Integer> columns = getColumnMap(source, target, tableName);
166 
167         if (columns.size() < 1) {
168             log("Columns are empty for " + tableName);
169             return;
170         }
171 
172         PreparedStatement toStatement = prepareStatement(targetDb, tableName, columns);
173         Statement fromStatement = null;
174 
175         final boolean hasClob = columns.values().contains(Types.CLOB);
176         int recordsLost = 0;
177         
178         try {
179             fromStatement = sourceDb.createStatement();
180 
181             final ResultSet results = fromStatement.executeQuery(String.format(SELECT_ALL_QUERY, tableName));
182             while (results.next()) {
183                 try {
184                     toStatement.clearParameters();
185                     
186                     int i = 1;
187                     for (String columnName : columns.keySet()) {
188                         final Object value = results.getObject(columnName);
189                         
190                         if (value != null) {
191                             try {
192                                 handleLob(toStatement, value, i);
193                             }
194                             catch (Exception e) {
195                                 System.err.println(String.format("Error processing %s.%s %s", tableName, columnName, columns.get(columnName)));
196                                 if (Clob.class.isAssignableFrom(value.getClass())) {
197                                     System.err.println("Got exception trying to insert CLOB with length" + ((Clob) value).length());
198                                 }
199                                 e.printStackTrace();
200                             }
201                         }
202                         else {
203                             toStatement.setObject(i,value);
204                         } 
205                         i++;
206                     }
207                     
208                     boolean retry = true;
209                     int retry_count = 0;
210                     while(retry) {
211                         try {
212                             toStatement.execute();
213                             retry = false;
214                         }
215                         catch (SQLException sqle) {
216                             retry = false;
217                             if (sqle.getMessage().contains("ORA-00942")) {
218                                 log("Couldn't find " + tableName);
219                                 log("Tried insert statement " + getStatementBuffer(tableName, columns));
220                                 // sqle.printStackTrace();
221                             }
222                             else if (sqle.getMessage().contains("ORA-12519")) {
223                                 retry = true;
224                                 log("Tried insert statement " + getStatementBuffer(tableName, columns));
225                                 sqle.printStackTrace();
226                             }
227                             else if (sqle.getMessage().contains("IN or OUT")) {
228                                 log("Column count was " + columns.keySet().size());
229                             }
230                             else if (sqle.getMessage().contains("Error reading")) {
231                                 if (retry_count > 5) {
232                                     log("Tried insert statement " + getStatementBuffer(tableName, columns));
233                                     retry = false;
234                                 }
235                                 retry_count++;
236                             }
237                             else {
238                                 sqle.printStackTrace();
239                             }
240                         }
241                     }
242                 }
243                 catch (Exception e) {
244                     recordsLost++;
245                     throw e;
246                 }
247                 finally {
248                     observable.incrementRecord();
249                 }
250             }
251             results.close();
252         }
253         catch (Exception e) {
254             throw new BuildException(e);
255         }
256         finally {
257             if (sourceDb != null) {
258                 try {
259                     if (sourceDb.getMetaData().getDriverName().toLowerCase().contains("hsqldb")) {
260                         Statement st = sourceDb.createStatement();
261                         st.execute("CHECKPOINT"); 
262                         st.close();
263                     }
264                     fromStatement.close();
265                     sourceDb.close();
266                 }
267                 catch (Exception e) {
268                 }
269             }
270 
271             if (targetDb != null) {
272                 try {
273                     targetDb.commit();
274                     if (targetDb.getMetaData().getDriverName().toLowerCase().contains("hsql")) {
275                         Statement st = targetDb.createStatement();
276                         st.execute("CHECKPOINT"); 
277                         st.close();
278                     }
279                     toStatement.close();
280                     targetDb.close();
281                 }
282                 catch (Exception e) {
283                     log("Error closing database connection");
284                     e.printStackTrace();
285                 }
286             }
287             debug("Lost " +recordsLost + " records");
288             columns.clear();
289         }
290     }
291 
292     protected void handleLob(final PreparedStatement toStatement, final Object value, final int i) throws SQLException {
293         if (Clob.class.isAssignableFrom(value.getClass())) {
294             toStatement.setAsciiStream(i, ((Clob) value).getAsciiStream(), ((Clob) value).length());
295         }
296         else if (Blob.class.isAssignableFrom(value.getClass())) {
297             toStatement.setBinaryStream(i, ((Blob) value).getBinaryStream(), ((Blob) value).length());
298         }
299         else {
300             toStatement.setObject(i,value);
301         } 
302     }
303 
304     protected PreparedStatement prepareStatement(Connection conn, String tableName, Map<String, Integer> columns) {
305         final String statement = getStatementBuffer(tableName, columns);
306         
307         try {
308             return conn.prepareStatement(statement);
309         }
310         catch (Exception e) {
311             throw new BuildException(e);
312         }
313     }
314 
315     private String getStatementBuffer(String tableName, Map<String,Integer> columns) {
316         String retval = null;
317 
318         final StringBuilder names  = new StringBuilder();
319         final StringBuilder values = new StringBuilder();
320         for (String columnName : columns.keySet()) {
321             names.append(columnName).append(",");
322             values.append("?,");
323         }
324 
325         names.setLength(names.length() - 1);
326         values.setLength(values.length() - 1);
327         retval = String.format(INSERT_STATEMENT, tableName, names, values);
328         
329 
330         return retval;
331     }
332 
333     protected boolean isValidTable(final DatabaseMetaData metadata, final String tableName) {
334         return !(tableName.startsWith("BIN$") || tableName.toUpperCase().startsWith(LIQUIBASE_TABLE) || isSequence(metadata, tableName));
335     }
336 
337     protected boolean isSequence(final DatabaseMetaData metadata, final String tableName) {
338         final RdbmsConfig source = (RdbmsConfig) getProject().getReference(getSource());
339         try {
340             final ResultSet rs = metadata.getColumns(null, source.getSchema(), tableName, null);
341             int columnCount = 0;
342             boolean hasId = false;
343             while (rs.next()) {
344                 columnCount++;
345                 if ("yes".equalsIgnoreCase(rs.getString("IS_AUTOINCREMENT"))) {
346                     hasId = true;
347                 }
348             }
349                 
350             return (columnCount == 1 && hasId);
351         }
352         catch (Exception e) {
353             return false;
354         }
355     }
356 
357     /**
358      * Get a list of table names available mapped to row counts
359      */
360     protected Map<String, Integer> getTableData(RdbmsConfig source, RdbmsConfig target, Incrementor incrementor) {
361         Connection sourceConn = openConnection(source);
362         Connection targetConn = openConnection(target);
363         final Map<String, Integer> retval = new HashMap<String, Integer>();
364         final Collection<String> toRemove = new ArrayList<String>();
365 
366         debug("Looking up table names");
367         try {
368             final DatabaseMetaData metadata = sourceConn.getMetaData();
369             final ResultSet tableResults = metadata.getTables(sourceConn.getCatalog(), source.getSchema(), null, new String[] { "TABLE" });
370             
371             while (tableResults.next()) {
372                 final String tableName = tableResults.getString("TABLE_NAME");
373                 if (!isValidTable(metadata, tableName)) {
374                     continue;
375                 }
376                 if (tableName.toUpperCase().startsWith(LIQUIBASE_TABLE)) continue;
377                 final int rowCount = getTableRecordCount(sourceConn, tableName);
378                 if (rowCount < 1) { // no point in going through tables with no data
379                     
380                 }
381                 incrementor.increment(rowCount);
382                 debug("Adding table " + tableName);
383                 retval.put(tableName, rowCount);
384             }
385             tableResults.close();
386         }
387         catch (Exception e) {
388             throw new BuildException(e);
389         }
390         finally {
391             if (sourceConn != null) {
392                 try {
393                     sourceConn.close();
394                     sourceConn = null;
395                 }
396                 catch (Exception e) {
397                 }
398             }
399         }
400 
401         try {
402             for (String tableName : retval.keySet()) {
403                 final ResultSet tableResults = targetConn.getMetaData().getTables(targetConn.getCatalog(), target.getSchema(), null, new String[] { "TABLE" });
404                 if (!tableResults.next()) {
405                     log("Removing " + tableName);
406                     toRemove.add(tableName);
407                 }
408                 tableResults.close();
409             }
410         }
411         catch (Exception e) {
412             throw new BuildException(e);
413         }
414         finally {
415             if (targetConn != null) {
416                 try {
417                     targetConn.close();
418                     targetConn = null;
419                 }
420                 catch (Exception e) {
421                 }
422             }
423         }
424 
425         for (String tableName : toRemove) {
426             retval.remove(tableName);
427         }
428         
429         return retval;
430     }
431 
432     private Map<String, Integer> getColumnMap(final RdbmsConfig source, final RdbmsConfig target, String tableName) {
433         final Connection targetDb = target.getConnection();
434         final Connection sourceDb = source.getConnection();
435         final Map<String,Integer> retval = new HashMap<String,Integer>();
436         final Collection<String> toRemove = new ArrayList<String>();
437         try {
438             final Statement state = targetDb.createStatement();                
439             final ResultSet altResults = state.executeQuery("select * from " + tableName + " where 1 = 0");
440             final ResultSetMetaData metadata = altResults.getMetaData();
441             
442             for (int i = 1; i <= metadata.getColumnCount(); i++) {
443                 retval.put(metadata.getColumnName(i),
444                            metadata.getColumnType(i));
445             }
446             altResults.close();
447             state.close();
448         }
449         catch (Exception e) {
450             throw new BuildException(e);
451         }
452 
453         for (final String column : retval.keySet()) {
454             try {
455                 final Statement state = targetDb.createStatement();                
456                 final ResultSet altResults = state.executeQuery("select * from " + tableName + " where 1 = 0");
457                 final ResultSetMetaData metadata = altResults.getMetaData();
458 
459                 for (int i = 1; i <= metadata.getColumnCount(); i++) {
460                     retval.put(metadata.getColumnName(i),
461                                metadata.getColumnType(i));
462                 }
463                 altResults.close();
464                 state.close();
465             }
466             catch (Exception e) {
467                 throw new BuildException(e);
468             }
469         }
470 
471         for (final String column : toRemove) {
472             retval.remove(column);
473         }
474         
475         return retval;
476     }
477 
478     private int getTableRecordCount(Connection conn, String tableName) {
479         final String query = String.format(RECORD_COUNT_QUERY, tableName);
480         Statement statement = null;
481         try {
482             statement = conn.createStatement();
483             final ResultSet results = statement.executeQuery(query);
484             results.next();
485             final int retval = results.getInt(COUNT_FIELD);
486             results.close();
487             return retval;
488         }
489         catch (Exception e) {
490             if (e.getMessage().contains("ORA-00942")) {
491                 log("Couldn't find " + tableName);
492                 log("Tried insert statement " + query);
493             }
494             log("Exception executing " + query);
495             throw new BuildException(e);
496         }
497         finally {
498             try {
499                 if (statement != null) {
500                     statement.close();
501                     statement = null;
502                 }
503             }
504             catch (Exception e) {
505             }
506         }
507     }
508 
509     private void debug(String msg) {
510         log(msg, MSG_DEBUG);
511     }
512 
513     private Connection openSource() {
514         return openConnection(getSource());
515     }
516 
517     private Connection openTarget() {
518         return openConnection(getTarget());
519     }
520 
521     private Connection openConnection(String reference) {
522         final RdbmsConfig config = (RdbmsConfig) getProject().getReference(reference);
523         return openConnection(config);
524     }
525     
526     private Connection openConnection(RdbmsConfig config) {
527         Connection retval = null;
528         
529         while (retval == null) {
530             try {
531                 debug("Loading schema " + config.getSchema() + " at url " + config.getUrl());
532                 Class.forName(config.getDriver());
533 
534                 retval = DriverManager.getConnection(config.getUrl(), config.getUsername(), config.getPassword());
535                 retval.setAutoCommit(false);
536 
537 
538                 // If this is an HSQLDB database, then we probably want to turn off logging for permformance
539                 if (config.getDriver().indexOf("hsqldb") > -1) {
540                     debug("Disabling hsqldb log");
541                     final Statement st = retval.createStatement();
542                     st.execute("SET FILES LOG FALSE");
543                     st.close();
544                 }
545                 
546             }
547             catch (Exception e) {
548                 // throw new BuildException(e);
549             }
550         }
551         
552         return retval;
553     }
554 
555     /**
556      * Helper class for incrementing values
557      */
558     private class Incrementor {
559         private int value;
560         
561         public Incrementor() {
562             value = 0;
563         }
564         
565         public int getValue() {
566             return value;
567         }
568 
569         public void increment() {
570             value++;
571         }
572 
573         public void increment(int by) {
574             value += by;
575         }
576     }
577 
578     private class ProgressObservable extends Observable {
579         public void incrementRecord() {
580             setChanged();
581             notifyObservers();
582             clearChanged();
583         }
584     }
585 
586     /**
587      * Observer for handling progress
588      * 
589      */
590     private class ProgressObserver implements Observer {
591 
592         private float total;
593         private float progress;
594         private float length;
595         private float ratio;
596         private String template;
597         private float count;
598         private PrintStream out;
599         
600         public ProgressObserver(final float total,
601                                 final float length,
602                                 final float ratio,
603                                 final String template) {
604             this.total    = total;
605             this.template = template;
606             this.ratio    = ratio;
607             this.length   = length;
608             this.count    = 0;
609             
610             try {
611                 final Field field = Main.class.getDeclaredField("out");
612                 field.setAccessible(true);
613                 out = (PrintStream) field.get(null);
614             }
615             catch (Exception e) {
616                 e.printStackTrace();
617             }
618         }
619 
620         public synchronized void update(Observable o, Object arg) {
621             count++;
622 
623             final int percent = (int) ((count / total) * 100f);
624             final int progress = (int) ((count / total) * (100f * ratio));
625             final StringBuilder progressBuffer = new StringBuilder();
626                 
627             for (int x = 0; x < progress; x++) {
628                 progressBuffer.append('=');
629             }
630             
631             for (int x = progress; x < length; x++) {
632                 progressBuffer.append(' ');
633             }
634             int roll = (int) (count / (total / 1000));
635 
636             if (getProject().getProperty("run_from_ant") == null) {
637                 out.print(String.format(template, progressBuffer, carr[roll % carr.length], percent, (int) count, (int) total));
638             }
639             else if ((count % 5000) == 0 || count == total) {
640                 out.println(String.format("(%s)%% %s of %s records", (int) ((count / total) * 100), (int) count, (int) total));
641             }
642         }
643     }
644 }