[HUDI-65] commitTime rename to instantTime (#1431)
This commit is contained in:
@@ -132,8 +132,8 @@ public class HoodieSnapshotCopier implements Serializable {
|
||||
if (commitFilePath.getName().equals(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
|
||||
return true;
|
||||
} else {
|
||||
String commitTime = FSUtils.getCommitFromCommitFile(commitFilePath.getName());
|
||||
return HoodieTimeline.compareTimestamps(commitTime, latestCommitTimestamp,
|
||||
String instantTime = FSUtils.getCommitFromCommitFile(commitFilePath.getName());
|
||||
return HoodieTimeline.compareTimestamps(instantTime, latestCommitTimestamp,
|
||||
HoodieTimeline.LESSER_OR_EQUAL);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -215,8 +215,8 @@ public class HoodieSnapshotExporter {
|
||||
if (commitFilePath.getName().equals(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
|
||||
return true;
|
||||
} else {
|
||||
String commitTime = FSUtils.getCommitFromCommitFile(commitFilePath.getName());
|
||||
return HoodieTimeline.compareTimestamps(commitTime, latestCommitTimestamp,
|
||||
String instantTime = FSUtils.getCommitFromCommitFile(commitFilePath.getName());
|
||||
return HoodieTimeline.compareTimestamps(instantTime, latestCommitTimestamp,
|
||||
HoodieTimeline.LESSER_OR_EQUAL);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -361,16 +361,16 @@ public class DeltaSync implements Serializable {
|
||||
|
||||
boolean isEmpty = records.isEmpty();
|
||||
|
||||
String commitTime = startCommit();
|
||||
LOG.info("Starting commit : " + commitTime);
|
||||
String instantTime = startCommit();
|
||||
LOG.info("Starting commit : " + instantTime);
|
||||
|
||||
JavaRDD<WriteStatus> writeStatusRDD;
|
||||
if (cfg.operation == Operation.INSERT) {
|
||||
writeStatusRDD = writeClient.insert(records, commitTime);
|
||||
writeStatusRDD = writeClient.insert(records, instantTime);
|
||||
} else if (cfg.operation == Operation.UPSERT) {
|
||||
writeStatusRDD = writeClient.upsert(records, commitTime);
|
||||
writeStatusRDD = writeClient.upsert(records, instantTime);
|
||||
} else if (cfg.operation == Operation.BULK_INSERT) {
|
||||
writeStatusRDD = writeClient.bulkInsert(records, commitTime);
|
||||
writeStatusRDD = writeClient.bulkInsert(records, instantTime);
|
||||
} else {
|
||||
throw new HoodieDeltaStreamerException("Unknown operation :" + cfg.operation);
|
||||
}
|
||||
@@ -391,9 +391,9 @@ public class DeltaSync implements Serializable {
|
||||
+ totalErrorRecords + "/" + totalRecords);
|
||||
}
|
||||
|
||||
boolean success = writeClient.commit(commitTime, writeStatusRDD, Option.of(checkpointCommitMetadata));
|
||||
boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata));
|
||||
if (success) {
|
||||
LOG.info("Commit " + commitTime + " successful!");
|
||||
LOG.info("Commit " + instantTime + " successful!");
|
||||
|
||||
// Schedule compaction if needed
|
||||
if (cfg.isAsyncCompactionEnabled()) {
|
||||
@@ -407,8 +407,8 @@ public class DeltaSync implements Serializable {
|
||||
hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 0;
|
||||
}
|
||||
} else {
|
||||
LOG.info("Commit " + commitTime + " failed!");
|
||||
throw new HoodieException("Commit " + commitTime + " failed!");
|
||||
LOG.info("Commit " + instantTime + " failed!");
|
||||
throw new HoodieException("Commit " + instantTime + " failed!");
|
||||
}
|
||||
} else {
|
||||
LOG.error("Delta Sync found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
|
||||
@@ -420,8 +420,8 @@ public class DeltaSync implements Serializable {
|
||||
}
|
||||
});
|
||||
// Rolling back instant
|
||||
writeClient.rollback(commitTime);
|
||||
throw new HoodieException("Commit " + commitTime + " failed and rolled-back !");
|
||||
writeClient.rollback(instantTime);
|
||||
throw new HoodieException("Commit " + instantTime + " failed and rolled-back !");
|
||||
}
|
||||
long overallTimeMs = overallTimerContext != null ? overallTimerContext.stop() : 0;
|
||||
|
||||
|
||||
@@ -102,10 +102,10 @@ public class HiveIncrPullSource extends AvroSource {
|
||||
return Option.of(commitTimes.get(0));
|
||||
}
|
||||
|
||||
for (String commitTime : commitTimes) {
|
||||
for (String instantTime : commitTimes) {
|
||||
// TODO(vc): Add an option to delete consumed commits
|
||||
if (commitTime.compareTo(latestTargetCommit.get()) > 0) {
|
||||
return Option.of(commitTime);
|
||||
if (instantTime.compareTo(latestTargetCommit.get()) > 0) {
|
||||
return Option.of(instantTime);
|
||||
}
|
||||
}
|
||||
return Option.empty();
|
||||
|
||||
@@ -80,7 +80,7 @@ public abstract class AbstractBaseTestSource extends AvroSource {
|
||||
super(props, sparkContext, sparkSession, schemaProvider);
|
||||
}
|
||||
|
||||
protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int sourceLimit, String commitTime,
|
||||
protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int sourceLimit, String instantTime,
|
||||
int partition) {
|
||||
int maxUniqueKeys =
|
||||
props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS);
|
||||
@@ -116,14 +116,14 @@ public abstract class AbstractBaseTestSource extends AvroSource {
|
||||
LOG.info("After adjustments => NumInserts=" + numInserts + ", NumUpdates=" + (numUpdates - 50) + ", NumDeletes=50, maxUniqueRecords="
|
||||
+ maxUniqueKeys);
|
||||
// if we generate update followed by deletes -> some keys in update batch might be picked up for deletes. Hence generating delete batch followed by updates
|
||||
deleteStream = dataGenerator.generateUniqueDeleteRecordStream(commitTime, 50).map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
|
||||
updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates - 50).map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
|
||||
deleteStream = dataGenerator.generateUniqueDeleteRecordStream(instantTime, 50).map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
|
||||
updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, numUpdates - 50).map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
|
||||
} else {
|
||||
LOG.info("After adjustments => NumInserts=" + numInserts + ", NumUpdates=" + numUpdates + ", maxUniqueRecords=" + maxUniqueKeys);
|
||||
updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates)
|
||||
updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, numUpdates)
|
||||
.map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
|
||||
}
|
||||
Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(commitTime, numInserts, false)
|
||||
Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(instantTime, numInserts, false)
|
||||
.map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
|
||||
return Stream.concat(deleteStream, Stream.concat(updateStream, insertStream));
|
||||
}
|
||||
|
||||
@@ -99,14 +99,14 @@ public abstract class AbstractDFSSourceTestBase extends UtilitiesTestBase {
|
||||
* Generates a batch of test data and writes the data to a file.
|
||||
*
|
||||
* @param filename The name of the file.
|
||||
* @param commitTime The commit time.
|
||||
* @param instantTime The commit time.
|
||||
* @param n The number of records to generate.
|
||||
* @return The file path.
|
||||
* @throws IOException
|
||||
*/
|
||||
Path generateOneFile(String filename, String commitTime, int n) throws IOException {
|
||||
Path generateOneFile(String filename, String instantTime, int n) throws IOException {
|
||||
Path path = new Path(dfsRoot, filename + fileSuffix);
|
||||
writeNewDataToFile(dataGenerator.generateInserts(commitTime, n, useFlattenedSchema), path);
|
||||
writeNewDataToFile(dataGenerator.generateInserts(instantTime, n, useFlattenedSchema), path);
|
||||
return path;
|
||||
}
|
||||
|
||||
|
||||
@@ -52,12 +52,12 @@ public class DistributedTestDataSource extends AbstractBaseTestSource {
|
||||
@Override
|
||||
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
|
||||
int nextCommitNum = lastCkptStr.map(s -> Integer.parseInt(s) + 1).orElse(0);
|
||||
String commitTime = String.format("%05d", nextCommitNum);
|
||||
String instantTime = String.format("%05d", nextCommitNum);
|
||||
LOG.info("Source Limit is set to " + sourceLimit);
|
||||
|
||||
// No new data.
|
||||
if (sourceLimit <= 0) {
|
||||
return new InputBatch<>(Option.empty(), commitTime);
|
||||
return new InputBatch<>(Option.empty(), instantTime);
|
||||
}
|
||||
|
||||
TypedProperties newProps = new TypedProperties();
|
||||
@@ -76,8 +76,8 @@ public class DistributedTestDataSource extends AbstractBaseTestSource {
|
||||
if (!dataGeneratorMap.containsKey(p)) {
|
||||
initDataGen(newProps, p);
|
||||
}
|
||||
return fetchNextBatch(newProps, perPartitionSourceLimit, commitTime, p).iterator();
|
||||
return fetchNextBatch(newProps, perPartitionSourceLimit, instantTime, p).iterator();
|
||||
}, true);
|
||||
return new InputBatch<>(Option.of(avroRDD), commitTime);
|
||||
return new InputBatch<>(Option.of(avroRDD), instantTime);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,7 +49,7 @@ public class TestDataSource extends AbstractBaseTestSource {
|
||||
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
|
||||
|
||||
int nextCommitNum = lastCheckpointStr.map(s -> Integer.parseInt(s) + 1).orElse(0);
|
||||
String commitTime = String.format("%05d", nextCommitNum);
|
||||
String instantTime = String.format("%05d", nextCommitNum);
|
||||
LOG.info("Source Limit is set to " + sourceLimit);
|
||||
|
||||
// No new data.
|
||||
@@ -58,8 +58,8 @@ public class TestDataSource extends AbstractBaseTestSource {
|
||||
}
|
||||
|
||||
List<GenericRecord> records =
|
||||
fetchNextBatch(props, (int) sourceLimit, commitTime, DEFAULT_PARTITION_NUM).collect(Collectors.toList());
|
||||
fetchNextBatch(props, (int) sourceLimit, instantTime, DEFAULT_PARTITION_NUM).collect(Collectors.toList());
|
||||
JavaRDD<GenericRecord> avroRDD = sparkContext.<GenericRecord>parallelize(records, 4);
|
||||
return new InputBatch<>(Option.of(avroRDD), commitTime);
|
||||
return new InputBatch<>(Option.of(avroRDD), instantTime);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user