1
0

[HUDI-3366] Remove hardcoded logic of disabling metadata table in tests (#4792)

This commit is contained in:
Y Ethan Guo
2022-02-15 13:41:47 -08:00
committed by GitHub
parent 538ec44fa8
commit 9a05940a74
25 changed files with 244 additions and 148 deletions

View File

@@ -24,12 +24,12 @@ import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
import org.apache.hudi.cli.testutils.HoodieTestCommitUtilities;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
@@ -75,7 +75,6 @@ public class TestArchivedCommitsCommand extends CLIFunctionalTestHarness {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.forTable("test-trip-table").build();
// Create six commits
@@ -90,6 +89,11 @@ public class TestArchivedCommitsCommand extends CLIFunctionalTestHarness {
HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, hadoopConf());
}
// Simulate a compaction commit in metadata table timeline
// so the archival in data table can happen
HoodieTestUtils.createCompactionCommitInMetadataTable(
hadoopConf(), metaClient.getFs(), tablePath, "105");
metaClient = HoodieTableMetaClient.reload(metaClient);
// reload the timeline and get all the commits before archive
metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();

View File

@@ -48,6 +48,7 @@ import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.springframework.shell.core.CommandResult;
import java.io.IOException;
@@ -60,6 +61,7 @@ import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.hudi.common.testutils.HoodieTestUtils.createCompactionCommitInMetadataTable;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -204,15 +206,16 @@ public class TestCommitsCommand extends CLIFunctionalTestHarness {
/**
* Test case of 'commits showarchived' command.
*/
@Test
public void testShowArchivedCommits() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testShowArchivedCommits(boolean enableMetadataTable) throws Exception {
// Generate archive
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath1)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
.forTable("test-trip-table").build();
// generate data and metadata
@@ -229,6 +232,12 @@ public class TestCommitsCommand extends CLIFunctionalTestHarness {
Option.of(value[0]), Option.of(value[1]));
}
if (enableMetadataTable) {
// Simulate a compaction commit in metadata table timeline
// so the archival in data table can happen
createCompactionCommitInMetadataTable(hadoopConf(), metaClient.getFs(), tablePath1, "104");
}
// archive
metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
HoodieSparkTable table = HoodieSparkTable.create(cfg, context(), metaClient);
@@ -251,15 +260,16 @@ public class TestCommitsCommand extends CLIFunctionalTestHarness {
assertEquals(expected, got);
}
@Test
public void testShowArchivedCommitsWithMultiCommitsFile() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testShowArchivedCommitsWithMultiCommitsFile(boolean enableMetadataTable) throws Exception {
// Generate archive
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath1)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
.forTable("test-trip-table").build();
// generate data and metadata
@@ -269,6 +279,12 @@ public class TestCommitsCommand extends CLIFunctionalTestHarness {
data.put(String.valueOf(i), new Integer[] {i, i});
}
if (enableMetadataTable) {
// Simulate a compaction commit in metadata table timeline
// so the archival in data table can happen
createCompactionCommitInMetadataTable(hadoopConf(), metaClient.getFs(), tablePath1, "194");
}
for (Map.Entry<String, Integer[]> entry : data.entrySet()) {
String key = entry.getKey();
Integer[] value = entry.getValue();

View File

@@ -24,7 +24,9 @@ import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.fs.NoOpConsistencyGuard;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -35,6 +37,7 @@ import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.CompactionTestUtils;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -152,7 +155,11 @@ public class TestCompactionCommand extends CLIFunctionalTestHarness {
activeTimeline.transitionCompactionInflightToComplete(
new HoodieInstant(HoodieInstant.State.INFLIGHT, COMPACTION_ACTION, timestamp), Option.empty());
});
// Simulate a compaction commit in metadata table timeline
// so the archival in data table can happen
HoodieTestUtils.createCompactionCommitInMetadataTable(hadoopConf(),
new HoodieWrapperFileSystem(
FSUtils.getFs(tablePath, hadoopConf()), new NoOpConsistencyGuard()), tablePath, "007");
}
private void generateArchive() throws IOException {
@@ -162,7 +169,6 @@ public class TestCompactionCommand extends CLIFunctionalTestHarness {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.forTable("test-trip-table").build();
// archive
HoodieTableMetaClient metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());

View File

@@ -19,8 +19,6 @@
package org.apache.hudi.client;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan;
import org.apache.hudi.client.utils.MetadataConversionUtils;
@@ -59,6 +57,8 @@ import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -429,7 +429,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
.collect(Collectors.groupingBy(i -> Pair.of(i.getTimestamp(),
HoodieInstant.getComparableAction(i.getAction()))));
// If metadata table is enabled, do not archive instants which are more recent that the last compaction on the
// If metadata table is enabled, do not archive instants which are more recent than the last compaction on the
// metadata table.
if (config.isMetadataTableEnabled()) {
try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(table.getContext(), config.getMetadataConfig(),

View File

@@ -19,7 +19,6 @@
package org.apache.hudi.client;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -540,7 +539,6 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withAutoClean(false).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
// Timeline-server-based markers are not used for multi-writer tests
.withMarkersType(MarkerType.DIRECT.name())
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.client;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
@@ -69,14 +68,9 @@ public class TestMultiFS extends HoodieClientTestHarness {
}
protected HoodieWriteConfig getHoodieWriteConfig(String basePath) {
return getHoodieWriteConfig(basePath, HoodieMetadataConfig.ENABLE.defaultValue());
}
protected HoodieWriteConfig getHoodieWriteConfig(String basePath, boolean enableMetadata) {
return HoodieWriteConfig.newBuilder().withPath(basePath).withEmbeddedTimelineServerEnabled(true)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable(tableName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata).build())
.build();
}
@@ -84,21 +78,21 @@ public class TestMultiFS extends HoodieClientTestHarness {
public void readLocalWriteHDFS() throws Exception {
// Initialize table and filesystem
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(tableType)
.setTableName(tableName)
.setPayloadClass(HoodieAvroPayload.class)
.initTable(hadoopConf, dfsBasePath);
.setTableType(tableType)
.setTableName(tableName)
.setPayloadClass(HoodieAvroPayload.class)
.initTable(hadoopConf, dfsBasePath);
// Create write client to write some records in
HoodieWriteConfig cfg = getHoodieWriteConfig(dfsBasePath, false);
HoodieWriteConfig localConfig = getHoodieWriteConfig(tablePath, false);
HoodieWriteConfig cfg = getHoodieWriteConfig(dfsBasePath);
HoodieWriteConfig localConfig = getHoodieWriteConfig(tablePath);
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(tableType)
.setTableName(tableName)
.setPayloadClass(HoodieAvroPayload.class)
.setRecordKeyFields(localConfig.getProps().getProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()))
.setPartitionFields(localConfig.getProps().getProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()))
.setPartitionFields(localConfig.getProps().getProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()))
.initTable(hadoopConf, tablePath);

View File

@@ -19,7 +19,6 @@
package org.apache.hudi.client.functional;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -105,10 +104,10 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
private HoodieWriteConfig config;
private void setUp(IndexType indexType, boolean populateMetaFields) throws Exception {
setUp(indexType, populateMetaFields, true, true);
setUp(indexType, populateMetaFields, true);
}
private void setUp(IndexType indexType, boolean populateMetaFields, boolean enableMetadata, boolean rollbackUsingMarkers) throws Exception {
private void setUp(IndexType indexType, boolean populateMetaFields, boolean rollbackUsingMarkers) throws Exception {
this.indexType = indexType;
initPath();
initSparkContexts();
@@ -122,8 +121,8 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
config = getConfigBuilder()
.withProperties(populateMetaFields ? new Properties() : getPropertiesForKeyGen())
.withRollbackUsingMarkers(rollbackUsingMarkers)
.withIndexConfig(indexBuilder
.build()).withAutoCommit(false).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata).build())
.withIndexConfig(indexBuilder.build())
.withAutoCommit(false)
.withLayoutConfig(HoodieLayoutConfig.newBuilder().fromProperties(indexBuilder.build().getProps())
.withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build()).build();
writeClient = getHoodieWriteClient(config);
@@ -238,7 +237,7 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
@ParameterizedTest
@MethodSource("indexTypeParams")
public void testSimpleTagLocationAndUpdateWithRollback(IndexType indexType, boolean populateMetaFields) throws Exception {
setUp(indexType, populateMetaFields, true, false);
setUp(indexType, populateMetaFields, false);
String newCommitTime = writeClient.startCommit();
int totalRecords = 20 + random.nextInt(20);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, totalRecords);
@@ -385,8 +384,6 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
.withGlobalSimpleIndexUpdatePartitionPath(true)
.withBloomIndexUpdatePartitionPath(true)
.build())
.withMetadataConfig(
HoodieMetadataConfig.newBuilder().enable(true).build())
.build();
writeClient = getHoodieWriteClient(config);
index = writeClient.getIndex();

