[HUDI-2902] Fixing populate meta fields with Hfile writers and Disabling virtual keys by default for metadata table (#4194)
This commit is contained in:
committed by
GitHub
parent
ca427240c0
commit
e483f7c776
@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieUpsertException;
|
||||
import org.apache.hudi.keygen.BaseKeyGenerator;
|
||||
import org.apache.hudi.keygen.KeyGenUtils;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
@@ -72,7 +73,7 @@ public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> ext
|
||||
*/
|
||||
@Override
|
||||
public void write(GenericRecord oldRecord) {
|
||||
String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
|
||||
String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt);
|
||||
|
||||
// To maintain overall sorted order across updates and inserts, write any new inserts whose keys are less than
|
||||
// the oldRecord's key.
|
||||
|
||||
@@ -89,7 +89,7 @@ public class HoodieFileWriterFactory {
|
||||
config.getHFileCompressionAlgorithm(), config.getHFileBlockSize(), config.getHFileMaxFileSize(),
|
||||
PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, filter, HFILE_COMPARATOR);
|
||||
|
||||
return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, taskContextSupplier);
|
||||
return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, taskContextSupplier, config.populateMetaFields());
|
||||
}
|
||||
|
||||
private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newOrcFileWriter(
|
||||
|
||||
@@ -62,6 +62,7 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
|
||||
private final long maxFileSize;
|
||||
private final String instantTime;
|
||||
private final TaskContextSupplier taskContextSupplier;
|
||||
private final boolean populateMetaFields;
|
||||
private HFile.Writer writer;
|
||||
private String minRecordKey;
|
||||
private String maxRecordKey;
|
||||
@@ -70,7 +71,7 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
|
||||
private static String DROP_BEHIND_CACHE_COMPACTION_KEY = "hbase.hfile.drop.behind.compaction";
|
||||
|
||||
public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileConfig, Schema schema,
|
||||
TaskContextSupplier taskContextSupplier) throws IOException {
|
||||
TaskContextSupplier taskContextSupplier, boolean populateMetaFields) throws IOException {
|
||||
|
||||
Configuration conf = FSUtils.registerFileSystem(file, hfileConfig.getHadoopConf());
|
||||
this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf);
|
||||
@@ -84,6 +85,7 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
|
||||
this.maxFileSize = hfileConfig.getMaxFileSize();
|
||||
this.instantTime = instantTime;
|
||||
this.taskContextSupplier = taskContextSupplier;
|
||||
this.populateMetaFields = populateMetaFields;
|
||||
|
||||
HFileContext context = new HFileContextBuilder().withBlockSize(hfileConfig.getBlockSize())
|
||||
.withCompression(hfileConfig.getCompressionAlgorithm())
|
||||
@@ -104,9 +106,13 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
|
||||
|
||||
@Override
|
||||
public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
|
||||
prepRecordWithMetadata(avroRecord, record, instantTime,
|
||||
taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
|
||||
writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord);
|
||||
if (populateMetaFields) {
|
||||
prepRecordWithMetadata(avroRecord, record, instantTime,
|
||||
taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
|
||||
writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord);
|
||||
} else {
|
||||
writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -22,6 +22,9 @@ import org.apache.hudi.common.bloom.BloomFilter;
|
||||
import org.apache.hudi.common.bloom.BloomFilterFactory;
|
||||
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
|
||||
import org.apache.hudi.common.engine.TaskContextSupplier;
|
||||
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericData;
|
||||
@@ -34,19 +37,24 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
|
||||
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
|
||||
@@ -55,6 +63,9 @@ import static org.apache.hudi.io.storage.HoodieHFileConfig.DROP_BEHIND_CACHE_COM
|
||||
import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR;
|
||||
import static org.apache.hudi.io.storage.HoodieHFileConfig.PREFETCH_ON_OPEN;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestHoodieHFileReaderWriter {
|
||||
@TempDir File tempDir;
|
||||
@@ -73,21 +84,34 @@ public class TestHoodieHFileReaderWriter {
|
||||
}
|
||||
}
|
||||
|
||||
private HoodieHFileWriter createHFileWriter(Schema avroSchema) throws Exception {
|
||||
private static Stream<Arguments> populateMetaFieldsAndTestAvroWithMeta() {
|
||||
return Arrays.stream(new Boolean[][] {
|
||||
{true, true},
|
||||
{false, true},
|
||||
{true, false},
|
||||
{false, false}
|
||||
}).map(Arguments::of);
|
||||
}
|
||||
|
||||
private HoodieHFileWriter createHFileWriter(Schema avroSchema, boolean populateMetaFields) throws Exception {
|
||||
BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.00001, -1, BloomFilterTypeCode.SIMPLE.name());
|
||||
Configuration conf = new Configuration();
|
||||
TaskContextSupplier mockTaskContextSupplier = Mockito.mock(TaskContextSupplier.class);
|
||||
Supplier<Integer> partitionSupplier = Mockito.mock(Supplier.class);
|
||||
when(mockTaskContextSupplier.getPartitionIdSupplier()).thenReturn(partitionSupplier);
|
||||
when(partitionSupplier.get()).thenReturn(10);
|
||||
String instantTime = "000";
|
||||
|
||||
HoodieHFileConfig hoodieHFileConfig = new HoodieHFileConfig(conf, Compression.Algorithm.GZ, 1024 * 1024, 120 * 1024 * 1024,
|
||||
PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, filter, HFILE_COMPARATOR);
|
||||
return new HoodieHFileWriter(instantTime, filePath, hoodieHFileConfig, avroSchema, mockTaskContextSupplier);
|
||||
return new HoodieHFileWriter(instantTime, filePath, hoodieHFileConfig, avroSchema, mockTaskContextSupplier, populateMetaFields);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWriteReadHFile() throws Exception {
|
||||
Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchema.avsc");
|
||||
HoodieHFileWriter writer = createHFileWriter(avroSchema);
|
||||
@ParameterizedTest
|
||||
@MethodSource("populateMetaFieldsAndTestAvroWithMeta")
|
||||
public void testWriteReadHFile(boolean populateMetaFields, boolean testAvroWithMeta) throws Exception {
|
||||
Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchemaWithMetaFields.avsc");
|
||||
HoodieHFileWriter writer = createHFileWriter(avroSchema, populateMetaFields);
|
||||
List<String> keys = new ArrayList<>();
|
||||
Map<String, GenericRecord> recordMap = new HashMap<>();
|
||||
for (int i = 0; i < 100; i++) {
|
||||
@@ -97,7 +121,13 @@ public class TestHoodieHFileReaderWriter {
|
||||
keys.add(key);
|
||||
record.put("time", Integer.toString(RANDOM.nextInt()));
|
||||
record.put("number", i);
|
||||
writer.writeAvro(key, record);
|
||||
if (testAvroWithMeta) {
|
||||
writer.writeAvroWithMetadata(record, new HoodieRecord(new HoodieKey((String) record.get("_row_key"),
|
||||
Integer.toString((Integer) record.get("number"))), new EmptyHoodieRecordPayload())); // payload does not matter. GenericRecord passed in is what matters
|
||||
// only HoodieKey will be looked up from the 2nd arg(HoodieRecord).
|
||||
} else {
|
||||
writer.writeAvro(key, record);
|
||||
}
|
||||
recordMap.put(key, record);
|
||||
}
|
||||
writer.close();
|
||||
@@ -109,8 +139,8 @@ public class TestHoodieHFileReaderWriter {
|
||||
records.forEach(entry -> assertEquals(entry.getSecond(), recordMap.get(entry.getFirst())));
|
||||
hoodieHFileReader.close();
|
||||
|
||||
for (int i = 0; i < 20; i++) {
|
||||
int randomRowstoFetch = 5 + RANDOM.nextInt(50);
|
||||
for (int i = 0; i < 2; i++) {
|
||||
int randomRowstoFetch = 5 + RANDOM.nextInt(10);
|
||||
Set<String> rowsToFetch = getRandomKeys(randomRowstoFetch, keys);
|
||||
List<String> rowsList = new ArrayList<>(rowsToFetch);
|
||||
Collections.sort(rowsList);
|
||||
@@ -119,6 +149,11 @@ public class TestHoodieHFileReaderWriter {
|
||||
assertEquals(result.size(), randomRowstoFetch);
|
||||
result.forEach(entry -> {
|
||||
assertEquals(entry.getSecond(), recordMap.get(entry.getFirst()));
|
||||
if (populateMetaFields && testAvroWithMeta) {
|
||||
assertNotNull(entry.getSecond().get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
|
||||
} else {
|
||||
assertNull(entry.getSecond().get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
|
||||
}
|
||||
});
|
||||
hoodieHFileReader.close();
|
||||
}
|
||||
|
||||
@@ -0,0 +1,66 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
{
|
||||
"namespace": "example.schema",
|
||||
"type": "record",
|
||||
"name": "trip",
|
||||
"fields": [
|
||||
{
|
||||
"name": "_hoodie_commit_time",
|
||||
"type": ["null","string"],
|
||||
"default":null
|
||||
},
|
||||
{
|
||||
"name": "_hoodie_commit_seqno",
|
||||
"type": ["null","string"],
|
||||
"default":null
|
||||
},
|
||||
{
|
||||
"name": "_hoodie_record_key",
|
||||
"type": ["null","string"],
|
||||
"default":null
|
||||
},
|
||||
{
|
||||
"name": "_hoodie_partition_path",
|
||||
"type": ["null","string"],
|
||||
"default":null
|
||||
},
|
||||
{
|
||||
"name": "_hoodie_file_name",
|
||||
"type": ["null","string"],
|
||||
"default":null
|
||||
},
|
||||
{
|
||||
"name": "_hoodie_operation",
|
||||
"type": ["null","string"],
|
||||
"default":null
|
||||
},
|
||||
{
|
||||
"name": "_row_key",
|
||||
"type": "string"
|
||||
},
|
||||
{
|
||||
"name": "time",
|
||||
"type": "string"
|
||||
},
|
||||
{
|
||||
"name": "number",
|
||||
"type": ["int", "null"]
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hudi.client.functional;
|
||||
|
||||
import org.apache.hudi.avro.model.HoodieMetadataRecord;
|
||||
import org.apache.hudi.client.SparkRDDWriteClient;
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
||||
@@ -29,6 +30,8 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.metrics.Registry;
|
||||
import org.apache.hudi.common.model.FileSlice;
|
||||
import org.apache.hudi.common.model.HoodieBaseFile;
|
||||
import org.apache.hudi.common.model.HoodieCleaningPolicy;
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
|
||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
@@ -41,6 +44,7 @@ import org.apache.hudi.common.model.WriteConcurrencyMode;
|
||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.HoodieTableVersion;
|
||||
import org.apache.hudi.common.table.marker.MarkerType;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
@@ -60,12 +64,18 @@ import org.apache.hudi.config.HoodieIndexConfig;
|
||||
import org.apache.hudi.config.HoodieLockConfig;
|
||||
import org.apache.hudi.config.HoodieStorageConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
|
||||
import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig;
|
||||
import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig;
|
||||
import org.apache.hudi.exception.HoodieMetadataException;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.io.storage.HoodieHFileReader;
|
||||
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
|
||||
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
|
||||
import org.apache.hudi.metadata.HoodieMetadataMetrics;
|
||||
import org.apache.hudi.metadata.HoodieMetadataPayload;
|
||||
import org.apache.hudi.metadata.HoodieTableMetadata;
|
||||
import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator;
|
||||
import org.apache.hudi.metadata.MetadataPartitionType;
|
||||
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
|
||||
import org.apache.hudi.table.HoodieSparkTable;
|
||||
@@ -75,9 +85,13 @@ import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
|
||||
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
|
||||
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
|
||||
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.generic.IndexedRecord;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
@@ -120,10 +134,12 @@ import static org.apache.hudi.common.model.WriteOperationType.DELETE;
|
||||
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
|
||||
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
|
||||
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
|
||||
import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
|
||||
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@@ -317,6 +333,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
||||
|
||||
/**
|
||||
* Tests that table services in data table won't trigger table services in metadata table.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
@@ -346,6 +363,56 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
||||
assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000004001");
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Tests that virtual key configs are honored in base files after compaction in metadata table.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testVirtualKeysInBaseFiles(boolean populateMetaFields) throws Exception {
|
||||
HoodieTableType tableType = MERGE_ON_READ;
|
||||
init(tableType, false);
|
||||
writeConfig = getWriteConfigBuilder(true, true, false)
|
||||
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
|
||||
.enable(true)
|
||||
.enableFullScan(true)
|
||||
.enableMetrics(false)
|
||||
.withPopulateMetaFields(populateMetaFields)
|
||||
.withMaxNumDeltaCommitsBeforeCompaction(2)
|
||||
.build()).build();
|
||||
initWriteConfigAndMetatableWriter(writeConfig, true);
|
||||
|
||||
doWriteOperation(testTable, "0000001", INSERT);
|
||||
doClean(testTable, "0000003", Arrays.asList("0000001"));
|
||||
// this should have triggered compaction in metadata table
|
||||
doWriteOperation(testTable, "0000004", UPSERT);
|
||||
|
||||
HoodieTableMetadata tableMetadata = metadata(writeConfig, context);
|
||||
assertTrue(tableMetadata.getLatestCompactionTime().isPresent());
|
||||
assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000004001");
|
||||
|
||||
HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
|
||||
HoodieWriteConfig metadataTableWriteConfig = getMetadataWriteConfig(writeConfig);
|
||||
metadataMetaClient.reloadActiveTimeline();
|
||||
|
||||
HoodieTable table = HoodieSparkTable.create(metadataTableWriteConfig, context, metadataMetaClient);
|
||||
table.getHoodieView().sync();
|
||||
List<FileSlice> fileSlices = table.getSliceView().getLatestFileSlices("files").collect(Collectors.toList());
|
||||
HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
|
||||
HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(context.getHadoopConf().get(), new Path(baseFile.getPath()),
|
||||
new CacheConfig(context.getHadoopConf().get()));
|
||||
List<Pair<String, IndexedRecord>> records = hoodieHFileReader.readAllRecords();
|
||||
records.forEach(entry -> {
|
||||
if (populateMetaFields) {
|
||||
assertNotNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
|
||||
} else {
|
||||
assertNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Test rollback of various table operations sync to Metadata Table correctly.
|
||||
*/
|
||||
@@ -586,6 +653,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
||||
* Tests the metadata payload spurious deletes.
|
||||
* Lets say a commit was applied to metadata table, and later was explicitly got rolledback. Due to spark task failures, there could be more files in rollback
|
||||
* metadata when compared to the original commit metadata. When payload consistency check is enabled, it will throw exception. If not, it will succeed.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@ParameterizedTest
|
||||
@@ -1308,6 +1376,95 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetching WriteConfig for metadata table from Data table's writeConfig is not trivial and the method is not public in source code. so, for now,
|
||||
* using this method which mimics source code.
|
||||
* @param writeConfig
|
||||
* @return
|
||||
*/
|
||||
private HoodieWriteConfig getMetadataWriteConfig(HoodieWriteConfig writeConfig) {
|
||||
int parallelism = writeConfig.getMetadataInsertParallelism();
|
||||
|
||||
int minCommitsToKeep = Math.max(writeConfig.getMetadataMinCommitsToKeep(), writeConfig.getMinCommitsToKeep());
|
||||
int maxCommitsToKeep = Math.max(writeConfig.getMetadataMaxCommitsToKeep(), writeConfig.getMaxCommitsToKeep());
|
||||
|
||||
// Create the write config for the metadata table by borrowing options from the main write config.
|
||||
HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
|
||||
.withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
|
||||
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
|
||||
.withConsistencyCheckEnabled(writeConfig.getConsistencyGuardConfig().isConsistencyCheckEnabled())
|
||||
.withInitialConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getInitialConsistencyCheckIntervalMs())
|
||||
.withMaxConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getMaxConsistencyCheckIntervalMs())
|
||||
.withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks())
|
||||
.build())
|
||||
.withWriteConcurrencyMode(WriteConcurrencyMode.SINGLE_WRITER)
|
||||
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).withFileListingParallelism(writeConfig.getFileListingParallelism()).build())
|
||||
.withAutoCommit(true)
|
||||
.withAvroSchemaValidate(true)
|
||||
.withEmbeddedTimelineServerEnabled(false)
|
||||
.withMarkersType(MarkerType.DIRECT.name())
|
||||
.withRollbackUsingMarkers(false)
|
||||
.withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath()))
|
||||
.withSchema(HoodieMetadataRecord.getClassSchema().toString())
|
||||
.forTable(writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX)
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||
.withAsyncClean(writeConfig.isMetadataAsyncClean())
|
||||
// we will trigger cleaning manually, to control the instant times
|
||||
.withAutoClean(false)
|
||||
.withCleanerParallelism(parallelism)
|
||||
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
|
||||
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
|
||||
.retainCommits(writeConfig.getMetadataCleanerCommitsRetained())
|
||||
.archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep)
|
||||
// we will trigger compaction manually, to control the instant times
|
||||
.withInlineCompaction(false)
|
||||
.withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build())
|
||||
.withParallelism(parallelism, parallelism)
|
||||
.withDeleteParallelism(parallelism)
|
||||
.withRollbackParallelism(parallelism)
|
||||
.withFinalizeWriteParallelism(parallelism)
|
||||
.withAllowMultiWriteOnSameInstant(true)
|
||||
.withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
|
||||
.withPopulateMetaFields(writeConfig.getMetadataConfig().populateMetaFields());
|
||||
|
||||
// RecordKey properties are needed for the metadata table records
|
||||
final Properties properties = new Properties();
|
||||
properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY);
|
||||
properties.put("hoodie.datasource.write.recordkey.field", HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY);
|
||||
builder.withProperties(properties);
|
||||
|
||||
if (writeConfig.isMetricsOn()) {
|
||||
builder.withMetricsConfig(HoodieMetricsConfig.newBuilder()
|
||||
.withReporterType(writeConfig.getMetricsReporterType().toString())
|
||||
.withExecutorMetrics(writeConfig.isExecutorMetricsEnabled())
|
||||
.on(true).build());
|
||||
switch (writeConfig.getMetricsReporterType()) {
|
||||
case GRAPHITE:
|
||||
builder.withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder()
|
||||
.onGraphitePort(writeConfig.getGraphiteServerPort())
|
||||
.toGraphiteHost(writeConfig.getGraphiteServerHost())
|
||||
.usePrefix(writeConfig.getGraphiteMetricPrefix()).build());
|
||||
break;
|
||||
case JMX:
|
||||
builder.withMetricsJmxConfig(HoodieMetricsJmxConfig.newBuilder()
|
||||
.onJmxPort(writeConfig.getJmxPort())
|
||||
.toJmxHost(writeConfig.getJmxHost())
|
||||
.build());
|
||||
break;
|
||||
case DATADOG:
|
||||
case PROMETHEUS:
|
||||
case PROMETHEUS_PUSHGATEWAY:
|
||||
case CONSOLE:
|
||||
case INMEMORY:
|
||||
case CLOUDWATCH:
|
||||
break;
|
||||
default:
|
||||
throw new HoodieMetadataException("Unsupported Metrics Reporter type " + writeConfig.getMetricsReporterType());
|
||||
}
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private void doPreBootstrapOperations(HoodieTestTable testTable) throws Exception {
|
||||
doPreBootstrapOperations(testTable, "0000001", "0000002");
|
||||
}
|
||||
|
||||
@@ -292,7 +292,7 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
|
||||
.enable(useFileListingMetadata)
|
||||
.enableFullScan(enableFullScan)
|
||||
.enableMetrics(enableMetrics)
|
||||
.withPopulateMetaFields(false)
|
||||
.withPopulateMetaFields(HoodieMetadataConfig.POPULATE_META_FIELDS.defaultValue())
|
||||
.ignoreSpuriousDeletes(validateMetadataPayloadConsistency)
|
||||
.build())
|
||||
.withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
|
||||
|
||||
@@ -118,23 +118,15 @@ public final class HoodieMetadataConfig extends HoodieConfig {
|
||||
.sinceVersion("0.7.0")
|
||||
.withDocumentation("Parallelism to use, when listing the table on lake storage.");
|
||||
|
||||
public static final ConfigProperty<Boolean> ENABLE_INLINE_READING = ConfigProperty
|
||||
.key(METADATA_PREFIX + ".enable.inline.reading")
|
||||
.defaultValue(true)
|
||||
.sinceVersion("0.10.0")
|
||||
.withDocumentation("Enable inline reading of Log files. By default log block contents are read as byte[] using regular input stream and records "
|
||||
+ "are deserialized from it. Enabling this will read each log block as an inline file and read records from the same. For instance, "
|
||||
+ "for HFileDataBlock, a inline file will be read using HFileReader.");
|
||||
|
||||
public static final ConfigProperty<Boolean> ENABLE_FULL_SCAN_LOG_FILES = ConfigProperty
|
||||
.key(METADATA_PREFIX + ".enable.full.scan.log.files")
|
||||
.defaultValue(true)
|
||||
.sinceVersion("0.10.0")
|
||||
.withDocumentation("Enable full scanning of log files while reading log records. If disabled, hudi does look up of only interested entries.");
|
||||
|
||||
public static final ConfigProperty<String> POPULATE_META_FIELDS = ConfigProperty
|
||||
public static final ConfigProperty<Boolean> POPULATE_META_FIELDS = ConfigProperty
|
||||
.key(METADATA_PREFIX + ".populate.meta.fields")
|
||||
.defaultValue("false")
|
||||
.defaultValue(true)
|
||||
.sinceVersion("0.10.0")
|
||||
.withDocumentation("When enabled, populates all meta fields. When disabled, no meta fields are populated.");
|
||||
|
||||
|
||||
@@ -28,6 +28,8 @@ import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.generic.IndexedRecord;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
||||
import org.apache.hudi.common.config.HoodieMetadataConfig;
|
||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||
import org.apache.hudi.common.util.SpillableMapUtils;
|
||||
import org.apache.log4j.LogManager;
|
||||
@@ -142,7 +144,7 @@ public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordSc
|
||||
*/
|
||||
public static class Builder extends HoodieMergedLogRecordScanner.Builder {
|
||||
private Set<String> mergeKeyFilter = Collections.emptySet();
|
||||
private boolean enableFullScan;
|
||||
private boolean enableFullScan = HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.defaultValue();
|
||||
private boolean enableInlineReading;
|
||||
|
||||
@Override
|
||||
|
||||
Binary file not shown.
Reference in New Issue
Block a user