From b28f0d6ceb7750075be82b7bd4160a4475801159 Mon Sep 17 00:00:00 2001 From: Prashant Wason Date: Mon, 4 Apr 2022 08:08:20 -0700 Subject: [PATCH] [HUDI-3290] Different file formats for the partition metadata file. (#5179) * [HUDI-3290] Different file formats for the partition metadata file. Partition metadata files are stored in each partition to help identify the base path of a table. These files are saved in the properties file format. Some query engines do not work when non-Parquet/ORC files are found in a partition. Added a new table config 'hoodie.partition.metafile.use.data.format' which, when enabled (default false for backward compatibility), ensures that partition metafiles will be saved in the same format as the base files of a dataset. For new datasets, the config can be set via hudi-cli. Deltastreamer has a new parameter --partition-metafile-use-data-format which will create a table with this setting. * Code review comments - Adding a new command to migrate partition meta files from the text format to the base file format. - Reimplementing readFromFS() to first read the text format, then the base format - Avoid extra exists() checks in readFromFS() - Added unit tests, enabled parquet format across hoodie-hadoop-mr - Code cleanup, restructuring, naming consistency. 
* Wiring in all the other Spark code paths to respect this config - Turned on parquet meta format for COW data source tests - Removed the deltastreamer command line to keep it shorter * populate HoodiePartitionMetadata#format after readFromFS() Co-authored-by: Vinoth Chandar Co-authored-by: Raymond Xu <2701446+xushiyan@users.noreply.github.com> --- .../hudi/cli/HoodieTableHeaderFields.java | 2 + .../hudi/cli/commands/RepairsCommand.java | 66 +++++- .../apache/hudi/io/HoodieAppendHandle.java | 3 +- .../apache/hudi/io/HoodieCreateHandle.java | 3 +- .../org/apache/hudi/io/HoodieMergeHandle.java | 3 +- .../HoodieBackedTableMetadataWriter.java | 2 +- .../org/apache/hudi/table/HoodieTable.java | 4 + .../row/HoodieRowDataCreateHandle.java | 3 +- .../io/storage/row/HoodieRowCreateHandle.java | 3 +- .../TestBulkInsertInternalPartitioner.java | 6 + .../commit/TestCopyOnWriteActionExecutor.java | 52 +++++ .../hudi/avro/HoodieAvroWriteSupport.java | 17 +- .../org/apache/hudi/common/fs/FSUtils.java | 2 +- .../common/model/HoodiePartitionMetadata.java | 196 +++++++++++++++--- .../hudi/common/table/HoodieTableConfig.java | 16 ++ .../common/table/HoodieTableMetaClient.java | 12 ++ .../hudi/common/util/BaseFileUtils.java | 23 +- .../org/apache/hudi/common/util/OrcUtils.java | 49 +++-- .../apache/hudi/common/util/ParquetUtils.java | 6 + .../FileSystemBackedTableMetadata.java | 2 +- .../model/TestHoodiePartitionMetadata.java | 93 +++++++++ .../common/testutils/FileCreateUtils.java | 4 +- .../testutils/HoodieTestDataGenerator.java | 2 +- .../common/testutils/HoodieTestTable.java | 2 +- .../hudi/common/util/TestTablePathUtils.java | 29 ++- .../hadoop/testutils/InputFormatTestUtil.java | 7 +- .../apache/hudi/HoodieSparkSqlWriter.scala | 13 +- .../hudi/functional/TestCOWDataSource.scala | 3 +- .../hudi/utilities/HoodieSnapshotCopier.java | 4 +- .../utilities/HoodieSnapshotExporter.java | 4 +- .../deltastreamer/BootstrapExecutor.java | 1 + .../utilities/deltastreamer/DeltaSync.java | 4 + 
.../deltastreamer/HoodieDeltaStreamer.java | 2 +- 33 files changed, 544 insertions(+), 94 deletions(-) create mode 100644 hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodiePartitionMetadata.java diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java index e317d5a4f..a4a8e46df 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java @@ -83,6 +83,8 @@ public class HoodieTableHeaderFields { public static final String HEADER_HOODIE_PROPERTY = "Property"; public static final String HEADER_OLD_VALUE = "Old Value"; public static final String HEADER_NEW_VALUE = "New Value"; + public static final String HEADER_TEXT_METAFILE_PRESENT = "Text Metafile present ?"; + public static final String HEADER_BASE_METAFILE_PRESENT = "Base Metafile present ?"; /** * Fields of Savepoints. diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java index 6c068c898..ac1701915 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java @@ -24,6 +24,7 @@ import org.apache.hudi.cli.HoodiePrintHelper; import org.apache.hudi.cli.HoodieTableHeaderFields; import org.apache.hudi.cli.utils.InputStreamConsumer; import org.apache.hudi.cli.utils.SparkUtil; +import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.table.HoodieTableConfig; @@ -31,11 +32,13 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline; import 
org.apache.hudi.common.util.CleanerUtils; +import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; import org.apache.avro.AvroRuntimeException; import org.apache.hadoop.fs.Path; import org.apache.hudi.common.util.StringUtils; + import org.apache.log4j.Logger; import org.apache.spark.launcher.SparkLauncher; import org.apache.spark.util.Utils; @@ -133,7 +136,8 @@ public class RepairsCommand implements CommandMarker { row[1] = "No"; if (!dryRun) { HoodiePartitionMetadata partitionMetadata = - new HoodiePartitionMetadata(HoodieCLI.fs, latestCommit, basePath, partitionPath); + new HoodiePartitionMetadata(HoodieCLI.fs, latestCommit, basePath, partitionPath, + client.getTableConfig().getPartitionMetafileFormat()); partitionMetadata.trySave(0); row[2] = "Repaired"; } @@ -199,4 +203,64 @@ public class RepairsCommand implements CommandMarker { } }); } + + @CliCommand(value = "repair migrate-partition-meta", help = "Migrate all partition meta file currently stored in text format " + + "to be stored in base file format. 
See HoodieTableConfig#PARTITION_METAFILE_USE_DATA_FORMAT.") + public String migratePartitionMeta( + @CliOption(key = {"dryrun"}, help = "dry run without modifying anything.", unspecifiedDefaultValue = "true") final boolean dryRun) + throws IOException { + + HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(HoodieCLI.conf); + HoodieTableMetaClient client = HoodieCLI.getTableMetaClient(); + List partitionPaths = FSUtils.getAllPartitionPaths(engineContext, client.getBasePath(), false, false); + Path basePath = new Path(client.getBasePath()); + + String[][] rows = new String[partitionPaths.size()][]; + int ind = 0; + for (String partitionPath : partitionPaths) { + Path partition = FSUtils.getPartitionPath(client.getBasePath(), partitionPath); + Option textFormatFile = HoodiePartitionMetadata.textFormatMetaPathIfExists(HoodieCLI.fs, partition); + Option baseFormatFile = HoodiePartitionMetadata.baseFormatMetaPathIfExists(HoodieCLI.fs, partition); + String latestCommit = client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(); + + String[] row = new String[] { + partitionPath, + String.valueOf(textFormatFile.isPresent()), + String.valueOf(baseFormatFile.isPresent()), + textFormatFile.isPresent() ? "MIGRATE" : "NONE" + }; + + if (!dryRun) { + if (!baseFormatFile.isPresent()) { + HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(HoodieCLI.fs, latestCommit, basePath, partition, + Option.of(client.getTableConfig().getBaseFileFormat())); + partitionMetadata.trySave(0); + } + + // delete it, in case we failed midway last time. 
+ textFormatFile.ifPresent(path -> { + try { + HoodieCLI.fs.delete(path, false); + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + }); + + row[3] = "MIGRATED"; + } + + rows[ind++] = row; + } + + Properties props = new Properties(); + props.setProperty(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(), "true"); + HoodieTableConfig.update(HoodieCLI.fs, new Path(client.getMetaPath()), props); + + return HoodiePrintHelper.print(new String[] { + HoodieTableHeaderFields.HEADER_PARTITION_PATH, + HoodieTableHeaderFields.HEADER_TEXT_METAFILE_PRESENT, + HoodieTableHeaderFields.HEADER_BASE_METAFILE_PRESENT, + HoodieTableHeaderFields.HEADER_ACTION + }, rows); + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 4ab4be3c0..01c16a57b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -173,7 +173,8 @@ public class HoodieAppendHandle extends try { // Save hoodie partition meta in the partition path HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, baseInstantTime, - new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath)); + new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath), + hoodieTable.getPartitionMetafileFormat()); partitionMetadata.trySave(getPartitionId()); // Since the actual log file written to can be different based on when rollover happens, we use the diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 0bc349125..91a7622bf 100644 --- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -94,7 +94,8 @@ public class HoodieCreateHandle extends try { HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime, - new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath)); + new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath), + hoodieTable.getPartitionMetafileFormat()); partitionMetadata.trySave(getPartitionId()); createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension())); this.fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 7e6f1b36a..567ae63e1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -166,7 +166,8 @@ public class HoodieMergeHandle extends H writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(latestValidFilePath)); HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime, - new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath)); + new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath), + hoodieTable.getPartitionMetafileFormat()); partitionMetadata.trySave(getPartitionId()); String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension()); diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index b64d8ec09..d09fd2cda 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -1074,7 +1074,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta if (!status.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) { this.subDirectories.add(status.getPath()); } - } else if (status.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) { + } else if (status.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) { // Presence of partition meta file implies this is a HUDI partition this.isHoodiePartition = true; } else if (FSUtils.isDataFile(status.getPath())) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 49c289d2b..3d8109e6f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -756,6 +756,10 @@ public abstract class HoodieTable implem return metaClient.getTableConfig().getLogFileFormat(); } + public Option getPartitionMetafileFormat() { + return metaClient.getTableConfig().getPartitionMetafileFormat(); + } + public String getBaseFileExtension() { return getBaseFileFormat().getFileExtension(); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java index bbd9b882d..486a5cc54 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java @@ -94,7 +94,8 @@ public class HoodieRowDataCreateHandle implements Serializable { fs, instantTime, new Path(writeConfig.getBasePath()), - FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath)); + FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath), + table.getPartitionMetafileFormat()); partitionMetadata.trySave(taskPartitionId); createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, getWriteToken(), this.fileId, table.getBaseFileExtension())); this.fileWriter = createNewFileWriter(path, table, writeConfig, rowType); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java index 5cdb2ff68..ce3cd6f09 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java @@ -93,7 +93,8 @@ public class HoodieRowCreateHandle implements Serializable { fs, instantTime, new Path(writeConfig.getBasePath()), - FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath)); + FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath), + table.getPartitionMetafileFormat()); partitionMetadata.trySave(taskPartitionId); createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, getWriteToken(), this.fileId, table.getBaseFileExtension())); this.fileWriter = createNewFileWriter(path, table, writeConfig, structType); diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java index 712f40568..4d2f5e0c5 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java @@ -60,6 +60,12 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase { return jsc.parallelize(records1, 1).union(jsc.parallelize(records2, 1)); } + public static JavaRDD generateTestRecordsForBulkInsert(JavaSparkContext jsc, int count) { + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + List records = dataGenerator.generateInserts("0", count); + return jsc.parallelize(records, 1); + } + public static Map generateExpectedPartitionNumRecords(JavaRDD records) { return records.map(record -> record.getPartitionPath()).countByValue(); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java index 4ae845636..8114daa30 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java @@ -25,10 +25,13 @@ import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; import 
org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.RawTripTestPayload; import org.apache.hudi.common.testutils.Transformations; @@ -87,6 +90,7 @@ import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResou import static org.apache.hudi.execution.bulkinsert.TestBulkInsertInternalPartitioner.generateExpectedPartitionNumRecords; import static org.apache.hudi.execution.bulkinsert.TestBulkInsertInternalPartitioner.generateTestRecordsForBulkInsert; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -498,4 +502,52 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { public void testBulkInsertRecordsWithGlobalSort(String bulkInsertMode) throws Exception { testBulkInsertRecords(bulkInsertMode); } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testPartitionMetafileFormat(boolean partitionMetafileUseBaseFormat) throws Exception { + // By default there is no format specified for partition metafile + HoodieWriteConfig config = HoodieWriteConfig.newBuilder() + .withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA).build(); + HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(config, context, metaClient); + assertFalse(table.getPartitionMetafileFormat().isPresent()); + + if (partitionMetafileUseBaseFormat) { + // Add the setting to use datafile format + Properties properties = new Properties(); + 
properties.setProperty(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(), "true"); + initMetaClient(HoodieTableType.COPY_ON_WRITE, properties); + metaClient = HoodieTableMetaClient.reload(metaClient); + assertTrue(metaClient.getTableConfig().getPartitionMetafileFormat().isPresent()); + table = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(config, context, metaClient); + assertTrue(table.getPartitionMetafileFormat().isPresent()); + } + + String instantTime = makeNewCommitTime(); + SparkRDDWriteClient writeClient = getHoodieWriteClient(config); + writeClient.startCommitWithTime(instantTime); + + // Insert new records + final JavaRDD inputRecords = generateTestRecordsForBulkInsert(jsc, 10); + writeClient.bulkInsert(inputRecords, instantTime); + + // Partition metafile should be created + Path partitionPath = new Path(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); + assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, partitionPath)); + Option metafilePath = HoodiePartitionMetadata.getPartitionMetafilePath(fs, partitionPath); + if (partitionMetafileUseBaseFormat) { + // Extension should be the same as the data file format of the table + assertTrue(metafilePath.get().toString().endsWith(table.getBaseFileFormat().getFileExtension())); + } else { + // No extension as it is in properties file format + assertTrue(metafilePath.get().toString().endsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)); + } + + // Validate contents of the partition metafile + HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, partitionPath); + partitionMetadata.readFromFS(); + assertTrue(partitionMetadata.getPartitionDepth() == 3); + assertTrue(partitionMetadata.readPartitionCreatedCommitTime().get().equals(instantTime)); + } + } diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java index 020fcc26b..c3920211a 
100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java @@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.schema.MessageType; import java.util.HashMap; +import java.util.Map; /** * Wrap AvroWriterSupport for plugging in the bloom filter. @@ -36,6 +37,7 @@ public class HoodieAvroWriteSupport extends AvroWriteSupport { private Option bloomFilterOpt; private String minRecordKey; private String maxRecordKey; + private Map footerMetadata = new HashMap<>(); public static final String OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY = "com.uber.hoodie.bloomfilter"; public static final String HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY = "org.apache.hudi.bloomfilter"; @@ -50,18 +52,17 @@ public class HoodieAvroWriteSupport extends AvroWriteSupport { @Override public WriteSupport.FinalizedWriteContext finalizeWrite() { - HashMap extraMetaData = new HashMap<>(); if (bloomFilterOpt.isPresent()) { - extraMetaData.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilterOpt.get().serializeToString()); + footerMetadata.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilterOpt.get().serializeToString()); if (minRecordKey != null && maxRecordKey != null) { - extraMetaData.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey); - extraMetaData.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey); + footerMetadata.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey); + footerMetadata.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey); } if (bloomFilterOpt.get().getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) { - extraMetaData.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilterOpt.get().getBloomFilterTypeCode().name()); + footerMetadata.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilterOpt.get().getBloomFilterTypeCode().name()); } } - return new WriteSupport.FinalizedWriteContext(extraMetaData); + return new 
WriteSupport.FinalizedWriteContext(footerMetadata); } public void add(String recordKey) { @@ -80,4 +81,8 @@ public class HoodieAvroWriteSupport extends AvroWriteSupport { } } } + + public void addFooterMetadata(String key, String value) { + footerMetadata.put(key, value); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index 86bb32049..57fa4acc9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -246,7 +246,7 @@ public class FSUtils { final List partitions = new ArrayList<>(); processFiles(fs, basePathStr, (locatedFileStatus) -> { Path filePath = locatedFileStatus.getPath(); - if (filePath.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) { + if (filePath.getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) { partitions.add(getRelativePartitionPath(basePath, filePath.getParent())); } return true; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java index 3a19c187f..93e9ea5d3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java @@ -18,27 +18,47 @@ package org.apache.hudi.common.model; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.avro.HoodieAvroWriteSupport; +import org.apache.hudi.common.util.AvroOrcUtils; +import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.avro.Schema; import org.apache.hadoop.fs.FSDataInputStream; import 
org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.orc.OrcFile; +import org.apache.orc.Writer; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Types; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; import java.util.Properties; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * The metadata that goes into the meta file in each partition. */ public class HoodiePartitionMetadata { - public static final String HOODIE_PARTITION_METAFILE = ".hoodie_partition_metadata"; - public static final String PARTITION_DEPTH_KEY = "partitionDepth"; + public static final String HOODIE_PARTITION_METAFILE_PREFIX = ".hoodie_partition_metadata"; public static final String COMMIT_TIME_KEY = "commitTime"; + private static final String PARTITION_DEPTH_KEY = "partitionDepth"; + private static final Logger LOG = LogManager.getLogger(HoodiePartitionMetadata.class); /** * Contents of the metadata. @@ -52,7 +72,8 @@ public class HoodiePartitionMetadata { private final FileSystem fs; - private static final Logger LOG = LogManager.getLogger(HoodiePartitionMetadata.class); + // The format in which to write the partition metadata + private Option format; /** * Construct metadata from existing partition. @@ -61,13 +82,15 @@ public class HoodiePartitionMetadata { this.fs = fs; this.props = new Properties(); this.partitionPath = partitionPath; + this.format = Option.empty(); } /** * Construct metadata object to be written out. 
*/ - public HoodiePartitionMetadata(FileSystem fs, String instantTime, Path basePath, Path partitionPath) { + public HoodiePartitionMetadata(FileSystem fs, String instantTime, Path basePath, Path partitionPath, Option format) { this(fs, partitionPath); + this.format = format; props.setProperty(COMMIT_TIME_KEY, instantTime); props.setProperty(PARTITION_DEPTH_KEY, String.valueOf(partitionPath.depth() - basePath.depth())); } @@ -83,21 +106,17 @@ public class HoodiePartitionMetadata { * Write the metadata safely into partition atomically. */ public void trySave(int taskPartitionId) { + String extension = getMetafileExtension(); Path tmpMetaPath = - new Path(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE + "_" + taskPartitionId); - Path metaPath = new Path(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE); + new Path(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX + "_" + taskPartitionId + extension); + Path metaPath = new Path(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX + extension); boolean metafileExists = false; try { metafileExists = fs.exists(metaPath); if (!metafileExists) { // write to temporary file - FSDataOutputStream os = fs.create(tmpMetaPath, true); - props.store(os, "partition metadata"); - os.hsync(); - os.hflush(); - os.close(); - + writeMetafile(tmpMetaPath); // move to actual path fs.rename(tmpMetaPath, metaPath); } @@ -118,22 +137,103 @@ public class HoodiePartitionMetadata { } } + private String getMetafileExtension() { + // To be backwards compatible, there is no extension to the properties file base partition metafile + return format.isPresent() ? format.get().getFileExtension() : StringUtils.EMPTY_STRING; + } + + /** + * Write the partition metadata in the correct format in the given file path. 
+ * + * @param filePath Path of the file to write + * @throws IOException + */ + private void writeMetafile(Path filePath) throws IOException { + if (format.isPresent()) { + Schema schema = HoodieAvroUtils.getRecordKeySchema(); + + switch (format.get()) { + case PARQUET: + // Since we are only interested in saving metadata to the footer, the schema, blocksizes and other + // parameters are not important. + MessageType type = Types.buildMessage().optional(PrimitiveTypeName.INT64).named("dummyint").named("dummy"); + HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(type, schema, Option.empty()); + try (ParquetWriter writer = new ParquetWriter(filePath, writeSupport, CompressionCodecName.UNCOMPRESSED, 1024, 1024)) { + for (String key : props.stringPropertyNames()) { + writeSupport.addFooterMetadata(key, props.getProperty(key)); + } + } + break; + case ORC: + // Since we are only interested in saving metadata to the footer, the schema, blocksizes and other + // parameters are not important. + OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(fs.getConf()).fileSystem(fs) + .setSchema(AvroOrcUtils.createOrcSchema(schema)); + try (Writer writer = OrcFile.createWriter(filePath, writerOptions)) { + for (String key : props.stringPropertyNames()) { + writer.addUserMetadata(key, ByteBuffer.wrap(props.getProperty(key).getBytes())); + } + } + break; + default: + throw new HoodieException("Unsupported format for partition metafiles: " + format.get()); + } + } else { + // Backwards compatible properties file format + FSDataOutputStream os = fs.create(filePath, true); + props.store(os, "partition metadata"); + os.hsync(); + os.hflush(); + os.close(); + } + } + /** * Read out the metadata for this partition. 
*/ public void readFromFS() throws IOException { - FSDataInputStream is = null; - try { - Path metaFile = new Path(partitionPath, HOODIE_PARTITION_METAFILE); - is = fs.open(metaFile); + // first try reading the text format (legacy, currently widespread) + boolean readFile = readTextFormatMetaFile(); + if (!readFile) { + // now try reading the base file formats. + readFile = readBaseFormatMetaFile(); + } + + // throw exception. + if (!readFile) { + throw new HoodieException("Unable to read any partition meta file to locate the table timeline."); + } + } + + private boolean readTextFormatMetaFile() { + // Properties file format + Path metafilePath = textFormatMetaFilePath(partitionPath); + try (FSDataInputStream is = fs.open(metafilePath)) { props.load(is); - } catch (IOException ioe) { - throw new HoodieException("Error reading Hoodie partition metadata for " + partitionPath, ioe); - } finally { - if (is != null) { - is.close(); + format = Option.empty(); + return true; + } catch (Throwable t) { + LOG.warn("Unable to read partition meta properties file for partition " + partitionPath, t); + return false; + } + } + + private boolean readBaseFormatMetaFile() { + for (Path metafilePath : baseFormatMetaFilePaths(partitionPath)) { + try { + BaseFileUtils reader = BaseFileUtils.getInstance(metafilePath.toString()); + // Data file format + Map metadata = reader.readFooter(fs.getConf(), true, metafilePath, PARTITION_DEPTH_KEY, COMMIT_TIME_KEY); + props.clear(); + props.putAll(metadata); + format = Option.of(reader.getFormat()); + return true; + } catch (Throwable t) { + // any error, log, check the next base format + LOG.warn("Unable to read partition metadata " + metafilePath.getName() + " for partition " + partitionPath, t); } } + return false; } /** @@ -141,12 +241,10 @@ public class HoodiePartitionMetadata { */ public Option readPartitionCreatedCommitTime() { try { - if (props.containsKey(COMMIT_TIME_KEY)) { - return Option.of(props.getProperty(COMMIT_TIME_KEY)); - } 
else { + if (!props.containsKey(COMMIT_TIME_KEY)) { readFromFS(); - return Option.of(props.getProperty(COMMIT_TIME_KEY)); } + return Option.of(props.getProperty(COMMIT_TIME_KEY)); } catch (IOException ioe) { LOG.warn("Error fetch Hoodie partition metadata for " + partitionPath, ioe); return Option.empty(); @@ -156,9 +254,55 @@ public class HoodiePartitionMetadata { // methods related to partition meta data public static boolean hasPartitionMetadata(FileSystem fs, Path partitionPath) { try { - return fs.exists(new Path(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)); + return textFormatMetaPathIfExists(fs, partitionPath).isPresent() + || baseFormatMetaPathIfExists(fs, partitionPath).isPresent(); + } catch (IOException ioe) { + throw new HoodieIOException("Error checking presence of partition meta file for " + partitionPath, ioe); + } + } + + /** + * Returns the path of the partition metafile, if one exists. + * + * @return Path of the partition metafile, or an empty option if none exists + */ + public static Option getPartitionMetafilePath(FileSystem fs, Path partitionPath) { + // Listing the partition is a costly operation, so we check for the existence of the candidate files instead. + // The checks are made in order of expected frequency: properties-file based partition metafiles should be the most common.
+ try { + Option textFormatPath = textFormatMetaPathIfExists(fs, partitionPath); + if (textFormatPath.isPresent()) { + return textFormatPath; + } else { + return baseFormatMetaPathIfExists(fs, partitionPath); + } } catch (IOException ioe) { throw new HoodieException("Error checking Hoodie partition metadata for " + partitionPath, ioe); } } + + public static Option baseFormatMetaPathIfExists(FileSystem fs, Path partitionPath) throws IOException { + // Parquet should be more common than ORC so check it first + for (Path metafilePath : baseFormatMetaFilePaths(partitionPath)) { + if (fs.exists(metafilePath)) { + return Option.of(metafilePath); + } + } + return Option.empty(); + } + + public static Option textFormatMetaPathIfExists(FileSystem fs, Path partitionPath) throws IOException { + Path path = textFormatMetaFilePath(partitionPath); + return Option.ofNullable(fs.exists(path) ? path : null); + } + + static Path textFormatMetaFilePath(Path partitionPath) { + return new Path(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX); + } + + static List baseFormatMetaFilePaths(Path partitionPath) { + return Stream.of(HoodieFileFormat.PARQUET.getFileExtension(), HoodieFileFormat.ORC.getFileExtension()) + .map(ext -> new Path(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX + ext)) + .collect(Collectors.toList()); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index cfb0df3b8..bfcec84ce 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -190,6 +190,12 @@ public class HoodieTableConfig extends HoodieConfig { .defaultValue(HoodieTimelineTimeZone.LOCAL) .withDocumentation("User can set hoodie commit timeline timezone, such as utc, local and so on. 
local is default"); + public static final ConfigProperty PARTITION_METAFILE_USE_BASE_FORMAT = ConfigProperty + .key("hoodie.partition.metafile.use.base.format") + .defaultValue(false) + .withDocumentation("If true, partition metafiles are saved in the same format as basefiles for this dataset (e.g. Parquet / ORC). " + + "If false (default) partition metafiles are saved as properties files."); + public static final ConfigProperty URL_ENCODE_PARTITIONING = KeyGeneratorOptions.URL_ENCODE_PARTITIONING; public static final ConfigProperty HIVE_STYLE_PARTITIONING_ENABLE = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE; @@ -608,6 +614,16 @@ public class HoodieTableConfig extends HoodieConfig { return getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING); } + /** + * Returns the format to use for partition meta files. + */ + public Option getPartitionMetafileFormat() { + if (getBooleanOrDefault(PARTITION_METAFILE_USE_BASE_FORMAT)) { + return Option.of(getBaseFileFormat()); + } + return Option.empty(); + } + public Map propsMap() { return props.entrySet().stream() .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue()))); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 847244d7c..4e04ad9db 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -699,6 +699,7 @@ public class HoodieTableMetaClient implements Serializable { private Boolean hiveStylePartitioningEnable; private Boolean urlEncodePartitioning; private HoodieTimelineTimeZone commitTimeZone; + private Boolean partitionMetafileUseBaseFormat; /** * Persist the configs that is written at the first time, and should not be changed. 
@@ -813,6 +814,11 @@ public class HoodieTableMetaClient implements Serializable { return this; } + public PropertyBuilder setPartitionMetafileUseBaseFormat(Boolean useBaseFormat) { + this.partitionMetafileUseBaseFormat = useBaseFormat; + return this; + } + public PropertyBuilder set(String key, Object value) { if (HoodieTableConfig.PERSISTED_CONFIG_LIST.contains(key)) { this.others.put(key, value); @@ -908,6 +914,9 @@ public class HoodieTableMetaClient implements Serializable { if (hoodieConfig.contains(HoodieTableConfig.URL_ENCODE_PARTITIONING)) { setUrlEncodePartitioning(hoodieConfig.getBoolean(HoodieTableConfig.URL_ENCODE_PARTITIONING)); } + if (hoodieConfig.contains(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT)) { + setPartitionMetafileUseBaseFormat(hoodieConfig.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT)); + } return this; } @@ -986,6 +995,9 @@ public class HoodieTableMetaClient implements Serializable { if (null != commitTimeZone) { tableConfig.setValue(HoodieTableConfig.TIMELINE_TIMEZONE, commitTimeZone.toString()); } + if (null != partitionMetafileUseBaseFormat) { + tableConfig.setValue(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT, partitionMetafileUseBaseFormat.toString()); + } return tableConfig.getProps(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java index 7ec6110d7..d6391d178 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java @@ -18,14 +18,6 @@ package org.apache.hudi.common.util; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroWriteSupport; 
import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; @@ -36,6 +28,16 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.keygen.BaseKeyGenerator; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + public abstract class BaseFileUtils { public static BaseFileUtils getInstance(String path) { @@ -204,4 +206,9 @@ public abstract class BaseFileUtils { * @return The Avro schema of the data file */ public abstract Schema readAvroSchema(Configuration configuration, Path filePath); + + /** + * @return The subclass's {@link HoodieFileFormat}. + */ + public abstract HoodieFileFormat getFormat(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java index 88c28d752..0cc405919 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java @@ -18,6 +18,29 @@ package org.apache.hudi.common.util; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.MetadataNotFoundException; +import org.apache.hudi.keygen.BaseKeyGenerator; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.orc.OrcFile; +import 
org.apache.orc.OrcProto.UserMetadataItem; +import org.apache.orc.Reader; +import org.apache.orc.Reader.Options; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; +import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -28,27 +51,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; -import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.exception.MetadataNotFoundException; -import org.apache.hudi.keygen.BaseKeyGenerator; - -import org.apache.orc.OrcFile; -import org.apache.orc.OrcProto.UserMetadataItem; -import org.apache.orc.Reader; -import org.apache.orc.Reader.Options; -import org.apache.orc.RecordReader; -import org.apache.orc.TypeDescription; /** * Utility functions for ORC files. 
@@ -248,6 +250,11 @@ public class OrcUtils extends BaseFileUtils { } } + @Override + public HoodieFileFormat getFormat() { + return HoodieFileFormat.ORC; + } + @Override public long getRowCount(Configuration conf, Path orcFilePath) { try (Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java index c0f7aabde..c779a3269 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java @@ -21,6 +21,7 @@ package org.apache.hudi.common.util; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieColumnRangeMetadata; +import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.exception.HoodieIOException; @@ -228,6 +229,11 @@ public class ParquetUtils extends BaseFileUtils { return new AvroSchemaConverter(configuration).convert(parquetSchema); } + @Override + public HoodieFileFormat getFormat() { + return HoodieFileFormat.PARQUET; + } + /** * NOTE: This literally reads the entire file contents, thus should be used with caution. 
*/ diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java index 1bb18bad1..b4a76a728 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java @@ -96,7 +96,7 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata { } else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) { pathsToList.add(fileStatus.getPath()); } - } else if (fileStatus.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) { + } else if (fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) { String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath().getParent()); partitionPaths.add(partitionName); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodiePartitionMetadata.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodiePartitionMetadata.java new file mode 100644 index 000000000..3ec15d4f6 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodiePartitionMetadata.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.IOException; +import java.util.Arrays; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestHoodiePartitionMetadata extends HoodieCommonTestHarness { + + FileSystem fs; + + @BeforeEach + public void setupTest() throws IOException { + initMetaClient(); + fs = metaClient.getFs(); + } + + static Stream formatProviderFn() { + return Stream.of( + Arguments.arguments(Option.empty()), + Arguments.arguments(Option.of(HoodieFileFormat.PARQUET)), + Arguments.arguments(Option.of(HoodieFileFormat.ORC)) + ); + } + + @ParameterizedTest + @MethodSource("formatProviderFn") + public void testTextFormatMetaFile(Option format) throws IOException { + // given + final Path partitionPath = new Path(basePath, "a/b/" + + format.map(Enum::name).orElse("text")); + fs.mkdirs(partitionPath); + final String commitTime = "000000000001"; + HoodiePartitionMetadata 
writtenMetadata = new HoodiePartitionMetadata(metaClient.getFs(), commitTime, new Path(basePath), partitionPath, format); + writtenMetadata.trySave(0); + + // when + HoodiePartitionMetadata readMetadata = new HoodiePartitionMetadata(metaClient.getFs(), new Path(metaClient.getBasePath(), partitionPath)); + + // then + assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, partitionPath)); + assertEquals(Option.of(commitTime), readMetadata.readPartitionCreatedCommitTime()); + assertEquals(3, readMetadata.getPartitionDepth()); + } + + @Test + public void testErrorIfAbsent() throws IOException { + final Path partitionPath = new Path(basePath, "a/b/not-a-partition"); + fs.mkdirs(partitionPath); + HoodiePartitionMetadata readMetadata = new HoodiePartitionMetadata(metaClient.getFs(), new Path(metaClient.getBasePath(), partitionPath)); + assertThrows(HoodieException.class, readMetadata::readPartitionCreatedCommitTime); + } + + @Test + public void testFileNames() { + assertEquals(new Path("/a/b/c/.hoodie_partition_metadata"), HoodiePartitionMetadata.textFormatMetaFilePath(new Path("/a/b/c"))); + assertEquals(Arrays.asList(new Path("/a/b/c/.hoodie_partition_metadata.parquet"), + new Path("/a/b/c/.hoodie_partition_metadata.orc")), HoodiePartitionMetadata.baseFormatMetaFilePaths(new Path("/a/b/c"))); + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java index c80c5e28f..06f0ac49b 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -280,7 +280,7 @@ public class FileCreateUtils { public static void createPartitionMetaFile(String basePath, String partitionPath) throws IOException { Path parentPath = Paths.get(basePath, partitionPath); Files.createDirectories(parentPath); - Path metaFilePath = 
parentPath.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE); + Path metaFilePath = parentPath.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX); if (Files.notExists(metaFilePath)) { Files.createFile(metaFilePath); } @@ -397,7 +397,7 @@ public class FileCreateUtils { } return Files.list(basePath).filter(entry -> (!entry.getFileName().toString().equals(HoodieTableMetaClient.METAFOLDER_NAME) && !entry.getFileName().toString().contains("parquet") && !entry.getFileName().toString().contains("log")) - && !entry.getFileName().toString().endsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)).collect(Collectors.toList()); + && !entry.getFileName().toString().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)).collect(Collectors.toList()); } /** diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java index 3e147b7fd..cb4f55707 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java @@ -205,7 +205,7 @@ public class HoodieTestDataGenerator implements AutoCloseable { */ public void writePartitionMetadata(FileSystem fs, String[] partitionPaths, String basePath) { for (String partitionPath : partitionPaths) { - new HoodiePartitionMetadata(fs, "000", new Path(basePath), new Path(basePath, partitionPath)).trySave(0); + new HoodiePartitionMetadata(fs, "000", new Path(basePath), new Path(basePath, partitionPath), Option.empty()).trySave(0); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index 881197eca..f0aae0a69 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -680,7 +680,7 @@ public class HoodieTestTable { boolean toReturn = true; String filePath = entry.getPath().toString(); String fileName = entry.getPath().getName(); - if (fileName.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE) || (!fileName.contains("log") && !fileName.contains("parquet")) + if (fileName.startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX) || (!fileName.contains("log") && !fileName.contains("parquet")) || filePath.contains("metadata")) { toReturn = false; } else { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java index 056f2121c..eae1cdce8 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java @@ -17,6 +17,7 @@ package org.apache.hudi.common.util; +import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -24,9 +25,10 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.io.File; import java.io.IOException; @@ -41,7 +43,7 @@ public final class TestTablePathUtils { private static final String BASE_FILE_EXTENSION = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension(); @TempDir - static File tempDir; + public File tempDir; private static FileSystem fs; private 
static Path tablePath; private static Path partitionPath1; @@ -49,9 +51,12 @@ public final class TestTablePathUtils { private static Path filePath1; private static Path filePath2; - @BeforeAll - static void setup() throws IOException { - URI tablePathURI = Paths.get(tempDir.getAbsolutePath(),"test_table").toUri(); + private void setup() throws IOException { + setup(Option.empty()); + } + + private void setup(Option partitionMetafileFormat) throws IOException { + URI tablePathURI = Paths.get(tempDir.getAbsolutePath(), "test_table").toUri(); tablePath = new Path(tablePathURI); fs = tablePath.getFileSystem(new Configuration()); @@ -69,10 +74,10 @@ public final class TestTablePathUtils { assertTrue(new File(partitionPathURI2).mkdirs()); HoodiePartitionMetadata partitionMetadata1 = new HoodiePartitionMetadata(fs, Instant.now().toString(), tablePath, - partitionPath1); + partitionPath1, partitionMetafileFormat); partitionMetadata1.trySave(1); HoodiePartitionMetadata partitionMetadata2 = new HoodiePartitionMetadata(fs, Instant.now().toString(), tablePath, - partitionPath2); + partitionPath2, partitionMetafileFormat); partitionMetadata2.trySave(2); // Create files @@ -87,12 +92,14 @@ public final class TestTablePathUtils { @Test void getTablePathFromTablePath() throws IOException { + setup(); Option inferredTablePath = TablePathUtils.getTablePath(fs, tablePath); assertEquals(tablePath, inferredTablePath.get()); } @Test void getTablePathFromMetadataFolderPath() throws IOException { + setup(); Path metaFolder = new Path(tablePath, HoodieTableMetaClient.METAFOLDER_NAME); Option inferredTablePath = TablePathUtils.getTablePath(fs, metaFolder); assertEquals(tablePath, inferredTablePath.get()); @@ -100,6 +107,7 @@ public final class TestTablePathUtils { @Test void getTablePathFromMetadataSubFolderPath() throws IOException { + setup(); Path auxFolder = new Path(tablePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME); assertEquals(tablePath, TablePathUtils.getTablePath(fs, 
auxFolder).get()); @@ -117,8 +125,10 @@ public final class TestTablePathUtils { assertEquals(metadataTableFolder, TablePathUtils.getTablePath(fs, metadataTablePartitionFolder).get()); } - @Test - void getTablePathFromPartitionFolderPath() throws IOException { + @ParameterizedTest + @EnumSource(value = HoodieFileFormat.class, names = {"PARQUET", "ORC"}) + void getTablePathFromPartitionFolderPath(HoodieFileFormat partitionMetafileFormat) throws IOException { + setup(Option.of(partitionMetafileFormat)); Option inferredTablePath = TablePathUtils.getTablePath(fs, partitionPath1); assertEquals(tablePath, inferredTablePath.get()); @@ -128,6 +138,7 @@ public final class TestTablePathUtils { @Test void getTablePathFromFilePath() throws IOException { + setup(); Option inferredTablePath = TablePathUtils.getTablePath(fs, filePath1); assertEquals(tablePath, inferredTablePath.get()); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java index 755793871..ccd85d382 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java @@ -34,6 +34,7 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.SchemaTestUtil; +import org.apache.hudi.common.util.Option; import org.apache.hudi.hadoop.utils.HoodieHiveUtils; import org.apache.avro.Schema; @@ -185,7 +186,7 @@ public class InputFormatTestUtil { public static void setupSnapshotScanMode(JobConf jobConf) { setupSnapshotScanMode(jobConf, false); } - + private static void setupSnapshotScanMode(JobConf jobConf, boolean includePending) { setUpScanMode(jobConf); String includePendingCommitsName = @@ -467,8 
+468,8 @@ public class InputFormatTestUtil { new LocalFileSystem(lfs), "0", new Path(basePath.toAbsolutePath().toString()), - new Path(partitionPath.toAbsolutePath().toString()) - ); + new Path(partitionPath.toAbsolutePath().toString()), + Option.of(HoodieFileFormat.PARQUET)); partitionMetadata.trySave((int) (Math.random() * 1000)); } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 6bccf9d7f..7dbc9997c 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -141,6 +141,7 @@ object HoodieSparkSqlWriter { val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER) val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD) val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS) + val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT); val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder() .setTableType(tableType) @@ -158,6 +159,7 @@ object HoodieSparkSqlWriter { .set(timestampKeyGeneratorConfigs) .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING)) .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING)) + .setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile) .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))) .initTable(sparkContext.hadoopConfiguration, path) tableConfig = tableMetaClient.getTableConfig @@ -437,9 +439,15 @@ object HoodieSparkSqlWriter { val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters) val recordKeyFields = 
hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD) val keyGenProp = hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME) - val populateMetaFields = java.lang.Boolean.parseBoolean((parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(), - String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())))) + val populateMetaFields = java.lang.Boolean.parseBoolean(parameters.getOrElse( + HoodieTableConfig.POPULATE_META_FIELDS.key(), + String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()) + )) val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT) + val useBaseFormatMetaFile = java.lang.Boolean.parseBoolean(parameters.getOrElse( + HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(), + String.valueOf(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()) + )) HoodieTableMetaClient.withPropertyBuilder() .setTableType(HoodieTableType.valueOf(tableType)) @@ -457,6 +465,7 @@ object HoodieSparkSqlWriter { .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING)) .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING)) .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))) + .setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile) .initTable(sparkContext.hadoopConfiguration, path) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 0776a3116..8c9e9daf8 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -21,7 +21,7 @@ import org.apache.hadoop.fs.FileSystem import org.apache.hudi.common.config.HoodieMetadataConfig import 
org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.table.timeline.HoodieInstant -import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings} import org.apache.hudi.config.HoodieWriteConfig @@ -56,6 +56,7 @@ class TestCOWDataSource extends HoodieClientTestBase { "hoodie.upsert.shuffle.parallelism" -> "4", "hoodie.bulkinsert.shuffle.parallelism" -> "2", "hoodie.delete.shuffle.parallelism" -> "1", + HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key() -> "true", DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key", DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition", DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp", diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java index 43e58d531..a2717a356 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java @@ -117,8 +117,8 @@ public class HoodieSnapshotCopier implements Serializable { dataFiles.forEach(hoodieDataFile -> filePaths.add(new Tuple2<>(partition, hoodieDataFile.getPath()))); // also need to copy over partition metadata - Path partitionMetaFile = - new Path(FSUtils.getPartitionPath(baseDir, partition), HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE); + Path partitionMetaFile = HoodiePartitionMetadata.getPartitionMetafilePath(fs1, + FSUtils.getPartitionPath(baseDir, partition)).get(); if (fs1.exists(partitionMetaFile)) { filePaths.add(new Tuple2<>(partition, partitionMetaFile.toString())); } diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java index c2cfa390d..255393b23 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java @@ -206,9 +206,9 @@ public class HoodieSnapshotExporter { Stream dataFiles = fsView.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp); dataFiles.forEach(hoodieDataFile -> filePaths.add(new Tuple2<>(partition, hoodieDataFile.getPath()))); // also need to copy over partition metadata - Path partitionMetaFile = - new Path(FSUtils.getPartitionPath(cfg.sourceBasePath, partition), HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE); FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy()); + Path partitionMetaFile = HoodiePartitionMetadata.getPartitionMetafilePath(fs, + FSUtils.getPartitionPath(cfg.sourceBasePath, partition)).get(); if (fs.exists(partitionMetaFile)) { filePaths.add(new Tuple2<>(partition, partitionMetaFile.toString())); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java index 84b793376..7e605dbd3 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java @@ -185,6 +185,7 @@ public class BootstrapExecutor implements Serializable { } } HoodieTableMetaClient.withPropertyBuilder() + .fromProperties(props) .setTableType(cfg.tableType) .setTableName(cfg.targetTableName) .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue()) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 50338e551..0e57bd379 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -278,6 +278,8 @@ public class DeltaSync implements Serializable { .setKeyGeneratorClassProp(props.getProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), SimpleKeyGenerator.class.getName())) .setPreCombineField(cfg.sourceOrderingField) + .setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(), + HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue())) .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath); } @@ -371,6 +373,8 @@ public class DeltaSync implements Serializable { HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())) .setKeyGeneratorClassProp(props.getProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), SimpleKeyGenerator.class.getName())) + .setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(), + HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue())) .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 4b0f14835..56124b82a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -468,7 +468,7 @@ public class HoodieDeltaStreamer implements Serializable { compactSchedulingMinShare, clusterSchedulingMinShare, forceDisableCompaction, checkpoint, initialCheckpointProvider, help); } - + 
@Override public String toString() { return "Config{"