1
0

[HUDI-2593] Virtual keys support for metadata table (#3968)

- Metadata table today has virtual keys disabled, thereby populating the metafields
  for each record written out and increasing the overall storage space used. Hereby
  adding virtual keys support for metadata table so that metafields are disabled
  for metadata table records.

- Adding a custom KeyGenerator for Metadata table so as to not rely on the
  default Base/SimpleKeyGenerators which currently look for record key
  and partition field set in the table config.

- AbstractHoodieLogRecordReader's version of processing next data block and
  createHoodieRecord() will be a generic version and making the derived class
  HoodieMetadataMergedLogRecordReader take care of the special creation of
  records from explicitly passed-in partition names.
This commit is contained in:
Manoj Govindassamy
2021-11-19 15:11:29 -08:00
committed by GitHub
parent eba354e922
commit 459b34240b
28 changed files with 423 additions and 123 deletions

View File

@@ -360,7 +360,12 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchemaWithMetaFields.toString());
List<HoodieLogBlock> blocks = new ArrayList<>(2);
if (recordList.size() > 0) {
blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header));
if (config.populateMetaFields()) {
blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header));
} else {
final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header, keyField));
}
}
if (keysToDelete.size() > 0) {
blocks.add(new HoodieDeleteBlock(keysToDelete.toArray(new HoodieKey[keysToDelete.size()]), header));

View File

@@ -41,6 +41,7 @@ import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
@@ -76,6 +77,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
@@ -91,6 +93,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadataWriter.class);
// Virtual keys support for metadata table. This Field is
// from the metadata payload schema.
private static final String RECORD_KEY_FIELD = HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY;
protected HoodieWriteConfig metadataWriteConfig;
protected HoodieWriteConfig dataWriteConfig;
protected String tableName;
@@ -202,7 +208,15 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
.withDeleteParallelism(parallelism)
.withRollbackParallelism(parallelism)
.withFinalizeWriteParallelism(parallelism)
.withAllowMultiWriteOnSameInstant(true);
.withAllowMultiWriteOnSameInstant(true)
.withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
.withPopulateMetaFields(dataWriteConfig.getMetadataConfig().populateMetaFields());
// RecordKey properties are needed for the metadata table records
final Properties properties = new Properties();
properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), RECORD_KEY_FIELD);
properties.put("hoodie.datasource.write.recordkey.field", RECORD_KEY_FIELD);
builder.withProperties(properties);
if (writeConfig.isMetricsOn()) {
builder.withMetricsConfig(HoodieMetricsConfig.newBuilder()
@@ -395,9 +409,12 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
.setTableType(HoodieTableType.MERGE_ON_READ)
.setTableName(tableName)
.setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
.setPayloadClassName(HoodieMetadataPayload.class.getName())
.setBaseFileFormat(HoodieFileFormat.HFILE.toString())
.initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
.setPayloadClassName(HoodieMetadataPayload.class.getName())
.setBaseFileFormat(HoodieFileFormat.HFILE.toString())
.setRecordKeyFields(RECORD_KEY_FIELD)
.setPopulateMetaFields(dataWriteConfig.getMetadataConfig().populateMetaFields())
.setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
.initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
initTableMetadata();
initializeFileGroups(dataMetaClient, MetadataPartitionType.FILES, createInstantTime, 1);

View File

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.metadata;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
/**
* Custom key generator for the Hoodie table metadata. The metadata table record payload
* has an internal schema with a known key field HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY.
* With or without the virtual keys, getting the key from the metadata table record is always
* via the above field and there is no real need for a key generator. But, when a write
* client is instantiated for the metadata table, when virtual keys are enabled, and when
* key generator class is not configured, the default SimpleKeyGenerator will be used.
* To avoid using any other key generators for the metadata table which rely on certain
* config properties, we need this custom key generator exclusively for the metadata table.
*/
public class HoodieTableMetadataKeyGenerator extends BaseKeyGenerator {
// Instantiated with the write client's typed properties, like any other key generator.
public HoodieTableMetadataKeyGenerator(TypedProperties config) {
super(config);
}
// The record key is always read from the fixed metadata payload key field
// (HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY), independent of any record key
// fields configured on the table.
@Override
public String getRecordKey(GenericRecord record) {
return KeyGenUtils.getRecordKey(record, HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY);
}
// Metadata table records do not carry a partition path component; always
// returns the empty string.
@Override
public String getPartitionPath(GenericRecord record) {
return "";
}
}

View File

@@ -346,7 +346,8 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
if (records.size() > 0) {
Map<HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header);
final String keyField = table.getMetaClient().getTableConfig().getRecordKeyFieldProp();
HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header, keyField);
writer.appendBlock(block);
records.clear();
}

View File

@@ -180,6 +180,7 @@ public abstract class HoodieCompactor<T extends HoodieRecordPayload, I, K, O> im
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.withOperationField(config.allowOperationMetadataField())
.withPartition(operation.getPartitionPath())
.build();
if (!scanner.iterator().hasNext()) {
scanner.close();

View File

@@ -203,6 +203,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
.withReverseReader(config.getCompactionReverseLogReadEnabled())
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(config.getSpillableMapBasePath())
.withPartition(clusteringOp.getPartitionPath())
.build();
Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())

