[HUDI-1576] Make archiving an async service (#4795)

Raymond Xu
2022-02-14 18:15:06 -08:00
committed by GitHub
parent 3b401d839c
commit 27bd7b538e
15 changed files with 327 additions and 127 deletions
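This commit renames HoodieTimelineArchiveLog to HoodieTimelineArchiver (moving it from org.apache.hudi.table to org.apache.hudi.client) and adds an AsyncArchiveService, modeled on the existing AsyncCleanerService, so archiving can run concurrently with the write operation. Async archiving is opt-in: it only runs when both hoodie.archive.automatic and the new hoodie.archive.async flag are enabled. A minimal sketch of enabling it through the new builder methods introduced in this commit (base path and table name are placeholders, not from this commit):

    // Hypothetical writer config enabling the async archive service added here.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie_table")  // placeholder base path
        .forTable("test_table")         // placeholder table name
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withAutoArchive(true)      // hoodie.archive.automatic, default true
            .withAsyncArchive(true)     // hoodie.archive.async, default false, since 0.11.0
            .build())
        .build();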

View File

@@ -33,7 +33,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
-import org.apache.hudi.table.HoodieTimelineArchiveLog;
+import org.apache.hudi.client.HoodieTimelineArchiver;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
@@ -96,8 +96,8 @@ public class TestArchivedCommitsCommand extends CLIFunctionalTestHarness {
     // archive
     HoodieSparkTable table = HoodieSparkTable.create(cfg, context(), metaClient);
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
-    archiveLog.archiveIfRequired(context());
+    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
+    archiver.archiveIfRequired(context());
   }

   /**

View File

@@ -40,7 +40,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
-import org.apache.hudi.table.HoodieTimelineArchiveLog;
+import org.apache.hudi.client.HoodieTimelineArchiver;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.jupiter.api.BeforeEach;
@@ -232,8 +232,8 @@ public class TestCommitsCommand extends CLIFunctionalTestHarness {
     // archive
     metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
     HoodieSparkTable table = HoodieSparkTable.create(cfg, context(), metaClient);
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
-    archiveLog.archiveIfRequired(context());
+    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
+    archiver.archiveIfRequired(context());
     CommandResult cr = shell().executeCommand(String.format("commits showarchived --startTs %s --endTs %s", "100", "104"));
     assertTrue(cr.isSuccess());
@@ -279,8 +279,8 @@ public class TestCommitsCommand extends CLIFunctionalTestHarness {
       HoodieSparkTable table = HoodieSparkTable.create(cfg, context(), metaClient);
       // need to create multi archive files
-      HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
-      archiveLog.archiveIfRequired(context());
+      HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
+      archiver.archiveIfRequired(context());
     }
     CommandResult cr = shell().executeCommand(String.format("commits showarchived --startTs %s --endTs %s", "160", "174"));

View File

@@ -40,7 +40,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.HoodieSparkTable;
-import org.apache.hudi.table.HoodieTimelineArchiveLog;
+import org.apache.hudi.client.HoodieTimelineArchiver;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
@@ -167,8 +167,8 @@ public class TestCompactionCommand extends CLIFunctionalTestHarness {
     // archive
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
     HoodieSparkTable table = HoodieSparkTable.create(cfg, context(), metaClient);
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
-    archiveLog.archiveIfRequired(context());
+    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
+    archiver.archiveIfRequired(context());
   }

   /**

View File

@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.async;
+
+import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Async archive service to run concurrently with write operation.
+ */
+public class AsyncArchiveService extends HoodieAsyncService {
+
+  private static final Logger LOG = LogManager.getLogger(AsyncArchiveService.class);
+
+  private final BaseHoodieWriteClient writeClient;
+  private final transient ExecutorService executor = Executors.newSingleThreadExecutor();
+
+  protected AsyncArchiveService(BaseHoodieWriteClient writeClient) {
+    this.writeClient = writeClient;
+  }
+
+  @Override
+  protected Pair<CompletableFuture, ExecutorService> startService() {
+    LOG.info("Starting async archive service...");
+    return Pair.of(CompletableFuture.supplyAsync(() -> {
+      writeClient.archive();
+      return true;
+    }, executor), executor);
+  }
+
+  public static AsyncArchiveService startAsyncArchiveIfEnabled(BaseHoodieWriteClient writeClient) {
+    HoodieWriteConfig config = writeClient.getConfig();
+    if (!config.isAutoArchive() || !config.isAsyncArchive()) {
+      LOG.info("The HoodieWriteClient is not configured to auto & async archive. Async archive service will not start.");
+      return null;
+    }
+    AsyncArchiveService asyncArchiveService = new AsyncArchiveService(writeClient);
+    asyncArchiveService.start(null);
+    return asyncArchiveService;
+  }
+
+  public static void waitForCompletion(AsyncArchiveService asyncArchiveService) {
+    if (asyncArchiveService != null) {
+      LOG.info("Waiting for async archive service to finish");
+      try {
+        asyncArchiveService.waitForShutdown();
+      } catch (Exception e) {
+        throw new HoodieException("Error waiting for async archive service to finish", e);
+      }
+    }
+  }
+
+  public static void forceShutdown(AsyncArchiveService asyncArchiveService) {
+    if (asyncArchiveService != null) {
+      LOG.info("Shutting down async archive service...");
+      asyncArchiveService.shutdown(true);
+    }
+  }
+}
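For context, a minimal sketch (not part of this file) of how a write client drives this service, mirroring the calls added to BaseHoodieWriteClient elsewhere in this commit:

    // Started lazily before writing; returns null unless auto + async archive are both on.
    AsyncArchiveService archiveService = AsyncArchiveService.startAsyncArchiveIfEnabled(writeClient);

    // ... perform the write and commit ...

    // At post-commit time, block until the in-flight archive round completes.
    AsyncArchiveService.waitForCompletion(archiveService);

    // On client close, shut the service down without waiting.
    AsyncArchiveService.forceShutdown(archiveService);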

View File

@@ -9,19 +9,22 @@
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */

-package org.apache.hudi.client;
+package org.apache.hudi.async;

-import org.apache.hudi.async.HoodieAsyncService;
+import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -30,9 +33,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;

 /**
- * Clean service running concurrently with write operation.
+ * Async clean service to run concurrently with write operation.
  */
-class AsyncCleanerService extends HoodieAsyncService {
+public class AsyncCleanerService extends HoodieAsyncService {

   private static final Logger LOG = LogManager.getLogger(AsyncCleanerService.class);
@@ -46,7 +49,7 @@ class AsyncCleanerService extends HoodieAsyncService {
   @Override
   protected Pair<CompletableFuture, ExecutorService> startService() {
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
-    LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime);
+    LOG.info(String.format("Starting async clean service with instant time %s...", instantTime));
     return Pair.of(CompletableFuture.supplyAsync(() -> {
       writeClient.clean(instantTime);
       return true;
@@ -54,30 +57,30 @@ class AsyncCleanerService extends HoodieAsyncService {
   }

   public static AsyncCleanerService startAsyncCleaningIfEnabled(BaseHoodieWriteClient writeClient) {
-    AsyncCleanerService asyncCleanerService = null;
-    if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) {
-      asyncCleanerService = new AsyncCleanerService(writeClient);
-      asyncCleanerService.start(null);
-    } else {
-      LOG.info("Async auto cleaning is not enabled. Not running cleaner now");
+    HoodieWriteConfig config = writeClient.getConfig();
+    if (!config.isAutoClean() || !config.isAsyncClean()) {
+      LOG.info("The HoodieWriteClient is not configured to auto & async clean. Async clean service will not start.");
+      return null;
     }
+    AsyncCleanerService asyncCleanerService = new AsyncCleanerService(writeClient);
+    asyncCleanerService.start(null);
     return asyncCleanerService;
   }

   public static void waitForCompletion(AsyncCleanerService asyncCleanerService) {
     if (asyncCleanerService != null) {
-      LOG.info("Waiting for async cleaner to finish");
+      LOG.info("Waiting for async clean service to finish");
       try {
         asyncCleanerService.waitForShutdown();
       } catch (Exception e) {
-        throw new HoodieException("Error waiting for async cleaning to finish", e);
+        throw new HoodieException("Error waiting for async clean service to finish", e);
       }
     }
   }

   public static void forceShutdown(AsyncCleanerService asyncCleanerService) {
     if (asyncCleanerService != null) {
-      LOG.info("Shutting down async cleaner");
+      LOG.info("Shutting down async clean service...");
       asyncCleanerService.shutdown(true);
     }
   }

View File

@@ -36,7 +36,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;

 /**
- * Base Class for running clean/delta-sync/compaction/clustering in separate thread and controlling their life-cycle.
+ * Base Class for running archive/clean/delta-sync/compaction/clustering in separate thread and controlling their life-cycles.
  */
 public abstract class HoodieAsyncService implements Serializable {
@@ -70,11 +70,15 @@ public abstract class HoodieAsyncService implements Serializable {
     this.runInDaemonMode = runInDaemonMode;
   }

-  protected boolean isShutdownRequested() {
+  public boolean isStarted() {
+    return started;
+  }
+
+  public boolean isShutdownRequested() {
     return shutdownRequested;
   }

-  protected boolean isShutdown() {
+  public boolean isShutdown() {
     return shutdown;
   }
@@ -138,8 +142,6 @@ public abstract class HoodieAsyncService implements Serializable {
   /**
    * Service implementation.
-   *
-   * @return
    */
   protected abstract Pair<CompletableFuture, ExecutorService> startService();
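The contract that subclasses implement is small; a hypothetical minimal subclass (a sketch only, assuming the single-thread-executor pattern that AsyncArchiveService and AsyncCleanerService use in this commit):

    // startService() returns the future doing the work plus the executor running it,
    // so the base class can monitor completion and drive shutdown.
    public class MyAsyncTableService extends HoodieAsyncService {
      private final transient ExecutorService executor = Executors.newSingleThreadExecutor();

      @Override
      protected Pair<CompletableFuture, ExecutorService> startService() {
        return Pair.of(CompletableFuture.supplyAsync(() -> {
          // run the background table-service work here
          return true;
        }, executor), executor);
      }
    }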

View File

@@ -18,6 +18,8 @@
 package org.apache.hudi.client;

+import org.apache.hudi.async.AsyncArchiveService;
+import org.apache.hudi.async.AsyncCleanerService;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -67,7 +69,6 @@ import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.HoodieTimelineArchiveLog;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.rollback.RollbackUtils;
 import org.apache.hudi.table.action.savepoint.SavepointHelpers;
@@ -115,6 +116,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
   private transient WriteOperationType operationType;
   private transient HoodieWriteCommitCallback commitCallback;
   protected transient AsyncCleanerService asyncCleanerService;
+  protected transient AsyncArchiveService asyncArchiveService;
   protected final TransactionManager txnManager;
   protected Option<Pair<HoodieInstant, Map<String, String>>> lastCompletedTxnAndMetadata = Option.empty();
@@ -431,6 +433,11 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     } else {
       this.asyncCleanerService.start(null);
     }
+    if (null == this.asyncArchiveService) {
+      this.asyncArchiveService = AsyncArchiveService.startAsyncArchiveIfEnabled(this);
+    } else {
+      this.asyncArchiveService.start(null);
+    }
   }

   /**
@@ -456,9 +463,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
       WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
           .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
       autoCleanOnCommit();
-      if (config.isAutoArchive()) {
-        archive(table);
-      }
+      autoArchiveOnCommit(table);
     } finally {
       this.heartbeatClient.stop(instantTime);
     }
@@ -523,23 +528,35 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     });
   }

+  /**
+   * Handle auto clean during commit.
+   */
   protected void autoCleanOnCommit() {
-    if (config.isAutoClean()) {
-      // Call clean to cleanup if there is anything to cleanup after the commit,
-      if (config.isAsyncClean()) {
-        LOG.info("Cleaner has been spawned already. Waiting for it to finish");
-        AsyncCleanerService.waitForCompletion(asyncCleanerService);
-        LOG.info("Cleaner has finished");
-      } else {
-        // Do not reuse instantTime for clean as metadata table requires all changes to have unique instant timestamps.
-        LOG.info("Auto cleaning is enabled. Running cleaner now");
-        clean(true);
-      }
+    if (!config.isAutoClean()) {
+      return;
+    }
+    if (config.isAsyncClean()) {
+      LOG.info("Async cleaner has been spawned. Waiting for it to finish");
+      AsyncCleanerService.waitForCompletion(asyncCleanerService);
+      LOG.info("Async cleaner has finished");
+    } else {
+      LOG.info("Start to clean synchronously.");
+      // Do not reuse instantTime for clean as metadata table requires all changes to have unique instant timestamps.
+      clean(true);
     }
   }

+  protected void autoArchiveOnCommit(HoodieTable<T, I, K, O> table) {
+    if (!config.isAutoArchive()) {
+      return;
+    }
+    if (config.isAsyncArchive()) {
+      LOG.info("Async archiver has been spawned. Waiting for it to finish");
+      AsyncArchiveService.waitForCompletion(asyncArchiveService);
+      LOG.info("Async archiver has finished");
+    } else {
+      LOG.info("Start to archive synchronously.");
+      archive(table);
+    }
+  }
+
   /**
@@ -784,8 +801,8 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
   protected void archive(HoodieTable<T, I, K, O> table) {
     try {
       // We cannot have unbounded commit files. Archive commits if we have to archive
-      HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
-      archiveLog.archiveIfRequired(context);
+      HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table);
+      archiver.archiveIfRequired(context);
     } catch (IOException ioe) {
       throw new HoodieIOException("Failed to archive", ioe);
     }
@@ -1249,7 +1266,8 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
   @Override
   public void close() {
-    // release AsyncCleanerService
+    AsyncArchiveService.forceShutdown(asyncArchiveService);
+    asyncArchiveService = null;
     AsyncCleanerService.forceShutdown(asyncCleanerService);
     asyncCleanerService = null;
     // Stop timeline-server if running

View File

@@ -9,14 +9,15 @@
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */

-package org.apache.hudi.table;
+package org.apache.hudi.client;

 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -52,6 +53,7 @@ import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.WriteMarkers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
@@ -79,9 +81,9 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_O
 /**
  * Archiver to bound the growth of files under .hoodie meta path.
  */
-public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
+public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {

-  private static final Logger LOG = LogManager.getLogger(HoodieTimelineArchiveLog.class);
+  private static final Logger LOG = LogManager.getLogger(HoodieTimelineArchiver.class);

   private final Path archiveFilePath;
   private final HoodieWriteConfig config;
@@ -91,7 +93,7 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
   private final HoodieTable<T, I, K, O> table;
   private final HoodieTableMetaClient metaClient;

-  public HoodieTimelineArchiveLog(HoodieWriteConfig config, HoodieTable<T, I, K, O> table) {
+  public HoodieTimelineArchiver(HoodieWriteConfig config, HoodieTable<T, I, K, O> table) {
     this.config = config;
     this.table = table;
     this.metaClient = table.getMetaClient();

View File

@@ -50,13 +50,6 @@ import java.util.stream.Collectors;
         + "cleaning (reclamation of older/unused file groups/slices).")
 public class HoodieCompactionConfig extends HoodieConfig {

-  public static final ConfigProperty<String> AUTO_CLEAN = ConfigProperty
-      .key("hoodie.clean.automatic")
-      .defaultValue("true")
-      .withDocumentation("When enabled, the cleaner table service is invoked immediately after each commit,"
-          + " to delete older file slices. It's recommended to enable this, to ensure metadata and data storage"
-          + " growth is bounded.");
-
   public static final ConfigProperty<String> AUTO_ARCHIVE = ConfigProperty
       .key("hoodie.archive.automatic")
       .defaultValue("true")
@@ -64,6 +57,20 @@ public class HoodieCompactionConfig extends HoodieConfig {
           + " to archive commits if we cross a maximum value of commits."
          + " It's recommended to enable this, to ensure number of active commits is bounded.");

+  public static final ConfigProperty<String> ASYNC_ARCHIVE = ConfigProperty
+      .key("hoodie.archive.async")
+      .defaultValue("false")
+      .sinceVersion("0.11.0")
+      .withDocumentation("Only applies when " + AUTO_ARCHIVE.key() + " is turned on. "
+          + "When turned on runs archiver async with writing, which can speed up overall write performance.");
+
+  public static final ConfigProperty<String> AUTO_CLEAN = ConfigProperty
+      .key("hoodie.clean.automatic")
+      .defaultValue("true")
+      .withDocumentation("When enabled, the cleaner table service is invoked immediately after each commit,"
+          + " to delete older file slices. It's recommended to enable this, to ensure metadata and data storage"
+          + " growth is bounded.");
+
   public static final ConfigProperty<String> ASYNC_CLEAN = ConfigProperty
       .key("hoodie.clean.async")
       .defaultValue("false")
@@ -522,6 +529,16 @@ public class HoodieCompactionConfig extends HoodieConfig {
       return this;
     }

+    public Builder withAutoArchive(Boolean autoArchive) {
+      compactionConfig.setValue(AUTO_ARCHIVE, String.valueOf(autoArchive));
+      return this;
+    }
+
+    public Builder withAsyncArchive(Boolean asyncArchive) {
+      compactionConfig.setValue(ASYNC_ARCHIVE, String.valueOf(asyncArchive));
+      return this;
+    }
+
     public Builder withAutoClean(Boolean autoClean) {
       compactionConfig.setValue(AUTO_CLEAN, String.valueOf(autoClean));
       return this;
@@ -532,11 +549,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
       return this;
     }

-    public Builder withAutoArchive(Boolean autoArchive) {
-      compactionConfig.setValue(AUTO_ARCHIVE, String.valueOf(autoArchive));
-      return this;
-    }
-
     public Builder withIncrementalCleaningMode(Boolean incrementalCleaningMode) {
       compactionConfig.setValue(CLEANER_INCREMENTAL_MODE_ENABLE, String.valueOf(incrementalCleaningMode));
       return this;
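The same switches can also be set through the raw keys defined above instead of the builder methods; a sketch under the assumption that the standard HoodieWriteConfig.Builder.withProps(...) entry point is used (path is a placeholder):

    // Raw-key equivalent of withAutoArchive(true).withAsyncArchive(true).
    Properties props = new Properties();
    props.setProperty("hoodie.archive.automatic", "true"); // AUTO_ARCHIVE, default true
    props.setProperty("hoodie.archive.async", "true");     // ASYNC_ARCHIVE, default false, since 0.11.0
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie_table") // placeholder base path
        .withProps(props)
        .build();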

View File

@@ -1112,10 +1112,6 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getInt(HoodieCompactionConfig.CLEANER_PARALLELISM_VALUE);
   }

-  public boolean isAutoClean() {
-    return getBoolean(HoodieCompactionConfig.AUTO_CLEAN);
-  }
-
   public boolean getArchiveMergeEnable() {
     return getBoolean(HoodieCompactionConfig.ARCHIVE_MERGE_ENABLE);
   }
@@ -1128,6 +1124,14 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBoolean(HoodieCompactionConfig.AUTO_ARCHIVE);
   }

+  public boolean isAsyncArchive() {
+    return getBoolean(HoodieCompactionConfig.ASYNC_ARCHIVE);
+  }
+
+  public boolean isAutoClean() {
+    return getBoolean(HoodieCompactionConfig.AUTO_CLEAN);
+  }
+
   public boolean isAsyncClean() {
     return getBoolean(HoodieCompactionConfig.ASYNC_CLEAN);
   }
@@ -1872,7 +1876,7 @@ public class HoodieWriteConfig extends HoodieConfig {
    * @return True if any table services are configured to run inline, false otherwise.
    */
   public Boolean areAnyTableServicesExecutedInline() {
-    return inlineClusteringEnabled() || inlineCompactionEnabled() || isAutoClean();
+    return inlineClusteringEnabled() || inlineCompactionEnabled() || isAutoClean() || isAutoArchive();
   }

   /**
@@ -1881,7 +1885,7 @@ public class HoodieWriteConfig extends HoodieConfig {
    * @return True if any table services are configured to run async, false otherwise.
    */
   public Boolean areAnyTableServicesAsync() {
-    return isAsyncClusteringEnabled() || !inlineCompactionEnabled() || isAsyncClean();
+    return isAsyncClusteringEnabled() || !inlineCompactionEnabled() || isAsyncClean() || isAsyncArchive();
   }

   public Boolean areAnyTableServicesScheduledInline() {

View File

@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.async;
+
+import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class TestAsyncArchiveService {
+
+  @Mock
+  BaseHoodieWriteClient writeClient;
+
+  @Mock
+  HoodieWriteConfig config;
+
+  @Test
+  void startAsyncArchiveReturnsNullWhenAutoArchiveDisabled() {
+    when(config.isAutoArchive()).thenReturn(false);
+    when(writeClient.getConfig()).thenReturn(config);
+    assertNull(AsyncArchiveService.startAsyncArchiveIfEnabled(writeClient));
+  }
+
+  @Test
+  void startAsyncArchiveReturnsNullWhenAsyncArchiveDisabled() {
+    when(config.isAutoArchive()).thenReturn(true);
+    when(config.isAsyncArchive()).thenReturn(false);
+    when(writeClient.getConfig()).thenReturn(config);
+    assertNull(AsyncArchiveService.startAsyncArchiveIfEnabled(writeClient));
+  }
+
+  @Test
+  void startAsyncArchiveIfEnabled() {
+    when(config.isAutoArchive()).thenReturn(true);
+    when(config.isAsyncArchive()).thenReturn(true);
+    when(writeClient.getConfig()).thenReturn(config);
+    assertNotNull(AsyncArchiveService.startAsyncArchiveIfEnabled(writeClient));
+  }
+
+  @Test
+  void startServiceShouldInvokeCallArchiveMethod() throws ExecutionException, InterruptedException {
+    AsyncArchiveService service = new AsyncArchiveService(writeClient);
+    assertEquals(true, service.startService().getLeft().get());
+    verify(writeClient).archive();
+  }
+}

View File

@@ -18,6 +18,7 @@
 package org.apache.hudi.client;

+import org.apache.hudi.async.AsyncCleanerService;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.data.HoodieList;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -332,10 +333,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
       // Delete the marker directory for the instant.
       WriteMarkersFactory.get(config.getMarkersType(), createTable(config, hadoopConf), instantTime)
           .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-      if (config.isAutoArchive()) {
-        // We cannot have unbounded commit files. Archive commits if we have to archive
-        archive(table);
-      }
+      autoArchiveOnCommit(table);
     } finally {
       this.heartbeatClient.stop(instantTime);
     }

View File

@@ -102,7 +102,8 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
     properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
     HoodieWriteConfig writeConfig = getConfigBuilder()
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+            .withAutoArchive(false).withAutoClean(false).build())
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         // Timeline-server-based markers are not used for multi-writer tests
         .withMarkersType(MarkerType.DIRECT.name())

View File

@@ -52,7 +52,7 @@ import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.HoodieTimelineArchiveLog;
+import org.apache.hudi.client.HoodieTimelineArchiver;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -286,8 +286,8 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
   protected void archiveDataTable(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) throws IOException {
     HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(writeConfig, table);
-    archiveLog.archiveIfRequired(context);
+    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table);
+    archiver.archiveIfRequired(context);
   }

   protected void validateMetadata(HoodieTestTable testTable) throws IOException {

View File

@@ -47,7 +47,7 @@ import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.HoodieTimelineArchiveLog;
+import org.apache.hudi.client.HoodieTimelineArchiver;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.apache.hadoop.conf.Configuration;
@@ -77,9 +77,9 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;

-public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
+public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {

-  private static final Logger LOG = LogManager.getLogger(TestHoodieTimelineArchiveLog.class);
+  private static final Logger LOG = LogManager.getLogger(TestHoodieTimelineArchiver.class);

   private Configuration hadoopConf;
   private HoodieWrapperFileSystem wrapperFs;
@@ -172,8 +172,8 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
         .withParallelism(2, 2).forTable("test-trip-table").build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
-    boolean result = archiveLog.archiveIfRequired(context);
+    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
+    boolean result = archiver.archiveIfRequired(context);
     assertTrue(result);
   }
@@ -224,14 +224,14 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
     // build a merge small archive plan with dummy content
     // this plan can not be deserialized.
     HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(writeConfig, table);
+    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table);
     FileStatus[] fsStatuses = metaClient.getFs().globStatus(
         new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
     List<String> candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList());

-    archiveLog.reOpenWriter();
+    archiver.reOpenWriter();

     Path plan = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME);
-    archiveLog.buildArchiveMergePlan(candidateFiles, plan, ".commits_.archive.3_1-0-1");
+    archiver.buildArchiveMergePlan(candidateFiles, plan, ".commits_.archive.3_1-0-1");
     String s = "Dummy Content";
     // stain the current merge plan file.
     FileIOUtils.createFileInPath(metaClient.getFs(), plan, Option.of(s.getBytes()));
@@ -274,15 +274,15 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
     // do a single merge small archive files
     HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(writeConfig, table);
+    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table);
     FileStatus[] fsStatuses = metaClient.getFs().globStatus(
         new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
     List<String> candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList());

-    archiveLog.reOpenWriter();
+    archiver.reOpenWriter();

-    archiveLog.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), ".commits_.archive.3_1-0-1");
-    archiveLog.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList()));
-    HoodieLogFormat.Writer writer = archiveLog.reOpenWriter();
+    archiver.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), ".commits_.archive.3_1-0-1");
+    archiver.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList()));
+    HoodieLogFormat.Writer writer = archiver.reOpenWriter();

     // check loading archived and active timeline success
     HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
@@ -327,16 +327,16 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
     // do a single merge small archive files
     HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(writeConfig, table);
+    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table);
     FileStatus[] fsStatuses = metaClient.getFs().globStatus(
         new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
     List<String> candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList());

-    archiveLog.reOpenWriter();
+    archiver.reOpenWriter();

-    archiveLog.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), ".commits_.archive.3_1-0-1");
-    archiveLog.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList()));
-    archiveLog.reOpenWriter();
+    archiver.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), ".commits_.archive.3_1-0-1");
+    archiver.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList()));
+    archiver.reOpenWriter();

     // delete only one of the small archive file to simulate delete action failed.
     metaClient.getFs().delete(fsStatuses[0].getPath());
@@ -397,16 +397,16 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
     }

     HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(writeConfig, table);
+    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table);
     FileStatus[] fsStatuses = metaClient.getFs().globStatus(
         new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
     List<String> candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList());

-    archiveLog.reOpenWriter();
+    archiver.reOpenWriter();

-    archiveLog.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), ".commits_.archive.3_1-0-1");
-    archiveLog.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList()));
-    HoodieLogFormat.Writer writer = archiveLog.reOpenWriter();
+    archiver.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), ".commits_.archive.3_1-0-1");
+    archiver.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList()));
+    HoodieLogFormat.Writer writer = archiver.reOpenWriter();

     String s = "Dummy Content";
     // stain the current merged archive file.
@@ -470,11 +470,11 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
     HoodieTestDataGenerator.createCommitFile(basePath, "104", wrapperFs.getConf());
     HoodieTestDataGenerator.createCommitFile(basePath, "105", wrapperFs.getConf());

     HoodieTable table = HoodieSparkTable.create(cfg, context);
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
+    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);

     HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
-    assertTrue(archiveLog.archiveIfRequired(context));
+    assertTrue(archiver.archiveIfRequired(context));
     timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
     assertEquals(5, timeline.countInstants(),
         "Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)");
@@ -620,8 +620,8 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
     HoodieTestDataGenerator.createCommitFile(basePath, "5", wrapperFs.getConf());

     HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
-    boolean result = archiveLog.archiveIfRequired(context);
+    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
+    boolean result = archiver.archiveIfRequired(context);
     assertTrue(result);
     HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
     List<HoodieInstant> archivedInstants = Arrays.asList(instant1, instant2, instant3);
@@ -776,9 +776,9 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
     }

     HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
+    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);

-    archiveLog.archiveIfRequired(context);
+    archiver.archiveIfRequired(context);

     Stream<HoodieInstant> currentInstants = metaClient.getActiveTimeline().reload().getInstants();
     Map<Object, List<HoodieInstant>> actionInstantMap = currentInstants.collect(Collectors.groupingBy(HoodieInstant::getAction));
@@ -810,9 +810,9 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
     HoodieInstant notArchivedInstant3 = createCleanMetadata("14", true);

     HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
+    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);

-    archiveLog.archiveIfRequired(context);
+    archiver.archiveIfRequired(context);

     List<HoodieInstant> notArchivedInstants = metaClient.getActiveTimeline().reload().getInstants().collect(Collectors.toList());
     assertEquals(3, notArchivedInstants.size(), "Not archived instants should be 3");
@@ -894,8 +894,8 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
     HoodieTimeline timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
     List<HoodieInstant> originalCommits = timeline.getInstants().collect(Collectors.toList());
     HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(writeConfig, table);
-    archiveLog.archiveIfRequired(context);
+    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table);
+    archiver.archiveIfRequired(context);
     timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
     List<HoodieInstant> commitsAfterArchival = timeline.getInstants().collect(Collectors.toList());
     return Pair.of(originalCommits, commitsAfterArchival);