View File

@@ -278,7 +278,7 @@ public class TestHoodieMetadataBootstrap extends TestHoodieMetadataBase {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minArchivalCommits, maxArchivalCommits).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
.forTable("test-trip-table").build();
}

View File

@@ -21,7 +21,6 @@ package org.apache.hudi.index.hbase;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
@@ -342,8 +341,7 @@ public class TestSparkHoodieHBaseIndex extends SparkClientFunctionalTestHarness
public void testSimpleTagLocationAndUpdateWithRollback() throws Exception {
// Load to memory
HoodieWriteConfig config = getConfigBuilder(100, false, false)
.withRollbackUsingMarkers(false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
.withRollbackUsingMarkers(false).build();
SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
@@ -431,8 +429,7 @@ public class TestSparkHoodieHBaseIndex extends SparkClientFunctionalTestHarness
public void testEnsureTagLocationUsesCommitTimeline() throws Exception {
// Load to memory
HoodieWriteConfig config = getConfigBuilder(100, false, false)
.withRollbackUsingMarkers(false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
.withRollbackUsingMarkers(false).build();
SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
SparkRDDWriteClient writeClient = getHoodieWriteClient(config);

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.io;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.HoodieTimelineArchiver;
import org.apache.hudi.client.utils.MetadataConversionUtils;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
@@ -47,7 +48,6 @@ import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.client.HoodieTimelineArchiver;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hadoop.conf.Configuration;
@@ -58,6 +58,7 @@ import org.apache.log4j.Logger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
@@ -72,6 +73,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.testutils.HoodieTestUtils.createCompactionCommitInMetadataTable;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -213,7 +215,7 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testMergeSmallArchiveFilesRecoverFromBuildPlanFailed(boolean enableArchiveMerge) throws Exception {
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2, enableArchiveMerge, 3, 209715200);
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 2, 3, 2, enableArchiveMerge, 3, 209715200);
// do ingestion and trigger archive actions here.
for (int i = 1; i < 8; i++) {
@@ -264,7 +266,7 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testMergeSmallArchiveFilesRecoverFromMergeFailed(boolean enableArchiveMerge) throws Exception {
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2, enableArchiveMerge, 3, 209715200);
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 2, 3, 2, enableArchiveMerge, 3, 209715200);
// do ingestion and trigger archive actions here.
for (int i = 1; i < 8; i++) {
@@ -317,7 +319,7 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testMergeSmallArchiveFilesRecoverFromDeleteFailed(boolean enableArchiveMerge) throws Exception {
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2, enableArchiveMerge, 3, 209715200);
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 2, 3, 2, enableArchiveMerge, 3, 209715200);
// do ingestion and trigger archive actions here.
for (int i = 1; i < 8; i++) {
@@ -362,7 +364,7 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testLoadArchiveTimelineWithDamagedPlanFile(boolean enableArchiveMerge) throws Exception {
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2, enableArchiveMerge, 3, 209715200);
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 2, 3, 2, enableArchiveMerge, 3, 209715200);
// do ingestion and trigger archive actions here.
for (int i = 1; i < 8; i++) {
@@ -390,7 +392,7 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testLoadArchiveTimelineWithUncompletedMergeArchiveFile(boolean enableArchiveMerge) throws Exception {
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2, enableArchiveMerge, 3, 209715200);
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 2, 3, 2, enableArchiveMerge, 3, 209715200);
for (int i = 1; i < 8; i++) {
testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
archiveAndGetCommitsList(writeConfig);
@@ -451,15 +453,16 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
assertEquals(originalCommits, commitsAfterArchival);
}
@Test
public void testArchiveCommitSavepointNoHole() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testArchiveCommitSavepointNoHole(boolean enableMetadataTable) throws Exception {
init();
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
.build();
HoodieTestDataGenerator.createCommitFile(basePath, "100", wrapperFs.getConf());
@@ -472,6 +475,12 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
HoodieTable table = HoodieSparkTable.create(cfg, context);
HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
if (enableMetadataTable) {
// Simulate a compaction commit in metadata table timeline
// so the archival in data table can happen
createCompactionCommitInMetadataTable(hadoopConf, wrapperFs, basePath, "105");
}
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
assertTrue(archiver.archiveIfRequired(context));
@@ -593,8 +602,9 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
verifyArchival(archivedInstants, getActiveCommitInstants(Arrays.asList("00000007", "00000008"), HoodieTimeline.DELTA_COMMIT_ACTION), commitsAfterArchival);
}
@Test
public void testArchiveCommitTimeline() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testArchiveCommitTimeline(boolean enableMetadataTable) throws Exception {
init();
HoodieWriteConfig cfg =
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
@@ -602,7 +612,7 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -619,6 +629,12 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
HoodieTestDataGenerator.createCommitFile(basePath, "4", wrapperFs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "5", wrapperFs.getConf());
if (enableMetadataTable) {
// Simulate a compaction commit in metadata table timeline
// so the archival in data table can happen
createCompactionCommitInMetadataTable(hadoopConf, wrapperFs, basePath, "5");
}
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
boolean result = archiver.archiveIfRequired(context);
@@ -713,10 +729,9 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
@Test
public void testArchiveRollbacksAndCleanTestTable() throws Exception {
boolean enableMetadata = false;
int minArchiveCommits = 2;
int maxArchiveCommits = 9;
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, minArchiveCommits, maxArchiveCommits, 2);
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, minArchiveCommits, maxArchiveCommits, 2);
// trigger 1 commit to add lot of files so that future cleans can clean them up
testTable.doWriteOperation("00000001", WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), Arrays.asList("p1", "p2"), 20);
@@ -751,8 +766,8 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testArchiveCompletedRollbackAndClean(boolean isEmpty) throws Exception {
@CsvSource({"true,true", "true,false", "false,true", "false,false"})
public void testArchiveCompletedRollbackAndClean(boolean isEmpty, boolean enableMetadataTable) throws Exception {
init();
int minInstantsToKeep = 2;
int maxInstantsToKeep = 10;
@@ -762,7 +777,7 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstantsToKeep, maxInstantsToKeep).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -775,6 +790,12 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
createCommitAndRollbackFile(startInstant + 1 + "", startInstant + "", false, isEmpty || i % 2 == 0);
}
if (enableMetadataTable) {
// Simulate a compaction commit in metadata table timeline
// so the archival in data table can happen
createCompactionCommitInMetadataTable(hadoopConf, wrapperFs, basePath, Integer.toString(99));
}
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
@@ -790,8 +811,9 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
assertEquals(minInstantsToKeep, actionInstantMap.get("rollback").size(), "Should have min instant");
}
@Test
public void testArchiveInflightClean() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testArchiveInflightClean(boolean enableMetadataTable) throws Exception {
init();
HoodieWriteConfig cfg =
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
@@ -799,7 +821,7 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -809,6 +831,12 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
HoodieInstant notArchivedInstant2 = createCleanMetadata("13", false);
HoodieInstant notArchivedInstant3 = createCleanMetadata("14", true);
if (enableMetadataTable) {
// Simulate a compaction commit in metadata table timeline
// so the archival in data table can happen
createCompactionCommitInMetadataTable(hadoopConf, wrapperFs, basePath, "14");
}
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
@@ -889,6 +917,35 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
"00000009", "00000010", "00000011", "00000012")), getActiveCommitInstants(Arrays.asList("00000013", "00000014")), commitsAfterArchival);
}
/**
 * Verifies that, with the metadata table enabled, the data table archiver does not archive
 * instants that are more recent than the latest compaction commit on the metadata table timeline.
 *
 * <p>Fifteen commit files ("100" through "114") are created, a compaction commit at "105" is
 * simulated in the metadata table timeline, and archival is triggered. The test expects the six
 * commits "100" through "105" to be archived and "106" through "114" to stay in the active timeline.
 *
 * @throws Exception if table initialization, commit file creation or archival fails
 */
