
[HUDI-296] Explore use of spotless to auto fix formatting errors (#945)

- Add spotless format checking and fixing to the project
- One-time reformatting for conformity
- Build fails on formatting violations, and mvn spotless:apply auto-fixes them (a configuration sketch follows the commit summary below)
This commit is contained in:
leesf
2019-10-10 20:19:40 +08:00
committed by vinoth chandar
parent 834c591955
commit b19bed442d
381 changed files with 7350 additions and 9064 deletions
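For context, the sketch below shows how a spotless-maven-plugin setup of this kind is commonly wired into a Maven build: the check goal is bound to a build phase so that formatting violations fail the build, while developers run mvn spotless:apply to rewrite the offending files. The version property and the Eclipse formatter file path are placeholders for illustration, not necessarily the exact values used in this commit.

  <plugin>
    <groupId>com.diffplug.spotless</groupId>
    <artifactId>spotless-maven-plugin</artifactId>
    <!-- placeholder version property; the actual PR may pin a specific release -->
    <version>${spotless.version}</version>
    <configuration>
      <java>
        <eclipse>
          <!-- Eclipse formatter definition; the path and file name here are illustrative -->
          <file>${project.basedir}/style/eclipse-java-formatter.xml</file>
        </eclipse>
        <removeUnusedImports/>
      </java>
    </configuration>
    <executions>
      <execution>
        <!-- bind the check goal so formatting drift fails the build -->
        <goals>
          <goal>check</goal>
        </goals>
        <phase>compile</phase>
      </execution>
    </executions>
  </plugin>

With this in place, mvn spotless:check reports violations and mvn spotless:apply rewrites the offending files in place, which matches the workflow described in the commit message.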


@@ -75,8 +75,8 @@ public class HDFSParquetImporter implements Serializable {
public HDFSParquetImporter(Config cfg) throws IOException {
this.cfg = cfg;
this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs) :
UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
: UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
log.info("Creating Cleaner with configs : " + props.toString());
}
@@ -88,8 +88,8 @@ public class HDFSParquetImporter implements Serializable {
System.exit(1);
}
HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
JavaSparkContext jssc = UtilHelpers
.buildSparkContext("data-importer-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
JavaSparkContext jssc =
UtilHelpers.buildSparkContext("data-importer-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
try {
dataImporter.dataImport(jssc, cfg.retry);
} finally {
@@ -123,18 +123,17 @@ public class HDFSParquetImporter implements Serializable {
fs.delete(new Path(cfg.targetPath), true);
}
//Get schema.
// Get schema.
String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
// Initialize target hoodie table.
Properties properties = new Properties();
properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, cfg.tableName);
properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, cfg.tableType);
HoodieTableMetaClient
.initDatasetAndGetMetaClient(jsc.hadoopConfiguration(), cfg.targetPath, properties);
HoodieTableMetaClient.initDatasetAndGetMetaClient(jsc.hadoopConfiguration(), cfg.targetPath, properties);
HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.targetPath, schemaStr,
cfg.parallelism, Option.empty(), props);
HoodieWriteClient client =
UtilHelpers.createHoodieClient(jsc, cfg.targetPath, schemaStr, cfg.parallelism, Option.empty(), props);
JavaRDD<HoodieRecord<HoodieRecordPayload>> hoodieRecords = buildHoodieRecordsForImport(jsc, schemaStr);
// Get instant time.
@@ -147,66 +146,56 @@ public class HDFSParquetImporter implements Serializable {
return -1;
}
protected JavaRDD<HoodieRecord<HoodieRecordPayload>> buildHoodieRecordsForImport(
JavaSparkContext jsc, String schemaStr) throws IOException {
protected JavaRDD<HoodieRecord<HoodieRecordPayload>> buildHoodieRecordsForImport(JavaSparkContext jsc,
String schemaStr) throws IOException {
Job job = Job.getInstance(jsc.hadoopConfiguration());
// Allow recursive directories to be found
job.getConfiguration().set(FileInputFormat.INPUT_DIR_RECURSIVE, "true");
// To parallelize reading file status.
job.getConfiguration().set(FileInputFormat.LIST_STATUS_NUM_THREADS, "1024");
AvroReadSupport
.setAvroReadSchema(jsc.hadoopConfiguration(), (new Schema.Parser().parse(schemaStr)));
AvroReadSupport.setAvroReadSchema(jsc.hadoopConfiguration(), (new Schema.Parser().parse(schemaStr)));
ParquetInputFormat.setReadSupportClass(job, (AvroReadSupport.class));
return jsc.newAPIHadoopFile(cfg.srcPath,
ParquetInputFormat.class, Void.class, GenericRecord.class, job.getConfiguration())
return jsc
.newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class,
job.getConfiguration())
// To reduce large number of
// tasks.
.coalesce(16 * cfg.parallelism)
.map(entry -> {
GenericRecord genericRecord
= ((Tuple2<Void, GenericRecord>) entry)._2();
Object partitionField =
genericRecord.get(cfg.partitionKey);
.coalesce(16 * cfg.parallelism).map(entry -> {
GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) entry)._2();
Object partitionField = genericRecord.get(cfg.partitionKey);
if (partitionField == null) {
throw new HoodieIOException(
"partition key is missing. :"
+ cfg.partitionKey);
throw new HoodieIOException("partition key is missing. :" + cfg.partitionKey);
}
Object rowField = genericRecord.get(cfg.rowKey);
if (rowField == null) {
throw new HoodieIOException(
"row field is missing. :" + cfg.rowKey);
throw new HoodieIOException("row field is missing. :" + cfg.rowKey);
}
String partitionPath = partitionField.toString();
logger.info("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")");
if (partitionField instanceof Number) {
try {
long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L);
partitionPath =
PARTITION_FORMATTER.format(new Date(ts));
partitionPath = PARTITION_FORMATTER.format(new Date(ts));
} catch (NumberFormatException nfe) {
logger.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")");
}
}
return new HoodieRecord<>(
new HoodieKey(
(String) rowField, partitionPath),
new HoodieJsonPayload(
genericRecord.toString()));
return new HoodieRecord<>(new HoodieKey((String) rowField, partitionPath),
new HoodieJsonPayload(genericRecord.toString()));
});
}
/**
* Imports records to Hoodie dataset
*
* @param client Hoodie Client
* @param instantTime Instant Time
* @param client Hoodie Client
* @param instantTime Instant Time
* @param hoodieRecords Hoodie Records
* @param <T> Type
* @param <T> Type
*/
protected <T extends HoodieRecordPayload> JavaRDD<WriteStatus> load(HoodieWriteClient client,
String instantTime, JavaRDD<HoodieRecord<T>> hoodieRecords) {
protected <T extends HoodieRecordPayload> JavaRDD<WriteStatus> load(HoodieWriteClient client, String instantTime,
JavaRDD<HoodieRecord<T>> hoodieRecords) {
if (cfg.command.toLowerCase().equals("insert")) {
return client.insert(hoodieRecords, instantTime);
}
@@ -220,48 +209,40 @@ public class HDFSParquetImporter implements Serializable {
@Override
public void validate(String name, String value) throws ParameterException {
if (value == null || !validFormats.contains(value)) {
throw new ParameterException(String.format(
"Invalid format type: value:%s: supported formats:%s", value, validFormats));
throw new ParameterException(
String.format("Invalid format type: value:%s: supported formats:%s", value, validFormats));
}
}
}
public static class Config implements Serializable {
@Parameter(names = {"--command", "-c"},
description = "Write command Valid values are insert(default)/upsert",
@Parameter(names = {"--command", "-c"}, description = "Write command Valid values are insert(default)/upsert",
required = false)
public String command = "INSERT";
@Parameter(names = {"--src-path",
"-sp"}, description = "Base path for the input dataset", required = true)
@Parameter(names = {"--src-path", "-sp"}, description = "Base path for the input dataset", required = true)
public String srcPath = null;
@Parameter(names = {"--target-path",
"-tp"}, description = "Base path for the target hoodie dataset", required = true)
@Parameter(names = {"--target-path", "-tp"}, description = "Base path for the target hoodie dataset",
required = true)
public String targetPath = null;
@Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
public String tableName = null;
@Parameter(names = {"--table-type", "-tt"}, description = "Table type", required = true)
public String tableType = null;
@Parameter(names = {"--row-key-field",
"-rk"}, description = "Row key field name", required = true)
@Parameter(names = {"--row-key-field", "-rk"}, description = "Row key field name", required = true)
public String rowKey = null;
@Parameter(names = {"--partition-key-field",
"-pk"}, description = "Partition key field name", required = true)
@Parameter(names = {"--partition-key-field", "-pk"}, description = "Partition key field name", required = true)
public String partitionKey = null;
@Parameter(names = {"--parallelism",
"-pl"}, description = "Parallelism for hoodie insert", required = true)
@Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
public int parallelism = 1;
@Parameter(names = {"--schema-file",
"-sf"}, description = "path for Avro schema file", required = true)
@Parameter(names = {"--schema-file", "-sf"}, description = "path for Avro schema file", required = true)
public String schemaFile = null;
@Parameter(names = {"--format",
"-f"}, description = "Format for the input data.", required = false, validateValueWith =
FormatValidator.class)
@Parameter(names = {"--format", "-f"}, description = "Format for the input data.", required = false,
validateValueWith = FormatValidator.class)
public String format = null;
@Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false)
public String sparkMaster = null;
@Parameter(names = {"--spark-memory",
"-sm"}, description = "spark memory to use", required = true)
@Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
public String sparkMemory = null;
@Parameter(names = {"--retry", "-rt"}, description = "number of retries", required = false)
public int retry = 0;


@@ -49,13 +49,13 @@ import org.apache.log4j.Logger;
import org.stringtemplate.v4.ST;
/**
* Utility to pull data after a given commit, based on the supplied HiveQL and save the delta as
* another hive temporary table.
* Utility to pull data after a given commit, based on the supplied HiveQL and save the delta as another hive temporary
* table.
* <p>
* Current Limitations:
* <p>
* - Only the source table can be incrementally pulled (usually the largest table) - The
* incrementally pulled table can't be referenced more than once.
* - Only the source table can be incrementally pulled (usually the largest table) - The incrementally pulled table
* can't be referenced more than once.
*/
public class HiveIncrementalPuller {
@@ -109,8 +109,8 @@ public class HiveIncrementalPuller {
public HiveIncrementalPuller(Config config) throws IOException {
this.config = config;
validateConfig(config);
String templateContent = FileIOUtils.readAsUTFString(
this.getClass().getResourceAsStream("IncrementalPull.sqltemplate"));
String templateContent =
FileIOUtils.readAsUTFString(this.getClass().getResourceAsStream("IncrementalPull.sqltemplate"));
incrementalPullSQLtemplate = new ST(templateContent);
}
@@ -143,14 +143,12 @@ public class HiveIncrementalPuller {
// drop the temp table if exists
String tempDbTable = config.tmpDb + "." + config.targetTable + "__" + config.sourceTable;
String tempDbTablePath =
config.hoodieTmpDir + "/" + config.targetTable + "__" + config.sourceTable + "/"
+ lastCommitTime;
config.hoodieTmpDir + "/" + config.targetTable + "__" + config.sourceTable + "/" + lastCommitTime;
executeStatement("drop table " + tempDbTable, stmt);
deleteHDFSPath(fs, tempDbTablePath);
if (!ensureTempPathExists(fs, lastCommitTime)) {
throw new IllegalStateException(
"Could not create target path at " + new Path(config.hoodieTmpDir,
config.targetTable + "/" + lastCommitTime));
throw new IllegalStateException("Could not create target path at "
+ new Path(config.hoodieTmpDir, config.targetTable + "/" + lastCommitTime));
}
initHiveBeelineProperties(stmt);
@@ -178,12 +176,10 @@ public class HiveIncrementalPuller {
String storedAsClause = getStoredAsClause();
incrementalPullSQLtemplate.add("storedAsClause", storedAsClause);
String incrementalSQL = new Scanner(new File(config.incrementalSQLFile)).useDelimiter("\\Z")
.next();
String incrementalSQL = new Scanner(new File(config.incrementalSQLFile)).useDelimiter("\\Z").next();
if (!incrementalSQL.contains(config.sourceDb + "." + config.sourceTable)) {
log.info("Incremental SQL does not have " + config.sourceDb + "." + config.sourceTable
+ ", which means its pulling from a different table. Fencing this from "
+ "happening.");
+ ", which means its pulling from a different table. Fencing this from " + "happening.");
throw new HoodieIncrementalPullSQLException(
"Incremental SQL does not have " + config.sourceDb + "." + config.sourceTable);
}
@@ -196,8 +192,7 @@ public class HiveIncrementalPuller {
+ "means its not pulling incrementally");
}
incrementalPullSQLtemplate
.add("incrementalSQL", String.format(incrementalSQL, config.fromCommitTime));
incrementalPullSQLtemplate.add("incrementalSQL", String.format(incrementalSQL, config.fromCommitTime));
String sql = incrementalPullSQLtemplate.render();
// Check if the SQL is pulling from the right database
executeStatement(sql, stmt);
@@ -212,8 +207,7 @@ public class HiveIncrementalPuller {
// set the queue
executeStatement("set mapred.job.queue.name=" + config.yarnQueueName, stmt);
// Set the inputformat to HoodieCombineHiveInputFormat
executeStatement(
"set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat", stmt);
executeStatement("set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat", stmt);
// Allow queries without partition predicate
executeStatement("set hive.strict.checks.large.query=false", stmt);
// Dont gather stats for the table created
@@ -221,12 +215,10 @@ public class HiveIncrementalPuller {
// Set the hoodie modie
executeStatement("set hoodie." + config.sourceTable + ".consume.mode=INCREMENTAL", stmt);
// Set the from commit time
executeStatement(
"set hoodie." + config.sourceTable + ".consume.start.timestamp=" + config.fromCommitTime,
stmt);
executeStatement("set hoodie." + config.sourceTable + ".consume.start.timestamp=" + config.fromCommitTime, stmt);
// Set number of commits to pull
executeStatement("set hoodie." + config.sourceTable + ".consume.max.commits=" + String.valueOf(
config.maxCommits), stmt);
executeStatement("set hoodie." + config.sourceTable + ".consume.max.commits=" + String.valueOf(config.maxCommits),
stmt);
}
private boolean deleteHDFSPath(FileSystem fs, String path) throws IOException {
@@ -240,9 +232,8 @@ public class HiveIncrementalPuller {
}
private String inferCommitTime(FileSystem fs) throws SQLException, IOException {
log.info(
"FromCommitTime not specified. Trying to infer it from Hoodie dataset " + config.targetDb
+ "." + config.targetTable);
log.info("FromCommitTime not specified. Trying to infer it from Hoodie dataset " + config.targetDb + "."
+ config.targetTable);
String targetDataLocation = getTableLocation(config.targetDb, config.targetTable);
return scanForCommitTime(fs, targetDataLocation);
}
@@ -256,14 +247,12 @@ public class HiveIncrementalPuller {
resultSet = stmt.executeQuery("describe formatted `" + db + "." + table + "`");
while (resultSet.next()) {
if (resultSet.getString(1).trim().equals("Location:")) {
log.info(
"Inferred table location for " + db + "." + table + " as " + resultSet.getString(2));
log.info("Inferred table location for " + db + "." + table + " as " + resultSet.getString(2));
return resultSet.getString(2);
}
}
} catch (SQLException e) {
throw new HoodieIncrementalPullException(
"Failed to get data location for table " + db + "." + table, e);
throw new HoodieIncrementalPullException("Failed to get data location for table " + db + "." + table, e);
} finally {
try {
if (stmt != null) {
@@ -281,16 +270,15 @@ public class HiveIncrementalPuller {
private String scanForCommitTime(FileSystem fs, String targetDataPath) throws IOException {
if (targetDataPath == null) {
throw new IllegalArgumentException(
"Please specify either --fromCommitTime or --targetDataPath");
throw new IllegalArgumentException("Please specify either --fromCommitTime or --targetDataPath");
}
if (!fs.exists(new Path(targetDataPath)) || !fs.exists(new Path(targetDataPath + "/.hoodie"))) {
return "0";
}
HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs.getConf(), targetDataPath);
Option<HoodieInstant> lastCommit = metadata.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants().lastInstant();
Option<HoodieInstant> lastCommit =
metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
if (lastCommit.isPresent()) {
return lastCommit.get().getTimestamp();
}
@@ -298,15 +286,13 @@ public class HiveIncrementalPuller {
}
private boolean ensureTempPathExists(FileSystem fs, String lastCommitTime) throws IOException {
Path targetBaseDirPath = new Path(config.hoodieTmpDir,
config.targetTable + "__" + config.sourceTable);
Path targetBaseDirPath = new Path(config.hoodieTmpDir, config.targetTable + "__" + config.sourceTable);
if (!fs.exists(targetBaseDirPath)) {
log.info("Creating " + targetBaseDirPath + " with permission drwxrwxrwx");
boolean result = FileSystem.mkdirs(fs, targetBaseDirPath,
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
boolean result =
FileSystem.mkdirs(fs, targetBaseDirPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
if (!result) {
throw new HoodieException(
"Could not create " + targetBaseDirPath + " with the required permissions");
throw new HoodieException("Could not create " + targetBaseDirPath + " with the required permissions");
}
}
@@ -318,23 +304,20 @@ public class HiveIncrementalPuller {
}
}
log.info("Creating " + targetPath + " with permission drwxrwxrwx");
return FileSystem.mkdirs(fs, targetBaseDirPath,
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
return FileSystem.mkdirs(fs, targetBaseDirPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
}
private String getLastCommitTimePulled(FileSystem fs, String sourceTableLocation)
throws IOException {
private String getLastCommitTimePulled(FileSystem fs, String sourceTableLocation) throws IOException {
HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs.getConf(), sourceTableLocation);
List<String> commitsToSync = metadata.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants()
.findInstantsAfter(config.fromCommitTime, config.maxCommits)
.getInstants().map(HoodieInstant::getTimestamp)
List<String> commitsToSync = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
.findInstantsAfter(config.fromCommitTime, config.maxCommits).getInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
if (commitsToSync.isEmpty()) {
log.warn("Nothing to sync. All commits in " + config.sourceTable + " are "
+ metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
.getInstants().collect(Collectors.toList())
+ " and from commit time is " + config.fromCommitTime);
log.warn(
"Nothing to sync. All commits in "
+ config.sourceTable + " are " + metadata.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants().getInstants().collect(Collectors.toList())
+ " and from commit time is " + config.fromCommitTime);
return null;
}
log.info("Syncing commits " + commitsToSync);


@@ -62,8 +62,8 @@ public class HoodieCleaner {
this.cfg = cfg;
this.jssc = jssc;
this.fs = FSUtils.getFs(cfg.basePath, jssc.hadoopConfiguration());
this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs) :
UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
: UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
log.info("Creating Cleaner with configs : " + props.toString());
}
@@ -74,8 +74,7 @@ public class HoodieCleaner {
}
private HoodieWriteConfig getHoodieClientConfig() throws Exception {
return HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.basePath)
.withAutoCommit(false)
return HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.basePath).withAutoCommit(false)
.withProps(props).build();
}


@@ -74,26 +74,23 @@ public class HoodieCompactionAdminTool {
serializeOperationResult(fs, res);
break;
case UNSCHEDULE_FILE:
List<RenameOpResult> r =
admin.unscheduleCompactionFileId(new HoodieFileGroupId(cfg.partitionPath, cfg.fileId),
cfg.skipValidation, cfg.dryRun);
List<RenameOpResult> r = admin.unscheduleCompactionFileId(
new HoodieFileGroupId(cfg.partitionPath, cfg.fileId), cfg.skipValidation, cfg.dryRun);
if (cfg.printOutput) {
System.out.println(r);
}
serializeOperationResult(fs, r);
break;
case UNSCHEDULE_PLAN:
List<RenameOpResult> r2 =
admin
.unscheduleCompactionPlan(cfg.compactionInstantTime, cfg.skipValidation, cfg.parallelism, cfg.dryRun);
List<RenameOpResult> r2 = admin.unscheduleCompactionPlan(cfg.compactionInstantTime, cfg.skipValidation,
cfg.parallelism, cfg.dryRun);
if (cfg.printOutput) {
printOperationResult("Result of Unscheduling Compaction Plan :", r2);
}
serializeOperationResult(fs, r2);
break;
case REPAIR:
List<RenameOpResult> r3 =
admin.repairCompaction(cfg.compactionInstantTime, cfg.parallelism, cfg.dryRun);
List<RenameOpResult> r3 = admin.repairCompaction(cfg.compactionInstantTime, cfg.parallelism, cfg.dryRun);
if (cfg.printOutput) {
printOperationResult("Result of Repair Operation :", r3);
}
@@ -122,7 +119,7 @@ public class HoodieCompactionAdminTool {
* Print Operation Result
*
* @param initialLine Initial Line
* @param result Result
* @param result Result
*/
private <T> void printOperationResult(String initialLine, List<T> result) {
System.out.println(initialLine);
@@ -135,10 +132,7 @@ public class HoodieCompactionAdminTool {
* Operation Types
*/
public enum Operation {
VALIDATE,
UNSCHEDULE_PLAN,
UNSCHEDULE_FILE,
REPAIR
VALIDATE, UNSCHEDULE_PLAN, UNSCHEDULE_FILE, REPAIR
}
/**


@@ -45,29 +45,24 @@ public class HoodieCompactor {
public HoodieCompactor(Config cfg) {
this.cfg = cfg;
this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs) :
UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
: UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
}
public static class Config implements Serializable {
@Parameter(names = {"--base-path",
"-sp"}, description = "Base path for the dataset", required = true)
@Parameter(names = {"--base-path", "-sp"}, description = "Base path for the dataset", required = true)
public String basePath = null;
@Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
public String tableName = null;
@Parameter(names = {"--instant-time",
"-sp"}, description = "Compaction Instant time", required = true)
@Parameter(names = {"--instant-time", "-sp"}, description = "Compaction Instant time", required = true)
public String compactionInstantTime = null;
@Parameter(names = {"--parallelism",
"-pl"}, description = "Parallelism for hoodie insert", required = true)
@Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
public int parallelism = 1;
@Parameter(names = {"--schema-file",
"-sf"}, description = "path for Avro schema file", required = true)
@Parameter(names = {"--schema-file", "-sf"}, description = "path for Avro schema file", required = true)
public String schemaFile = null;
@Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false)
public String sparkMaster = null;
@Parameter(names = {"--spark-memory",
"-sm"}, description = "spark memory to use", required = true)
@Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
public String sparkMemory = null;
@Parameter(names = {"--retry", "-rt"}, description = "number of retries", required = false)
public int retry = 0;
@@ -120,18 +115,18 @@ public class HoodieCompactor {
}
private int doCompact(JavaSparkContext jsc) throws Exception {
//Get schema.
// Get schema.
String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism,
Option.empty(), props);
HoodieWriteClient client =
UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
JavaRDD<WriteStatus> writeResponse = client.compact(cfg.compactionInstantTime);
return UtilHelpers.handleErrors(jsc, cfg.compactionInstantTime, writeResponse);
}
private int doSchedule(JavaSparkContext jsc) throws Exception {
//Get schema.
HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism,
Option.of(cfg.strategyClassName), props);
// Get schema.
HoodieWriteClient client =
UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, Option.of(cfg.strategyClassName), props);
client.scheduleCompactionAtInstant(cfg.compactionInstantTime, Option.empty());
return 0;
}


@@ -47,8 +47,7 @@ import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
/**
* Hoodie snapshot copy job which copies latest files from all partitions to another place, for
* snapshot backup.
* Hoodie snapshot copy job which copies latest files from all partitions to another place, for snapshot backup.
*/
public class HoodieSnapshotCopier implements Serializable {
@@ -56,50 +55,42 @@ public class HoodieSnapshotCopier implements Serializable {
static class Config implements Serializable {
@Parameter(names = {"--base-path",
"-bp"}, description = "Hoodie table base path", required = true)
@Parameter(names = {"--base-path", "-bp"}, description = "Hoodie table base path", required = true)
String basePath = null;
@Parameter(names = {"--output-path",
"-op"}, description = "The snapshot output path", required = true)
@Parameter(names = {"--output-path", "-op"}, description = "The snapshot output path", required = true)
String outputPath = null;
@Parameter(names = {"--date-partitioned",
"-dp"}, description = "Can we assume date partitioning?")
@Parameter(names = {"--date-partitioned", "-dp"}, description = "Can we assume date partitioning?")
boolean shouldAssumeDatePartitioning = false;
}
public void snapshot(JavaSparkContext jsc, String baseDir, final String outputDir,
final boolean shouldAssumeDatePartitioning) throws IOException {
FileSystem fs = FSUtils.getFs(baseDir, jsc.hadoopConfiguration());
final SerializableConfiguration serConf = new SerializableConfiguration(
jsc.hadoopConfiguration());
final SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration());
final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), baseDir);
final ReadOptimizedView fsView = new HoodieTableFileSystemView(
tableMetadata,
final ReadOptimizedView fsView = new HoodieTableFileSystemView(tableMetadata,
tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
// Get the latest commit
Option<HoodieInstant> latestCommit = tableMetadata.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants().lastInstant();
Option<HoodieInstant> latestCommit =
tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
if (!latestCommit.isPresent()) {
logger.warn("No commits present. Nothing to snapshot");
return;
}
final String latestCommitTimestamp = latestCommit.get().getTimestamp();
logger.info(String.format(
"Starting to snapshot latest version files which are also no-late-than %s.",
logger.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.",
latestCommitTimestamp));
List<String> partitions = FSUtils
.getAllPartitionPaths(fs, baseDir, shouldAssumeDatePartitioning);
List<String> partitions = FSUtils.getAllPartitionPaths(fs, baseDir, shouldAssumeDatePartitioning);
if (partitions.size() > 0) {
logger.info(String.format("The job needs to copy %d partitions.", partitions.size()));
// Make sure the output directory is empty
Path outputPath = new Path(outputDir);
if (fs.exists(outputPath)) {
logger.warn(
String.format("The output path %s targetBasePath already exists, deleting", outputPath));
logger.warn(String.format("The output path %s targetBasePath already exists, deleting", outputPath));
fs.delete(new Path(outputDir), true);
}
@@ -107,14 +98,12 @@ public class HoodieSnapshotCopier implements Serializable {
// Only take latest version files <= latestCommit.
FileSystem fs1 = FSUtils.getFs(baseDir, serConf.newCopy());
List<Tuple2<String, String>> filePaths = new ArrayList<>();
Stream<HoodieDataFile> dataFiles = fsView.getLatestDataFilesBeforeOrOn(partition,
latestCommitTimestamp);
dataFiles.forEach(
hoodieDataFile -> filePaths.add(new Tuple2<>(partition, hoodieDataFile.getPath())));
Stream<HoodieDataFile> dataFiles = fsView.getLatestDataFilesBeforeOrOn(partition, latestCommitTimestamp);
dataFiles.forEach(hoodieDataFile -> filePaths.add(new Tuple2<>(partition, hoodieDataFile.getPath())));
// also need to copy over partition metadata
Path partitionMetaFile = new Path(new Path(baseDir, partition),
HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE);
Path partitionMetaFile =
new Path(new Path(baseDir, partition), HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE);
if (fs1.exists(partitionMetaFile)) {
filePaths.add(new Tuple2<>(partition, partitionMetaFile.toString()));
}
@@ -129,15 +118,14 @@ public class HoodieSnapshotCopier implements Serializable {
if (!ifs.exists(toPartitionPath)) {
ifs.mkdirs(toPartitionPath);
}
FileUtil.copy(ifs, sourceFilePath, ifs, new Path(toPartitionPath, sourceFilePath.getName()),
false, ifs.getConf());
FileUtil.copy(ifs, sourceFilePath, ifs, new Path(toPartitionPath, sourceFilePath.getName()), false,
ifs.getConf());
});
// Also copy the .commit files
logger.info(
String.format("Copying .commit files which are no-late-than %s.", latestCommitTimestamp));
FileStatus[] commitFilesToCopy = fs.listStatus(
new Path(baseDir + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
logger.info(String.format("Copying .commit files which are no-late-than %s.", latestCommitTimestamp));
FileStatus[] commitFilesToCopy =
fs.listStatus(new Path(baseDir + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
if (commitFilePath.getName().equals(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
return true;
} else {
@@ -147,15 +135,14 @@ public class HoodieSnapshotCopier implements Serializable {
}
});
for (FileStatus commitStatus : commitFilesToCopy) {
Path targetFilePath = new Path(
outputDir + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitStatus.getPath()
.getName());
Path targetFilePath =
new Path(outputDir + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitStatus.getPath().getName());
if (!fs.exists(targetFilePath.getParent())) {
fs.mkdirs(targetFilePath.getParent());
}
if (fs.exists(targetFilePath)) {
logger.error(String.format(
"The target output commit file (%s targetBasePath) already exists.", targetFilePath));
logger.error(
String.format("The target output commit file (%s targetBasePath) already exists.", targetFilePath));
}
FileUtil.copy(fs, commitStatus.getPath(), fs, targetFilePath, false, fs.getConf());
}
@@ -166,8 +153,7 @@ public class HoodieSnapshotCopier implements Serializable {
// Create the _SUCCESS tag
Path successTagPath = new Path(outputDir + "/_SUCCESS");
if (!fs.exists(successTagPath)) {
logger.info(String.format(
"Creating _SUCCESS under targetBasePath: $s", outputDir));
logger.info(String.format("Creating _SUCCESS under targetBasePath: $s", outputDir));
fs.createNewFile(successTagPath);
}
}
@@ -176,8 +162,8 @@ public class HoodieSnapshotCopier implements Serializable {
// Take input configs
final Config cfg = new Config();
new JCommander(cfg, args);
logger.info(String.format("Snapshot hoodie table from %s targetBasePath to %stargetBasePath",
cfg.basePath, cfg.outputPath));
logger.info(String.format("Snapshot hoodie table from %s targetBasePath to %stargetBasePath", cfg.basePath,
cfg.outputPath));
// Create a spark job to do the snapshot copy
SparkConf sparkConf = new SparkConf().setAppName("Hoodie-snapshot-copier");


@@ -51,8 +51,7 @@ public class HoodieWithTimelineServer implements Serializable {
@Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false)
public String sparkMaster = null;
@Parameter(names = {"--spark-memory",
"-sm"}, description = "spark memory to use", required = true)
@Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
public String sparkMemory = null;
@Parameter(names = {"--num-partitions", "-n"}, description = "Num Partitions", required = false)
public Integer numPartitions = 100;
@@ -87,8 +86,7 @@ public class HoodieWithTimelineServer implements Serializable {
System.out.println("Driver Hostname is :" + driverHost);
List<String> messages = new ArrayList<>();
IntStream.range(0, cfg.numPartitions).forEach(i -> messages.add("Hello World"));
List<String> gotMessages =
jsc.parallelize(messages).map(msg -> sendRequest(driverHost, cfg.serverPort)).collect();
List<String> gotMessages = jsc.parallelize(messages).map(msg -> sendRequest(driverHost, cfg.serverPort)).collect();
System.out.println("Got Messages :" + gotMessages);
Preconditions.checkArgument(gotMessages.equals(messages), "Got expected reply from Server");
}


@@ -58,23 +58,22 @@ import org.apache.spark.sql.SparkSession;
public class UtilHelpers {
private static Logger logger = LogManager.getLogger(UtilHelpers.class);
public static Source createSource(String sourceClass, TypedProperties cfg,
JavaSparkContext jssc, SparkSession sparkSession, SchemaProvider schemaProvider)
throws IOException {
public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
SparkSession sparkSession, SchemaProvider schemaProvider) throws IOException {
try {
return (Source) ReflectionUtils.loadClass(sourceClass,
new Class<?>[]{TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class},
cfg, jssc, sparkSession, schemaProvider);
new Class<?>[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class}, cfg,
jssc, sparkSession, schemaProvider);
} catch (Throwable e) {
throw new IOException("Could not load source class " + sourceClass, e);
}
}
public static SchemaProvider createSchemaProvider(String schemaProviderClass,
TypedProperties cfg, JavaSparkContext jssc) throws IOException {
public static SchemaProvider createSchemaProvider(String schemaProviderClass, TypedProperties cfg,
JavaSparkContext jssc) throws IOException {
try {
return schemaProviderClass == null ? null :
(SchemaProvider) ReflectionUtils.loadClass(schemaProviderClass, cfg, jssc);
return schemaProviderClass == null ? null
: (SchemaProvider) ReflectionUtils.loadClass(schemaProviderClass, cfg, jssc);
} catch (Throwable e) {
throw new IOException("Could not load schema provider class " + schemaProviderClass, e);
}
@@ -116,7 +115,7 @@ public class UtilHelpers {
/**
* Parse Schema from file
*
* @param fs File System
* @param fs File System
* @param schemaFile Schema File
*/
public static String parseSchema(FileSystem fs, String schemaFile) throws Exception {
@@ -149,8 +148,7 @@ public class UtilHelpers {
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.hadoop.mapred.output.compress", "true");
sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
sparkConf.set("spark.hadoop.mapred.output.compression.codec",
"org.apache.hadoop.io.compress.GzipCodec");
sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
additionalConfigs.entrySet().forEach(e -> sparkConf.set(e.getKey(), e.getValue()));
@@ -168,6 +166,7 @@ public class UtilHelpers {
/**
* Build Spark Context for ingestion/compaction
*
* @return
*/
public static JavaSparkContext buildSparkContext(String appName, String sparkMaster, String sparkMemory) {
@@ -179,25 +178,22 @@ public class UtilHelpers {
/**
* Build Hoodie write client
*
* @param jsc Java Spark Context
* @param basePath Base Path
* @param schemaStr Schema
* @param jsc Java Spark Context
* @param basePath Base Path
* @param schemaStr Schema
* @param parallelism Parallelism
*/
public static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath,
String schemaStr, int parallelism, Option<String> compactionStrategyClass, TypedProperties properties)
throws Exception {
HoodieCompactionConfig compactionConfig =
compactionStrategyClass.map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
.withCompactionStrategy(ReflectionUtils.loadClass(strategy))
.build()).orElse(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build());
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withParallelism(parallelism, parallelism).withSchema(schemaStr)
.combineInput(true, true)
.withCompactionConfig(compactionConfig)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withProps(properties)
.build();
public static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) throws Exception {
HoodieCompactionConfig compactionConfig = compactionStrategyClass
.map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
.withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build())
.orElse(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build());
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath).withParallelism(parallelism, parallelism)
.withSchema(schemaStr).combineInput(true, true).withCompactionConfig(compactionConfig)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withProps(properties).build();
return new HoodieWriteClient(jsc, config);
}
@@ -206,13 +202,11 @@ public class UtilHelpers {
writeResponse.foreach(writeStatus -> {
if (writeStatus.hasErrors()) {
errors.add(1);
logger.error(String.format("Error processing records :writeStatus:%s",
writeStatus.getStat().toString()));
logger.error(String.format("Error processing records :writeStatus:%s", writeStatus.getStat().toString()));
}
});
if (errors.value() == 0) {
logger.info(
String.format("Dataset imported into hoodie dataset with %s instant time.", instantTime));
logger.info(String.format("Dataset imported into hoodie dataset with %s instant time.", instantTime));
return 0;
}
logger.error(String.format("Import failed with %d errors.", errors.value()));


@@ -38,10 +38,9 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
/**
* This is an one-time use class meant for migrating the configuration for
* "hoodie.compaction.payload.class" in .hoodie/hoodie.properties from com.uber.hoodie to
* org.apache.hudi
* It takes in a file containing base-paths for a set of hudi datasets and does the migration
* This is an one-time use class meant for migrating the configuration for "hoodie.compaction.payload.class" in
* .hoodie/hoodie.properties from com.uber.hoodie to org.apache.hudi It takes in a file containing base-paths for a set
* of hudi datasets and does the migration
*/
public class UpgradePayloadFromUberToApache implements Serializable {
@@ -66,8 +65,8 @@ public class UpgradePayloadFromUberToApache implements Serializable {
if (!basePath.startsWith("#")) {
logger.info("Performing upgrade for " + basePath);
String metaPath = String.format("%s/.hoodie", basePath);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(
FSUtils.prepareHadoopConf(new Configuration()), basePath, false);
HoodieTableMetaClient metaClient =
new HoodieTableMetaClient(FSUtils.prepareHadoopConf(new Configuration()), basePath, false);
HoodieTableConfig tableConfig = metaClient.getTableConfig();
if (tableConfig.getTableType().equals(HoodieTableType.MERGE_ON_READ)) {
Map<String, String> propsMap = tableConfig.getProps();
@@ -75,10 +74,8 @@ public class UpgradePayloadFromUberToApache implements Serializable {
String payloadClass = propsMap.get(HoodieCompactionConfig.PAYLOAD_CLASS_PROP);
logger.info("Found payload class=" + payloadClass);
if (payloadClass.startsWith("com.uber.hoodie")) {
String newPayloadClass = payloadClass.replace("com.uber.hoodie",
"org.apache.hudi");
logger.info("Replacing payload class (" + payloadClass
+ ") with (" + newPayloadClass + ")");
String newPayloadClass = payloadClass.replace("com.uber.hoodie", "org.apache.hudi");
logger.info("Replacing payload class (" + payloadClass + ") with (" + newPayloadClass + ")");
Map<String, String> newPropsMap = new HashMap<>(propsMap);
newPropsMap.put(HoodieCompactionConfig.PAYLOAD_CLASS_PROP, newPayloadClass);
Properties props = new Properties();


@@ -61,6 +61,7 @@ public abstract class AbstractDeltaStreamerService implements Serializable {
/**
* Wait till the service shutdown. If the service shutdown with exception, it will be thrown
*
* @throws ExecutionException
* @throws InterruptedException
*/
@@ -76,6 +77,7 @@ public abstract class AbstractDeltaStreamerService implements Serializable {
/**
* Request shutdown either forcefully or gracefully. Graceful shutdown allows the service to finish up the current
* round of work and shutdown. For graceful shutdown, it waits till the service is shutdown
*
* @param force Forcefully shutdown
*/
void shutdown(boolean force) {
@@ -98,8 +100,9 @@ public abstract class AbstractDeltaStreamerService implements Serializable {
}
/**
* Start the service. Runs the service in a different thread and returns. Also starts a monitor thread
* to run-callbacks in case of shutdown
* Start the service. Runs the service in a different thread and returns. Also starts a monitor thread to
* run-callbacks in case of shutdown
*
* @param onShutdownCallback
*/
public void start(Function<Boolean, Boolean> onShutdownCallback) {
@@ -112,12 +115,14 @@ public abstract class AbstractDeltaStreamerService implements Serializable {
/**
* Service implementation
*
* @return
*/
protected abstract Pair<CompletableFuture, ExecutorService> startService();
/**
* A monitor thread is started which would trigger a callback if the service is shutdown
*
* @param onShutdownCallback
*/
private void monitorThreads(Function<Boolean, Boolean> onShutdownCallback) {
@@ -128,8 +133,7 @@ public abstract class AbstractDeltaStreamerService implements Serializable {
log.info("Monitoring thread(s) !!");
future.get();
} catch (ExecutionException ex) {
log.error("Monitor noticed one or more threads failed."
+ " Requesting graceful shutdown of other threads", ex);
log.error("Monitor noticed one or more threads failed." + " Requesting graceful shutdown of other threads", ex);
error = true;
shutdown(false);
} catch (InterruptedException ie) {


@@ -51,10 +51,9 @@ public class Compactor implements Serializable {
long numWriteErrors = res.collect().stream().filter(r -> r.hasErrors()).count();
if (numWriteErrors != 0) {
// We treat even a single error in compaction as fatal
log.error("Compaction for instant (" + instant + ") failed with write errors. "
+ "Errors :" + numWriteErrors);
throw new HoodieException("Compaction for instant (" + instant + ") failed with write errors. "
+ "Errors :" + numWriteErrors);
log.error("Compaction for instant (" + instant + ") failed with write errors. " + "Errors :" + numWriteErrors);
throw new HoodieException(
"Compaction for instant (" + instant + ") failed with write errors. " + "Errors :" + numWriteErrors);
}
// Commit compaction
compactionClient.commitCompaction(instant.getTimestamp(), res, Option.empty());


@@ -155,11 +155,9 @@ public class DeltaSync implements Serializable {
private final HoodieTableType tableType;
public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
SchemaProvider schemaProvider, HoodieTableType tableType, TypedProperties props,
JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient)
throws IOException {
public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
HoodieTableType tableType, TypedProperties props, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
this.cfg = cfg;
this.jssc = jssc;
@@ -176,8 +174,8 @@ public class DeltaSync implements Serializable {
this.transformer = UtilHelpers.createTransformer(cfg.transformerClassName);
this.keyGenerator = DataSourceUtils.createKeyGenerator(props);
this.formatAdapter = new SourceFormatAdapter(UtilHelpers.createSource(cfg.sourceClassName, props, jssc,
sparkSession, schemaProvider));
this.formatAdapter = new SourceFormatAdapter(
UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider));
this.hiveConf = hiveConf;
if (cfg.filterDupes) {
@@ -194,8 +192,7 @@ public class DeltaSync implements Serializable {
private void refreshTimeline() throws IOException {
if (fs.exists(new Path(cfg.targetBasePath))) {
HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath);
this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants());
this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
} else {
this.commitTimelineOpt = Option.empty();
HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath,
@@ -214,8 +211,7 @@ public class DeltaSync implements Serializable {
// Refresh Timeline
refreshTimeline();
Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> srcRecordsWithCkpt =
readFromSource(commitTimelineOpt);
Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> srcRecordsWithCkpt = readFromSource(commitTimelineOpt);
if (null != srcRecordsWithCkpt) {
// this is the first input batch. If schemaProvider not set, use it and register Avro Schema and start
@@ -246,8 +242,8 @@ public class DeltaSync implements Serializable {
if (commitTimelineOpt.isPresent()) {
Option<HoodieInstant> lastCommit = commitTimelineOpt.get().lastInstant();
if (lastCommit.isPresent()) {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class);
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class);
if (cfg.checkpoint != null && !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
resumeCheckpointStr = Option.of(cfg.checkpoint);
} else if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) {
@@ -274,25 +270,22 @@ public class DeltaSync implements Serializable {
if (transformer != null) {
// Transformation is needed. Fetch New rows in Row Format, apply transformation and then convert them
// to generic records for writing
InputBatch<Dataset<Row>> dataAndCheckpoint = formatAdapter.fetchNewDataInRowFormat(
resumeCheckpointStr, cfg.sourceLimit);
InputBatch<Dataset<Row>> dataAndCheckpoint =
formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, cfg.sourceLimit);
Option<Dataset<Row>> transformed =
dataAndCheckpoint.getBatch().map(data -> transformer.apply(jssc, sparkSession, data, props));
checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
avroRDDOptional = transformed.map(t ->
AvroConversionUtils.createRdd(t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
);
avroRDDOptional = transformed
.map(t -> AvroConversionUtils.createRdd(t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD());
// Use Transformed Row's schema if not overridden
// Use Transformed Row's schema if not overridden. If target schema is not specified
// default to RowBasedSchemaProvider
schemaProvider =
this.schemaProvider == null || this.schemaProvider.getTargetSchema() == null
? transformed
.map(r -> (SchemaProvider) new RowBasedSchemaProvider(r.schema()))
.orElse(dataAndCheckpoint.getSchemaProvider())
: this.schemaProvider;
schemaProvider = this.schemaProvider == null || this.schemaProvider.getTargetSchema() == null
? transformed.map(r -> (SchemaProvider) new RowBasedSchemaProvider(r.schema())).orElse(
dataAndCheckpoint.getSchemaProvider())
: this.schemaProvider;
} else {
// Pull the data from the source & prepare the write
InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint =
@@ -303,8 +296,8 @@ public class DeltaSync implements Serializable {
}
if (Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) {
log.info("No new data, source checkpoint has not changed. Nothing to commit."
+ "Old checkpoint=(" + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")");
log.info("No new data, source checkpoint has not changed. Nothing to commit." + "Old checkpoint=("
+ resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")");
return null;
}
@@ -339,8 +332,7 @@ public class DeltaSync implements Serializable {
if (cfg.filterDupes) {
// turn upserts to insert
cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig(),
writeClient.getTimelineServer());
records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig(), writeClient.getTimelineServer());
}
boolean isEmpty = records.isEmpty();
@@ -375,8 +367,7 @@ public class DeltaSync implements Serializable {
+ totalErrorRecords + "/" + totalRecords);
}
boolean success = writeClient.commit(commitTime, writeStatusRDD,
Option.of(checkpointCommitMetadata));
boolean success = writeClient.commit(commitTime, writeStatusRDD, Option.of(checkpointCommitMetadata));
if (success) {
log.info("Commit " + commitTime + " successful!");
@@ -396,14 +387,12 @@ public class DeltaSync implements Serializable {
throw new HoodieException("Commit " + commitTime + " failed!");
}
} else {
log.error("Delta Sync found errors when writing. Errors/Total="
+ totalErrorRecords + "/" + totalRecords);
log.error("Delta Sync found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
log.error("Printing out the top 100 errors");
writeStatusRDD.filter(ws -> ws.hasErrors()).take(100).forEach(ws -> {
log.error("Global error :", ws.getGlobalError());
if (ws.getErrors().size() > 0) {
ws.getErrors().entrySet().forEach(r ->
log.trace("Error for key:" + r.getKey() + " is " + r.getValue()));
ws.getErrors().entrySet().forEach(r -> log.trace("Error for key:" + r.getKey() + " is " + r.getValue()));
}
});
// Rolling back instant
@@ -432,7 +421,7 @@ public class DeltaSync implements Serializable {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//No-Op
// No-Op
}
}
}
@@ -445,8 +434,8 @@ public class DeltaSync implements Serializable {
private void syncHive() throws ClassNotFoundException {
if (cfg.enableHiveSync) {
HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath);
log.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName
+ "). Hive metastore URL :" + hiveSyncConfig.jdbcUrl + ", basePath :" + cfg.targetBasePath);
log.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :"
+ hiveSyncConfig.jdbcUrl + ", basePath :" + cfg.targetBasePath);
new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable();
}
@@ -474,17 +463,13 @@ public class DeltaSync implements Serializable {
*/
private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) {
HoodieWriteConfig.Builder builder =
HoodieWriteConfig.newBuilder()
.withPath(cfg.targetBasePath)
.combineInput(cfg.filterDupes, true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withPayloadClass(cfg.payloadClassName)
HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName)
// Inline compaction is disabled for continuous mode. otherwise enabled for MOR
.withInlineCompaction(cfg.isInlineCompactionEnabled()).build())
.forTable(cfg.targetTableName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withAutoCommit(false)
.withProps(props);
.withAutoCommit(false).withProps(props);
if (null != schemaProvider && null != schemaProvider.getTargetSchema()) {
builder = builder.withSchema(schemaProvider.getTargetSchema().toString());


@@ -71,13 +71,9 @@ import org.apache.spark.sql.SparkSession;
* dataset. Does not maintain any state, queries at runtime to see how far behind the target dataset is from the source
* dataset. This can be overriden to force sync from a timestamp.
*
* In continuous mode, DeltaStreamer runs in loop-mode going through the below operations
* (a) pull-from-source
* (b) write-to-sink
* (c) Schedule Compactions if needed
* (d) Conditionally Sync to Hive
* each cycle. For MOR table with continuous mode enabled, a seperate compactor thread is allocated to execute
* compactions
* In continuous mode, DeltaStreamer runs in loop-mode going through the below operations (a) pull-from-source (b)
* write-to-sink (c) Schedule Compactions if needed (d) Conditionally Sync to Hive each cycle. For MOR table with
* continuous mode enabled, a seperate compactor thread is allocated to execute compactions
*/
public class HoodieDeltaStreamer implements Serializable {
@@ -111,6 +107,7 @@ public class HoodieDeltaStreamer implements Serializable {
/**
* Main method to start syncing
*
* @throws Exception
*/
public void sync() throws Exception {
@@ -146,8 +143,9 @@ public class HoodieDeltaStreamer implements Serializable {
public static class Config implements Serializable {
@Parameter(names = {"--target-base-path"}, description = "base path for the target hoodie dataset. "
+ "(Will be created if did not exist first time around. If exists, expected to be a hoodie dataset)",
@Parameter(names = {"--target-base-path"},
description = "base path for the target hoodie dataset. "
+ "(Will be created if did not exist first time around. If exists, expected to be a hoodie dataset)",
required = true)
public String targetBasePath;
@@ -155,8 +153,8 @@ public class HoodieDeltaStreamer implements Serializable {
@Parameter(names = {"--target-table"}, description = "name of the target table in Hive", required = true)
public String targetTableName;
@Parameter(names = {"--storage-type"}, description = "Type of Storage. "
+ "COPY_ON_WRITE (or) MERGE_ON_READ", required = true)
@Parameter(names = {"--storage-type"}, description = "Type of Storage. " + "COPY_ON_WRITE (or) MERGE_ON_READ",
required = true)
public String storageType;
@Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
@@ -170,9 +168,10 @@ public class HoodieDeltaStreamer implements Serializable {
+ "(using the CLI parameter \"--propsFilePath\") can also be passed command line using this parameter")
public List<String> configs = new ArrayList<>();
@Parameter(names = {"--source-class"}, description = "Subclass of org.apache.hudi.utilities.sources to read data. "
+ "Built-in options: org.apache.hudi.utilities.sources.{JsonDFSSource (default), AvroDFSSource, "
+ "JsonKafkaSource, AvroKafkaSource, HiveIncrPullSource}")
@Parameter(names = {"--source-class"},
description = "Subclass of org.apache.hudi.utilities.sources to read data. "
+ "Built-in options: org.apache.hudi.utilities.sources.{JsonDFSSource (default), AvroDFSSource, "
+ "JsonKafkaSource, AvroKafkaSource, HiveIncrPullSource}")
public String sourceClassName = JsonDFSSource.class.getName();
@Parameter(names = {"--source-ordering-field"}, description = "Field within source record to decide how"
@@ -203,12 +202,11 @@ public class HoodieDeltaStreamer implements Serializable {
public long sourceLimit = Long.MAX_VALUE;
@Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
+ "is purely new data/inserts to gain speed)",
converter = OperationConvertor.class)
+ "is purely new data/inserts to gain speed)", converter = OperationConvertor.class)
public Operation operation = Operation.UPSERT;
@Parameter(names = {"--filter-dupes"}, description = "Should duplicate records from source be dropped/filtered out"
+ "before insert/bulk-insert")
@Parameter(names = {"--filter-dupes"},
description = "Should duplicate records from source be dropped/filtered out" + "before insert/bulk-insert")
public Boolean filterDupes = false;
@Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
@@ -223,8 +221,8 @@ public class HoodieDeltaStreamer implements Serializable {
+ " source-fetch -> Transform -> Hudi Write in loop")
public Boolean continuousMode = false;
@Parameter(names = {"--min-sync-interval-seconds"}, description = "the min sync interval of each sync in "
+ "continuous mode")
@Parameter(names = {"--min-sync-interval-seconds"},
description = "the min sync interval of each sync in " + "continuous mode")
public Integer minSyncIntervalSeconds = 0;
@Parameter(names = {"--spark-master"}, description = "spark master to use.")
@@ -233,8 +231,8 @@ public class HoodieDeltaStreamer implements Serializable {
@Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written")
public Boolean commitOnErrors = false;
@Parameter(names = {"--delta-sync-scheduling-weight"}, description =
"Scheduling weight for delta sync as defined in "
@Parameter(names = {"--delta-sync-scheduling-weight"},
description = "Scheduling weight for delta sync as defined in "
+ "https://spark.apache.org/docs/latest/job-scheduling.html")
public Integer deltaSyncSchedulingWeight = 1;
@@ -253,8 +251,8 @@ public class HoodieDeltaStreamer implements Serializable {
/**
* Compaction is enabled for MoR table by default. This flag disables it
*/
@Parameter(names = {"--disable-compaction"}, description = "Compaction is enabled for MoR table by default."
+ "This flag disables it ")
@Parameter(names = {"--disable-compaction"},
description = "Compaction is enabled for MoR table by default." + "This flag disables it ")
public Boolean forceDisableCompaction = false;
/**
@@ -288,8 +286,8 @@ public class HoodieDeltaStreamer implements Serializable {
}
Map<String, String> additionalSparkConfigs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
JavaSparkContext jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName,
cfg.sparkMaster, additionalSparkConfigs);
JavaSparkContext jssc =
UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster, additionalSparkConfigs);
try {
new HoodieDeltaStreamer(cfg, jssc).sync();
} finally {
@@ -349,8 +347,8 @@ public class HoodieDeltaStreamer implements Serializable {
this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate();
if (fs.exists(new Path(cfg.targetBasePath))) {
HoodieTableMetaClient meta = new HoodieTableMetaClient(
new Configuration(fs.getConf()), cfg.targetBasePath, false);
HoodieTableMetaClient meta =
new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath, false);
tableType = meta.getTableType();
// This will guarantee there is no surprise with table type
Preconditions.checkArgument(tableType.equals(HoodieTableType.valueOf(cfg.storageType)),
@@ -367,8 +365,8 @@ public class HoodieDeltaStreamer implements Serializable {
cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
}
deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, tableType,
props, jssc, fs, hiveConf, this::onInitializingWriteClient);
deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, tableType, props, jssc, fs, hiveConf,
this::onInitializingWriteClient);
}
public DeltaSync getDeltaSync() {
@@ -392,9 +390,8 @@ public class HoodieDeltaStreamer implements Serializable {
Option<String> scheduledCompactionInstant = deltaSync.syncOnce();
if (scheduledCompactionInstant.isPresent()) {
log.info("Enqueuing new pending compaction instant (" + scheduledCompactionInstant + ")");
asyncCompactService.enqueuePendingCompaction(new HoodieInstant(
State.REQUESTED, HoodieTimeline.COMPACTION_ACTION,
scheduledCompactionInstant.get()));
asyncCompactService.enqueuePendingCompaction(new HoodieInstant(State.REQUESTED,
HoodieTimeline.COMPACTION_ACTION, scheduledCompactionInstant.get()));
asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions);
}
long toSleepMs = cfg.minSyncIntervalSeconds * 1000 - (System.currentTimeMillis() - start);
@@ -429,6 +426,7 @@ public class HoodieDeltaStreamer implements Serializable {
/**
* Callback to initialize write client and start compaction service if required
*
* @param writeClient HoodieWriteClient
* @return
*/
@@ -436,8 +434,8 @@ public class HoodieDeltaStreamer implements Serializable {
if (cfg.isAsyncCompactionEnabled()) {
asyncCompactService = new AsyncCompactService(jssc, writeClient);
// Enqueue existing pending compactions first
HoodieTableMetaClient meta = new HoodieTableMetaClient(
new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, true);
HoodieTableMetaClient meta =
new HoodieTableMetaClient(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, true);
List<HoodieInstant> pending = CompactionUtils.getPendingCompactionInstantTimes(meta);
pending.stream().forEach(hoodieInstant -> asyncCompactService.enqueuePendingCompaction(hoodieInstant));
asyncCompactService.start((error) -> {
@@ -499,7 +497,7 @@ public class HoodieDeltaStreamer implements Serializable {
public AsyncCompactService(JavaSparkContext jssc, HoodieWriteClient client) {
this.jssc = jssc;
this.compactor = new Compactor(client, jssc);
//TODO: HUDI-157 : Only allow 1 compactor to run in parallel till Incremental View on MOR is fully implemented.
// TODO: HUDI-157 : Only allow 1 compactor to run in parallel till Incremental View on MOR is fully implemented.
this.maxConcurrentCompaction = 1;
}
@@ -512,6 +510,7 @@ public class HoodieDeltaStreamer implements Serializable {
/**
* Wait till outstanding pending compactions reduces to the passed in value
*
* @param numPendingCompactions Maximum pending compactions allowed
* @throws InterruptedException
*/
@@ -528,6 +527,7 @@ public class HoodieDeltaStreamer implements Serializable {
/**
* Fetch Next pending compaction if available
*
* @return
* @throws InterruptedException
*/
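
A hypothetical sketch (not part of this commit) of driving the streamer programmatically with the Config fields reformatted above; the paths, table name, and master are made-up values, and the package imports are assumed from the hudi-utilities module layout:

import org.apache.hudi.utilities.UtilHelpers;                           // packages assumed
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.SchedulerConfGenerator;
import org.apache.hudi.utilities.sources.JsonDFSSource;
import org.apache.spark.api.java.JavaSparkContext;

public class DeltaStreamerSketch {
  public static void main(String[] args) throws Exception {
    HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
    cfg.targetBasePath = "hdfs:///warehouse/hudi/trips";   // hypothetical; created on first run if absent
    cfg.targetTableName = "trips";                         // hypothetical Hive table name
    cfg.storageType = "MERGE_ON_READ";                     // COPY_ON_WRITE (or) MERGE_ON_READ
    cfg.sourceClassName = JsonDFSSource.class.getName();   // the default source
    cfg.sparkMaster = "local[2]";                          // hypothetical
    cfg.continuousMode = true;                             // source-fetch -> Transform -> Hudi Write in a loop
    cfg.minSyncIntervalSeconds = 60;
    cfg.filterDupes = true;                                // drop duplicates before insert/bulk-insert

    // Same wiring as main() above; the extra scheduling configs only matter for FAIR + continuous + MERGE_ON_READ.
    JavaSparkContext jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName,
        cfg.sparkMaster, SchedulerConfGenerator.getSparkSchedulingConfigs(cfg));
    try {
      new HoodieDeltaStreamer(cfg, jssc).sync();
    } finally {
      jssc.stop();
    }
  }
}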


@@ -32,8 +32,8 @@ import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
/**
* Utility Class to generate Spark Scheduling allocation file. This kicks in only when user
* sets spark.scheduler.mode=FAIR at spark-submit time
* Utility Class to generate Spark Scheduling allocation file. This kicks in only when user sets
* spark.scheduler.mode=FAIR at spark-submit time
*/
public class SchedulerConfGenerator {
@@ -45,25 +45,16 @@ public class SchedulerConfGenerator {
public static final String SPARK_SCHEDULER_ALLOCATION_FILE_KEY = "spark.scheduler.allocation.file";
private static String SPARK_SCHEDULING_PATTERN =
"<?xml version=\"1.0\"?>\n"
+ "<allocations>\n"
+ " <pool name=\"%s\">\n"
+ " <schedulingMode>%s</schedulingMode>\n"
+ " <weight>%s</weight>\n"
+ " <minShare>%s</minShare>\n"
+ " </pool>\n"
+ " <pool name=\"%s\">\n"
+ " <schedulingMode>%s</schedulingMode>\n"
+ " <weight>%s</weight>\n"
+ " <minShare>%s</minShare>\n"
+ " </pool>\n"
+ "</allocations>";
"<?xml version=\"1.0\"?>\n" + "<allocations>\n" + " <pool name=\"%s\">\n"
+ " <schedulingMode>%s</schedulingMode>\n" + " <weight>%s</weight>\n" + " <minShare>%s</minShare>\n"
+ " </pool>\n" + " <pool name=\"%s\">\n" + " <schedulingMode>%s</schedulingMode>\n"
+ " <weight>%s</weight>\n" + " <minShare>%s</minShare>\n" + " </pool>\n" + "</allocations>";
private static String generateConfig(Integer deltaSyncWeight, Integer compactionWeight, Integer deltaSyncMinShare,
Integer compactionMinShare) {
return String.format(SPARK_SCHEDULING_PATTERN,
DELTASYNC_POOL_NAME, "FAIR", deltaSyncWeight.toString(), deltaSyncMinShare.toString(),
COMPACT_POOL_NAME, "FAIR", compactionWeight.toString(), compactionMinShare.toString());
return String.format(SPARK_SCHEDULING_PATTERN, DELTASYNC_POOL_NAME, "FAIR", deltaSyncWeight.toString(),
deltaSyncMinShare.toString(), COMPACT_POOL_NAME, "FAIR", compactionWeight.toString(),
compactionMinShare.toString());
}
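
For readability, the collapsed pattern above expands to the following allocation file once String.format fills in generateConfig's arguments (pool names shown symbolically, since the DELTASYNC_POOL_NAME and COMPACT_POOL_NAME values sit outside this hunk):

<?xml version="1.0"?>
<allocations>
  <pool name="${DELTASYNC_POOL_NAME}">
    <schedulingMode>FAIR</schedulingMode>
    <weight>${deltaSyncWeight}</weight>
    <minShare>${deltaSyncMinShare}</minShare>
  </pool>
  <pool name="${COMPACT_POOL_NAME}">
    <schedulingMode>FAIR</schedulingMode>
    <weight>${compactionWeight}</weight>
    <minShare>${compactionMinShare}</minShare>
  </pool>
</allocations>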
@@ -75,13 +66,11 @@ public class SchedulerConfGenerator {
public static Map<String, String> getSparkSchedulingConfigs(HoodieDeltaStreamer.Config cfg) throws Exception {
scala.Option<String> scheduleModeKeyOption = new SparkConf().getOption(SPARK_SCHEDULER_MODE_KEY);
final Option<String> sparkSchedulerMode =
scheduleModeKeyOption.isDefined()
? Option.of(scheduleModeKeyOption.get())
: Option.empty();
scheduleModeKeyOption.isDefined() ? Option.of(scheduleModeKeyOption.get()) : Option.empty();
Map<String, String> additionalSparkConfigs = new HashMap<>();
if (sparkSchedulerMode.isPresent() && "FAIR".equals(sparkSchedulerMode.get())
&& cfg.continuousMode && cfg.storageType.equals(HoodieTableType.MERGE_ON_READ.name())) {
if (sparkSchedulerMode.isPresent() && "FAIR".equals(sparkSchedulerMode.get()) && cfg.continuousMode
&& cfg.storageType.equals(HoodieTableType.MERGE_ON_READ.name())) {
String sparkSchedulingConfFile = generateAndStoreConfig(cfg.deltaSyncSchedulingWeight,
cfg.compactSchedulingWeight, cfg.deltaSyncSchedulingMinShare, cfg.compactSchedulingMinShare);
additionalSparkConfigs.put(SPARK_SCHEDULER_ALLOCATION_FILE_KEY, sparkSchedulingConfFile);
@@ -92,10 +81,8 @@ public class SchedulerConfGenerator {
return additionalSparkConfigs;
}
private static String generateAndStoreConfig(Integer deltaSyncWeight,
Integer compactionWeight,
Integer deltaSyncMinShare,
Integer compactionMinShare) throws IOException {
private static String generateAndStoreConfig(Integer deltaSyncWeight, Integer compactionWeight,
Integer deltaSyncMinShare, Integer compactionMinShare) throws IOException {
File tempConfigFile = File.createTempFile(UUID.randomUUID().toString(), ".xml");
BufferedWriter bw = new BufferedWriter(new FileWriter(tempConfigFile));
bw.write(generateConfig(deltaSyncWeight, compactionWeight, deltaSyncMinShare, compactionMinShare));


@@ -49,26 +49,24 @@ public final class SourceFormatAdapter {
}
/**
* Fetch new data in avro format. If the source provides data in different format, they are translated
* to Avro format
* Fetch new data in avro format. If the source provides data in different format, they are translated to Avro format
*
* @param lastCkptStr
* @param sourceLimit
* @return
*/
public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String> lastCkptStr,
long sourceLimit) {
public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String> lastCkptStr, long sourceLimit) {
switch (source.getSourceType()) {
case AVRO:
return ((AvroSource)source).fetchNext(lastCkptStr, sourceLimit);
return ((AvroSource) source).fetchNext(lastCkptStr, sourceLimit);
case JSON: {
InputBatch<JavaRDD<String>> r = ((JsonSource)source).fetchNext(lastCkptStr, sourceLimit);
InputBatch<JavaRDD<String>> r = ((JsonSource) source).fetchNext(lastCkptStr, sourceLimit);
AvroConvertor convertor = new AvroConvertor(r.getSchemaProvider().getSourceSchema());
return new InputBatch<>(Option.ofNullable(
r.getBatch().map(rdd -> rdd.map(convertor::fromJson))
.orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
return new InputBatch<>(Option.ofNullable(r.getBatch().map(rdd -> rdd.map(convertor::fromJson)).orElse(null)),
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
case ROW: {
InputBatch<Dataset<Row>> r = ((RowSource)source).fetchNext(lastCkptStr, sourceLimit);
InputBatch<Dataset<Row>> r = ((RowSource) source).fetchNext(lastCkptStr, sourceLimit);
return new InputBatch<>(Option.ofNullable(r.getBatch().map(
rdd -> (AvroConversionUtils.createRdd(rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()))
.orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
@@ -79,8 +77,8 @@ public final class SourceFormatAdapter {
}
/**
* Fetch new data in row format. If the source provides data in different format, they are translated
* to Row format
* Fetch new data in row format. If the source provides data in different format, they are translated to Row format
*
* @param lastCkptStr
* @param sourceLimit
* @return
@@ -88,22 +86,27 @@ public final class SourceFormatAdapter {
public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptStr, long sourceLimit) {
switch (source.getSourceType()) {
case ROW:
return ((RowSource)source).fetchNext(lastCkptStr, sourceLimit);
return ((RowSource) source).fetchNext(lastCkptStr, sourceLimit);
case AVRO: {
InputBatch<JavaRDD<GenericRecord>> r = ((AvroSource)source).fetchNext(lastCkptStr, sourceLimit);
InputBatch<JavaRDD<GenericRecord>> r = ((AvroSource) source).fetchNext(lastCkptStr, sourceLimit);
Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
return new InputBatch<>(Option.ofNullable(
r.getBatch().map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd),
sourceSchema.toString(), source.getSparkSession()))
.orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
return new InputBatch<>(
Option
.ofNullable(
r.getBatch()
.map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
source.getSparkSession()))
.orElse(null)),
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
case JSON: {
InputBatch<JavaRDD<String>> r = ((JsonSource)source).fetchNext(lastCkptStr, sourceLimit);
InputBatch<JavaRDD<String>> r = ((JsonSource) source).fetchNext(lastCkptStr, sourceLimit);
Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
return new InputBatch<>(Option.ofNullable(
r.getBatch().map(rdd -> source.getSparkSession().read().schema(dataType).json(rdd))
.orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
return new InputBatch<>(
Option.ofNullable(
r.getBatch().map(rdd -> source.getSparkSession().read().schema(dataType).json(rdd)).orElse(null)),
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
default:
throw new IllegalArgumentException("Unknown source type (" + source.getSourceType() + ")");
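
A hypothetical caller-side sketch of the two fetch methods above; only the signatures and the Option/InputBatch usage come from this diff, while the adapter instance, the limit value, and the package imports are assumptions:

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;   // packages assumed
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class SourceFormatAdapterSketch {
  // Whatever the wrapped Source's native type (AVRO, JSON or ROW), the adapter hands back
  // the batch translated to Avro records or to Rows.
  static void fetchOnce(SourceFormatAdapter adapter) {
    // First fetch: no prior checkpoint, with a hypothetical source limit of 1000.
    InputBatch<JavaRDD<GenericRecord>> avroBatch =
        adapter.fetchNewDataInAvroFormat(Option.empty(), 1000);
    // Next fetch in Row format, resuming from the checkpoint handed back above.
    InputBatch<Dataset<Row>> rowBatch =
        adapter.fetchNewDataInRowFormat(Option.of(avroBatch.getCheckpointForNextBatch()), 1000);
  }
}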


@@ -55,14 +55,11 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
static class Config {
// One value from TimestampType above
private static final String TIMESTAMP_TYPE_FIELD_PROP = "hoodie.deltastreamer.keygen"
+ ".timebased.timestamp.type";
private static final String TIMESTAMP_INPUT_DATE_FORMAT_PROP = "hoodie.deltastreamer.keygen"
+ ".timebased.input"
+ ".dateformat";
private static final String TIMESTAMP_OUTPUT_DATE_FORMAT_PROP = "hoodie.deltastreamer.keygen"
+ ".timebased.output"
+ ".dateformat";
private static final String TIMESTAMP_TYPE_FIELD_PROP = "hoodie.deltastreamer.keygen" + ".timebased.timestamp.type";
private static final String TIMESTAMP_INPUT_DATE_FORMAT_PROP =
"hoodie.deltastreamer.keygen" + ".timebased.input" + ".dateformat";
private static final String TIMESTAMP_OUTPUT_DATE_FORMAT_PROP =
"hoodie.deltastreamer.keygen" + ".timebased.output" + ".dateformat";
}
public TimestampBasedKeyGenerator(TypedProperties config) {
@@ -73,10 +70,9 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
this.outputDateFormat = config.getString(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP);
if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) {
DataSourceUtils
.checkRequiredProperties(config, Collections.singletonList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP));
this.inputDateFormat = new SimpleDateFormat(
config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP));
DataSourceUtils.checkRequiredProperties(config,
Collections.singletonList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP));
this.inputDateFormat = new SimpleDateFormat(config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP));
this.inputDateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
}
}
@@ -105,8 +101,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
return new HoodieKey(DataSourceUtils.getNestedFieldValAsString(record, recordKeyField),
partitionPathFormat.format(new Date(unixTime * 1000)));
} catch (ParseException pe) {
throw new HoodieDeltaStreamerException(
"Unable to parse input partition field :" + partitionVal, pe);
throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + partitionVal, pe);
}
}
}
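
A hedged example of wiring up the three key-generator properties assembled above; the property keys are spelled out by the constants, while the values and the helper class are illustrative:

import org.apache.hudi.common.util.TypedProperties;

public class TimestampKeyGenSketch {
  static TypedProperties timestampKeyGenProps() {
    TypedProperties props = new TypedProperties();
    // Keys match the Config constants above; the values are illustrative.
    props.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.type", "DATE_STRING");
    // The input format is only required for DATE_STRING / MIXED timestamps, per the constructor above.
    props.setProperty("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd HH:mm:ss");
    // The output format drives the partition path, per the partitionPathFormat usage above.
    props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyy/MM/dd");
    return props;
  }
}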


@@ -82,8 +82,8 @@ public class TimelineServerPerf implements Serializable {
List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(timelineServer.getFs(), cfg.basePath, true);
Collections.shuffle(allPartitionPaths);
List<String> selected = allPartitionPaths.stream().filter(p -> !p.contains("error"))
.limit(cfg.maxPartitions).collect(Collectors.toList());
List<String> selected = allPartitionPaths.stream().filter(p -> !p.contains("error")).limit(cfg.maxPartitions)
.collect(Collectors.toList());
JavaSparkContext jsc = UtilHelpers.buildSparkContext("hudi-view-perf-" + cfg.basePath, cfg.sparkMaster);
if (!useExternalTimelineServer) {
this.timelineServer.startService();
@@ -100,15 +100,13 @@ public class TimelineServerPerf implements Serializable {
String dumpPrefix = UUID.randomUUID().toString();
System.out.println("First Iteration to load all partitions");
Dumper d = new Dumper(metaClient.getFs(), new Path(reportDir,
String.format("1_%s.csv", dumpPrefix)));
Dumper d = new Dumper(metaClient.getFs(), new Path(reportDir, String.format("1_%s.csv", dumpPrefix)));
d.init();
d.dump(runLookups(jsc, selected, fsView, 1, 0));
d.close();
System.out.println("\n\n\n First Iteration is done");
Dumper d2 = new Dumper(metaClient.getFs(), new Path(reportDir,
String.format("2_%s.csv", dumpPrefix)));
Dumper d2 = new Dumper(metaClient.getFs(), new Path(reportDir, String.format("2_%s.csv", dumpPrefix)));
d2.init();
d2.dump(runLookups(jsc, selected, fsView, cfg.numIterations, cfg.numCoresPerExecutor));
d2.close();
@@ -164,8 +162,8 @@ public class TimelineServerPerf implements Serializable {
long beginTs = System.currentTimeMillis();
Option<FileSlice> c = fsView.getLatestFileSlice(partition, fileId);
long endTs = System.currentTimeMillis();
System.out.println("Latest File Slice for part=" + partition + ", fileId="
+ fileId + ", Slice=" + c + ", Time=" + (endTs - beginTs));
System.out.println("Latest File Slice for part=" + partition + ", fileId=" + fileId + ", Slice=" + c + ", Time="
+ (endTs - beginTs));
latencyHistogram.update(endTs - beginTs);
}
return new PerfStats(partition, id, latencyHistogram.getSnapshot());
@@ -288,8 +286,7 @@ public class TimelineServerPerf implements Serializable {
description = "Directory where spilled view entries will be stored. Used for SPILLABLE_DISK storage type")
public String baseStorePathForFileGroups = FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR;
@Parameter(names = {"--rocksdb-path", "-rp"},
description = "Root directory for RocksDB")
@Parameter(names = {"--rocksdb-path", "-rp"}, description = "Root directory for RocksDB")
public String rocksDBPath = FileSystemViewStorageConfig.DEFAULT_ROCKSDB_BASE_PATH;
@Parameter(names = {"--wait-for-manual-queries", "-ww"})


@@ -38,10 +38,8 @@ public class FilebasedSchemaProvider extends SchemaProvider {
* Configs supported
*/
public static class Config {
private static final String SOURCE_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider"
+ ".source.schema.file";
private static final String TARGET_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider"
+ ".target.schema.file";
private static final String SOURCE_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider" + ".source.schema.file";
private static final String TARGET_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider" + ".target.schema.file";
}
private final FileSystem fs;
@@ -55,11 +53,10 @@ public class FilebasedSchemaProvider extends SchemaProvider {
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SOURCE_SCHEMA_FILE_PROP));
this.fs = FSUtils.getFs(props.getString(Config.SOURCE_SCHEMA_FILE_PROP), jssc.hadoopConfiguration());
try {
this.sourceSchema = new Schema.Parser().parse(
fs.open(new Path(props.getString(Config.SOURCE_SCHEMA_FILE_PROP))));
this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(props.getString(Config.SOURCE_SCHEMA_FILE_PROP))));
if (props.containsKey(Config.TARGET_SCHEMA_FILE_PROP)) {
this.targetSchema = new Schema.Parser().parse(
fs.open(new Path(props.getString(Config.TARGET_SCHEMA_FILE_PROP))));
this.targetSchema =
new Schema.Parser().parse(fs.open(new Path(props.getString(Config.TARGET_SCHEMA_FILE_PROP))));
}
} catch (IOException ioe) {
throw new HoodieIOException("Error reading schema", ioe);


@@ -23,9 +23,8 @@ import org.apache.hudi.common.util.TypedProperties;
import org.apache.spark.api.java.JavaSparkContext;
/**
* Schema provider that will force DeltaStreamer to infer target schema from the dataset.
* It can be used with SQL or Flattening transformers to avoid having a target schema in the schema
* registry.
* Schema provider that will force DeltaStreamer to infer target schema from the dataset. It can be used with SQL or
* Flattening transformers to avoid having a target schema in the schema registry.
*/
public class NullTargetSchemaRegistryProvider extends SchemaRegistryProvider {


@@ -38,6 +38,6 @@ public class RowBasedSchemaProvider extends SchemaProvider {
@Override
public Schema getSourceSchema() {
return AvroConversionUtils.convertStructTypeToAvroSchema(rowStruct, HOODIE_RECORD_STRUCT_NAME,
HOODIE_RECORD_NAMESPACE);
HOODIE_RECORD_NAMESPACE);
}
}


@@ -46,20 +46,17 @@ public class AvroDFSSource extends AvroSource {
}
@Override
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr,
long sourceLimit) {
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
Pair<Option<String>, String> selectPathsWithMaxModificationTime =
pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, sourceLimit);
return selectPathsWithMaxModificationTime.getLeft().map(pathStr -> new InputBatch<>(
Option.of(fromFiles(pathStr)),
selectPathsWithMaxModificationTime.getRight()))
return selectPathsWithMaxModificationTime.getLeft()
.map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight()))
.orElseGet(() -> new InputBatch<>(Option.empty(), selectPathsWithMaxModificationTime.getRight()));
}
private JavaRDD<GenericRecord> fromFiles(String pathStr) {
JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr,
AvroKeyInputFormat.class, AvroKey.class, NullWritable.class,
sparkContext.hadoopConfiguration());
JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr, AvroKeyInputFormat.class,
AvroKey.class, NullWritable.class, sparkContext.hadoopConfiguration());
return avroRDD.keys().map(r -> ((GenericRecord) r.datum()));
}
}


@@ -50,24 +50,21 @@ public class AvroKafkaSource extends AvroSource {
}
@Override
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr,
long sourceLimit) {
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit);
long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
if (totalNewMsgs <= 0) {
return new InputBatch<>(Option.empty(),
lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
return new InputBatch<>(Option.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
} else {
log.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
}
JavaRDD<GenericRecord> newDataRDD = toRDD(offsetRanges);
return new InputBatch<>(Option.of(newDataRDD),
KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
}
private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
JavaRDD<GenericRecord> recordRDD = KafkaUtils
.createRDD(sparkContext, String.class, Object.class, StringDecoder.class, KafkaAvroDecoder.class,
JavaRDD<GenericRecord> recordRDD =
KafkaUtils.createRDD(sparkContext, String.class, Object.class, StringDecoder.class, KafkaAvroDecoder.class,
offsetGen.getKafkaParams(), offsetRanges).values().map(obj -> (GenericRecord) obj);
return recordRDD;
}


@@ -27,9 +27,7 @@ import org.apache.spark.sql.SparkSession;
public abstract class AvroSource extends Source<JavaRDD<GenericRecord>> {
public AvroSource(TypedProperties props,
JavaSparkContext sparkContext,
SparkSession sparkSession,
public AvroSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider, SourceType.AVRO);
}


@@ -46,13 +46,12 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
/**
* Source to read deltas produced by {@link HiveIncrementalPuller}, commit by commit and apply
* to the target table
* Source to read deltas produced by {@link HiveIncrementalPuller}, commit by commit and apply to the target table
* <p>
* The general idea here is to have commits sync across the data pipeline.
* <p>
* [Source Tables(s)] ====> HiveIncrementalScanner ==> incrPullRootPath ==> targetTable {c1,c2,c3,...}
* {c1,c2,c3,...} {c1,c2,c3,...}
* [Source Tables(s)] ====> HiveIncrementalScanner ==> incrPullRootPath ==> targetTable {c1,c2,c3,...} {c1,c2,c3,...}
* {c1,c2,c3,...}
* <p>
* This produces beautiful causality, that makes data issues in ETLs very easy to debug
*/
@@ -84,8 +83,7 @@ public class HiveIncrPullSource extends AvroSource {
/**
* Finds the first commit from source, greater than the target's last commit, and reads it out.
*/
private Option<String> findCommitToPull(Option<String> latestTargetCommit)
throws IOException {
private Option<String> findCommitToPull(Option<String> latestTargetCommit) throws IOException {
log.info("Looking for commits ");
@@ -104,7 +102,7 @@ public class HiveIncrPullSource extends AvroSource {
}
for (String commitTime : commitTimes) {
//TODO(vc): Add an option to delete consumed commits
// TODO(vc): Add an option to delete consumed commits
if (commitTime.compareTo(latestTargetCommit.get()) > 0) {
return Option.of(commitTime);
}
@@ -113,30 +111,24 @@ public class HiveIncrPullSource extends AvroSource {
}
@Override
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(
Option<String> lastCheckpointStr, long sourceLimit) {
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
try {
// find the source commit to pull
Option<String> commitToPull = findCommitToPull(lastCheckpointStr);
if (!commitToPull.isPresent()) {
return new InputBatch<>(Option.empty(),
lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
return new InputBatch<>(Option.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
}
// read the files out.
List<FileStatus> commitDeltaFiles = Arrays.asList(
fs.listStatus(new Path(incrPullRootPath, commitToPull.get())));
String pathStr = commitDeltaFiles.stream().map(f -> f.getPath().toString())
.collect(Collectors.joining(","));
JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr,
AvroKeyInputFormat.class, AvroKey.class, NullWritable.class,
sparkContext.hadoopConfiguration());
List<FileStatus> commitDeltaFiles = Arrays.asList(fs.listStatus(new Path(incrPullRootPath, commitToPull.get())));
String pathStr = commitDeltaFiles.stream().map(f -> f.getPath().toString()).collect(Collectors.joining(","));
JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr, AvroKeyInputFormat.class,
AvroKey.class, NullWritable.class, sparkContext.hadoopConfiguration());
return new InputBatch<>(Option.of(avroRDD.keys().map(r -> ((GenericRecord) r.datum()))),
String.valueOf(commitToPull.get()));
} catch (IOException ioe) {
throw new HoodieIOException(
"Unable to read from source from checkpoint: " + lastCheckpointStr, ioe);
throw new HoodieIOException("Unable to read from source from checkpoint: " + lastCheckpointStr, ioe);
}
}
}


@@ -72,8 +72,7 @@ public class HoodieIncrSource extends RowSource {
private static final Boolean DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT = false;
}
public HoodieIncrSource(TypedProperties props,
JavaSparkContext sparkContext, SparkSession sparkSession,
public HoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
}
@@ -84,13 +83,12 @@ public class HoodieIncrSource extends RowSource {
DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.HOODIE_SRC_BASE_PATH));
/**
DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.HOODIE_SRC_BASE_PATH,
Config.HOODIE_SRC_PARTITION_FIELDS));
List<String> partitionFields = props.getStringList(Config.HOODIE_SRC_PARTITION_FIELDS, ",",
new ArrayList<>());
PartitionValueExtractor extractor = DataSourceUtils.createPartitionExtractor(props.getString(
Config.HOODIE_SRC_PARTITION_EXTRACTORCLASS, Config.DEFAULT_HOODIE_SRC_PARTITION_EXTRACTORCLASS));
**/
* DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.HOODIE_SRC_BASE_PATH,
* Config.HOODIE_SRC_PARTITION_FIELDS)); List<String> partitionFields =
* props.getStringList(Config.HOODIE_SRC_PARTITION_FIELDS, ",", new ArrayList<>()); PartitionValueExtractor
* extractor = DataSourceUtils.createPartitionExtractor(props.getString( Config.HOODIE_SRC_PARTITION_EXTRACTORCLASS,
* Config.DEFAULT_HOODIE_SRC_PARTITION_EXTRACTORCLASS));
**/
String srcPath = props.getString(Config.HOODIE_SRC_BASE_PATH);
int numInstantsPerFetch = props.getInteger(Config.NUM_INSTANTS_PER_FETCH, Config.DEFAULT_NUM_INSTANTS_PER_FETCH);
boolean readLatestOnMissingCkpt = props.getBoolean(Config.READ_LATEST_INSTANT_ON_MISSING_CKPT,
@@ -117,42 +115,31 @@ public class HoodieIncrSource extends RowSource {
Dataset<Row> source = reader.load(srcPath);
/**
log.info("Partition Fields are : (" + partitionFields + "). Initial Source Schema :" + source.schema());
StructType newSchema = new StructType(source.schema().fields());
for (String field : partitionFields) {
newSchema = newSchema.add(field, DataTypes.StringType, true);
}
/**
* Validates if the commit time is sane and also generates Partition fields from _hoodie_partition_path if
* log.info("Partition Fields are : (" + partitionFields + "). Initial Source Schema :" + source.schema());
*
* StructType newSchema = new StructType(source.schema().fields()); for (String field : partitionFields) { newSchema
* = newSchema.add(field, DataTypes.StringType, true); }
*
* /** Validates if the commit time is sane and also generates Partition fields from _hoodie_partition_path if
* configured
*
Dataset<Row> validated = source.map((MapFunction<Row, Row>) (Row row) -> {
// _hoodie_instant_time
String instantTime = row.getString(0);
IncrSourceHelper.validateInstantTime(row, instantTime, instantEndpts.getKey(), instantEndpts.getValue());
if (!partitionFields.isEmpty()) {
// _hoodie_partition_path
String hoodiePartitionPath = row.getString(3);
List<Object> partitionVals = extractor.extractPartitionValuesInPath(hoodiePartitionPath).stream()
.map(o -> (Object) o).collect(Collectors.toList());
Preconditions.checkArgument(partitionVals.size() == partitionFields.size(),
"#partition-fields != #partition-values-extracted");
List<Object> rowObjs = new ArrayList<>(scala.collection.JavaConversions.seqAsJavaList(row.toSeq()));
rowObjs.addAll(partitionVals);
return RowFactory.create(rowObjs.toArray());
}
return row;
}, RowEncoder.apply(newSchema));
log.info("Validated Source Schema :" + validated.schema());
**/
* Dataset<Row> validated = source.map((MapFunction<Row, Row>) (Row row) -> { // _hoodie_instant_time String
* instantTime = row.getString(0); IncrSourceHelper.validateInstantTime(row, instantTime, instantEndpts.getKey(),
* instantEndpts.getValue()); if (!partitionFields.isEmpty()) { // _hoodie_partition_path String hoodiePartitionPath
* = row.getString(3); List<Object> partitionVals =
* extractor.extractPartitionValuesInPath(hoodiePartitionPath).stream() .map(o -> (Object)
* o).collect(Collectors.toList()); Preconditions.checkArgument(partitionVals.size() == partitionFields.size(),
* "#partition-fields != #partition-values-extracted"); List<Object> rowObjs = new
* ArrayList<>(scala.collection.JavaConversions.seqAsJavaList(row.toSeq())); rowObjs.addAll(partitionVals); return
* RowFactory.create(rowObjs.toArray()); } return row; }, RowEncoder.apply(newSchema));
*
* log.info("Validated Source Schema :" + validated.schema());
**/
// Remove Hoodie meta columns except partition path from input source
final Dataset<Row> src = source.drop(HoodieRecord.HOODIE_META_COLUMNS.stream()
.filter(x -> !x.equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toArray(String[]::new));
//log.info("Final Schema from Source is :" + src.schema());
// log.info("Final Schema from Source is :" + src.schema());
return Pair.of(Option.of(src), instantEndpts.getRight());
}
}


@@ -27,8 +27,7 @@ public class InputBatch<T> {
private final String checkpointForNextBatch;
private final SchemaProvider schemaProvider;
public InputBatch(Option<T> batch, String checkpointForNextBatch,
SchemaProvider schemaProvider) {
public InputBatch(Option<T> batch, String checkpointForNextBatch, SchemaProvider schemaProvider) {
this.batch = batch;
this.checkpointForNextBatch = checkpointForNextBatch;
this.schemaProvider = schemaProvider;


@@ -41,12 +41,11 @@ public class JsonDFSSource extends JsonSource {
}
@Override
protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCkptStr,
long sourceLimit) {
protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
Pair<Option<String>, String> selPathsWithMaxModificationTime =
pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, sourceLimit);
return selPathsWithMaxModificationTime.getLeft().map(pathStr -> new InputBatch<>(
Option.of(fromFiles(pathStr)), selPathsWithMaxModificationTime.getRight()))
return selPathsWithMaxModificationTime.getLeft()
.map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)), selPathsWithMaxModificationTime.getRight()))
.orElse(new InputBatch<>(Option.empty(), selPathsWithMaxModificationTime.getRight()));
}


@@ -48,13 +48,11 @@ public class JsonKafkaSource extends JsonSource {
}
@Override
protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCheckpointStr,
long sourceLimit) {
protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit);
long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
if (totalNewMsgs <= 0) {
return new InputBatch<>(Option.empty(),
lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
return new InputBatch<>(Option.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
}
log.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
JavaRDD<String> newDataRDD = toRDD(offsetRanges);


@@ -26,9 +26,7 @@ import org.apache.spark.sql.SparkSession;
public abstract class JsonSource extends Source<JavaRDD<String>> {
public JsonSource(TypedProperties props,
JavaSparkContext sparkContext,
SparkSession sparkSession,
public JsonSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider, SourceType.JSON);
}
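
A hypothetical minimal subclass, to show how the (props, sparkContext, sparkSession, schemaProvider) constructor and the single-line fetchNewData signature above are meant to be extended; everything beyond those two signatures (the class itself, its fixed batch, the assumed package imports) is illustrative:

import java.util.Arrays;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.utilities.schema.SchemaProvider;   // packages assumed
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonSource;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class StaticJsonSource extends JsonSource {

  public StaticJsonSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
      SchemaProvider schemaProvider) {
    super(props, sparkContext, sparkSession, schemaProvider);
  }

  @Override
  protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
    // Emits one fixed JSON record and echoes the previous checkpoint, i.e. it never advances.
    JavaRDD<String> batch = sparkContext.parallelize(Arrays.asList("{\"key\": \"1\", \"ts\": 0}"));
    return new InputBatch<>(Option.of(batch), lastCkptStr.isPresent() ? lastCkptStr.get() : "");
  }
}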


@@ -30,15 +30,12 @@ import org.apache.spark.sql.SparkSession;
public abstract class RowSource extends Source<Dataset<Row>> {
public RowSource(TypedProperties props,
JavaSparkContext sparkContext,
SparkSession sparkSession,
public RowSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider, SourceType.ROW);
}
protected abstract Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr,
long sourceLimit);
protected abstract Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit);
@Override
protected final InputBatch<Dataset<Row>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {


@@ -34,9 +34,7 @@ public abstract class Source<T> implements Serializable {
protected static volatile Logger log = LogManager.getLogger(Source.class);
public enum SourceType {
JSON,
AVRO,
ROW
JSON, AVRO, ROW
}
protected transient TypedProperties props;
@@ -64,6 +62,7 @@ public abstract class Source<T> implements Serializable {
/**
* Main API called by Hoodie Delta Streamer to fetch records
*
* @param lastCkptStr Last Checkpoint
* @param sourceLimit Source Limit
* @return
@@ -71,8 +70,8 @@ public abstract class Source<T> implements Serializable {
public final InputBatch<T> fetchNext(Option<String> lastCkptStr, long sourceLimit) {
InputBatch<T> batch = fetchNewData(lastCkptStr, sourceLimit);
// If overriddenSchemaProvider is passed in CLI, use it
return overriddenSchemaProvider == null ? batch : new InputBatch<>(batch.getBatch(),
batch.getCheckpointForNextBatch(), overriddenSchemaProvider);
return overriddenSchemaProvider == null ? batch
: new InputBatch<>(batch.getBatch(), batch.getCheckpointForNextBatch(), overriddenSchemaProvider);
}
public SourceType getSourceType() {


@@ -27,8 +27,8 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.MercifulJsonConverter;
/**
* Convert a variety of datum into Avro GenericRecords. Has a bunch of lazy
* fields to circumvent issues around serializing these objects from driver to executors
* Convert a variety of datum into Avro GenericRecords. Has a bunch of lazy fields to circumvent issues around
* serializing these objects from driver to executors
*/
public class AvroConvertor implements Serializable {


@@ -59,18 +59,18 @@ public class DFSPathSelector {
this.fs = FSUtils.getFs(props.getString(Config.ROOT_INPUT_PATH_PROP), hadoopConf);
}
public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(
Option<String> lastCheckpointStr, long sourceLimit) {
public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(Option<String> lastCheckpointStr,
long sourceLimit) {
try {
// obtain all eligible files under root folder.
List<FileStatus> eligibleFiles = new ArrayList<>();
RemoteIterator<LocatedFileStatus> fitr = fs.listFiles(
new Path(props.getString(Config.ROOT_INPUT_PATH_PROP)), true);
RemoteIterator<LocatedFileStatus> fitr =
fs.listFiles(new Path(props.getString(Config.ROOT_INPUT_PATH_PROP)), true);
while (fitr.hasNext()) {
LocatedFileStatus fileStatus = fitr.next();
if (fileStatus.isDirectory() || IGNORE_FILEPREFIX_LIST.stream()
.anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) {
if (fileStatus.isDirectory()
|| IGNORE_FILEPREFIX_LIST.stream().anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) {
continue;
}
eligibleFiles.add(fileStatus);
@@ -83,8 +83,7 @@ public class DFSPathSelector {
long maxModificationTime = Long.MIN_VALUE;
List<FileStatus> filteredFiles = new ArrayList<>();
for (FileStatus f : eligibleFiles) {
if (lastCheckpointStr.isPresent() && f.getModificationTime() <= Long.valueOf(
lastCheckpointStr.get())) {
if (lastCheckpointStr.isPresent() && f.getModificationTime() <= Long.valueOf(lastCheckpointStr.get())) {
// skip processed files
continue;
}
@@ -101,20 +100,15 @@ public class DFSPathSelector {
// no data to read
if (filteredFiles.size() == 0) {
return new ImmutablePair<>(Option.empty(),
lastCheckpointStr.orElseGet(() -> String.valueOf(Long.MIN_VALUE)));
return new ImmutablePair<>(Option.empty(), lastCheckpointStr.orElseGet(() -> String.valueOf(Long.MIN_VALUE)));
}
// read the files out.
String pathStr = filteredFiles.stream().map(f -> f.getPath().toString())
.collect(Collectors.joining(","));
String pathStr = filteredFiles.stream().map(f -> f.getPath().toString()).collect(Collectors.joining(","));
return new ImmutablePair<>(
Option.ofNullable(pathStr),
String.valueOf(maxModificationTime));
return new ImmutablePair<>(Option.ofNullable(pathStr), String.valueOf(maxModificationTime));
} catch (IOException ioe) {
throw new HoodieIOException(
"Unable to read from source from checkpoint: " + lastCheckpointStr, ioe);
throw new HoodieIOException("Unable to read from source from checkpoint: " + lastCheckpointStr, ioe);
}
}
}
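
A hypothetical call against the selector's contract shown above; the checkpoint and limit values are made up, and the Pair/DFSPathSelector package imports are assumed:

import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;                   // packages assumed
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;

public class PathSelectorSketch {
  static void poll(DFSPathSelector selector) {
    // Resume from a previously stored checkpoint (a modification-time string); the limit value is hypothetical.
    Pair<Option<String>, String> next =
        selector.getNextFilePathsAndMaxModificationTime(Option.of("1570000000000"), 1024 * 1024);
    // next.getLeft():  "path1,path2,..." for files newer than the checkpoint, or empty when nothing new.
    // next.getRight(): the new checkpoint (max modification time seen, or the prior checkpoint when nothing new).
  }
}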


@@ -51,13 +51,11 @@ public class IncrSourceHelper {
* @param readLatestOnMissingBeginInstant when begin instant is missing, allow reading from latest committed instant
* @return begin and end instants
*/
public static Pair<String, String> calculateBeginAndEndInstants(
JavaSparkContext jssc, String srcBasePath, int numInstantsPerFetch, Option<String> beginInstant,
boolean readLatestOnMissingBeginInstant) {
Preconditions.checkArgument(numInstantsPerFetch > 0, "Make sure the config"
+ " hoodie.deltastreamer.source.hoodieincr.num_instants is set to a positive value");
HoodieTableMetaClient srcMetaClient = new HoodieTableMetaClient(jssc.hadoopConfiguration(),
srcBasePath, true);
public static Pair<String, String> calculateBeginAndEndInstants(JavaSparkContext jssc, String srcBasePath,
int numInstantsPerFetch, Option<String> beginInstant, boolean readLatestOnMissingBeginInstant) {
Preconditions.checkArgument(numInstantsPerFetch > 0,
"Make sure the config" + " hoodie.deltastreamer.source.hoodieincr.num_instants is set to a positive value");
HoodieTableMetaClient srcMetaClient = new HoodieTableMetaClient(jssc.hadoopConfiguration(), srcBasePath, true);
final HoodieTimeline activeCommitTimeline =
srcMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
@@ -72,11 +70,8 @@ public class IncrSourceHelper {
}
});
Option<HoodieInstant> nthInstant = Option.fromJavaOptional(
activeCommitTimeline
.findInstantsAfter(beginInstantTime, numInstantsPerFetch)
.getInstants()
.reduce((x, y) -> y));
Option<HoodieInstant> nthInstant = Option.fromJavaOptional(activeCommitTimeline
.findInstantsAfter(beginInstantTime, numInstantsPerFetch).getInstants().reduce((x, y) -> y));
return Pair.of(beginInstantTime, nthInstant.map(instant -> instant.getTimestamp()).orElse(beginInstantTime));
}
@@ -90,14 +85,12 @@ public class IncrSourceHelper {
*/
public static void validateInstantTime(Row row, String instantTime, String sinceInstant, String endInstant) {
Preconditions.checkNotNull(instantTime);
Preconditions.checkArgument(HoodieTimeline.compareTimestamps(instantTime,
sinceInstant, HoodieTimeline.GREATER),
"Instant time(_hoodie_commit_time) in row (" + row + ") was : " + instantTime
+ "but expected to be between " + sinceInstant + "(excl) - "
+ endInstant + "(incl)");
Preconditions.checkArgument(HoodieTimeline.compareTimestamps(instantTime,
endInstant, HoodieTimeline.LESSER_OR_EQUAL),
"Instant time(_hoodie_commit_time) in row (" + row + ") was : " + instantTime
+ "but expected to be between " + sinceInstant + "(excl) - " + endInstant + "(incl)");
Preconditions.checkArgument(HoodieTimeline.compareTimestamps(instantTime, sinceInstant, HoodieTimeline.GREATER),
"Instant time(_hoodie_commit_time) in row (" + row + ") was : " + instantTime + "but expected to be between "
+ sinceInstant + "(excl) - " + endInstant + "(incl)");
Preconditions.checkArgument(
HoodieTimeline.compareTimestamps(instantTime, endInstant, HoodieTimeline.LESSER_OR_EQUAL),
"Instant time(_hoodie_commit_time) in row (" + row + ") was : " + instantTime + "but expected to be between "
+ sinceInstant + "(excl) - " + endInstant + "(incl)");
}
}
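
Concretely, and only as an illustration: with beginInstant = Option.of("20190101120000") and numInstantsPerFetch = 5, calculateBeginAndEndInstants returns that begin instant paired with the timestamp of the last of the (up to five) completed commits after it, falling back to the begin instant when none exist; validateInstantTime then asserts that each row's _hoodie_commit_time lies in the (begin, end] window.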


@@ -58,8 +58,7 @@ public class KafkaOffsetGen {
/**
* Reconstruct checkpoint from string.
*/
public static HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> strToOffsets(
String checkpointStr) {
public static HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> strToOffsets(String checkpointStr) {
HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> offsetMap = new HashMap<>();
if (checkpointStr.length() == 0) {
return offsetMap;
@@ -75,14 +74,15 @@ public class KafkaOffsetGen {
}
/**
* String representation of checkpoint <p> Format: topic1,0:offset0,1:offset1,2:offset2, .....
* String representation of checkpoint
* <p>
* Format: topic1,0:offset0,1:offset1,2:offset2, .....
*/
public static String offsetsToStr(OffsetRange[] ranges) {
StringBuilder sb = new StringBuilder();
// at least 1 partition will be present.
sb.append(ranges[0].topic() + ",");
sb.append(Arrays.stream(ranges)
.map(r -> String.format("%s:%d", r.partition(), r.untilOffset()))
sb.append(Arrays.stream(ranges).map(r -> String.format("%s:%d", r.partition(), r.untilOffset()))
.collect(Collectors.joining(",")));
return sb.toString();
}
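
A small, hypothetical round-trip of the checkpoint string format documented above (OffsetRange.create is the spark-streaming-kafka factory; the KafkaOffsetGen package import is assumed):

import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;   // package assumed
import org.apache.spark.streaming.kafka.OffsetRange;

public class CheckpointStrSketch {
  public static void main(String[] args) {
    OffsetRange[] ranges = new OffsetRange[] {
        OffsetRange.create("impressions", 0, 0, 100),   // topic, partition, fromOffset, untilOffset
        OffsetRange.create("impressions", 1, 0, 250)};
    // Produces "impressions,0:100,1:250"; strToOffsets rebuilds the TopicAndPartition -> LeaderOffset
    // map from exactly this string when the streamer restarts from a stored checkpoint.
    System.out.println(KafkaOffsetGen.CheckpointUtils.offsetsToStr(ranges));
  }
}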
@@ -94,10 +94,8 @@ public class KafkaOffsetGen {
* @param toOffsetMap offsets of where each partitions is currently at
* @param numEvents maximum number of events to read.
*/
public static OffsetRange[] computeOffsetRanges(
HashMap<TopicAndPartition, LeaderOffset> fromOffsetMap,
HashMap<TopicAndPartition, LeaderOffset> toOffsetMap,
long numEvents) {
public static OffsetRange[] computeOffsetRanges(HashMap<TopicAndPartition, LeaderOffset> fromOffsetMap,
HashMap<TopicAndPartition, LeaderOffset> toOffsetMap, long numEvents) {
Comparator<OffsetRange> byPartition = Comparator.comparing(OffsetRange::partition);
@@ -114,8 +112,8 @@ public class KafkaOffsetGen {
// keep going until we have events to allocate and partitions still not exhausted.
while (allocedEvents < numEvents && exhaustedPartitions.size() < toOffsetMap.size()) {
long remainingEvents = numEvents - allocedEvents;
long eventsPerPartition = (long) Math
.ceil((1.0 * remainingEvents) / (toOffsetMap.size() - exhaustedPartitions.size()));
long eventsPerPartition =
(long) Math.ceil((1.0 * remainingEvents) / (toOffsetMap.size() - exhaustedPartitions.size()));
// Allocate the remaining events to non-exhausted partitions, in round robin fashion
for (int i = 0; i < ranges.length; i++) {
@@ -162,8 +160,7 @@ public class KafkaOffsetGen {
* Kafka reset offset strategies
*/
enum KafkaResetOffsetStrategies {
LARGEST,
SMALLEST
LARGEST, SMALLEST
}
/**
@@ -193,12 +190,11 @@ public class KafkaOffsetGen {
// Obtain current metadata for the topic
KafkaCluster cluster = new KafkaCluster(ScalaHelpers.toScalaMap(kafkaParams));
Either<ArrayBuffer<Throwable>, Set<TopicAndPartition>> either = cluster.getPartitions(
ScalaHelpers.toScalaSet(new HashSet<>(Collections.singletonList(topicName))));
Either<ArrayBuffer<Throwable>, Set<TopicAndPartition>> either =
cluster.getPartitions(ScalaHelpers.toScalaSet(new HashSet<>(Collections.singletonList(topicName))));
if (either.isLeft()) {
// log errors. and bail out.
throw new HoodieDeltaStreamerException("Error obtaining partition metadata",
either.left().get().head());
throw new HoodieDeltaStreamerException("Error obtaining partition metadata", either.left().get().head());
}
Set<TopicAndPartition> topicPartitions = either.right().get();
@@ -208,26 +204,25 @@ public class KafkaOffsetGen {
if (lastCheckpointStr.isPresent()) {
fromOffsets = checkupValidOffsets(cluster, lastCheckpointStr, topicPartitions);
} else {
KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies.valueOf(
props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies
.valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
switch (autoResetValue) {
case SMALLEST:
fromOffsets = new HashMap(ScalaHelpers.toJavaMap(
cluster.getEarliestLeaderOffsets(topicPartitions).right().get()));
fromOffsets =
new HashMap(ScalaHelpers.toJavaMap(cluster.getEarliestLeaderOffsets(topicPartitions).right().get()));
break;
case LARGEST:
fromOffsets = new HashMap(
ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get()));
fromOffsets =
new HashMap(ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get()));
break;
default:
throw new HoodieNotSupportedException(
"Auto reset value must be one of 'smallest' or 'largest' ");
throw new HoodieNotSupportedException("Auto reset value must be one of 'smallest' or 'largest' ");
}
}
// Obtain the latest offsets.
HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> toOffsets = new HashMap(
ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get()));
HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> toOffsets =
new HashMap(ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get()));
// Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
long numEvents = Math.min(DEFAULT_MAX_EVENTS_TO_READ, sourceLimit);
@@ -236,22 +231,17 @@ public class KafkaOffsetGen {
return offsetRanges;
}
// check up checkpoint offsets is valid or not, if true, return checkpoint offsets,
// check up checkpoint offsets is valid or not, if true, return checkpoint offsets,
// else return earliest offsets
private HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkupValidOffsets(
KafkaCluster cluster,
Option<String> lastCheckpointStr,
Set<TopicAndPartition> topicPartitions) {
private HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkupValidOffsets(KafkaCluster cluster,
Option<String> lastCheckpointStr, Set<TopicAndPartition> topicPartitions) {
HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkpointOffsets =
CheckpointUtils.strToOffsets(lastCheckpointStr.get());
CheckpointUtils.strToOffsets(lastCheckpointStr.get());
HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> earliestOffsets =
new HashMap(ScalaHelpers.toJavaMap(
cluster.getEarliestLeaderOffsets(topicPartitions).right().get()));
new HashMap(ScalaHelpers.toJavaMap(cluster.getEarliestLeaderOffsets(topicPartitions).right().get()));
boolean checkpointOffsetReseter = checkpointOffsets.entrySet()
.stream()
.anyMatch(offset -> offset.getValue().offset()
< earliestOffsets.get(offset.getKey()).offset());
boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
.anyMatch(offset -> offset.getValue().offset() < earliestOffsets.get(offset.getKey()).offset());
return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
}


@@ -39,18 +39,14 @@ public class FlatteningTransformer implements Transformer {
/** Configs supported */
@Override
public Dataset<Row> apply(
JavaSparkContext jsc,
SparkSession sparkSession,
Dataset<Row> rowDataset,
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
TypedProperties properties) {
// tmp table name doesn't like dashes
String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
log.info("Registering tmp table : " + tmpTable);
rowDataset.registerTempTable(tmpTable);
return sparkSession.sql("select " + flattenSchema(rowDataset.schema(), null)
+ " from " + tmpTable);
return sparkSession.sql("select " + flattenSchema(rowDataset.schema(), null) + " from " + tmpTable);
}
public String flattenSchema(StructType schema, String prefix) {
@@ -75,7 +71,7 @@ public class FlatteningTransformer implements Transformer {
}
if (selectSQLQuery.length() > 0) {
selectSQLQuery. deleteCharAt(selectSQLQuery.length() - 1);
selectSQLQuery.deleteCharAt(selectSQLQuery.length() - 1);
}
return selectSQLQuery.toString();


@@ -30,8 +30,8 @@ import org.apache.spark.sql.SparkSession;
public class IdentityTransformer implements Transformer {
@Override
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession,
Dataset<Row> rowDataset, TypedProperties properties) {
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
TypedProperties properties) {
return rowDataset;
}
}


@@ -48,8 +48,8 @@ public class SqlQueryBasedTransformer implements Transformer {
}
@Override
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession,
Dataset<Row> rowDataset, TypedProperties properties) {
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
TypedProperties properties) {
String transformerSQL = properties.getString(Config.TRANSFORMER_SQL);
if (null == transformerSQL) {
throw new IllegalArgumentException("Missing configuration : (" + Config.TRANSFORMER_SQL + ")");


@@ -38,6 +38,5 @@ public interface Transformer {
* @param properties Config properties
* @return Transformed Dataset
*/
Dataset apply(JavaSparkContext jsc, SparkSession sparkSession,
Dataset<Row> rowDataset, TypedProperties properties);
Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties);
}


@@ -43,8 +43,8 @@ public class SchedulerConfGeneratorTest {
cfg.continuousMode = true;
cfg.storageType = HoodieTableType.COPY_ON_WRITE.name();
configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
assertNull("storageType is not MERGE_ON_READ", configs.get(
SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY));
assertNull("storageType is not MERGE_ON_READ",
configs.get(SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY));
cfg.storageType = HoodieTableType.MERGE_ON_READ.name();
configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);


@@ -34,23 +34,20 @@ public class TestFlatteningTransformer {
FlatteningTransformer transformer = new FlatteningTransformer();
// Init
StructField[] nestedStructFields = new StructField[]{
new StructField("nestedIntColumn", DataTypes.IntegerType, true, Metadata.empty()),
new StructField("nestedStringColumn", DataTypes.StringType, true, Metadata.empty()),
};
StructField[] nestedStructFields =
new StructField[] {new StructField("nestedIntColumn", DataTypes.IntegerType, true, Metadata.empty()),
new StructField("nestedStringColumn", DataTypes.StringType, true, Metadata.empty()),};
StructField[] structFields = new StructField[]{
new StructField("intColumn", DataTypes.IntegerType, true, Metadata.empty()),
new StructField("stringColumn", DataTypes.StringType, true, Metadata.empty()),
new StructField("nestedStruct", DataTypes.createStructType(nestedStructFields), true, Metadata.empty())
};
StructField[] structFields =
new StructField[] {new StructField("intColumn", DataTypes.IntegerType, true, Metadata.empty()),
new StructField("stringColumn", DataTypes.StringType, true, Metadata.empty()),
new StructField("nestedStruct", DataTypes.createStructType(nestedStructFields), true, Metadata.empty())};
StructType schema = new StructType(structFields);
String flattenedSql = transformer.flattenSchema(schema, null);
assertEquals("intColumn as intColumn,stringColumn as stringColumn,"
+ "nestedStruct.nestedIntColumn as nestedStruct_nestedIntColumn,"
+ "nestedStruct.nestedStringColumn as nestedStruct_nestedStringColumn",
flattenedSql);
+ "nestedStruct.nestedIntColumn as nestedStruct_nestedIntColumn,"
+ "nestedStruct.nestedStringColumn as nestedStruct_nestedStringColumn", flattenedSql);
}
}


@@ -90,8 +90,7 @@ public class TestHDFSParquetImporter implements Serializable {
jsc = getJavaSparkContext();
// Test root folder.
String basePath = (new Path(dfsBasePath,
Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
String basePath = (new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
// Hoodie root folder
Path hoodieFolder = new Path(basePath, "testTarget");
@@ -99,13 +98,12 @@ public class TestHDFSParquetImporter implements Serializable {
// Create schema file.
String schemaFile = new Path(basePath, "file.schema").toString();
//Create generic records.
// Create generic records.
Path srcFolder = new Path(basePath, "testSrc");
createRecords(srcFolder);
HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(),
hoodieFolder.toString(), "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1,
schemaFile);
HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
"testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
AtomicInteger retry = new AtomicInteger(3);
AtomicInteger fileCreated = new AtomicInteger(0);
HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg) {
@@ -134,8 +132,7 @@ public class TestHDFSParquetImporter implements Serializable {
RemoteIterator<LocatedFileStatus> hoodieFiles = dfs.listFiles(hoodieFolder, true);
while (hoodieFiles.hasNext()) {
LocatedFileStatus f = hoodieFiles.next();
isCommitFilePresent =
isCommitFilePresent || f.getPath().toString().endsWith(HoodieTimeline.COMMIT_EXTENSION);
isCommitFilePresent = isCommitFilePresent || f.getPath().toString().endsWith(HoodieTimeline.COMMIT_EXTENSION);
if (f.getPath().toString().endsWith("parquet")) {
SQLContext sc = new SQLContext(jsc);
@@ -164,14 +161,11 @@ public class TestHDFSParquetImporter implements Serializable {
long startTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
List<GenericRecord> records = new ArrayList<GenericRecord>();
for (long recordNum = 0; recordNum < 96; recordNum++) {
records.add(HoodieTestDataGenerator
.generateGenericRecord(Long.toString(recordNum), "rider-" + recordNum,
"driver-" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "rider-" + recordNum,
"driver-" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
}
ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)
.withSchema(HoodieTestDataGenerator.avroSchema)
.withConf(HoodieTestUtils.getDefaultHadoopConf())
.build();
.withSchema(HoodieTestDataGenerator.avroSchema).withConf(HoodieTestUtils.getDefaultHadoopConf()).build();
for (GenericRecord record : records) {
writer.write(record);
}
@@ -194,15 +188,13 @@ public class TestHDFSParquetImporter implements Serializable {
jsc = getJavaSparkContext();
// Test root folder.
String basePath = (new Path(dfsBasePath,
Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
String basePath = (new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
// Hoodie root folder
Path hoodieFolder = new Path(basePath, "testTarget");
Path srcFolder = new Path(basePath.toString(), "srcTest");
Path schemaFile = new Path(basePath.toString(), "missingFile.schema");
HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(),
hoodieFolder.toString(), "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1,
schemaFile.toString());
HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
"testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile.toString());
HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
// Should fail - return : -1.
assertEquals(-1, dataImporter.dataImport(jsc, 0));
@@ -228,12 +220,11 @@ public class TestHDFSParquetImporter implements Serializable {
jsc = getJavaSparkContext();
// Test root folder.
String basePath = (new Path(dfsBasePath,
Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
String basePath = (new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
// Hoodie root folder
Path hoodieFolder = new Path(basePath, "testTarget");
//Create generic records.
// Create generic records.
Path srcFolder = new Path(basePath, "testSrc");
createRecords(srcFolder);
@@ -245,14 +236,14 @@ public class TestHDFSParquetImporter implements Serializable {
HDFSParquetImporter.Config cfg;
// Check for invalid row key.
cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(), "testTable",
"COPY_ON_WRITE", "invalidRowKey", "timestamp", 1, schemaFile.toString());
cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(), "testTable", "COPY_ON_WRITE",
"invalidRowKey", "timestamp", 1, schemaFile.toString());
dataImporter = new HDFSParquetImporter(cfg);
assertEquals(-1, dataImporter.dataImport(jsc, 0));
// Check for invalid partition key.
cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(), "testTable",
"COPY_ON_WRITE", "_row_key", "invalidTimeStamp", 1, schemaFile.toString());
cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(), "testTable", "COPY_ON_WRITE",
"_row_key", "invalidTimeStamp", 1, schemaFile.toString());
dataImporter = new HDFSParquetImporter(cfg);
assertEquals(-1, dataImporter.dataImport(jsc, 0));
@@ -263,9 +254,8 @@ public class TestHDFSParquetImporter implements Serializable {
}
}
private HDFSParquetImporter.Config getHDFSParquetImporterConfig(String srcPath, String targetPath,
String tableName, String tableType, String rowKey, String partitionKey, int parallelism,
String schemaFile) {
private HDFSParquetImporter.Config getHDFSParquetImporterConfig(String srcPath, String targetPath, String tableName,
String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile) {
HDFSParquetImporter.Config cfg = new HDFSParquetImporter.Config();
cfg.srcPath = srcPath;
cfg.targetPath = targetPath;

View File

@@ -77,8 +77,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
/**
* Basic tests against {@link HoodieDeltaStreamer}, by issuing bulk_inserts,
* upserts, inserts. Check counts at the end.
* Basic tests against {@link HoodieDeltaStreamer}, by issuing bulk_inserts, upserts, inserts. Check counts at the end.
*/
public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
@@ -124,8 +123,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
// Source schema is the target schema of upstream table
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/target.avsc");
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, dfs,
dfsBasePath + "/test-downstream-source.properties");
UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, dfs, dfsBasePath + "/test-downstream-source.properties");
// Properties used for testing invalid key generator
TypedProperties invalidProps = new TypedProperties();
@@ -135,8 +133,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
invalidProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs,
dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID);
UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID);
}
@AfterClass
@@ -235,7 +232,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath);
HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
log.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
int numCompactionCommits = (int)timeline.getInstants().count();
int numCompactionCommits = (int) timeline.getInstants().count();
assertTrue("Got=" + numCompactionCommits + ", exp >=" + minExpected, minExpected <= numCompactionCommits);
}
@@ -243,7 +240,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath);
HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
log.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
int numDeltaCommits = (int)timeline.getInstants().count();
int numDeltaCommits = (int) timeline.getInstants().count();
assertTrue("Got=" + numDeltaCommits + ", exp >=" + minExpected, minExpected <= numDeltaCommits);
}
@@ -252,8 +249,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath);
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieInstant lastInstant = timeline.lastInstant().get();
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
timeline.getInstantDetails(lastInstant).get(), HoodieCommitMetadata.class);
HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(lastInstant).get(), HoodieCommitMetadata.class);
assertEquals(totalCommits, timeline.countInstants());
assertEquals(expected, commitMetadata.getMetadata(HoodieDeltaStreamer.CHECKPOINT_KEY));
return lastInstant.getTimestamp();
@@ -279,28 +276,25 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
@Test
public void testProps() throws IOException {
TypedProperties props = new DFSPropertiesConfiguration(
dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getConfig();
TypedProperties props =
new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getConfig();
assertEquals(2, props.getInteger("hoodie.upsert.shuffle.parallelism"));
assertEquals("_row_key", props.getString("hoodie.datasource.write.recordkey.field"));
assertEquals(
"org.apache.hudi.utilities.TestHoodieDeltaStreamer$TestGenerator",
props.getString("hoodie.datasource.write.keygenerator.class")
);
assertEquals("org.apache.hudi.utilities.TestHoodieDeltaStreamer$TestGenerator",
props.getString("hoodie.datasource.write.keygenerator.class"));
}
@Test
public void testPropsWithInvalidKeyGenerator() throws Exception {
try {
String datasetBasePath = dfsBasePath + "/test_dataset";
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(
datasetBasePath, Operation.BULK_INSERT, TripsWithDistanceTransformer.class.getName(),
PROPS_FILENAME_TEST_INVALID, false), jsc);
HoodieDeltaStreamer deltaStreamer =
new HoodieDeltaStreamer(TestHelpers.makeConfig(datasetBasePath, Operation.BULK_INSERT,
TripsWithDistanceTransformer.class.getName(), PROPS_FILENAME_TEST_INVALID, false), jsc);
deltaStreamer.sync();
fail("Should error out when setting the key generator class property to an invalid value");
} catch (IOException e) {
//expected
// expected
log.error("Expected error during getting the key generator", e);
assertTrue(e.getMessage().contains("Could not load key generator class"));
}
@@ -310,12 +304,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
public void testDatasetCreation() throws Exception {
try {
dfs.mkdirs(new Path(dfsBasePath + "/not_a_dataset"));
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(dfsBasePath + "/not_a_dataset", Operation.BULK_INSERT), jsc);
HoodieDeltaStreamer deltaStreamer =
new HoodieDeltaStreamer(TestHelpers.makeConfig(dfsBasePath + "/not_a_dataset", Operation.BULK_INSERT), jsc);
deltaStreamer.sync();
fail("Should error out when pointed out at a dir thats not a dataset");
} catch (DatasetNotFoundException e) {
//expected
// expected
log.error("Expected error during dataset creation", e);
}
}
@@ -395,11 +389,9 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
}
/**
* Test Bulk Insert and upserts with hive syncing. Tests Hudi incremental processing using a 2 step pipeline
* The first step involves using a SQL template to transform a source
* TEST-DATA-SOURCE ============================> HUDI TABLE 1 ===============> HUDI TABLE 2
* (incr-pull with transform) (incr-pull)
* Hudi Table 1 is synced with Hive.
* Test Bulk Insert and upserts with hive syncing. Tests Hudi incremental processing using a 2 step pipeline The first
* step involves using a SQL template to transform a source TEST-DATA-SOURCE ============================> HUDI TABLE
* 1 ===============> HUDI TABLE 2 (incr-pull with transform) (incr-pull) Hudi Table 1 is synced with Hive.
*/
@Test
public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() throws Exception {
@@ -467,12 +459,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
// Test Hive integration
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), dfs);
assertTrue("Table " + hiveSyncConfig.tableName + " should exist",
hiveClient.doesTableExist());
assertTrue("Table " + hiveSyncConfig.tableName + " should exist", hiveClient.doesTableExist());
assertEquals("Table partitions should match the number of partitions we wrote", 1,
hiveClient.scanTablePartitions().size());
assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES",
lastInstantForUpstreamTable, hiveClient.getLastCommitTimeSynced().get());
assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", lastInstantForUpstreamTable,
hiveClient.getLastCommitTimeSynced().get());
}
@Test
@@ -499,8 +490,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
// Test with empty commits
HoodieTableMetaClient mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), datasetBasePath, true);
HoodieInstant lastFinished =
mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
HoodieInstant lastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
HoodieDeltaStreamer.Config cfg2 = TestHelpers.makeDropAllConfig(datasetBasePath, Operation.UPSERT);
cfg2.filterDupes = true;
cfg2.sourceLimit = 2000;
@@ -509,14 +499,13 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg2, jsc);
ds2.sync();
mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), datasetBasePath, true);
HoodieInstant newLastFinished =
mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
HoodieInstant newLastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
Assert.assertTrue(HoodieTimeline.compareTimestamps(newLastFinished.getTimestamp(), lastFinished.getTimestamp(),
HoodieTimeline.GREATER));
// Ensure it is empty
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
mClient.getActiveTimeline().getInstantDetails(newLastFinished).get(), HoodieCommitMetadata.class);
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(mClient.getActiveTimeline().getInstantDetails(newLastFinished).get(), HoodieCommitMetadata.class);
System.out.println("New Commit Metadata=" + commitMetadata);
Assert.assertTrue(commitMetadata.getPartitionToWriteStats().isEmpty());
}
@@ -527,8 +516,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
props.setProperty(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, "1000");
props.setProperty(TestSourceConfig.NUM_SOURCE_PARTITIONS_PROP, "1");
props.setProperty(TestSourceConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS, "true");
DistributedTestDataSource distributedTestDataSource = new DistributedTestDataSource(props,
jsc, sparkSession, null);
DistributedTestDataSource distributedTestDataSource = new DistributedTestDataSource(props, jsc, sparkSession, null);
InputBatch<JavaRDD<GenericRecord>> batch = distributedTestDataSource.fetchNext(Option.empty(), 10000000);
batch.getBatch().get().cache();
long c = batch.getBatch().get().count();
@@ -542,13 +530,10 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
/**
* Taken from https://stackoverflow.com/questions/3694380/calculating-distance-between-two-points-using-latitude-
* longitude-what-am-i-doi
* Calculate distance between two points in latitude and longitude taking
* into account height difference. If you are not interested in height
* difference pass 0.0. Uses Haversine method as its base.
* longitude-what-am-i-doi Calculate distance between two points in latitude and longitude taking into account
* height difference. If you are not interested in height difference pass 0.0. Uses Haversine method as its base.
*
* lat1, lon1 Start point lat2, lon2 End point el1 Start altitude in meters
* el2 End altitude in meters
* lat1, lon1 Start point lat2, lon2 End point el1 Start altitude in meters el2 End altitude in meters
*
* @returns Distance in Meters
*/
@@ -559,9 +544,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
double latDistance = Math.toRadians(lat2 - lat1);
double lonDistance = Math.toRadians(lon2 - lon1);
double a = Math.sin(latDistance / 2) * Math.sin(latDistance / 2)
+ Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
* Math.sin(lonDistance / 2) * Math.sin(lonDistance / 2);
double a = Math.sin(latDistance / 2) * Math.sin(latDistance / 2) + Math.cos(Math.toRadians(lat1))
* Math.cos(Math.toRadians(lat2)) * Math.sin(lonDistance / 2) * Math.sin(lonDistance / 2);
double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
double distance = R * c * 1000; // convert to meters
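For reference, the reformatted expression above is the standard Haversine great-circle formula. In the code's notation (latDistance = \Delta\varphi, lonDistance = \Delta\lambda, and R presumably the Earth's radius in kilometers, given the final multiplication by 1000 to get meters):

  a = \sin^2(\Delta\varphi/2) + \cos\varphi_1 \,\cos\varphi_2 \,\sin^2(\Delta\lambda/2), \quad
  c = 2\,\mathrm{atan2}(\sqrt{a},\,\sqrt{1-a}), \quad
  d = 1000\,R\,c \ \text{(meters)}

The single-line form produced by spotless is arithmetically identical to the multi-line original; only the operator placement changed.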
@@ -579,12 +563,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
public static class TripsWithDistanceTransformer implements Transformer {
@Override
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession,
Dataset<Row> rowDataset, TypedProperties properties) {
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
TypedProperties properties) {
rowDataset.sqlContext().udf().register("distance_udf", new DistanceUDF(), DataTypes.DoubleType);
return rowDataset.withColumn("haversine_distance",
functions.callUDF("distance_udf", functions.col("begin_lat"),
functions.col("end_lat"), functions.col("begin_lon"), functions.col("end_lat")));
return rowDataset.withColumn("haversine_distance", functions.callUDF("distance_udf", functions.col("begin_lat"),
functions.col("end_lat"), functions.col("begin_lon"), functions.col("end_lat")));
}
}

View File

@@ -77,8 +77,8 @@ public class TestHoodieSnapshotCopier extends HoodieCommonTestHarness {
assertFalse(fs.exists(new Path(outputPath + "/_SUCCESS")));
}
//TODO - uncomment this after fixing test failures
//@Test
// TODO - uncomment this after fixing test failures
// @Test
public void testSnapshotCopy() throws Exception {
// Generate some commits and corresponding parquets
String commitTime1 = "20160501010101";
@@ -95,40 +95,30 @@ public class TestHoodieSnapshotCopier extends HoodieCommonTestHarness {
new File(basePath + "/2016/05/01/").mkdirs();
new File(basePath + "/2016/05/02/").mkdirs();
new File(basePath + "/2016/05/06/").mkdirs();
HoodieTestDataGenerator
.writePartitionMetadata(fs, new String[] {"2016/05/01", "2016/05/02", "2016/05/06"},
basePath);
HoodieTestDataGenerator.writePartitionMetadata(fs, new String[] {"2016/05/01", "2016/05/02", "2016/05/06"},
basePath);
// Make commit1
File file11 = new File(
basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id11"));
File file11 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id11"));
file11.createNewFile();
File file12 = new File(
basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id12"));
File file12 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id12"));
file12.createNewFile();
File file13 = new File(
basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id13"));
File file13 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id13"));
file13.createNewFile();
// Make commit2
File file21 = new File(
basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id21"));
File file21 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id21"));
file21.createNewFile();
File file22 = new File(
basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id22"));
File file22 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id22"));
file22.createNewFile();
File file23 = new File(
basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id23"));
File file23 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id23"));
file23.createNewFile();
// Make commit3
File file31 = new File(
basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id31"));
File file31 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id31"));
file31.createNewFile();
File file32 = new File(
basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id32"));
File file32 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id32"));
file32.createNewFile();
File file33 = new File(
basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id33"));
File file33 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id33"));
file33.createNewFile();
// Do a snapshot copy

View File

@@ -110,6 +110,7 @@ public class UtilitiesTestBase {
/**
* Helper to get hive sync config
*
* @param basePath
* @param tableName
* @return
@@ -130,6 +131,7 @@ public class UtilitiesTestBase {
/**
* Initialize Hive DB
*
* @throws IOException
*/
private static void clearHiveDb() throws IOException {
@@ -151,8 +153,8 @@ public class UtilitiesTestBase {
private static ClassLoader classLoader = Helpers.class.getClassLoader();
public static void copyToDFS(String testResourcePath, FileSystem fs, String targetPath) throws IOException {
BufferedReader reader = new BufferedReader(
new InputStreamReader(classLoader.getResourceAsStream(testResourcePath)));
BufferedReader reader =
new BufferedReader(new InputStreamReader(classLoader.getResourceAsStream(testResourcePath)));
PrintStream os = new PrintStream(fs.create(new Path(targetPath), true));
String line;
while ((line = reader.readLine()) != null) {

View File

@@ -69,16 +69,15 @@ public abstract class AbstractBaseTestSource extends AvroSource {
dataGeneratorMap.clear();
}
protected AbstractBaseTestSource(TypedProperties props,
JavaSparkContext sparkContext, SparkSession sparkSession,
protected AbstractBaseTestSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
}
protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int sourceLimit, String commitTime,
int partition) {
int maxUniqueKeys = props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP,
TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS);
int maxUniqueKeys =
props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS);
HoodieTestDataGenerator dataGenerator = dataGeneratorMap.get(partition);

View File

@@ -37,12 +37,11 @@ public class DistributedTestDataSource extends AbstractBaseTestSource {
private final int numTestSourcePartitions;
public DistributedTestDataSource(TypedProperties props,
JavaSparkContext sparkContext, SparkSession sparkSession,
public DistributedTestDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
this.numTestSourcePartitions = props.getInteger(TestSourceConfig.NUM_SOURCE_PARTITIONS_PROP,
TestSourceConfig.DEFAULT_NUM_SOURCE_PARTITIONS);
this.numTestSourcePartitions =
props.getInteger(TestSourceConfig.NUM_SOURCE_PARTITIONS_PROP, TestSourceConfig.DEFAULT_NUM_SOURCE_PARTITIONS);
}
@Override
@@ -60,20 +59,21 @@ public class DistributedTestDataSource extends AbstractBaseTestSource {
newProps.putAll(props);
// Set the maxUniqueRecords per partition for TestDataSource
int maxUniqueRecords = props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP,
TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS);
int maxUniqueRecords =
props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS);
String maxUniqueRecordsPerPartition = String.valueOf(Math.max(1, maxUniqueRecords / numTestSourcePartitions));
newProps.setProperty(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, maxUniqueRecordsPerPartition);
int perPartitionSourceLimit = Math.max(1, (int) (sourceLimit / numTestSourcePartitions));
JavaRDD<GenericRecord> avroRDD = sparkContext.parallelize(IntStream.range(0, numTestSourcePartitions).boxed()
.collect(Collectors.toList()), numTestSourcePartitions).mapPartitionsWithIndex((p, idx) -> {
log.info("Initializing source with newProps=" + newProps);
if (!dataGeneratorMap.containsKey(p)) {
initDataGen(newProps, p);
}
Iterator<GenericRecord> itr = fetchNextBatch(newProps, perPartitionSourceLimit, commitTime, p).iterator();
return itr;
}, true);
JavaRDD<GenericRecord> avroRDD =
sparkContext.parallelize(IntStream.range(0, numTestSourcePartitions).boxed().collect(Collectors.toList()),
numTestSourcePartitions).mapPartitionsWithIndex((p, idx) -> {
log.info("Initializing source with newProps=" + newProps);
if (!dataGeneratorMap.containsKey(p)) {
initDataGen(newProps, p);
}
Iterator<GenericRecord> itr = fetchNextBatch(newProps, perPartitionSourceLimit, commitTime, p).iterator();
return itr;
}, true);
return new InputBatch<>(Option.of(avroRDD), commitTime);
}
}
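A quick worked example of the per-partition split above, using the values set in the TestHoodieDeltaStreamer distributed-source test earlier in this diff (MAX_UNIQUE_RECORDS_PROP = 1000, NUM_SOURCE_PARTITIONS_PROP = 1, fetchNext called with a sourceLimit of 10000000): maxUniqueRecordsPerPartition = max(1, 1000 / 1) = 1000 and perPartitionSourceLimit = max(1, 10000000 / 1) = 10000000, so the single partition generates the whole batch. With, say, 4 partitions and the same settings (hypothetical numbers, not from the test), each partition would instead be capped at 250 unique keys and 2500000 records.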

View File

@@ -80,16 +80,13 @@ public class TestDFSSource extends UtilitiesTestBase {
// 1. Extract without any checkpoint => get all the data, respecting sourceLimit
assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
UtilitiesTestBase.Helpers.saveStringsToDFS(
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 100)), dfs,
UtilitiesTestBase.Helpers.saveStringsToDFS(Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 100)), dfs,
dfsBasePath + "/jsonFiles/1.json");
assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), 10).getBatch());
InputBatch<JavaRDD<GenericRecord>> fetch1 =
jsonSource.fetchNewDataInAvroFormat(Option.empty(), 1000000);
InputBatch<JavaRDD<GenericRecord>> fetch1 = jsonSource.fetchNewDataInAvroFormat(Option.empty(), 1000000);
assertEquals(100, fetch1.getBatch().get().count());
// Test json -> Row format
InputBatch<Dataset<Row>> fetch1AsRows =
jsonSource.fetchNewDataInRowFormat(Option.empty(), 1000000);
InputBatch<Dataset<Row>> fetch1AsRows = jsonSource.fetchNewDataInRowFormat(Option.empty(), 1000000);
assertEquals(100, fetch1AsRows.getBatch().get().count());
// Test Avro -> Row format
Dataset<Row> fetch1Rows = AvroConversionUtils.createDataFrame(JavaRDD.toRDD(fetch1.getBatch().get()),
@@ -97,16 +94,15 @@ public class TestDFSSource extends UtilitiesTestBase {
assertEquals(100, fetch1Rows.count());
// 2. Produce new data, extract new data
UtilitiesTestBase.Helpers.saveStringsToDFS(
Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 10000)),
dfs, dfsBasePath + "/jsonFiles/2.json");
InputBatch<Dataset<Row>> fetch2 = jsonSource.fetchNewDataInRowFormat(
Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
UtilitiesTestBase.Helpers.saveStringsToDFS(Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 10000)), dfs,
dfsBasePath + "/jsonFiles/2.json");
InputBatch<Dataset<Row>> fetch2 =
jsonSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(10000, fetch2.getBatch().get().count());
// 3. Extract with previous checkpoint => gives same data back (idempotent)
InputBatch<Dataset<Row>> fetch3 = jsonSource.fetchNewDataInRowFormat(
Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
InputBatch<Dataset<Row>> fetch3 =
jsonSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(10000, fetch3.getBatch().get().count());
assertEquals(fetch2.getCheckpointForNextBatch(), fetch3.getCheckpointForNextBatch());
fetch3.getBatch().get().registerTempTable("test_dfs_table");
@@ -114,8 +110,8 @@ public class TestDFSSource extends UtilitiesTestBase {
assertEquals(10000, rowDataset.count());
// 4. Extract with latest checkpoint => no new data returned
InputBatch<JavaRDD<GenericRecord>> fetch4 = jsonSource.fetchNewDataInAvroFormat(
Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
InputBatch<JavaRDD<GenericRecord>> fetch4 =
jsonSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(Option.empty(), fetch4.getBatch());
}
}

View File

@@ -44,8 +44,7 @@ public class TestDataSource extends AbstractBaseTestSource {
}
@Override
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr,
long sourceLimit) {
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
int nextCommitNum = lastCheckpointStr.map(s -> Integer.parseInt(s) + 1).orElse(0);
String commitTime = String.format("%05d", nextCommitNum);
@@ -56,8 +55,8 @@ public class TestDataSource extends AbstractBaseTestSource {
return new InputBatch<>(Option.empty(), lastCheckpointStr.orElse(null));
}
List<GenericRecord> records = fetchNextBatch(props, (int)sourceLimit, commitTime, DEFAULT_PARTITION_NUM)
.collect(Collectors.toList());
List<GenericRecord> records =
fetchNextBatch(props, (int) sourceLimit, commitTime, DEFAULT_PARTITION_NUM).collect(Collectors.toList());
JavaRDD<GenericRecord> avroRDD = sparkContext.<GenericRecord>parallelize(records, 4);
return new InputBatch<>(Option.of(avroRDD), commitTime);
}

View File

@@ -107,13 +107,13 @@ public class TestKafkaSource extends UtilitiesTestBase {
// 2. Produce new data, extract new data
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
InputBatch<Dataset<Row>> fetch2 = kafkaSource.fetchNewDataInRowFormat(
Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
InputBatch<Dataset<Row>> fetch2 =
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(1100, fetch2.getBatch().get().count());
// 3. Extract with previous checkpoint => gives same data back (idempotent)
InputBatch<JavaRDD<GenericRecord>> fetch3 = kafkaSource.fetchNewDataInAvroFormat(
Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
InputBatch<JavaRDD<GenericRecord>> fetch3 =
kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(fetch2.getBatch().get().count(), fetch3.getBatch().get().count());
assertEquals(fetch2.getCheckpointForNextBatch(), fetch3.getCheckpointForNextBatch());
// Same using Row API
@@ -123,8 +123,8 @@ public class TestKafkaSource extends UtilitiesTestBase {
assertEquals(fetch2.getCheckpointForNextBatch(), fetch3AsRows.getCheckpointForNextBatch());
// 4. Extract with latest checkpoint => no new data returned
InputBatch<JavaRDD<GenericRecord>> fetch4 = kafkaSource.fetchNewDataInAvroFormat(
Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
InputBatch<JavaRDD<GenericRecord>> fetch4 =
kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(Option.empty(), fetch4.getBatch());
// Same using Row API
InputBatch<Dataset<Row>> fetch4AsRows =
@@ -144,26 +144,19 @@ public class TestKafkaSource extends UtilitiesTestBase {
@Test
public void testComputeOffsetRanges() {
// test totalNewMessages()
long totalMsgs = CheckpointUtils.totalNewMessages(new OffsetRange[]{
OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100),
OffsetRange.apply(TEST_TOPIC_NAME, 0, 100, 200)
});
long totalMsgs = CheckpointUtils.totalNewMessages(new OffsetRange[] {OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100),
OffsetRange.apply(TEST_TOPIC_NAME, 0, 100, 200)});
assertEquals(200, totalMsgs);
// should consume all the full data
OffsetRange[] ranges = CheckpointUtils.computeOffsetRanges(
makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}),
1000000L
);
OffsetRange[] ranges =
CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
makeOffsetMap(new int[] {0, 1}, new long[] {300000, 350000}), 1000000L);
assertEquals(200000, CheckpointUtils.totalNewMessages(ranges));
// should only consume upto limit
ranges = CheckpointUtils.computeOffsetRanges(
makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}),
10000
);
ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
makeOffsetMap(new int[] {0, 1}, new long[] {300000, 350000}), 10000);
assertEquals(10000, CheckpointUtils.totalNewMessages(ranges));
assertEquals(200000, ranges[0].fromOffset());
assertEquals(205000, ranges[0].untilOffset());
@@ -171,30 +164,21 @@ public class TestKafkaSource extends UtilitiesTestBase {
assertEquals(255000, ranges[1].untilOffset());
// should also consume from new partitions.
ranges = CheckpointUtils.computeOffsetRanges(
makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
makeOffsetMap(new int[]{0, 1, 2}, new long[]{300000, 350000, 100000}),
1000000L
);
ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
makeOffsetMap(new int[] {0, 1, 2}, new long[] {300000, 350000, 100000}), 1000000L);
assertEquals(300000, CheckpointUtils.totalNewMessages(ranges));
assertEquals(3, ranges.length);
// for skewed offsets, does not starve any partition & can catch up
ranges = CheckpointUtils.computeOffsetRanges(
makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010, 350000, 10000}),
100000
);
ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
makeOffsetMap(new int[] {0, 1, 2}, new long[] {200010, 350000, 10000}), 100000);
assertEquals(100000, CheckpointUtils.totalNewMessages(ranges));
assertEquals(10, ranges[0].count());
assertEquals(89990, ranges[1].count());
assertEquals(10000, ranges[2].count());
ranges = CheckpointUtils.computeOffsetRanges(
makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010, 350000, 10000}),
1000000
);
ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
makeOffsetMap(new int[] {0, 1, 2}, new long[] {200010, 350000, 10000}), 1000000);
assertEquals(110010, CheckpointUtils.totalNewMessages(ranges));
assertEquals(10, ranges[0].count());
assertEquals(100000, ranges[1].count());
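Reading the skew assertions above as a worked example: with committed offsets {0: 200000, 1: 250000}, latest offsets {0: 200010, 1: 350000, 2: 10000} and a limit of 100000, partition 0 has only 10 new messages and the new partition 2 has 10000, so partition 1 absorbs the remaining 89990 (10 + 89990 + 10000 = 100000) and no partition is starved. Raising the limit to 1000000 simply admits everything available: 10 + 100000 + 10000 = 110010, matching the final assertions.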

View File

@@ -21,7 +21,7 @@ package org.apache.hudi.utilities.sources.config;
/**
* Configurations for Test Data Sources
*/
public class TestSourceConfig {
public class TestSourceConfig {
// Used by DistributedTestDataSource only. Number of partitions where each partitions generates test-data
public static final String NUM_SOURCE_PARTITIONS_PROP = "hoodie.deltastreamer.source.test.num_partitions";