[HUDI-2904] Fix metadata table archival overstepping between regular writers and table services (#4186)
Co-authored-by: Rajesh Mahindra <rmahindra@Rajeshs-MacBook-Pro.local>
Co-authored-by: Sivabalan Narayanan <n.siva.b@gmail.com>
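In short: write clients used to run archival unconditionally at the end of every commit, so a table-service writer could archive metadata-table instants out from under the regular writer. Archival is now gated behind a new hoodie.archive.automatic flag (default true), and the metadata table's internal write config turns it off, triggering archival explicitly only when the invoking writer is allowed to run table services. The post-commit path becomes, paraphrasing the first hunk below:

    if (config.isAutoArchive()) {  // hoodie.archive.automatic, default true
      archive(table);              // wraps HoodieTimelineArchiveLog.archiveIfRequired(context)
    }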
@@ -449,11 +449,9 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
       WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
           .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
       autoCleanOnCommit();
-      // We cannot have unbounded commit files. Archive commits if we have to archive
-      HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
-      archiveLog.archiveIfRequired(context);
-    } catch (IOException ioe) {
-      throw new HoodieIOException(ioe.getMessage(), ioe);
+      if (config.isAutoArchive()) {
+        archive(table);
+      }
     } finally {
       this.heartbeatClient.stop(instantTime);
     }
@@ -743,6 +741,31 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
     return clean(HoodieActiveTimeline.createNewInstantTime(), skipLocking);
   }
 
+  /**
+   * Trigger archival for the table. This ensures that the number of commits do not explode
+   * and keep increasing unbounded over time.
+   * @param table table to commit on.
+   */
+  protected void archive(HoodieTable<T, I, K, O> table) {
+    try {
+      // We cannot have unbounded commit files. Archive commits if we have to archive
+      HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
+      archiveLog.archiveIfRequired(context);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to archive", ioe);
+    }
+  }
+
+  /**
+   * Trigger archival for the table. This ensures that the number of commits do not explode
+   * and keep increasing unbounded over time.
+   */
+  public void archive() {
+    // Create a Hoodie table which encapsulated the commits and files visible
+    HoodieTable table = createTable(config, hadoopConf);
+    archive(table);
+  }
+
   /**
    * Provides a new commit time for a write operation (insert/update/delete).
    */
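The new public archive() overload gives external callers a handle on archival. A minimal sketch of how a maintenance job might drive it manually, assuming a Spark writer; basePath and engineContext are placeholders, and everything other than withAutoArchive() and archive() is the pre-existing client API:

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath)  // placeholder table base path
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withAutoArchive(false)      // opt out of archival-after-commit
            .archiveCommitsWith(20, 30)  // keep 20-30 instants on the active timeline
            .build())
        .build();
    SparkRDDWriteClient<HoodieRecordPayload> writeClient =
        new SparkRDDWriteClient<>(engineContext, writeConfig);
    // ... regular upserts/inserts happen elsewhere ...
    writeClient.archive();  // archival runs only when this job decides to
    writeClient.close();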
@@ -57,6 +57,13 @@ public class HoodieCompactionConfig extends HoodieConfig {
       + " to delete older file slices. It's recommended to enable this, to ensure metadata and data storage"
       + " growth is bounded.");
 
+  public static final ConfigProperty<String> AUTO_ARCHIVE = ConfigProperty
+      .key("hoodie.archive.automatic")
+      .defaultValue("true")
+      .withDocumentation("When enabled, the archival table service is invoked immediately after each commit,"
+          + " to archive commits if we cross a maximum value of commits."
+          + " It's recommended to enable this, to ensure number of active commits is bounded.");
+
   public static final ConfigProperty<String> ASYNC_CLEAN = ConfigProperty
       .key("hoodie.clean.async")
       .defaultValue("false")
@@ -493,6 +500,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withAutoArchive(Boolean autoArchive) {
+      compactionConfig.setValue(AUTO_ARCHIVE, String.valueOf(autoArchive));
+      return this;
+    }
+
     public Builder withIncrementalCleaningMode(Boolean incrementalCleaningMode) {
       compactionConfig.setValue(CLEANER_INCREMENTAL_MODE_ENABLE, String.valueOf(incrementalCleaningMode));
       return this;
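Since hoodie.archive.automatic is a plain write config, it should also be settable like any other hoodie.* option, for example from a Spark datasource write in Java (an assumption about the usual option pass-through, not something this commit adds; df is an existing Dataset<Row>, "my_table" is a hypothetical table name, and the other required write options are elided):

    df.write().format("hudi")
        .option("hoodie.table.name", "my_table")      // hypothetical table name
        .option("hoodie.archive.automatic", "false")  // key defined in this commit
        .mode("append")
        .save(basePath);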
@@ -1101,6 +1101,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBoolean(HoodieCompactionConfig.AUTO_CLEAN);
   }
 
+  public boolean isAutoArchive() {
+    return getBoolean(HoodieCompactionConfig.AUTO_ARCHIVE);
+  }
+
   public boolean isAsyncClean() {
     return getBoolean(HoodieCompactionConfig.ASYNC_CLEAN);
   }
@@ -204,7 +204,9 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
             .archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep)
             // we will trigger compaction manually, to control the instant times
             .withInlineCompaction(false)
-            .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build())
+            .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax())
+            // we will trigger archive manually, to ensure only regular writer invokes it
+            .withAutoArchive(false).build())
         .withParallelism(parallelism, parallelism)
         .withDeleteParallelism(parallelism)
         .withRollbackParallelism(parallelism)
@@ -40,7 +40,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.FlinkHoodieIndexFactory;
 import org.apache.hudi.index.HoodieIndex;
@@ -57,7 +56,6 @@ import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.HoodieTimelineArchiveLog;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.compact.CompactHelpers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
@@ -332,11 +330,10 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
       // Delete the marker directory for the instant.
       WriteMarkersFactory.get(config.getMarkersType(), createTable(config, hadoopConf), instantTime)
           .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-      // We cannot have unbounded commit files. Archive commits if we have to archive
-      HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
-      archiveLog.archiveIfRequired(context);
-    } catch (IOException ioe) {
-      throw new HoodieIOException(ioe.getMessage(), ioe);
+      if (config.isAutoArchive()) {
+        // We cannot have unbounded commit files. Archive commits if we have to archive
+        archive(table);
+      }
     } finally {
       this.heartbeatClient.stop(instantTime);
     }
@@ -140,6 +140,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
     if (canTriggerTableService) {
       compactIfNecessary(writeClient, instantTime);
       doClean(writeClient, instantTime);
+      writeClient.archive();
     }
   }
@@ -155,6 +155,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
     if (canTriggerTableService) {
       compactIfNecessary(writeClient, instantTime);
       doClean(writeClient, instantTime);
+      writeClient.archive();
     }
   }
@@ -107,6 +107,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
@@ -250,6 +251,43 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     validateMetadata(testTable, emptyList(), true);
   }
 
+  @Test
+  public void testMetadataTableArchival() throws Exception {
+    init(COPY_ON_WRITE, false);
+    writeConfig = getWriteConfigBuilder(true, true, false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .enableFullScan(true)
+            .enableMetrics(false)
+            .withMaxNumDeltaCommitsBeforeCompaction(3)
+            .archiveCommitsWith(3, 4)
+            .retainCommits(1)
+            .build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3).retainCommits(1).build()).build();
+    initWriteConfigAndMetatableWriter(writeConfig, true);
+
+    AtomicInteger commitTime = new AtomicInteger(1);
+    // trigger 2 regular writes (1 bootstrap commit). just 1 before archival can get triggered.
+    int i = 1;
+    for (; i <= 2; i++) {
+      doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT);
+    }
+    // expected num commits = 1 (bootstrap) + 2 (writes) + 1 compaction.
+    HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
+    HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline();
+    assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 4);
+
+    // trigger an async table service; archival should not kick in, even though conditions are met.
+    doCluster(testTable, "000000" + commitTime.getAndIncrement());
+    metadataTimeline = metadataMetaClient.reloadActiveTimeline();
+    assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 5);
+
+    // trigger a regular write operation. archival should kick in.
+    doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT);
+    metadataTimeline = metadataMetaClient.reloadActiveTimeline();
+    assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 3);
+  }
+
   @ParameterizedTest
   @EnumSource(HoodieTableType.class)
   public void testMetadataInsertUpsertClean(HoodieTableType tableType) throws Exception {
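A note on the assertion arithmetic, inferred from the configs in this test rather than stated in the commit: the metadata table archives with archiveCommitsWith(3, 4), so archival fires only once more than four completed instants exist, and then trims the active timeline back to three. Bootstrap, two writes, and one compaction yield 4; clustering pushes the count to 5 but, being an async table service, may no longer archive; the next regular write crosses the threshold, and its archival pass trims the metadata timeline back to the configured minimum of 3.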