
[HUDI-3290] Different file formats for the partition metadata file. (#5179)

* [HUDI-3290] Different file formats for the partition metadata file.

Partition metadata files are stored in each partition to help identify the base path of a table. These files are saved in the Java properties file format. Some query engines fail when non-Parquet/ORC files are found in a partition.
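For illustration, a partition metafile in the properties format looks roughly like the following (a minimal sketch; the timestamp, commit time, and depth are made-up values, though the commitTime/partitionDepth keys are the ones read back by the test further below):

    #partition metadata
    #Mon Apr 04 08:08:20 PDT 2022
    commitTime=20220404080820
    partitionDepth=2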

Added a new table config 'hoodie.partition.metafile.use.data.format' which, when enabled (default false for backward compatibility), ensures that partition metafiles are saved in the same format as the base files of the dataset.
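As a minimal sketch of enabling this programmatically (the property key HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT is the one exercised in the test diff below; exactly how the properties are passed during table initialization may vary by release):

    Properties properties = new Properties();
    // Opt in: partition metafiles will be written in the table's base file format
    properties.setProperty(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(), "true");
    // pass these properties when initializing the table's HoodieTableMetaClient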

For new datasets, the config can be set via hudi-cli. Deltastreamer has a new parameter, --partition-metafile-use-data-format, which creates the table with this setting.

* Code review comments

- Added a new command to migrate partition metafiles from the text format to the base file format.
- Reimplemented readFromFS() to first try the text format, then the base file format (see the sketch after this list).
- Avoided extra exists() checks in readFromFS().
- Added unit tests and enabled the parquet metafile format across hudi-hadoop-mr.
- Code cleanup, restructuring, and naming consistency.
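A minimal sketch of that read order, assuming hypothetical helpers readTextFormatMetaFile() and readBaseFormatMetaFile() that attempt a read and return false on failure (so no separate exists() probe is needed):

    // Sketch: try the legacy text (properties) metafile first, then the base-file format.
    public void readFromFS() throws IOException {
      boolean readFile = readTextFormatMetaFile() || readBaseFormatMetaFile();
      if (!readFile) {
        throw new HoodieException("Unable to read partition metadata for partition " + partitionPath);
      }
    }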

* Wired up all the other Spark code paths to respect this config

 - Turned on the parquet metafile format for COW data source tests
 - Removed the deltastreamer command-line option to keep the CLI shorter

* populate HoodiePartitionMetadata#format after readFromFS()

Co-authored-by: Vinoth Chandar <vinoth@apache.org>
Co-authored-by: Raymond Xu <2701446+xushiyan@users.noreply.github.com>
Authored by Prashant Wason on 2022-04-04 08:08:20 -07:00; committed by GitHub.
parent 8add740d22
commit b28f0d6ceb
33 changed files with 544 additions and 94 deletions


@@ -60,6 +60,12 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase {
     return jsc.parallelize(records1, 1).union(jsc.parallelize(records2, 1));
   }
 
+  public static JavaRDD<HoodieRecord> generateTestRecordsForBulkInsert(JavaSparkContext jsc, int count) {
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    List<HoodieRecord> records = dataGenerator.generateInserts("0", count);
+    return jsc.parallelize(records, 1);
+  }
+
   public static Map<String, Long> generateExpectedPartitionNumRecords(JavaRDD<HoodieRecord> records) {
     return records.map(record -> record.getPartitionPath()).countByValue();
   }


@@ -25,10 +25,13 @@ import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
 import org.apache.hudi.common.testutils.Transformations;
@@ -87,6 +90,7 @@ import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
 import static org.apache.hudi.execution.bulkinsert.TestBulkInsertInternalPartitioner.generateExpectedPartitionNumRecords;
 import static org.apache.hudi.execution.bulkinsert.TestBulkInsertInternalPartitioner.generateTestRecordsForBulkInsert;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -498,4 +502,52 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
   public void testBulkInsertRecordsWithGlobalSort(String bulkInsertMode) throws Exception {
     testBulkInsertRecords(bulkInsertMode);
   }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testPartitionMetafileFormat(boolean partitionMetafileUseBaseFormat) throws Exception {
+    // By default there is no format specified for partition metafile
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA).build();
+    HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(config, context, metaClient);
+    assertFalse(table.getPartitionMetafileFormat().isPresent());
+
+    if (partitionMetafileUseBaseFormat) {
+      // Add the setting to use datafile format
+      Properties properties = new Properties();
+      properties.setProperty(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(), "true");
+      initMetaClient(HoodieTableType.COPY_ON_WRITE, properties);
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      assertTrue(metaClient.getTableConfig().getPartitionMetafileFormat().isPresent());
+      table = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(config, context, metaClient);
+      assertTrue(table.getPartitionMetafileFormat().isPresent());
+    }
+
+    String instantTime = makeNewCommitTime();
+    SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
+    writeClient.startCommitWithTime(instantTime);
+
+    // Insert new records
+    final JavaRDD<HoodieRecord> inputRecords = generateTestRecordsForBulkInsert(jsc, 10);
+    writeClient.bulkInsert(inputRecords, instantTime);
+
+    // Partition metafile should be created
+    Path partitionPath = new Path(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
+    assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, partitionPath));
+    Option<Path> metafilePath = HoodiePartitionMetadata.getPartitionMetafilePath(fs, partitionPath);
+    if (partitionMetafileUseBaseFormat) {
+      // Extension should be the same as the data file format of the table
+      assertTrue(metafilePath.get().toString().endsWith(table.getBaseFileFormat().getFileExtension()));
+    } else {
+      // No extension as it is in properties file format
+      assertTrue(metafilePath.get().toString().endsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX));
+    }
+
+    // Validate contents of the partition metafile
+    HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, partitionPath);
+    partitionMetadata.readFromFS();
+    assertTrue(partitionMetadata.getPartitionDepth() == 3);
+    assertTrue(partitionMetadata.readPartitionCreatedCommitTime().get().equals(instantTime));
+  }
 }