1
0

[HUDI-1104] Adding support for UserDefinedPartitioners and SortModes to BulkInsert with Rows (#3149)

This commit is contained in:
Sivabalan Narayanan
2021-07-07 11:15:25 -04:00
committed by GitHub
parent 55ecbc662e
commit ea9e5d0e8b
31 changed files with 618 additions and 82 deletions

View File

@@ -49,6 +49,8 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.io.IOException;
import java.util.ArrayList;
@@ -98,6 +100,25 @@ public class DataSourceUtils {
}
}
/**
* Create a UserDefinedBulkInsertPartitionerRows class via reflection,
* <br>
* if the class name of UserDefinedBulkInsertPartitioner is configured through the HoodieWriteConfig.
*
* @see HoodieWriteConfig#getUserDefinedBulkInsertPartitionerClass()
*/
public static Option<BulkInsertPartitioner<Dataset<Row>>> createUserDefinedBulkInsertPartitionerWithRows(HoodieWriteConfig config)
throws HoodieException {
String bulkInsertPartitionerClass = config.getUserDefinedBulkInsertPartitionerClass();
try {
return StringUtils.isNullOrEmpty(bulkInsertPartitionerClass)
? Option.empty() :
Option.of((BulkInsertPartitioner) ReflectionUtils.loadClass(bulkInsertPartitionerClass));
} catch (Throwable e) {
throw new HoodieException("Could not create UserDefinedBulkInsertPartitionerRows class " + bulkInsertPartitionerClass, e);
}
}
/**
* Create a payload class via reflection, passing in an ordering/precombine value.
*/

View File

@@ -31,7 +31,9 @@ import org.apache.spark.sql.types.StructType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
/**
@@ -48,15 +50,17 @@ public class BulkInsertDataInternalWriterHelper {
private final HoodieTable hoodieTable;
private final HoodieWriteConfig writeConfig;
private final StructType structType;
private final Boolean arePartitionRecordsSorted;
private final List<HoodieInternalWriteStatus> writeStatusList = new ArrayList<>();
private HoodieRowCreateHandle handle;
private String lastKnownPartitionPath = null;
private String fileIdPrefix;
private int numFilesWritten = 0;
private Map<String, HoodieRowCreateHandle> handles = new HashMap<>();
public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
String instantTime, int taskPartitionId, long taskId, long taskEpochId, StructType structType) {
String instantTime, int taskPartitionId, long taskId, long taskEpochId, StructType structType, boolean arePartitionRecordsSorted) {
this.hoodieTable = hoodieTable;
this.writeConfig = writeConfig;
this.instantTime = instantTime;
@@ -64,6 +68,7 @@ public class BulkInsertDataInternalWriterHelper {
this.taskId = taskId;
this.taskEpochId = taskEpochId;
this.structType = structType;
this.arePartitionRecordsSorted = arePartitionRecordsSorted;
this.fileIdPrefix = UUID.randomUUID().toString();
}
@@ -74,7 +79,7 @@ public class BulkInsertDataInternalWriterHelper {
if ((lastKnownPartitionPath == null) || !lastKnownPartitionPath.equals(partitionPath) || !handle.canWrite()) {
LOG.info("Creating new file for partition path " + partitionPath);
createNewHandle(partitionPath);
handle = getRowCreateHandle(partitionPath);
lastKnownPartitionPath = partitionPath;
}
handle.write(record);
@@ -92,19 +97,30 @@ public class BulkInsertDataInternalWriterHelper {
public void abort() {
}
private void createNewHandle(String partitionPath) throws IOException {
if (null != handle) {
close();
private HoodieRowCreateHandle getRowCreateHandle(String partitionPath) throws IOException {
if (!handles.containsKey(partitionPath)) { // if there is no handle corresponding to the partition path
// if records are sorted, we can close all existing handles
if (arePartitionRecordsSorted) {
close();
}
handles.put(partitionPath, new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
instantTime, taskPartitionId, taskId, taskEpochId, structType));
} else if (!handles.get(partitionPath).canWrite()) {
// even if there is a handle to the partition path, it could have reached its max size threshold. So, we close the handle here and
// create a new one.
writeStatusList.add(handles.remove(partitionPath).close());
handles.put(partitionPath, new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
instantTime, taskPartitionId, taskId, taskEpochId, structType));
}
handle = new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
instantTime, taskPartitionId, taskId, taskEpochId, structType);
return handles.get(partitionPath);
}
public void close() throws IOException {
if (null != handle) {
writeStatusList.add(handle.close());
handle = null;
for (HoodieRowCreateHandle rowCreateHandle: handles.values()) {
writeStatusList.add(rowCreateHandle.close());
}
handles.clear();
handle = null;
}
private String getNextFileId() {

View File

@@ -30,7 +30,9 @@ import org.apache.spark.sql.Row;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -61,13 +63,40 @@ public class HoodieBulkInsertInternalWriterTestBase extends HoodieClientTestHarn
}
protected void assertWriteStatuses(List<HoodieInternalWriteStatus> writeStatuses, int batches, int size,
Option<List<String>> fileAbsPaths, Option<List<String>> fileNames) {
assertEquals(batches, writeStatuses.size());
Option<List<String>> fileAbsPaths, Option<List<String>> fileNames) {
assertWriteStatuses(writeStatuses, batches, size, false, fileAbsPaths, fileNames);
}
protected void assertWriteStatuses(List<HoodieInternalWriteStatus> writeStatuses, int batches, int size, boolean areRecordsSorted,
Option<List<String>> fileAbsPaths, Option<List<String>> fileNames) {
if (areRecordsSorted) {
assertEquals(batches, writeStatuses.size());
} else {
assertEquals(Math.min(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS.length, batches), writeStatuses.size());
}
Map<String, Long> sizeMap = new HashMap<>();
if (!areRecordsSorted) {
// <size> no of records are written per batch. Every 4th batch goes into same writeStatus. So, populating the size expected
// per write status
for (int i = 0; i < batches; i++) {
String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[i % 3];
if (!sizeMap.containsKey(partitionPath)) {
sizeMap.put(partitionPath, 0L);
}
sizeMap.put(partitionPath, sizeMap.get(partitionPath) + size);
}
}
int counter = 0;
for (HoodieInternalWriteStatus writeStatus : writeStatuses) {
// verify write status
assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3], writeStatus.getPartitionPath());
assertEquals(writeStatus.getTotalRecords(), size);
if (areRecordsSorted) {
assertEquals(writeStatus.getTotalRecords(), size);
} else {
assertEquals(writeStatus.getTotalRecords(), sizeMap.get(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3]));
}
assertNull(writeStatus.getGlobalError());
assertEquals(writeStatus.getFailedRowsSize(), 0);
assertEquals(writeStatus.getTotalErrorRecords(), 0);
@@ -82,8 +111,13 @@ public class HoodieBulkInsertInternalWriterTestBase extends HoodieClientTestHarn
.substring(writeStatus.getStat().getPath().lastIndexOf('/') + 1));
}
HoodieWriteStat writeStat = writeStatus.getStat();
assertEquals(size, writeStat.getNumInserts());
assertEquals(size, writeStat.getNumWrites());
if (areRecordsSorted) {
assertEquals(size, writeStat.getNumInserts());
assertEquals(size, writeStat.getNumWrites());
} else {
assertEquals(sizeMap.get(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3]), writeStat.getNumInserts());
assertEquals(sizeMap.get(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3]), writeStat.getNumWrites());
}
assertEquals(fileId, writeStat.getFileId());
assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter++ % 3], writeStat.getPartitionPath());
assertEquals(0, writeStat.getNumDeletes());