1
0

[HUDI-2355][Bug]Archive service executed after cleaner finished. (#3545)

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
This commit is contained in:
zhangyue19921010
2021-09-16 07:00:04 +08:00
committed by GitHub
parent 916f12b7dd
commit 2d5ac55195
2 changed files with 85 additions and 1 deletions

View File

@@ -434,10 +434,10 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
// Delete the marker directory for the instant.
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
autoCleanOnCommit();
// We cannot have unbounded commit files. Archive commits if we have to archive
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
archiveLog.archiveIfRequired(context);
autoCleanOnCommit();
if (operationType != null && operationType != WriteOperationType.CLUSTER && operationType != WriteOperationType.COMPACT) {
syncTableMetadata();
}

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.utilities.functional;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
@@ -26,6 +28,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
@@ -100,6 +103,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -972,6 +976,86 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
});
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean) throws Exception {
String tableBasePath = dfsBasePath + "/cleanerDeleteReplacedDataWithArchive" + asyncClean;
int totalRecords = 3000;
// Step 1 : Prepare and insert data without archival and cleaner.
// Make sure that there are 6 commits including 2 replacecommits completed.
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", ""));
cfg.configs.add(String.format("%s=%s", HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
return true;
});
TestHelpers.assertAtLeastNCommits(6, tableBasePath, dfs);
TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
// Step 2 : Get the first replacecommit and extract the corresponding replaced file IDs.
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
HoodieTimeline replacedTimeline = meta.reloadActiveTimeline().getCompletedReplaceTimeline();
Option<HoodieInstant> firstReplaceHoodieInstant = replacedTimeline.nthFromLastInstant(1);
assertTrue(firstReplaceHoodieInstant.isPresent());
Option<byte[]> firstReplaceHoodieInstantDetails = replacedTimeline.getInstantDetails(firstReplaceHoodieInstant.get());
HoodieReplaceCommitMetadata firstReplaceMetadata = HoodieReplaceCommitMetadata.fromBytes(firstReplaceHoodieInstantDetails.get(), HoodieReplaceCommitMetadata.class);
Map<String, List<String>> partitionToReplaceFileIds = firstReplaceMetadata.getPartitionToReplaceFileIds();
String partitionName = null;
List replacedFileIDs = null;
for (Map.Entry entry : partitionToReplaceFileIds.entrySet()) {
partitionName = String.valueOf(entry.getKey());
replacedFileIDs = (List) entry.getValue();
}
assertNotNull(partitionName);
assertNotNull(replacedFileIDs);
// Step 3 : Based to replacedFileIDs , get the corresponding complete path.
ArrayList<String> replacedFilePaths = new ArrayList<>();
Path partitionPath = new Path(meta.getBasePath(), partitionName);
RemoteIterator<LocatedFileStatus> hoodieFiles = meta.getFs().listFiles(partitionPath, true);
while (hoodieFiles.hasNext()) {
LocatedFileStatus f = hoodieFiles.next();
String file = f.getPath().toUri().toString();
for (Object replacedFileID : replacedFileIDs) {
if (file.contains(String.valueOf(replacedFileID))) {
replacedFilePaths.add(file);
}
}
}
assertFalse(replacedFilePaths.isEmpty());
// Step 4 : Insert 1 record and trigger sync/async cleaner and archive.
List<String> configs = getAsyncServicesConfigs(1, "true", "true", "2", "", "");
configs.add(String.format("%s=%s", HoodieCompactionConfig.CLEANER_POLICY.key(), "KEEP_LATEST_COMMITS"));
configs.add(String.format("%s=%s", HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), "1"));
configs.add(String.format("%s=%s", HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), "2"));
configs.add(String.format("%s=%s", HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), "3"));
configs.add(String.format("%s=%s", HoodieCompactionConfig.ASYNC_CLEAN, asyncClean));
cfg.configs = configs;
cfg.continuousMode = false;
ds = new HoodieDeltaStreamer(cfg, jsc);
ds.sync();
// Step 5 : Make sure that firstReplaceHoodieInstant is archived.
long count = meta.reloadActiveTimeline().getCompletedReplaceTimeline().getInstants().filter(instant -> firstReplaceHoodieInstant.get().equals(instant)).count();
assertEquals(0, count);
// Step 6 : All the replaced files in firstReplaceHoodieInstant should be deleted through sync/async cleaner.
for (String replacedFilePath : replacedFilePaths) {
assertFalse(meta.getFs().exists(new Path(replacedFilePath)));
}
}
private List<String> getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster,
String inlineClusterMaxCommit, String asyncCluster, String asyncClusterMaxCommit) {
List<String> configs = new ArrayList<>();