1
0

[HUDI-2763] Metadata table records - support for key deduplication based on hardcoded key field (#4449)

* [HUDI-2763] Metadata table records - support for key deduplication and virtual keys
- The backing log format for the metadata table is HFile, a KeyValue type.
Since the key field in the metadata record payload is a duplicate of the
Key in the Cell, the redundant key field in the record can be emptied
to save on the cost.

- HoodieHFileWriter and HoodieHFileDataBlock will now serialize records
with the key field emptied by default. HFile writer tries to find if
the record has metadata payload schema field 'key' and if so it does
the key trimming from the record payload.

- HoodieHFileReader when reading the serialized records back from disk,
it materializes the missing keyFields if any. HFile reader tries to
find if the record has metadata payload schema fiels 'key' and if so
it does the key materialization in the record payload.

- Tests have been added to verify the default virtual keys and key
   deduplication support for the metadata table records.

Co-authored-by: Vinoth Chandar <vinoth@apache.org>
This commit is contained in:
Manoj Govindassamy
2022-01-26 10:34:04 -08:00
committed by GitHub
parent dd4ce1bdfd
commit f87c47352a
17 changed files with 745 additions and 138 deletions

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.client.functional;
import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -31,20 +33,24 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -59,24 +65,22 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.storage.HoodieHFileReader;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader;
import org.apache.hudi.metadata.HoodieMetadataMetrics;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieSparkTable;
@@ -96,6 +100,8 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Time;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
@@ -135,8 +141,8 @@ import static org.apache.hudi.common.model.WriteOperationType.DELETE;
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -374,7 +380,6 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000003001");
}
/**
* Tests that virtual key configs are honored in base files after compaction in metadata table.
*
@@ -508,6 +513,255 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
}
}
/**
* Test arguments - Table type, populate meta fields, exclude key from payload.
*/
public static List<Arguments> testMetadataRecordKeyExcludeFromPayloadArgs() {
return asList(
Arguments.of(COPY_ON_WRITE, true),
Arguments.of(COPY_ON_WRITE, false),
Arguments.of(MERGE_ON_READ, true),
Arguments.of(MERGE_ON_READ, false)
);
}
/**
* 1. Verify metadata table records key deduplication feature. When record key
* deduplication is enabled, verify the metadata record payload on disk has empty key.
* Otherwise, verify the valid key.
* 2. Verify populate meta fields work irrespective of record key deduplication config.
* 3. Verify table services like compaction benefit from record key deduplication feature.
*/
@ParameterizedTest
@MethodSource("testMetadataRecordKeyExcludeFromPayloadArgs")
public void testMetadataRecordKeyExcludeFromPayload(final HoodieTableType tableType, final boolean enableMetaFields) throws Exception {
initPath();
writeConfig = getWriteConfigBuilder(true, true, false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(true)
.withPopulateMetaFields(enableMetaFields)
.withMaxNumDeltaCommitsBeforeCompaction(3)
.build())
.build();
init(tableType, writeConfig);
// 2nd commit
doWriteOperation(testTable, "0000001", INSERT);
final HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder()
.setConf(hadoopConf)
.setBasePath(metadataTableBasePath)
.build();
HoodieWriteConfig metadataTableWriteConfig = getMetadataWriteConfig(writeConfig);
metadataMetaClient.reloadActiveTimeline();
final HoodieTable table = HoodieSparkTable.create(metadataTableWriteConfig, context, metadataMetaClient);
// Compaction has not yet kicked in. Verify all the log files
// for the metadata records persisted on disk as per the config.
assertDoesNotThrow(() -> {
verifyMetadataRecordKeyExcludeFromPayloadLogFiles(table, metadataMetaClient, "0000001",
enableMetaFields);
}, "Metadata table should have valid log files!");
// Verify no base file created yet.
assertThrows(IllegalStateException.class, () -> {
verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(table, enableMetaFields);
}, "Metadata table should not have a base file yet!");
// 2 more commits
doWriteOperation(testTable, "0000002", UPSERT);
doWriteOperation(testTable, "0000004", UPSERT);
// Compaction should be triggered by now. Let's verify the log files
// if any for the metadata records persisted on disk as per the config.
assertDoesNotThrow(() -> {
verifyMetadataRecordKeyExcludeFromPayloadLogFiles(table, metadataMetaClient, "0000002",
enableMetaFields);
}, "Metadata table should have valid log files!");
// Verify the base file created by the just completed compaction.
assertDoesNotThrow(() -> {
verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(table, enableMetaFields);
}, "Metadata table should have a valid base file!");
// 2 more commits to trigger one more compaction, along with a clean
doWriteOperation(testTable, "0000005", UPSERT);
doClean(testTable, "0000006", Arrays.asList("0000004"));
doWriteOperation(testTable, "0000007", UPSERT);
assertDoesNotThrow(() -> {
verifyMetadataRecordKeyExcludeFromPayloadLogFiles(table, metadataMetaClient, "7", enableMetaFields);
}, "Metadata table should have valid log files!");
assertDoesNotThrow(() -> {
verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(table, enableMetaFields);
}, "Metadata table should have a valid base file!");
validateMetadata(testTable);
}
/**
* Verify the metadata table log files for the record field correctness. On disk format
* should be based on meta fields and key deduplication config. And the in-memory merged
* records should all be materialized fully irrespective of the config.
*
* @param table - Hoodie metadata test table
* @param metadataMetaClient - Metadata meta client
* @param latestCommitTimestamp - Latest commit timestamp
* @param enableMetaFields - Enable meta fields for the table records
* @throws IOException
*/
private void verifyMetadataRecordKeyExcludeFromPayloadLogFiles(HoodieTable table, HoodieTableMetaClient metadataMetaClient,
String latestCommitTimestamp,
boolean enableMetaFields) throws IOException {
table.getHoodieView().sync();
// Compaction should not be triggered yet. Let's verify no base file
// and few log files available.
List<FileSlice> fileSlices = table.getSliceView()
.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList());
if (fileSlices.isEmpty()) {
throw new IllegalStateException("LogFile slices are not available!");
}
// Verify the log files honor the key deduplication and virtual keys config
List<HoodieLogFile> logFiles = fileSlices.get(0).getLogFiles().map(logFile -> {
return logFile;
}).collect(Collectors.toList());
List<String> logFilePaths = logFiles.stream().map(logFile -> {
return logFile.getPath().toString();
}).collect(Collectors.toList());
// Verify the on-disk raw records before they get materialized
verifyMetadataRawRecords(table, logFiles, enableMetaFields);
// Verify the in-memory materialized and merged records
verifyMetadataMergedRecords(metadataMetaClient, logFilePaths, latestCommitTimestamp, enableMetaFields);
}
/**
* Verify the metadata table on-disk raw records. When populate meta fields is enabled,
* these records should have additional meta fields in the payload. When key deduplication
* is enabled, these records on the disk should have key in the payload as empty string.
*
* @param table
* @param logFiles - Metadata table log files to be verified
* @param enableMetaFields - Enable meta fields for records
* @throws IOException
*/
private void verifyMetadataRawRecords(HoodieTable table, List<HoodieLogFile> logFiles, boolean enableMetaFields) throws IOException {
for (HoodieLogFile logFile : logFiles) {
FileStatus[] fsStatus = fs.listStatus(logFile.getPath());
MessageType writerSchemaMsg = TableSchemaResolver.readSchemaFromLogFile(fs, logFile.getPath());
if (writerSchemaMsg == null) {
// not a data block
continue;
}
Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema);
while (logFileReader.hasNext()) {
HoodieLogBlock logBlock = logFileReader.next();
if (logBlock instanceof HoodieDataBlock) {
for (IndexedRecord indexRecord : ((HoodieDataBlock) logBlock).getRecords()) {
final GenericRecord record = (GenericRecord) indexRecord;
if (enableMetaFields) {
// Metadata table records should have meta fields!
assertNotNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
assertNotNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
} else {
// Metadata table records should not have meta fields!
assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
}
final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME));
assertFalse(key.isEmpty());
if (enableMetaFields) {
assertTrue(key.equals(String.valueOf(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD))));
}
}
}
}
}
}
/**
* Verify the metadata table in-memory merged records. Irrespective of key deduplication
* config, the in-memory merged records should always have the key field in the record
* payload fully materialized.
*
* @param metadataMetaClient - Metadata table meta client
* @param logFilePaths - Metadata table log file paths
* @param latestCommitTimestamp
* @param enableMetaFields - Enable meta fields
*/
private void verifyMetadataMergedRecords(HoodieTableMetaClient metadataMetaClient, List<String> logFilePaths,
String latestCommitTimestamp, boolean enableMetaFields) {
Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
if (enableMetaFields) {
schema = HoodieAvroUtils.addMetadataFields(schema);
}
HoodieMetadataMergedLogRecordReader logRecordReader = HoodieMetadataMergedLogRecordReader.newBuilder()
.withFileSystem(metadataMetaClient.getFs())
.withBasePath(metadataMetaClient.getBasePath())
.withLogFilePaths(logFilePaths)
.withLatestInstantTime(latestCommitTimestamp)
.withPartition(MetadataPartitionType.FILES.partitionPath())
.withReaderSchema(schema)
.withMaxMemorySizeInBytes(100000L)
.withBufferSize(4096)
.withSpillableMapBasePath(tempDir.toString())
.withDiskMapType(ExternalSpillableMap.DiskMapType.BITCASK)
.build();
assertDoesNotThrow(() -> {
logRecordReader.scan();
}, "Metadata log records materialization failed");
for (Map.Entry<String, HoodieRecord<? extends HoodieRecordPayload>> entry : logRecordReader.getRecords().entrySet()) {
assertFalse(entry.getKey().isEmpty());
assertFalse(entry.getValue().getRecordKey().isEmpty());
assertEquals(entry.getKey(), entry.getValue().getRecordKey());
}
}
/**
* Verify metadata table base files for the records persisted based on the config. When
* the key deduplication is enabled, the records persisted on the disk in the base file
* should have key field in the payload as empty string.
*
* @param table - Metadata table
* @param enableMetaFields - Enable meta fields
*/
private void verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(HoodieTable table, boolean enableMetaFields) throws IOException {
table.getHoodieView().sync();
List<FileSlice> fileSlices = table.getSliceView()
.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList());
if (!fileSlices.get(0).getBaseFile().isPresent()) {
throw new IllegalStateException("Base file not available!");
}
final HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(context.getHadoopConf().get(),
new Path(baseFile.getPath()),
new CacheConfig(context.getHadoopConf().get()));
List<Pair<String, IndexedRecord>> records = hoodieHFileReader.readAllRecords();
records.forEach(entry -> {
if (enableMetaFields) {
assertNotNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
} else {
assertNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
}
final String keyInPayload = (String) ((GenericRecord) entry.getSecond())
.get(HoodieMetadataPayload.KEY_FIELD_NAME);
assertFalse(keyInPayload.isEmpty());
});
}
/**
* Test rollback of various table operations sync to Metadata Table correctly.
*/
@@ -1492,95 +1746,6 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
}
}
/**
* Fetching WriteConfig for metadata table from Data table's writeConfig is not trivial and the method is not public in source code. so, for now,
* using this method which mimics source code.
* @param writeConfig
* @return
*/
private HoodieWriteConfig getMetadataWriteConfig(HoodieWriteConfig writeConfig) {
int parallelism = writeConfig.getMetadataInsertParallelism();
int minCommitsToKeep = Math.max(writeConfig.getMetadataMinCommitsToKeep(), writeConfig.getMinCommitsToKeep());
int maxCommitsToKeep = Math.max(writeConfig.getMetadataMaxCommitsToKeep(), writeConfig.getMaxCommitsToKeep());
// Create the write config for the metadata table by borrowing options from the main write config.
HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
.withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
.withConsistencyCheckEnabled(writeConfig.getConsistencyGuardConfig().isConsistencyCheckEnabled())
.withInitialConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getInitialConsistencyCheckIntervalMs())
.withMaxConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getMaxConsistencyCheckIntervalMs())
.withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks())
.build())
.withWriteConcurrencyMode(WriteConcurrencyMode.SINGLE_WRITER)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).withFileListingParallelism(writeConfig.getFileListingParallelism()).build())
.withAutoCommit(true)
.withAvroSchemaValidate(true)
.withEmbeddedTimelineServerEnabled(false)
.withMarkersType(MarkerType.DIRECT.name())
.withRollbackUsingMarkers(false)
.withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath()))
.withSchema(HoodieMetadataRecord.getClassSchema().toString())
.forTable(writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withAsyncClean(writeConfig.isMetadataAsyncClean())
// we will trigger cleaning manually, to control the instant times
.withAutoClean(false)
.withCleanerParallelism(parallelism)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.retainCommits(writeConfig.getMetadataCleanerCommitsRetained())
.archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep)
// we will trigger compaction manually, to control the instant times
.withInlineCompaction(false)
.withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build())
.withParallelism(parallelism, parallelism)
.withDeleteParallelism(parallelism)
.withRollbackParallelism(parallelism)
.withFinalizeWriteParallelism(parallelism)
.withAllowMultiWriteOnSameInstant(true)
.withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
.withPopulateMetaFields(writeConfig.getMetadataConfig().populateMetaFields());
// RecordKey properties are needed for the metadata table records
final Properties properties = new Properties();
properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY);
properties.put("hoodie.datasource.write.recordkey.field", HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY);
builder.withProperties(properties);
if (writeConfig.isMetricsOn()) {
builder.withMetricsConfig(HoodieMetricsConfig.newBuilder()
.withReporterType(writeConfig.getMetricsReporterType().toString())
.withExecutorMetrics(writeConfig.isExecutorMetricsEnabled())
.on(true).build());
switch (writeConfig.getMetricsReporterType()) {
case GRAPHITE:
builder.withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder()
.onGraphitePort(writeConfig.getGraphiteServerPort())
.toGraphiteHost(writeConfig.getGraphiteServerHost())
.usePrefix(writeConfig.getGraphiteMetricPrefix()).build());
break;
case JMX:
builder.withMetricsJmxConfig(HoodieMetricsJmxConfig.newBuilder()
.onJmxPort(writeConfig.getJmxPort())
.toJmxHost(writeConfig.getJmxHost())
.build());
break;
case DATADOG:
case PROMETHEUS:
case PROMETHEUS_PUSHGATEWAY:
case CONSOLE:
case INMEMORY:
case CLOUDWATCH:
break;
default:
throw new HoodieMetadataException("Unsupported Metrics Reporter type " + writeConfig.getMetricsReporterType());
}
}
return builder.build();
}
private void doPreBootstrapOperations(HoodieTestTable testTable) throws Exception {
doPreBootstrapOperations(testTable, "0000001", "0000002");
}