View File

@@ -47,9 +47,9 @@ public class SimpleKeyGenerator extends BuiltinKeyGenerator {
SimpleKeyGenerator(TypedProperties props, String recordKeyField, String partitionPathField) {
super(props);
this.recordKeyFields = recordKeyField == null
? Collections.emptyList()
: Collections.singletonList(recordKeyField);
this.partitionPathFields = Collections.singletonList(partitionPathField);
? Collections.emptyList() : Collections.singletonList(recordKeyField);
this.partitionPathFields = partitionPathField == null
? Collections.emptyList() : Collections.singletonList(partitionPathField);
simpleAvroKeyGenerator = new SimpleAvroKeyGenerator(props, recordKeyField, partitionPathField);
}

View File

@@ -88,6 +88,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.nio.file.Files;
@@ -358,8 +359,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
* Test that manual rollbacks work correctly and enough timeline history is maintained on the metadata table
* timeline.
*/
@Test
public void testManualRollbacks() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testManualRollbacks(final boolean populateMateFields) throws Exception {
HoodieTableType tableType = COPY_ON_WRITE;
init(tableType, false);
// Setting to archive more aggressively on the Metadata Table than the Dataset
@@ -369,7 +371,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
writeConfig = getWriteConfigBuilder(true, true, false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
.archiveCommitsWith(minArchiveCommitsMetadata, minArchiveCommitsMetadata + 1).retainCommits(1)
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build())
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction)
.withPopulateMetaFields(populateMateFields)
.build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(minArchiveCommitsDataset, minArchiveCommitsDataset + 1)
.retainCommits(1).retainFileVersions(1).withAutoClean(false).withAsyncClean(true).build())
.build();

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
@@ -29,6 +30,8 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.io.IOException;
import java.util.ArrayList;
@@ -90,4 +93,20 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
});
}
/**
* Verify if the Metadata table is constructed with table properties including
* the right key generator class name.
*/
@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testMetadataTableKeyGenerator(final HoodieTableType tableType) throws Exception {
init(tableType);
HoodieBackedTableMetadata tableMetadata = new HoodieBackedTableMetadata(context,
writeConfig.getMetadataConfig(), writeConfig.getBasePath(), writeConfig.getSpillableMapBasePath(), false);
assertEquals(HoodieTableMetadataKeyGenerator.class.getCanonicalName(),
tableMetadata.getMetadataMetaClient().getTableConfig().getKeyGeneratorClassName());
}
}

View File

@@ -288,7 +288,9 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(useFileListingMetadata)
.enableFullScan(enableFullScan)
.enableMetrics(enableMetrics).build())
.enableMetrics(enableMetrics)
.withPopulateMetaFields(false)
.build())
.withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
.withExecutorMetrics(true).build())
.withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder()

View File

@@ -283,13 +283,26 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
return properties;
}
protected void addConfigsForPopulateMetaFields(HoodieWriteConfig.Builder configBuilder, boolean populateMetaFields) {
protected Properties getPropertiesForMetadataTable() {
Properties properties = new Properties();
properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false");
properties.put("hoodie.datasource.write.recordkey.field", "key");
properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "key");
return properties;
}
protected void addConfigsForPopulateMetaFields(HoodieWriteConfig.Builder configBuilder, boolean populateMetaFields,
boolean isMetadataTable) {
if (!populateMetaFields) {
configBuilder.withProperties(getPropertiesForKeyGen())
configBuilder.withProperties((isMetadataTable ? getPropertiesForMetadataTable() : getPropertiesForKeyGen()))
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.SIMPLE).build());
}
}
protected void addConfigsForPopulateMetaFields(HoodieWriteConfig.Builder configBuilder, boolean populateMetaFields) {
addConfigsForPopulateMetaFields(configBuilder, populateMetaFields, false);
}
/**
* Cleanups hoodie clients.
*/

View File

