[HUDI-44] Adding support to preserve commit metadata for compaction (#4428)
commit b6891d253f
parent 50fa5a6aa7
@@ -224,6 +224,12 @@ public class HoodieCompactionConfig extends HoodieConfig {
       .withDocumentation("Used by org.apache.hudi.io.compact.strategy.DayBasedCompactionStrategy to denote the number of "
           + "latest partitions to compact during a compaction run.");
 
+  public static final ConfigProperty<Boolean> PRESERVE_COMMIT_METADATA = ConfigProperty
+      .key("hoodie.compaction.preserve.commit.metadata")
+      .defaultValue(false)
+      .sinceVersion("0.11.0")
+      .withDocumentation("When rewriting data, preserves existing hoodie_commit_time");
+
   /**
    * Configs related to specific table types.
    */
@@ -621,6 +627,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withPreserveCommitMetadata(boolean preserveCommitMetadata) {
+      compactionConfig.setValue(PRESERVE_COMMIT_METADATA, String.valueOf(preserveCommitMetadata));
+      return this;
+    }
+
     public Builder withCommitsArchivalBatchSize(int batchSize) {
       compactionConfig.setValue(COMMITS_ARCHIVAL_BATCH_SIZE, String.valueOf(batchSize));
       return this;
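Taken together, the new ConfigProperty and builder method let a writer opt into preserving `_hoodie_commit_time` across compaction. A minimal usage sketch (not part of this change; `basePath` and `schemaStr` are placeholder arguments, and the same behaviour can be requested with the raw key `hoodie.compaction.preserve.commit.metadata=true`):

```java
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class PreserveCommitMetadataConfigExample {
  // Builds a write config whose compactions keep the original commit metadata on rewritten records.
  public static HoodieWriteConfig build(String basePath, String schemaStr) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withSchema(schemaStr)
        .forTable("example_table")
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withMaxNumDeltaCommitsBeforeCompaction(1)   // pre-existing option, also used in the test harness changes below
            .withPreserveCommitMetadata(true)            // new builder method added in this commit
            .build())
        .build();
  }
}
```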
@@ -1163,10 +1163,14 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBoolean(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE);
   }
 
-  public boolean isPreserveHoodieCommitMetadata() {
+  public boolean isPreserveHoodieCommitMetadataForClustering() {
     return getBoolean(HoodieClusteringConfig.PRESERVE_COMMIT_METADATA);
   }
 
+  public boolean isPreserveHoodieCommitMetadataForCompaction() {
+    return getBoolean(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA);
+  }
+
   public boolean isClusteringEnabled() {
     // TODO: future support async clustering
     return inlineClusteringEnabled() || isAsyncClusteringEnabled();
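The renamed and added getters keep clustering and compaction independently switchable. A small illustrative helper (hypothetical, not part of the commit) that reads both flags from a built config:

```java
import org.apache.hudi.config.HoodieWriteConfig;

final class PreserveFlagsInspector {
  // Prints which table services will keep the original _hoodie_commit_time when they rewrite data.
  static void print(HoodieWriteConfig config) {
    System.out.println("clustering preserves commit metadata: "
        + config.isPreserveHoodieCommitMetadataForClustering());
    System.out.println("compaction preserves commit metadata: "
        + config.isPreserveHoodieCommitMetadataForCompaction());
  }
}
```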
@@ -97,6 +97,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
   protected Map<String, HoodieRecord<T>> keyToNewRecords;
   protected Set<String> writtenRecordKeys;
   protected HoodieFileWriter<IndexedRecord> fileWriter;
+  private boolean preserveMetadata = false;
 
   protected Path newFilePath;
   protected Path oldFilePath;
@@ -133,6 +134,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
     super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
     this.keyToNewRecords = keyToNewRecords;
     this.useWriterSchema = true;
+    this.preserveMetadata = config.isPreserveHoodieCommitMetadataForCompaction();
     init(fileId, this.partitionPath, dataFileToBeMerged);
     validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
   }
@@ -291,7 +293,11 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
     if (indexedRecord.isPresent() && !isDelete) {
       // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
       IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) indexedRecord.get());
-      fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord);
+      if (preserveMetadata) {
+        fileWriter.writeAvro(hoodieRecord.getRecordKey(), recordWithMetadataInSchema);
+      } else {
+        fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord);
+      }
       recordsWritten++;
     } else {
       recordsDeleted++;
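The behavioural difference is visible to readers of the table: with the flag enabled, compaction rewrites a record via `writeAvro` and leaves its existing meta fields (including `_hoodie_commit_time`) untouched, whereas the default path restamps them with the compaction instant through `writeAvroWithMetadata`. A hedged Spark sketch for eyeballing the effect on a compacted table (the path is a placeholder, not from this change):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CommitTimeInspection {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("commit-time-inspection")
        .master("local[2]")
        .getOrCreate();

    // Placeholder path; point at a MOR table that has been compacted.
    Dataset<Row> df = spark.read().format("hudi").load("/tmp/hudi/example_table");

    // With hoodie.compaction.preserve.commit.metadata=true the distinct values should still be
    // the original ingest commit times rather than the single compaction instant.
    df.select("_hoodie_commit_time").distinct().show();
  }
}
```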
@@ -105,7 +105,7 @@ public abstract class PartitionAwareClusteringPlanStrategy<T extends HoodieRecor
         .setInputGroups(clusteringGroups)
         .setExtraMetadata(getExtraMetadata())
         .setVersion(getPlanVersion())
-        .setPreserveHoodieMetadata(getWriteConfig().isPreserveHoodieCommitMetadata())
+        .setPreserveHoodieMetadata(getWriteConfig().isPreserveHoodieCommitMetadataForClustering())
         .build());
   }
 }
@@ -1602,7 +1602,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     SparkRDDWriteClient client = getHoodieWriteClient(config);
     String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString();
     HoodieWriteMetadata<JavaRDD<WriteStatus>> clusterMetadata = client.cluster(clusteringCommitTime, completeClustering);
-    if (config.isPreserveHoodieCommitMetadata() && config.populateMetaFields()) {
+    if (config.isPreserveHoodieCommitMetadataForClustering() && config.populateMetaFields()) {
       verifyRecordsWrittenWithPreservedMetadata(new HashSet<>(allRecords.getRight()), allRecords.getLeft(), clusterMetadata.getWriteStatuses().collect());
     } else {
       verifyRecordsWritten(clusteringCommitTime, populateMetaFields, allRecords.getLeft(), clusterMetadata.getWriteStatuses().collect(), config);
@@ -38,12 +38,15 @@ import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.Transformations;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.action.deltacommit.AbstractSparkDeltaCommitActionExecutor;
 import org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
 import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
 import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
@@ -53,6 +56,8 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -194,10 +199,12 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
 
   // TODO: Enable metadata virtual keys in this test once the feature HUDI-2593 is completed
   @ParameterizedTest
-  @ValueSource(booleans = {true})
-  public void testLogFileCountsAfterCompaction(boolean populateMetaFields) throws Exception {
+  @ValueSource(booleans = {false, true})
+  public void testLogFileCountsAfterCompaction(boolean preserveCommitMeta) throws Exception {
+    boolean populateMetaFields = true;
     // insert 100 records
-    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true)
+    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true, false, HoodieIndex.IndexType.BLOOM,
+        1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build(), preserveCommitMeta)
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build());
     addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
     HoodieWriteConfig config = cfgBuilder.build();
@@ -268,6 +275,18 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
       List<WriteStatus> writeStatuses = result.collect();
       assertTrue(writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)));
     }
+
+    // Check the entire dataset has all records still
+    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+    for (int i = 0; i < fullPartitionPaths.length; i++) {
+      fullPartitionPaths[i] = String.format("%s/%s/*", basePath(), dataGen.getPartitionPaths()[i]);
+    }
+    Dataset<Row> actual = HoodieClientTestUtils.read(jsc(), basePath(), sqlContext(), fs(), fullPartitionPaths);
+    List<Row> rows = actual.collectAsList();
+    assertEquals(updatedRecords.size(), rows.size());
+    for (Row row: rows) {
+      assertEquals(row.getAs(HoodieRecord.COMMIT_TIME_METADATA_FIELD), preserveCommitMeta ? newCommitTime : compactionInstantTime);
+    }
   }
 }
 
@@ -305,20 +305,20 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
   }
 
   protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
-    return getConfigBuilder(autoCommit, false, HoodieIndex.IndexType.BLOOM, compactionSmallFileSize, clusteringConfig);
+    return getConfigBuilder(autoCommit, false, HoodieIndex.IndexType.BLOOM, compactionSmallFileSize, clusteringConfig, false);
   }
 
   protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType) {
-    return getConfigBuilder(autoCommit, rollbackUsingMarkers, indexType, 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build());
+    return getConfigBuilder(autoCommit, rollbackUsingMarkers, indexType, 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build(), false);
   }
 
   protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType,
-                                                       long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
+                                                       long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig, boolean preserveCommitMetaForCompaction) {
     return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .withDeleteParallelism(2)
         .withAutoCommit(autoCommit)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(compactionSmallFileSize)
-            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).withPreserveCommitMetadata(preserveCommitMetaForCompaction).build())
         .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).build())
         .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
         .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
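For reference, a hypothetical test built on the widened harness signature would request the preserving behaviour explicitly (the class name and import paths are illustrative assumptions; only the `getConfigBuilder` overload above is taken from this change):

```java
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;

public class ExamplePreserveMetadataTest extends SparkClientFunctionalTestHarness {
  // Builds a write config, via the harness overload changed above, that preserves commit metadata during compaction.
  HoodieWriteConfig buildPreservingConfig() {
    return getConfigBuilder(true /* autoCommit */, false /* rollbackUsingMarkers */,
        HoodieIndex.IndexType.BLOOM, 1024 * 1024 * 1024L,
        HoodieClusteringConfig.newBuilder().build(),
        true /* preserveCommitMetaForCompaction */)
        .build();
  }
}
```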