From f87c47352aab70883777d80e353f9833e5aa1caa Mon Sep 17 00:00:00 2001 From: Manoj Govindassamy Date: Wed, 26 Jan 2022 10:34:04 -0800 Subject: [PATCH] [HUDI-2763] Metadata table records - support for key deduplication based on hardcoded key field (#4449) * [HUDI-2763] Metadata table records - support for key deduplication and virtual keys - The backing log format for the metadata table is HFile, a KeyValue type. Since the key field in the metadata record payload is a duplicate of the Key in the Cell, the redundant key field in the record can be emptied to save on the cost. - HoodieHFileWriter and HoodieHFileDataBlock will now serialize records with the key field emptied by default. The HFile writer tries to find if the record has the metadata payload schema field 'key' and if so it does the key trimming from the record payload. - HoodieHFileReader, when reading the serialized records back from disk, materializes the missing keyFields if any. The HFile reader tries to find if the record has the metadata payload schema field 'key' and if so it does the key materialization in the record payload. - Tests have been added to verify the default virtual keys and key deduplication support for the metadata table records. 
Co-authored-by: Vinoth Chandar --- .../io/storage/HoodieFileWriterFactory.java | 3 +- .../hudi/io/storage/HoodieHFileConfig.java | 8 +- .../hudi/io/storage/HoodieHFileWriter.java | 27 +- .../HoodieBackedTableMetadataWriter.java | 8 +- .../HoodieTableMetadataKeyGenerator.java | 2 +- .../storage/TestHoodieHFileReaderWriter.java | 2 +- .../functional/TestHoodieBackedMetadata.java | 359 +++++++++++++----- .../TestHoodieBackedTableMetadata.java | 245 ++++++++++++ .../functional/TestHoodieMetadataBase.java | 115 +++++- .../common/config/HoodieMetadataConfig.java | 2 +- .../table/log/block/HoodieHFileDataBlock.java | 28 +- .../hudi/io/storage/HoodieHFileReader.java | 62 ++- .../HoodieMetadataMergedLogRecordReader.java | 2 +- .../hudi/metadata/HoodieMetadataPayload.java | 17 +- .../hadoop/testutils/InputFormatTestUtil.java | 3 +- ...sModeWithMultipleWriters.COPY_ON_WRITE.zip | Bin 2592484 -> 2592485 bytes ...sModeWithMultipleWriters.MERGE_ON_READ.zip | Bin 3015939 -> 3015940 bytes 17 files changed, 745 insertions(+), 138 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java index 0b6afd4d2..38db1cde4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java @@ -87,7 +87,8 @@ public class HoodieFileWriterFactory { BloomFilter filter = createBloomFilter(config); HoodieHFileConfig hfileConfig = new HoodieHFileConfig(hoodieTable.getHadoopConf(), config.getHFileCompressionAlgorithm(), config.getHFileBlockSize(), config.getHFileMaxFileSize(), - PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, filter, HFILE_COMPARATOR); + HoodieHFileReader.KEY_FIELD_NAME, PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, + filter, 
HFILE_COMPARATOR); return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, taskContextSupplier, config.populateMetaFields()); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java index 7e4c519a8..1079566b7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java @@ -43,9 +43,10 @@ public class HoodieHFileConfig { private final Configuration hadoopConf; private final BloomFilter bloomFilter; private final KeyValue.KVComparator hfileComparator; + private final String keyFieldName; public HoodieHFileConfig(Configuration hadoopConf, Compression.Algorithm compressionAlgorithm, int blockSize, - long maxFileSize, boolean prefetchBlocksOnOpen, boolean cacheDataInL1, + long maxFileSize, String keyFieldName, boolean prefetchBlocksOnOpen, boolean cacheDataInL1, boolean dropBehindCacheCompaction, BloomFilter bloomFilter, KeyValue.KVComparator hfileComparator) { this.hadoopConf = hadoopConf; this.compressionAlgorithm = compressionAlgorithm; @@ -56,6 +57,7 @@ public class HoodieHFileConfig { this.dropBehindCacheCompaction = dropBehindCacheCompaction; this.bloomFilter = bloomFilter; this.hfileComparator = hfileComparator; + this.keyFieldName = keyFieldName; } public Configuration getHadoopConf() { @@ -97,4 +99,8 @@ public class HoodieHFileConfig { public KeyValue.KVComparator getHfileComparator() { return hfileComparator; } + + public String getKeyFieldName() { + return keyFieldName; + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java index a719bcb8f..2ad6d7f92 100644 --- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java @@ -38,6 +38,8 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.io.Writable; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import java.io.DataInput; import java.io.DataOutput; @@ -63,6 +65,8 @@ public class HoodieHFileWriter keyFieldSchema; private HFile.Writer writer; private String minRecordKey; private String maxRecordKey; @@ -77,6 +81,8 @@ public class HoodieHFileWriter testMetadataRecordKeyExcludeFromPayloadArgs() { + return asList( + Arguments.of(COPY_ON_WRITE, true), + Arguments.of(COPY_ON_WRITE, false), + Arguments.of(MERGE_ON_READ, true), + Arguments.of(MERGE_ON_READ, false) + ); + } + + /** + * 1. Verify metadata table records key deduplication feature. When record key + * deduplication is enabled, verify the metadata record payload on disk has empty key. + * Otherwise, verify the valid key. + * 2. Verify populate meta fields work irrespective of record key deduplication config. + * 3. Verify table services like compaction benefit from record key deduplication feature. 
+ */ + @ParameterizedTest + @MethodSource("testMetadataRecordKeyExcludeFromPayloadArgs") + public void testMetadataRecordKeyExcludeFromPayload(final HoodieTableType tableType, final boolean enableMetaFields) throws Exception { + initPath(); + writeConfig = getWriteConfigBuilder(true, true, false) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(true) + .withPopulateMetaFields(enableMetaFields) + .withMaxNumDeltaCommitsBeforeCompaction(3) + .build()) + .build(); + init(tableType, writeConfig); + + // 2nd commit + doWriteOperation(testTable, "0000001", INSERT); + + final HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder() + .setConf(hadoopConf) + .setBasePath(metadataTableBasePath) + .build(); + HoodieWriteConfig metadataTableWriteConfig = getMetadataWriteConfig(writeConfig); + metadataMetaClient.reloadActiveTimeline(); + final HoodieTable table = HoodieSparkTable.create(metadataTableWriteConfig, context, metadataMetaClient); + + // Compaction has not yet kicked in. Verify all the log files + // for the metadata records persisted on disk as per the config. + assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadLogFiles(table, metadataMetaClient, "0000001", + enableMetaFields); + }, "Metadata table should have valid log files!"); + + // Verify no base file created yet. + assertThrows(IllegalStateException.class, () -> { + verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(table, enableMetaFields); + }, "Metadata table should not have a base file yet!"); + + // 2 more commits + doWriteOperation(testTable, "0000002", UPSERT); + doWriteOperation(testTable, "0000004", UPSERT); + + // Compaction should be triggered by now. Let's verify the log files + // if any for the metadata records persisted on disk as per the config. 
+ assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadLogFiles(table, metadataMetaClient, "0000002", + enableMetaFields); + }, "Metadata table should have valid log files!"); + + // Verify the base file created by the just completed compaction. + assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(table, enableMetaFields); + }, "Metadata table should have a valid base file!"); + + // 2 more commits to trigger one more compaction, along with a clean + doWriteOperation(testTable, "0000005", UPSERT); + doClean(testTable, "0000006", Arrays.asList("0000004")); + doWriteOperation(testTable, "0000007", UPSERT); + + assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadLogFiles(table, metadataMetaClient, "7", enableMetaFields); + }, "Metadata table should have valid log files!"); + + assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(table, enableMetaFields); + }, "Metadata table should have a valid base file!"); + + validateMetadata(testTable); + } + + /** + * Verify the metadata table log files for the record field correctness. On disk format + * should be based on meta fields and key deduplication config. And the in-memory merged + * records should all be materialized fully irrespective of the config. + * + * @param table - Hoodie metadata test table + * @param metadataMetaClient - Metadata meta client + * @param latestCommitTimestamp - Latest commit timestamp + * @param enableMetaFields - Enable meta fields for the table records + * @throws IOException + */ + private void verifyMetadataRecordKeyExcludeFromPayloadLogFiles(HoodieTable table, HoodieTableMetaClient metadataMetaClient, + String latestCommitTimestamp, + boolean enableMetaFields) throws IOException { + table.getHoodieView().sync(); + + // Compaction should not be triggered yet. Let's verify no base file + // and few log files available. 
+ List fileSlices = table.getSliceView() + .getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList()); + if (fileSlices.isEmpty()) { + throw new IllegalStateException("LogFile slices are not available!"); + } + + // Verify the log files honor the key deduplication and virtual keys config + List logFiles = fileSlices.get(0).getLogFiles().map(logFile -> { + return logFile; + }).collect(Collectors.toList()); + + List logFilePaths = logFiles.stream().map(logFile -> { + return logFile.getPath().toString(); + }).collect(Collectors.toList()); + + // Verify the on-disk raw records before they get materialized + verifyMetadataRawRecords(table, logFiles, enableMetaFields); + + // Verify the in-memory materialized and merged records + verifyMetadataMergedRecords(metadataMetaClient, logFilePaths, latestCommitTimestamp, enableMetaFields); + } + + /** + * Verify the metadata table on-disk raw records. When populate meta fields is enabled, + * these records should have additional meta fields in the payload. When key deduplication + * is enabled, these records on the disk should have key in the payload as empty string. 
+ * + * @param table + * @param logFiles - Metadata table log files to be verified + * @param enableMetaFields - Enable meta fields for records + * @throws IOException + */ + private void verifyMetadataRawRecords(HoodieTable table, List logFiles, boolean enableMetaFields) throws IOException { + for (HoodieLogFile logFile : logFiles) { + FileStatus[] fsStatus = fs.listStatus(logFile.getPath()); + MessageType writerSchemaMsg = TableSchemaResolver.readSchemaFromLogFile(fs, logFile.getPath()); + if (writerSchemaMsg == null) { + // not a data block + continue; + } + + Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg); + HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema); + + while (logFileReader.hasNext()) { + HoodieLogBlock logBlock = logFileReader.next(); + if (logBlock instanceof HoodieDataBlock) { + for (IndexedRecord indexRecord : ((HoodieDataBlock) logBlock).getRecords()) { + final GenericRecord record = (GenericRecord) indexRecord; + if (enableMetaFields) { + // Metadata table records should have meta fields! + assertNotNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + assertNotNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); + } else { + // Metadata table records should not have meta fields! + assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); + } + + final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME)); + assertFalse(key.isEmpty()); + if (enableMetaFields) { + assertTrue(key.equals(String.valueOf(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)))); + } + } + } + } + } + } + + /** + * Verify the metadata table in-memory merged records. Irrespective of key deduplication + * config, the in-memory merged records should always have the key field in the record + * payload fully materialized. 
+ * + * @param metadataMetaClient - Metadata table meta client + * @param logFilePaths - Metadata table log file paths + * @param latestCommitTimestamp + * @param enableMetaFields - Enable meta fields + */ + private void verifyMetadataMergedRecords(HoodieTableMetaClient metadataMetaClient, List logFilePaths, + String latestCommitTimestamp, boolean enableMetaFields) { + Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema()); + if (enableMetaFields) { + schema = HoodieAvroUtils.addMetadataFields(schema); + } + HoodieMetadataMergedLogRecordReader logRecordReader = HoodieMetadataMergedLogRecordReader.newBuilder() + .withFileSystem(metadataMetaClient.getFs()) + .withBasePath(metadataMetaClient.getBasePath()) + .withLogFilePaths(logFilePaths) + .withLatestInstantTime(latestCommitTimestamp) + .withPartition(MetadataPartitionType.FILES.partitionPath()) + .withReaderSchema(schema) + .withMaxMemorySizeInBytes(100000L) + .withBufferSize(4096) + .withSpillableMapBasePath(tempDir.toString()) + .withDiskMapType(ExternalSpillableMap.DiskMapType.BITCASK) + .build(); + + assertDoesNotThrow(() -> { + logRecordReader.scan(); + }, "Metadata log records materialization failed"); + + for (Map.Entry> entry : logRecordReader.getRecords().entrySet()) { + assertFalse(entry.getKey().isEmpty()); + assertFalse(entry.getValue().getRecordKey().isEmpty()); + assertEquals(entry.getKey(), entry.getValue().getRecordKey()); + } + } + + /** + * Verify metadata table base files for the records persisted based on the config. When + * the key deduplication is enabled, the records persisted on the disk in the base file + * should have key field in the payload as empty string. 
+ * + * @param table - Metadata table + * @param enableMetaFields - Enable meta fields + */ + private void verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(HoodieTable table, boolean enableMetaFields) throws IOException { + table.getHoodieView().sync(); + List fileSlices = table.getSliceView() + .getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList()); + if (!fileSlices.get(0).getBaseFile().isPresent()) { + throw new IllegalStateException("Base file not available!"); + } + final HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get(); + + HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(context.getHadoopConf().get(), + new Path(baseFile.getPath()), + new CacheConfig(context.getHadoopConf().get())); + List> records = hoodieHFileReader.readAllRecords(); + records.forEach(entry -> { + if (enableMetaFields) { + assertNotNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + } else { + assertNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + } + + final String keyInPayload = (String) ((GenericRecord) entry.getSecond()) + .get(HoodieMetadataPayload.KEY_FIELD_NAME); + assertFalse(keyInPayload.isEmpty()); + }); + } + /** * Test rollback of various table operations sync to Metadata Table correctly. */ @@ -1492,95 +1746,6 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { } } - /** - * Fetching WriteConfig for metadata table from Data table's writeConfig is not trivial and the method is not public in source code. so, for now, - * using this method which mimics source code. 
- * @param writeConfig - * @return - */ - private HoodieWriteConfig getMetadataWriteConfig(HoodieWriteConfig writeConfig) { - int parallelism = writeConfig.getMetadataInsertParallelism(); - - int minCommitsToKeep = Math.max(writeConfig.getMetadataMinCommitsToKeep(), writeConfig.getMinCommitsToKeep()); - int maxCommitsToKeep = Math.max(writeConfig.getMetadataMaxCommitsToKeep(), writeConfig.getMaxCommitsToKeep()); - - // Create the write config for the metadata table by borrowing options from the main write config. - HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder() - .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION) - .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder() - .withConsistencyCheckEnabled(writeConfig.getConsistencyGuardConfig().isConsistencyCheckEnabled()) - .withInitialConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getInitialConsistencyCheckIntervalMs()) - .withMaxConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getMaxConsistencyCheckIntervalMs()) - .withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks()) - .build()) - .withWriteConcurrencyMode(WriteConcurrencyMode.SINGLE_WRITER) - .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).withFileListingParallelism(writeConfig.getFileListingParallelism()).build()) - .withAutoCommit(true) - .withAvroSchemaValidate(true) - .withEmbeddedTimelineServerEnabled(false) - .withMarkersType(MarkerType.DIRECT.name()) - .withRollbackUsingMarkers(false) - .withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath())) - .withSchema(HoodieMetadataRecord.getClassSchema().toString()) - .forTable(writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX) - .withCompactionConfig(HoodieCompactionConfig.newBuilder() - .withAsyncClean(writeConfig.isMetadataAsyncClean()) - // we will trigger cleaning manually, to control the instant times - .withAutoClean(false) - 
.withCleanerParallelism(parallelism) - .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) - .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) - .retainCommits(writeConfig.getMetadataCleanerCommitsRetained()) - .archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep) - // we will trigger compaction manually, to control the instant times - .withInlineCompaction(false) - .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build()) - .withParallelism(parallelism, parallelism) - .withDeleteParallelism(parallelism) - .withRollbackParallelism(parallelism) - .withFinalizeWriteParallelism(parallelism) - .withAllowMultiWriteOnSameInstant(true) - .withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName()) - .withPopulateMetaFields(writeConfig.getMetadataConfig().populateMetaFields()); - - // RecordKey properties are needed for the metadata table records - final Properties properties = new Properties(); - properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY); - properties.put("hoodie.datasource.write.recordkey.field", HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY); - builder.withProperties(properties); - - if (writeConfig.isMetricsOn()) { - builder.withMetricsConfig(HoodieMetricsConfig.newBuilder() - .withReporterType(writeConfig.getMetricsReporterType().toString()) - .withExecutorMetrics(writeConfig.isExecutorMetricsEnabled()) - .on(true).build()); - switch (writeConfig.getMetricsReporterType()) { - case GRAPHITE: - builder.withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder() - .onGraphitePort(writeConfig.getGraphiteServerPort()) - .toGraphiteHost(writeConfig.getGraphiteServerHost()) - .usePrefix(writeConfig.getGraphiteMetricPrefix()).build()); - break; - case JMX: - builder.withMetricsJmxConfig(HoodieMetricsJmxConfig.newBuilder() - .onJmxPort(writeConfig.getJmxPort()) - .toJmxHost(writeConfig.getJmxHost()) - .build()); - break; - case 
DATADOG: - case PROMETHEUS: - case PROMETHEUS_PUSHGATEWAY: - case CONSOLE: - case INMEMORY: - case CLOUDWATCH: - break; - default: - throw new HoodieMetadataException("Unsupported Metrics Reporter type " + writeConfig.getMetricsReporterType()); - } - } - return builder.build(); - } - private void doPreBootstrapOperations(HoodieTestTable testTable) throws Exception { doPreBootstrapOperations(testTable, "0000001", "0000002"); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java index d6f151e34..1abe15bd0 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java @@ -18,30 +18,63 @@ package org.apache.hudi.client.functional; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.avro.model.HoodieMetadataRecord; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.log.HoodieLogFormat; +import org.apache.hudi.common.table.log.block.HoodieDataBlock; +import 
org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.storage.HoodieHFileReader; import org.apache.hudi.metadata.HoodieBackedTableMetadata; +import org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader; +import org.apache.hudi.metadata.HoodieMetadataPayload; import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator; +import org.apache.hudi.metadata.MetadataPartitionType; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hadoop.fs.FileStatus; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.schema.MessageType; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import static org.apache.hudi.common.model.WriteOperationType.INSERT; +import static org.apache.hudi.common.model.WriteOperationType.UPSERT; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -123,4 +156,216 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase { tableMetadata.getAllFilesInPartition(new 
Path(writeConfig.getBasePath() + "dummy")); assertEquals(allFilesInPartition.length, 0); } + + /** + * 1. Verify metadata table records key deduplication feature. When record key + * deduplication is enabled, verify the metadata record payload on disk has empty key. + * Otherwise, verify the valid key. + * 2. Verify populate meta fields work irrespective of record key deduplication config. + * 3. Verify table services like compaction benefit from record key deduplication feature. + */ + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testMetadataRecordKeyExcludeFromPayload(final HoodieTableType tableType) throws Exception { + initPath(); + writeConfig = getWriteConfigBuilder(true, true, false) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(true) + .withPopulateMetaFields(false) + .withMaxNumDeltaCommitsBeforeCompaction(3) + .build()) + .build(); + init(tableType, writeConfig); + + // 2nd commit + doWriteOperation(testTable, "0000001", INSERT); + + final HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder() + .setConf(hadoopConf) + .setBasePath(metadataTableBasePath) + .build(); + HoodieWriteConfig metadataTableWriteConfig = getMetadataWriteConfig(writeConfig); + metadataMetaClient.reloadActiveTimeline(); + final HoodieTable table = HoodieSparkTable.create(metadataTableWriteConfig, context, metadataMetaClient); + + // Compaction has not yet kicked in. Verify all the log files + // for the metadata records persisted on disk as per the config. + assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadLogFiles(table, metadataMetaClient, "0000001"); + }, "Metadata table should have valid log files!"); + + // Verify no base file created yet. 
+ assertThrows(IllegalStateException.class, () -> { + verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(table); + }, "Metadata table should not have a base file yet!"); + + // 2 more commits + doWriteOperation(testTable, "0000002", UPSERT); + doWriteOperation(testTable, "0000004", UPSERT); + + // Compaction should be triggered by now. Let's verify the log files + // if any for the metadata records persisted on disk as per the config. + assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadLogFiles(table, metadataMetaClient, "0000002"); + }, "Metadata table should have valid log files!"); + + // Verify the base file created by the just completed compaction. + assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(table); + }, "Metadata table should have a valid base file!"); + + // 2 more commits to trigger one more compaction, along with a clean + doWriteOperation(testTable, "0000005", UPSERT); + doClean(testTable, "0000006", Arrays.asList("0000004")); + doWriteOperation(testTable, "0000007", UPSERT); + + assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadLogFiles(table, metadataMetaClient, "7"); + }, "Metadata table should have valid log files!"); + + assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(table); + }, "Metadata table should have a valid base file!"); + + validateMetadata(testTable); + } + + /** + * Verify the metadata table log files for the record field correctness. On disk format + * should be based on meta fields and key deduplication config. And the in-memory merged + * records should all be materialized fully irrespective of the config. 
+ * + * @param table - Hoodie metadata test table + * @param metadataMetaClient - Metadata meta client + * @param latestCommitTimestamp - Latest commit timestamp + * @throws IOException + */ + private void verifyMetadataRecordKeyExcludeFromPayloadLogFiles(HoodieTable table, HoodieTableMetaClient metadataMetaClient, + String latestCommitTimestamp) throws IOException { + table.getHoodieView().sync(); + + // Compaction should not be triggered yet. Let's verify no base file + // and few log files available. + List fileSlices = table.getSliceView() + .getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList()); + if (fileSlices.isEmpty()) { + throw new IllegalStateException("LogFile slices are not available!"); + } + + // Verify the log files honor the key deduplication and virtual keys config + List logFiles = fileSlices.get(0).getLogFiles().map(logFile -> { + return logFile; + }).collect(Collectors.toList()); + + List logFilePaths = logFiles.stream().map(logFile -> { + return logFile.getPath().toString(); + }).collect(Collectors.toList()); + + // Verify the on-disk raw records before they get materialized + verifyMetadataRawRecords(table, logFiles); + + // Verify the in-memory materialized and merged records + verifyMetadataMergedRecords(metadataMetaClient, logFilePaths, latestCommitTimestamp); + } + + /** + * Verify the metadata table on-disk raw records. When populate meta fields is enabled, + * these records should have additional meta fields in the payload. When key deduplication + * is enabled, these records on the disk should have key in the payload as empty string. 
+ * + * @param table + * @param logFiles - Metadata table log files to be verified + * @throws IOException + */ + private void verifyMetadataRawRecords(HoodieTable table, List logFiles) throws IOException { + for (HoodieLogFile logFile : logFiles) { + FileStatus[] fsStatus = fs.listStatus(logFile.getPath()); + MessageType writerSchemaMsg = TableSchemaResolver.readSchemaFromLogFile(fs, logFile.getPath()); + if (writerSchemaMsg == null) { + // not a data block + continue; + } + + Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg); + HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema); + + while (logFileReader.hasNext()) { + HoodieLogBlock logBlock = logFileReader.next(); + if (logBlock instanceof HoodieDataBlock) { + for (IndexedRecord indexRecord : ((HoodieDataBlock) logBlock).getRecords()) { + final GenericRecord record = (GenericRecord) indexRecord; + // Metadata table records should not have meta fields! + assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); + + final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME)); + assertFalse(key.isEmpty()); + } + } + } + } + } + + /** + * Verify the metadata table in-memory merged records. Irrespective of key deduplication + * config, the in-memory merged records should always have the key field in the record + * payload fully materialized. 
+ * + * @param metadataMetaClient - Metadata table meta client + * @param logFilePaths - Metadata table log file paths + * @param latestCommitTimestamp - Latest commit timestamp + */ + private void verifyMetadataMergedRecords(HoodieTableMetaClient metadataMetaClient, List logFilePaths, String latestCommitTimestamp) { + Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema()); + HoodieMetadataMergedLogRecordReader logRecordReader = HoodieMetadataMergedLogRecordReader.newBuilder() + .withFileSystem(metadataMetaClient.getFs()) + .withBasePath(metadataMetaClient.getBasePath()) + .withLogFilePaths(logFilePaths) + .withLatestInstantTime(latestCommitTimestamp) + .withPartition(MetadataPartitionType.FILES.partitionPath()) + .withReaderSchema(schema) + .withMaxMemorySizeInBytes(100000L) + .withBufferSize(4096) + .withSpillableMapBasePath(tempDir.toString()) + .withDiskMapType(ExternalSpillableMap.DiskMapType.BITCASK) + .build(); + + assertDoesNotThrow(() -> { + logRecordReader.scan(); + }, "Metadata log records materialization failed"); + + for (Map.Entry> entry : logRecordReader.getRecords().entrySet()) { + assertFalse(entry.getKey().isEmpty()); + assertFalse(entry.getValue().getRecordKey().isEmpty()); + assertEquals(entry.getKey(), entry.getValue().getRecordKey()); + } + } + + /** + * Verify metadata table base files for the records persisted based on the config. When + * the key deduplication is enabled, the records persisted on the disk in the base file + * should have key field in the payload as empty string. 
+ * + * @param table - Metadata table + */ + private void verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(HoodieTable table) throws IOException { + table.getHoodieView().sync(); + List fileSlices = table.getSliceView() + .getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList()); + if (!fileSlices.get(0).getBaseFile().isPresent()) { + throw new IllegalStateException("Base file not available!"); + } + final HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get(); + + HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(context.getHadoopConf().get(), + new Path(baseFile.getPath()), + new CacheConfig(context.getHadoopConf().get())); + List> records = hoodieHFileReader.readAllRecords(); + records.forEach(entry -> { + assertNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + final String keyInPayload = (String) ((GenericRecord) entry.getSecond()) + .get(HoodieMetadataPayload.KEY_FIELD_NAME); + assertFalse(keyInPayload.isEmpty()); + }); + } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java index 56c9f016b..f41985855 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java @@ -18,12 +18,19 @@ package org.apache.hudi.client.functional; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.model.HoodieMetadataRecord; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.fs.ConsistencyGuardConfig; +import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieTableType; +import 
org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.marker.MarkerType; +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.testutils.HoodieMetadataTestTable; import org.apache.hudi.common.testutils.HoodieTestTable; @@ -33,17 +40,19 @@ import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig; +import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig; +import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.keygen.SimpleKeyGenerator; +import org.apache.hudi.metadata.HoodieMetadataPayload; import org.apache.hudi.metadata.HoodieTableMetadata; +import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator; import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTimelineArchiveLog; import org.apache.hudi.testutils.HoodieClientTestHarness; - -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.junit.jupiter.api.AfterEach; @@ -59,6 +68,7 @@ import static java.util.Collections.emptyList; import static org.apache.hudi.common.model.WriteOperationType.INSERT; import static org.apache.hudi.common.model.WriteOperationType.UPSERT; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; +import static 
org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX; public class TestHoodieMetadataBase extends HoodieClientTestHarness { @@ -94,6 +104,20 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness { initWriteConfigAndMetatableWriter(writeConfig, enableMetadataTable); } + public void init(HoodieTableType tableType, HoodieWriteConfig writeConfig) throws IOException { + this.tableType = tableType; + initPath(); + initSparkContexts("TestHoodieMetadata"); + initFileSystem(); + fs.mkdirs(new Path(basePath)); + initTimelineService(); + initMetaClient(tableType); + initTestDataGenerator(); + metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath); + this.writeConfig = writeConfig; + initWriteConfigAndMetatableWriter(writeConfig, writeConfig.isMetadataTableEnabled()); + } + protected void initWriteConfigAndMetatableWriter(HoodieWriteConfig writeConfig, boolean enableMetadataTable) { this.writeConfig = writeConfig; if (enableMetadataTable) { @@ -327,4 +351,91 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness { .withProperties(properties); } + /** + * Fetching WriteConfig for metadata table from Data table's writeConfig is not trivial and + * the method is not public in source code. so, for now, using this method which mimics source code. + */ + protected HoodieWriteConfig getMetadataWriteConfig(HoodieWriteConfig writeConfig) { + int parallelism = writeConfig.getMetadataInsertParallelism(); + + int minCommitsToKeep = Math.max(writeConfig.getMetadataMinCommitsToKeep(), writeConfig.getMinCommitsToKeep()); + int maxCommitsToKeep = Math.max(writeConfig.getMetadataMaxCommitsToKeep(), writeConfig.getMaxCommitsToKeep()); + + // Create the write config for the metadata table by borrowing options from the main write config. 
+ HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder() + .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder() + .withConsistencyCheckEnabled(writeConfig.getConsistencyGuardConfig().isConsistencyCheckEnabled()) + .withInitialConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getInitialConsistencyCheckIntervalMs()) + .withMaxConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getMaxConsistencyCheckIntervalMs()) + .withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks()) + .build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.SINGLE_WRITER) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).withFileListingParallelism(writeConfig.getFileListingParallelism()).build()) + .withAutoCommit(true) + .withAvroSchemaValidate(true) + .withEmbeddedTimelineServerEnabled(false) + .withMarkersType(MarkerType.DIRECT.name()) + .withRollbackUsingMarkers(false) + .withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath())) + .withSchema(HoodieMetadataRecord.getClassSchema().toString()) + .forTable(writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withAsyncClean(writeConfig.isMetadataAsyncClean()) + // we will trigger cleaning manually, to control the instant times + .withAutoClean(false) + .withCleanerParallelism(parallelism) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) + .retainCommits(writeConfig.getMetadataCleanerCommitsRetained()) + .archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep) + // we will trigger compaction manually, to control the instant times + .withInlineCompaction(false) + .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build()) + .withParallelism(parallelism, 
parallelism) + .withDeleteParallelism(parallelism) + .withRollbackParallelism(parallelism) + .withFinalizeWriteParallelism(parallelism) + .withAllowMultiWriteOnSameInstant(true) + .withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName()) + .withPopulateMetaFields(writeConfig.getMetadataConfig().populateMetaFields()); + + // RecordKey properties are needed for the metadata table records + final Properties properties = new Properties(); + properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), HoodieMetadataPayload.KEY_FIELD_NAME); + properties.put("hoodie.datasource.write.recordkey.field", HoodieMetadataPayload.KEY_FIELD_NAME); + builder.withProperties(properties); + + if (writeConfig.isMetricsOn()) { + builder.withMetricsConfig(HoodieMetricsConfig.newBuilder() + .withReporterType(writeConfig.getMetricsReporterType().toString()) + .withExecutorMetrics(writeConfig.isExecutorMetricsEnabled()) + .on(true).build()); + switch (writeConfig.getMetricsReporterType()) { + case GRAPHITE: + builder.withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder() + .onGraphitePort(writeConfig.getGraphiteServerPort()) + .toGraphiteHost(writeConfig.getGraphiteServerHost()) + .usePrefix(writeConfig.getGraphiteMetricPrefix()).build()); + break; + case JMX: + builder.withMetricsJmxConfig(HoodieMetricsJmxConfig.newBuilder() + .onJmxPort(writeConfig.getJmxPort()) + .toJmxHost(writeConfig.getJmxHost()) + .build()); + break; + case DATADOG: + case PROMETHEUS: + case PROMETHEUS_PUSHGATEWAY: + case CONSOLE: + case INMEMORY: + case CLOUDWATCH: + break; + default: + throw new HoodieMetadataException("Unsupported Metrics Reporter type " + writeConfig.getMetricsReporterType()); + } + } + return builder.build(); + } + } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index 51791c945..21ba5f4dd 100644 --- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -126,7 +126,7 @@ public final class HoodieMetadataConfig extends HoodieConfig { public static final ConfigProperty POPULATE_META_FIELDS = ConfigProperty .key(METADATA_PREFIX + ".populate.meta.fields") - .defaultValue(true) + .defaultValue(false) .sinceVersion("0.10.0") .withDocumentation("When enabled, populates all meta fields. When disabled, no meta fields are populated."); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java index 7f1fa2aa1..02b500458 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java @@ -22,8 +22,8 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.inline.InLineFSUtils; import org.apache.hudi.common.fs.inline.InLineFileSystem; import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.storage.HoodieHBaseKVComparator; @@ -83,10 +83,6 @@ public class HoodieHFileDataBlock extends HoodieDataBlock { super(records, header, new HashMap<>(), keyField); } - public HoodieHFileDataBlock(@Nonnull List records, @Nonnull Map header) { - this(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD); - } - @Override public HoodieLogBlockType getBlockType() { return HoodieLogBlockType.HFILE_DATA_BLOCK; @@ -110,8 +106,8 @@ public class HoodieHFileDataBlock extends HoodieDataBlock { boolean useIntegerKey = false; 
int key = 0; int keySize = 0; - Field keyField = records.get(0).getSchema().getField(this.keyField); - if (keyField == null) { + final Field keyFieldSchema = records.get(0).getSchema().getField(HoodieHFileReader.KEY_FIELD_NAME); + if (keyFieldSchema == null) { // Missing key metadata field so we should use an integer sequence key useIntegerKey = true; keySize = (int) Math.ceil(Math.log(records.size())) + 1; @@ -122,9 +118,9 @@ public class HoodieHFileDataBlock extends HoodieDataBlock { if (useIntegerKey) { recordKey = String.format("%" + keySize + "s", key++); } else { - recordKey = record.get(keyField.pos()).toString(); + recordKey = record.get(keyFieldSchema.pos()).toString(); } - byte[] recordBytes = HoodieAvroUtils.indexedRecordToBytes(record); + final byte[] recordBytes = serializeRecord(record, Option.ofNullable(keyFieldSchema)); ValidationUtils.checkState(!sortedRecordsMap.containsKey(recordKey), "Writing multiple records with same key not supported for " + this.getClass().getName()); sortedRecordsMap.put(recordKey, recordBytes); @@ -162,6 +158,20 @@ public class HoodieHFileDataBlock extends HoodieDataBlock { return records; } + /** + * Serialize the record to byte buffer. 
+ * + * @param record - Record to serialize + * @param keyField - Key field in the schema + * @return Serialized byte buffer for the record + */ + private byte[] serializeRecord(final IndexedRecord record, final Option keyField) { + if (keyField.isPresent()) { + record.put(keyField.get().pos(), StringUtils.EMPTY_STRING); + } + return HoodieAvroUtils.indexedRecordToBytes(record); + } + private void readWithInlineFS(List keys) throws IOException { boolean enableFullScan = keys.isEmpty(); // Get schema from the header diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java index e3e38eca8..f4058911e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java @@ -50,6 +50,7 @@ import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; @@ -63,6 +64,7 @@ public class HoodieHFileReader implements HoodieFileRea // key retrieval. 
private HFileScanner keyScanner; + public static final String KEY_FIELD_NAME = "key"; public static final String KEY_SCHEMA = "schema"; public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter"; public static final String KEY_BLOOM_FILTER_TYPE_CODE = "bloomFilterTypeCode"; @@ -151,15 +153,15 @@ public class HoodieHFileReader implements HoodieFileRea } public List> readAllRecords(Schema writerSchema, Schema readerSchema) throws IOException { + final Option keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME)); List> recordList = new LinkedList<>(); try { final HFileScanner scanner = reader.getScanner(false, false); if (scanner.seekTo()) { do { Cell c = scanner.getKeyValue(); - byte[] keyBytes = Arrays.copyOfRange(c.getRowArray(), c.getRowOffset(), c.getRowOffset() + c.getRowLength()); - R record = getRecordFromCell(c, writerSchema, readerSchema); - recordList.add(new Pair<>(new String(keyBytes), record)); + final Pair keyAndRecordPair = getRecordFromCell(c, writerSchema, readerSchema, keyFieldSchema); + recordList.add(keyAndRecordPair); } while (scanner.next()); } @@ -196,6 +198,9 @@ public class HoodieHFileReader implements HoodieFileRea @Override public Iterator getRecordIterator(Schema readerSchema) throws IOException { final HFileScanner scanner = reader.getScanner(false, false); + final Option keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME)); + ValidationUtils.checkState(keyFieldSchema != null, + "Missing key field '" + KEY_FIELD_NAME + "' in the schema!"); return new Iterator() { private R next = null; private boolean eof = false; @@ -206,7 +211,8 @@ public class HoodieHFileReader implements HoodieFileRea // To handle when hasNext() is called multiple times for idempotency and/or the first time if (this.next == null && !this.eof) { if (!scanner.isSeeked() && scanner.seekTo()) { - this.next = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema); + final Pair keyAndRecordPair = 
getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema, keyFieldSchema); + this.next = keyAndRecordPair.getSecond(); } } return this.next != null; @@ -226,7 +232,8 @@ public class HoodieHFileReader implements HoodieFileRea } R retVal = this.next; if (scanner.next()) { - this.next = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema); + final Pair keyAndRecordPair = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema, keyFieldSchema); + this.next = keyAndRecordPair.getSecond(); } else { this.next = null; this.eof = true; @@ -242,6 +249,8 @@ public class HoodieHFileReader implements HoodieFileRea @Override public Option getRecordByKey(String key, Schema readerSchema) throws IOException { byte[] value = null; + final Option keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME)); + ValidationUtils.checkState(keyFieldSchema != null); KeyValue kv = new KeyValue(key.getBytes(), null, null, null); synchronized (this) { @@ -257,16 +266,51 @@ public class HoodieHFileReader implements HoodieFileRea } if (value != null) { - R record = (R)HoodieAvroUtils.bytesToAvro(value, getSchema(), readerSchema); + R record = deserialize(key.getBytes(), value, getSchema(), readerSchema, keyFieldSchema); return Option.of(record); } return Option.empty(); } - private R getRecordFromCell(Cell c, Schema writerSchema, Schema readerSchema) throws IOException { - byte[] value = Arrays.copyOfRange(c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength()); - return (R)HoodieAvroUtils.bytesToAvro(value, writerSchema, readerSchema); + private Pair getRecordFromCell(Cell cell, Schema writerSchema, Schema readerSchema, Option keyFieldSchema) throws IOException { + final byte[] keyBytes = Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(), cell.getRowOffset() + cell.getRowLength()); + final byte[] valueBytes = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset() + 
cell.getValueLength()); + R record = deserialize(keyBytes, valueBytes, writerSchema, readerSchema, keyFieldSchema); + return new Pair<>(new String(keyBytes), record); + } + + /** + * Deserialize the record byte array contents to record object. + * + * @param keyBytes - Record key as byte array + * @param valueBytes - Record content as byte array + * @param writerSchema - Writer schema + * @param readerSchema - Reader schema + * @param keyFieldSchema - Key field id in the schema + * @return Deserialized record object + */ + private R deserialize(final byte[] keyBytes, final byte[] valueBytes, Schema writerSchema, Schema readerSchema, + Option keyFieldSchema) throws IOException { + R record = (R) HoodieAvroUtils.bytesToAvro(valueBytes, writerSchema, readerSchema); + materializeRecordIfNeeded(keyBytes, record, keyFieldSchema); + return record; + } + + /** + * Materialize the record for any missing fields, if needed. + * + * @param keyBytes - Key byte array + * @param record - Record object to materialize + * @param keyFieldSchema - Key field id in the schema + */ + private void materializeRecordIfNeeded(final byte[] keyBytes, R record, Option keyFieldSchema) { + if (keyFieldSchema.isPresent()) { + final Object keyObject = record.get(keyFieldSchema.get().pos()); + if (keyObject != null && keyObject.toString().isEmpty()) { + record.put(keyFieldSchema.get().pos(), new String(keyBytes)); + } + } } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java index 01c8d05e9..c03bf40c4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java @@ -139,7 +139,7 @@ public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordSc @Override protected String getKeyField() { - 
return HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY; + return HoodieMetadataPayload.KEY_FIELD_NAME; } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 0b0d144a6..a80f33b2d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hudi.io.storage.HoodieHFileReader; import java.io.IOException; import java.util.Arrays; @@ -63,9 +64,9 @@ import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_L public class HoodieMetadataPayload implements HoodieRecordPayload { // HoodieMetadata schema field ids - public static final String SCHEMA_FIELD_ID_KEY = "key"; - public static final String SCHEMA_FIELD_ID_TYPE = "type"; - public static final String SCHEMA_FIELD_ID_METADATA = "filesystemMetadata"; + public static final String KEY_FIELD_NAME = HoodieHFileReader.KEY_FIELD_NAME; + public static final String SCHEMA_FIELD_NAME_TYPE = "type"; + public static final String SCHEMA_FIELD_NAME_METADATA = "filesystemMetadata"; // Type of the record // This can be an enum in the schema but Avro 1.8 has a bug - https://issues.apache.org/jira/browse/AVRO-1810 @@ -84,9 +85,9 @@ public class HoodieMetadataPayload implements HoodieRecordPayload) record.get().get("filesystemMetadata"); filesystemMetadata.keySet().forEach(k -> { GenericRecord v = filesystemMetadata.get(k); @@ -237,8 +238,8 @@ public class HoodieMetadataPayload implements HoodieRecordPayload header = new HashMap<>(); header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchema.toString()); - 
HoodieDataBlock dataBlock = (logBlockType == HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK) ? new HoodieHFileDataBlock(records, header) : + HoodieDataBlock dataBlock = (logBlockType == HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK) + ? new HoodieHFileDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD) : new HoodieAvroDataBlock(records, header); writer.appendBlock(dataBlock); return writer; diff --git a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip index 299b070bee34ac21e9b4a8426141e71e10a1b682..9611d276905776de22078a21ae90c6e7374bb614 100644 GIT binary patch delta 9168 zcmZ`;30PD|w(bU$eU(*K1t|p;L5;G=;w~t=&}?ofs~`whAyFCx1T~;enN%c(#Y{Ad zM$m~Ew_s+>%*({(ON4CHL?f(%l(fk7lqoDF4z#3*X1yc9?A8J_+hT@hms>{C-^~K`LKy&^ns3e=raQw! zX_Ywv@lJDpv~=BTp&87EQBwO%^;f+Jq?4&6zlh&uE`lDE--Xv?w!4Nn`C#f}E)X;*js~ zF5+P6yb^qrIb$Y(34`=3*o$~#&Z1|N;q+H$Ocd}jdJGO+P;rY(hcP->68@$)AQHwN z#S$}G?(PACt?p>&j62%&_rTv79*KHk-w^1h9vHT}XFG;{+zTH(yu}>MN1=qLmGC*VWL1d%sb?fBq8tqS+wkKjeKfjHty`-2fOEFnN<8-h?& zh9DICLoiYE$qz7xpQfPE#Zwu20HbVR5pkl?q0V|d8Uz^3tj73>JbEt_d2}um8H0he zD$UvN@x+SG3)9aGk|+s7-~R~Hl=fjPF{g{dk#rc{U4d>3&x41j;UX`WfBxs+L@%lc z^imr!#7lN?7O|jDB8G@*+;yUiPMndfhrm;T?wGMyFZhQLd}ar&7R^qHM6=nEJoqSj zFwC6Qq!3GbDbiVQliQp`7s1pn$DB@!a@AX%nz5ZOi<;*v%s9T>E%Nwsp~_OBH2F!X zP?$(J$W4I`HYBo%MZ}p-igpFO%NkE9jbPe7Hi11!g^j!>)`m`v9-)`aeNlq8l-ST3 zl?7cha~R#1J3=qH$B^79-`YT$)DTOb#M-ipX=E793Bl@j%?f}Lb;c%AkJ(e;Icc^t zJQvIsAcpT|=U~l!R7q?}DRJTZ?8Et-G$=NhlE@~M5EmM?0QXc)))=Gf0K$z!Ym`;^a5*1RzYNPj?gB*&|<3VGEZ=vQH5VocGW zC)u66>$K@#bKcq4Wl}lzFNGG1ie^5}^F5fGbU85X@R`pJ&0kUf+P#wXdFwV`UUlZy z&UerH82wma^2v$|X?xyxxjc8}gGa9(*;ws#b|C9Od)nm{&w{c>xNS>LxfwGr?%3d0+nRQq$Unu0r4Oqcg&e`@84w_{R7dbvE68|E-Sy{o(0@)b<~GukCrf zbJqR+?d026?ycVTk2BFZdrZDgiFkHC<}X>_r<%ND{-Kde*}zxM?{7J0YxAyu%eJL2 zUs6zn!IXNjcF1<%0TvY5Hq=%v7Gt?b!C$0k0y2$VSPFZg6Bjs5q2}w-H-Xr*bTs0l 
z(p2@HsKatf!QzX0Gobb`N;05AT<qDS#3USI{rHDDnR0XRj^DO7H`hiU>6h?%O`sa*UKj(iz@Xd zv5Vl2EFzvXs2J;pI}&omCIY!a5n?$^my7jb0^R4uuY$_@)x05yo63{gm7pDbtv(QF zWC^DFr;>0H7GT3NXeKXGq3Np1wQK;ni4|z+g9@~SzJUNE>5NMK&JR9C!WHQfj(-PX z+Xq^`s6=iKz0}(T^lrBwH?=_ResS94lvs|d!sUPA?!4PL15ih89UT)>PE)p^=U zUVL!sQ(FmYZ`4|dexUeI>L)`Kh$WOet^srVq5*RoY^VXrvc}W+oal<0S+(AKq@A^hs+fTU1M|Y3F{V(3%7ON3QL+LEjHQLX9;CrkbF`@S|5V z&>`DeN9?I{4+>A{L66IOFlAJI@0pcDJ-a- z*t36Enxd)aM@NIEckeRzUEAvgzdL$k_38m1de3?bJIh^vRj#36)ic8ttscs7h2aKcBnju0iH0tV0gpufiVikXc)e|hXec?pi#4F8GfjFKEp2-UM0+}OnmGv^J{a2 z$GArE@kXQH=a~L`?S2T<)DAxwV|u4{_+NV4=wunQ4kvLVuu6$>N+U00UVb=dvHp6< zW^*{1?x4SVQ(zavNvX~}ujnIzB}R}!oq2BUiNLPUA|C8+1j%-NX^bmrPF}2ZCUIuv zGf0At2-klEaN`M#O#eRW7a1}aEH9H_J#rnL6`4+ZzH)nrk4dy8=6JXddju%_D_w`kcaoi>F7n zW+riBH|9x@>IS!vF&tgHn3qB@Nf$V>yq9mQbVDV?m{?e0nyv>Oo=xhA~fyDEJ=ub9vofBB26+#?g8!a zWXvIH$Tk4U!=VI5=8$qv{mvsfA+z^$NcR6tGf$;}QOPFYk2d^EWy5!*>GVbqXB z=cEdYi3+h&D5m&hU0arDM?Af<8!L#50gt#NjAmzJc+s>Y+l_r)NgNFN5seQ0nWpt) zr+5_ebvDQ9Ke@zB#|4J~>V^;~XS3*)61=qtomvSYY+9;C$cBwwWdI!jT|_4^R|7gf z7IS#MT19MSzM;cCS_Q^0}k0HNUwo4T}WdjkOmiW_1Z#v)?6TAme$XK0`PM{ zD`<UF{2vX86+f4YfEf)IV*sLETSg)bun{_jxUJY~7Mu@TJV&r)?yDuqbJDN^jQB)E8&PaM4E_}Mz5LX=s z+D&x@rMC-Mhed2onS|6_2;dZxS5>1>G*vg@J*64;o?$G#QPNJh1{Q4D>(Ae_Ggc^` zn<*4=L!5+5V`wLDzJ@6;VrAvTQPv^!=^Q4ZbD$bkC(~-RrQzLx-%h%_Awr_wq2}Bj zP+PIewFU?Q=)JWT@NlXX{V<KoXOF-fK20F6H@;B;vouAZuvVAJzAS3S`i0J!W zjmDh3(G6T2S~6=)kkEod2fbSa`%VK!fUVR9E*34scB)f89>hb=A-^U~Vcb}$V4z3n zqY@-RkI%241RkZ z0NUM*J`(82EtYJ=dP#^pZ)}P-H5(hofRE`gawLB8TF_4--o;q5?)3(k43)dG9vqEs zMafjQu!^|r;KiLl(W?4c^vRvaSkA+FTOTz8DdBp$8Zo1OF=`RT9oliux(^ z1s%p(stu$K{i3HD99?8QZtqu#ZxCcPbm-?kkQu*z`QUGULn&uVe7w%eUnLbf`-3at z)|zqX@e&PASrVnWyScxWuaQwQRrD80PS5m)tF11`DP5wfXyX+sYO`W?8;FA}T!du& ziDlm`gmANAEErPfo4T7NZ6M2JO1PapKK3<@1BEIWQXJLn`UV5KaM(n)h5<%1^b<>oh#hkR*Qrwaa9P;lBB! 
zu=CNlA;eS?rP95eMTgYHTShg!y--6ACBXI>3Z?aMYkfV|EOe6uGL8krhDRqO`=f06d@`Eth=3c;tfP+X zG)U`!z@;TulTgB!EfqV!R?&tp?F_SKlj_N69R_gqtAL^FyvV=WfhvwyBp5ofx_Yum zCl#&)`{#N+RCO0FyED5YOEc!vKpbV(F}S|JKmR~e)+mYf0j`dhg>?KeAGW_if(FMH zUT=SI05mF&AsQpmSuneA@9D*o8zrUCz`?2-iHC7Xr6~&z@EpbZ>^%eC?C|qoPby7^ z>$~7O(A39n2Au_)b#z$wX4BEkHOW9oApb4OpW7|U$D*hQ%V{zI08(_L`#IlZ@Z!O- zS^~gE*4aerbxO)pz7I;FimyJ(>l>ypbF)$J(1ohz{sn9nZ1s zx4x}s$D1nRQ?2yWZ?G55q(-NtTnX-)wr_!)=Br#eZWp$Yi83j;d9vLKQVtoSR4l97 zNDK;!@b delta 9254 zcmai330PD|w(bU$O&S^+L^eT|5*JkB3JAC)D5xx=(1-!tkWD~9#jQa^L~Ybm#uAgT z7^5a?++M&eM$9BmzBd}3nIzvtqff*wDk4tuoM$w?b8dAv_jcd+J^Tvptvbtp{&VWw zTh}}9H7Reya7s$BwsBD4kCUc!x6>501}^(1haB2OTiZkFu1R*gcX@h97rXTH=RUpn z_-|D0F^Ty_5og+8>7mq6UuC#bPm`2cO3mpL%Kg@GbIl>cL#JM z4;^}o-gKNu2lfvH(Wn8Vlr-P=tSu}hJ=V_^%=$-z+3m~GtlkdI<~T;P=4j$h-RwQg z@H_^^jX#_}MR!L|qHF94=(N~}pt+lVv+Q6tf*p<_E_9#+nhSFX;lSvea0lAtfXL3z zaJRaRXjd3+I?-XCi!pBeYo~amr&k?w1jLV&U?Wtu0B&kk;Y##%KEdG}?eLpKpE(Z^ zWJW7#y!s1}vvoyAa$Vm?MjSnsfR7Rnj0xCb5U9hRL}JfM(nx>$y9Yu9qO5wr{y@~L zAJ}Y-_TP-$GZ?6eV2|U89WC|t0l@}uG;`h?O@{d3wbm!e4D~HN{nQ6hd;2ya>d*Y} z#%Hh)f_(tG$O~w)wgVR-ZSOwsq@9qNxL}MSKez`qGDxDyCa;-mV3TB6P9D*Pvz{5m zg{kt0ic;cXDbhG}v1yXidRI>egd&W>&=zFKX(ZkhjT8z9(oW}$8E#VH{yUMoSk{w3 zJZa|`qzp3%lsShZ73JYb#i4Kv)c&=H7{hO4(dfK!3_V~i^~{D^ik)O$OT9pY-GJaA zp_V?Fgtc^O5*7>sX;9j;S98b!8W&+683a)jfxiD9Va#lIJh7*E;hBi$cedfHo(NM{#1@0kZ7gvUZ+PM6l6Ra(J{I$l_z#qlaSF4oN~ zoCjQ>p-4q_=?=7bYCpC#-_gx1z8;A0Y+%)JI&yge{jxws8_MixcDNhF**a|)Bq<<0 ziTX?*3-`&>J>Y)%^a{9EuW~!xJ3SsVACR6*Z)7>qg%xh}*bG!WKLZ6@Ym#VpsxwV2 zwPUeb*obYDu&Q(mf>k%cDfUVJLrW?ru+Oz5m_D3oUIV=tgw8B7C`v^2B%Y&kx9Zl@ zs04s@`c%T}2Auz(3*p7n>sczQFSDna9*FXIGKL2z*&hYa7(ZTU zfEkyWzyIw9o;WlyI_qkgSj|!VT@RrxtkJ<(||4-^aV)b_$G+%{%y=aDK zdB@T_5xnwxL@>(;}hcb~Rjdb?-W;s?1+asT|`{=%B8(-M#E z*wh}KDZ*8`t zqhtHgp<2i!-w$^<0-x#5G^y4AATO#9z`+G;!A?MC7=Xcz)0{cltgN8c{wp)bG|96M zfg#f7g`)&V?Mgd74`3Ryu;gsA!?EOAYm#YInaWf~?2I_y4Fd$+*#;6{9wrZm>V^*j-#)xo6Jn!Kw9wv$R1C?9cImMeOOA$|zHvx}&byDGM#<4}cysUsnhiVo 
z8fs5om{S#M(gp}l^t+QrZO2n;RKT~11#RZ9HSnsorwv~bUbYPZA^-MlxUO#ZgX@;| zc(Z(f@bmUOAFP7KD;L9WD}~POUNg2iUK#MZsynk=si{$aTBrV03BwA8H4GaV{o2)^ z_RsxXsb`htwr?vn?WzjfH`H)DZ;NpIV2kZzSGfJD(RQa&Qf^Igh2K;-c-HHuUXSHp0H;RXZm z-C=mZ7}#Dl-18qJKr}Vo6JMuBHxFmG(>*7_jGxvMl}~6rVGM%d1;e}jgx2RqJ1p;1 z`y!eUSKsFY;Wl8HFG`;t<_p6Q#$Xu!Falr}HV8E@YlGt9U4VI& z2)8Fog7#_Pc2=!$+f^I%6-IfmDG2!(-y8%ZxP5$c$dB(>X-b(Yk|eky@kt3vD?e+# zXNDYN1(77xV!lhy{uW7Ah~|09y7eqMisXsrxt%9^);W#%u=`OY)3eVBS27-{Sn*Wi z!B$Qti6RlM-wkDnM=LgF7KrprCMj|*p2UKS;%Jg)p>I54vGYlw|7$c!wGjCQT;Oqw zi>!)~i5j94qWE_V$+Yl)+pxGPR%WqN&j4JR$SDsUUwFO0C|Sru-#vvSitKWcYkKx{ z3R!9~|BIgGO(g{u^Y``a?o^o+z!kd;g&?yM_L!Ys!C%ub&{v?2_q`QtOl61n-{nWRK+{?JTPB%0?K?&?`cyd)ER z9vE+x&U3#YFI4cv1d=8)#uuT2S`&ykO?)2GG<=pMt=vhko*kJbLnQ3np|d5RxE&}b z!TGt_GTX;^sN{QN`W%_P0YM4+7v{)-hsZ`HN}r!qrf{$j_cS<hiHr2ve3q>+`PMN^&aOCvhbJa-E1Mo@}NmniW? zD9$T+VDVl$DYekNsb~3GvejZ9l!O@lGh||fZ6LV&JVWA+V}!mPvp|B8&x5Bc3rL*> zC-W&{ySl1#l(1HVSge7M+EmM#NWEwcida;_t#8uXz(ERGb{|ByS2vf!2BZ{RDs0-aP5U$8I`7qSQ&D?(M$ui<w5N4E^yF+%j4LKT;gd#@pUjHrg(~k;?a2`M6Z?;XZC6al+X^ilJj9 zOlOpwWGrsY@O-It7y9#12X-`1QqVjN(2a#OwDqZYuu=IW%)&S%UMR4f0>LyZvO_o}z_dU~T4kRw|7zmEZk0$n2Uoy+X770Mfs3_5@xo4_NHCN& ze4$nxsQ%NsJ~OimRjjm>X#U@w5h2+b!*raOo24Y!LKR=`#G5KgqS|Co&0a}}1uZ~e z<4T}KQ^g^6q}b8O#6iruOhOA!KBQCF6>ryYFv2qqzY@Qyab}0oW%|ZlK=#Yqwbs1O zV>X-gg4mWsNr`hFpky}J!fF0if3{|o1c|W%*t;lrTo6p8-t~@5 zvsy+FUS%e(2EnCpnnQ~v4SvEWD|T`<@f7*zX2Jh8ln!VxN)Ko4#WH3gH*42`;7@{J zBHLaK--(~Dkz~d{$U4`{yaqrMdUFGSOKUMj!VDh>`?KpCWYUUn z{u`md-Prk!1U{L!s#(=W2^?Oh20$rK}$t#365HlFBX z1WGs1!Xi-Dfr9f+a<@R6*ez!Pu;Ka5%B78!Mw2l3zUj`+ZIYqu7SPq3aCR^9fTOQt zVy)KisUdSk_05f610Bz}=)bstJdW`aw%poT?_?6l4%U!R5qPc&kJ!!{z(oZiFLtFV z$c~k*lz9puu#UgJ_|TZg7>Sc^kv<)9CV(B@EK@Vk6fbTDZ!6CrB*EJl_E7EX$L7{b zQozB%2b;=T;$vM{Zc9(L2e1x38OGjefWzIZTFD0E;?SZ)w}AMJEh6!;`)qyK@huW@ zUSlBs6N+DKv}MT+B%Xn&AIq+jNecFx4c?am&%kRwjzUT7>sf0ZsTqS-#V54~pfpKw z@yN1{ehRxmj*47~($zdi?zMR<3Ad1b~b5A|lC`u)d2iV$G z5AnpMdkSn_sh7MIav}KE<+2Tg)LKvOzH%E07n$NrLY~f|)F!pBk@(^4k=i$qC2W^T 
zD-==jb`aML^F^Zxtf!9nvj5&LlTz4zK082kxHiaWc^Cr<{MdmVk{od$0QWl-cPbHy zS86@|*nGW=IBdl#J&4EDn#8j$xWz)Q_PDz9!aG)qd}gOuD#=SDOKc$Py?8SF^@p4) Q{sjN7z-!DO5d`l37e5Q31poj5 diff --git a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip index d80439d20d3dfc8c61a89a6fa850a1ef11f9fc45..1e498310ff71ab88b8ebb08cba4c8d0af95c9705 100644 GIT binary patch delta 12862 zcmaJ{34BdQ7k~HVC9=Mk^<|GNwuH$>?IdaKO43?mOVBDsL{lxVG*Mb=$zl?Nh9oF$ zsKio1MFh1=P_3f253xmvol5(ib0_cR-Q4?q{D$1QXU_6J|2Z>vCi52U66Qm5i0f~m zu~p%p+W3qthtOufci%G>+l-lZDnav1P^mQdwn6SO$R)H7J`Xb36y#h%-dg2g7&_?n zbIm&}Tqks-Wwu&c+}(yYU2aLw$GZqBn%s`CUv+|R`#7OUXad5enh zItH#6W7?Vg((Dwj_BN6 zZDXL}PLtqal@l5`CH~yOnH!6yX-WR{xHAe*k8)Fc80a%+utP_x0NuMjNJWg5O#$I|u^ zYnr;)h8_KyV-RgC{gU-9|*7JA#Pl`OBgu0Y*@< z*9m)5>OpH_pmwd%+Lx`-8sZe|fx5KI+PIi(ai{>`pf(BM&Ac`_=#6K1Tq6lr1O0b! zcUqa{Lw^g0NViWNa#wKB0A{uQOIM&4S6blMjy=jFzBIHgQm1FzKxEY~JDtA~F-=X< zvCzXrOOH6?>q~80;A>yIWmbGrM7IkAy~*uFXh*e2avW}t`F_rgy?0k{H3pOwIVn`6z3*v&qO3c{`bT%;bOd}j zUq(kEU#3SPUvPTGZL5EAr~ZYTwJm5n_f~NjXzR=#Z6jKi0RPZw(Wvs9XjJ)AG{WQ_ zgD~}r!Jl`CKi`f)ZoT`qI4v>t`FnIMJ)PLm-n|-%1`o^1K&SRX@w2^_*Uu>S@I2I) z@_vm)Jq`4BKT&i4QWUj~#fNFJ7xAIrAkqFH^u>R86uaguXt~X(!Hp-IJA`H;M$r-- zE!xB9+Lc8J%cZu>)$YhRO#IX#_XNn?n&FXj(XfU|&a>xZpuY?o(kOcnk^k@{Cu8=w zBvfjeH#{oxPbkya6iPK0d=;ND2!|T~M9?#n+qD`p*hXbh1>-ED6vuKb9SrnLdaS&@ zZ{PYwi9jDRE39ROBMlH^^HHGyR8&eFOG_u7v~*Mh*xV0C15H{Khs5xuaicNk-s#QN zb_N=j6vxZ@8?hHFG4zIvnE_zskBPLf2XK2uZ@P)}^`V(ldor_vK5$$Jm_-$)f_`rt zVvcISeYf$bX7Tv0n9`#Y=E6h7sz|jpkIzz!59y(!{%x(r1?=G@tOv#bwQ8_#gifWR znUgxu{`)+s-;}^cS+X|JxG9Lli7Dt(R7x!M&!}f}3g?ID)*)h7t2V+zke zs>v}PBrvZc1r_x8^dm%J^XFf}L+hC*v4W$Urd5R2G)|hgfi5u21LbyK;QSQH;O#?K zf59=@hbofi%mIBhb4u|6GGSUneA6%y5ouTx5|+G&{)%hMZm}N{q(v0ArmgahvrwrV zY-zywcc3I|Y0O*?&^Ti5XbUG~uVcu|%a0%jKf-?HZDcR&R!&?}>j`{v){l+JKPRqy z(SdNs(!bq=hvgynV~&n3FSOe<@`HyTPTA0Y%2^w_<8;~8{GF$}RZp!d?eMU6LVWnF 
zn-jwqs=9p~^HaxSC--|jce-7;6nd#Jv6Bqi&W*mGnnXKu(ZuI=`dd`}cwypi7!f$Ck$J9_srRD7ltq0JmzTLj*c@2-+p5_d7`NQtW#HCr0nah z{W0(2yT#Kd4p@BlK=kNb&2NLczN&cpQ182gH9u(!t-O;xYEBM!y5H2N*8nf~vW6X; zvw>=J-lO|Nv^2IIFG5?IVII8Go7S@)m|FP_5~AmVR!E4XMVMinjcQmc7k>p0xr;H~ zm`w8p`-K4&)o)=EmLNm%0ZfJ^p-F!_WvNjDjKFk9F0k;Y^*fJ}Dclj%jl|;%ctZ^| z@%2nPQq(vH)xbhs*4Re0cH?xc%)({XBnmTnA2o(!VW5+;XMvYDvT;gkbNCA*4&XNu znlIG#8k&II3-&dS%mKH#8Bd@5)KGe$%INr{d$|H9oo*#+?z}Rwk$14P6t4URJd0S> z3D^0_xsy#2ydDD9Rk@$bJEyMxv{5JMs$IPpjA;v+V@<;{YPK3<;vnqUJYznf(qch0 zEy1JmjY$Db3-ZA$(1bZaMX;gP@r8Ke240l$&9wroWFyyNaNn#=fjC~TwGk_E<}T~r z4RDB~!*)OoS{;LVy(mmGMa(GxE8c=lsH1L^ z59;tL!ptGpAW%@q7j(a31!X;oXM$*HG5T?*7wTX>D0BAN8bL4fu3vp>%NvV>N?<-$nRXSnQK<(I2h%J9{D&MuFisyrFmRS27{%8> zbo{RpQSiUhk@!Y?SPOvryO}di*+bQ~r?JK1G{3@{-a7IYsPsPS1uEMgT?jvy9c_zy z%&Tt_$I;0ZBUp4b7#MTR2VQ+~tSyKY9ZLhh!jHGpz{7_X3*g0r3VabziH!KZQXDVL znpnq@ZG?EHwGr&-JEvj!py9_`Z2#)CPJ^jG-L-Y)R!Dt}9v1L07OoMR(b{uOLABHG zWr)|AKe6f+*0k7exZsGk7GK6`;(P^d^}2%f;3WRM>?-aczPcKTGC$uyf4bd7ndvuS zlF(Z>N5Id~w;=7b>{dMd>{$h4x93%zw1S5z4{*TK2e_qdSpycg7uK{#P1he|shIer zBR&+ofY7!#f7uBi65imD+i%c@cO4E{Q-@OQydZ!W8>tomPWG8vSOiakg#d!=Eeiq4 z1uKUqYluEDxa!-MG_+CnaZ_qYK!l8b3{GBP$Xun?U9d)<;1I^n+f@tiwt+H&z*T>N0bz0Tf4FkM)U zlMpG)Vgb%VqCf7k#(y#%{#C2WZbU999n#bd*EP;?TLUX_7M1E;Kq+j?vJ!%s|8ru? 
z!raVU;Im3&U_;yl*ub+gHxZzVZbF8jTJW)6fTo6KQSz}~GpiVazj;`b39n`x(UiQ?`VIknMlW+|ilJMxlYbK*b|701Q<7cC){h7E z-E#Cfp#HZUy`L>SKF-slfAjP8(A=SUK=Xv=1_9uUhL;frDsBYmOyL2C-l zA6h_3#K^#t`#|I$&ma`Z^a_I33|ez&Eugi8)(Tp1Nv2nb4QxW$sy-ozc1Yh4Xst^^ z`nJ)mgYK~EHp{`a!|7+M_;@Fb z?DK%fjeW$&?tMuw*j}=)69{7Jl}PS6!=q)2_%=6%9Q3e&voAJo3F#WBc=9Ev{VX_V z;ERgwUqX7cR6O}YInG4W*;geadKR;kjC4lo#`SMd14L7-^ReMX2PJ4GRRo7m)sdU1G|_(lFa&fFpdT`B2&Qwdr{dOGkLB@YhX!P z5hCQ?ln`5X@;BoP`*w-7mVrOP{Ai!2kU-$?x;zCB zU{FN9q@OEuHn6gM1-@gU)_#=k&dK~ifrQL_D5UXJfkN(~f9P5gZ|1iN*7CJvuB;!@ zK6oAZK&k)KUl3>2Ix<-4<-6<2RI`4QQzzDw*)k4c5yPWB&Y>CKE2Q0`p0gGk6w(g$ zLQMNIcqvnEJn7@g12!rY2OqHS7u)s<$Fpc7879+$XZzXl$s@T)yB`!Fhbp)F2W6Uz z{Oa_h0>5A`Zv0VU5X1|Ed|8h|36D6AP|>#*N(S+MAZ5fRNk306RHUt&6e=4`OwqIV zH%mO@gWN?@`mnz?OVYtD!AqdIytDvaVv2wkPktgXX6fe|!M$EvNP?^%Fc97NZ3`LY z(oh)rAY2IFE+PX070(qLRyjDEVOxvHkN`YKl$}4CFPJR#IC%I@F_|Ls1ZI1AF_|yx zcQdd@B`_#utAy3uL&$siukcd4jZBw~ge5U+8%a0o=Kw-~#CDP@2T`RD`*a8S#Ox*43ol>pAgQu`pv<>Flf|-rO9O}F?VSp6s10oYPO{K! z5cj|u*7Fhtn1B@k)3-8tfN$|G1$pQX*sTB#@U?um0yucGjR%e_YZri%v_}CPD4AlW zG`~nPWn%5TUlhRcHZa${3ULCMqO#+ACCWGmFsN@SS!xz6Hv)7J38vX6L6pA)M8Dcc z7MKk(Aqq6{VI9gOIpr?_R`KO|=}WE=FdVs`r200@8{RKDt7I?slTrVBW^gkA9r9T4 zWLf1TIjpfl{CL}R;mG@VYSG$8rRuK|$*Y2MP}Tamhj36kmVcRevHh!vWk7hx6y<4X zib*s!o|PYv7~`+;pu0m=ow(gVGsEkRp;PPUYFXkzGED{|WCU?i?^?`+izM23TF8ns zhz(0TM0BzdFyF-ygO`XS*wsTMTqXp#h(eDyy1BTG9bOwhehE5%<&+oopyw{h9frS z`+jf05c`aOI+!qYR6U5%WU;XaA!E0X5KoyAAeCrjMwaLin|746HWT7Gg3=*Ix}9?H z;A`5Sg&b1|5Eirk$H350YdDU+r%9Mh2*@N(vL1O{2#;pimKDUAT|TV8kwd}*_(K6Df@#2nk_a)(hnWf zbHi;M1;5~zLztl57S79|Rl{V=FA;n&Bq3jZ6rrK>h@zIu)C!U_)0I+pyJt zO6CN|6*q(^eYqIn8o=6IkQ7=jgoU=@Jil?AM|N;;;D|p^mRuJYnls>)H&5<_d`5Al%6K`CDHACSAl<&0MxR?O~(#9gix=2O@W zkY0a`)!rwuRD0ZxZB!D3Y@9&B5f!3&w)8RFoSd$(V*PF^sD%V4-2`cUC71r|CTS}3 z6hL*j1ws{-qHXamg?)WXQqfF&*@Th{&asFGB#wDj5ieOO!g+T0aI3gh@GFJ|@XKk^ z{p8_4d4S;;99UTuaW+eekv2mI_1X2-+2)Xtj?hL)l9IOkHc640v;>m}W+o>X$BdMW z>2gI39x%0`MOUm?%VP=!6t~;sNh_ys%m<9*^G zOA+qbg0H(gRL-yudvss&#lR7Q2|2wQwEyuKq3lLOp9Hao)d~dg0PWQof0ea{Ost1Y 
zNY|s~Ec^jUmf?@TP_n0E{z?&l_24&R1bFRbzJF^-)#~=)i$XyES{Xkb>v*OObq=hu zM&e$>a``Mx|CxBj37L?(cak|iB+0V$d4hXSKa1XNBxs^p`gYEptY%IRA@j+P;gYxJ zA?YYf4S?725r7wBfs)H;mQ4W$0T_NEo4W!L||!r&FM-^$<|h8HsGraF@b(*?Jdb$hB{6jq%v zPr>LcC+=o=lsErS!bQi7?%c(?Szsn+-BU-%Hn#R=$y+~Hzx){)U5w$TOmo9lpWN25+If)sI|R?Q;^yT0NKKi~9n{+5?}%veu71dQO5^`g79O z%wKLBOIhvB1G$rnEmUa}ZB!ph{H?^1=9f);5b);97sSWn!eLELZD=4X{+G05-H&Me znbk`LHo)g#$V(7^w95*_16a6MP%z6=hx)O=0^%A0DAj)-8{rr_DE{U8U0ru-R4=1! zRUb)QntRKkK}qy05-AG_Yl82;&3i_|b?L`yOVtF)_%W>DmBOVFtf}Fz!N>3nS7S)Q zjQk5nKRaJ5%p90w*HGNLz$7jrU~jsYbzl?!R=@&svIMSUEhc5_MLVbdR@guHq$FD03AaDm+T62l{M8UNJ0tjs{zT^pK!^ve~2GTO(C%Fc|x4M_{_Al S!vD{}-&L@i7`%kQ(|-Zr$^vo# delta 12954 zcmaJ{2V7J~x8J)9N?$riR{=#Fn#2}Ag~W=9V6dVF#28}*L!u@o3NcZm2(kl?8kAy* zU6g#HMr<)^EQom~w#431vB!euoioEOd%5>L{f%;G&Ybc;|2cE-4ByV%F3o~wpU_ul zVWYu6K88Vg_F;5Tgo~3aJT?q8ahaT#+&;u-Q~t?0Ykd7%pBrHn_9R#8chfv^#)2TJ_X5Qi(rt6ppe0U^_j9wh z1=Wvl&-jwk`>`G9Ld#&uwYk_J>u*F3LhKmqgfeekC@vdYvU5WaHv42ha;(_+YVQ$vuua z0C85$bD~E=(2*o>#5BtrG*&peNMRE7^{s>+#HDyKpxn_w5IdN%FdB*kMc35UqImw4z;S9;cc(g zXabth$pcz3`&A@_CUgMs%X@Y4)}qUKQDeh+RInO+>7K5bm_NJLFE{|9IUUhWqy?FP z8H5GpN3VCQ&!CkJC^_|3SvV-^%!+>}J~S-`$7aW%l8rGKx<@RA9v6#0|HOa38H;3j z?;XC#Fs1&O(%spD%W*v$=e3D}PV9+Oe($-cexa}jr=f1_i)*MGBJO4edNYn|?^}tY zHu3n77Jm*O;u5(JiRe$jkZx>YIk9JNw$`tq*-nkukOxF(OgykuPs?_SykDG$sGM&X zr1d~TVNxd!yd!Dz2GOD+k#yeRh6QXax%B730~;j{!U`BNsZkBq8c;_!dZ5t7-hk`%l@eB@b;G`daDs-1 z0WRG#5?J^Wz$GV#*wM=24FC%-91S#WUIOw!&r-HR+7d=!!hLdrw6+GClAR#xbRF-7 zI*q+;KKvXID*iBv7w4Wa>#xz_2jYYdv*O$@wGB9cDjJIO%~aHmdh89OFmtnb~2%X zZX!f015KEKh#Z}O4t2|jXZ^MV+ma``H!8Ajp!=)ZKjUcGusFsm!we@YdUqn&HcUb^ zF&OCgqst~4<8U$1qmwX>*tA5B$H1#$JwQQUx_6!f+q)C?su!Q4uw#bE$4nD#1jT`J zRT-$D`)6Yihz(O`z(eS?V_0p#S)OIn&fr$mnAP6kbm-b6HFWucIIPHGXXssul(wCY z>y}f++lMZjjv#KCz8f{CfBiK)-2b`~A22SyQ+Q>ftC8?8cJ}8n@LjdlwfcY^5Tq@c zOT80x8jZaTZ9eK(AC1s7!IU%#pbvqSN3wc@tuN&_mvsu@D#ARkZxsl+F z$o+M~zH$3w94lYB*Z2R{|i_Z25n0c|sf`xux{_sWZ z@#(vkc)TYK_37zW(rf*ifp1T_)4lS@pKYd}|ET78>Z*D7Dw4t$ot^L|e3;A7IXg>( 
zV}f#$d{Z7+%p5e>FE#o=2m9~W^*_Dr^PRDht@dSlw=P{zUzr?z&U$#4JGR|V4Q8uD zC+kuje$GAd)w;1Iv5#ZoMtY=n3eN6)Y2B3ne7Q5oJ1y_H%L^L(ZcN6p6R&pFbuPGB z_x0P?r(awAa>*aIQ|!MyUHU8{p;h?4nvkq_ZpJ?Nu(Mli+6t%p>Ek?(4RJc}@6)r7 z*O)yG?`h5ksxJVn+c?npXiMxt)WK;$&9PnHI{hZ>#N8R)qDFTmcld|aCP z0x@8W8l26_OJD&zi9PKX-Q1`GHihTxI}wl-)`Q$9k2mrXgoTFUnyp)m_&F{?t#2<$ zY2*bzMvRoOXd*=OGJYTgXo@Sc6LpC@qMlf~+! z2)cKoH7zT&Wa%eJTl%~hVbzxeVd2AyYqmvWu?1KXii}BuqKrx6n{yJMI2rym(7DCn zG$_H`pbmJ2*0Wk?NpNO$iQo*5UM|6cH@p<%pHrFvp}s1$=H+SOcB@_-E1b-+Dvrf@ znX6HD{c0px^B;R5$uRw9Te)W=954;*GEJR=sIUpnU5`56toK13-kUL{NJEGa74nVV zzpU{-1j>4pO#{*0W$4H4GSrd0B}0e#9dTetsTY)+yH;3kkdltnbs7L#-siuqc~k}O zzwqW!W#T6QGN3pSqq?^pwV~Zh&XS(}yOmh;7i_@FI%HR+6pX0zbEw10R zZd5k7BXES3VZHk^ury=0O?@ut9`E`H9_(w#g{eMZl}Du!PvF}vb3dZP7aU|{&;ew_ z$peT51_D_4&@sOPK*m+u`Owftgw_G>KTP}lxE&DMjwX~vu=HD`DZPHEJ*e_I>;*cO-#stZrKHjcRY~mC72-t2KL6Ek?Le&Tw@fe;c_i8b9zL#`19R?IwX=gc z58p5D6a@M^(%2HApY6FuLTT;^#DDb(u2}lwlhk8<&>L1$4OA1uQPd`Onftd@Y%EF$#sYT}9`+UgOTcjLrD&J#csheOJg5*Ld_k?RC<8c-jw7R)AlNkQ?8(qT%<4(7koGEX6{q zho+*)LeitI%~ldvVI8db?D5tTZa0vD3RfE;5a=LHhOWu*9U(q@O-zN4S!Sh|8ne5p zfraWN{9aw6mumRucwt)FBK*eZ-)gvkD-QraS*?>4CFQXe&QeM+Qhn5?qu^hyX3y2A zIh6zb-EbEbO8~!J^!}N{@tds^{}!MrG1mC@$m*OW(1qW|H4?m#=Ou@*OgG7jg}F%$ z3rUm#3j-VECc$?F+v~=mJMSjtO0ILpyGzit&~z2!-7UT-16-~jv{(p3Qw~{FOgUuv z-9zZA+ukG(x^i|jfz}k76*OyTHqdOL*;VB1uzwQ@q6@Cuqp1H4duR^O^w1nD{BJnL zZG+))h0dt_RiSgmt3sEUN6^(J*EJu)IGOAEjuSk(mbjwCsuI_VRV8jGP~!7^clbVH z=knc0!Bf`)_X3bVRp9Pt1CNi3+|l57#qQ8Npm{=T2F(kacg1(bK3+RPtT@RR#iEA! zLi2;>4=n&%^NOfpEspI5k$;=DM3FqNme2yB1wjjj)(To{XdxAOUTv)5eV6^vyA8q@ z+NTY)(2CGLZ7o(ncf{Sci@>!*@SiI@ekc+2^oc~=(NkMH{#as1xZ``>2?`JPATK?z zY}`3llvV~CZ5eaQApKdfHY|`DW<$xSxE)S*dxC;Bz4?r8eMnEBB%7T~5IFA?a>p4S zn`H3OWf|lbPaT||u~FHi3uBvz-UX?Y&{xv>=>#3%2#xK_Cf!@%AzGt1Rv{#JSpx(fC=a8=y{niF{X(3GdB$tdd z8x+pi8rb<7xI7(&rj7NmHfb@E=-mmp$zhNgsCWNQlGc9#VUD& z$geDxc`TH@&SR-d$Y=$a&H$$V#kG8a_tPaL#mspz6?igcsZ6B_0VC92r)dr#7`z({eWHQIGJ-AGUt?(TZ8@-&2Rru%w3u!q? 
zGV2%eK#+KNdt)7nR8}8wc4d)@2QX>h2AI^ZSQfC*=xktni&cp30Khwp$?*^@9$g}1 zv6u?EtSV6<0Q3(oB}2`eH(|TKlw>LTA@>7UkdM^*tNsGNZ>}H%)CS*QNhX^0o18kf zQUwHi14~+^(yyz}sl{rQghO@U38yfqpr-L4QBbqRk1C}A3U8uce7g$*#IhgBV1*XE z6UZN$K1_(bwMGSJsAi5=lvPl9|pkvf} zGD+dQ3poELah@uiuUap2UL+k>C%2clL1s@3dT?f%mwsUn`hn)kIt5HQvyqH2OS;el z{`J~Kk`?^`0ylnYlZtUIyg}kx`fxLeVt*W0*%{%Ghi%Rv`B zD13k=zQ2rotLS$#u!j{e>9Z{|7K?x&<(GehLH$-TSuqp#zT|2c+_aT^YBngKgTbhB zGEjR74QMD`d%Rq7-nx*k%xXBN!J}Jk-(<8e}^m*VH}Jk z?o?3*6_O{)@@HA1OicauXPGk5?`>diyHp|s7`d{eyJX6Q2QVqFk}NO_Qz!#AaDG|t zmSHId0n1st$sDsuCM-Dy9rwsEFvZFT&&UUbMnG@)UNX}cpUi14JM(1E_mbiN_refn z06L_xgK$VsDt zb5_mDtY1)Bsv=%UAKKEU&SR5zl6W|=t2Yt0k(iqhp1~(lVuu^ANAt5RP&=gF1e#fy zWzR}=t?XexHz!d1Eje^5&+_owb{&R zD~UmHlOsHjl?;kAr%5AJb6b6+DQDV=|9SHu@g}8(~XC$Yq5V?8jro)eIZqIyiG21(!Dn zQrj-}EaEsJX2L=%RExCZAe>xi)at~3KdzE^Tp-4z z`zo+Ca+%SVo-JDmL4Q)E9&fV~Abs5=&5xD4Dnft~%Coz8xe3QGg%GfdFRXWqP$&l2 z<;Tuft2ha@l?z_vg7J8K>da=qRiQ#9;9PVP9GqTkN_wP12)1ADTu2GG+i-j>xVPtb z6~(Z0FTp*u_Eh~Gyx?LFegt+Ot|WT1C27p}pu|F}tI{jDCl9gGX!^*iTrl$aDl=y}BfnlF^HUfGsD3yFelB0lO~6^L1AD$k#z;Xj zsDHc`4PPgGF&?n92N#GXTm6SDC?nhDbQ@%%8`k5RdPNYF6YU&g!@9v+r()Y^0zpMQ zuL^a5ZKDo=k#8H*S>~T4)ohY+K@3XlvHB*;!>F2CtTw80C$-`MVN@B(f%99Zr}nF9hw_>L*N2oCf)V60|(v2z4nl1U3PK+3r8 z+5@JPHDkLjsT4n~JeNQ(ImAxhBatlpGI2GlH^R8=mIPKvKiLXd#HY4nt1lDtYG`CM zyL}n-w>gaZ6WHJ@B+x7&LI~>lf-4|&vt9@;S6V8BV8umT1)&wcp{6L7d6jfl2tf*t zq0rDOVJ_zpv1L835l@9AWHjv>NV=Wil6aTGE?*FTbQ=v#+b9 z74&XE@!6+LZu)ZNzBh=68L}dBT!46I(;BWnO57X2g#LP_K?lFlVY_b-SH;rDm*w8u z-};?jGC_KV%hEHGNjFJ`!m=ey+CMEl**IyKY*Ob70u}ucxZSZ6}0Q(AnU7qeg z9+^V0rQwfTvIlonV8@N7;o7b7o(FHh_ei?JTl}4vogIspsOKt?Ro_#|6()FD`-pky zZK;Cc`qw6Jed+Yq_N@B8%v(`^FlkRS-KX$dT1c9Fhd#=jYE`_&-&J@rC1!`w+Z3}* zZ_66T%4%iDOCq&U4m&-7tcB>3Io)U84p3Y)XJ#Z56g%Xd;L&_V^GMNKLY6wtx=k*W>C!h&3GoTfuq(vDCpsU zf-P7;9(-ZLFZ(*+*GPau9L_WGJWN|G#A!f?o$1D%l|3e|X730B61?{ITM|;hs^QW{ z>!%lHil4%7s|3t}Z;U4@+c4}_$xpz_G^Ylq@bj)=%Q7REh~sr0n2BBS#1Zm~t-V>g zR%W>|=_z^NEE?gPje(^+g}1N*9B#?LsSb+EEm$six#g{`bwy{q#HDD+Gg*p+4{&@Q 
z{|waiVGo~yy4+`^otdA)Bo@7g*Y+2VFVkrAK$MyP5Fg!{gBC3I zZreVr+aZep*7Uhb;^8wi^f`#uZg0Z2B>BeS#h)MB`dsF&Df%-gnt9zGMf5(Uf;Zg9Q({u|UI>XD=wr?u34Ec1Fqp|i;?J62uj?8@w4%6t>}z!n_&5)6jt zx}tnv7WOY3LRP#~SsJjAE~9AE5-!Rk=Bk^rcqz=5jd`UK5a7Sy6-@dh-yQY0U{$YF zK49TJ(Y_9(BH`jUHf#|3>&)`&RGh$jMU=cz(mfVC?n&I-hFe{D54{o`k)T1{cTRU@9{^ll>*0D{Wn z9-wtPAggP6w=Lq3#@_rZ+kQnhupIB{JvSJCwF}#LSx);)1rNgpv%ZHcoD0^5d7AN9 z%*I{B>CWS!j_`<0KKo6JbiZwq$$%A+6YQZCFRMB`&5zVs2@}e6&CGZV*h+sR+XR^imxNev#ZDcqy3# yvS5oa^D{7^7BU%U0n#r%ZIDjMAi=ElW8&;3mXBo