@@ -132,6 +132,12 @@ public final class HoodieMetadataConfig extends HoodieConfig {
.sinceVersion("0.10.0")
.withDocumentation("Enable full scanning of log files while reading log records. If disabled, hudi does look up of only interested entries.");
public static final ConfigProperty<String> POPULATE_META_FIELDS = ConfigProperty
.key(METADATA_PREFIX + ".populate.meta.fields")
.defaultValue("false")
.sinceVersion("0.10.0")
.withDocumentation("When enabled, populates all meta fields. When disabled, no meta fields are populated.");
private HoodieMetadataConfig() {
super();
}
@@ -164,6 +170,10 @@ public final class HoodieMetadataConfig extends HoodieConfig {
return getBoolean(ENABLE_FULL_SCAN_LOG_FILES);
}
public boolean populateMetaFields() {
return getBooleanOrDefault(HoodieMetadataConfig.POPULATE_META_FIELDS);
}
public static class Builder {
private EngineType engineType = EngineType.SPARK;
@@ -206,6 +216,11 @@ public final class HoodieMetadataConfig extends HoodieConfig {
return this;
}
public Builder withPopulateMetaFields(boolean populateMetaFields) {
metadataConfig.setValue(POPULATE_META_FIELDS, Boolean.toString(populateMetaFields));
return this;
}
public Builder archiveCommitsWith(int minToKeep, int maxToKeep) {
metadataConfig.setValue(MIN_COMMITS_TO_KEEP, String.valueOf(minToKeep));
metadataConfig.setValue(MAX_COMMITS_TO_KEEP, String.valueOf(maxToKeep));

View File

@@ -25,6 +25,7 @@ import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -364,7 +365,7 @@ public class HoodieTableConfig extends HoodieConfig {
* @returns the record key field prop.
*/
public String getRecordKeyFieldProp() {
return getString(RECORDKEY_FIELDS);
return getStringOrDefault(RECORDKEY_FIELDS, HoodieRecord.RECORD_KEY_METADATA_FIELD);
}
public String getKeyGeneratorClassName() {

View File

@@ -33,6 +33,7 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -120,28 +121,32 @@ public abstract class AbstractHoodieLogRecordReader {
private int totalScannedLogFiles;
// Progress
private float progress = 0.0f;
// Partition name
private Option<String> partitionName;
// Populate meta fields for the records
private boolean populateMetaFields = true;
protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths,
Schema readerSchema,
String latestInstantTime, boolean readBlocksLazily, boolean reverseReader,
int bufferSize, Option<InstantRange> instantRange, boolean withOperationField) {
this(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, withOperationField,
true);
int bufferSize, Option<InstantRange> instantRange,
boolean withOperationField) {
this(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize,
instantRange, withOperationField, true, Option.empty());
}
protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
String latestInstantTime, boolean readBlocksLazily, boolean reverseReader,
int bufferSize, Option<InstantRange> instantRange, boolean withOperationField,
boolean enableFullScan) {
protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths,
Schema readerSchema, String latestInstantTime, boolean readBlocksLazily,
boolean reverseReader, int bufferSize, Option<InstantRange> instantRange,
boolean withOperationField, boolean enableFullScan,
Option<String> partitionName) {
this.readerSchema = readerSchema;
this.latestInstantTime = latestInstantTime;
this.hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
// load class from the payload fully qualified class name
this.payloadClassFQN = this.hoodieTableMetaClient.getTableConfig().getPayloadClass();
this.preCombineField = this.hoodieTableMetaClient.getTableConfig().getPreCombineField();
HoodieTableConfig tableConfig = this.hoodieTableMetaClient.getTableConfig();
if (!tableConfig.populateMetaFields()) {
this.simpleKeyGenFields = Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp()));
}
this.payloadClassFQN = tableConfig.getPayloadClass();
this.preCombineField = tableConfig.getPreCombineField();
this.totalLogFiles.addAndGet(logFilePaths.size());
this.logFilePaths = logFilePaths;
this.reverseReader = reverseReader;
@@ -151,6 +156,22 @@ public abstract class AbstractHoodieLogRecordReader {
this.instantRange = instantRange;
this.withOperationField = withOperationField;
this.enableFullScan = enableFullScan;
// Key fields when populate meta fields is disabled (that is, virtual keys enabled)
if (!tableConfig.populateMetaFields()) {
this.populateMetaFields = false;
this.simpleKeyGenFields = Option.of(
Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp()));
}
this.partitionName = partitionName;
}
protected String getKeyField() {
if (this.populateMetaFields) {
return HoodieRecord.RECORD_KEY_METADATA_FIELD;
}
ValidationUtils.checkState(this.simpleKeyGenFields.isPresent());
return this.simpleKeyGenFields.get().getKey();
}
public void scan() {
@@ -170,10 +191,15 @@ public abstract class AbstractHoodieLogRecordReader {
HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();
try {
// iterate over the paths
// Get the key field based on populate meta fields config
// and the table type
final String keyField = getKeyField();
// Iterate over the paths
logFormatReaderWrapper = new HoodieLogFormatReader(fs,
logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
readerSchema, readBlocksLazily, reverseReader, bufferSize, !enableFullScan);
readerSchema, readBlocksLazily, reverseReader, bufferSize, !enableFullScan, keyField);
Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
while (logFormatReaderWrapper.hasNext()) {
HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
@@ -339,15 +365,34 @@ public abstract class AbstractHoodieLogRecordReader {
}
totalLogRecords.addAndGet(recs.size());
for (IndexedRecord rec : recs) {
processNextRecord(createHoodieRecord(rec));
processNextRecord(createHoodieRecord(rec, this.hoodieTableMetaClient.getTableConfig(), this.payloadClassFQN,
this.preCombineField, this.withOperationField, this.simpleKeyGenFields, this.partitionName));
}
}
protected HoodieRecord<?> createHoodieRecord(IndexedRecord rec) {
if (!simpleKeyGenFields.isPresent()) {
return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN, this.preCombineField, this.withOperationField);
/**
* Create @{@link HoodieRecord} from the @{@link IndexedRecord}.
*
* @param rec - IndexedRecord to create the HoodieRecord from
* @param hoodieTableConfig - Table config
* @param payloadClassFQN - Payload class fully qualified name
* @param preCombineField - PreCombine field
* @param withOperationField - Whether operation field is enabled
* @param simpleKeyGenFields - Key generator fields when populate meta fields is turned off
* @param partitionName - Partition name
* @return HoodieRecord created from the IndexedRecord
*/
protected HoodieRecord<?> createHoodieRecord(final IndexedRecord rec, final HoodieTableConfig hoodieTableConfig,
final String payloadClassFQN, final String preCombineField,
final boolean withOperationField,
final Option<Pair<String, String>> simpleKeyGenFields,
final Option<String> partitionName) {
if (this.populateMetaFields) {
return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, payloadClassFQN,
preCombineField, withOperationField);
} else {
return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN, this.preCombineField, this.simpleKeyGenFields.get(), this.withOperationField);
return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, payloadClassFQN,
preCombineField, simpleKeyGenFields.get(), withOperationField, partitionName);
}
}
@@ -418,6 +463,10 @@ public abstract class AbstractHoodieLogRecordReader {
return payloadClassFQN;
}
protected Option<String> getPartitionName() {
return partitionName;
}
public long getTotalRollbacks() {
return totalRollbacks.get();
}
@@ -451,6 +500,10 @@ public abstract class AbstractHoodieLogRecordReader {
public abstract Builder withBufferSize(int bufferSize);
public Builder withPartition(String partitionName) {
throw new UnsupportedOperationException();
}
public Builder withInstantRange(Option<InstantRange> instantRange) {
throw new UnsupportedOperationException();
}

View File

@@ -46,7 +46,8 @@ public class HoodieFileSliceReader<T extends HoodieRecordPayload> implements Ite
Iterator baseIterator = baseFileReader.get().getRecordIterator(schema);
while (baseIterator.hasNext()) {
GenericRecord record = (GenericRecord) baseIterator.next();
HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = transform(record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt);
HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = transform(
record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt);
scanner.processNextRecord(hoodieRecord);
}
return new HoodieFileSliceReader(scanner.iterator());
@@ -68,8 +69,10 @@ public class HoodieFileSliceReader<T extends HoodieRecordPayload> implements Ite
GenericRecord record, HoodieMergedLogRecordScanner scanner, String payloadClass,
String preCombineField, Option<Pair<String, String>> simpleKeyGenFieldsOpt) {
return simpleKeyGenFieldsOpt.isPresent()
? SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, preCombineField, simpleKeyGenFieldsOpt.get(), scanner.isWithOperationField())
: SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, preCombineField, scanner.isWithOperationField());
? SpillableMapUtils.convertToHoodieRecordPayload(record,
payloadClass, preCombineField, simpleKeyGenFieldsOpt.get(), scanner.isWithOperationField(), Option.empty())
: SpillableMapUtils.convertToHoodieRecordPayload(record,
payloadClass, preCombineField, scanner.isWithOperationField(), scanner.getPartitionName());
}
private HoodieFileSliceReader(Iterator<HoodieRecord<T>> recordsItr) {

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.SchemeAwareFSDataInputStream;
import org.apache.hudi.common.fs.TimedFSDataInputStream;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieCorruptBlock;
@@ -66,6 +67,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
private final HoodieLogFile logFile;
private final byte[] magicBuffer = new byte[6];
private final Schema readerSchema;
private final String keyField;
private boolean readBlockLazily;
private long reverseLogFilePosition;
private long lastReverseLogFilePosition;
@@ -76,11 +78,13 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader) throws IOException {
this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, false);
this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, false,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
}
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader, boolean enableInlineReading) throws IOException {
boolean readBlockLazily, boolean reverseReader, boolean enableInlineReading,
String keyField) throws IOException {
FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
this.logFile = logFile;
this.inputStream = getFSDataInputStream(fsDataInputStream, fs, bufferSize);
@@ -88,6 +92,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
this.readBlockLazily = readBlockLazily;
this.reverseReader = reverseReader;
this.enableInlineReading = enableInlineReading;
this.keyField = keyField;
if (this.reverseReader) {
this.reverseLogFilePosition = this.lastReverseLogFilePosition = fs.getFileStatus(logFile.getPath()).getLen();
}
@@ -251,11 +256,12 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
return HoodieAvroDataBlock.getBlock(content, readerSchema);
} else {
return new HoodieAvroDataBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
contentPosition, contentLength, blockEndPos, readerSchema, header, footer);
contentPosition, contentLength, blockEndPos, readerSchema, header, footer, keyField);
}
case HFILE_DATA_BLOCK:
return new HoodieHFileDataBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
contentPosition, contentLength, blockEndPos, readerSchema, header, footer, enableInlineReading);
contentPosition, contentLength, blockEndPos, readerSchema,
header, footer, enableInlineReading, keyField);
case DELETE_BLOCK:
return HoodieDeleteBlock.getBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
contentPosition, contentLength, blockEndPos, header, footer);

