[MINOR] Fix typos (#4053)
This commit is contained in:
@@ -106,14 +106,14 @@ public class HiveIncrementalPuller {
|
|||||||
|
|
||||||
private Connection connection;
|
private Connection connection;
|
||||||
protected final Config config;
|
protected final Config config;
|
||||||
private final ST incrementalPullSQLtemplate;
|
private final ST incrementalPullSQLTemplate;
|
||||||
|
|
||||||
public HiveIncrementalPuller(Config config) throws IOException {
|
public HiveIncrementalPuller(Config config) throws IOException {
|
||||||
this.config = config;
|
this.config = config;
|
||||||
validateConfig(config);
|
validateConfig(config);
|
||||||
String templateContent =
|
String templateContent =
|
||||||
FileIOUtils.readAsUTFString(this.getClass().getResourceAsStream("/IncrementalPull.sqltemplate"));
|
FileIOUtils.readAsUTFString(this.getClass().getResourceAsStream("/IncrementalPull.sqltemplate"));
|
||||||
incrementalPullSQLtemplate = new ST(templateContent);
|
incrementalPullSQLTemplate = new ST(templateContent);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void validateConfig(Config config) {
|
private void validateConfig(Config config) {
|
||||||
@@ -165,19 +165,19 @@ public class HiveIncrementalPuller {
|
|||||||
stmt.close();
|
stmt.close();
|
||||||
}
|
}
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
LOG.error("Could not close the resultset opened ", e);
|
LOG.error("Could not close the resultSet opened ", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void executeIncrementalSQL(String tempDbTable, String tempDbTablePath, Statement stmt)
|
private void executeIncrementalSQL(String tempDbTable, String tempDbTablePath, Statement stmt)
|
||||||
throws FileNotFoundException, SQLException {
|
throws FileNotFoundException, SQLException {
|
||||||
incrementalPullSQLtemplate.add("tempDbTable", tempDbTable);
|
incrementalPullSQLTemplate.add("tempDbTable", tempDbTable);
|
||||||
incrementalPullSQLtemplate.add("tempDbTablePath", tempDbTablePath);
|
incrementalPullSQLTemplate.add("tempDbTablePath", tempDbTablePath);
|
||||||
|
|
||||||
String storedAsClause = getStoredAsClause();
|
String storedAsClause = getStoredAsClause();
|
||||||
|
|
||||||
incrementalPullSQLtemplate.add("storedAsClause", storedAsClause);
|
incrementalPullSQLTemplate.add("storedAsClause", storedAsClause);
|
||||||
String incrementalSQL = new Scanner(new File(config.incrementalSQLFile)).useDelimiter("\\Z").next();
|
String incrementalSQL = new Scanner(new File(config.incrementalSQLFile)).useDelimiter("\\Z").next();
|
||||||
if (!incrementalSQL.contains(config.sourceDb + "." + config.sourceTable)) {
|
if (!incrementalSQL.contains(config.sourceDb + "." + config.sourceTable)) {
|
||||||
LOG.error("Incremental SQL does not have " + config.sourceDb + "." + config.sourceTable
|
LOG.error("Incremental SQL does not have " + config.sourceDb + "." + config.sourceTable
|
||||||
@@ -194,8 +194,8 @@ public class HiveIncrementalPuller {
|
|||||||
+ "means its not pulling incrementally");
|
+ "means its not pulling incrementally");
|
||||||
}
|
}
|
||||||
|
|
||||||
incrementalPullSQLtemplate.add("incrementalSQL", String.format(incrementalSQL, config.fromCommitTime));
|
incrementalPullSQLTemplate.add("incrementalSQL", String.format(incrementalSQL, config.fromCommitTime));
|
||||||
String sql = incrementalPullSQLtemplate.render();
|
String sql = incrementalPullSQLTemplate.render();
|
||||||
// Check if the SQL is pulling from the right database
|
// Check if the SQL is pulling from the right database
|
||||||
executeStatement(sql, stmt);
|
executeStatement(sql, stmt);
|
||||||
}
|
}
|
||||||
@@ -208,13 +208,13 @@ public class HiveIncrementalPuller {
|
|||||||
LOG.info("Setting up Hive JDBC Session with properties");
|
LOG.info("Setting up Hive JDBC Session with properties");
|
||||||
// set the queue
|
// set the queue
|
||||||
executeStatement("set mapred.job.queue.name=" + config.yarnQueueName, stmt);
|
executeStatement("set mapred.job.queue.name=" + config.yarnQueueName, stmt);
|
||||||
// Set the inputformat to HoodieCombineHiveInputFormat
|
// Set the inputFormat to HoodieCombineHiveInputFormat
|
||||||
executeStatement("set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat", stmt);
|
executeStatement("set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat", stmt);
|
||||||
// Allow queries without partition predicate
|
// Allow queries without partition predicate
|
||||||
executeStatement("set hive.strict.checks.large.query=false", stmt);
|
executeStatement("set hive.strict.checks.large.query=false", stmt);
|
||||||
// Dont gather stats for the table created
|
// Don't gather stats for the table created
|
||||||
executeStatement("set hive.stats.autogather=false", stmt);
|
executeStatement("set hive.stats.autogather=false", stmt);
|
||||||
// Set the hoodie modie
|
// Set the hoodie mode
|
||||||
executeStatement("set hoodie." + config.sourceTable + ".consume.mode=INCREMENTAL", stmt);
|
executeStatement("set hoodie." + config.sourceTable + ".consume.mode=INCREMENTAL", stmt);
|
||||||
// Set the from commit time
|
// Set the from commit time
|
||||||
executeStatement("set hoodie." + config.sourceTable + ".consume.start.timestamp=" + config.fromCommitTime, stmt);
|
executeStatement("set hoodie." + config.sourceTable + ".consume.start.timestamp=" + config.fromCommitTime, stmt);
|
||||||
@@ -263,7 +263,7 @@ public class HiveIncrementalPuller {
|
|||||||
resultSet.close();
|
resultSet.close();
|
||||||
}
|
}
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
LOG.error("Could not close the resultset opened ", e);
|
LOG.error("Could not close the resultSet opened ", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
|||||||
Reference in New Issue
Block a user