
[HUDI-2150] Rename/Restructure configs for better modularity (#6061)

- Move clean related configuration to HoodieCleanConfig
- Move Archival related configuration to HoodieArchivalConfig
- Move hoodie.compaction.payload.class to HoodiePayloadConfig
liujinhui
2022-07-09 22:30:48 +08:00
committed by GitHub
parent 6566fc6625
commit 126b88b48d
38 changed files with 920 additions and 672 deletions
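
After this change, cleaning, archival, and payload settings come from their own config classes rather than HoodieCompactionConfig. Below is a minimal sketch of a write config assembled with the builder calls that appear in the hunks that follow; the base path, table name, payload class, and ordering field are placeholder values, and the HoodiePayloadConfig import is assumed to sit alongside the other config classes in org.apache.hudi.config:

    import org.apache.hudi.config.HoodieArchivalConfig;
    import org.apache.hudi.config.HoodieCleanConfig;
    import org.apache.hudi.config.HoodieCompactionConfig;
    import org.apache.hudi.config.HoodiePayloadConfig; // assumed package
    import org.apache.hudi.config.HoodieWriteConfig;

    // Sketch only: clean, archival, and payload options now come from
    // dedicated config classes instead of HoodieCompactionConfig.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie_table")                // placeholder base path
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withInlineCompaction(true)               // compaction-only settings stay here
            .build())
        .withCleanConfig(HoodieCleanConfig.newBuilder()
            .retainCommits(1)                         // cleaning moved to HoodieCleanConfig
            .build())
        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
            .archiveCommitsWith(2, 3)                 // archival moved to HoodieArchivalConfig
            .build())
        .withPayloadConfig(HoodiePayloadConfig.newBuilder()
            .withPayloadClass("org.apache.hudi.common.model.OverwriteWithLatestAvroPayload")
            .withPayloadOrderingField("ts")           // placeholder ordering field
            .build())
        .forTable("hoodie_table")                     // placeholder table name
        .build();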

View File

@@ -783,12 +783,12 @@ public class DeltaSync implements Serializable {
.combineInput(cfg.filterDupes, combineBeforeUpsert)
.withCompactionConfig(
HoodieCompactionConfig.newBuilder()
.withPayloadClass(cfg.payloadClassName)
.withInlineCompaction(cfg.isInlineCompactionEnabled())
.build()
)
.withPayloadConfig(
HoodiePayloadConfig.newBuilder()
.withPayloadClass(cfg.payloadClassName)
.withPayloadOrderingField(cfg.sourceOrderingField)
.build())
.forTable(cfg.targetTableName)

View File

@@ -46,6 +46,8 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -766,7 +768,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
}
cfg.tableType = tableType.name();
cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
cfg.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
@@ -946,16 +948,16 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
// Step 4 : Insert 1 record and trigger sync/async cleaner and archive.
List<String> configs = getAsyncServicesConfigs(1, "true", "true", "2", "", "");
configs.add(String.format("%s=%s", HoodieCompactionConfig.CLEANER_POLICY.key(), "KEEP_LATEST_COMMITS"));
configs.add(String.format("%s=%s", HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), "1"));
configs.add(String.format("%s=%s", HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), "2"));
configs.add(String.format("%s=%s", HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), "3"));
configs.add(String.format("%s=%s", HoodieCompactionConfig.ASYNC_CLEAN.key(), asyncClean));
configs.add(String.format("%s=%s", HoodieCleanConfig.CLEANER_POLICY.key(), "KEEP_LATEST_COMMITS"));
configs.add(String.format("%s=%s", HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "1"));
configs.add(String.format("%s=%s", HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "2"));
configs.add(String.format("%s=%s", HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "3"));
configs.add(String.format("%s=%s", HoodieCleanConfig.ASYNC_CLEAN.key(), asyncClean));
configs.add(String.format("%s=%s", HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1"));
if (asyncClean) {
configs.add(String.format("%s=%s", HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()));
configs.add(String.format("%s=%s", HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(),
configs.add(String.format("%s=%s", HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
HoodieFailedWritesCleaningPolicy.LAZY.name()));
configs.add(String.format("%s=%s", HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
InProcessLockProvider.class.getName()));
@@ -987,7 +989,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
List<String> configs = new ArrayList<>();
configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
if (!StringUtils.isNullOrEmpty(autoClean)) {
configs.add(String.format("%s=%s", HoodieCompactionConfig.AUTO_CLEAN.key(), autoClean));
configs.add(String.format("%s=%s", HoodieCleanConfig.AUTO_CLEAN.key(), autoClean));
}
if (!StringUtils.isNullOrEmpty(inlineCluster)) {
configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING.key(), inlineCluster));
@@ -1462,7 +1464,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
cfg2.filterDupes = false;
cfg2.sourceLimit = 2000;
cfg2.operation = WriteOperationType.UPSERT;
cfg2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
cfg2.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg2, jsc);
ds2.sync();
mClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();

View File

@@ -26,7 +26,7 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
@@ -95,7 +95,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
prepJobConfig.continuousMode = true;
prepJobConfig.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
prepJobConfig.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
prepJobConfig.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc());
// Prepare base dataset with some commits
@@ -115,7 +115,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
cfgIngestionJob.continuousMode = true;
cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
// create a backfill job
HoodieDeltaStreamer.Config cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
@@ -127,7 +127,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc());
// re-init ingestion job to start sync service
@@ -157,7 +157,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
prepJobConfig.continuousMode = true;
prepJobConfig.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
prepJobConfig.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
prepJobConfig.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc());
// Prepare base dataset with some commits
@@ -188,13 +188,13 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
cfgBackfillJob2.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
cfgBackfillJob2.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgBackfillJob2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
cfgBackfillJob2.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer.Config cfgIngestionJob2 = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName()));
cfgIngestionJob2.continuousMode = true;
cfgIngestionJob2.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgIngestionJob2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
cfgIngestionJob2.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
// re-init ingestion job
HoodieDeltaStreamer ingestionJob3 = new HoodieDeltaStreamer(cfgIngestionJob2, jsc());
// re-init backfill job
@@ -225,7 +225,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
prepJobConfig.continuousMode = true;
prepJobConfig.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
prepJobConfig.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
prepJobConfig.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc());
// Prepare base dataset with some commits
@@ -263,7 +263,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
// Set checkpoint to the last successful position
cfgBackfillJob.checkpoint = commitMetadataForLastInstant.getMetadata(CHECKPOINT_KEY);
cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc());
backfillJob.sync();

View File

@@ -28,7 +28,8 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -64,8 +65,8 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
@Test
public void testHoodieIncrSource() throws IOException {
HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.archiveCommitsWith(2, 3).retainCommits(1).build())
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.withMaxNumDeltaCommitsBeforeCompaction(1).build())
.build();

View File

@@ -124,11 +124,14 @@ public class UtilitiesTestBase {
}
public static void initTestServices(boolean needsHive, boolean needsZookeeper) throws Exception {
hdfsTestService = new HdfsTestService();
dfsCluster = hdfsTestService.start(true);
dfs = dfsCluster.getFileSystem();
dfsBasePath = dfs.getWorkingDirectory().toString();
dfs.mkdirs(new Path(dfsBasePath));
if (hdfsTestService == null) {
hdfsTestService = new HdfsTestService();
dfsCluster = hdfsTestService.start(true);
dfs = dfsCluster.getFileSystem();
dfsBasePath = dfs.getWorkingDirectory().toString();
dfs.mkdirs(new Path(dfsBasePath));
}
if (needsHive) {
hiveTestService = new HiveTestService(hdfsTestService.getHadoopConf());
hiveServer = hiveTestService.start();