@Test
public void testArchiveCommitsWithCompactionCommitInMetadataTableTimeline() throws Exception {
// First argument enables the metadata table; 2 and 4 are the min/max archive commit bounds.
// NOTE(review): the meaning of the last argument (20) is not visible in this hunk - confirm against the helper.
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 2, 4, 20);
int startInstantTime = 100;
int numCommits = 15;
int numExpectedArchived = 6; // "100" till "105" should be archived in this case
// Create commit files for instant times "100" through "114".
for (int i = startInstantTime; i < startInstantTime + numCommits; i++) {
HoodieTestDataGenerator.createCommitFile(basePath, Integer.toString(i), wrapperFs.getConf());
}
// Simulate a compaction commit in metadata table timeline
// so the archival in data table can happen
createCompactionCommitInMetadataTable(hadoopConf, wrapperFs, basePath, "105");
HoodieTable table = HoodieSparkTable.create(writeConfig, context);
HoodieTimelineArchiver archiveLog = new HoodieTimelineArchiver(writeConfig, table);
// Sanity check before archival: all created commits are visible as completed instants.
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals(numCommits, timeline.countInstants(), String.format("Loaded %d commits and the count should match", numCommits));
assertTrue(archiveLog.archiveIfRequired(context));
// Reload the active timeline so the instant count reflects the state after archival.
timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
assertEquals(numCommits - numExpectedArchived, timeline.countInstants(),
"Since we have a compaction commit of 105 in metadata table timeline, we should never archive any commit after that");
// Every commit after "105" (i.e. "106" through "114") must still be in the active timeline.
for (int i = startInstantTime + numExpectedArchived; i < startInstantTime + numCommits; i++) {
// NOTE(review): the leading 'false' presumably marks the instant as not inflight (completed) - confirm against HoodieInstant.
assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, Integer.toString(i))),
String.format("Commit %d should not be archived", i));
}
}
private Pair<List<HoodieInstant>, List<HoodieInstant>> archiveAndGetCommitsList(HoodieWriteConfig writeConfig) throws IOException {
metaClient.reloadActiveTimeline();
HoodieTimeline timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();

View File

@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestHarness;
@@ -36,8 +37,9 @@ import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@@ -170,19 +172,29 @@ public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
assertRows(inputRows, result, instantTime, fileNames);
}
@Test
public void testInstantiationFailure() throws IOException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testInstantiationFailure(boolean enableMetadataTable) {
// init config and table
HoodieWriteConfig cfg = SparkDatasetTestUtils.getConfigBuilder(basePath, timelineServicePort)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.withPath("/dummypath/abc/").build();
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
.withPath("/dummypath/abc/")
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
.build();
try {
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
new HoodieRowCreateHandle(table, cfg, " def", UUID.randomUUID().toString(), "001", RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE);
fail("Should have thrown exception");
} catch (HoodieInsertException ioe) {
// expected
// expected without metadata table
if (enableMetadataTable) {
fail("Should have thrown TableNotFoundException");
}
} catch (TableNotFoundException e) {
// expected with metadata table
if (!enableMetadataTable) {
fail("Should have thrown HoodieInsertException");
}
}
}