View File

@@ -44,18 +44,15 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
private final Schema readerSchema;
private final boolean readBlocksLazily;
private final boolean reverseLogReader;
private final boolean enableInLineReading;
private final String recordKeyField;
private final boolean enableInlineReading;
private int bufferSize;
private static final Logger LOG = LogManager.getLogger(HoodieLogFormatReader.class);
HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles, Schema readerSchema, boolean readBlocksLazily,
boolean reverseLogReader, int bufferSize) throws IOException {
this(fs, logFiles, readerSchema, readBlocksLazily, reverseLogReader, bufferSize, false);
}
HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles, Schema readerSchema, boolean readBlocksLazily,
boolean reverseLogReader, int bufferSize, boolean enableInlineReading) throws IOException {
boolean reverseLogReader, int bufferSize, boolean enableInlineReading,
String recordKeyField) throws IOException {
this.logFiles = logFiles;
this.fs = fs;
this.readerSchema = readerSchema;
@@ -63,10 +60,12 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
this.reverseLogReader = reverseLogReader;
this.bufferSize = bufferSize;
this.prevReadersInOpenState = new ArrayList<>();
this.enableInLineReading = enableInlineReading;
this.recordKeyField = recordKeyField;
this.enableInlineReading = enableInlineReading;
if (logFiles.size() > 0) {
HoodieLogFile nextLogFile = logFiles.remove(0);
this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, enableInlineReading);
this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false,
enableInlineReading, recordKeyField);
}
}
@@ -107,7 +106,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
}
this.currentReader =
new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false,
this.enableInLineReading);
enableInlineReading, recordKeyField);
} catch (IOException io) {
throw new HoodieIOException("unable to initialize read with log file ", io);
}

