1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.rsmart.kuali.tools.ant.tasks;
18
19 import java.io.PrintStream;
20 import java.io.Reader;
21
22 import java.lang.reflect.Field;
23
24 import java.sql.Blob;
25 import java.sql.Clob;
26 import java.sql.Connection;
27 import java.sql.DriverManager;
28 import java.sql.DatabaseMetaData;
29 import java.sql.PreparedStatement;
30 import java.sql.SQLException;
31 import java.sql.ResultSet;
32 import java.sql.ResultSetMetaData;
33 import java.sql.Statement;
34 import java.sql.Types;
35
36 import java.util.ArrayList;
37 import java.util.Collection;
38 import java.util.HashMap;
39 import java.util.Map;
40 import java.util.Observable;
41 import java.util.Observer;
42
43 import org.apache.tools.ant.BuildException;
44 import org.apache.tools.ant.DirectoryScanner;
45 import org.apache.tools.ant.Main;
46 import org.apache.tools.ant.Task;
47 import org.apache.tools.ant.types.FileSet;
48
49 import oracle.jdbc.OraclePreparedStatement;
50 import oracle.jdbc.pool.OracleDataSource;
51
52 import static org.apache.tools.ant.Project.MSG_DEBUG;
53
54
55
56
57
58 public class MigrateData extends Task {
59
60 private static final String[] carr = new String[] {"|", "\\", "-", "/"};
61 private static final String RECORD_COUNT_QUERY = "select count(*) as \"COUNT\" from %s";
62 private static final String SELECT_ALL_QUERY = "select * from %s";
63 private static final String INSERT_STATEMENT = "insert into %s (%s) values (%s)";
64 private static final String DATE_CONVERSION = "TO_DATE('%s', 'YYYYMMDDHH24MISS')";
65 private static final String COUNT_FIELD = "COUNT";
66 private static final String LIQUIBASE_TABLE = "DATABASECHANGELOG";
67 private static final int[] QUOTED_TYPES =
68 new int[] {Types.CHAR, Types.VARCHAR, Types.TIME, Types.LONGVARCHAR, Types.DATE, Types.TIMESTAMP};
69
70 private static final String HSQLDB_PUBLIC = "PUBLIC";
71 private static final int MAX_THREADS = 3;
72
73
74 private String source;
75 private String target;
76 private int threadCount;
77
/**
 * Creates the task with a thread count of one.
 */
public MigrateData() {
    // Assign the field itself: declaring "int threadCount" here would only
    // create a local that shadows the field and leave the field at 0.
    this.threadCount = 1;
}
81
82
83 public void setSource(String refid) {
84 this.source = refid;
85 }
86
87 public String getSource() {
88 return this.source;
89 }
90
91 public void setTarget(String refid) {
92 this.target = refid;
93 }
94
95 public String getTarget() {
96 return this.target;
97 }
98
99 public void execute() {
100 final RdbmsConfig source = (RdbmsConfig) getProject().getReference(getSource());
101 final RdbmsConfig target = (RdbmsConfig) getProject().getReference(getTarget());
102
103 log("Migrating data from " + source.getUrl() + " to " + target.getUrl());
104
105 final Incrementor recordCountIncrementor = new Incrementor();
106 final Map<String, Integer> tableData = getTableData(source, target, recordCountIncrementor);
107
108 log("Copying " + tableData.size() + " tables");
109
110 float recordVisitor = 0;
111 final ProgressObserver progressObserver = new ProgressObserver(recordCountIncrementor.getValue(),
112 48f, 48f/100,
113 "\r|%s[%s] %3d%% (%d/%d) records");
114 final ProgressObservable observable = new ProgressObservable();
115 observable.addObserver(progressObserver);
116
117 final ThreadGroup tgroup = new ThreadGroup("Migration Threads");
118
119 for (final String tableName : tableData.keySet()) {
120 debug("Migrating table " + tableName + " with " + tableData.get(tableName) + " records");
121 if (tgroup.activeCount() < MAX_THREADS) {
122 new Thread(tgroup, new Runnable() {
123 public void run() {
124 migrate(source, target, tableName, observable);
125 }
126 }).start();
127 }
128 else {
129 final Map<String,Integer> columns = new HashMap<String, Integer>();
130 migrate(source, target, tableName, observable);
131 }
132 }
133
134
135 try {
136 while(tgroup.activeCount() > 0) {
137 Thread.sleep(5000);
138 }
139 }
140 catch (InterruptedException e) {
141 }
142
143 try {
144 final Connection targetDb = openConnection(target);
145 if (targetDb.getMetaData().getDriverName().toLowerCase().contains("hsqldb")) {
146 Statement st = targetDb.createStatement();
147 st.execute("CHECKPOINT");
148 st.close();
149 }
150 targetDb.close();
151 }
152 catch (Exception e) {
153 throw new BuildException(e);
154 }
155 }
156
157 protected void migrate(final RdbmsConfig source,
158 final RdbmsConfig target,
159 final String tableName,
160 final ProgressObservable observable) {
161 final Connection sourceDb = openConnection(source);
162 final Connection targetDb = openConnection(target);
163 source.setConnection(sourceDb);
164 target.setConnection(targetDb);
165 final Map<String, Integer> columns = getColumnMap(source, target, tableName);
166
167 if (columns.size() < 1) {
168 log("Columns are empty for " + tableName);
169 return;
170 }
171
172 PreparedStatement toStatement = prepareStatement(targetDb, tableName, columns);
173 Statement fromStatement = null;
174
175 final boolean hasClob = columns.values().contains(Types.CLOB);
176 int recordsLost = 0;
177
178 try {
179 fromStatement = sourceDb.createStatement();
180
181 final ResultSet results = fromStatement.executeQuery(String.format(SELECT_ALL_QUERY, tableName));
182 while (results.next()) {
183 try {
184 toStatement.clearParameters();
185
186 int i = 1;
187 for (String columnName : columns.keySet()) {
188 final Object value = results.getObject(columnName);
189
190 if (value != null) {
191 try {
192 handleLob(toStatement, value, i);
193 }
194 catch (Exception e) {
195 System.err.println(String.format("Error processing %s.%s %s", tableName, columnName, columns.get(columnName)));
196 if (Clob.class.isAssignableFrom(value.getClass())) {
197 System.err.println("Got exception trying to insert CLOB with length" + ((Clob) value).length());
198 }
199 e.printStackTrace();
200 }
201 }
202 else {
203 toStatement.setObject(i,value);
204 }
205 i++;
206 }
207
208 boolean retry = true;
209 int retry_count = 0;
210 while(retry) {
211 try {
212 toStatement.execute();
213 retry = false;
214 }
215 catch (SQLException sqle) {
216 retry = false;
217 if (sqle.getMessage().contains("ORA-00942")) {
218 log("Couldn't find " + tableName);
219 log("Tried insert statement " + getStatementBuffer(tableName, columns));
220
221 }
222 else if (sqle.getMessage().contains("ORA-12519")) {
223 retry = true;
224 log("Tried insert statement " + getStatementBuffer(tableName, columns));
225 sqle.printStackTrace();
226 }
227 else if (sqle.getMessage().contains("IN or OUT")) {
228 log("Column count was " + columns.keySet().size());
229 }
230 else if (sqle.getMessage().contains("Error reading")) {
231 if (retry_count > 5) {
232 log("Tried insert statement " + getStatementBuffer(tableName, columns));
233 retry = false;
234 }
235 retry_count++;
236 }
237 else {
238 sqle.printStackTrace();
239 }
240 }
241 }
242 }
243 catch (Exception e) {
244 recordsLost++;
245 throw e;
246 }
247 finally {
248 observable.incrementRecord();
249 }
250 }
251 results.close();
252 }
253 catch (Exception e) {
254 throw new BuildException(e);
255 }
256 finally {
257 if (sourceDb != null) {
258 try {
259 if (sourceDb.getMetaData().getDriverName().toLowerCase().contains("hsqldb")) {
260 Statement st = sourceDb.createStatement();
261 st.execute("CHECKPOINT");
262 st.close();
263 }
264 fromStatement.close();
265 sourceDb.close();
266 }
267 catch (Exception e) {
268 }
269 }
270
271 if (targetDb != null) {
272 try {
273 targetDb.commit();
274 if (targetDb.getMetaData().getDriverName().toLowerCase().contains("hsql")) {
275 Statement st = targetDb.createStatement();
276 st.execute("CHECKPOINT");
277 st.close();
278 }
279 toStatement.close();
280 targetDb.close();
281 }
282 catch (Exception e) {
283 log("Error closing database connection");
284 e.printStackTrace();
285 }
286 }
287 debug("Lost " +recordsLost + " records");
288 columns.clear();
289 }
290 }
291
292 protected void handleLob(final PreparedStatement toStatement, final Object value, final int i) throws SQLException {
293 if (Clob.class.isAssignableFrom(value.getClass())) {
294 toStatement.setAsciiStream(i, ((Clob) value).getAsciiStream(), ((Clob) value).length());
295 }
296 else if (Blob.class.isAssignableFrom(value.getClass())) {
297 toStatement.setBinaryStream(i, ((Blob) value).getBinaryStream(), ((Blob) value).length());
298 }
299 else {
300 toStatement.setObject(i,value);
301 }
302 }
303
304 protected PreparedStatement prepareStatement(Connection conn, String tableName, Map<String, Integer> columns) {
305 final String statement = getStatementBuffer(tableName, columns);
306
307 try {
308 return conn.prepareStatement(statement);
309 }
310 catch (Exception e) {
311 throw new BuildException(e);
312 }
313 }
314
315 private String getStatementBuffer(String tableName, Map<String,Integer> columns) {
316 String retval = null;
317
318 final StringBuilder names = new StringBuilder();
319 final StringBuilder values = new StringBuilder();
320 for (String columnName : columns.keySet()) {
321 names.append(columnName).append(",");
322 values.append("?,");
323 }
324
325 names.setLength(names.length() - 1);
326 values.setLength(values.length() - 1);
327 retval = String.format(INSERT_STATEMENT, tableName, names, values);
328
329
330 return retval;
331 }
332
333 protected boolean isValidTable(final DatabaseMetaData metadata, final String tableName) {
334 return !(tableName.startsWith("BIN$") || tableName.toUpperCase().startsWith(LIQUIBASE_TABLE) || isSequence(metadata, tableName));
335 }
336
337 protected boolean isSequence(final DatabaseMetaData metadata, final String tableName) {
338 final RdbmsConfig source = (RdbmsConfig) getProject().getReference(getSource());
339 try {
340 final ResultSet rs = metadata.getColumns(null, source.getSchema(), tableName, null);
341 int columnCount = 0;
342 boolean hasId = false;
343 while (rs.next()) {
344 columnCount++;
345 if ("yes".equalsIgnoreCase(rs.getString("IS_AUTOINCREMENT"))) {
346 hasId = true;
347 }
348 }
349
350 return (columnCount == 1 && hasId);
351 }
352 catch (Exception e) {
353 return false;
354 }
355 }
356
357
358
359
360 protected Map<String, Integer> getTableData(RdbmsConfig source, RdbmsConfig target, Incrementor incrementor) {
361 Connection sourceConn = openConnection(source);
362 Connection targetConn = openConnection(target);
363 final Map<String, Integer> retval = new HashMap<String, Integer>();
364 final Collection<String> toRemove = new ArrayList<String>();
365
366 debug("Looking up table names");
367 try {
368 final DatabaseMetaData metadata = sourceConn.getMetaData();
369 final ResultSet tableResults = metadata.getTables(sourceConn.getCatalog(), source.getSchema(), null, new String[] { "TABLE" });
370
371 while (tableResults.next()) {
372 final String tableName = tableResults.getString("TABLE_NAME");
373 if (!isValidTable(metadata, tableName)) {
374 continue;
375 }
376 if (tableName.toUpperCase().startsWith(LIQUIBASE_TABLE)) continue;
377 final int rowCount = getTableRecordCount(sourceConn, tableName);
378 if (rowCount < 1) {
379
380 }
381 incrementor.increment(rowCount);
382 debug("Adding table " + tableName);
383 retval.put(tableName, rowCount);
384 }
385 tableResults.close();
386 }
387 catch (Exception e) {
388 throw new BuildException(e);
389 }
390 finally {
391 if (sourceConn != null) {
392 try {
393 sourceConn.close();
394 sourceConn = null;
395 }
396 catch (Exception e) {
397 }
398 }
399 }
400
401 try {
402 for (String tableName : retval.keySet()) {
403 final ResultSet tableResults = targetConn.getMetaData().getTables(targetConn.getCatalog(), target.getSchema(), null, new String[] { "TABLE" });
404 if (!tableResults.next()) {
405 log("Removing " + tableName);
406 toRemove.add(tableName);
407 }
408 tableResults.close();
409 }
410 }
411 catch (Exception e) {
412 throw new BuildException(e);
413 }
414 finally {
415 if (targetConn != null) {
416 try {
417 targetConn.close();
418 targetConn = null;
419 }
420 catch (Exception e) {
421 }
422 }
423 }
424
425 for (String tableName : toRemove) {
426 retval.remove(tableName);
427 }
428
429 return retval;
430 }
431
432 private Map<String, Integer> getColumnMap(final RdbmsConfig source, final RdbmsConfig target, String tableName) {
433 final Connection targetDb = target.getConnection();
434 final Connection sourceDb = source.getConnection();
435 final Map<String,Integer> retval = new HashMap<String,Integer>();
436 final Collection<String> toRemove = new ArrayList<String>();
437 try {
438 final Statement state = targetDb.createStatement();
439 final ResultSet altResults = state.executeQuery("select * from " + tableName + " where 1 = 0");
440 final ResultSetMetaData metadata = altResults.getMetaData();
441
442 for (int i = 1; i <= metadata.getColumnCount(); i++) {
443 retval.put(metadata.getColumnName(i),
444 metadata.getColumnType(i));
445 }
446 altResults.close();
447 state.close();
448 }
449 catch (Exception e) {
450 throw new BuildException(e);
451 }
452
453 for (final String column : retval.keySet()) {
454 try {
455 final Statement state = targetDb.createStatement();
456 final ResultSet altResults = state.executeQuery("select * from " + tableName + " where 1 = 0");
457 final ResultSetMetaData metadata = altResults.getMetaData();
458
459 for (int i = 1; i <= metadata.getColumnCount(); i++) {
460 retval.put(metadata.getColumnName(i),
461 metadata.getColumnType(i));
462 }
463 altResults.close();
464 state.close();
465 }
466 catch (Exception e) {
467 throw new BuildException(e);
468 }
469 }
470
471 for (final String column : toRemove) {
472 retval.remove(column);
473 }
474
475 return retval;
476 }
477
478 private int getTableRecordCount(Connection conn, String tableName) {
479 final String query = String.format(RECORD_COUNT_QUERY, tableName);
480 Statement statement = null;
481 try {
482 statement = conn.createStatement();
483 final ResultSet results = statement.executeQuery(query);
484 results.next();
485 final int retval = results.getInt(COUNT_FIELD);
486 results.close();
487 return retval;
488 }
489 catch (Exception e) {
490 if (e.getMessage().contains("ORA-00942")) {
491 log("Couldn't find " + tableName);
492 log("Tried insert statement " + query);
493 }
494 log("Exception executing " + query);
495 throw new BuildException(e);
496 }
497 finally {
498 try {
499 if (statement != null) {
500 statement.close();
501 statement = null;
502 }
503 }
504 catch (Exception e) {
505 }
506 }
507 }
508
509 private void debug(String msg) {
510 log(msg, MSG_DEBUG);
511 }
512
513 private Connection openSource() {
514 return openConnection(getSource());
515 }
516
517 private Connection openTarget() {
518 return openConnection(getTarget());
519 }
520
521 private Connection openConnection(String reference) {
522 final RdbmsConfig config = (RdbmsConfig) getProject().getReference(reference);
523 return openConnection(config);
524 }
525
526 private Connection openConnection(RdbmsConfig config) {
527 Connection retval = null;
528
529 while (retval == null) {
530 try {
531 debug("Loading schema " + config.getSchema() + " at url " + config.getUrl());
532 Class.forName(config.getDriver());
533
534 retval = DriverManager.getConnection(config.getUrl(), config.getUsername(), config.getPassword());
535 retval.setAutoCommit(false);
536
537
538
539 if (config.getDriver().indexOf("hsqldb") > -1) {
540 debug("Disabling hsqldb log");
541 final Statement st = retval.createStatement();
542 st.execute("SET FILES LOG FALSE");
543 st.close();
544 }
545
546 }
547 catch (Exception e) {
548
549 }
550 }
551
552 return retval;
553 }
554
555
556
557
558 private class Incrementor {
559 private int value;
560
561 public Incrementor() {
562 value = 0;
563 }
564
565 public int getValue() {
566 return value;
567 }
568
569 public void increment() {
570 value++;
571 }
572
573 public void increment(int by) {
574 value += by;
575 }
576 }
577
578 private class ProgressObservable extends Observable {
579 public void incrementRecord() {
580 setChanged();
581 notifyObservers();
582 clearChanged();
583 }
584 }
585
586
587
588
589
590 private class ProgressObserver implements Observer {
591
592 private float total;
593 private float progress;
594 private float length;
595 private float ratio;
596 private String template;
597 private float count;
598 private PrintStream out;
599
600 public ProgressObserver(final float total,
601 final float length,
602 final float ratio,
603 final String template) {
604 this.total = total;
605 this.template = template;
606 this.ratio = ratio;
607 this.length = length;
608 this.count = 0;
609
610 try {
611 final Field field = Main.class.getDeclaredField("out");
612 field.setAccessible(true);
613 out = (PrintStream) field.get(null);
614 }
615 catch (Exception e) {
616 e.printStackTrace();
617 }
618 }
619
620 public synchronized void update(Observable o, Object arg) {
621 count++;
622
623 final int percent = (int) ((count / total) * 100f);
624 final int progress = (int) ((count / total) * (100f * ratio));
625 final StringBuilder progressBuffer = new StringBuilder();
626
627 for (int x = 0; x < progress; x++) {
628 progressBuffer.append('=');
629 }
630
631 for (int x = progress; x < length; x++) {
632 progressBuffer.append(' ');
633 }
634 int roll = (int) (count / (total / 1000));
635
636 if (getProject().getProperty("run_from_ant") == null) {
637 out.print(String.format(template, progressBuffer, carr[roll % carr.length], percent, (int) count, (int) total));
638 }
639 else if ((count % 5000) == 0 || count == total) {
640 out.println(String.format("(%s)%% %s of %s records", (int) ((count / total) * 100), (int) count, (int) total));
641 }
642 }
643 }
644 }