[HUDI-322] DeltaSteamer should pick checkpoints off only deltacommits for MOR tables
This commit is contained in:
committed by
Balaji Varadarajan
parent
ad50008a59
commit
971c7d41bd
@@ -195,7 +195,16 @@ public class DeltaSync implements Serializable {
|
|||||||
if (fs.exists(new Path(cfg.targetBasePath))) {
|
if (fs.exists(new Path(cfg.targetBasePath))) {
|
||||||
HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath,
|
HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath,
|
||||||
cfg.payloadClassName);
|
cfg.payloadClassName);
|
||||||
this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
|
switch (meta.getTableType()) {
|
||||||
|
case COPY_ON_WRITE:
|
||||||
|
this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants());
|
||||||
|
break;
|
||||||
|
case MERGE_ON_READ:
|
||||||
|
this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants());
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new HoodieException("Unsupported table type :" + meta.getTableType());
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
this.commitTimelineOpt = Option.empty();
|
this.commitTimelineOpt = Option.empty();
|
||||||
HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath,
|
HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath,
|
||||||
@@ -378,7 +387,7 @@ public class DeltaSync implements Serializable {
|
|||||||
|
|
||||||
// Schedule compaction if needed
|
// Schedule compaction if needed
|
||||||
if (cfg.isAsyncCompactionEnabled()) {
|
if (cfg.isAsyncCompactionEnabled()) {
|
||||||
scheduledCompactionInstant = writeClient.scheduleCompaction(Option.of(checkpointCommitMetadata));
|
scheduledCompactionInstant = writeClient.scheduleCompaction(Option.empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!isEmpty) {
|
if (!isEmpty) {
|
||||||
|
|||||||
Reference in New Issue
Block a user