View File

@@ -18,30 +18,63 @@
package org.apache.hudi.client.functional;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieHFileReader;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.fs.FileStatus;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -123,4 +156,216 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
tableMetadata.getAllFilesInPartition(new Path(writeConfig.getBasePath() + "dummy"));
assertEquals(allFilesInPartition.length, 0);
}
/**
* 1. Verify metadata table records key deduplication feature. When record key
* deduplication is enabled, verify the metadata record payload on disk has empty key.
* Otherwise, verify the valid key.
* 2. Verify populate meta fields work irrespective of record key deduplication config.
* 3. Verify table services like compaction benefit from record key deduplication feature.
*/
@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testMetadataRecordKeyExcludeFromPayload(final HoodieTableType tableType) throws Exception {
initPath();
writeConfig = getWriteConfigBuilder(true, true, false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(true)
.withPopulateMetaFields(false)
.withMaxNumDeltaCommitsBeforeCompaction(3)
.build())
.build();
init(tableType, writeConfig);
// 2nd commit
doWriteOperation(testTable, "0000001", INSERT);
final HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder()
.setConf(hadoopConf)
.setBasePath(metadataTableBasePath)
.build();
HoodieWriteConfig metadataTableWriteConfig = getMetadataWriteConfig(writeConfig);
metadataMetaClient.reloadActiveTimeline();
final HoodieTable table = HoodieSparkTable.create(metadataTableWriteConfig, context, metadataMetaClient);
// Compaction has not yet kicked in. Verify all the log files
// for the metadata records persisted on disk as per the config.
assertDoesNotThrow(() -> {
verifyMetadataRecordKeyExcludeFromPayloadLogFiles(table, metadataMetaClient, "0000001");
}, "Metadata table should have valid log files!");
// Verify no base file created yet.
assertThrows(IllegalStateException.class, () -> {
verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(table);
}, "Metadata table should not have a base file yet!");
// 2 more commits
doWriteOperation(testTable, "0000002", UPSERT);
doWriteOperation(testTable, "0000004", UPSERT);
// Compaction should be triggered by now. Let's verify the log files
// if any for the metadata records persisted on disk as per the config.
assertDoesNotThrow(() -> {
verifyMetadataRecordKeyExcludeFromPayloadLogFiles(table, metadataMetaClient, "0000002");
}, "Metadata table should have valid log files!");
// Verify the base file created by the just completed compaction.
assertDoesNotThrow(() -> {
verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(table);
}, "Metadata table should have a valid base file!");
// 2 more commits to trigger one more compaction, along with a clean
doWriteOperation(testTable, "0000005", UPSERT);
doClean(testTable, "0000006", Arrays.asList("0000004"));
doWriteOperation(testTable, "0000007", UPSERT);
assertDoesNotThrow(() -> {
verifyMetadataRecordKeyExcludeFromPayloadLogFiles(table, metadataMetaClient, "7");
}, "Metadata table should have valid log files!");
assertDoesNotThrow(() -> {
verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(table);
}, "Metadata table should have a valid base file!");
validateMetadata(testTable);
}
/**
* Verify the metadata table log files for the record field correctness. On disk format
* should be based on meta fields and key deduplication config. And the in-memory merged
* records should all be materialized fully irrespective of the config.
*
* @param table - Hoodie metadata test table
* @param metadataMetaClient - Metadata meta client
* @param latestCommitTimestamp - Latest commit timestamp
* @throws IOException
*/
private void verifyMetadataRecordKeyExcludeFromPayloadLogFiles(HoodieTable table, HoodieTableMetaClient metadataMetaClient,
String latestCommitTimestamp) throws IOException {
table.getHoodieView().sync();
// Compaction should not be triggered yet. Let's verify no base file
// and few log files available.
List<FileSlice> fileSlices = table.getSliceView()
.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList());
if (fileSlices.isEmpty()) {
throw new IllegalStateException("LogFile slices are not available!");
}
// Verify the log files honor the key deduplication and virtual keys config
List<HoodieLogFile> logFiles = fileSlices.get(0).getLogFiles().map(logFile -> {
return logFile;
}).collect(Collectors.toList());
List<String> logFilePaths = logFiles.stream().map(logFile -> {
return logFile.getPath().toString();
}).collect(Collectors.toList());
// Verify the on-disk raw records before they get materialized
verifyMetadataRawRecords(table, logFiles);
// Verify the in-memory materialized and merged records
verifyMetadataMergedRecords(metadataMetaClient, logFilePaths, latestCommitTimestamp);
}
/**
* Verify the metadata table on-disk raw records. When populate meta fields is enabled,
* these records should have additional meta fields in the payload. When key deduplication
* is enabled, these records on the disk should have key in the payload as empty string.
*
* @param table
* @param logFiles - Metadata table log files to be verified
* @throws IOException
*/
private void verifyMetadataRawRecords(HoodieTable table, List<HoodieLogFile> logFiles) throws IOException {
for (HoodieLogFile logFile : logFiles) {
FileStatus[] fsStatus = fs.listStatus(logFile.getPath());
MessageType writerSchemaMsg = TableSchemaResolver.readSchemaFromLogFile(fs, logFile.getPath());
if (writerSchemaMsg == null) {
// not a data block
continue;
}
Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema);
while (logFileReader.hasNext()) {
HoodieLogBlock logBlock = logFileReader.next();
if (logBlock instanceof HoodieDataBlock) {
for (IndexedRecord indexRecord : ((HoodieDataBlock) logBlock).getRecords()) {
final GenericRecord record = (GenericRecord) indexRecord;
// Metadata table records should not have meta fields!
assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME));
assertFalse(key.isEmpty());
}
}
}
}
}
/**
* Verify the metadata table in-memory merged records. Irrespective of key deduplication
* config, the in-memory merged records should always have the key field in the record
* payload fully materialized.
*
* @param metadataMetaClient - Metadata table meta client
* @param logFilePaths - Metadata table log file paths
* @param latestCommitTimestamp - Latest commit timestamp
*/
private void verifyMetadataMergedRecords(HoodieTableMetaClient metadataMetaClient, List<String> logFilePaths, String latestCommitTimestamp) {
Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
HoodieMetadataMergedLogRecordReader logRecordReader = HoodieMetadataMergedLogRecordReader.newBuilder()
.withFileSystem(metadataMetaClient.getFs())
.withBasePath(metadataMetaClient.getBasePath())
.withLogFilePaths(logFilePaths)
.withLatestInstantTime(latestCommitTimestamp)
.withPartition(MetadataPartitionType.FILES.partitionPath())
.withReaderSchema(schema)
.withMaxMemorySizeInBytes(100000L)
.withBufferSize(4096)
.withSpillableMapBasePath(tempDir.toString())
.withDiskMapType(ExternalSpillableMap.DiskMapType.BITCASK)
.build();
assertDoesNotThrow(() -> {
logRecordReader.scan();
}, "Metadata log records materialization failed");
for (Map.Entry<String, HoodieRecord<? extends HoodieRecordPayload>> entry : logRecordReader.getRecords().entrySet()) {
assertFalse(entry.getKey().isEmpty());
assertFalse(entry.getValue().getRecordKey().isEmpty());
assertEquals(entry.getKey(), entry.getValue().getRecordKey());
}
}
/**
* Verify metadata table base files for the records persisted based on the config. When
* the key deduplication is enabled, the records persisted on the disk in the base file
* should have key field in the payload as empty string.
*
* @param table - Metadata table
*/
private void verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(HoodieTable table) throws IOException {
table.getHoodieView().sync();
List<FileSlice> fileSlices = table.getSliceView()
.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList());
if (!fileSlices.get(0).getBaseFile().isPresent()) {
throw new IllegalStateException("Base file not available!");
}
final HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(context.getHadoopConf().get(),
new Path(baseFile.getPath()),
new CacheConfig(context.getHadoopConf().get()));
List<Pair<String, IndexedRecord>> records = hoodieHFileReader.readAllRecords();
records.forEach(entry -> {
assertNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
final String keyInPayload = (String) ((GenericRecord) entry.getSecond())
.get(HoodieMetadataPayload.KEY_FIELD_NAME);
assertFalse(keyInPayload.isEmpty());
});
}
}

