Clean the marker files for flink compaction (#5611)
Co-authored-by: 854194341@qq.com <loukey_7821>
This commit is contained in:
@@ -25,6 +25,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.util.CompactionUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.table.HoodieFlinkTable;
|
||||
import org.apache.hudi.table.marker.WriteMarkersFactory;
|
||||
import org.apache.hudi.util.CompactionUtil;
|
||||
import org.apache.hudi.util.FlinkTables;
|
||||
|
||||
@@ -134,6 +135,9 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
|
||||
List<CompactionOperation> operations = compactionPlan.getOperations().stream()
|
||||
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
|
||||
LOG.info("Execute compaction plan for instant {} as {} file groups", compactionInstantTime, operations.size());
|
||||
WriteMarkersFactory
|
||||
.get(table.getConfig().getMarkersType(), table, compactionInstantTime)
|
||||
.deleteMarkerDir(table.getContext(), table.getConfig().getMarkersDeleteParallelism());
|
||||
for (CompactionOperation operation : operations) {
|
||||
output.collect(new StreamRecord<>(new CompactionPlanEvent(compactionInstantTime, operation)));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user