[HUDI-4140] Fixing hive style partitioning and default partition with bulk insert row writer with SimpleKeyGen and virtual keys (#5664)
The bulk insert row writer code path had a gap with respect to hive-style partitioning and the default partition when virtual keys are enabled with SimpleKeyGenerator: the partition column value was used verbatim, so the hive-style prefix was never applied, and a null partition value would fail on toString() instead of resolving to the default partition. This patch fixes the issue.
commit 4f6fc726d0 (parent 4f7ea8c79a)
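For context, the code path touched here is bulk insert through the row writer combined with SimpleKeyGenerator and virtual keys (meta fields disabled). Below is a minimal sketch of a Spark write that exercises this combination; the table name, column names, and paths are illustrative only, and the option keys follow the standard Hudi datasource write options.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class BulkInsertRowWriterSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("hudi-bulk-insert-row-writer-sketch")
        .master("local[2]")
        .getOrCreate();

    // Source data and base path are placeholders; any dataset with "uuid" and "partition" columns works.
    Dataset<Row> df = spark.read().parquet("/tmp/source_parquet");
    String basePath = "/tmp/hudi_trips";

    df.write().format("hudi")
        .option("hoodie.table.name", "trips")
        .option("hoodie.datasource.write.operation", "bulk_insert")
        .option("hoodie.datasource.write.row.writer.enable", "true")        // bulk insert row writer path
        .option("hoodie.datasource.write.keygenerator.class",
            "org.apache.hudi.keygen.SimpleKeyGenerator")                    // SimpleKeyGenerator
        .option("hoodie.datasource.write.recordkey.field", "uuid")
        .option("hoodie.datasource.write.partitionpath.field", "partition")
        .option("hoodie.datasource.write.hive_style_partitioning", "true")  // hive-style partitioning
        .option("hoodie.populate.meta.fields", "false")                     // virtual keys
        .mode("overwrite")
        .save(basePath);

    spark.stop();
  }
}
```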
@@ -1064,6 +1064,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return MarkerType.valueOf(markerType.toUpperCase());
   }
 
+  public boolean isHiveStylePartitioningEnabled() {
+    return getBooleanOrDefault(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE);
+  }
+
   public int getMarkersTimelineServerBasedBatchNumThreads() {
     return getInt(MARKERS_TIMELINE_SERVER_BASED_BATCH_NUM_THREADS);
   }
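The new accessor simply resolves the existing key-generator option (key "hoodie.datasource.write.hive_style_partitioning", default false) from the write config, so the row-writer helper can consult it without instantiating a key generator. A minimal sketch of how it could be exercised, with a placeholder table name and path:

```java
import java.util.Properties;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

public class HiveStyleFlagSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(), "true");

    // basePath and table name are placeholders for illustration only.
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_trips")
        .forTable("trips")
        .withProperties(props)
        .build();

    // Prints true here; falls back to the option's default (false) when the property is unset.
    System.out.println(cfg.isHiveStylePartitioningEnabled());
  }
}
```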
@@ -23,6 +23,7 @@ import org.apache.hudi.client.HoodieInternalWriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.row.HoodieRowCreateHandle;
@@ -128,7 +129,11 @@ public class BulkInsertDataInternalWriterHelper {
     if (!keyGeneratorOpt.isPresent()) { // NoPartitionerKeyGen
       partitionPath = "";
     } else if (simpleKeyGen) { // SimpleKeyGen
-      partitionPath = (record.get(simplePartitionFieldIndex, simplePartitionFieldDataType)).toString();
+      Object parititionPathValue = record.get(simplePartitionFieldIndex, simplePartitionFieldDataType);
+      partitionPath = parititionPathValue != null ? parititionPathValue.toString() : PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
+      if (writeConfig.isHiveStylePartitioningEnabled()) {
+        partitionPath = (keyGeneratorOpt.get()).getPartitionPathFields().get(0) + "=" + partitionPath;
+      }
     } else {
       // only BuiltIn key generators are supported if meta fields are disabled.
       partitionPath = keyGeneratorOpt.get().getPartitionPath(record, structType);
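With this change the SimpleKeyGen branch derives the partition path in two steps: a null partition value falls back to the default partition (PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH, "__HIVE_DEFAULT_PARTITION__" in current Hudi versions) instead of failing on toString(), and when hive-style partitioning is enabled the single configured partition field name is prefixed. A standalone sketch of the same derivation; the field name and sample values are hypothetical:

```java
// Standalone illustration of the new derivation logic; not part of the patch itself.
public class PartitionPathDerivationSketch {
  static String derive(Object partitionValue, boolean hiveStyle, String partitionField) {
    // Null partition values now fall back to the default partition instead of throwing on toString().
    String path = partitionValue != null ? partitionValue.toString() : "__HIVE_DEFAULT_PARTITION__";
    // Hive-style partitioning prefixes the (single) partition field configured for SimpleKeyGenerator.
    return hiveStyle ? partitionField + "=" + path : path;
  }

  public static void main(String[] args) {
    System.out.println(derive("2016/03/15", true, "partition"));  // partition=2016/03/15
    System.out.println(derive(null, true, "partition"));          // partition=__HIVE_DEFAULT_PARTITION__
    System.out.println(derive(null, false, "partition"));         // __HIVE_DEFAULT_PARTITION__
  }
}
```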
@@ -70,6 +70,10 @@ public class HoodieBulkInsertInternalWriterTestBase extends HoodieClientTestHarn
   }
 
   protected HoodieWriteConfig getWriteConfig(boolean populateMetaFields) {
+    return getWriteConfig(populateMetaFields, DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().defaultValue());
+  }
+
+  protected HoodieWriteConfig getWriteConfig(boolean populateMetaFields, String hiveStylePartitioningValue) {
     Properties properties = new Properties();
     if (!populateMetaFields) {
       properties.setProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), SimpleKeyGenerator.class.getName());
@@ -77,6 +81,7 @@ public class HoodieBulkInsertInternalWriterTestBase extends HoodieClientTestHarn
       properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), SparkDatasetTestUtils.PARTITION_PATH_FIELD_NAME);
       properties.setProperty(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false");
     }
+    properties.setProperty(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().key(), hiveStylePartitioningValue);
     return getConfigBuilder(basePath, timelineServicePort).withProperties(properties).build();
   }
 
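The existing single-argument getWriteConfig keeps its behavior by delegating with the option's default value, while the new overload lets individual tests opt in to hive-style partitioning. A minimal usage sketch inside a test extending this base class:

```java
// Virtual keys plus hive-style partitioning enabled:
HoodieWriteConfig cfg = getWriteConfig(false, "true");
// Equivalent to the old single-argument behavior (hive-style partitioning left at its default, i.e. disabled):
HoodieWriteConfig legacyCfg = getWriteConfig(false);
```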
@@ -18,15 +18,18 @@
 
 package org.apache.hudi.internal;
 
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.SparkDatasetTestUtils;
 
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -109,6 +112,51 @@ public class TestHoodieBulkInsertDataInternalWriter extends
     }
   }
 
+  @Test
+  public void testDataInternalWriterHiveStylePartitioning() throws Exception {
+    boolean sorted = true;
+    boolean populateMetaFields = false;
+    // init config and table
+    HoodieWriteConfig cfg = getWriteConfig(populateMetaFields, "true");
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    for (int i = 0; i < 1; i++) {
+      String instantTime = "00" + i;
+      // init writer
+      HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(),
+          STRUCT_TYPE, populateMetaFields, sorted);
+
+      int size = 10 + RANDOM.nextInt(1000);
+      // write N rows to partition1, N rows to partition2 and N rows to partition3 ... Each batch should create a new RowCreateHandle and a new file
+      int batches = 3;
+      Dataset<Row> totalInputRows = null;
+
+      for (int j = 0; j < batches; j++) {
+        String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
+        Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
+        writeRows(inputRows, writer);
+        if (totalInputRows == null) {
+          totalInputRows = inputRows;
+        } else {
+          totalInputRows = totalInputRows.union(inputRows);
+        }
+      }
+
+      BaseWriterCommitMessage commitMetadata = (BaseWriterCommitMessage) writer.commit();
+      Option<List<String>> fileAbsPaths = Option.of(new ArrayList<>());
+      Option<List<String>> fileNames = Option.of(new ArrayList<>());
+
+      // verify write statuses
+      assertWriteStatuses(commitMetadata.getWriteStatuses(), batches, size, sorted, fileAbsPaths, fileNames);
+
+      // verify rows
+      Dataset<Row> result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0]));
+      assertOutput(totalInputRows, result, instantTime, fileNames, populateMetaFields);
+
+      result.collectAsList().forEach(entry -> Assertions.assertTrue(entry.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString()
+          .contains(SparkDatasetTestUtils.PARTITION_PATH_FIELD_NAME + "=")));
+    }
+  }
+
   /**
    * Issue some corrupted or wrong schematized InternalRow after few valid InternalRows so that global error is thrown. write batch 1 of valid records write batch2 of invalid records which is expected
    * to throw Global Error. Verify global error is set appropriately and only first batch of records are written to disk.