From de94787a85b272f79181dff73907b0f20855ee78 Mon Sep 17 00:00:00 2001 From: zhangyue19921010 <69956021+zhangyue19921010@users.noreply.github.com> Date: Tue, 24 Aug 2021 21:45:17 +0800 Subject: [PATCH] [HUDI-2345] Hoodie columns sort partitioner for bulk insert (#3523) Co-authored-by: yuezhang --- .../apache/hudi/config/HoodieWriteConfig.java | 18 ++++++- .../RDDCustomColumnsSortPartitioner.java | 10 ++++ .../TestBulkInsertInternalPartitioner.java | 18 ++++++- .../java/org/apache/hudi/DataSourceUtils.java | 4 +- .../org/apache/hudi/TestDataSourceUtils.java | 54 +++++++++++++------ 5 files changed, 83 insertions(+), 21 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index d16d417b9..04660fc44 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -158,11 +158,18 @@ public class HoodieWriteConfig extends HoodieConfig { .withDocumentation("For large initial imports using bulk_insert operation, controls the parallelism to use for sort modes or custom partitioning done" + "before writing records to the table."); + public static final ConfigProperty BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS = ConfigProperty .key("hoodie.bulkinsert.user.defined.partitioner.sort.columns") .noDefaultValue() .withDocumentation("Columns to sort the data by when using org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner as user defined partitioner during bulk_insert. 
" + + "For example 'column1,column2'"); + public static final ConfigProperty BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME = ConfigProperty .key("hoodie.bulkinsert.user.defined.partitioner.class") .noDefaultValue() .withDocumentation("If specified, this class will be used to re-partition records before they are bulk inserted. This can be used to sort, pack, cluster data" - + " optimally for common query patterns."); + + " optimally for common query patterns. For now we support a built-in user defined bulkinsert partitioner org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner" + + " which can do sorting based on specified column values set by " + BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS.key()); public static final ConfigProperty UPSERT_PARALLELISM_VALUE = ConfigProperty .key("hoodie.upsert.shuffle.parallelism") @@ -894,6 +901,10 @@ public class HoodieWriteConfig extends HoodieConfig { return getString(BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME); } + public String getUserDefinedBulkInsertPartitionerSortColumns() { return getString(BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS); } + public int getInsertShuffleParallelism() { return getInt(INSERT_PARALLELISM_VALUE); } @@ -1832,6 +1843,11 @@ public class HoodieWriteConfig extends HoodieConfig { return this; } + public Builder withUserDefinedBulkInsertPartitionerSortColumns(String columns) { writeConfig.setValue(BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS, columns); return this; } + public Builder withDeleteParallelism(int parallelism) { writeConfig.setValue(DELETE_PARALLELISM_VALUE, String.valueOf(parallelism)); return this; } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java index 209531dd2..fb3c5ec0d 100644 --- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java @@ -24,6 +24,7 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.config.SerializableSchema; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.spark.api.java.JavaRDD; @@ -41,6 +42,11 @@ public class RDDCustomColumnsSortPartitioner private final String[] sortColumnNames; private final SerializableSchema serializableSchema; + public RDDCustomColumnsSortPartitioner(HoodieWriteConfig config) { + this.serializableSchema = new SerializableSchema(new Schema.Parser().parse(config.getSchema())); + this.sortColumnNames = getSortColumnName(config); + } + public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema) { this.sortColumnNames = columnNames; this.serializableSchema = new SerializableSchema(schema); @@ -79,4 +85,8 @@ public class RDDCustomColumnsSortPartitioner throw new HoodieIOException("Unable to read record with key:" + record.getKey(), e); } } + + private String[] getSortColumnName(HoodieWriteConfig config) { + return config.getUserDefinedBulkInsertPartitionerSortColumns().split(","); + } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java index 81effaa2a..f4c5622d6 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java +++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.testutils.HoodieClientTestBase; @@ -43,6 +44,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Stream; +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; import static org.junit.jupiter.api.Assertions.assertEquals; public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase { @@ -139,7 +141,8 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase { @Test public void testCustomColumnSortPartitioner() throws Exception { - String[] sortColumns = new String[] {"rider"}; + String sortColumnString = "rider"; + String[] sortColumns = sortColumnString.split(","); Comparator> columnComparator = getCustomColumnComparator(HoodieTestDataGenerator.AVRO_SCHEMA, sortColumns); JavaRDD records1 = generateTestRecordsForBulkInsert(jsc); @@ -148,6 +151,19 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase { records1, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator)); testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA), records2, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator)); + + HoodieWriteConfig config = HoodieWriteConfig + .newBuilder() + .withPath("/") + .withSchema(TRIP_EXAMPLE_SCHEMA) + .withUserDefinedBulkInsertPartitionerClass(RDDCustomColumnsSortPartitioner.class.getName()) + 
.withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString) + .build(); + testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(config), + records1, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator)); + testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(config), + records2, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator)); + } private Comparator> getCustomColumnComparator(Schema schema, String[] sortColumns) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java index 7e043eb72..0dafba4e5 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -96,7 +96,7 @@ public class DataSourceUtils { try { return StringUtils.isNullOrEmpty(bulkInsertPartitionerClass) ? Option.empty() : - Option.of((BulkInsertPartitioner) ReflectionUtils.loadClass(bulkInsertPartitionerClass)); + Option.of((BulkInsertPartitioner) ReflectionUtils.loadClass(bulkInsertPartitionerClass, config)); } catch (Throwable e) { throw new HoodieException("Could not create UserDefinedBulkInsertPartitioner class " + bulkInsertPartitionerClass, e); } @@ -115,7 +115,7 @@ public class DataSourceUtils { try { return StringUtils.isNullOrEmpty(bulkInsertPartitionerClass) ? 
Option.empty() : - Option.of((BulkInsertPartitioner) ReflectionUtils.loadClass(bulkInsertPartitionerClass)); + Option.of((BulkInsertPartitioner) ReflectionUtils.loadClass(bulkInsertPartitionerClass, config)); } catch (Throwable e) { throw new HoodieException("Could not create UserDefinedBulkInsertPartitionerRows class " + bulkInsertPartitionerClass, e); } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java index 94f1a69de..081a8e4e6 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.avro.Conversions; @@ -74,6 +75,24 @@ public class TestDataSourceUtils { private ArgumentCaptor