From 76e2faa28dc1050e9e6ebe5b33dac1972b781eba Mon Sep 17 00:00:00 2001 From: YueZhang <69956021+zhangyue19921010@users.noreply.github.com> Date: Mon, 14 Feb 2022 11:12:52 +0800 Subject: [PATCH] [HUDI-3370] The files recorded in the commit may not match the actual ones for MOR Compaction (#4753) * use HoodieCommitMetadata to replace writeStatuses computation Co-authored-by: yuezhang --- .../hudi/client/BaseHoodieWriteClient.java | 13 ++++----- .../hudi/client/HoodieFlinkWriteClient.java | 26 +++++------------ .../hudi/client/HoodieJavaWriteClient.java | 8 ++--- .../hudi/client/HoodieSparkCompactor.java | 15 ++++++---- .../hudi/client/SparkRDDWriteClient.java | 29 ++++++++----------- .../client/TestHoodieClientMultiWriter.java | 5 ++-- .../table/TestHoodieMergeOnReadTable.java | 11 +++---- ...arkMergeOnReadTableInsertUpdateDelete.java | 12 +++++--- ...stHoodieSparkMergeOnReadTableRollback.java | 28 +++++++++++------- .../common/model/HoodieCommitMetadata.java | 6 ++++ .../spark/HoodieWriteClientExample.java | 6 ++-- .../sink/compact/CompactionCommitSink.java | 8 ++++- .../testsuite/HoodieTestSuiteWriter.java | 13 +++++++-- .../command/CompactionHoodiePathCommand.scala | 25 +++++++--------- .../hudi/utilities/HoodieClusteringJob.java | 19 ++---------- .../hudi/utilities/HoodieCompactor.java | 5 ++-- .../apache/hudi/utilities/UtilHelpers.java | 14 +++++++++ 17 files changed, 129 insertions(+), 114 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 2414a9fb7..14e71f1b0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -883,7 +883,7 @@ public abstract class BaseHoodieWriteClient compact(String compactionInstantTime) { return compact(compactionInstantTime, config.shouldAutoCommit()); } @@ -891,17 +891,16 @@ public abstract class BaseHoodieWriteClient> extraMetadata) throws IOException; + public abstract void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, + Option> extraMetadata); /** * Commit Compaction and track metrics. */ - protected abstract void completeCompaction(HoodieCommitMetadata metadata, O writeStatuses, - HoodieTable table, String compactionCommitTime); + protected abstract void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime); /** * Get inflight time line exclude compaction and clustering. @@ -1023,7 +1022,7 @@ public abstract class BaseHoodieWriteClient compact(String compactionInstantTime, boolean shouldComplete); /** * Performs a compaction operation on a table, serially before or after an insert/upsert action. 
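Note on the API change above: commitCompaction no longer takes the engine-specific write statuses; it takes the HoodieCommitMetadata that the compaction action itself produced, so the files recorded in the commit instant are exactly the ones the table service wrote. A minimal caller-side sketch of the new flow — the class and method names here are illustrative, not part of this patch, and it assumes auto-commit is disabled in the write config, as in the updated tests:

    import org.apache.hudi.client.SparkRDDWriteClient;
    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.common.model.HoodieCommitMetadata;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.table.action.HoodieWriteMetadata;
    import org.apache.spark.api.java.JavaRDD;

    public class CompactAndCommitSketch {
      // Run an already-scheduled compaction, then commit it from the metadata the
      // compaction action computed, instead of re-deriving file stats from the
      // collected WriteStatus objects (the source of the mismatch this patch fixes).
      public static HoodieCommitMetadata compactAndCommit(SparkRDDWriteClient<?> client,
                                                          String compactionInstantTime) {
        HoodieWriteMetadata<JavaRDD<WriteStatus>> result = client.compact(compactionInstantTime);
        HoodieCommitMetadata metadata = result.getCommitMetadata().get();
        client.commitCompaction(compactionInstantTime, metadata, Option.empty());
        return metadata;
      }
    }

This mirrors how the updated tests below consume compact(): they read getCommitMetadata() from the returned HoodieWriteMetadata rather than casting the result to a JavaRDD of write statuses.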
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 50477c3f8..c3d977e88 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -39,7 +39,6 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.index.FlinkHoodieIndexFactory; import org.apache.hudi.index.HoodieIndex; @@ -68,7 +67,6 @@ import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.text.ParseException; import java.util.HashMap; import java.util.Iterator; @@ -346,23 +344,20 @@ public class HoodieFlinkWriteClient extends @Override public void commitCompaction( String compactionInstantTime, - List writeStatuses, - Option> extraMetadata) throws IOException { + HoodieCommitMetadata metadata, + Option> extraMetadata) { HoodieFlinkTable table = getHoodieTable(); - HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata( - table, compactionInstantTime, HoodieList.of(writeStatuses), config.getSchema()); extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata)); - completeCompaction(metadata, writeStatuses, table, compactionInstantTime); + completeCompaction(metadata, table, compactionInstantTime); } @Override public void completeCompaction( HoodieCommitMetadata metadata, - List writeStatuses, HoodieTable>, List, List> table, String compactionCommitTime) { this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction"); - List writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()); + List writeStats = metadata.getWriteStats(); final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime); try { this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty()); @@ -391,16 +386,11 @@ public class HoodieFlinkWriteClient extends } @Override - protected List compact(String compactionInstantTime, boolean shouldComplete) { + protected HoodieWriteMetadata> compact(String compactionInstantTime, boolean shouldComplete) { // only used for metadata table, the compaction happens in single thread - try { - List writeStatuses = - getHoodieTable().compact(context, compactionInstantTime).getWriteStatuses(); - commitCompaction(compactionInstantTime, writeStatuses, Option.empty()); - return writeStatuses; - } catch (IOException e) { - throw new HoodieException("Error while compacting instant: " + compactionInstantTime); - } + HoodieWriteMetadata> compactionMetadata = getHoodieTable().compact(context, compactionInstantTime); + commitCompaction(compactionInstantTime, compactionMetadata.getCommitMetadata().get(), Option.empty()); + return compactionMetadata; } @Override diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java 
index 7af24c8b5..f365f2932 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java @@ -44,7 +44,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata; import com.codahale.metrics.Timer; import org.apache.hadoop.conf.Configuration; -import java.io.IOException; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -210,21 +209,20 @@ public class HoodieJavaWriteClient extends @Override public void commitCompaction(String compactionInstantTime, - List writeStatuses, - Option> extraMetadata) throws IOException { + HoodieCommitMetadata metadata, + Option> extraMetadata) { throw new HoodieNotSupportedException("CommitCompaction is not supported in HoodieJavaClient"); } @Override protected void completeCompaction(HoodieCommitMetadata metadata, - List writeStatuses, HoodieTable>, List, List> table, String compactionCommitTime) { throw new HoodieNotSupportedException("CompleteCompaction is not supported in HoodieJavaClient"); } @Override - protected List compact(String compactionInstantTime, + protected HoodieWriteMetadata> compact(String compactionInstantTime, boolean shouldComplete) { throw new HoodieNotSupportedException("Compact is not supported in HoodieJavaClient"); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java index ca5684a19..b3dc27b6f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java @@ -22,14 +22,17 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.table.action.HoodieWriteMetadata; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; -import java.io.IOException; +import java.util.List; public class HoodieSparkCompactor extends BaseCompactor>, JavaRDD, JavaRDD> { @@ -43,12 +46,12 @@ public class HoodieSparkCompactor extends BaseCom } @Override - public void compact(HoodieInstant instant) throws IOException { + public void compact(HoodieInstant instant) { LOG.info("Compactor executing compaction " + instant); SparkRDDWriteClient writeClient = (SparkRDDWriteClient) compactionClient; - JavaRDD res = writeClient.compact(instant.getTimestamp()); - this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status"); - long numWriteErrors = res.collect().stream().filter(WriteStatus::hasErrors).count(); + HoodieWriteMetadata> compactionMetadata = writeClient.compact(instant.getTimestamp()); + List writeStats = compactionMetadata.getCommitMetadata().get().getWriteStats(); + long numWriteErrors = writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum(); if (numWriteErrors != 0) { // We treat even a single error in compaction as fatal LOG.error("Compaction for instant (" + instant + ") failed with write errors. 
Errors :" + numWriteErrors); @@ -56,6 +59,6 @@ public class HoodieSparkCompactor extends BaseCom "Compaction for instant (" + instant + ") failed with write errors. Errors :" + numWriteErrors); } // Commit compaction - writeClient.commitCompaction(instant.getTimestamp(), res, Option.empty()); + writeClient.commitCompaction(instant.getTimestamp(), compactionMetadata.getCommitMetadata().get(), Option.empty()); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 63f8804bc..2fb27fe79 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -65,7 +65,6 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.text.ParseException; import java.util.List; @@ -286,20 +285,18 @@ public class SparkRDDWriteClient extends } @Override - public void commitCompaction(String compactionInstantTime, JavaRDD writeStatuses, Option> extraMetadata) throws IOException { + public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, Option> extraMetadata) { HoodieSparkTable table = HoodieSparkTable.create(config, context); - HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata( - table, compactionInstantTime, HoodieJavaRDD.of(writeStatuses), config.getSchema()); extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata)); - completeCompaction(metadata, writeStatuses, table, compactionInstantTime); + completeCompaction(metadata, table, compactionInstantTime); } @Override - protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD writeStatuses, + protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable>, JavaRDD, JavaRDD> table, String compactionCommitTime) { this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction"); - List writeStats = writeStatuses.map(WriteStatus::getStat).collect(); + List writeStats = metadata.getWriteStats(); final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime); try { this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty()); @@ -327,7 +324,7 @@ public class SparkRDDWriteClient extends } @Override - protected JavaRDD compact(String compactionInstantTime, boolean shouldComplete) { + protected HoodieWriteMetadata> compact(String compactionInstantTime, boolean shouldComplete) { HoodieSparkTable table = HoodieSparkTable.create(config, context, true); preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient()); HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); @@ -339,11 +336,10 @@ public class SparkRDDWriteClient extends compactionTimer = metrics.getCompactionCtx(); HoodieWriteMetadata> compactionMetadata = table.compact(context, compactionInstantTime); - JavaRDD statuses = compactionMetadata.getWriteStatuses(); if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) { - completeTableService(TableServiceType.COMPACT, compactionMetadata.getCommitMetadata().get(), statuses, table, 
compactionInstantTime); + completeTableService(TableServiceType.COMPACT, compactionMetadata.getCommitMetadata().get(), table, compactionInstantTime); } - return statuses; + return compactionMetadata; } @Override @@ -359,15 +355,14 @@ public class SparkRDDWriteClient extends clusteringTimer = metrics.getClusteringCtx(); LOG.info("Starting clustering at " + clusteringInstant); HoodieWriteMetadata> clusteringMetadata = table.cluster(context, clusteringInstant); - JavaRDD statuses = clusteringMetadata.getWriteStatuses(); // TODO : Where is shouldComplete used ? if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) { - completeTableService(TableServiceType.CLUSTER, clusteringMetadata.getCommitMetadata().get(), statuses, table, clusteringInstant); + completeTableService(TableServiceType.CLUSTER, clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant); } return clusteringMetadata; } - private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD writeStatuses, + private void completeClustering(HoodieReplaceCommitMetadata metadata, HoodieTable>, JavaRDD, JavaRDD> table, String clusteringCommitTime) { @@ -469,16 +464,16 @@ public class SparkRDDWriteClient extends } // TODO : To enforce priority between table service and ingestion writer, use transactions here and invoke strategy - private void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata, JavaRDD writeStatuses, + private void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata, HoodieTable>, JavaRDD, JavaRDD> table, String commitInstant) { switch (tableServiceType) { case CLUSTER: - completeClustering((HoodieReplaceCommitMetadata) metadata, writeStatuses, table, commitInstant); + completeClustering((HoodieReplaceCommitMetadata) metadata, table, commitInstant); break; case COMPACT: - completeCompaction(metadata, writeStatuses, table, commitInstant); + completeCompaction(metadata, table, commitInstant); break; default: throw new IllegalArgumentException("This table service is not valid " + tableServiceType); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java index 035799c01..c3cde7416 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java @@ -38,6 +38,7 @@ import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieLockConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieWriteConflictException; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.hadoop.fs.Path; @@ -364,8 +365,8 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase { latchCountDownAndWait(runCountDownLatch, 30000); if (tableType == HoodieTableType.MERGE_ON_READ) { assertDoesNotThrow(() -> { - JavaRDD writeStatusJavaRDD = (JavaRDD) client2.compact("005"); - client2.commitCompaction("005", writeStatusJavaRDD, Option.empty()); + HoodieWriteMetadata> compactionMetadata = client2.compact("005"); + client2.commitCompaction("005", compactionMetadata.getCommitMetadata().get(), Option.empty()); validInstants.add("005"); }); } diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index fb484f4be..e6df53740 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -45,6 +45,7 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex.IndexType; import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor; import org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor; import org.apache.hudi.testutils.HoodieClientTestUtils; @@ -258,7 +259,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness // Do a compaction String compactionInstantTime = writeClient.scheduleCompaction(Option.empty()).get().toString(); - JavaRDD result = (JavaRDD) writeClient.compact(compactionInstantTime); + HoodieWriteMetadata> result = writeClient.compact(compactionInstantTime); // Verify that recently written compacted data file has no log file metaClient = HoodieTableMetaClient.reload(metaClient); @@ -275,8 +276,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness for (FileSlice slice : groupedLogFiles) { assertEquals(0, slice.getLogFiles().count(), "After compaction there should be no log files visible on a full view"); } - List writeStatuses = result.collect(); - assertTrue(writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath))); + assertTrue(result.getCommitMetadata().get().getWritePartitionPaths().stream().anyMatch(part -> part.contentEquals(partitionPath))); } // Check the entire dataset has all records still @@ -442,8 +442,9 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness // Test small file handling after compaction instantTime = "002"; client.scheduleCompactionAtInstant(instantTime, Option.of(metadata.getExtraMetadata())); - statuses = (JavaRDD) client.compact(instantTime); - client.commitCompaction(instantTime, statuses, Option.empty()); + HoodieWriteMetadata> compactionMetadata = client.compact(instantTime); + statuses = compactionMetadata.getWriteStatuses(); + client.commitCompaction(instantTime, compactionMetadata.getCommitMetadata().get(), Option.empty()); // Read from commit file table = HoodieSparkTable.create(cfg, context()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java index 63f6e4654..2955147b4 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java @@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import 
org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -41,6 +42,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils; import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; @@ -56,6 +58,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; +import java.util.Collection; import java.util.List; import java.util.Properties; import java.util.stream.Collectors; @@ -307,11 +310,12 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie assertTrue(numLogFiles > 0); // Do a compaction String instantTime = writeClient.scheduleCompaction(Option.empty()).get().toString(); - statuses = (JavaRDD) writeClient.compact(instantTime); + HoodieWriteMetadata> compactionMetadata = writeClient.compact(instantTime); String extension = table.getBaseFileExtension(); - assertEquals(numLogFiles, statuses.map(status -> status.getStat().getPath().contains(extension)).count()); - assertEquals(numLogFiles, statuses.count()); - writeClient.commitCompaction(instantTime, statuses, Option.empty()); + Collection> stats = compactionMetadata.getCommitMetadata().get().getPartitionToWriteStats().values(); + assertEquals(numLogFiles, stats.stream().flatMap(Collection::stream).filter(state -> state.getPath().contains(extension)).count()); + assertEquals(numLogFiles, stats.stream().mapToLong(Collection::size).sum()); + writeClient.commitCompaction(instantTime, compactionMetadata.getCommitMetadata().get(), Option.empty()); } } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java index f5f9fb8a1..47e99353e 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java @@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.marker.MarkerType; @@ -50,6 +51,7 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils; import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; @@ -455,8 +457,8 @@ public class TestHoodieSparkMergeOnReadTableRollback 
extends SparkClientFunction compactionInstantTime = "006"; client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty()); - JavaRDD<WriteStatus> ws = (JavaRDD<WriteStatus>) client.compact(compactionInstantTime); - client.commitCompaction(compactionInstantTime, ws, Option.empty()); + HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client.compact(compactionInstantTime); + client.commitCompaction(compactionInstantTime, compactionMetadata.getCommitMetadata().get(), Option.empty()); allFiles = listAllBaseFilesInPath(hoodieTable); metaClient = HoodieTableMetaClient.reload(metaClient); @@ -543,8 +545,8 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction metaClient = HoodieTableMetaClient.reload(metaClient); String compactionInstantTime = "005"; client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty()); - JavaRDD<WriteStatus> ws = (JavaRDD<WriteStatus>) client.compact(compactionInstantTime); - client.commitCompaction(compactionInstantTime, ws, Option.empty()); + HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client.compact(compactionInstantTime); + client.commitCompaction(compactionInstantTime, compactionMetadata.getCommitMetadata().get(), Option.empty()); validateRecords(cfg, metaClient, updates3); List updates4 = updateAndGetRecords("006", client, dataGen, records); @@ -755,11 +757,14 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction assertTrue(numLogFiles > 0); // Do a compaction newCommitTime = writeClient.scheduleCompaction(Option.empty()).get().toString(); - statuses = (JavaRDD<WriteStatus>) writeClient.compact(newCommitTime); + HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = writeClient.compact(newCommitTime); + statuses = compactionMetadata.getWriteStatuses(); // Ensure all log files have been compacted into base files String extension = table.getBaseFileExtension(); - assertEquals(numLogFiles, statuses.map(status -> status.getStat().getPath().contains(extension)).count()); - assertEquals(numLogFiles, statuses.count()); + Collection<List<HoodieWriteStat>> stats = compactionMetadata.getCommitMetadata().get().getPartitionToWriteStats().values(); + assertEquals(numLogFiles, stats.stream().flatMap(Collection::stream).filter(state -> state.getPath().contains(extension)).count()); + assertEquals(numLogFiles, stats.stream().mapToLong(Collection::size).sum()); + // Intentionally no commitCompaction here: the compaction stays inflight so the rollback below can target it. // Trigger a rollback of compaction table.getActiveTimeline().reload(); @@ -862,14 +867,15 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction private long doCompaction(SparkRDDWriteClient client, HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, long numLogFiles) throws IOException { // Do a compaction String instantTime = client.scheduleCompaction(Option.empty()).get().toString(); - JavaRDD<WriteStatus> writeStatuses = (JavaRDD<WriteStatus>) client.compact(instantTime); + HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client.compact(instantTime); metaClient.reloadActiveTimeline(); HoodieTable table = HoodieSparkTable.create(cfg, context(), metaClient); String extension = table.getBaseFileExtension(); - assertEquals(numLogFiles, writeStatuses.map(status -> status.getStat().getPath().contains(extension)).count()); - assertEquals(numLogFiles, writeStatuses.count()); - client.commitCompaction(instantTime, writeStatuses, Option.empty()); + Collection<List<HoodieWriteStat>> stats = compactionMetadata.getCommitMetadata().get().getPartitionToWriteStats().values(); + assertEquals(numLogFiles, stats.stream().flatMap(Collection::stream).filter(state -> state.getPath().contains(extension)).count()); +
assertEquals(numLogFiles, stats.stream().mapToLong(Collection::size).sum()); + client.commitCompaction(instantTime, compactionMetadata.getCommitMetadata().get(), Option.empty()); return numLogFiles; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java index d693d91f6..594c1adbb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java @@ -35,10 +35,12 @@ import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * All the metadata that gets stored along with a commit. @@ -89,6 +91,10 @@ public class HoodieCommitMetadata implements Serializable { return partitionToWriteStats; } + public List<HoodieWriteStat> getWriteStats() { + return partitionToWriteStats.values().stream().flatMap(Collection::stream).collect(Collectors.toList()); + } + public String getMetadata(String metaKey) { return extraMetadata.get(metaKey); } diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java b/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java index 35e46605f..1afc18053 100644 --- a/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java +++ b/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java @@ -38,6 +38,8 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hudi.table.action.HoodieWriteMetadata; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.SparkConf; @@ -140,8 +142,8 @@ public class HoodieWriteClientExample { // compaction if (HoodieTableType.valueOf(tableType) == HoodieTableType.MERGE_ON_READ) { Option<String> instant = client.scheduleCompaction(Option.empty()); - JavaRDD<WriteStatus> writeStatues = client.compact(instant.get()); - client.commitCompaction(instant.get(), writeStatues, Option.empty()); + HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client.compact(instant.get()); + client.commitCompaction(instant.get(), compactionMetadata.getCommitMetadata().get(), Option.empty()); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java index 53127359c..ecd66936e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java @@ -20,12 +20,15 @@ package org.apache.hudi.sink.compact; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.data.HoodieList; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.CleanFunction; import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.table.action.compact.CompactHelpers; import
org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.StreamerUtil; @@ -147,8 +150,11 @@ public class CompactionCommitSink extends CleanFunction { .flatMap(Collection::stream) .collect(Collectors.toList()); + HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata( + table, instant, HoodieList.of(statuses), writeClient.getConfig().getSchema()); + // commit the compaction - this.writeClient.commitCompaction(instant, statuses, Option.empty()); + this.writeClient.commitCompaction(instant, metadata, Option.empty()); // Whether to clean up the old log file when compaction if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) { diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java index e46343c1a..a98c7f2ae 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java @@ -24,7 +24,9 @@ import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.util.Option; @@ -33,6 +35,7 @@ import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodiePayloadConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig; import org.apache.hudi.integ.testsuite.dag.nodes.CleanNode; @@ -40,6 +43,9 @@ import org.apache.hudi.integ.testsuite.dag.nodes.DagNode; import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode; import org.apache.hudi.integ.testsuite.dag.nodes.ScheduleCompactNode; import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.hudi.table.action.compact.CompactHelpers; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.avro.Schema; @@ -215,7 +221,8 @@ public class HoodieTestSuiteWriter implements Serializable { } } if (instantTime.isPresent()) { - return (JavaRDD) writeClient.compact(instantTime.get()); + HoodieWriteMetadata> compactionMetadata = writeClient.compact(instantTime.get()); + return compactionMetadata.getWriteStatuses(); } else { return null; } @@ -272,7 +279,9 @@ public class HoodieTestSuiteWriter implements Serializable { // Just stores the path where this batch of data is generated to extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(s -> s.getFilePath()).collect().get(0)); } - writeClient.commitCompaction(instantTime.get(), records, Option.of(extraMetadata)); + HoodieSparkTable table = HoodieSparkTable.create(writeClient.getConfig(), writeClient.getEngineContext()); + HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata(table, instantTime.get(), HoodieJavaRDD.of(records), 
writeClient.getConfig().getSchema()); + writeClient.commitCompaction(instantTime.get(), metadata, Option.of(extraMetadata)); } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala index 1363fb939..2f5c4d004 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala @@ -17,14 +17,13 @@ package org.apache.spark.sql.hudi.command -import org.apache.hudi.client.WriteStatus -import org.apache.hudi.common.model.HoodieTableType +import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieTableType} import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline} import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.util.{HoodieTimer, Option => HOption} import org.apache.hudi.exception.HoodieException import org.apache.hudi.{DataSourceUtils, DataSourceWriteOptions, HoodieWriterUtils} -import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} +import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE} @@ -100,8 +99,8 @@ case class CompactionHoodiePathCommand(path: String, timer.startTimer() willCompactionInstants.foreach {compactionInstant => val writeResponse = client.compact(compactionInstant) - handlerResponse(writeResponse) - client.commitCompaction(compactionInstant, writeResponse, HOption.empty()) + handleResponse(writeResponse.getCommitMetadata.get()) + client.commitCompaction(compactionInstant, writeResponse.getCommitMetadata.get(), HOption.empty()) } logInfo(s"Finish Run compaction at instants: [${willCompactionInstants.mkString(",")}]," + s" spend: ${timer.endTimer()}ms") @@ -111,17 +110,13 @@ case class CompactionHoodiePathCommand(path: String, } } - private def handlerResponse(writeResponse: JavaRDD[WriteStatus]): Unit = { + private def handleResponse(metadata: HoodieCommitMetadata): Unit = { + // Handle error - val error = writeResponse.rdd.filter(f => f.hasErrors).take(1).headOption - if (error.isDefined) { - if (error.get.hasGlobalError) { - throw error.get.getGlobalError - } else if (!error.get.getErrors.isEmpty) { - val key = error.get.getErrors.asScala.head._1 - val exception = error.get.getErrors.asScala.head._2 - throw new HoodieException(s"Error in write record: $key", exception) - } + val writeStats = metadata.getPartitionToWriteStats.entrySet().flatMap(e => e.getValue).toList + val errorsCount = writeStats.map(state => state.getTotalWriteErrors).sum + if (errorsCount > 0) { + throw new HoodieException(s"Found $errorsCount write errors when writing records") } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java index b7345a146..26639628e 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java @@ -23,7 +23,6 @@ import
org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; @@ -49,7 +48,6 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Date; import java.util.List; -import java.util.stream.Collectors; public class HoodieClusteringJob { @@ -216,7 +214,7 @@ public class HoodieClusteringJob { } Option<HoodieCommitMetadata> commitMetadata = client.cluster(cfg.clusteringInstantTime, true).getCommitMetadata(); - return handleErrors(commitMetadata.get(), cfg.clusteringInstantTime); + return UtilHelpers.handleErrors(commitMetadata.get(), cfg.clusteringInstantTime); } } @@ -271,20 +269,7 @@ public class HoodieClusteringJob { LOG.info("The schedule instant time is " + instantTime.get()); LOG.info("Step 2: Do cluster"); Option<HoodieCommitMetadata> metadata = client.cluster(instantTime.get(), true).getCommitMetadata(); - return handleErrors(metadata.get(), instantTime.get()); + return UtilHelpers.handleErrors(metadata.get(), instantTime.get()); } } - - private int handleErrors(HoodieCommitMetadata metadata, String instantTime) { - List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e -> - e.getValue().stream()).collect(Collectors.toList()); - long errorsCount = writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum(); - if (errorsCount == 0) { - LOG.info(String.format("Table imported into hoodie with %s instant time.", instantTime)); - return 0; - } - - LOG.error(String.format("Import failed with %d errors.", errorsCount)); - return -1; - } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java index 56585757f..ce2be7d50 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java @@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -266,8 +267,8 @@ public class HoodieCompactor { throw new HoodieCompactionException("There is no scheduled compaction in the table."); } } - JavaRDD<WriteStatus> writeResponse = client.compact(cfg.compactionInstantTime); - return UtilHelpers.handleErrors(jsc, cfg.compactionInstantTime, writeResponse); + HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client.compact(cfg.compactionInstantTime); + return UtilHelpers.handleErrors(compactionMetadata.getCommitMetadata().get(), cfg.compactionInstantTime); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index b9eda63b5..8690ff1cf 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -26,7 +26,9 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext; import
org.apache.hudi.common.config.DFSPropertiesConfiguration; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.util.Functions.Function1; @@ -303,6 +305,18 @@ public class UtilHelpers { return -1; } + public static int handleErrors(HoodieCommitMetadata metadata, String instantTime) { + List<HoodieWriteStat> writeStats = metadata.getWriteStats(); + long errorsCount = writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum(); + if (errorsCount == 0) { + LOG.info(String.format("Finished job for instant time %s.", instantTime)); + return 0; + } + + LOG.error(String.format("Job failed with %d errors.", errorsCount)); + return -1; + } + /** * Returns a factory for creating connections to the given JDBC URL. *
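For completeness, the error-handling path now reads the same metadata everywhere: HoodieCommitMetadata.getWriteStats() flattens the per-partition stats, and the shared UtilHelpers.handleErrors sums getTotalWriteErrors over them. A self-contained sketch of that check — the class and method names are illustrative, not part of this patch:

    import java.util.List;

    import org.apache.hudi.common.model.HoodieCommitMetadata;
    import org.apache.hudi.common.model.HoodieWriteStat;

    public class WriteErrorCheckSketch {
      // Mirrors the new UtilHelpers.handleErrors: flatten the per-partition write
      // stats via getWriteStats() and sum the per-file error counters; a non-zero
      // total means the compaction or clustering job should be treated as failed.
      public static long countWriteErrors(HoodieCommitMetadata metadata) {
        List<HoodieWriteStat> writeStats = metadata.getWriteStats();
        return writeStats.stream()
            .mapToLong(HoodieWriteStat::getTotalWriteErrors)
            .sum();
      }
    }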