[HUDI-4040] Bulk insert Support CustomColumnsSortPartitioner with Row (#5502)
* Along the lines of RDDCustomColumnsSortPartitioner but for Row
This commit is contained in:
@@ -29,6 +29,8 @@ import org.apache.hudi.table.BulkInsertPartitioner;
|
|||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A partitioner that does sorting based on specified column values for each RDD partition.
|
* A partitioner that does sorting based on specified column values for each RDD partition.
|
||||||
*
|
*
|
||||||
@@ -78,6 +80,7 @@ public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
|
|||||||
}
|
}
|
||||||
|
|
||||||
private String[] getSortColumnName(HoodieWriteConfig config) {
|
private String[] getSortColumnName(HoodieWriteConfig config) {
|
||||||
return config.getUserDefinedBulkInsertPartitionerSortColumns().split(",");
|
return Arrays.stream(config.getUserDefinedBulkInsertPartitionerSortColumns().split(","))
|
||||||
|
.map(String::trim).toArray(String[]::new);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,60 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.execution.bulkinsert;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
|
import org.apache.hudi.table.BulkInsertPartitioner;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.apache.spark.sql.Row;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A partitioner that does sorting based on specified column values for each spark partitions.
|
||||||
|
*/
|
||||||
|
public class RowCustomColumnsSortPartitioner implements BulkInsertPartitioner<Dataset<Row>> {
|
||||||
|
|
||||||
|
private final String[] sortColumnNames;
|
||||||
|
|
||||||
|
public RowCustomColumnsSortPartitioner(HoodieWriteConfig config) {
|
||||||
|
this.sortColumnNames = getSortColumnName(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RowCustomColumnsSortPartitioner(String[] columnNames) {
|
||||||
|
this.sortColumnNames = columnNames;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {
|
||||||
|
final String[] sortColumns = this.sortColumnNames;
|
||||||
|
return records.coalesce(outputSparkPartitions)
|
||||||
|
.sortWithinPartitions(HoodieRecord.PARTITION_PATH_METADATA_FIELD, sortColumns);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean arePartitionRecordsSorted() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String[] getSortColumnName(HoodieWriteConfig config) {
|
||||||
|
return Arrays.stream(config.getUserDefinedBulkInsertPartitionerSortColumns().split(","))
|
||||||
|
.map(String::trim).toArray(String[]::new);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -20,6 +20,8 @@ package org.apache.hudi.execution.bulkinsert;
|
|||||||
|
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
||||||
|
import org.apache.hudi.common.util.Option;
|
||||||
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.table.BulkInsertPartitioner;
|
import org.apache.hudi.table.BulkInsertPartitioner;
|
||||||
import org.apache.hudi.testutils.HoodieClientTestHarness;
|
import org.apache.hudi.testutils.HoodieClientTestHarness;
|
||||||
import org.apache.hudi.testutils.SparkDatasetTestUtils;
|
import org.apache.hudi.testutils.SparkDatasetTestUtils;
|
||||||
@@ -29,6 +31,7 @@ import org.apache.spark.sql.Dataset;
|
|||||||
import org.apache.spark.sql.Row;
|
import org.apache.spark.sql.Row;
|
||||||
import org.junit.jupiter.api.AfterEach;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.params.ParameterizedTest;
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
import org.junit.jupiter.params.provider.Arguments;
|
import org.junit.jupiter.params.provider.Arguments;
|
||||||
import org.junit.jupiter.params.provider.MethodSource;
|
import org.junit.jupiter.params.provider.MethodSource;
|
||||||
@@ -48,6 +51,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
|
|||||||
*/
|
*/
|
||||||
public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHarness {
|
public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHarness {
|
||||||
|
|
||||||
|
private static final Comparator<Row> KEY_COMPARATOR =
|
||||||
|
Comparator.comparing(o -> (o.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + "+" + o.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD)));
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
initSparkContexts("TestBulkInsertInternalPartitionerForRows");
|
initSparkContexts("TestBulkInsertInternalPartitionerForRows");
|
||||||
@@ -77,21 +82,47 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa
|
|||||||
Dataset<Row> records1 = generateTestRecords();
|
Dataset<Row> records1 = generateTestRecords();
|
||||||
Dataset<Row> records2 = generateTestRecords();
|
Dataset<Row> records2 = generateTestRecords();
|
||||||
testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerWithRowsFactory.get(sortMode),
|
testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerWithRowsFactory.get(sortMode),
|
||||||
records1, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records1));
|
records1, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records1), Option.empty());
|
||||||
testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerWithRowsFactory.get(sortMode),
|
testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerWithRowsFactory.get(sortMode),
|
||||||
records2, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records2));
|
records2, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records2), Option.empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCustomColumnSortPartitionerWithRows() {
|
||||||
|
Dataset<Row> records1 = generateTestRecords();
|
||||||
|
Dataset<Row> records2 = generateTestRecords();
|
||||||
|
String sortColumnString = records1.columns()[5];
|
||||||
|
String[] sortColumns = sortColumnString.split(",");
|
||||||
|
Comparator<Row> comparator = getCustomColumnComparator(sortColumns);
|
||||||
|
|
||||||
|
testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns),
|
||||||
|
records1, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator));
|
||||||
|
testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns),
|
||||||
|
records2, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator));
|
||||||
|
|
||||||
|
HoodieWriteConfig config = HoodieWriteConfig
|
||||||
|
.newBuilder()
|
||||||
|
.withPath("/")
|
||||||
|
.withUserDefinedBulkInsertPartitionerClass(RowCustomColumnsSortPartitioner.class.getName())
|
||||||
|
.withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
|
||||||
|
.build();
|
||||||
|
testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config),
|
||||||
|
records1, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator));
|
||||||
|
testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config),
|
||||||
|
records2, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner,
|
private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner,
|
||||||
Dataset<Row> rows,
|
Dataset<Row> rows,
|
||||||
boolean isGloballySorted, boolean isLocallySorted,
|
boolean isGloballySorted, boolean isLocallySorted,
|
||||||
Map<String, Long> expectedPartitionNumRecords) {
|
Map<String, Long> expectedPartitionNumRecords,
|
||||||
|
Option<Comparator<Row>> comparator) {
|
||||||
int numPartitions = 2;
|
int numPartitions = 2;
|
||||||
Dataset<Row> actualRecords = (Dataset<Row>) partitioner.repartitionRecords(rows, numPartitions);
|
Dataset<Row> actualRecords = (Dataset<Row>) partitioner.repartitionRecords(rows, numPartitions);
|
||||||
List<Row> collectedActualRecords = actualRecords.collectAsList();
|
List<Row> collectedActualRecords = actualRecords.collectAsList();
|
||||||
if (isGloballySorted) {
|
if (isGloballySorted) {
|
||||||
// Verify global order
|
// Verify global order
|
||||||
verifyRowsAscendingOrder(collectedActualRecords);
|
verifyRowsAscendingOrder(collectedActualRecords, comparator);
|
||||||
} else if (isLocallySorted) {
|
} else if (isLocallySorted) {
|
||||||
// Verify local order
|
// Verify local order
|
||||||
actualRecords.mapPartitions((MapPartitionsFunction<Row, Object>) input -> {
|
actualRecords.mapPartitions((MapPartitionsFunction<Row, Object>) input -> {
|
||||||
@@ -99,7 +130,7 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa
|
|||||||
while (input.hasNext()) {
|
while (input.hasNext()) {
|
||||||
partitionRows.add(input.next());
|
partitionRows.add(input.next());
|
||||||
}
|
}
|
||||||
verifyRowsAscendingOrder(partitionRows);
|
verifyRowsAscendingOrder(partitionRows, comparator);
|
||||||
return Collections.emptyList().iterator();
|
return Collections.emptyList().iterator();
|
||||||
}, SparkDatasetTestUtils.ENCODER);
|
}, SparkDatasetTestUtils.ENCODER);
|
||||||
}
|
}
|
||||||
@@ -130,10 +161,20 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa
|
|||||||
return rowsPart1.union(rowsPart2);
|
return rowsPart1.union(rowsPart2);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifyRowsAscendingOrder(List<Row> records) {
|
private void verifyRowsAscendingOrder(List<Row> records, Option<Comparator<Row>> comparator) {
|
||||||
List<Row> expectedRecords = new ArrayList<>(records);
|
List<Row> expectedRecords = new ArrayList<>(records);
|
||||||
Collections.sort(expectedRecords, Comparator.comparing(o -> (o.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + "+" + o.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD))));
|
Collections.sort(expectedRecords,comparator.orElse(KEY_COMPARATOR));
|
||||||
assertEquals(expectedRecords, records);
|
assertEquals(expectedRecords, records);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Comparator<Row> getCustomColumnComparator(String[] sortColumns) {
|
||||||
|
Comparator<Row> comparator = Comparator.comparing(row -> {
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
for (String col : sortColumns) {
|
||||||
|
sb.append(row.getAs(col).toString());
|
||||||
|
}
|
||||||
|
return sb.toString();
|
||||||
|
});
|
||||||
|
return comparator;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user