[HUDI-3370] The files recorded in the commit may not match the actual ones for MOR Compaction (#4753)
* Use HoodieCommitMetadata to replace the writeStatuses computation

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
This commit is contained in:
@@ -22,14 +22,17 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.table.action.HoodieWriteMetadata;
|
||||
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
public class HoodieSparkCompactor<T extends HoodieRecordPayload> extends BaseCompactor<T,
|
||||
JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
|
||||
@@ -43,12 +46,12 @@ public class HoodieSparkCompactor<T extends HoodieRecordPayload> extends BaseCom
|
||||
}
|
||||
|
||||
@Override
|
||||
public void compact(HoodieInstant instant) throws IOException {
|
||||
public void compact(HoodieInstant instant) {
|
||||
LOG.info("Compactor executing compaction " + instant);
|
||||
SparkRDDWriteClient<T> writeClient = (SparkRDDWriteClient<T>) compactionClient;
|
||||
JavaRDD<WriteStatus> res = writeClient.compact(instant.getTimestamp());
|
||||
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status");
|
||||
long numWriteErrors = res.collect().stream().filter(WriteStatus::hasErrors).count();
|
||||
HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = writeClient.compact(instant.getTimestamp());
|
||||
List<HoodieWriteStat> writeStats = compactionMetadata.getCommitMetadata().get().getWriteStats();
|
||||
long numWriteErrors = writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum();
|
||||
if (numWriteErrors != 0) {
|
||||
// We treat even a single error in compaction as fatal
|
||||
LOG.error("Compaction for instant (" + instant + ") failed with write errors. Errors :" + numWriteErrors);
|
||||
@@ -56,6 +59,6 @@ public class HoodieSparkCompactor<T extends HoodieRecordPayload> extends BaseCom
|
||||
"Compaction for instant (" + instant + ") failed with write errors. Errors :" + numWriteErrors);
|
||||
}
|
||||
// Commit compaction
|
||||
writeClient.commitCompaction(instant.getTimestamp(), res, Option.empty());
|
||||
writeClient.commitCompaction(instant.getTimestamp(), compactionMetadata.getCommitMetadata().get(), Option.empty());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,7 +65,6 @@ import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.text.ParseException;
|
||||
import java.util.List;
|
||||
@@ -286,20 +285,18 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commitCompaction(String compactionInstantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata) throws IOException {
|
||||
public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
|
||||
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
|
||||
HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata(
|
||||
table, compactionInstantTime, HoodieJavaRDD.of(writeStatuses), config.getSchema());
|
||||
extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
|
||||
completeCompaction(metadata, writeStatuses, table, compactionInstantTime);
|
||||
completeCompaction(metadata, table, compactionInstantTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD<WriteStatus> writeStatuses,
|
||||
protected void completeCompaction(HoodieCommitMetadata metadata,
|
||||
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
|
||||
String compactionCommitTime) {
|
||||
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
|
||||
List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
|
||||
List<HoodieWriteStat> writeStats = metadata.getWriteStats();
|
||||
final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime);
|
||||
try {
|
||||
this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
|
||||
@@ -327,7 +324,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
|
||||
}
|
||||
|
||||
@Override
|
||||
protected JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean shouldComplete) {
|
||||
protected HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
|
||||
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context, true);
|
||||
preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient());
|
||||
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
|
||||
@@ -339,11 +336,10 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
|
||||
compactionTimer = metrics.getCompactionCtx();
|
||||
HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata =
|
||||
table.compact(context, compactionInstantTime);
|
||||
JavaRDD<WriteStatus> statuses = compactionMetadata.getWriteStatuses();
|
||||
if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
|
||||
completeTableService(TableServiceType.COMPACT, compactionMetadata.getCommitMetadata().get(), statuses, table, compactionInstantTime);
|
||||
completeTableService(TableServiceType.COMPACT, compactionMetadata.getCommitMetadata().get(), table, compactionInstantTime);
|
||||
}
|
||||
return statuses;
|
||||
return compactionMetadata;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -359,15 +355,14 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
|
||||
clusteringTimer = metrics.getClusteringCtx();
|
||||
LOG.info("Starting clustering at " + clusteringInstant);
|
||||
HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = table.cluster(context, clusteringInstant);
|
||||
JavaRDD<WriteStatus> statuses = clusteringMetadata.getWriteStatuses();
|
||||
// TODO : Where is shouldComplete used ?
|
||||
if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
|
||||
completeTableService(TableServiceType.CLUSTER, clusteringMetadata.getCommitMetadata().get(), statuses, table, clusteringInstant);
|
||||
completeTableService(TableServiceType.CLUSTER, clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
|
||||
}
|
||||
return clusteringMetadata;
|
||||
}
|
||||
|
||||
private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<WriteStatus> writeStatuses,
|
||||
private void completeClustering(HoodieReplaceCommitMetadata metadata,
|
||||
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
|
||||
String clusteringCommitTime) {
|
||||
|
||||
@@ -469,16 +464,16 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
|
||||
}
|
||||
|
||||
// TODO : To enforce priority between table service and ingestion writer, use transactions here and invoke strategy
|
||||
private void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata, JavaRDD<WriteStatus> writeStatuses,
|
||||
private void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata,
|
||||
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
|
||||
String commitInstant) {
|
||||
|
||||
switch (tableServiceType) {
|
||||
case CLUSTER:
|
||||
completeClustering((HoodieReplaceCommitMetadata) metadata, writeStatuses, table, commitInstant);
|
||||
completeClustering((HoodieReplaceCommitMetadata) metadata, table, commitInstant);
|
||||
break;
|
||||
case COMPACT:
|
||||
completeCompaction(metadata, writeStatuses, table, commitInstant);
|
||||
completeCompaction(metadata, table, commitInstant);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("This table service is not valid " + tableServiceType);
|
||||
|
||||
Reference in New Issue
Block a user