View File

@@ -76,10 +76,13 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily,
boolean reverseReader, int bufferSize, String spillableMapBasePath,
Option<InstantRange> instantRange, boolean autoScan,
ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled,
boolean withOperationField, boolean enableFullScan) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, withOperationField,
enableFullScan);
ExternalSpillableMap.DiskMapType diskMapType,
boolean isBitCaskDiskMapCompressionEnabled,
boolean withOperationField, boolean enableFullScan,
Option<String> partitionName) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize,
instantRange, withOperationField,
enableFullScan, partitionName);
try {
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
@@ -187,6 +190,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
private boolean autoScan = true;
// operation field default false
private boolean withOperationField = false;
protected String partitionName;
@Override
public Builder withFileSystem(FileSystem fs) {
@@ -272,12 +276,19 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
return this;
}
@Override
public Builder withPartition(String partitionName) {
this.partitionName = partitionName;
return this;
}
@Override
public HoodieMergedLogRecordScanner build() {
return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader,
bufferSize, spillableMapBasePath, instantRange, autoScan,
diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, true);
diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, true,
Option.ofNullable(partitionName));
}
}
}

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.common.table.log.block;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.SizeAwareDataInputStream;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
@@ -58,22 +59,27 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
private ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
public HoodieAvroDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
@Nonnull Map<HeaderMetadataType, String> logBlockFooter,
@Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
FSDataInputStream inputStream, boolean readBlockLazily) {
@Nonnull Map<HeaderMetadataType, String> logBlockFooter,
@Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
FSDataInputStream inputStream, boolean readBlockLazily) {
super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
}
public HoodieAvroDataBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
boolean readBlockLazily, long position, long blockSize, long blockEndpos, Schema readerSchema,
Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer) {
boolean readBlockLazily, long position, long blockSize, long blockEndpos, Schema readerSchema,
Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer, String keyField) {
super(content, inputStream, readBlockLazily,
Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header,
footer);
Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header,
footer, keyField);
}
public HoodieAvroDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType,
String> header, String keyField) {
super(records, header, new HashMap<>(), keyField);
}
public HoodieAvroDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header) {
super(records, header, new HashMap<>());
super(records, header, new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD);
}
@Override

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.common.table.log.block;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -29,7 +30,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -46,39 +46,62 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
protected List<IndexedRecord> records;
protected Schema schema;
protected String keyField;
public HoodieDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
@Nonnull Map<HeaderMetadataType, String> logBlockFooter,
@Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
FSDataInputStream inputStream, boolean readBlockLazily) {
super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
this.keyField = HoodieRecord.RECORD_KEY_METADATA_FIELD;
}
public HoodieDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header,
@Nonnull Map<HeaderMetadataType, String> footer) {
super(header, footer, Option.empty(), Option.empty(), null, false);
@Nonnull Map<HeaderMetadataType, String> footer, String keyField) {
this(header, footer, Option.empty(), Option.empty(), null, false);
this.records = records;
this.schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
}
public HoodieDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header) {
this(records, header, new HashMap<>());
this.keyField = keyField;
}
protected HoodieDataBlock(Option<byte[]> content, @Nonnull FSDataInputStream inputStream, boolean readBlockLazily,
Option<HoodieLogBlockContentLocation> blockContentLocation, Schema readerSchema,
@Nonnull Map<HeaderMetadataType, String> headers, @Nonnull Map<HeaderMetadataType, String> footer) {
super(headers, footer, blockContentLocation, content, inputStream, readBlockLazily);
Option<HoodieLogBlockContentLocation> blockContentLocation, Schema readerSchema,
@Nonnull Map<HeaderMetadataType, String> headers, @Nonnull Map<HeaderMetadataType,
String> footer, String keyField) {
this(headers, footer, blockContentLocation, content, inputStream, readBlockLazily);
this.schema = readerSchema;
this.keyField = keyField;
}
/**
* Util method to get a data block for the requested type.
*
* @param logDataBlockFormat - Data block type
* @param recordList - List of records that goes in the data block
* @param header - data block header
* @return Data block of the requested type.
*/
public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, List<IndexedRecord> recordList,
Map<HeaderMetadataType, String> header) {
return getBlock(logDataBlockFormat, recordList, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
}
/**
* Util method to get a data block for the requested type.
*
* @param logDataBlockFormat - Data block type
* @param recordList - List of records that goes in the data block
* @param header - data block header
* @param keyField - FieldId to get the key from the records
* @return Data block of the requested type.
*/
public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, List<IndexedRecord> recordList,
Map<HeaderMetadataType, String> header, String keyField) {
switch (logDataBlockFormat) {
case AVRO_DATA_BLOCK:
return new HoodieAvroDataBlock(recordList, header);
return new HoodieAvroDataBlock(recordList, header, keyField);
case HFILE_DATA_BLOCK:
return new HoodieHFileDataBlock(recordList, header);
return new HoodieHFileDataBlock(recordList, header, keyField);
default:
throw new HoodieException("Data block format " + logDataBlockFormat + " not implemented");
}

View File

@@ -68,24 +68,23 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
private static int blockSize = 1 * 1024 * 1024;
private boolean enableInlineReading = false;
public HoodieHFileDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
@Nonnull Map<HeaderMetadataType, String> logBlockFooter,
@Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
FSDataInputStream inputStream, boolean readBlockLazily) {
super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
}
public HoodieHFileDataBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
boolean readBlockLazily, long position, long blockSize, long blockEndpos, Schema readerSchema,
Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer, boolean enableInlineReading) {
boolean readBlockLazily, long position, long blockSize, long blockEndpos,
Schema readerSchema, Map<HeaderMetadataType, String> header,
Map<HeaderMetadataType, String> footer, boolean enableInlineReading, String keyField) {
super(content, inputStream, readBlockLazily,
Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header,
footer);
Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)),
readerSchema, header, footer, keyField);
this.enableInlineReading = enableInlineReading;
}
public HoodieHFileDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header,
String keyField) {
super(records, header, new HashMap<>(), keyField);
}
public HoodieHFileDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header) {
super(records, header, new HashMap<>());
this(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
}
@Override
@@ -111,7 +110,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
boolean useIntegerKey = false;
int key = 0;
int keySize = 0;
Field keyField = records.get(0).getSchema().getField(HoodieRecord.RECORD_KEY_METADATA_FIELD);
Field keyField = records.get(0).getSchema().getField(this.keyField);
if (keyField == null) {
// Missing key metadata field so we should use an integer sequence key
useIntegerKey = true;

View File

@@ -115,22 +115,38 @@ public class SpillableMapUtils {
* Utility method to convert bytes to HoodieRecord using schema and payload class.
*/
public static <R> R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz, String preCombineField, boolean withOperationField) {
return convertToHoodieRecordPayload(rec, payloadClazz, preCombineField, Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD), withOperationField);
return convertToHoodieRecordPayload(rec, payloadClazz, preCombineField,
Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD),
withOperationField, Option.empty());
}
public static <R> R convertToHoodieRecordPayload(GenericRecord record, String payloadClazz,
String preCombineField,
boolean withOperationField,
Option<String> partitionName) {
return convertToHoodieRecordPayload(record, payloadClazz, preCombineField,
Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD),
withOperationField, partitionName);
}
/**
* Utility method to convert bytes to HoodieRecord using schema and payload class.
*/
public static <R> R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz,
String preCombineField, Pair<String, String> recordKeyPartitionPathPair,
boolean withOperationField) {
String recKey = rec.get(recordKeyPartitionPathPair.getLeft()).toString();
String partitionPath = rec.get(recordKeyPartitionPathPair.getRight()).toString();
Object preCombineVal = getPreCombineVal(rec, preCombineField);
public static <R> R convertToHoodieRecordPayload(GenericRecord record, String payloadClazz,
String preCombineField,
Pair<String, String> recordKeyPartitionPathFieldPair,
boolean withOperationField,
Option<String> partitionName) {
final String recKey = record.get(recordKeyPartitionPathFieldPair.getKey()).toString();
final String partitionPath = (partitionName.isPresent() ? partitionName.get() :
record.get(recordKeyPartitionPathFieldPair.getRight()).toString());
Object preCombineVal = getPreCombineVal(record, preCombineField);
HoodieOperation operation = withOperationField
? HoodieOperation.fromName(getNullableValAsString(rec, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
? HoodieOperation.fromName(getNullableValAsString(record, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = new HoodieRecord<>(new HoodieKey(recKey, partitionPath),
ReflectionUtils.loadPayload(payloadClazz, new Object[] {rec, preCombineVal}, GenericRecord.class, Comparable.class), operation);
ReflectionUtils.loadPayload(payloadClazz, new Object[]{record, preCombineVal}, GenericRecord.class,
Comparable.class), operation);
return (R) hoodieRecord;
}

View File

@@ -133,8 +133,8 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
// local map to assist in merging with base file records
Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = readLogRecords(logRecordScanner, keys, timings);
List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = readFromBaseAndMergeWithLogRecords(baseFileReader,
keys, logRecords, timings);
List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = readFromBaseAndMergeWithLogRecords(
baseFileReader, keys, logRecords, timings, partitionName);
LOG.info(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms", keys.size(), timings));
return result;
} catch (IOException ioe) {
@@ -175,8 +175,8 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
}
private List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> readFromBaseAndMergeWithLogRecords(HoodieFileReader baseFileReader,
List<String> keys, Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords,
List<Long> timings) throws IOException {
List<String> keys, Map<String,
Option<HoodieRecord<HoodieMetadataPayload>>> logRecords, List<Long> timings, String partitionName) throws IOException {
List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
// merge with base records
HoodieTimer timer = new HoodieTimer().startTimer();
@@ -189,10 +189,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
readTimer.startTimer();
Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
if (baseRecord.isPresent()) {
hoodieRecord = metadataTableConfig.populateMetaFields()
? SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(), false)
: SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(),
Pair.of(metadataTableConfig.getRecordKeyFieldProp(), metadataTableConfig.getPartitionFieldProp()), false);
hoodieRecord = getRecord(baseRecord, partitionName);
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
// merge base file record w/ log record if present
if (logRecords.containsKey(key) && logRecords.get(key).isPresent()) {
@@ -218,6 +215,18 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
return result;
}
private HoodieRecord<HoodieMetadataPayload> getRecord(Option<GenericRecord> baseRecord, String partitionName) {
ValidationUtils.checkState(baseRecord.isPresent());
if (metadataTableConfig.populateMetaFields()) {
return SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(), false);
}
return SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(),
Pair.of(metadataTableConfig.getRecordKeyFieldProp(), metadataTableConfig.getPartitionFieldProp()),
false, Option.of(partitionName));
}
/**
* Returns a new pair of readers to the base and log files.
*/
@@ -241,7 +250,8 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
baseFileOpenMs = baseFileReaderOpenTimePair.getValue();
// Open the log record scanner using the log files from the latest file slice
Pair<HoodieMetadataMergedLogRecordReader, Long> logRecordScannerOpenTimePair = getLogRecordScanner(slice);
Pair<HoodieMetadataMergedLogRecordReader, Long> logRecordScannerOpenTimePair = getLogRecordScanner(slice,
partitionName);
logRecordScanner = logRecordScannerOpenTimePair.getKey();
logScannerOpenMs = logRecordScannerOpenTimePair.getValue();
@@ -293,7 +303,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
return validInstantTimestamps;
}
private Pair<HoodieMetadataMergedLogRecordReader, Long> getLogRecordScanner(FileSlice slice) {
private Pair<HoodieMetadataMergedLogRecordReader, Long> getLogRecordScanner(FileSlice slice, String partitionName) {
HoodieTimer timer = new HoodieTimer().startTimer();
List<String> logFilePaths = slice.getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
@@ -323,6 +333,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
.withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
.withLogBlockTimestamps(validInstantTimestamps)
.enableFullScan(metadataConfig.enableFullScan())
.withPartition(partitionName)
.build();
Long logScannerOpenMs = timer.endTimer();

View File

@@ -25,7 +25,11 @@ import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -49,13 +53,17 @@ public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordSc
// Set of all record keys that are to be read in memory
private Set<String> mergeKeyFilter;
private HoodieMetadataMergedLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths,
Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes, int bufferSize,
private HoodieMetadataMergedLogRecordReader(FileSystem fs, String basePath, String partitionName,
List<String> logFilePaths,
Schema readerSchema, String latestInstantTime,
Long maxMemorySizeInBytes, int bufferSize,
String spillableMapBasePath, Set<String> mergeKeyFilter,
ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled,
ExternalSpillableMap.DiskMapType diskMapType,
boolean isBitCaskDiskMapCompressionEnabled,
Option<InstantRange> instantRange, boolean enableFullScan) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, false, false, bufferSize,
spillableMapBasePath, instantRange, false, diskMapType, isBitCaskDiskMapCompressionEnabled, false, enableFullScan);
spillableMapBasePath, instantRange, false, diskMapType, isBitCaskDiskMapCompressionEnabled, false,
enableFullScan, Option.of(partitionName));
this.mergeKeyFilter = mergeKeyFilter;
if (enableFullScan) {
performScan();
@@ -76,6 +84,23 @@ public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordSc
}
}
@Override
protected HoodieRecord<?> createHoodieRecord(final IndexedRecord rec, final HoodieTableConfig hoodieTableConfig,
final String payloadClassFQN, final String preCombineField,
final boolean withOperationField,
final Option<Pair<String, String>> simpleKeyGenFields,
final Option<String> partitionName) {
if (hoodieTableConfig.populateMetaFields()) {
return super.createHoodieRecord(rec, hoodieTableConfig, payloadClassFQN, preCombineField, withOperationField,
simpleKeyGenFields, partitionName);
}
// When meta fields are not available, create the record using the
// preset key field and the known partition name
return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, payloadClassFQN,
preCombineField, simpleKeyGenFields.get(), withOperationField, partitionName);
}
/**
* Returns the builder for {@code HoodieMetadataMergedLogRecordScanner}.
*/
@@ -107,6 +132,11 @@ public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordSc
return metadataRecords;
}
@Override
protected String getKeyField() {
return HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY;
}
/**
* Builder used to build {@code HoodieMetadataMergedLogRecordScanner}.
*/
@@ -161,6 +191,12 @@ public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordSc
return this;
}
@Override
public Builder withPartition(String partitionName) {
this.partitionName = partitionName;
return this;
}
@Override
public Builder withMaxMemorySizeInBytes(Long maxMemorySizeInBytes) {
this.maxMemorySizeInBytes = maxMemorySizeInBytes;
@@ -202,7 +238,7 @@ public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordSc
@Override
public HoodieMetadataMergedLogRecordReader build() {
return new HoodieMetadataMergedLogRecordReader(fs, basePath, logFilePaths, readerSchema,
return new HoodieMetadataMergedLogRecordReader(fs, basePath, partitionName, logFilePaths, readerSchema,
latestInstantTime, maxMemorySizeInBytes, bufferSize, spillableMapBasePath, mergeKeyFilter,
diskMapType, isBitCaskDiskMapCompressionEnabled, instantRange, enableFullScan);
}

View File

@@ -61,6 +61,12 @@ import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_L
* HoodieMetadataRecord for ease of operations.
*/
public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadataPayload> {
// HoodieMetadata schema field ids
public static final String SCHEMA_FIELD_ID_KEY = "key";
public static final String SCHEMA_FIELD_ID_TYPE = "type";
public static final String SCHEMA_FIELD_ID_METADATA = "filesystemMetadata";
// Type of the record
// This can be an enum in the schema but Avro 1.8 has a bug - https://issues.apache.org/jira/browse/AVRO-1810
private static final int PARTITION_LIST = 1;
@@ -78,13 +84,13 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
if (record.isPresent()) {
// This can be simplified using SpecificData.deepcopy once this bug is fixed
// https://issues.apache.org/jira/browse/AVRO-1811
key = record.get().get("key").toString();
type = (int) record.get().get("type");
if (record.get().get("filesystemMetadata") != null) {
key = record.get().get(SCHEMA_FIELD_ID_KEY).toString();
type = (int) record.get().get(SCHEMA_FIELD_ID_TYPE);
if (record.get().get(SCHEMA_FIELD_ID_METADATA) != null) {
filesystemMetadata = (Map<String, HoodieMetadataFileInfo>) record.get().get("filesystemMetadata");
filesystemMetadata.keySet().forEach(k -> {
GenericRecord v = filesystemMetadata.get(k);
filesystemMetadata.put(k.toString(), new HoodieMetadataFileInfo((Long)v.get("size"), (Boolean)v.get("isDeleted")));
filesystemMetadata.put(k.toString(), new HoodieMetadataFileInfo((Long) v.get("size"), (Boolean) v.get("isDeleted")));
});
}
}
@@ -231,8 +237,8 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("HoodieMetadataPayload {");
sb.append("key=").append(key).append(", ");
sb.append("type=").append(type).append(", ");
sb.append(SCHEMA_FIELD_ID_KEY + "=").append(key).append(", ");
sb.append(SCHEMA_FIELD_ID_TYPE + "=").append(type).append(", ");
sb.append("creations=").append(Arrays.toString(getFilenames().toArray())).append(", ");
sb.append("deletions=").append(Arrays.toString(getDeletions().toArray())).append(", ");
sb.append('}');

View File

@@ -1672,9 +1672,9 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
Map<HeaderMetadataType, String> header) {
switch (dataBlockType) {
case AVRO_DATA_BLOCK:
return new HoodieAvroDataBlock(records, header);
return new HoodieAvroDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
case HFILE_DATA_BLOCK:
return new HoodieHFileDataBlock(records, header);
return new HoodieHFileDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
default:
throw new RuntimeException("Unknown data block type " + dataBlockType);
}