Implement Merge on Read Storage (#76)
1. Create HoodieTable abstraction for commits and fileSystemView.
2. Create HoodieMergeOnReadTable.
3. The view is now always obtained from the table, and the correct view is returned based on the table type.
Committed by: Prasanna Rajaperumal
Parent: 11d2fd3428
Commit: eb46e7c72b
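
The heart of the change is a static factory on HoodieTable: it reads the table type from the dataset's metadata and returns the matching implementation, so HoodieWriteClient never branches on storage type itself and always gets the correct view (point 3 above). A minimal sketch of that dispatch follows; HoodieTableType, HoodieCopyOnWriteTable, and getTableType() are assumed names for illustration, as the diff below only confirms HoodieTable.getHoodieTable(metaClient, config), getMetaClient(), getActiveTimeline(), getCommitActionType(), and HoodieMergeOnReadTable.

    // Sketch of the factory described above; not the literal implementation.
    // HoodieTableType and HoodieCopyOnWriteTable are assumed names, and imports
    // from the surrounding hoodie packages are omitted.
    public abstract class HoodieTable<T extends HoodieRecordPayload> {

      protected final HoodieTableMetaClient metaClient;
      protected final HoodieWriteConfig config;

      protected HoodieTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
        this.metaClient = metaClient;
        this.config = config;
      }

      // Single entry point: dispatch on the table type stored in the metadata,
      // so callers never special-case copy-on-write vs. merge-on-read.
      public static <T extends HoodieRecordPayload> HoodieTable<T> getHoodieTable(
          HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
        switch (metaClient.getTableType()) {
          case COPY_ON_WRITE:
            return new HoodieCopyOnWriteTable<>(metaClient, config);
          case MERGE_ON_READ:
            return new HoodieMergeOnReadTable<>(metaClient, config);
          default:
            throw new IllegalArgumentException(
                "Unknown table type " + metaClient.getTableType());
        }
      }

      public HoodieTableMetaClient getMetaClient() {
        return metaClient;
      }

      // Subclasses answer type-specific questions, e.g. which timeline action
      // a completed write should carry (see the commit() hunks below).
      public abstract String getCommitActionType();
    }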
@@ -128,9 +128,11 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable
    * @return A subset of hoodieRecords RDD, with existing records filtered out.
    */
   public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieRecords) {
-    HoodieTableMetaClient metaClient =
-        new HoodieTableMetaClient(fs, config.getBasePath(), true);
-    JavaRDD<HoodieRecord<T>> recordsWithLocation = index.tagLocation(hoodieRecords, metaClient);
+    // Create a Hoodie table which encapsulated the commits and files visible
+    HoodieTable<T> table = HoodieTable
+        .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);
+
+    JavaRDD<HoodieRecord<T>> recordsWithLocation = index.tagLocation(hoodieRecords, table);
     return recordsWithLocation.filter(new Function<HoodieRecord<T>, Boolean>() {
       @Override
       public Boolean call(HoodieRecord<T> v1) throws Exception {
@@ -144,11 +146,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable
    */
   public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
     writeContext = metrics.getCommitCtx();
-    HoodieTableMetaClient metaClient =
-        new HoodieTableMetaClient(fs, config.getBasePath(), true);
-
-    final HoodieTable table =
-        HoodieTable.getHoodieTable(metaClient, commitTime, config);
+    // Create a Hoodie table which encapsulated the commits and files visible
+    HoodieTable<T> table = HoodieTable
+        .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);

     try {
       // De-dupe/merge if needed
@@ -157,15 +157,14 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable
         config.getUpsertShuffleParallelism());

     // perform index loop up to get existing location of records
-    JavaRDD<HoodieRecord<T>> taggedRecords = index.tagLocation(dedupedRecords, metaClient);
+    JavaRDD<HoodieRecord<T>> taggedRecords = index.tagLocation(dedupedRecords, table);

     // Cache the tagged records, so we don't end up computing both
     taggedRecords.persist(StorageLevel.MEMORY_AND_DISK_SER());
-

     WorkloadProfile profile = null;
     if (table.isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(taggedRecords);
+      profile = new WorkloadProfile<>(taggedRecords);
       logger.info("Workload profile :" + profile);
     }

@@ -196,7 +195,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable
       @Override
       public Iterator<List<WriteStatus>> call(Integer partition,
           Iterator<HoodieRecord<T>> recordItr) throws Exception {
-        return table.handleUpsertPartition(partition, recordItr, upsertPartitioner);
+        return table.handleUpsertPartition(commitTime, partition, recordItr,
+            upsertPartitioner);
       }
     }, true).flatMap(new FlatMapFunction<List<WriteStatus>, WriteStatus>() {
       @Override
@@ -207,7 +207,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable
     });

     // Update the index back.
-    JavaRDD<WriteStatus> resultRDD = index.updateLocation(upsertStatusRDD, metaClient);
+    JavaRDD<WriteStatus> resultRDD = index.updateLocation(upsertStatusRDD, table);
     resultRDD = resultRDD.persist(config.getWriteStatusStorageLevel());
     commitOnAutoCommit(commitTime, resultRDD);
     return resultRDD;
@@ -250,8 +250,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable
    */
   public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
     writeContext = metrics.getCommitCtx();
-    HoodieTableMetaClient metaClient =
-        new HoodieTableMetaClient(fs, config.getBasePath(), true);
+    // Create a Hoodie table which encapsulated the commits and files visible
+    HoodieTable<T> table = HoodieTable
+        .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);

     try {
       // De-dupe/merge if needed
@@ -272,7 +273,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable
           }
         }, true, config.getInsertShuffleParallelism());
     JavaRDD<WriteStatus> writeStatusRDD = sortedRecords
-        .mapPartitionsWithIndex(new InsertMapFunction<T>(commitTime, config, metaClient),
+        .mapPartitionsWithIndex(new InsertMapFunction<T>(commitTime, config, table),
             true).flatMap(new FlatMapFunction<List<WriteStatus>, WriteStatus>() {
       @Override
       public Iterable<WriteStatus> call(List<WriteStatus> writeStatuses)
@@ -281,7 +282,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable
       }
     });
     // Update the index back
-    JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, metaClient);
+    JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, table);
     // Trigger the insert and collect statuses
     statuses = statuses.persist(config.getWriteStatusStorageLevel());
     commitOnAutoCommit(commitTime, statuses);
@@ -299,9 +300,11 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable
    */
   public boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses) {
     logger.info("Comitting " + commitTime);
-    HoodieTableMetaClient metaClient =
-        new HoodieTableMetaClient(fs, config.getBasePath(), true);
-    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    // Create a Hoodie table which encapsulated the commits and files visible
+    HoodieTable<T> table = HoodieTable
+        .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);
+
+    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();

     List<Tuple2<String, HoodieWriteStat>> stats =
         writeStatuses.mapToPair(new PairFunction<WriteStatus, String, HoodieWriteStat>() {
@@ -318,8 +321,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable
     }

     try {
+      String actionType = table.getCommitActionType();
       activeTimeline.saveAsComplete(
-          new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime),
+          new HoodieInstant(true, actionType, commitTime),
           Optional.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
       // Save was a success
       // We cannot have unbounded commit files. Archive commits if we have to archive
@@ -333,7 +337,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable
           metadata);
       writeContext = null;
     }
-    logger.info("Status of the commit " + commitTime);
+    logger.info("Committed " + commitTime);
     } catch (IOException e) {
       throw new HoodieCommitException(
           "Failed to commit " + config.getBasePath() + " at time " + commitTime, e);
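
The commit() hunks above are where the table type starts to matter on the timeline: the completed instant's action type now comes from the table instead of being hard-coded to HoodieTimeline.COMMIT_ACTION. A hypothetical sketch of the per-type override, assuming a DELTA_COMMIT_ACTION constant for log-based merge-on-read writes (the diff confirms only getCommitActionType() and COMMIT_ACTION):

    // Sketch only; DELTA_COMMIT_ACTION is an assumption, not confirmed by this diff.
    class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieTable<T> {
      HoodieCopyOnWriteTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
        super(metaClient, config);
      }

      @Override
      public String getCommitActionType() {
        // Copy-on-write rewrites parquet files and completes as a plain commit.
        return HoodieTimeline.COMMIT_ACTION;
      }
    }

    class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends HoodieTable<T> {
      HoodieMergeOnReadTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
        super(metaClient, config);
      }

      @Override
      public String getCommitActionType() {
        // Merge-on-read appends updates to log files; completing the write as a
        // different action lets readers and compaction tell the two apart.
        return HoodieTimeline.DELTA_COMMIT_ACTION;
      }
    }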
@@ -355,9 +359,10 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable
    */
   public boolean rollback(final String commitTime) throws HoodieRollbackException {
     final Timer.Context context = metrics.getRollbackCtx();
-    HoodieTableMetaClient metaClient =
-        new HoodieTableMetaClient(fs, config.getBasePath(), true);
-    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    // Create a Hoodie table which encapsulated the commits and files visible
+    HoodieTable<T> table = HoodieTable
+        .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);
+    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     HoodieTimeline inflightTimeline = activeTimeline.getCommitTimeline().filterInflights();
     HoodieTimeline commitTimeline = activeTimeline.getCommitTimeline().filterCompletedInstants();

@@ -394,7 +399,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable
       // 3. Delete the new generated parquet files
       logger.info("Clean out all parquet files generated at time: " + commitTime);
       final Accumulator<Integer> numFilesDeletedAccu = jsc.accumulator(0);
-      jsc.parallelize(FSUtils.getAllPartitionPaths(fs, metaClient.getBasePath()))
+      jsc.parallelize(
+          FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath()))
           .foreach(new VoidFunction<String>() {
             @Override
             public void call(String partitionPath) throws Exception {
@@ -450,10 +456,11 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable
     try {
       logger.info("Cleaner started");
       final Timer.Context context = metrics.getCleanCtx();
-      HoodieTableMetaClient metaClient =
-          new HoodieTableMetaClient(fs, config.getBasePath(), true);
+      // Create a Hoodie table which encapsulated the commits and files visible
+      HoodieTable<T> table = HoodieTable
+          .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);

-      List<String> partitionsToClean = FSUtils.getAllPartitionPaths(fs, metaClient.getBasePath());
+      List<String> partitionsToClean = FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath());
       // shuffle to distribute cleaning work across partitions evenly
       Collections.shuffle(partitionsToClean);
       logger.info("Partitions to clean up : " + partitionsToClean + ", with policy " + config.getCleanerPolicy());
@@ -468,7 +475,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable
         @Override
         public Integer call(String partitionPathToClean) throws Exception {
           FileSystem fs = FSUtils.getFs();
-          HoodieCleaner cleaner = new HoodieCleaner(metaClient, config, fs);
+          HoodieCleaner cleaner = new HoodieCleaner(table, config);
           return cleaner.clean(partitionPathToClean);
         }
       }).reduce(new Function2<Integer, Integer, Integer>() {
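
Taken together, every public operation in HoodieWriteClient now follows one pattern: construct the HoodieTable once, then pass it wherever a raw HoodieTableMetaClient used to go. A condensed sketch of that caller-side pattern, using only calls that appear in the hunks above (doWrite is a hypothetical stand-in for the per-operation write step):

    // Condensed caller-side pattern; fs, config and index are HoodieWriteClient fields.
    private JavaRDD<WriteStatus> writeWithTable(JavaRDD<HoodieRecord<T>> records,
        String commitTime) {
      // Create a Hoodie table which encapsulates the commits and files visible.
      HoodieTable<T> table = HoodieTable
          .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);

      // The index now works against the table instead of a raw meta client.
      JavaRDD<HoodieRecord<T>> tagged = index.tagLocation(records, table);   // was (records, metaClient)
      JavaRDD<WriteStatus> statuses = doWrite(table, tagged, commitTime);    // hypothetical write step
      return index.updateLocation(statuses, table);                          // was (statuses, metaClient)
    }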