From 19ca0b56296bf89c611406bb86c3626f87d562ee Mon Sep 17 00:00:00 2001
From: vinoth chandar
Date: Sat, 25 Apr 2020 18:26:44 -0700
Subject: [PATCH] [HUDI-785] Refactor compaction/savepoint execution based on
 ActionExecutor abstraction (#1548)

- Savepoint and compaction classes moved to table.action.* packages
- HoodieWriteClient#savepoint(...) returns void
- Renamed HoodieCommitArchiveLog -> HoodieTimelineArchiveLog
- Fixed tests to account for the additional validation now performed
- Moved helper code into CompactHelpers and SavepointHelpers
---
 .../hudi/cli/commands/CompactionCommand.java  |   2 +-
 .../hudi/cli/commands/SavepointsCommand.java  |   8 +-
 .../apache/hudi/cli/commands/SparkMain.java   |   7 +-
 .../org/apache/hudi/cli/utils/CommitUtil.java |   2 +-
 .../commands/TestArchivedCommitsCommand.java  |   4 +-
 .../cli/common/HoodieTestCommitUtilities.java |   4 +-
 .../client/AbstractHoodieWriteClient.java     |  38 +-
 .../hudi/client/CompactionAdminClient.java    |  12 +-
 .../apache/hudi/client/HoodieWriteClient.java | 484 ++++--------
 .../hudi/config/HoodieCompactionConfig.java   |   4 +-
 .../apache/hudi/config/HoodieWriteConfig.java |   2 +-
 .../hudi/table/HoodieCopyOnWriteTable.java    |  18 +-
 .../hudi/table/HoodieMergeOnReadTable.java    |  55 +-
 .../org/apache/hudi/table/HoodieTable.java    |  95 +++-
 ...Log.java => HoodieTimelineArchiveLog.java} |   8 +-
 .../{commit => }/HoodieWriteMetadata.java     |   2 +-
 .../hudi/table/action/clean/CleanPlanner.java |  28 +-
 .../commit/BaseCommitActionExecutor.java      |   4 +-
 .../BulkInsertCommitActionExecutor.java       |   1 +
 .../table/action/commit/BulkInsertHelper.java |   1 +
 ...BulkInsertPreppedCommitActionExecutor.java |   1 +
 .../commit/DeleteCommitActionExecutor.java    |   1 +
 .../table/action/commit/DeleteHelper.java     |   5 +-
 .../commit/InsertCommitActionExecutor.java    |   1 +
 .../InsertPreppedCommitActionExecutor.java    |   1 +
 .../commit/UpsertCommitActionExecutor.java    |   1 +
 .../UpsertPreppedCommitActionExecutor.java    |   1 +
 .../hudi/table/action/commit/WriteHelper.java |   7 +-
 .../table/action/compact/CompactHelpers.java  |  70 +++
 .../{ => action}/compact/HoodieCompactor.java |   2 +-
 .../HoodieMergeOnReadTableCompactor.java      |   5 +-
 .../{ => action}/compact/OperationResult.java |   2 +-
 .../compact/RunCompactionActionExecutor.java  |  93 ++++
 .../ScheduleCompactionActionExecutor.java     | 121 +++++
 .../strategy/BoundedIOCompactionStrategy.java |   2 +-
 ...undedPartitionAwareCompactionStrategy.java |   2 +-
 .../compact/strategy/CompactionStrategy.java  |   4 +-
 .../strategy/DayBasedCompactionStrategy.java  |   2 +-
 .../LogFileSizeBasedCompactionStrategy.java   |   2 +-
 .../strategy/UnBoundedCompactionStrategy.java |   2 +-
 ...undedPartitionAwareCompactionStrategy.java |   2 +-
 .../BulkInsertDeltaCommitActionExecutor.java  |   2 +-
 ...nsertPreppedDeltaCommitActionExecutor.java |   2 +-
 .../DeleteDeltaCommitActionExecutor.java      |   2 +-
 .../InsertDeltaCommitActionExecutor.java      |   2 +-
 ...nsertPreppedDeltaCommitActionExecutor.java |   2 +-
 .../UpsertDeltaCommitActionExecutor.java      |   2 +-
 ...psertPreppedDeltaCommitActionExecutor.java |   2 +-
 .../MergeOnReadRollbackActionExecutor.java    |   2 +-
 .../savepoint/SavepointActionExecutor.java    | 115 +++++
 .../action/savepoint/SavepointHelpers.java    |  71 +++
 .../hudi/client/TestClientRollback.java       |   2 +-
 .../client/TestCompactionAdminClient.java     |   2 +-
 .../hudi/io/TestHoodieCommitArchiveLog.java   |  18 +-
 .../compact/TestAsyncCompaction.java          |   2 +-
 .../compact/TestHoodieCompactor.java          |  42 +-
 .../TestHoodieCompactionStrategy.java         |   2 +-
 .../hudi/common/model/HoodieTestUtils.java    |  14 +
 58 files changed, 789 insertions(+), 601 deletions(-)
 rename hudi-client/src/main/java/org/apache/hudi/table/{HoodieCommitArchiveLog.java => HoodieTimelineArchiveLog.java} (98%)
 rename hudi-client/src/main/java/org/apache/hudi/table/action/{commit => }/HoodieWriteMetadata.java (98%)
 create mode 100644 hudi-client/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
 rename hudi-client/src/main/java/org/apache/hudi/table/{ => action}/compact/HoodieCompactor.java (98%)
 rename hudi-client/src/main/java/org/apache/hudi/table/{ => action}/compact/HoodieMergeOnReadTableCompactor.java (98%)
 rename hudi-client/src/main/java/org/apache/hudi/table/{ => action}/compact/OperationResult.java (97%)
 create mode 100644 hudi-client/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
 create mode 100644 hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
 rename hudi-client/src/main/java/org/apache/hudi/table/{ => action}/compact/strategy/BoundedIOCompactionStrategy.java (97%)
 rename hudi-client/src/main/java/org/apache/hudi/table/{ => action}/compact/strategy/BoundedPartitionAwareCompactionStrategy.java (98%)
 rename hudi-client/src/main/java/org/apache/hudi/table/{ => action}/compact/strategy/CompactionStrategy.java (97%)
 rename hudi-client/src/main/java/org/apache/hudi/table/{ => action}/compact/strategy/DayBasedCompactionStrategy.java (98%)
 rename hudi-client/src/main/java/org/apache/hudi/table/{ => action}/compact/strategy/LogFileSizeBasedCompactionStrategy.java (98%)
 rename hudi-client/src/main/java/org/apache/hudi/table/{ => action}/compact/strategy/UnBoundedCompactionStrategy.java (96%)
 rename hudi-client/src/main/java/org/apache/hudi/table/{ => action}/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java (98%)
 create mode 100644 hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
 create mode 100644 hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointHelpers.java
 rename hudi-client/src/test/java/org/apache/hudi/table/{ => action}/compact/TestAsyncCompaction.java (99%)
 rename hudi-client/src/test/java/org/apache/hudi/table/{ => action}/compact/TestHoodieCompactor.java (82%)
 rename hudi-client/src/test/java/org/apache/hudi/table/{ => action}/compact/strategy/TestHoodieCompactionStrategy.java (99%)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
index a4c70da61..9b55fe26e 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
@@ -42,7 +42,7 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.table.compact.OperationResult;
+import org.apache.hudi.table.action.compact.OperationResult;
 import org.apache.hudi.utilities.UtilHelpers;
 
 import org.apache.hadoop.fs.FSDataInputStream;
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
index dac002147..b5bc349ce 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieSavepointException;
 import org.apache.hudi.index.HoodieIndex;
 
 import org.apache.spark.api.java.JavaSparkContext;
@@ -79,12 +80,13 @@ public class SavepointsCommand implements CommandMarker {
     String result;
     try (JavaSparkContext jsc = SparkUtil.initJavaSparkConf("Create Savepoint")) {
       HoodieWriteClient client = createHoodieClient(jsc, metaClient.getBasePath());
-      if (client.savepoint(commitTime, user, comments)) {
+      try {
+        client.savepoint(commitTime, user, comments);
         // Refresh the current
         refreshMetaClient();
         result = String.format("The commit \"%s\" has been savepointed.", commitTime);
-      } else {
-        result = String.format("Failed: Could not savepoint commit \"%s\".", commitTime);
+      } catch (HoodieSavepointException se) {
+        result = String.format("Failed: Could not create savepoint \"%s\".", commitTime);
       }
     }
     return result;
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 3534cc541..5aa225559 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -27,7 +27,7 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.table.compact.strategy.UnBoundedCompactionStrategy;
+import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
 import org.apache.hudi.utilities.HDFSParquetImporter;
 import org.apache.hudi.utilities.HDFSParquetImporter.Config;
 import org.apache.hudi.utilities.HoodieCleaner;
@@ -283,10 +283,11 @@ public class SparkMain {
 
   private static int rollbackToSavepoint(JavaSparkContext jsc, String savepointTime, String basePath) throws Exception {
     HoodieWriteClient client = createHoodieClient(jsc, basePath);
-    if (client.restoreToSavepoint(savepointTime)) {
+    try {
+      client.restoreToSavepoint(savepointTime);
       LOG.info(String.format("The commit \"%s\" rolled back.", savepointTime));
       return 0;
-    } else {
+    } catch (Exception e) {
       LOG.info(String.format("The commit \"%s\" failed to roll back.", savepointTime));
       return -1;
     }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
index 1cbe2e68a..5a1c457b1 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
@@ -39,7 +39,7 @@ public class CommitUtil {
 
   public static long countNewRecords(HoodieTableMetaClient target, List<String> commitsToCatchup) throws IOException {
     long totalNew = 0;
-    HoodieTimeline timeline = target.getActiveTimeline().reload().getCommitTimeline().filterCompletedInstants();
+    HoodieTimeline timeline = target.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants();
     for (String commit : commitsToCatchup) {
       HoodieCommitMetadata c = HoodieCommitMetadata.fromBytes(
           timeline.getInstantDetails(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commit)).get(),
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
index 56dc5e9ba..b2261ef4a 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
@@ -30,7 +30,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieCommitArchiveLog;
+import org.apache.hudi.table.HoodieTimelineArchiveLog;
 
 import org.junit.After;
 import org.junit.Before;
@@ -91,7 +91,7 @@ public class TestArchivedCommitsCommand extends AbstractShellIntegrationTest {
         metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
 
     // archive
-    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
     archiveLog.archiveIfRequired(jsc);
   }
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/common/HoodieTestCommitUtilities.java b/hudi-cli/src/test/java/org/apache/hudi/cli/common/HoodieTestCommitUtilities.java
index bfd0f0fe7..75a02c8b8 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/common/HoodieTestCommitUtilities.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/common/HoodieTestCommitUtilities.java
@@ -20,7 +20,7 @@ package org.apache.hudi.cli.common;
 
 import org.apache.hudi.avro.model.HoodieWriteStat;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.table.HoodieCommitArchiveLog;
+import org.apache.hudi.table.HoodieTimelineArchiveLog;
 
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -36,7 +36,7 @@ public class HoodieTestCommitUtilities {
    */
  public static org.apache.hudi.avro.model.HoodieCommitMetadata convertAndOrderCommitMetadata(
      HoodieCommitMetadata hoodieCommitMetadata) {
-    return orderCommitMetadata(HoodieCommitArchiveLog.convertCommitMetadata(hoodieCommitMetadata));
+    return orderCommitMetadata(HoodieTimelineArchiveLog.convertCommitMetadata(hoodieCommitMetadata));
  }
 
  /**
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index d712d5753..8d730e25d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -20,7 +20,6 @@ package org.apache.hudi.client;
 
 import com.codahale.metrics.Timer;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
-import org.apache.hudi.client.utils.SparkConfigUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieRollingStat;
@@ -57,7 +56,6 @@ import java.util.Map;
 public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {
 
   private static final Logger LOG = LogManager.getLogger(AbstractHoodieWriteClient.class);
-  private static final String UPDATE_STR = "update";
 
   private final transient HoodieMetrics metrics;
   private final transient HoodieIndex<T> index;
@@ -96,32 +94,6 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
     return commit(instantTime, writeStatuses, extraMetadata, metaClient.getCommitActionType());
   }
 
-  protected JavaRDD<WriteStatus> updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD, HoodieTable<T> table,
-      String instantTime) {
-    // cache writeStatusRDD before updating index, so that all actions before this are not triggered again for future
-    // RDD actions that are performed after updating the index.
-    writeStatusRDD = writeStatusRDD.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
-    Timer.Context indexTimer = metrics.getIndexCtx();
-    // Update the index back
-    JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, jsc, table);
-    metrics.updateIndexMetrics(UPDATE_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
-    // Trigger the insert and collect statuses
-    commitOnAutoCommit(instantTime, statuses, table.getMetaClient().getCommitActionType());
-    return statuses;
-  }
-
-  protected void commitOnAutoCommit(String instantTime, JavaRDD<WriteStatus> resultRDD, String actionType) {
-    if (config.shouldAutoCommit()) {
-      LOG.info("Auto commit enabled: Committing " + instantTime);
-      boolean commitResult = commit(instantTime, resultRDD, Option.empty(), actionType);
-      if (!commitResult) {
-        throw new HoodieCommitException("Failed to commit " + instantTime);
-      }
-    } else {
-      LOG.info("Auto commit disabled for " + instantTime);
-    }
-  }
-
   private boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
       Option<Map<String, String>> extraMetadata, String actionType) {
@@ -131,7 +103,6 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
 
     HoodieCommitMetadata metadata = new HoodieCommitMetadata();
-
     List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
 
     updateMetadataAndRollingStats(actionType, metadata, stats);
@@ -149,10 +120,8 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
     try {
       activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, instantTime),
           Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-
       postCommit(metadata, instantTime, extraMetadata);
       emitCommitMetrics(instantTime, metadata, actionType);
-
       LOG.info("Committed " + instantTime);
     } catch (IOException e) {
       throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
@@ -182,8 +151,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
    * @param instantTime Instant Time
    * @param extraMetadata Additional Metadata passed by user
    */
-  protected abstract void postCommit(HoodieCommitMetadata metadata, String instantTime,
-      Option<Map<String, String>> extraMetadata);
+  protected abstract void postCommit(HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata);
 
   /**
    * Finalize Write operation.
@@ -260,7 +228,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
   protected HoodieTable getTableAndInitCtx(WriteOperationType operationType) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
     if (operationType == WriteOperationType.DELETE) {
-      setWriteSchemaFromLastInstant(metaClient);
+      setWriteSchemaForDeletes(metaClient);
     }
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTable table = HoodieTable.create(metaClient, config, jsc);
@@ -275,7 +243,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
 
   /**
    * Sets write schema from last instant since deletes may not have schema set in the config.
    */
-  private void setWriteSchemaFromLastInstant(HoodieTableMetaClient metaClient) {
+  private void setWriteSchemaForDeletes(HoodieTableMetaClient metaClient) {
     try {
       HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
       Option<HoodieInstant> lastInstant =
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/CompactionAdminClient.java b/hudi-client/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
index 9f9ce1254..627348cb8 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
@@ -18,9 +18,10 @@
 
 package org.apache.hudi.client;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.common.model.FileSlice;
@@ -41,10 +42,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.table.compact.OperationResult;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
+import org.apache.hudi.table.action.compact.OperationResult;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -71,10 +69,6 @@ public class CompactionAdminClient extends AbstractHoodieClient {
     super(jsc, HoodieWriteConfig.newBuilder().withPath(basePath).build());
   }
 
-  public CompactionAdminClient(JavaSparkContext jsc, String basePath, Option<EmbeddedTimelineService> timelineServer) {
-    super(jsc, HoodieWriteConfig.newBuilder().withPath(basePath).build(), timelineServer);
-  }
-
   /**
    * Validate all compaction operations in a compaction plan. Verifies the file-slices are consistent with corresponding
    * compaction operations.
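A note on the calling convention the CLI diffs above adopt: savepoint creation and restore no longer return booleans; they return void and surface failures as exceptions. A minimal caller sketch under that contract — the wrapper class and method below are hypothetical; only the HoodieWriteClient calls and the exception type come from this patch:

import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.exception.HoodieSavepointException;

// Hypothetical wrapper illustrating the void-plus-exception contract of savepoint(...).
public class SavepointCallerSketch {
  static String createSavepoint(HoodieWriteClient client, String commitTime, String user, String comment) {
    try {
      client.savepoint(commitTime, user, comment); // now void; validation happens inside and throws on failure
      return String.format("The commit \"%s\" has been savepointed.", commitTime);
    } catch (HoodieSavepointException e) {
      return String.format("Failed: Could not create savepoint \"%s\".", commitTime);
    }
  }
}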
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index c7de8dfbc..5ae7ca021 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -19,64 +19,48 @@
 package org.apache.hudi.client;
 
 import com.codahale.metrics.Timer;
-import org.apache.avro.Schema;
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
-import org.apache.hudi.client.utils.SparkConfigUtils;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
-import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
-import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
-import org.apache.hudi.exception.HoodieCompactionException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.exception.HoodieRestoreException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.exception.HoodieSavepointException;
-import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.metrics.HoodieMetrics;
-import org.apache.hudi.table.HoodieCommitArchiveLog;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.HoodieTimelineArchiveLog;
 import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
-import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.compact.CompactHelpers;
+import org.apache.hudi.table.action.savepoint.SavepointHelpers;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFunction;
-import scala.Tuple2;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -171,9 +155,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    */
   public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT);
-    validateSchema(table, true);
+    table.validateUpsertSchema();
     setOperationType(WriteOperationType.UPSERT);
-    HoodieWriteMetadata result = table.upsert(jsc,instantTime, records);
+    HoodieWriteMetadata result = table.upsert(jsc, instantTime, records);
     if (result.getIndexLookupDuration().isPresent()) {
       metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
     }
@@ -191,7 +175,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    */
   public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String instantTime) {
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED);
-    validateSchema(table, true);
+    table.validateUpsertSchema();
     setOperationType(WriteOperationType.UPSERT_PREPPED);
     HoodieWriteMetadata result = table.upsertPrepped(jsc,instantTime, preppedRecords);
     return postWrite(result, instantTime, table);
@@ -209,7 +193,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    */
   public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT);
-    validateSchema(table, false);
+    table.validateInsertSchema();
     setOperationType(WriteOperationType.INSERT);
     HoodieWriteMetadata result = table.insert(jsc,instantTime, records);
     return postWrite(result, instantTime, table);
@@ -228,7 +212,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    */
   public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String instantTime) {
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED);
-    validateSchema(table, false);
+    table.validateInsertSchema();
     setOperationType(WriteOperationType.INSERT_PREPPED);
     HoodieWriteMetadata result = table.insertPrepped(jsc,instantTime, preppedRecords);
     return postWrite(result, instantTime, table);
@@ -267,6 +251,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String instantTime,
       Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT);
+    table.validateInsertSchema();
     setOperationType(WriteOperationType.BULK_INSERT);
     HoodieWriteMetadata result = table.bulkInsert(jsc,instantTime, records, bulkInsertPartitioner);
     return postWrite(result, instantTime, table);
@@ -291,6 +276,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   public JavaRDD<WriteStatus> bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String instantTime,
       Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED);
+    table.validateInsertSchema();
     setOperationType(WriteOperationType.BULK_INSERT_PREPPED);
     HoodieWriteMetadata result = table.bulkInsertPrepped(jsc,instantTime, preppedRecords, bulkInsertPartitioner);
     return postWrite(result, instantTime, table);
@@ -344,12 +330,12 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     // Do an inline compaction if enabled
     if (config.isInlineCompaction()) {
       metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
-      forceCompact(extraMetadata);
+      inlineCompact(extraMetadata);
     } else {
       metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
     }
 
     // We cannot have unbounded commit files. Archive commits if we have to archive
-    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(config, createMetaClient(true));
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, createMetaClient(true));
     archiveLog.archiveIfRequired(jsc);
     if (config.isAutoClean()) {
       // Call clean to cleanup if there is anything to cleanup after the commit,
@@ -364,35 +350,25 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   }
 
   /**
-   * Savepoint a specific commit. Latest version of data files as of the passed in commitTime will be referenced in the
-   * savepoint and will never be cleaned. The savepointed commit will never be rolledback or archived.
-   * <p>
-   * This gives an option to rollback the state to the savepoint anytime. Savepoint needs to be manually created and
-   * deleted.
-   * <p>
-   * Savepoint should be on a commit that could not have been cleaned.
+   * Create a savepoint based on the latest commit action on the timeline.
    *
    * @param user - User creating the savepoint
    * @param comment - Comment for the savepoint
-   * @return true if the savepoint was created successfully
    */
-  public boolean savepoint(String user, String comment) {
+  public void savepoint(String user, String comment) {
     HoodieTable<T> table = HoodieTable.create(config, jsc);
     if (table.getCompletedCommitsTimeline().empty()) {
       throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
     }
-    if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
-      throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
-    }
 
     String latestCommit = table.getCompletedCommitsTimeline().lastInstant().get().getTimestamp();
     LOG.info("Savepointing latest commit " + latestCommit);
-    return savepoint(latestCommit, user, comment);
+    savepoint(latestCommit, user, comment);
   }
 
   /**
-   * Savepoint a specific commit. Latest version of data files as of the passed in instantTime will be referenced in the
-   * savepoint and will never be cleaned. The savepointed commit will never be rolledback or archived.
+   * Savepoint a specific commit instant time. Latest version of data files as of the passed in instantTime
+   * will be referenced in the savepoint and will never be cleaned. The savepointed commit will never be rolled back or archived.
    * <p>
   * This gives an option to rollback the state to the savepoint anytime. Savepoint needs to be manually created and
   * deleted.
@@ -402,60 +378,10 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   * @param instantTime - commit that should be savepointed
   * @param user - User creating the savepoint
   * @param comment - Comment for the savepoint
-   * @return true if the savepoint was created successfully
   */
-  public boolean savepoint(String instantTime, String user, String comment) {
+  public void savepoint(String instantTime, String user, String comment) {
     HoodieTable<T> table = HoodieTable.create(config, jsc);
-    if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
-      throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
-    }
-    Option<HoodieInstant> cleanInstant = table.getCompletedCleanTimeline().lastInstant();
-
-    HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime);
-    if (!table.getCompletedCommitsTimeline().containsInstant(commitInstant)) {
-      throw new HoodieSavepointException("Could not savepoint non-existing commit " + commitInstant);
-    }
-
-    try {
-      // Check the last commit that was not cleaned and check if savepoint time is > that commit
-      String lastCommitRetained;
-      if (cleanInstant.isPresent()) {
-        HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
-            .deserializeHoodieCleanMetadata(table.getActiveTimeline().getInstantDetails(cleanInstant.get()).get());
-        lastCommitRetained = cleanMetadata.getEarliestCommitToRetain();
-      } else {
-        lastCommitRetained = table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp();
-      }
-
-      // Cannot allow savepoint time on a commit that could have been cleaned
-      ValidationUtils.checkArgument(
-          HoodieTimeline.compareTimestamps(instantTime, lastCommitRetained, HoodieTimeline.GREATER_OR_EQUAL),
-          "Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);
-
-      Map<String, List<String>> latestFilesMap = jsc
-          .parallelize(FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath(),
-              config.shouldAssumeDatePartitioning()))
-          .mapToPair((PairFunction<String, String, List<String>>) partitionPath -> {
-            // Scan all partitions files with this commit time
-            LOG.info("Collecting latest files in partition path " + partitionPath);
-            BaseFileOnlyView view = table.getBaseFileOnlyView();
-            List<String> latestFiles = view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
-                .map(HoodieBaseFile::getFileName).collect(Collectors.toList());
-            return new Tuple2<>(partitionPath, latestFiles);
-          }).collectAsMap();
-
-      HoodieSavepointMetadata metadata = TimelineMetadataUtils.convertSavepointMetadata(user, comment, latestFilesMap);
-      // Nothing to save in the savepoint
-      table.getActiveTimeline().createNewInstant(
-          new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, instantTime));
-      table.getActiveTimeline()
-          .saveAsComplete(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, instantTime),
-              TimelineMetadataUtils.serializeSavepointMetadata(metadata));
-      LOG.info("Savepoint " + instantTime + " created");
-      return true;
-    } catch (IOException e) {
-      throw new HoodieSavepointException("Failed to savepoint " + instantTime, e);
-    }
+    table.savepoint(jsc, instantTime, user, comment);
   }
 
   /**
@@ -467,88 +393,24 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    */
   public void deleteSavepoint(String savepointTime) {
     HoodieTable<T> table = HoodieTable.create(config, jsc);
-    if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
-      throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
-    }
-    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
-
-    HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
-    boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint);
-    if (!isSavepointPresent) {
-      LOG.warn("No savepoint present " + savepointTime);
-      return;
-    }
-
-    activeTimeline.revertToInflight(savePoint);
-    activeTimeline.deleteInflight(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, savepointTime));
-    LOG.info("Savepoint " + savepointTime + " deleted");
+    SavepointHelpers.deleteSavepoint(table, savepointTime);
   }
 
   /**
-   * Delete a compaction request that is pending.
+   * Restore the data to the savepoint.
    *
-   * NOTE - This is an Admin operation. With async compaction, this is expected to be called with async compaction and
-   * write shutdown. Otherwise, async compactor could fail with errors
-   *
-   * @param compactionTime - delete the compaction time
-   */
-  private void deleteRequestedCompaction(String compactionTime) {
-    HoodieTable<T> table = HoodieTable.create(config, jsc);
-    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
-    HoodieInstant compactionRequestedInstant =
-        new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionTime);
-    boolean isCompactionInstantInRequestedState =
-        table.getActiveTimeline().filterPendingCompactionTimeline().containsInstant(compactionRequestedInstant);
-    HoodieTimeline commitTimeline = table.getCompletedCommitTimeline();
-    if (commitTimeline.empty() && !commitTimeline.findInstantsAfter(compactionTime, Integer.MAX_VALUE).empty()) {
-      throw new HoodieRollbackException(
-          "Found commits after time :" + compactionTime + ", please rollback greater commits first");
-    }
-    if (isCompactionInstantInRequestedState) {
-      activeTimeline.deleteCompactionRequested(compactionRequestedInstant);
-    } else {
-      throw new IllegalArgumentException("Compaction is not in requested state " + compactionTime);
-    }
-    LOG.info("Compaction " + compactionTime + " deleted");
-  }
-
-  /**
-   * Restore the state to the savepoint. WARNING: This rollsback recent commits and deleted data files. Queries
-   * accessing the files will mostly fail. This should be done during a downtime.
+   * WARNING: This rolls back recent commits and deletes data files and also pending compactions after savepoint time.
+   * Queries accessing the files will mostly fail. This is expected to be a manual operation and no concurrent write or
+   * compaction is expected to be running.
    *
    * @param savepointTime - savepoint time to rollback to
-   * @return true if the savepoint was rollecback to successfully
    */
-  public boolean restoreToSavepoint(String savepointTime) {
+  public void restoreToSavepoint(String savepointTime) {
     HoodieTable<T> table = HoodieTable.create(config, jsc);
-    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
-
-    // Rollback to savepoint is expected to be a manual operation and no concurrent write or compaction is expected
-    // to be running. Rollback to savepoint also removes any pending compaction actions that are generated after
-    // savepoint time. Allowing pending compaction to be retained is not safe as those workload could be referencing
-    // file-slices that will be rolled-back as part of this operation
-    HoodieTimeline instantTimeline = table.getMetaClient().getCommitsAndCompactionTimeline();
-
-    HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
-    boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint);
-    if (!isSavepointPresent) {
-      throw new HoodieRollbackException("No savepoint for instantTime " + savepointTime);
-    }
-
-    List<String> commitsToRollback = instantTimeline.findInstantsAfter(savepointTime, Integer.MAX_VALUE).getInstants()
-        .map(HoodieInstant::getTimestamp).collect(Collectors.toList());
-    LOG.info("Rolling back commits " + commitsToRollback);
-
+    SavepointHelpers.validateSavepointPresence(table, savepointTime);
     restoreToInstant(savepointTime);
-
-    // Make sure the rollback was successful
-    Option<HoodieInstant> lastInstant =
-        activeTimeline.reload().getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants().lastInstant();
-    ValidationUtils.checkArgument(lastInstant.isPresent());
-    ValidationUtils.checkArgument(lastInstant.get().getTimestamp().equals(savepointTime),
-        savepointTime + "is not the last commit after rolling back " + commitsToRollback + ", last commit was "
-            + lastInstant.get().getTimestamp());
-    return true;
+    SavepointHelpers.validateSavepointRestore(table, savepointTime);
   }
 
   /**
@@ -574,7 +436,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
       }
       return true;
     } else {
-      LOG.info("Cannot find instant " + commitInstantTime + " in the timeline, for rollback");
+      LOG.warn("Cannot find instant " + commitInstantTime + " in the timeline, for rollback");
       return false;
     }
   } catch (Exception e) {
@@ -626,9 +488,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException {
     LOG.info("Cleaner started");
     final Timer.Context context = metrics.getCleanCtx();
-
     HoodieCleanMetadata metadata = HoodieTable.create(config, jsc).clean(jsc, cleanInstantTime);
-
     if (context != null) {
       long durationMs = metrics.getDurationInMs(context.stop());
       metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
@@ -636,7 +496,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
           + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain() + " cleanerElaspsedMs" + durationMs);
     }
-
     return metadata;
   }
 
@@ -692,11 +551,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    *
    * @param extraMetadata Extra Metadata to be stored
    */
-  public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws IOException {
+  public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
-    LOG.info("Generate a new instant time " + instantTime);
-    boolean notEmpty = scheduleCompactionAtInstant(instantTime, extraMetadata);
-    return notEmpty ? Option.of(instantTime) : Option.empty();
+    return scheduleCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
   }
 
   /**
@@ -705,35 +562,11 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * @param instantTime Compaction Instant Time
    * @param extraMetadata Extra Metadata to be stored
    */
-  public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata)
-      throws IOException {
-    HoodieTableMetaClient metaClient = createMetaClient(true);
-    // if there are inflight writes, their instantTime must not be less than that of compaction instant time
-    metaClient.getCommitsTimeline().filterPendingExcludingCompaction().firstInstant().ifPresent(earliestInflight -> {
-      ValidationUtils.checkArgument(
-          HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), instantTime, HoodieTimeline.GREATER),
-          "Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
-              + ", Compaction scheduled at " + instantTime);
-    });
-    // Committed and pending compaction instants should have strictly lower timestamps
-    List<HoodieInstant> conflictingInstants = metaClient
-        .getActiveTimeline().getCommitsAndCompactionTimeline().getInstants().filter(instant -> HoodieTimeline
-            .compareTimestamps(instant.getTimestamp(), instantTime, HoodieTimeline.GREATER_OR_EQUAL))
-        .collect(Collectors.toList());
-    ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
-        "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
-            + conflictingInstants);
-    HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
-    HoodieCompactionPlan workload = table.scheduleCompaction(jsc, instantTime);
-    if (workload != null && (workload.getOperations() != null) && (!workload.getOperations().isEmpty())) {
-      extraMetadata.ifPresent(workload::setExtraMetadata);
-      HoodieInstant compactionInstant =
-          new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime);
-      metaClient.getActiveTimeline().saveToCompactionRequested(compactionInstant,
-          TimelineMetadataUtils.serializeCompactionPlan(workload));
-      return true;
-    }
-    return false;
+  public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    LOG.info("Scheduling compaction at instant time :" + instantTime);
+    Option<HoodieCompactionPlan> plan = HoodieTable.create(config, jsc)
+        .scheduleCompaction(jsc, instantTime, extraMetadata);
+    return plan.isPresent();
   }
 
   /**
@@ -742,7 +575,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * @param compactionInstantTime Compaction Instant Time
    * @return RDD of WriteStatus to inspect errors and counts
    */
-  public JavaRDD<WriteStatus> compact(String compactionInstantTime) throws IOException {
+  public JavaRDD<WriteStatus> compact(String compactionInstantTime) {
     return compact(compactionInstantTime, config.shouldAutoCommit());
   }
 
@@ -754,24 +587,47 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * @param extraMetadata Extra Metadata to be stored
    */
   public void commitCompaction(String compactionInstantTime, JavaRDD<WriteStatus> writeStatuses,
-      Option<Map<String, String>> extraMetadata) throws IOException {
-    HoodieTableMetaClient metaClient = createMetaClient(true);
-    HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
-    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
-    HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(
-        timeline.readCompactionPlanAsBytes(HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get());
-    // Merge extra meta-data passed by user with the one already in inflight compaction
-    Option<Map<String, String>> mergedMetaData = extraMetadata.map(m -> {
-      Map<String, String> merged = new HashMap<>();
-      Map<String, String> extraMetaDataFromInstantFile = compactionPlan.getExtraMetadata();
-      if (extraMetaDataFromInstantFile != null) {
-        merged.putAll(extraMetaDataFromInstantFile);
+      Option<Map<String, String>> extraMetadata) throws IOException {
+    HoodieTable<T> table = HoodieTable.create(config, jsc);
+    HoodieCommitMetadata metadata = CompactHelpers.createCompactionMetadata(
+        table, compactionInstantTime, writeStatuses, config.getSchema());
+    extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
+    completeCompaction(metadata, writeStatuses, table, compactionInstantTime);
+  }
+
+  /**
+   * Commit Compaction and track metrics.
+   */
+  protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD<WriteStatus> writeStatuses, HoodieTable<T> table,
+      String compactionCommitTime) {
+
+    List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
+    finalizeWrite(table, compactionCommitTime, writeStats);
+    LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
+    CompactHelpers.completeInflightCompaction(table, compactionCommitTime, metadata);
+
+    if (compactionTimer != null) {
+      long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
+      try {
+        metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(),
+            durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
+      } catch (ParseException e) {
+        throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
+            + config.getBasePath() + " at time " + compactionCommitTime, e);
       }
-      // Overwrite/Merge with the user-passed meta-data
-      merged.putAll(m);
-      return Option.of(merged);
-    }).orElseGet(() -> Option.ofNullable(compactionPlan.getExtraMetadata()));
-    commitCompaction(writeStatuses, table, compactionInstantTime, true, mergedMetaData);
+    }
+    LOG.info("Compacted successfully on commit " + compactionCommitTime);
+  }
+
+  /**
+   * Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file
+   *
+   * @param inflightInstant Inflight Compaction Instant
+   * @param table Hoodie Table
+   */
+  public void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable<T> table) {
+    table.rollback(jsc, HoodieActiveTimeline.createNewInstantTime(), inflightInstant, false);
+    table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
   }
 
   /**
@@ -793,192 +649,32 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * @param compactionInstantTime Compaction Instant Time
    * @return RDD of Write Status
    */
-  private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean autoCommit) throws IOException {
-    // Create a Hoodie table which encapsulated the commits and files visible
-    HoodieTableMetaClient metaClient = createMetaClient(true);
-    HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
-    HoodieTimeline pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
+  private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean shouldComplete) {
+    HoodieTable<T> table = HoodieTable.create(config, jsc);
+    HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
     HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
     if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
       rollbackInflightCompaction(inflightInstant, table);
-      metaClient.reloadActiveTimeline();
-      pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
+      table.getMetaClient().reloadActiveTimeline();
     }
-
-    HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
-    if (pendingCompactionTimeline.containsInstant(instant)) {
-      return runCompaction(instant, metaClient.getActiveTimeline(), autoCommit);
-    } else {
-      throw new IllegalStateException(
-          "No Compaction request available at " + compactionInstantTime + " to run compaction");
-    }
-  }
-
-  /**
-   * Perform compaction operations as specified in the compaction commit file.
-   *
-   * @param compactionInstant Compaction Instant time
-   * @param activeTimeline Active Timeline
-   * @param autoCommit Commit after compaction
-   * @return RDD of Write Status
-   */
-  private JavaRDD<WriteStatus> runCompaction(HoodieInstant compactionInstant, HoodieActiveTimeline activeTimeline,
-      boolean autoCommit) throws IOException {
-    HoodieTableMetaClient metaClient = createMetaClient(true);
-    HoodieCompactionPlan compactionPlan =
-        CompactionUtils.getCompactionPlan(metaClient, compactionInstant.getTimestamp());
-    // Mark instant as compaction inflight
-    activeTimeline.transitionCompactionRequestedToInflight(compactionInstant);
     compactionTimer = metrics.getCompactionCtx();
-    // Create a Hoodie table which encapsulated the commits and files visible
-    HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
-    JavaRDD<WriteStatus> statuses = table.compact(jsc, compactionInstant.getTimestamp(), compactionPlan);
-    // Force compaction action
-    statuses.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
-    // pass extra-metada so that it gets stored in commit file automatically
-    commitCompaction(statuses, table, compactionInstant.getTimestamp(), autoCommit,
-        Option.ofNullable(compactionPlan.getExtraMetadata()));
+    HoodieWriteMetadata compactionMetadata = table.compact(jsc, compactionInstantTime);
+    JavaRDD<WriteStatus> statuses = compactionMetadata.getWriteStatuses();
+    if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
+      completeCompaction(compactionMetadata.getCommitMetadata().get(), statuses, table, compactionInstantTime);
+    }
     return statuses;
   }
 
-  /**
-   * Commit Compaction and track metrics.
-   *
-   * @param compactedStatuses Compaction Write status
-   * @param table Hoodie Table
-   * @param compactionCommitTime Compaction Commit Time
-   * @param autoCommit Auto Commit
-   * @param extraMetadata Extra Metadata to store
-   */
-  protected void commitCompaction(JavaRDD<WriteStatus> compactedStatuses, HoodieTable<T> table,
-      String compactionCommitTime, boolean autoCommit, Option<Map<String, String>> extraMetadata) {
-    if (autoCommit) {
-      HoodieCommitMetadata metadata = doCompactionCommit(table, compactedStatuses, compactionCommitTime, extraMetadata);
-      if (compactionTimer != null) {
-        long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
-        try {
-          metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(),
-              durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
-        } catch (ParseException e) {
-          throw new HoodieCommitException("Commit time is not of valid format.Failed to commit compaction "
-              + config.getBasePath() + " at time " + compactionCommitTime, e);
-        }
-      }
-      LOG.info("Compacted successfully on commit " + compactionCommitTime);
-    } else {
-      LOG.info("Compaction did not run for commit " + compactionCommitTime);
-    }
-  }
-
-  /**
-   * Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file
-   *
-   * @param inflightInstant Inflight Compaction Instant
-   * @param table Hoodie Table
-   */
-  public void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable<T> table) {
-    table.rollback(jsc, HoodieActiveTimeline.createNewInstantTime(), inflightInstant, false);
-    table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
-  }
-
-  private HoodieCommitMetadata doCompactionCommit(HoodieTable<T> table, JavaRDD<WriteStatus> writeStatuses,
-      String compactionCommitTime, Option<Map<String, String>> extraMetadata) {
-    HoodieTableMetaClient metaClient = table.getMetaClient();
-    List<HoodieWriteStat> updateStatusMap = writeStatuses.map(WriteStatus::getStat).collect();
-
-    HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
-    for (HoodieWriteStat stat : updateStatusMap) {
-      metadata.addWriteStat(stat.getPartitionPath(), stat);
-    }
-
-    metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
-
-    // Finalize write
-    finalizeWrite(table, compactionCommitTime, updateStatusMap);
-
-    // Copy extraMetadata
-    extraMetadata.ifPresent(m -> {
-      m.forEach(metadata::addMetadata);
-    });
-
-    LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
-    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-
-    try {
-      activeTimeline.transitionCompactionInflightToComplete(
-          new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime),
-          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-    } catch (IOException e) {
-      throw new HoodieCompactionException(
-          "Failed to commit " + metaClient.getBasePath() + " at time " + compactionCommitTime, e);
-    }
-    return metadata;
-  }
-
   /**
    * Performs a compaction operation on a table, serially before or after an insert/upsert action.
    */
-  private Option<String> forceCompact(Option<Map<String, String>> extraMetadata) throws IOException {
+  private Option<String> inlineCompact(Option<Map<String, String>> extraMetadata) {
     Option<String> compactionInstantTimeOpt = scheduleCompaction(extraMetadata);
     compactionInstantTimeOpt.ifPresent(compactionInstantTime -> {
-      try {
-        // inline compaction should auto commit as the user is never given control
-        compact(compactionInstantTime, true);
-      } catch (IOException ioe) {
-        throw new HoodieIOException(ioe.getMessage(), ioe);
-      }
+      // inline compaction should auto commit as the user is never given control
+      compact(compactionInstantTime, true);
     });
     return compactionInstantTimeOpt;
   }
-
-  /**
-   * Ensure that the current writerSchema is compatible with the latest schema of this dataset.
-   *
-   * When inserting/updating data, we read records using the last used schema and convert them to the
-   * GenericRecords with writerSchema. Hence, we need to ensure that this conversion can take place without errors.
-   *
-   * @param hoodieTable The Hoodie Table
-   * @param isUpsert If this is a check during upserts
-   * @throws HoodieUpsertException If schema check fails during upserts
-   * @throws HoodieInsertException If schema check fails during inserts
-   */
-  private void validateSchema(HoodieTable<T> hoodieTable, final boolean isUpsert)
-      throws HoodieUpsertException, HoodieInsertException {
-
-    if (!getConfig().getAvroSchemaValidate()) {
-      // Check not required
-      return;
-    }
-
-    boolean isValid = false;
-    String errorMsg = "WriterSchema is not compatible with the schema present in the Table";
-    Throwable internalError = null;
-    Schema tableSchema = null;
-    Schema writerSchema = null;
-    try {
-      TableSchemaResolver schemaUtil = new TableSchemaResolver(hoodieTable.getMetaClient());
-      writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
-      tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableSchemaFromCommitMetadata());
-      isValid = schemaUtil.isSchemaCompatible(tableSchema, writerSchema);
-    } catch (Exception e) {
-      // Two error cases are possible:
-      // 1. There was no schema as no data has been inserted yet (first time only)
-      // 2. Failure in reading the schema
-      isValid = hoodieTable.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 0;
-      errorMsg = "Failed to read latest schema on path " + basePath;
-      internalError = e;
-    }
-
-    if (!isValid) {
-      LOG.error(errorMsg);
-      LOG.warn("WriterSchema: " + writerSchema);
-      LOG.warn("Table latest schema: " + tableSchema);
-      if (isUpsert) {
-        throw new HoodieUpsertException(errorMsg, internalError);
-      } else {
-        throw new HoodieInsertException(errorMsg, internalError);
-      }
-    }
-  }
-
 }
\ No newline at end of file
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 910c04c52..bb087a268 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -22,8 +22,8 @@ import org.apache.hudi.common.config.DefaultHoodieConfig;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.table.compact.strategy.CompactionStrategy;
-import org.apache.hudi.table.compact.strategy.LogFileSizeBasedCompactionStrategy;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
 
 import javax.annotation.concurrent.Immutable;
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 50af72569..2d98edca4 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.metrics.MetricsReporterType;
-import org.apache.hudi.table.compact.strategy.CompactionStrategy;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
 
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java index 087a2b0f1..73c1b23a8 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java @@ -24,6 +24,7 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.utils.ParquetReaderIterator; import org.apache.hudi.common.model.HoodieBaseFile; @@ -46,7 +47,7 @@ import org.apache.hudi.execution.SparkBoundedInMemoryExecutor; import org.apache.hudi.io.HoodieCreateHandle; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.table.action.clean.CleanActionExecutor; -import org.apache.hudi.table.action.commit.HoodieWriteMetadata; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.commit.BulkInsertCommitActionExecutor; import org.apache.hudi.table.action.commit.BulkInsertPreppedCommitActionExecutor; import org.apache.hudi.table.action.commit.DeleteCommitActionExecutor; @@ -56,6 +57,7 @@ import org.apache.hudi.table.action.commit.UpsertCommitActionExecutor; import org.apache.hudi.table.action.commit.UpsertPreppedCommitActionExecutor; import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor; import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor; +import org.apache.hudi.table.action.savepoint.SavepointActionExecutor; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.avro.AvroParquetReader; @@ -130,14 +132,13 @@ public class HoodieCopyOnWriteTable extends Hoodi } @Override - public HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String instantTime) { - throw new HoodieNotSupportedException("Compaction is not supported from a CopyOnWrite table"); + public Option scheduleCompaction(JavaSparkContext jsc, String instantTime, Option> extraMetadata) { + throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table"); } @Override - public JavaRDD compact(JavaSparkContext jsc, String compactionInstantTime, - HoodieCompactionPlan compactionPlan) { - throw new HoodieNotSupportedException("Compaction is not supported from a CopyOnWrite table"); + public HoodieWriteMetadata compact(JavaSparkContext jsc, String compactionInstantTime) { + throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table"); } public Iterator> handleUpdate(String instantTime, String partitionPath, String fileId, @@ -202,6 +203,11 @@ public class HoodieCopyOnWriteTable extends Hoodi return new CopyOnWriteRollbackActionExecutor(jsc, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute(); } + @Override + public HoodieSavepointMetadata savepoint(JavaSparkContext jsc, String instantToSavepoint, String user, String comment) { + return new SavepointActionExecutor(jsc, config, this, instantToSavepoint, user, comment).execute(); + } + public HoodieRestoreMetadata restore(JavaSparkContext jsc, String restoreInstantTime, String instantToRestore) { return new CopyOnWriteRestoreActionExecutor(jsc, config, this, restoreInstantTime, instantToRestore).execute(); 
} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java index 844b9ad2e..cacc1efc5 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java @@ -21,19 +21,17 @@ package org.apache.hudi.table; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; -import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieCompactionException; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.table.action.commit.HoodieWriteMetadata; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.hudi.table.action.compact.RunCompactionActionExecutor; import org.apache.hudi.table.action.deltacommit.BulkInsertDeltaCommitActionExecutor; import org.apache.hudi.table.action.deltacommit.BulkInsertPreppedDeltaCommitActionExecutor; import org.apache.hudi.table.action.deltacommit.DeleteDeltaCommitActionExecutor; @@ -41,17 +39,16 @@ import org.apache.hudi.table.action.deltacommit.InsertDeltaCommitActionExecutor; import org.apache.hudi.table.action.deltacommit.InsertPreppedDeltaCommitActionExecutor; import org.apache.hudi.table.action.deltacommit.UpsertDeltaCommitActionExecutor; import org.apache.hudi.table.action.deltacommit.UpsertPreppedDeltaCommitActionExecutor; +import org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor; import org.apache.hudi.table.action.restore.MergeOnReadRestoreActionExecutor; import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor; -import org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import java.io.IOException; import java.util.List; -import java.util.stream.Collectors; +import java.util.Map; /** * Implementation of a more real-time Hoodie Table that provides tradeoffs on read and write cost/amplification.
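(Aside: a minimal caller-side sketch of the refactored two-step compaction contract, before the MergeOnRead hunks below. Illustrative only: table, jsc and instantTime are assumed to be in scope, and the getWriteStatuses() accessor is inferred from the setter used later in this patch.

    Option<HoodieCompactionPlan> plan =
        table.scheduleCompaction(jsc, instantTime, Option.empty()); // step 1: schedule a plan; empty if nothing to compact
    if (plan.isPresent()) {
      HoodieWriteMetadata result = table.compact(jsc, instantTime); // step 2: execute the requested plan
      JavaRDD<WriteStatus> statuses = result.getWriteStatuses();    // write statuses now travel inside the metadata
    }
)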
@@ -119,46 +116,16 @@ public class HoodieMergeOnReadTable extends Hoodi } @Override - public HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String instantTime) { - LOG.info("Checking if compaction needs to be run on " + config.getBasePath()); - Option lastCompaction = - getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant(); - String deltaCommitsSinceTs = "0"; - if (lastCompaction.isPresent()) { - deltaCommitsSinceTs = lastCompaction.get().getTimestamp(); - } - - int deltaCommitsSinceLastCompaction = getActiveTimeline().getDeltaCommitTimeline() - .findInstantsAfter(deltaCommitsSinceTs, Integer.MAX_VALUE).countInstants(); - if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) { - LOG.info("Not running compaction as only " + deltaCommitsSinceLastCompaction - + " delta commits was found since last compaction " + deltaCommitsSinceTs + ". Waiting for " - + config.getInlineCompactDeltaCommitMax()); - return new HoodieCompactionPlan(); - } - - LOG.info("Compacting merge on read table " + config.getBasePath()); - HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor(); - try { - return compactor.generateCompactionPlan(jsc, this, config, instantTime, - ((SyncableFileSystemView) getSliceView()).getPendingCompactionOperations() - .map(instantTimeCompactionopPair -> instantTimeCompactionopPair.getValue().getFileGroupId()) - .collect(Collectors.toSet())); - - } catch (IOException e) { - throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e); - } + public Option scheduleCompaction(JavaSparkContext jsc, String instantTime, Option> extraMetadata) { + ScheduleCompactionActionExecutor scheduleCompactionExecutor = new ScheduleCompactionActionExecutor( + jsc, config, this, instantTime, extraMetadata); + return scheduleCompactionExecutor.execute(); } @Override - public JavaRDD compact(JavaSparkContext jsc, String compactionInstantTime, - HoodieCompactionPlan compactionPlan) { - HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor(); - try { - return compactor.compact(jsc, compactionPlan, this, config, compactionInstantTime); - } catch (IOException e) { - throw new HoodieCompactionException("Could not compact " + config.getBasePath(), e); - } + public HoodieWriteMetadata compact(JavaSparkContext jsc, String compactionInstantTime) { + RunCompactionActionExecutor compactionExecutor = new RunCompactionActionExecutor(jsc, config, this, compactionInstantTime); + return compactionExecutor.execute(); } @Override diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java index 0275b72d6..990441139 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -18,16 +18,17 @@ package org.apache.hudi.table; +import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.client.SparkTaskContextSupplier; -import org.apache.hudi.client.WriteStatus; import 
org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.fs.ConsistencyGuard; import org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility; @@ -38,10 +39,10 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; @@ -54,9 +55,10 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.exception.HoodieSavepointException; +import org.apache.hudi.exception.HoodieInsertException; +import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.index.HoodieIndex; -import org.apache.hudi.table.action.commit.HoodieWriteMetadata; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; @@ -295,24 +297,6 @@ public abstract class HoodieTable implements Seri return getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList()); } - /** - * Get the list of data file names savepointed. - */ - public Stream getSavepointedDataFiles(String savepointTime) { - if (!getSavepoints().contains(savepointTime)) { - throw new HoodieSavepointException( - "Could not get data files for savepoint " + savepointTime + ". No such savepoint."); - } - HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime); - HoodieSavepointMetadata metadata; - try { - metadata = TimelineMetadataUtils.deserializeHoodieSavepointMetadata(getActiveTimeline().getInstantDetails(instant).get()); - } catch (IOException e) { - throw new HoodieSavepointException("Could not get savepointed data files for savepoint " + savepointTime, e); - } - return metadata.getPartitionMetadata().values().stream().flatMap(s -> s.getSavepointDataFile().stream()); - } - public HoodieActiveTimeline getActiveTimeline() { return metaClient.getActiveTimeline(); } @@ -329,19 +313,21 @@ public abstract class HoodieTable implements Seri * * @param jsc Spark Context * @param instantTime Instant Time for scheduling compaction + * @param extraMetadata additional metadata to write into plan * @return */ - public abstract HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String instantTime); + public abstract Option scheduleCompaction(JavaSparkContext jsc, + String instantTime, + Option> extraMetadata); /** * Run Compaction on the table. Compaction arranges the data so that it is optimized for data access. 
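(Aside: under this contract the result of compact() is deliberately left uncommitted. A minimal sketch of the caller-side completion step, assuming a getCommitMetadata() accessor matching the setter used by RunCompactionActionExecutor later in this patch:

    HoodieWriteMetadata result = table.compact(jsc, compactionInstantTime);
    CompactHelpers.completeInflightCompaction(table, compactionInstantTime,
        result.getCommitMetadata().get()); // transitions the inflight compaction instant to complete
)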
* * @param jsc Spark Context * @param compactionInstantTime Instant Time - * @param compactionPlan Compaction Plan */ - public abstract JavaRDD compact(JavaSparkContext jsc, String compactionInstantTime, - HoodieCompactionPlan compactionPlan); + public abstract HoodieWriteMetadata compact(JavaSparkContext jsc, + String compactionInstantTime); /** * Executes a new clean action. @@ -365,6 +351,15 @@ public abstract class HoodieTable implements Seri HoodieInstant commitInstant, boolean deleteInstants); + /** + * Create a savepoint at the specified instant, so that the table can be restored + * to this point in the timeline later if needed. + */ + public abstract HoodieSavepointMetadata savepoint(JavaSparkContext jsc, + String instantToSavepoint, + String user, + String comment); + /** * Restore the table to the given instant. Note that this is an admin table recovery operation * that would cause any running queries that are accessing file slices written after the instant to fail. @@ -519,4 +514,52 @@ public abstract class HoodieTable implements Seri public SparkTaskContextSupplier getSparkTaskContextSupplier() { return sparkTaskContextSupplier; } + + /** + * Ensure that the current writerSchema is compatible with the latest schema of this dataset. + * + * When inserting/updating data, we read records using the last used schema and convert them to + * GenericRecords with the writerSchema. Hence, we need to ensure that this conversion can take place without errors. + * + */ + private void validateSchema() throws HoodieUpsertException, HoodieInsertException { + + if (!config.getAvroSchemaValidate() || getActiveTimeline().getCommitsTimeline().filterCompletedInstants().empty()) { + // Check not required + return; + } + + Schema tableSchema; + Schema writerSchema; + boolean isValid; + try { + TableSchemaResolver schemaUtil = new TableSchemaResolver(getMetaClient()); + writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema()); + tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableSchemaFromCommitMetadata()); + isValid = TableSchemaResolver.isSchemaCompatible(tableSchema, writerSchema); + } catch (Exception e) { + throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e); + } + + if (!isValid) { + throw new HoodieException("Failed schema compatibility check for writerSchema :" + writerSchema + + ", table schema :" + tableSchema + ", base path :" + metaClient.getBasePath()); + } + } + + public void validateUpsertSchema() throws HoodieUpsertException { + try { + validateSchema(); + } catch (HoodieException e) { + throw new HoodieUpsertException("Failed upsert schema compatibility check.", e); + } + } + + public void validateInsertSchema() throws HoodieInsertException { + try { + validateSchema(); + } catch (HoodieException e) { + throw new HoodieInsertException("Failed insert schema compatibility check.", e); + } + } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java similarity index 98% rename from hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java rename to hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java index 5bd81eeed..bfe9c9dbc 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java @@ -68,18 +68,18 @@ import
java.util.stream.Collectors; import java.util.stream.Stream; /** - * Archiver to bound the growth of .commit files. + * Archiver to bound the growth of files under .hoodie meta path. */ -public class HoodieCommitArchiveLog { +public class HoodieTimelineArchiveLog { - private static final Logger LOG = LogManager.getLogger(HoodieCommitArchiveLog.class); + private static final Logger LOG = LogManager.getLogger(HoodieTimelineArchiveLog.class); private final Path archiveFilePath; private final HoodieTableMetaClient metaClient; private final HoodieWriteConfig config; private Writer writer; - public HoodieCommitArchiveLog(HoodieWriteConfig config, HoodieTableMetaClient metaClient) { + public HoodieTimelineArchiveLog(HoodieWriteConfig config, HoodieTableMetaClient metaClient) { this.config = config; this.metaClient = metaClient; this.archiveFilePath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath()); diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteMetadata.java b/hudi-client/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java similarity index 98% rename from hudi-client/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteMetadata.java rename to hudi-client/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java index 64fd4df7e..c46102a4a 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteMetadata.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hudi.table.action.commit; +package org.apache.hudi.table.action; import java.util.List; import org.apache.hudi.client.WriteStatus; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index 62203a382..9750162b3 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -19,6 +19,7 @@ package org.apache.hudi.table.action.clean; import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.CompactionOperation; import org.apache.hudi.common.model.FileSlice; @@ -39,6 +40,7 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieSavepointException; import org.apache.hudi.table.HoodieTable; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -50,6 +52,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Cleaner is responsible for garbage collecting older files in a given partition path. Such that @@ -81,6 +84,25 @@ public class CleanPlanner> implements Serializa .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); } + /** + * Get the list of data file names savepointed. + */ + public Stream getSavepointedDataFiles(String savepointTime) { + if (!hoodieTable.getSavepoints().contains(savepointTime)) { + throw new HoodieSavepointException( + "Could not get data files for savepoint " + savepointTime + ". 
No such savepoint."); + } + HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime); + HoodieSavepointMetadata metadata; + try { + metadata = TimelineMetadataUtils.deserializeHoodieSavepointMetadata( + hoodieTable.getActiveTimeline().getInstantDetails(instant).get()); + } catch (IOException e) { + throw new HoodieSavepointException("Could not get savepointed data files for savepoint " + savepointTime, e); + } + return metadata.getPartitionMetadata().values().stream().flatMap(s -> s.getSavepointDataFile().stream()); + } + /** * Returns list of partitions where clean operations needs to be performed. * @@ -131,7 +153,8 @@ public class CleanPlanner> implements Serializa List deletePaths = new ArrayList<>(); // Collect all the datafiles savepointed by all the savepoints List savepointedFiles = hoodieTable.getSavepoints().stream() - .flatMap(s -> hoodieTable.getSavepointedDataFiles(s)).collect(Collectors.toList()); + .flatMap(this::getSavepointedDataFiles) + .collect(Collectors.toList()); for (HoodieFileGroup fileGroup : fileGroups) { int keepVersions = config.getCleanerFileVersionsRetained(); @@ -190,7 +213,8 @@ public class CleanPlanner> implements Serializa // Collect all the datafiles savepointed by all the savepoints List savepointedFiles = hoodieTable.getSavepoints().stream() - .flatMap(s -> hoodieTable.getSavepointedDataFiles(s)).collect(Collectors.toList()); + .flatMap(this::getSavepointedDataFiles) + .collect(Collectors.toList()); // determine if we have enough commits, to start cleaning. if (commitTimeline.countInstants() > commitsRetained) { diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java index e72c8010f..93a655ea8 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java @@ -39,6 +39,7 @@ import org.apache.hudi.table.WorkloadProfile; import org.apache.hudi.table.WorkloadStat; import org.apache.hudi.table.action.BaseActionExecutor; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.Partitioner; @@ -165,8 +166,6 @@ public abstract class BaseCommitActionExecutor> (HoodieTable)table); result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now())); result.setWriteStatuses(statuses); - - // Trigger the insert and collect statuses commitOnAutoCommit(result); } @@ -207,7 +206,6 @@ public abstract class BaseCommitActionExecutor> try { activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, instantTime), Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); - LOG.info("Committed " + instantTime); } catch (IOException e) { throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime, diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java index e0182dad9..9f5468e5c 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java @@ -27,6 +27,7 @@ import 
org.apache.hudi.exception.HoodieInsertException; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.UserDefinedBulkInsertPartitioner; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java index fbc8dbb01..4755664b4 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java @@ -30,6 +30,7 @@ import org.apache.hudi.execution.BulkInsertMapFunction; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.UserDefinedBulkInsertPartitioner; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.spark.api.java.JavaRDD; import java.util.List; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java index 0b8e75fe0..3d80a0725 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java @@ -27,6 +27,7 @@ import org.apache.hudi.exception.HoodieInsertException; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.UserDefinedBulkInsertPartitioner; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteCommitActionExecutor.java index ba25a97cd..8fa1cb7c5 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteCommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteCommitActionExecutor.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteHelper.java index 7ee891f8a..8c0b75f30 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteHelper.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteHelper.java @@ -27,6 +27,7 @@ import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.WorkloadProfile; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -58,8 +59,8 @@ public class DeleteHelper> { } public static > HoodieWriteMetadata execute(String instantTime, - JavaRDD keys, JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table, - CommitActionExecutor deleteExecutor) { + JavaRDD keys, JavaSparkContext jsc, HoodieWriteConfig 
config, HoodieTable table, + CommitActionExecutor deleteExecutor) { try { HoodieWriteMetadata result = null; // De-dupe/merge if needed diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertCommitActionExecutor.java index d08dab258..d8944e36f 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertCommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertCommitActionExecutor.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertPreppedCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertPreppedCommitActionExecutor.java index acc99021f..b7d64b124 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertPreppedCommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertPreppedCommitActionExecutor.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertCommitActionExecutor.java index efdcae19e..0c4d08e35 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertCommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertCommitActionExecutor.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPreppedCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPreppedCommitActionExecutor.java index 5999104ac..d8470ea04 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPreppedCommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPreppedCommitActionExecutor.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/WriteHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/WriteHelper.java index 7faee1c9d..92dcbb628 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/WriteHelper.java +++ 
b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/WriteHelper.java @@ -25,6 +25,7 @@ import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -35,9 +36,9 @@ import scala.Tuple2; public class WriteHelper> { public static > HoodieWriteMetadata write(String instantTime, - JavaRDD> inputRecordsRDD, JavaSparkContext jsc, - HoodieTable table, boolean shouldCombine, - int shuffleParallelism, CommitActionExecutor executor, boolean performTagging) { + JavaRDD> inputRecordsRDD, JavaSparkContext jsc, + HoodieTable table, boolean shouldCombine, + int shuffleParallelism, CommitActionExecutor executor, boolean performTagging) { try { // De-dupe/merge if needed JavaRDD> dedupedRecords = diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java new file mode 100644 index 000000000..97fdd0fa4 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.compact; + +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieCompactionException; +import org.apache.hudi.table.HoodieTable; +import org.apache.spark.api.java.JavaRDD; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; + +public class CompactHelpers { + + public static HoodieCommitMetadata createCompactionMetadata(HoodieTable table, + String compactionInstantTime, + JavaRDD writeStatuses, + String schema) throws IOException { + byte[] planBytes = table.getActiveTimeline().readCompactionPlanAsBytes( + HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get(); + HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(planBytes); + List updateStatusMap = writeStatuses.map(WriteStatus::getStat).collect(); + org.apache.hudi.common.model.HoodieCommitMetadata metadata = new org.apache.hudi.common.model.HoodieCommitMetadata(true); + for (HoodieWriteStat stat : updateStatusMap) { + metadata.addWriteStat(stat.getPartitionPath(), stat); + } + metadata.addMetadata(org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY, schema); + if (compactionPlan.getExtraMetadata() != null) { + compactionPlan.getExtraMetadata().forEach(metadata::addMetadata); + } + return metadata; + } + + public static void completeInflightCompaction(HoodieTable table, String compactionCommitTime, HoodieCommitMetadata commitMetadata) { + HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); + try { + activeTimeline.transitionCompactionInflightToComplete( + new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime), + Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + } catch (IOException e) { + throw new HoodieCompactionException( + "Failed to commit " + table.getMetaClient().getBasePath() + " at time " + compactionCommitTime, e); + } + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieCompactor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java similarity index 98% rename from hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieCompactor.java rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java index f5579dd5b..c81b02885 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieCompactor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.hudi.table.compact; +package org.apache.hudi.table.action.compact; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.WriteStatus; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java similarity index 98% rename from hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java index f71ab9518..2c0afa9cb 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hudi.table.compact; +package org.apache.hudi.table.action.compact; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCompactionOperation; @@ -42,7 +42,7 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieCopyOnWriteTable; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.compact.strategy.CompactionStrategy; +import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; import org.apache.avro.Schema; import org.apache.hadoop.fs.FileSystem; @@ -71,7 +71,6 @@ import static java.util.stream.Collectors.toList; * passes it through a CompactionFilter and executes all the compactions and writes a new version of base files and make * a normal commit * - * @see HoodieCompactor */ public class HoodieMergeOnReadTableCompactor implements HoodieCompactor { diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/OperationResult.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/OperationResult.java similarity index 97% rename from hudi-client/src/main/java/org/apache/hudi/table/compact/OperationResult.java rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/OperationResult.java index 53bc48a7e..cd00cb9e2 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/compact/OperationResult.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/OperationResult.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hudi.table.compact; +package org.apache.hudi.table.action.compact; import org.apache.hudi.common.util.Option; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java new file mode 100644 index 000000000..2f99fa1ce --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.compact; + +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.utils.SparkConfigUtils; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.CompactionUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieCompactionException; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.BaseActionExecutor; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.IOException; +import java.util.List; + +public class RunCompactionActionExecutor extends BaseActionExecutor { + + private static final Logger LOG = LogManager.getLogger(RunCompactionActionExecutor.class); + + public RunCompactionActionExecutor(JavaSparkContext jsc, + HoodieWriteConfig config, + HoodieTable table, + String instantTime) { + super(jsc, config, table, instantTime); + } + + @Override + public HoodieWriteMetadata execute() { + HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(instantTime); + HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); + if (!pendingCompactionTimeline.containsInstant(instant)) { + throw new IllegalStateException( + "No Compaction request available at " + instantTime + " to run compaction"); + } + + HoodieWriteMetadata compactionMetadata = new HoodieWriteMetadata(); + try { + HoodieActiveTimeline timeline = table.getActiveTimeline(); + HoodieCompactionPlan compactionPlan = + CompactionUtils.getCompactionPlan(table.getMetaClient(), instantTime); + // Mark instant as compaction inflight + timeline.transitionCompactionRequestedToInflight(instant); + table.getMetaClient().reloadActiveTimeline(); + + HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor(); + JavaRDD statuses = compactor.compact(jsc, compactionPlan, table, config, instantTime); + + statuses.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps())); + List updateStatusMap = statuses.map(WriteStatus::getStat).collect(); + HoodieCommitMetadata metadata = new HoodieCommitMetadata(true); + for (HoodieWriteStat stat : updateStatusMap) { + metadata.addWriteStat(stat.getPartitionPath(), stat); + } + metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema()); + + compactionMetadata.setWriteStatuses(statuses); + compactionMetadata.setCommitted(false); + compactionMetadata.setCommitMetadata(Option.of(metadata)); + } catch (IOException e) { + throw new HoodieCompactionException("Could not compact " + config.getBasePath(), e); 
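+ // Note: the metadata assembled above is deliberately returned uncommitted + // (setCommitted(false)); completing the inflight compaction instant is left + // to the caller, via CompactHelpers.completeInflightCompaction(...).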
+ } + + return compactionMetadata; + } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java new file mode 100644 index 000000000..586b5b3e9 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.compact; + +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.table.view.SyncableFileSystemView; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieCompactionException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.BaseActionExecutor; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class ScheduleCompactionActionExecutor extends BaseActionExecutor> { + + private static final Logger LOG = LogManager.getLogger(ScheduleCompactionActionExecutor.class); + + private final Option> extraMetadata; + + public ScheduleCompactionActionExecutor(JavaSparkContext jsc, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + Option> extraMetadata) { + super(jsc, config, table, instantTime); + this.extraMetadata = extraMetadata; + } + + private HoodieCompactionPlan scheduleCompaction() { + LOG.info("Checking if compaction needs to be run on " + config.getBasePath()); + Option lastCompaction = table.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant(); + String deltaCommitsSinceTs = "0"; + if (lastCompaction.isPresent()) { + deltaCommitsSinceTs = lastCompaction.get().getTimestamp(); + } + + int deltaCommitsSinceLastCompaction = table.getActiveTimeline().getDeltaCommitTimeline() + .findInstantsAfter(deltaCommitsSinceTs, Integer.MAX_VALUE).countInstants(); + if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) { + LOG.info("Not running compaction as only " + deltaCommitsSinceLastCompaction + + " delta commits were found since last compaction " + deltaCommitsSinceTs + ". 
Waiting for " + + config.getInlineCompactDeltaCommitMax()); + return new HoodieCompactionPlan(); + } + + LOG.info("Compacting merge on read table " + config.getBasePath()); + HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor(); + try { + return compactor.generateCompactionPlan(jsc, table, config, instantTime, + ((SyncableFileSystemView) table.getSliceView()).getPendingCompactionOperations() + .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId()) + .collect(Collectors.toSet())); + + } catch (IOException e) { + throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e); + } + } + + @Override + public Option execute() { + // if there are inflight writes, their instantTime must not be less than that of compaction instant time + table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant() + .ifPresent(earliestInflight -> ValidationUtils.checkArgument( + HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), instantTime, HoodieTimeline.GREATER), + "Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight + + ", Compaction scheduled at " + instantTime)); + + // Committed and pending compaction instants should have strictly lower timestamps + List conflictingInstants = table.getActiveTimeline() + .getCommitsAndCompactionTimeline().getInstants() + .filter(instant -> HoodieTimeline.compareTimestamps( + instant.getTimestamp(), instantTime, HoodieTimeline.GREATER_OR_EQUAL)) + .collect(Collectors.toList()); + ValidationUtils.checkArgument(conflictingInstants.isEmpty(), + "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :" + + conflictingInstants); + + HoodieCompactionPlan plan = scheduleCompaction(); + if (plan != null && (plan.getOperations() != null) && (!plan.getOperations().isEmpty())) { + extraMetadata.ifPresent(plan::setExtraMetadata); + HoodieInstant compactionInstant = + new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime); + try { + table.getActiveTimeline().saveToCompactionRequested(compactionInstant, + TimelineMetadataUtils.serializeCompactionPlan(plan)); + } catch (IOException ioe) { + throw new HoodieIOException("Exception scheduling compaction", ioe); + } + return Option.of(plan); + } + return Option.empty(); + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/BoundedIOCompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedIOCompactionStrategy.java similarity index 97% rename from hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/BoundedIOCompactionStrategy.java rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedIOCompactionStrategy.java index f77a0158c..d93a50fd6 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/BoundedIOCompactionStrategy.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedIOCompactionStrategy.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.hudi.table.compact.strategy; +package org.apache.hudi.table.action.compact.strategy; import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/BoundedPartitionAwareCompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedPartitionAwareCompactionStrategy.java similarity index 98% rename from hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/BoundedPartitionAwareCompactionStrategy.java rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedPartitionAwareCompactionStrategy.java index 24da22676..597348f47 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/BoundedPartitionAwareCompactionStrategy.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedPartitionAwareCompactionStrategy.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hudi.table.compact.strategy; +package org.apache.hudi.table.action.compact.strategy; import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/CompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/CompactionStrategy.java similarity index 97% rename from hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/CompactionStrategy.java rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/CompactionStrategy.java index 928ae3a21..0e8e4c674 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/CompactionStrategy.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/CompactionStrategy.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hudi.table.compact.strategy; +package org.apache.hudi.table.action.compact.strategy; import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; @@ -26,7 +26,7 @@ import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor; +import org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor; import java.io.Serializable; import java.util.HashMap; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/DayBasedCompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/DayBasedCompactionStrategy.java similarity index 98% rename from hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/DayBasedCompactionStrategy.java rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/DayBasedCompactionStrategy.java index ea29fddc7..4a12bb8a0 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/DayBasedCompactionStrategy.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/DayBasedCompactionStrategy.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.hudi.table.compact.strategy; +package org.apache.hudi.table.action.compact.strategy; import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/LogFileSizeBasedCompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java similarity index 98% rename from hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/LogFileSizeBasedCompactionStrategy.java rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java index 83f1cca90..c9a811a1c 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/LogFileSizeBasedCompactionStrategy.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hudi.table.compact.strategy; +package org.apache.hudi.table.action.compact.strategy; import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/UnBoundedCompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedCompactionStrategy.java similarity index 96% rename from hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/UnBoundedCompactionStrategy.java rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedCompactionStrategy.java index 84f4ee097..ffc437bcd 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/UnBoundedCompactionStrategy.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedCompactionStrategy.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hudi.table.compact.strategy; +package org.apache.hudi.table.action.compact.strategy; import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java similarity index 98% rename from hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java index 9c00e5ebb..66b5612e1 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.hudi.table.compact.strategy; +package org.apache.hudi.table.action.compact.strategy; import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java index 95779a76d..5e4b915cb 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java @@ -28,7 +28,7 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.UserDefinedBulkInsertPartitioner; import org.apache.hudi.table.action.commit.BulkInsertHelper; -import org.apache.hudi.table.action.commit.HoodieWriteMetadata; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertPreppedDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertPreppedDeltaCommitActionExecutor.java index 7b6e14623..5a3fe7a1d 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertPreppedDeltaCommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertPreppedDeltaCommitActionExecutor.java @@ -28,7 +28,7 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.UserDefinedBulkInsertPartitioner; import org.apache.hudi.table.action.commit.BulkInsertHelper; -import org.apache.hudi.table.action.commit.HoodieWriteMetadata; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeleteDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeleteDeltaCommitActionExecutor.java index 15754065f..53d4d84d7 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeleteDeltaCommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeleteDeltaCommitActionExecutor.java @@ -25,7 +25,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.commit.DeleteHelper; -import org.apache.hudi.table.action.commit.HoodieWriteMetadata; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertDeltaCommitActionExecutor.java index f76f7fc86..212416555 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertDeltaCommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertDeltaCommitActionExecutor.java @@ -24,7 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; -import 
org.apache.hudi.table.action.commit.HoodieWriteMetadata; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.commit.WriteHelper; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertPreppedDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertPreppedDeltaCommitActionExecutor.java index 55031ea6c..0fb787e55 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertPreppedDeltaCommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertPreppedDeltaCommitActionExecutor.java @@ -23,7 +23,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.action.commit.HoodieWriteMetadata; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertDeltaCommitActionExecutor.java index 1fdf4337a..1809078cb 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertDeltaCommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertDeltaCommitActionExecutor.java @@ -24,7 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.action.commit.HoodieWriteMetadata; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.commit.WriteHelper; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertPreppedDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertPreppedDeltaCommitActionExecutor.java index 413d2e2af..d1773f9a5 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertPreppedDeltaCommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertPreppedDeltaCommitActionExecutor.java @@ -23,7 +23,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.action.commit.HoodieWriteMetadata; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java index 8b95198aa..498d7b7b5 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java @@ -118,7 +118,7 
@@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto config.shouldAssumeDatePartitioning()); int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1); return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)).flatMap(partitionPath -> { - HoodieActiveTimeline activeTimeline = table.getActiveTimeline().reload(); + HoodieActiveTimeline activeTimeline = table.getMetaClient().reloadActiveTimeline(); List<RollbackRequest> partitionRollbackRequests = new ArrayList<>(); switch (instantToRollback.getAction()) { case HoodieTimeline.COMMIT_ACTION: diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java new file mode 100644 index 000000000..d46707403 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hudi.table.action.savepoint; + +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieSavepointMetadata; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.table.view.TableFileSystemView; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieSavepointException; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.BaseActionExecutor; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; +import scala.Tuple2; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class SavepointActionExecutor extends BaseActionExecutor<HoodieSavepointMetadata> { + + private static final Logger LOG = LogManager.getLogger(SavepointActionExecutor.class); + + private final String user; + private final String comment; + + public SavepointActionExecutor(JavaSparkContext jsc, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + String user, + String comment) { + super(jsc, config, table, instantTime); + this.user = user; + this.comment = comment; + } + + @Override + public HoodieSavepointMetadata execute() { + if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { + throw new UnsupportedOperationException("Savepointing is not supported for MergeOnRead table types"); + } + Option<HoodieInstant> cleanInstant = table.getCompletedCleanTimeline().lastInstant(); + HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime); + if (!table.getCompletedCommitsTimeline().containsInstant(commitInstant)) { + throw new HoodieSavepointException("Could not savepoint non-existing commit " + commitInstant); + } + + try { + // Find the last commit that was not cleaned and check that the savepoint time is >= that commit + String lastCommitRetained; + if (cleanInstant.isPresent()) { + HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils + .deserializeHoodieCleanMetadata(table.getActiveTimeline().getInstantDetails(cleanInstant.get()).get()); + lastCommitRetained = cleanMetadata.getEarliestCommitToRetain(); + } else { + lastCommitRetained = table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp(); + } + + // Cannot allow savepoint time on a commit that could have been cleaned + ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, lastCommitRetained, HoodieTimeline.GREATER_OR_EQUAL), + "Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained); + + Map<String, List<String>> latestFilesMap = jsc.parallelize(FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), + table.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning())) + .mapToPair(partitionPath -> { + // Scan all partition files as of this commit time + LOG.info("Collecting latest files in partition path " + partitionPath); + TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView(); + List<String> latestFiles = view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime) +
.map(HoodieBaseFile::getFileName).collect(Collectors.toList()); + return new Tuple2<>(partitionPath, latestFiles); + }) + .collectAsMap(); + + HoodieSavepointMetadata metadata = TimelineMetadataUtils.convertSavepointMetadata(user, comment, latestFilesMap); + // Nothing to save in the savepoint + table.getActiveTimeline().createNewInstant( + new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, instantTime)); + table.getActiveTimeline() + .saveAsComplete(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, instantTime), + TimelineMetadataUtils.serializeSavepointMetadata(metadata)); + LOG.info("Savepoint " + instantTime + " created"); + return metadata; + } catch (IOException e) { + throw new HoodieSavepointException("Failed to savepoint " + instantTime, e); + } + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointHelpers.java b/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointHelpers.java new file mode 100644 index 000000000..06acd4691 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointHelpers.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.savepoint; + +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieRollbackException; +import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +public class SavepointHelpers { + + private static final Logger LOG = LogManager.getLogger(SavepointHelpers.class); + + public static void deleteSavepoint(HoodieTable table, String savepointTime) { + if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { + throw new UnsupportedOperationException("Savepointing is not supported for MergeOnRead table types"); + } + HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime); + boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint); + if (!isSavepointPresent) { + LOG.warn("No savepoint present " + savepointTime); + return; + } + + table.getActiveTimeline().revertToInflight(savePoint); + table.getActiveTimeline().deleteInflight(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, savepointTime)); + LOG.info("Savepoint " + savepointTime + " deleted"); + } + + public static void validateSavepointRestore(HoodieTable table, String savepointTime) { + // Make sure the restore was successful + table.getMetaClient().reloadActiveTimeline(); + Option<HoodieInstant> lastInstant = table.getActiveTimeline() + .getCommitsAndCompactionTimeline() + .filterCompletedAndCompactionInstants() + .lastInstant(); + ValidationUtils.checkArgument(lastInstant.isPresent()); + ValidationUtils.checkArgument(lastInstant.get().getTimestamp().equals(savepointTime), + savepointTime + " is not the last commit after restoring to savepoint, last commit was " + + lastInstant.get().getTimestamp()); + } + + public static void validateSavepointPresence(HoodieTable table, String savepointTime) { + HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime); + boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint); + if (!isSavepointPresent) { + throw new HoodieRollbackException("No savepoint for instantTime " + savepointTime); + } + } +} diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java index 16ccb0574..7fc0dee96 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java @@ -58,7 +58,7 @@ public class TestClientRollback extends TestHoodieClientBase { public void testSavepointAndRollback() throws Exception { HoodieWriteConfig cfg = getConfigBuilder().withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build()).build(); - try (HoodieWriteClient client = getHoodieWriteClient(cfg);) { + try (HoodieWriteClient client = getHoodieWriteClient(cfg)) { HoodieTestDataGenerator.writePartitionMetadata(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath); /** diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
index 383fd7459..9726465b2 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java @@ -32,7 +32,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.table.compact.OperationResult; +import org.apache.hudi.table.action.compact.OperationResult; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java index 7bfd76866..841a09d4f 100644 --- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java @@ -31,7 +31,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieCommitArchiveLog; +import org.apache.hudi.table.HoodieTimelineArchiveLog; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -79,7 +79,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness { HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) .withParallelism(2, 2).forTable("test-trip-table").build(); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient); boolean result = archiveLog.archiveIfRequired(jsc); assertTrue(result); } @@ -157,7 +157,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness { verifyInflightInstants(metaClient, 2); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient); assertTrue(archiveLog.archiveIfRequired(jsc)); @@ -216,7 +216,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness { .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()) .build(); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient); // Requested Compaction HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "100"), dfs.getConf()); @@ -281,7 +281,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness { .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()) .build(); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient); HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf()); 
HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf()); HoodieTestDataGenerator.createCommitFile(basePath, "102", dfs.getConf()); @@ -307,7 +307,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness { .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()) .build(); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient); HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf()); HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf()); HoodieTestDataGenerator.createSavepointFile(basePath, "101", dfs.getConf()); @@ -339,7 +339,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness { .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()) .build(); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient); HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf()); HoodieTestDataGenerator.createCompactionRequestedFile(basePath, "101", dfs.getConf()); HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath, @@ -386,7 +386,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness { .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build()) .build(); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient); HoodieTestDataGenerator.createCommitFile(basePath, "1", dfs.getConf()); HoodieInstant instant1 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "1"); @@ -426,7 +426,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness { .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()) .build(); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient); org.apache.hudi.avro.model.HoodieCommitMetadata expectedCommitMetadata = archiveLog.convertCommitMetadata(hoodieCommitMetadata); assertEquals(expectedCommitMetadata.getOperationType(), WriteOperationType.INSERT.toString()); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java similarity index 99% rename from hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java rename to hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java index b5151b303..142e63ab8 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.hudi.table.compact; +package org.apache.hudi.table.action.compact; import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java similarity index 82% rename from hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java rename to hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java index d77d3f83f..819a41c00 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java @@ -16,8 +16,9 @@ * limitations under the License. */ -package org.apache.hudi.table.compact; +package org.apache.hudi.table.action.compact; +import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.HoodieClientTestHarness; @@ -29,6 +30,7 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieTestUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieMemoryConfig; @@ -50,6 +52,7 @@ import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; public class TestHoodieCompactor extends HoodieClientTestHarness { @@ -100,9 +103,10 @@ public class TestHoodieCompactor extends HoodieClientTestHarness { @Test(expected = HoodieNotSupportedException.class) public void testCompactionOnCopyOnWriteFail() throws Exception { metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE); - HoodieTable table = HoodieTable.create(metaClient, getConfig(), jsc); + HoodieTable table = HoodieTable.create(metaClient, getConfig(), jsc); String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime(); - table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime)); + table.scheduleCompaction(jsc, compactionInstantTime, Option.empty()); + table.compact(jsc, compactionInstantTime); } @Test @@ -118,9 +122,8 @@ public class TestHoodieCompactor extends HoodieClientTestHarness { writeClient.insert(recordsRDD, newCommitTime).collect(); String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime(); - JavaRDD<WriteStatus> result = - table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime)); - assertTrue("If there is nothing to compact, result will be empty", result.isEmpty()); + Option<HoodieCompactionPlan> plan = table.scheduleCompaction(jsc, compactionInstantTime, Option.empty()); + assertFalse("If there is nothing to compact, result will be empty", plan.isPresent()); } } @@ -128,18 +131,16 @@ public class TestHoodieCompactor extends HoodieClientTestHarness { public void testWriteStatusContentsAfterCompaction() throws Exception { // insert 100 records HoodieWriteConfig config = getConfig(); - try (HoodieWriteClient writeClient = getWriteClient(config);) { + try (HoodieWriteClient writeClient =
getWriteClient(config)) { String newCommitTime = "100"; writeClient.startCommitWithTime(newCommitTime); List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100); JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1); - List<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime).collect(); + writeClient.insert(recordsRDD, newCommitTime).collect(); // Update all the 100 records - metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.create(metaClient, config, jsc); - + HoodieTable table = HoodieTable.create(config, jsc); newCommitTime = "101"; writeClient.startCommitWithTime(newCommitTime); @@ -153,8 +154,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness { HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, updatedRecords); // Verify that all data file has one log file - metaClient = HoodieTableMetaClient.reload(metaClient); - table = HoodieTable.create(metaClient, config, jsc); + table = HoodieTable.create(config, jsc); for (String partitionPath : dataGen.getPartitionPaths()) { List<FileSlice> groupedLogFiles = table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList()); @@ -162,14 +162,14 @@ assertEquals("There should be 1 log file written for every data file", 1, fileSlice.getLogFiles().count()); } } + HoodieTestUtils.createDeltaCommitFiles(basePath, newCommitTime); // Do a compaction - metaClient = HoodieTableMetaClient.reload(metaClient); - table = HoodieTable.create(metaClient, config, jsc); - - String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime(); - JavaRDD<WriteStatus> result = - table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime)); + table = HoodieTable.create(config, jsc); + String compactionInstantTime = "102"; + table.scheduleCompaction(jsc, compactionInstantTime, Option.empty()); + table.getMetaClient().reloadActiveTimeline(); + JavaRDD<WriteStatus> result = table.compact(jsc, compactionInstantTime).getWriteStatuses(); // Verify that all partition paths are present in the WriteStatus result for (String partitionPath : dataGen.getPartitionPaths()) { @@ -184,8 +184,4 @@ protected HoodieTableType getTableType() { return HoodieTableType.MERGE_ON_READ; } - - // TODO - after modifying HoodieReadClient to support mor tables - add more tests to make - // sure the data read is the updated data (compaction correctness) - // TODO - add more test cases for compactions after a failed commit/compaction } diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/strategy/TestHoodieCompactionStrategy.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java similarity index 99% rename from hudi-client/src/test/java/org/apache/hudi/table/compact/strategy/TestHoodieCompactionStrategy.java rename to hudi-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java index 88da4423a..e49aafa80 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/compact/strategy/TestHoodieCompactionStrategy.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java @@ -16,7 +16,7 @@ * limitations under the License.
*/ -package org.apache.hudi.table.compact.strategy; +package org.apache.hudi.table.action.compact.strategy; import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.common.model.HoodieBaseFile; diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java index 86b2303fd..060b5a3e9 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java @@ -138,6 +138,20 @@ public class HoodieTestUtils { } } + public static void createDeltaCommitFiles(String basePath, String... instantTimes) throws IOException { + for (String instantTime : instantTimes) { + new File( + basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + + HoodieTimeline.makeRequestedDeltaFileName(instantTime)).createNewFile(); + new File( + basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + + HoodieTimeline.makeInflightDeltaFileName(instantTime)).createNewFile(); + new File( + basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeDeltaFileName(instantTime)) + .createNewFile(); + } + } + public static void createMetadataFolder(String basePath) { new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME).mkdirs(); }
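
Example: end-to-end usage of the relocated action executors. This is a minimal sketch, not part of the patch: the base path, the savepoint instant time, the user/comment strings, the local Spark master and the ActionExecutorFlowSketch class name are illustrative assumptions, and applications would normally drive these steps through HoodieWriteClient rather than invoking the executors directly. It only uses calls that appear in the diff above (HoodieTable.create, scheduleCompaction/compact, SavepointActionExecutor, SavepointHelpers).

import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointHelpers;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ActionExecutorFlowSketch {
  public static void main(String[] args) {
    // Hypothetical Spark context and table location; assumes a COPY_ON_WRITE table
    // with completed commits already exists under this base path.
    JavaSparkContext jsc = new JavaSparkContext(new SparkConf().setAppName("hudi-actions").setMaster("local[2]"));
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath("/tmp/hoodie/sample-table").build();
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath());
    HoodieTable table = HoodieTable.create(metaClient, config, jsc);

    // Savepoint an existing completed commit (instant time is illustrative),
    // then verify the savepoint landed on the timeline.
    String savepointTime = "20200425182644";
    HoodieSavepointMetadata savepoint =
        new SavepointActionExecutor(jsc, config, table, savepointTime, "admin", "pre-migration savepoint").execute();
    SavepointHelpers.validateSavepointPresence(table, savepointTime);
    System.out.println("Created savepoint metadata: " + savepoint);

    // Compaction is now a two-step action: schedule a plan, then execute it
    // against a reloaded timeline, mirroring TestHoodieCompactor above.
    String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
    Option<HoodieCompactionPlan> plan = table.scheduleCompaction(jsc, compactionInstantTime, Option.empty());
    if (plan.isPresent()) {
      table.getMetaClient().reloadActiveTimeline();
      long written = table.compact(jsc, compactionInstantTime).getWriteStatuses().count();
      System.out.println("Compaction wrote " + written + " write statuses");
    }
    jsc.stop();
  }
}

Separating the schedule step (which persists a requested plan on the timeline) from the execute step is what lets a later or asynchronous process pick up pending compactions, which is why the tests reload the active timeline between the two calls.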