View File

@@ -114,6 +114,7 @@ import java.util.stream.Stream;
import scala.Tuple3;
import static org.apache.hudi.HoodieTestCommitGenerator.getBaseFilename;
import static org.apache.hudi.common.testutils.HoodieTestTable.makeIncrementalCommitTimes;
import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
@@ -272,7 +273,6 @@ public class TestCleaner extends HoodieClientTestBase {
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(maxVersions).build())
.withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
.build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
@@ -442,7 +442,6 @@ public class TestCleaner extends HoodieClientTestBase {
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
.withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
.build();
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
@@ -519,7 +518,6 @@ public class TestCleaner extends HoodieClientTestBase {
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
.withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
.build();
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
@@ -648,7 +646,7 @@ public class TestCleaner extends HoodieClientTestBase {
public void testKeepLatestFileVersions(Boolean enableBootstrapSourceClean) throws Exception {
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(true).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
@@ -811,7 +809,7 @@ public class TestCleaner extends HoodieClientTestBase {
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(false).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
.build();
@@ -848,8 +846,8 @@ public class TestCleaner extends HoodieClientTestBase {
public void testKeepLatestCommitsMOR() throws Exception {
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(false).build())
HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build())
.build();
@@ -889,12 +887,15 @@ public class TestCleaner extends HoodieClientTestBase {
@Test
public void testCleanWithReplaceCommits() throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(false).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.withMaxNumDeltaCommitsBeforeCompaction(1)
.withAssumeDatePartitioning(true).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
.build();
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter);
String p0 = "2020/01/01";
String p1 = "2020/01/02";
@@ -903,7 +904,7 @@ public class TestCleaner extends HoodieClientTestBase {
String file1P1C0 = UUID.randomUUID().toString();
testTable.addInflightCommit("00000000000001").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
HoodieCommitMetadata commitMetadata = generateCommitMetadata(
HoodieCommitMetadata commitMetadata = generateCommitMetadata("00000000000001",
Collections.unmodifiableMap(new HashMap<String, List<String>>() {
{
put(p0, CollectionUtils.createImmutableList(file1P0C0));
@@ -911,6 +912,7 @@ public class TestCleaner extends HoodieClientTestBase {
}
})
);
metadataWriter.update(commitMetadata, "00000000000001", false);
metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000001"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
@@ -926,7 +928,8 @@ public class TestCleaner extends HoodieClientTestBase {
// notice that clustering generates empty inflight commit files
Map<String, String> partitionAndFileId002 = testTable.forReplaceCommit("00000000000002").getFileIdsWithBaseFilesInPartitions(p0);
String file2P0C1 = partitionAndFileId002.get(p0);
Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> replaceMetadata = generateReplaceCommitMetadata(p0, file1P0C0, file2P0C1);
Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> replaceMetadata =
generateReplaceCommitMetadata("00000000000002", p0, file1P0C0, file2P0C1);
testTable.addReplaceCommit("00000000000002", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue());
// run cleaner
@@ -940,7 +943,7 @@ public class TestCleaner extends HoodieClientTestBase {
// notice that clustering generates empty inflight commit files
Map<String, String> partitionAndFileId003 = testTable.forReplaceCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p1);
String file3P1C2 = partitionAndFileId003.get(p1);
replaceMetadata = generateReplaceCommitMetadata(p1, file1P1C0, file3P1C2);
replaceMetadata = generateReplaceCommitMetadata("00000000000003", p1, file1P1C0, file3P1C2);
testTable.addReplaceCommit("00000000000003", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue());
// run cleaner
@@ -955,11 +958,11 @@ public class TestCleaner extends HoodieClientTestBase {
// notice that clustering generates empty inflight commit files
Map<String, String> partitionAndFileId004 = testTable.forReplaceCommit("00000000000004").getFileIdsWithBaseFilesInPartitions(p0);
String file4P0C3 = partitionAndFileId004.get(p0);
replaceMetadata = generateReplaceCommitMetadata(p0, file2P0C1, file4P0C3);
replaceMetadata = generateReplaceCommitMetadata("00000000000004", p0, file2P0C1, file4P0C3);
testTable.addReplaceCommit("00000000000004", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue());
// run cleaner
List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config, 5);
assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
@@ -969,12 +972,12 @@ public class TestCleaner extends HoodieClientTestBase {
// make next replacecommit, with 1 clustering operation. Replace all data in p1. no new files created
// notice that clustering generates empty inflight commit files
Map<String, String> partitionAndFileId005 = testTable.forReplaceCommit("00000000000005").getFileIdsWithBaseFilesInPartitions(p1);
Map<String, String> partitionAndFileId005 = testTable.forReplaceCommit("00000000000006").getFileIdsWithBaseFilesInPartitions(p1);
String file4P1C4 = partitionAndFileId005.get(p1);
replaceMetadata = generateReplaceCommitMetadata(p0, file3P1C2, file4P1C4);
testTable.addReplaceCommit("00000000000005", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue());
replaceMetadata = generateReplaceCommitMetadata("00000000000006", p0, file3P1C2, file4P1C4);
testTable.addReplaceCommit("00000000000006", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue());
List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, 2);
List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, 7);
assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
@@ -982,9 +985,8 @@ public class TestCleaner extends HoodieClientTestBase {
assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
}
private Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> generateReplaceCommitMetadata(String partition,
String replacedFileId,
String newFileId) {
private Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> generateReplaceCommitMetadata(
String instantTime, String partition, String replacedFileId, String newFileId) {
HoodieRequestedReplaceMetadata requestedReplaceMetadata = new HoodieRequestedReplaceMetadata();
requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.toString());
requestedReplaceMetadata.setVersion(1);
@@ -1005,7 +1007,7 @@ public class TestCleaner extends HoodieClientTestBase {
if (!StringUtils.isNullOrEmpty(newFileId)) {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setPartitionPath(partition);
writeStat.setPath(newFileId);
writeStat.setPath(partition + "/" + getBaseFilename(instantTime, newFileId));
writeStat.setFileId(newFileId);
replaceMetadata.addWriteStat(partition, writeStat);
}
@@ -1196,7 +1198,7 @@ public class TestCleaner extends HoodieClientTestBase {
@MethodSource("argumentsForTestKeepLatestCommits")
public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(false).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withIncrementalCleaningMode(enableIncrementalClean)
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
@@ -1216,7 +1218,7 @@ public class TestCleaner extends HoodieClientTestBase {
: UUID.randomUUID().toString();
testTable.addInflightCommit("00000000000001").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
HoodieCommitMetadata commitMetadata = generateCommitMetadata(
HoodieCommitMetadata commitMetadata = generateCommitMetadata("00000000000001",
Collections.unmodifiableMap(new HashMap<String, List<String>>() {
{
put(p0, CollectionUtils.createImmutableList(file1P0C0));
@@ -1240,7 +1242,7 @@ public class TestCleaner extends HoodieClientTestBase {
String file2P0C1 = partitionAndFileId002.get(p0);
String file2P1C1 = partitionAndFileId002.get(p1);
testTable.forCommit("00000000000002").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
commitMetadata = generateCommitMetadata(new HashMap<String, List<String>>() {
commitMetadata = generateCommitMetadata("00000000000002", new HashMap<String, List<String>>() {
{
put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1));
put(p1, CollectionUtils.createImmutableList(file1P1C0, file2P1C1));
@@ -1261,9 +1263,9 @@ public class TestCleaner extends HoodieClientTestBase {
.withBaseFilesInPartition(p0, file1P0C0)
.withBaseFilesInPartition(p0, file2P0C1)
.getFileIdsWithBaseFilesInPartitions(p0).get(p0);
commitMetadata = generateCommitMetadata(CollectionUtils
.createImmutableMap(p0,
CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2)));
commitMetadata = generateCommitMetadata("00000000000003",
CollectionUtils.createImmutableMap(
p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2)));
metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000003"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
@@ -1278,8 +1280,9 @@ public class TestCleaner extends HoodieClientTestBase {
.withBaseFilesInPartition(p0, file1P0C0)
.withBaseFilesInPartition(p0, file2P0C1)
.getFileIdsWithBaseFilesInPartitions(p0).get(p0);
commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(
p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3)));
commitMetadata = generateCommitMetadata("00000000000004",
CollectionUtils.createImmutableMap(
p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3)));
metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000004"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
@@ -1305,8 +1308,8 @@ public class TestCleaner extends HoodieClientTestBase {
// No cleaning on partially written file, with no commit.
testTable.forCommit("00000000000005").withBaseFilesInPartition(p0, file3P0C2);
commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(p0,
CollectionUtils.createImmutableList(file3P0C2)));
commitMetadata = generateCommitMetadata("00000000000005",
CollectionUtils.createImmutableMap(p0, CollectionUtils.createImmutableList(file3P0C2)));
metaClient.getActiveTimeline().createNewInstant(
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "00000000000005"));
metaClient.getActiveTimeline().transitionRequestedToInflight(
@@ -1378,7 +1381,7 @@ public class TestCleaner extends HoodieClientTestBase {
@Test
public void testCleaningWithZeroPartitionPaths() throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(true).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
.build();
@@ -1402,7 +1405,7 @@ public class TestCleaner extends HoodieClientTestBase {
@Test
public void testKeepLatestCommitsWithPendingCompactions() throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(true).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
.build();
@@ -1426,7 +1429,7 @@ public class TestCleaner extends HoodieClientTestBase {
public void testKeepLatestVersionsWithPendingCompactions(boolean retryFailure) throws Exception {
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(true).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(2).build())
.build();
@@ -1677,14 +1680,15 @@ public class TestCleaner extends HoodieClientTestBase {
return Stream.concat(stream1, stream2);
}
private static HoodieCommitMetadata generateCommitMetadata(Map<String, List<String>> partitionToFilePaths) {
private static HoodieCommitMetadata generateCommitMetadata(
String instantTime, Map<String, List<String>> partitionToFilePaths) {
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
partitionToFilePaths.forEach((key, value) -> value.forEach(f -> {
partitionToFilePaths.forEach((partitionPath, fileList) -> fileList.forEach(f -> {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setPartitionPath(key);
writeStat.setPath(f);
writeStat.setPartitionPath(partitionPath);
writeStat.setPath(partitionPath + "/" + getBaseFilename(instantTime, f));
writeStat.setFileId(f);
metadata.addWriteStat(key, writeStat);
metadata.addWriteStat(partitionPath, writeStat);
}));
return metadata;
}

View File

@@ -22,7 +22,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -208,8 +207,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
boolean populateMetaFields = true;
// insert 100 records
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true, false, HoodieIndex.IndexType.BLOOM,
1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build(), preserveCommitMeta)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build());
1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build(), preserveCommitMeta);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig config = cfgBuilder.build();

View File

@@ -21,7 +21,6 @@ package org.apache.hudi.table.action.commit;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -219,7 +218,7 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
final String testPartitionPath = "2016/09/26";
int totalInsertNum = 2000;
HoodieWriteConfig config = makeHoodieClientConfigBuilder().withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
HoodieWriteConfig config = makeHoodieClientConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0)
.insertSplitSize(totalInsertNum / 2).autoTuneInsertSplits(false).build()).build();

View File

@@ -20,7 +20,6 @@ package org.apache.hudi.table.action.compact;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -53,7 +52,6 @@ public class TestAsyncCompaction extends CompactionTestBase {
private HoodieWriteConfig getConfig(Boolean autoCommit) {
return getConfigBuilder(autoCommit)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
.build();
}

View File

@@ -21,7 +21,6 @@ package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecord;
@@ -159,7 +158,6 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
// insert 100 records
HoodieWriteConfig config = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
.build();
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
String newCommitTime = "100";

View File

@@ -20,7 +20,6 @@ package org.apache.hudi.table.action.rollback;
import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -163,7 +162,7 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.withRollbackUsingMarkers(false)
.withPath(basePath).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
.withPath(basePath).build();
try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
client.startCommitWithTime("001");
client.insert(jsc.emptyRDD(), "001");

View File

@@ -20,7 +20,6 @@
package org.apache.hudi.table.functional;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
@@ -86,7 +85,7 @@ public class TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientF
Properties props = new Properties();
props.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieFileFormat.PARQUET.toString());
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
HoodieWriteConfig cfg = getConfigBuilder(true).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build();
HoodieWriteConfig cfg = getConfigBuilder(true).build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
/*

View File

@@ -325,8 +325,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
boolean populateMetaFields = true;
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false)
// Timeline-server-based markers are not used for multi-rollback tests
.withMarkersType(MarkerType.DIRECT.name())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build());
.withMarkersType(MarkerType.DIRECT.name());
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig cfg = cfgBuilder.build();
@@ -387,8 +386,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
// WriteClient with custom config (disable small file handling)
HoodieWriteConfig smallFileWriteConfig = getHoodieWriteConfigWithSmallFileHandlingOffBuilder(populateMetaFields)
// Timeline-server-based markers are not used for multi-rollback tests
.withMarkersType(MarkerType.DIRECT.name())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
.withMarkersType(MarkerType.DIRECT.name()).build();
try (SparkRDDWriteClient nClient = getHoodieWriteClient(smallFileWriteConfig)) {
nClient.startCommitWithTime(newCommitTime);
@@ -519,8 +517,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
void testMORTableRestore(boolean restoreAfterCompaction) throws Exception {
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false)
// Timeline-server-based markers are not used for multi-rollback tests
.withMarkersType(MarkerType.DIRECT.name())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build());
.withMarkersType(MarkerType.DIRECT.name());
HoodieWriteConfig cfg = cfgBuilder.build();
Properties properties = new Properties();

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.common.testutils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;
@@ -25,6 +26,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.metadata.HoodieTableMetadata;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
@@ -176,4 +178,17 @@ public class HoodieTestUtils {
}
return writeStatList;
}
/**
 * Test helper that simulates a completed compaction commit in the metadata table timeline,
 * so that commits on the data table timeline become eligible for archival.
 *
 * <p>The metadata table base path is derived from {@code basePath}, a
 * {@code MERGE_ON_READ} table is initialized at that path, and a commit file is
 * created there with the instant time {@code instantTime + "001"}.
 *
 * @param hadoopConf  Hadoop configuration passed to {@code HoodieTestUtils.init}
 * @param wrapperFs   file system wrapper; only its configuration ({@code getConf()}) is used,
 *                    when creating the commit file
 * @param basePath    base path of the data table, from which the metadata table base path is derived
 * @param instantTime instant time prefix; the literal suffix {@code "001"} is appended to it
 *                    (presumably the suffix convention for metadata table compaction instants - TODO confirm)
 * @throws IOException declared by the signature; presumably raised by the underlying
 *                     table-init / file-creation helpers - confirm
 */
public static void createCompactionCommitInMetadataTable(
Configuration hadoopConf, HoodieWrapperFileSystem wrapperFs, String basePath,
String instantTime) throws IOException {
// This is to simulate a completed compaction commit in metadata table timeline,
// so that the commits on data table timeline can be archived
// Note that, if metadata table is enabled, instants in data table timeline,
// which are more recent than the last compaction on the metadata table,
// are not archived (HoodieTimelineArchiveLog::getInstantsToArchive)
// NOTE(review): callers in the tests instantiate HoodieTimelineArchiver; the class name
// referenced above may be stale - confirm and update the reference if so.
String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
// Initialize the metadata table location as a MERGE_ON_READ table before adding the commit file.
HoodieTestUtils.init(hadoopConf, metadataTableBasePath, HoodieTableType.MERGE_ON_READ);
// The commit file uses the configuration from wrapperFs rather than hadoopConf.
HoodieTestDataGenerator.createCommitFile(metadataTableBasePath, instantTime + "001", wrapperFs.getConf());
}
}

View File

@@ -253,7 +253,6 @@ public class TestBootstrap extends HoodieClientTestBase {
.withFullBootstrapInputProvider(TestFullBootstrapDataProvider.class.getName())
.withBootstrapParallelism(3)
.withBootstrapModeSelector(bootstrapModeSelectorClass).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.build();
SparkRDDWriteClient client = new SparkRDDWriteClient(context, config);
client.bootstrap(Option.empty());

View File

@@ -29,7 +29,6 @@ import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelect
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.bootstrap.FileStatusUtils;
import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -246,7 +245,6 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
.withFullBootstrapInputProvider(TestFullBootstrapDataProvider.class.getName())
.withBootstrapParallelism(3)
.withBootstrapModeSelector(bootstrapModeSelectorClass).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.build();
SparkRDDWriteClient client = new SparkRDDWriteClient(context, config);
client.bootstrap(Option.empty());

View File

@@ -62,7 +62,8 @@ class TestCOWDataSource extends HoodieClientTestBase {
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1"
)
val verificationCol: String = "driver"
@@ -465,15 +466,10 @@ class TestCOWDataSource extends HoodieClientTestBase {
}
private def getDataFrameWriter(keyGenerator: String): DataFrameWriter[Row] = {
getDataFrameWriter(keyGenerator, true)
}
private def getDataFrameWriter(keyGenerator: String, enableMetadata: Boolean): DataFrameWriter[Row] = {
val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
val opts = commonOpts ++ Map(HoodieMetadataConfig.ENABLE.key() -> String.valueOf(enableMetadata))
inputDF.write.format("hudi")
.options(opts)
.options(commonOpts)
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, keyGenerator)
.mode(SaveMode.Overwrite)
}
@@ -501,7 +497,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= udf_date_format(col("current_ts"))).count() == 0)
// Mixed fieldType
writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, false)
writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName)
writer.partitionBy("driver", "rider:SIMPLE", "current_ts:TIMESTAMP")
.option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS")
.option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd")
@@ -513,7 +509,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
concat(col("driver"), lit("/"), col("rider"), lit("/"), udf_date_format(col("current_ts")))).count() == 0)
// Test invalid partitionKeyType
writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, false)
writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName)
writer = writer.partitionBy("current_ts:DUMMY")
.option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS")
.option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd")
@@ -780,7 +776,6 @@ class TestCOWDataSource extends HoodieClientTestBase {
.option("hoodie.keep.min.commits", "4")
.option("hoodie.keep.max.commits", "5")
.option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(HoodieMetadataConfig.ENABLE.key(), value = false)
.mode(SaveMode.Append)
.save(basePath)
}

View File

@@ -22,16 +22,19 @@ import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -44,6 +47,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;
@@ -811,7 +815,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", ""));
cfg.configs.add(String.format("%s=%s", HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
cfg.configs.add(HoodieMetadataConfig.ENABLE.key() + "=false");
cfg.configs.add(String.format("%s=%s", HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
@@ -862,8 +866,16 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
configs.add(String.format("%s=%s", HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), "1"));
configs.add(String.format("%s=%s", HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), "2"));
configs.add(String.format("%s=%s", HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), "3"));
configs.add(String.format("%s=%s", HoodieCompactionConfig.ASYNC_CLEAN, asyncClean));
configs.add(HoodieMetadataConfig.ENABLE.key() + "=false");
configs.add(String.format("%s=%s", HoodieCompactionConfig.ASYNC_CLEAN.key(), asyncClean));
configs.add(String.format("%s=%s", HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1"));
if (asyncClean) {
configs.add(String.format("%s=%s", HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()));
configs.add(String.format("%s=%s", HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(),
HoodieFailedWritesCleaningPolicy.LAZY.name()));
configs.add(String.format("%s=%s", HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
InProcessLockProvider.class.getName()));
}
cfg.configs = configs;
cfg.continuousMode = false;
ds = new HoodieDeltaStreamer(cfg, jsc);

View File

@@ -64,7 +64,11 @@ public class TestHoodieIncrSource extends HoodieClientTestHarness {
@Test
public void testHoodieIncrSource() throws IOException {
HoodieWriteConfig writeConfig = getConfigBuilder(basePath)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2,3).retainCommits(1).build()).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build();
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.archiveCommitsWith(2, 3).retainCommits(1).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.withMaxNumDeltaCommitsBeforeCompaction(1).build())
.build();
SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig);
Pair<String, List<HoodieRecord>> inserts = writeRecords(writeClient, true, null, "100");