diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 699f739ba..444eae62b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -184,11 +184,12 @@ public abstract class AbstractHoodieWriteClient ((HoodieTableMetadataWriter)w).update(metadata, instantTime)); + table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, inflightInstant.getTimestamp(), + table.isTableServiceAction(inflightInstant.getAction()))); } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index eb0c6ea89..48d6b948c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -409,7 +409,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta }); LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata"); - update(commitMetadata, createInstantTime); + update(commitMetadata, createInstantTime, false); return true; } @@ -523,23 +523,24 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta * @param instantTime instant time of interest. * @param convertMetadataFunction converter function to convert the respective metadata to List of HoodieRecords to be written to metadata table. * @param type of commit metadata. + * @param canTriggerTableService true if table services can be triggered. false otherwise. */ - private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction) { + private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) { if (enabled && metadata != null) { List records = convertMetadataFunction.convertMetadata(); - commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime); + commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime, canTriggerTableService); } } /** * Update from {@code HoodieCommitMetadata}. - * * @param commitMetadata {@code HoodieCommitMetadata} * @param instantTime Timestamp at which the commit was performed + * @param isTableServiceAction {@code true} if commit metadata is pertaining to a table service. {@code false} otherwise. */ @Override - public void update(HoodieCommitMetadata commitMetadata, String instantTime) { - processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(commitMetadata, instantTime)); + public void update(HoodieCommitMetadata commitMetadata, String instantTime, boolean isTableServiceAction) { + processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(commitMetadata, instantTime), !isTableServiceAction); } /** @@ -550,7 +551,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta */ @Override public void update(HoodieCleanMetadata cleanMetadata, String instantTime) { - processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(cleanMetadata, instantTime)); + processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(cleanMetadata, instantTime), + false); } /** @@ -562,7 +564,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta @Override public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) { processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(metadataMetaClient.getActiveTimeline(), - restoreMetadata, instantTime, metadata.getSyncedInstantTime())); + restoreMetadata, instantTime, metadata.getSyncedInstantTime()), false); } /** @@ -588,7 +590,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta List records = HoodieTableMetadataUtil.convertMetadataToRecords(metadataMetaClient.getActiveTimeline(), rollbackMetadata, instantTime, metadata.getSyncedInstantTime(), wasSynced); - commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime); + commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime, false); } } @@ -601,12 +603,12 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta /** * Commit the {@code HoodieRecord}s to Metadata Table as a new delta-commit. - * - * @param records The list of records to be written. + * @param records The list of records to be written. * @param partitionName The partition to which the records are to be written. * @param instantTime The timestamp to use for the deltacommit. + * @param canTriggerTableService true if table services can be scheduled and executed. false otherwise. */ - protected abstract void commit(List records, String partitionName, String instantTime); + protected abstract void commit(List records, String partitionName, String instantTime, boolean canTriggerTableService); /** * Perform a compaction on the Metadata Table. diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java index f5c4d26d0..4f5ac027c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java @@ -34,8 +34,10 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable { * Update the metadata table due to a COMMIT operation. * @param commitMetadata commit metadata of the operation of interest. * @param instantTime instant time of the commit. + * @param isTableServiceAction true if caller is a table service. false otherwise. Only regular write operations can trigger metadata table services and this argument + * will assist in this. */ - void update(HoodieCommitMetadata commitMetadata, String instantTime); + void update(HoodieCommitMetadata commitMetadata, String instantTime, boolean isTableServiceAction); /** * Update the metadata table due to a CLEAN operation. diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 4b582b1d5..6046374ba 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -738,6 +738,13 @@ public abstract class HoodieTable implem return getMetadataWriter(Option.empty()); } + /** + * Check if action type is a table service. + * @param actionType action type of interest. + * @return true if action represents a table service. false otherwise. + */ + public abstract boolean isTableServiceAction(String actionType); + /** * Get Table metadata writer. * diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java index cd32a5bc8..a22479b6b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java @@ -56,8 +56,8 @@ public abstract class BaseActionExecutor w.update(metadata, instantTime)); + protected final void writeTableMetadata(HoodieCommitMetadata metadata, String actionType) { + table.getMetadataWriter().ifPresent(w -> w.update(metadata, instantTime, table.isTableServiceAction(actionType))); } /** diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java index bbaf07374..fa0f5df61 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java @@ -77,7 +77,7 @@ public class HoodieMetadataTestTable extends HoodieTestTable { HoodieCommitMetadata commitMetadata = super.doWriteOperation(commitTime, operationType, newPartitionsToAdd, partitionToFilesNameLengthMap, bootstrap, createInflightCommit); if (writer != null && !createInflightCommit) { - writer.update(commitMetadata, commitTime); + writer.update(commitMetadata, commitTime, false); } return commitMetadata; } @@ -86,7 +86,7 @@ public class HoodieMetadataTestTable extends HoodieTestTable { public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCommitMetadata metadata) throws IOException { super.moveInflightCommitToComplete(instantTime, metadata); if (writer != null) { - writer.update(metadata, instantTime); + writer.update(metadata, instantTime, false); } return this; } @@ -94,7 +94,7 @@ public class HoodieMetadataTestTable extends HoodieTestTable { public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCommitMetadata metadata, boolean ignoreWriter) throws IOException { super.moveInflightCommitToComplete(instantTime, metadata); if (!ignoreWriter && writer != null) { - writer.update(metadata, instantTime); + writer.update(metadata, instantTime, false); } return this; } @@ -103,7 +103,7 @@ public class HoodieMetadataTestTable extends HoodieTestTable { public HoodieTestTable moveInflightCompactionToComplete(String instantTime, HoodieCommitMetadata metadata) throws IOException { super.moveInflightCompactionToComplete(instantTime, metadata); if (writer != null) { - writer.update(metadata, instantTime); + writer.update(metadata, instantTime, true); } return this; } @@ -120,7 +120,7 @@ public class HoodieMetadataTestTable extends HoodieTestTable { public HoodieTestTable addCompaction(String instantTime, HoodieCommitMetadata commitMetadata) throws Exception { super.addCompaction(instantTime, commitMetadata); if (writer != null) { - writer.update(commitMetadata, instantTime); + writer.update(commitMetadata, instantTime, true); } return this; } @@ -151,7 +151,7 @@ public class HoodieMetadataTestTable extends HoodieTestTable { HoodieReplaceCommitMetadata completeReplaceMetadata) throws Exception { super.addReplaceCommit(instantTime, requestedReplaceMetadata, inflightReplaceMetadata, completeReplaceMetadata); if (writer != null) { - writer.update(completeReplaceMetadata, instantTime); + writer.update(completeReplaceMetadata, instantTime, true); } return this; } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 349d47de4..3d44a2432 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -258,10 +258,10 @@ public class HoodieFlinkWriteClient extends } @Override - protected void preCommit(String instantTime, HoodieCommitMetadata metadata) { + protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) { this.metadataWriterOption.ifPresent(w -> { w.initTableMetadata(); // refresh the timeline - w.update(metadata, instantTime); + w.update(metadata, inflightInstant.getTimestamp(), getHoodieTable().isTableServiceAction(inflightInstant.getAction())); }); } @@ -406,7 +406,7 @@ public class HoodieFlinkWriteClient extends this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty()); // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. - table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp())); + table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), table.isTableServiceAction(hoodieInstant.getAction()))); } finally { this.txnManager.endTransaction(); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 9ae3e622d..8254d0b88 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -90,7 +90,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad } @Override - protected void commit(List records, String partitionName, String instantTime) { + protected void commit(List records, String partitionName, String instantTime, boolean canTriggerTableService) { ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled"); List recordList = prepRecords(records, partitionName, 1); @@ -125,8 +125,10 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad // reload timeline metadataMetaClient.reloadActiveTimeline(); - compactIfNecessary(writeClient, instantTime); - doClean(writeClient, instantTime); + if (canTriggerTableService) { + compactIfNecessary(writeClient, instantTime); + doClean(writeClient, instantTime); + } } // Update total size of the metadata and count of base/log files diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index 8c2089963..85ad1364d 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -35,6 +35,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieNotSupportedException; @@ -84,6 +85,11 @@ public class HoodieFlinkCopyOnWriteTable super(config, context, metaClient); } + @Override + public boolean isTableServiceAction(String actionType) { + return !actionType.equals(HoodieTimeline.COMMIT_ACTION); + } + /** * Upsert a batch of new records into Hoodie table at the supplied instantTime. * diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java index 56a14da4c..5ad87e083 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; @@ -54,6 +55,11 @@ public class HoodieFlinkMergeOnReadTable super(config, context, metaClient); } + @Override + public boolean isTableServiceAction(String actionType) { + return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION); + } + @Override public HoodieWriteMetadata> upsert( HoodieEngineContext context, diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java index fce159ec8..5dfa511a8 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java @@ -147,7 +147,7 @@ public abstract class BaseFlinkCommitActionExecutor extends H super(config, context, metaClient); } + @Override + public boolean isTableServiceAction(String actionType) { + return !actionType.equals(HoodieTimeline.COMMIT_ACTION); + } + @Override public HoodieWriteMetadata> upsert(HoodieEngineContext context, String instantTime, diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java index a78b71b24..b219ba1a9 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -37,6 +38,11 @@ public class HoodieJavaMergeOnReadTable extends H super(config, context, metaClient); } + @Override + public boolean isTableServiceAction(String actionType) { + return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION); + } + @Override public HoodieWriteMetadata> upsertPrepped(HoodieEngineContext context, String instantTime, diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java index 79aad595f..66cb40758 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java @@ -208,7 +208,7 @@ public abstract class BaseJavaCommitActionExecutor extends HoodieInstant hoodieInstant) { try { this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty()); + boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction()); // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. - table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp())); + table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction)); } finally { this.txnManager.endTransaction(); } @@ -478,13 +479,14 @@ public class SparkRDDWriteClient extends } @Override - protected void preCommit(String instantTime, HoodieCommitMetadata metadata) { + protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) { // Create a Hoodie table after startTxn which encapsulated the commits and files visible. // Important to create this after the lock to ensure latest commits show up in the timeline without need for reload HoodieTable table = createTable(config, hadoopConf); TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(), Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner()); - table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, instantTime)); + table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, inflightInstant.getTimestamp(), + table.isTableServiceAction(inflightInstant.getAction()))); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index e59e19583..95ab7dc79 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -103,7 +103,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad } @Override - protected void commit(List records, String partitionName, String instantTime) { + protected void commit(List records, String partitionName, String instantTime, boolean canTriggerTableService) { ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled"); JavaRDD recordRDD = prepRecords(records, partitionName, 1); @@ -132,8 +132,10 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad // reload timeline metadataMetaClient.reloadActiveTimeline(); - compactIfNecessary(writeClient, instantTime); - doClean(writeClient, instantTime); + if (canTriggerTableService) { + compactIfNecessary(writeClient, instantTime); + doClean(writeClient, instantTime); + } } // Update total size of the metadata and count of base/log files diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index 8e4471010..0971b87c4 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -101,6 +101,11 @@ public class HoodieSparkCopyOnWriteTable super(config, context, metaClient); } + @Override + public boolean isTableServiceAction(String actionType) { + return !actionType.equals(HoodieTimeline.COMMIT_ACTION); + } + @Override public HoodieWriteMetadata> upsert(HoodieEngineContext context, String instantTime, JavaRDD> records) { return new SparkUpsertCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java index d0bc96924..9e053aaa0 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java @@ -80,6 +80,11 @@ public class HoodieSparkMergeOnReadTable extends super(config, context, metaClient); } + @Override + public boolean isTableServiceAction(String actionType) { + return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION); + } + @Override public HoodieWriteMetadata> upsert(HoodieEngineContext context, String instantTime, JavaRDD> records) { return new SparkUpsertDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java index 7486d07e1..8b353d64c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java @@ -248,7 +248,7 @@ public class SparkBootstrapCommitActionExecutor metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, getSchemaToStoreInCommit()); metadata.setOperationType(operationType); - writeTableMetadata(metadata); + writeTableMetadata(metadata, actionType); try { activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, instantTime), diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index 0b673b890..2bcd6d787 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -267,7 +267,7 @@ public abstract class BaseSparkCommitActionExecutor, List> commitsList = archiveAndGetCommitsList(writeConfig); - List originalCommits = commitsList.getKey(); List commitsAfterArchival = commitsList.getValue(); - List archivedInstants = getAllArchivedCommitInstants(Arrays.asList("00000001", "00000003", "00000004", "00000005"), HoodieTimeline.DELTA_COMMIT_ACTION); + List archivedInstants = getAllArchivedCommitInstants(Arrays.asList("00000001", "00000003", "00000004", "00000005", "00000006"), HoodieTimeline.DELTA_COMMIT_ACTION); archivedInstants.add(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "00000002")); archivedInstants.add(new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000002")); - verifyArchival(archivedInstants, getActiveCommitInstants(Arrays.asList("00000006", "00000007"), HoodieTimeline.DELTA_COMMIT_ACTION), commitsAfterArchival); + verifyArchival(archivedInstants, getActiveCommitInstants(Arrays.asList("00000007", "00000008"), HoodieTimeline.DELTA_COMMIT_ACTION), commitsAfterArchival); } @Test @@ -379,7 +381,8 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { public void testArchiveTableWithCleanCommits(boolean enableMetadata) throws Exception { HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 2); - // min archival commits is 2 and max archival commits is 4(either clean commits has to be > 4 or commits has to be greater than 4. and so, after 5th commit, 3 commits will be archived. + // min archival commits is 2 and max archival commits is 4(either clean commits has to be > 4 or commits has to be greater than 4. + // and so, after 5th commit, 3 commits will be archived. // 1,2,3,4,5,6 : after archival -> 1,5,6 (because, 2,3,4,5 and 6 are clean commits and are eligible for archival) // after 7th and 8th commit no-op wrt archival. Map cleanStats = new HashMap<>(); @@ -400,13 +403,35 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { if (i < 6) { assertEquals(originalCommits, commitsAfterArchival); } else if (i == 6) { - // 1,2,3,4,5,6 : after archival -> 1,5,6 (bcoz, 2,3,4,5 and 6 are clean commits and are eligible for archival) - List expectedActiveInstants = new ArrayList<>(); - expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001"))); - expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000005", "00000006"), HoodieTimeline.CLEAN_ACTION)); - verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000002", "00000003", "00000004"), HoodieTimeline.CLEAN_ACTION), expectedActiveInstants, commitsAfterArchival); + if (!enableMetadata) { + // 1,2,3,4,5,6 : after archival -> 1,5,6 (bcoz, 2,3,4,5 and 6 are clean commits and are eligible for archival) + List expectedActiveInstants = new ArrayList<>(); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001"))); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000005", "00000006"), HoodieTimeline.CLEAN_ACTION)); + verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000002", "00000003", "00000004"), HoodieTimeline.CLEAN_ACTION), expectedActiveInstants, commitsAfterArchival); + } else { + // with metadata enabled, archival in data table is fenced based on compaction in metadata table. Clean commits in data table will not trigger compaction in + // metadata table. + List expectedActiveInstants = new ArrayList<>(); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001"))); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000002", "00000003", "00000004", "00000005", "00000006"), HoodieTimeline.CLEAN_ACTION)); + verifyArchival(getAllArchivedCommitInstants(Collections.emptyList(), HoodieTimeline.CLEAN_ACTION), expectedActiveInstants, commitsAfterArchival); + } } else { - assertEquals(originalCommits, commitsAfterArchival); + if (!enableMetadata) { + assertEquals(originalCommits, commitsAfterArchival); + } else { + if (i == 7) { + // when i == 7 compaction in metadata table will be triggered and hence archival in datatable will kick in. + // 1,2,3,4,5,6 : after archival -> 1,5,6 (bcoz, 2,3,4,5 and 6 are clean commits and are eligible for archival) + List expectedActiveInstants = new ArrayList<>(); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001", "00000007"))); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000005", "00000006"), HoodieTimeline.CLEAN_ACTION)); + verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000002", "00000003", "00000004"), HoodieTimeline.CLEAN_ACTION), expectedActiveInstants, commitsAfterArchival); + } else { + assertEquals(originalCommits, commitsAfterArchival); + } + } } } }