[HUDI-2345] Hoodie columns sort partitioner for bulk insert (#3523)
Co-authored-by: yuezhang <yuezhang@freewheel.tv>
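Summary: this change adds a new write config, hoodie.bulkinsert.user.defined.partitioner.sort.columns, with a matching builder method and getter on HoodieWriteConfig, and gives RDDCustomColumnsSortPartitioner a HoodieWriteConfig-based constructor so it can be instantiated reflectively together with its sort columns. A minimal usage sketch (the table path, schema string, and column names are illustrative, not part of this commit):

    // Sketch: enable column-sorted bulk_insert; path/schema/columns are placeholders.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_trips")          // illustrative base path
        .withSchema(schemaString)             // caller-supplied Avro schema string
        .withUserDefinedBulkInsertPartitionerClass(
            "org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner")
        .withUserDefinedBulkInsertPartitionerSortColumns("rider,driver")
        .build();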
@@ -158,11 +158,18 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("For large initial imports using bulk_insert operation, controls the parallelism to use for sort modes or custom partitioning done"
          + " before writing records to the table.");
 
+  public static final ConfigProperty<String> BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS = ConfigProperty
+      .key("hoodie.bulkinsert.user.defined.partitioner.sort.columns")
+      .noDefaultValue()
+      .withDocumentation("Columns to sort the data by when using org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner as the user-defined partitioner during bulk_insert. "
+          + "For example 'column1,column2'");
+
   public static final ConfigProperty<String> BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME = ConfigProperty
       .key("hoodie.bulkinsert.user.defined.partitioner.class")
       .noDefaultValue()
       .withDocumentation("If specified, this class will be used to re-partition records before they are bulk inserted. This can be used to sort, pack, cluster data"
-          + " optimally for common query patterns.");
+          + " optimally for common query patterns. We currently support a built-in user-defined bulk insert partitioner, org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner,"
+          + " which sorts based on the column values set by " + BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS.key());
 
   public static final ConfigProperty<String> UPSERT_PARALLELISM_VALUE = ConfigProperty
       .key("hoodie.upsert.shuffle.parallelism")
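The two partitioner-related keys are ordinary write configs, so they can also be passed as Spark datasource write options. A hedged sketch (df and the save path are assumed; only the two hoodie.bulkinsert.* keys come from this commit):

    // Sketch: supplying the new keys through the Spark datasource writer.
    df.write().format("hudi")
        .option("hoodie.datasource.write.operation", "bulk_insert")
        .option("hoodie.bulkinsert.user.defined.partitioner.class",
            "org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner")
        .option("hoodie.bulkinsert.user.defined.partitioner.sort.columns", "rider,driver")
        .mode("append")
        .save("/tmp/hudi_trips");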
@@ -894,6 +901,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getString(BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME);
   }
 
+  public String getUserDefinedBulkInsertPartitionerSortColumns() {
+    return getString(BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS);
+  }
+
   public int getInsertShuffleParallelism() {
     return getInt(INSERT_PARALLELISM_VALUE);
   }
@@ -1832,6 +1843,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withUserDefinedBulkInsertPartitionerSortColumns(String columns) {
+      writeConfig.setValue(BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS, columns);
+      return this;
+    }
+
     public Builder withDeleteParallelism(int parallelism) {
       writeConfig.setValue(DELETE_PARALLELISM_VALUE, String.valueOf(parallelism));
       return this;
@@ -24,6 +24,7 @@ import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.spark.api.java.JavaRDD;
@@ -41,6 +42,11 @@ public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
   private final String[] sortColumnNames;
   private final SerializableSchema serializableSchema;
 
+  public RDDCustomColumnsSortPartitioner(HoodieWriteConfig config) {
+    this.serializableSchema = new SerializableSchema(new Schema.Parser().parse(config.getSchema()));
+    this.sortColumnNames = getSortColumnName(config);
+  }
+
   public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema) {
     this.sortColumnNames = columnNames;
     this.serializableSchema = new SerializableSchema(schema);
@@ -79,4 +85,8 @@ public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
       throw new HoodieIOException("Unable to read record with key:" + record.getKey(), e);
     }
   }
+
+  private String[] getSortColumnName(HoodieWriteConfig config) {
+    return config.getUserDefinedBulkInsertPartitionerSortColumns().split(",");
+  }
 }
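With the new constructor, building the partitioner from a config is equivalent to passing the columns and schema explicitly; a sketch of that equivalence (writeConfig and schemaString are assumed to exist):

    // Sketch: both instances end up with the same sort columns and schema.
    Schema schema = new Schema.Parser().parse(schemaString);
    RDDCustomColumnsSortPartitioner byArgs =
        new RDDCustomColumnsSortPartitioner("rider,driver".split(","), schema);
    RDDCustomColumnsSortPartitioner byConfig =
        new RDDCustomColumnsSortPartitioner(writeConfig); // reads schema and sort columns from the config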
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.testutils.HoodieClientTestBase;
@@ -43,6 +44,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase {
@@ -139,7 +141,8 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase {
 
   @Test
   public void testCustomColumnSortPartitioner() throws Exception {
-    String[] sortColumns = new String[] {"rider"};
+    String sortColumnString = "rider";
+    String[] sortColumns = sortColumnString.split(",");
     Comparator<HoodieRecord<? extends HoodieRecordPayload>> columnComparator = getCustomColumnComparator(HoodieTestDataGenerator.AVRO_SCHEMA, sortColumns);
 
     JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc);
@@ -148,6 +151,19 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase {
         records1, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator));
     testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA),
         records2, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator));
+
+    HoodieWriteConfig config = HoodieWriteConfig
+        .newBuilder()
+        .withPath("/")
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withUserDefinedBulkInsertPartitionerClass(RDDCustomColumnsSortPartitioner.class.getName())
+        .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
+        .build();
+    testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(config),
+        records1, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator));
+    testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(config),
+        records2, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator));
+
   }
 
   private Comparator<HoodieRecord<? extends HoodieRecordPayload>> getCustomColumnComparator(Schema schema, String[] sortColumns) {
@@ -96,7 +96,7 @@ public class DataSourceUtils {
     try {
       return StringUtils.isNullOrEmpty(bulkInsertPartitionerClass)
           ? Option.empty() :
-          Option.of((BulkInsertPartitioner) ReflectionUtils.loadClass(bulkInsertPartitionerClass));
+          Option.of((BulkInsertPartitioner) ReflectionUtils.loadClass(bulkInsertPartitionerClass, config));
     } catch (Throwable e) {
       throw new HoodieException("Could not create UserDefinedBulkInsertPartitioner class " + bulkInsertPartitionerClass, e);
     }
@@ -115,7 +115,7 @@ public class DataSourceUtils {
     try {
       return StringUtils.isNullOrEmpty(bulkInsertPartitionerClass)
           ? Option.empty() :
-          Option.of((BulkInsertPartitioner) ReflectionUtils.loadClass(bulkInsertPartitionerClass));
+          Option.of((BulkInsertPartitioner) ReflectionUtils.loadClass(bulkInsertPartitionerClass, config));
     } catch (Throwable e) {
       throw new HoodieException("Could not create UserDefinedBulkInsertPartitionerRows class " + bulkInsertPartitionerClass, e);
     }
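Because ReflectionUtils.loadClass is now invoked with the write config as a constructor argument, a user-defined partitioner must expose a single-argument HoodieWriteConfig constructor (the NoOp test partitioners below gain exactly that). A minimal sketch of the expected shape; MyPartitioner is hypothetical:

    // Sketch: constructor contract for user-defined bulk insert partitioners.
    public class MyPartitioner<T extends HoodieRecordPayload>
        implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {

      public MyPartitioner(HoodieWriteConfig config) {
        // read custom settings (e.g. sort columns) from the config here
      }

      @Override
      public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
        return records; // a real partitioner would sort or repartition here
      }

      @Override
      public boolean arePartitionRecordsSorted() {
        return false;
      }
    }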
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
 import org.apache.avro.Conversions;
@@ -74,18 +75,11 @@ public class TestDataSourceUtils {
   private ArgumentCaptor<Option> optionCaptor;
   private HoodieWriteConfig config;
 
-  @BeforeEach
-  public void setUp() {
-    config = HoodieWriteConfig.newBuilder().withPath("/").build();
-  }
-
-  @Test
-  public void testAvroRecordsFieldConversion() {
   // There are fields event_date1, event_date2, event_date3 with logical type as Date. event_date1 & event_date3 are
   // of UNION schema type, which is a union of null and date type in different orders. event_date2 is non-union
   // date type. event_cost1, event_cost2, event3 are decimal logical types with UNION schema, which is similar to
   // the event_date.
-    String avroSchemaString = "{\"type\": \"record\"," + "\"name\": \"events\"," + "\"fields\": [ "
+  private String avroSchemaString = "{\"type\": \"record\"," + "\"name\": \"events\"," + "\"fields\": [ "
       + "{\"name\": \"event_date1\", \"type\" : [{\"type\" : \"int\", \"logicalType\" : \"date\"}, \"null\"]},"
       + "{\"name\": \"event_date2\", \"type\" : {\"type\": \"int\", \"logicalType\" : \"date\"}},"
       + "{\"name\": \"event_date3\", \"type\" : [\"null\", {\"type\" : \"int\", \"logicalType\" : \"date\"}]},"
@@ -99,6 +93,14 @@ public class TestDataSourceUtils {
       + "[\"null\", {\"type\": \"fixed\", \"name\": \"fg\", \"size\": 5, \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 6}]}"
       + "]}";
 
+  @BeforeEach
+  public void setUp() {
+    config = HoodieWriteConfig.newBuilder().withPath("/").build();
+  }
+
+  @Test
+  public void testAvroRecordsFieldConversion() {
+
     Schema avroSchema = new Schema.Parser().parse(avroSchemaString);
     GenericRecord record = new GenericData.Record(avroSchema);
     record.put("event_date1", 18000);
@@ -183,6 +185,20 @@ public class TestDataSourceUtils {
     assertThat(partitioner.isPresent(), is(true));
   }
 
+  @Test
+  public void testCreateRDDCustomColumnsSortPartitionerWithValidPartitioner() throws HoodieException {
+    config = HoodieWriteConfig
+        .newBuilder()
+        .withPath("/")
+        .withUserDefinedBulkInsertPartitionerClass(RDDCustomColumnsSortPartitioner.class.getName())
+        .withUserDefinedBulkInsertPartitionerSortColumns("column1, column2")
+        .withSchema(avroSchemaString)
+        .build();
+
+    Option<BulkInsertPartitioner<Dataset<Row>>> partitioner = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(config);
+    assertThat(partitioner.isPresent(), is(true));
+  }
+
   private void setAndVerifyHoodieWriteClientWith(final String partitionerClassName) {
     config = HoodieWriteConfig.newBuilder().withPath(config.getBasePath())
         .withUserDefinedBulkInsertPartitionerClass(partitionerClassName)
@@ -195,6 +211,8 @@ public class TestDataSourceUtils {
   public static class NoOpBulkInsertPartitioner<T extends HoodieRecordPayload>
       implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
 
+    public NoOpBulkInsertPartitioner(HoodieWriteConfig config) {}
+
     @Override
     public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
       return records;
@@ -209,6 +227,8 @@ public class TestDataSourceUtils {
   public static class NoOpBulkInsertPartitionerRows
       implements BulkInsertPartitioner<Dataset<Row>> {
 
+    public NoOpBulkInsertPartitionerRows(HoodieWriteConfig config) {}
+
     @Override
     public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {
       return records;