View File

@@ -18,12 +18,19 @@
package org.apache.hudi.client.functional;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.testutils.HoodieTestTable;
@@ -33,17 +40,19 @@ import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.AfterEach;
@@ -59,6 +68,7 @@ import static java.util.Collections.emptyList;
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
public class TestHoodieMetadataBase extends HoodieClientTestHarness {
@@ -94,6 +104,20 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
initWriteConfigAndMetatableWriter(writeConfig, enableMetadataTable);
}
public void init(HoodieTableType tableType, HoodieWriteConfig writeConfig) throws IOException {
this.tableType = tableType;
initPath();
initSparkContexts("TestHoodieMetadata");
initFileSystem();
fs.mkdirs(new Path(basePath));
initTimelineService();
initMetaClient(tableType);
initTestDataGenerator();
metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
this.writeConfig = writeConfig;
initWriteConfigAndMetatableWriter(writeConfig, writeConfig.isMetadataTableEnabled());
}
protected void initWriteConfigAndMetatableWriter(HoodieWriteConfig writeConfig, boolean enableMetadataTable) {
this.writeConfig = writeConfig;
if (enableMetadataTable) {
@@ -327,4 +351,91 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
.withProperties(properties);
}
/**
* Fetching WriteConfig for metadata table from Data table's writeConfig is not trivial and
* the method is not public in source code. so, for now, using this method which mimics source code.
*/
protected HoodieWriteConfig getMetadataWriteConfig(HoodieWriteConfig writeConfig) {
int parallelism = writeConfig.getMetadataInsertParallelism();
int minCommitsToKeep = Math.max(writeConfig.getMetadataMinCommitsToKeep(), writeConfig.getMinCommitsToKeep());
int maxCommitsToKeep = Math.max(writeConfig.getMetadataMaxCommitsToKeep(), writeConfig.getMaxCommitsToKeep());
// Create the write config for the metadata table by borrowing options from the main write config.
HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
.withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
.withConsistencyCheckEnabled(writeConfig.getConsistencyGuardConfig().isConsistencyCheckEnabled())
.withInitialConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getInitialConsistencyCheckIntervalMs())
.withMaxConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getMaxConsistencyCheckIntervalMs())
.withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks())
.build())
.withWriteConcurrencyMode(WriteConcurrencyMode.SINGLE_WRITER)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).withFileListingParallelism(writeConfig.getFileListingParallelism()).build())
.withAutoCommit(true)
.withAvroSchemaValidate(true)
.withEmbeddedTimelineServerEnabled(false)
.withMarkersType(MarkerType.DIRECT.name())
.withRollbackUsingMarkers(false)
.withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath()))
.withSchema(HoodieMetadataRecord.getClassSchema().toString())
.forTable(writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withAsyncClean(writeConfig.isMetadataAsyncClean())
// we will trigger cleaning manually, to control the instant times
.withAutoClean(false)
.withCleanerParallelism(parallelism)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.retainCommits(writeConfig.getMetadataCleanerCommitsRetained())
.archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep)
// we will trigger compaction manually, to control the instant times
.withInlineCompaction(false)
.withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build())
.withParallelism(parallelism, parallelism)
.withDeleteParallelism(parallelism)
.withRollbackParallelism(parallelism)
.withFinalizeWriteParallelism(parallelism)
.withAllowMultiWriteOnSameInstant(true)
.withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
.withPopulateMetaFields(writeConfig.getMetadataConfig().populateMetaFields());
// RecordKey properties are needed for the metadata table records
final Properties properties = new Properties();
properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), HoodieMetadataPayload.KEY_FIELD_NAME);
properties.put("hoodie.datasource.write.recordkey.field", HoodieMetadataPayload.KEY_FIELD_NAME);
builder.withProperties(properties);
if (writeConfig.isMetricsOn()) {
builder.withMetricsConfig(HoodieMetricsConfig.newBuilder()
.withReporterType(writeConfig.getMetricsReporterType().toString())
.withExecutorMetrics(writeConfig.isExecutorMetricsEnabled())
.on(true).build());
switch (writeConfig.getMetricsReporterType()) {
case GRAPHITE:
builder.withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder()
.onGraphitePort(writeConfig.getGraphiteServerPort())
.toGraphiteHost(writeConfig.getGraphiteServerHost())
.usePrefix(writeConfig.getGraphiteMetricPrefix()).build());
break;
case JMX:
builder.withMetricsJmxConfig(HoodieMetricsJmxConfig.newBuilder()
.onJmxPort(writeConfig.getJmxPort())
.toJmxHost(writeConfig.getJmxHost())
.build());
break;
case DATADOG:
case PROMETHEUS:
case PROMETHEUS_PUSHGATEWAY:
case CONSOLE:
case INMEMORY:
case CLOUDWATCH:
break;
default:
throw new HoodieMetadataException("Unsupported Metrics Reporter type " + writeConfig.getMetricsReporterType());
}
}
return builder.build();
}
}