1
0

[HUDI-3995] Making perf optimizations for bulk insert row writer path (#5462)

- Avoid using udf for key generator for SimpleKeyGen and NonPartitionedKeyGen.
- Fixed NonPartitioned Key generator to directly fetch record key from row rather than involving GenericRecord.
- Other minor fixes around using static values instead of looking up hashmap.
This commit is contained in:
Sivabalan Narayanan
2022-05-09 12:40:22 -04:00
committed by GitHub
parent 6b47ef6ed2
commit 6285a239a3
20 changed files with 217 additions and 187 deletions

View File

@@ -22,7 +22,11 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hive.NonPartitionedExtractor;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.keygen.ComplexKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.log4j.LogManager;
@@ -57,18 +61,18 @@ public class HoodieDatasetBulkInsertHelper {
/**
* Prepares input hoodie spark dataset for bulk insert. It does the following steps.
* 1. Uses KeyGenerator to generate hoodie record keys and partition path.
* 2. Add hoodie columns to input spark dataset.
* 3. Reorders input dataset columns so that hoodie columns appear in the beginning.
* 4. Sorts input dataset by hoodie partition path and record key
* 1. Uses KeyGenerator to generate hoodie record keys and partition path.
* 2. Add hoodie columns to input spark dataset.
* 3. Reorders input dataset columns so that hoodie columns appear in the beginning.
* 4. Sorts input dataset by hoodie partition path and record key
*
* @param sqlContext SQL Context
* @param config Hoodie Write Config
* @param rows Spark Input dataset
* @param config Hoodie Write Config
* @param rows Spark Input dataset
* @return hoodie dataset which is ready for bulk insert.
*/
public static Dataset<Row> prepareHoodieDatasetForBulkInsert(SQLContext sqlContext,
HoodieWriteConfig config, Dataset<Row> rows, String structName, String recordNamespace,
HoodieWriteConfig config, Dataset<Row> rows, String structName, String recordNamespace,
BulkInsertPartitioner<Dataset<Row>> bulkInsertPartitionerRows,
boolean isGlobalIndex, boolean dropPartitionColumns) {
List<Column> originalFields =
@@ -77,27 +81,46 @@ public class HoodieDatasetBulkInsertHelper {
TypedProperties properties = new TypedProperties();
properties.putAll(config.getProps());
String keyGeneratorClass = properties.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key());
String recordKeyFields = properties.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
String partitionPathFields = properties.containsKey(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
? properties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()) : "";
BuiltinKeyGenerator keyGenerator = (BuiltinKeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, properties);
String tableName = properties.getString(HoodieWriteConfig.TBL_NAME.key());
String recordKeyUdfFn = RECORD_KEY_UDF_FN + tableName;
String partitionPathUdfFn = PARTITION_PATH_UDF_FN + tableName;
sqlContext.udf().register(recordKeyUdfFn, (UDF1<Row, String>) keyGenerator::getRecordKey, DataTypes.StringType);
sqlContext.udf().register(partitionPathUdfFn, (UDF1<Row, String>) keyGenerator::getPartitionPath, DataTypes.StringType);
final Dataset<Row> rowDatasetWithRecordKeys = rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD,
callUDF(recordKeyUdfFn, org.apache.spark.sql.functions.struct(
JavaConverters.collectionAsScalaIterableConverter(originalFields).asScala().toSeq())));
Dataset<Row> rowDatasetWithRecordKeysAndPartitionPath;
if (keyGeneratorClass.equals(NonPartitionedExtractor.class.getName())) {
// for non partitioned, set partition path to empty.
rowDatasetWithRecordKeysAndPartitionPath = rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, functions.col(recordKeyFields))
.withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, functions.lit("").cast(DataTypes.StringType));
} else if (keyGeneratorClass.equals(SimpleKeyGenerator.class.getName())
|| (keyGeneratorClass.equals(ComplexKeyGenerator.class.getName()) && !recordKeyFields.contains(",") && !partitionPathFields.contains(",")
&& (!partitionPathFields.contains("timestamp")))) { // incase of ComplexKeyGen, check partition path type.
// simple fields for both record key and partition path: can directly use withColumn
String partitionPathField = keyGeneratorClass.equals(SimpleKeyGenerator.class.getName()) ? partitionPathFields :
partitionPathFields.substring(partitionPathFields.indexOf(":") + 1);
rowDatasetWithRecordKeysAndPartitionPath = rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, functions.col(recordKeyFields).cast(DataTypes.StringType))
.withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, functions.col(partitionPathField).cast(DataTypes.StringType));
} else {
// use udf
String tableName = properties.getString(HoodieWriteConfig.TBL_NAME.key());
String recordKeyUdfFn = RECORD_KEY_UDF_FN + tableName;
String partitionPathUdfFn = PARTITION_PATH_UDF_FN + tableName;
sqlContext.udf().register(recordKeyUdfFn, (UDF1<Row, String>) keyGenerator::getRecordKey, DataTypes.StringType);
sqlContext.udf().register(partitionPathUdfFn, (UDF1<Row, String>) keyGenerator::getPartitionPath, DataTypes.StringType);
final Dataset<Row> rowDatasetWithRecordKeysAndPartitionPath =
rowDatasetWithRecordKeys.withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD,
callUDF(partitionPathUdfFn,
org.apache.spark.sql.functions.struct(
JavaConverters.collectionAsScalaIterableConverter(originalFields).asScala().toSeq())));
final Dataset<Row> rowDatasetWithRecordKeys = rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD,
callUDF(recordKeyUdfFn, org.apache.spark.sql.functions.struct(
JavaConverters.collectionAsScalaIterableConverter(originalFields).asScala().toSeq())));
rowDatasetWithRecordKeysAndPartitionPath =
rowDatasetWithRecordKeys.withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD,
callUDF(partitionPathUdfFn,
org.apache.spark.sql.functions.struct(
JavaConverters.collectionAsScalaIterableConverter(originalFields).asScala().toSeq())));
}
// Add other empty hoodie fields which will be populated before writing to parquet.
Dataset<Row> rowDatasetWithHoodieColumns =
rowDatasetWithRecordKeysAndPartitionPath.withColumn(HoodieRecord.COMMIT_TIME_METADATA_FIELD,
functions.lit("").cast(DataTypes.StringType))
functions.lit("").cast(DataTypes.StringType))
.withColumn(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD,
functions.lit("").cast(DataTypes.StringType))
.withColumn(HoodieRecord.FILENAME_METADATA_FIELD,
@@ -106,7 +129,7 @@ public class HoodieDatasetBulkInsertHelper {
Dataset<Row> processedDf = rowDatasetWithHoodieColumns;
if (dropPartitionColumns) {
String partitionColumns = String.join(",", keyGenerator.getPartitionPathFields());
for (String partitionField: keyGenerator.getPartitionPathFields()) {
for (String partitionField : keyGenerator.getPartitionPathFields()) {
originalFields.remove(new Column(partitionField));
}
processedDf = rowDatasetWithHoodieColumns.drop(partitionColumns);

View File

@@ -25,8 +25,8 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.row.HoodieRowCreateHandleWithoutMetaFields;
import org.apache.hudi.io.storage.row.HoodieRowCreateHandle;
import org.apache.hudi.io.storage.row.HoodieRowCreateHandleWithoutMetaFields;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
@@ -123,8 +123,7 @@ public class BulkInsertDataInternalWriterHelper {
try {
String partitionPath = null;
if (populateMetaFields) { // usual path where meta fields are pre populated in prep step.
partitionPath = record.getUTF8String(
HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toString();
partitionPath = String.valueOf(record.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_POS));
} else { // if meta columns are disabled.
if (!keyGeneratorOpt.isPresent()) { // NoPartitionerKeyGen
partitionPath = "";

View File

@@ -564,8 +564,7 @@ object HoodieSparkSqlWriter {
throw new HoodieException("Bulk insert using row writer is not supported with current Spark version."
+ " To use row writer please switch to spark 2 or spark 3")
}
val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(params)
val syncHiveSuccess = metaSync(sqlContext.sparkSession, hoodieConfig, basePath, df.schema)
val syncHiveSuccess = metaSync(sqlContext.sparkSession, writeConfig, basePath, df.schema)
(syncHiveSuccess, common.util.Option.ofNullable(instantTime))
}

View File

@@ -24,6 +24,9 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.bulkinsert.NonSortPartitionerWithRows;
import org.apache.hudi.keygen.ComplexKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.testutils.DataSourceTestUtils;
import org.apache.hudi.testutils.HoodieClientTestBase;
@@ -94,20 +97,36 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
public void testBulkInsertHelperConcurrently() {
IntStream.range(0, 2).parallel().forEach(i -> {
if (i % 2 == 0) {
testBulkInsertHelperFor("_row_key");
testBulkInsertHelperFor(SimpleKeyGenerator.class.getName(), "_row_key");
} else {
testBulkInsertHelperFor("ts");
testBulkInsertHelperFor(SimpleKeyGenerator.class.getName(), "ts");
}
});
}
@Test
public void testBulkInsertHelper() {
testBulkInsertHelperFor("_row_key");
private static Stream<Arguments> provideKeyGenArgs() {
return Stream.of(
Arguments.of(SimpleKeyGenerator.class.getName()),
Arguments.of(ComplexKeyGenerator.class.getName()),
Arguments.of(NonpartitionedKeyGenerator.class.getName()));
}
private void testBulkInsertHelperFor(String recordKey) {
HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(getPropsAllSet(recordKey)).combineInput(false, false).build();
@ParameterizedTest
@MethodSource("provideKeyGenArgs")
public void testBulkInsertHelper(String keyGenClass) {
testBulkInsertHelperFor(keyGenClass, "_row_key");
}
private void testBulkInsertHelperFor(String keyGenClass, String recordKey) {
Map<String, String> props = null;
if (keyGenClass.equals(SimpleKeyGenerator.class.getName())) {
props = getPropsAllSet(recordKey);
} else if (keyGenClass.equals(ComplexKeyGenerator.class.getName())) {
props = getPropsForComplexKeyGen(recordKey);
} else { // NonPartitioned key gen
props = getPropsForNonPartitionedKeyGen(recordKey);
}
HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(props).combineInput(false, false).build();
List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName",
@@ -121,9 +140,10 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
assertTrue(resultSchema.fieldIndex(entry.getKey()) == entry.getValue());
}
boolean isNonPartitioned = keyGenClass.equals(NonpartitionedKeyGenerator.class.getName());
result.toJavaRDD().foreach(entry -> {
assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)).equals(entry.getAs(recordKey).toString()));
assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).equals(entry.getAs("partition")));
assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).equals(isNonPartitioned ? "" : entry.getAs("partition")));
assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)).equals(""));
assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).equals(""));
assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.FILENAME_METADATA_FIELD)).equals(""));
@@ -253,6 +273,23 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
return props;
}
private Map<String, String> getPropsForComplexKeyGen(String recordKey) {
Map<String, String> props = new HashMap<>();
props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), ComplexKeyGenerator.class.getName());
props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), recordKey);
props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "simple:partition");
props.put(HoodieWriteConfig.TBL_NAME.key(), recordKey + "_table");
return props;
}
private Map<String, String> getPropsForNonPartitionedKeyGen(String recordKey) {
Map<String, String> props = new HashMap<>();
props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), NonpartitionedKeyGenerator.class.getName());
props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), recordKey);
props.put(HoodieWriteConfig.TBL_NAME.key(), recordKey + "_table");
return props;
}
@Test
public void testNoPropsSet() {
HoodieWriteConfig config = getConfigBuilder(schemaStr).build();

View File

@@ -83,7 +83,7 @@ public class TestComplexKeyGenerator extends KeyGeneratorTestUtilities {
public void testWrongRecordKeyField() {
ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getWrongRecordKeyFieldProps());
Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord()));
Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldPositionMapIfNeeded(KeyGeneratorTestUtilities.structType));
Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldSchemaInfoIfNeeded(KeyGeneratorTestUtilities.structType));
}
@Test

View File

@@ -68,7 +68,7 @@ public class TestGlobalDeleteRecordGenerator extends KeyGeneratorTestUtilities {
public void testWrongRecordKeyField() {
GlobalDeleteKeyGenerator keyGenerator = new GlobalDeleteKeyGenerator(getWrongRecordKeyFieldProps());
Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord()));
Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldPositionMapIfNeeded(KeyGeneratorTestUtilities.structType));
Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldSchemaInfoIfNeeded(KeyGeneratorTestUtilities.structType));
}
@Test
@@ -78,7 +78,7 @@ public class TestGlobalDeleteRecordGenerator extends KeyGeneratorTestUtilities {
HoodieKey key = keyGenerator.getKey(record);
Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi");
Assertions.assertEquals(key.getPartitionPath(), "");
keyGenerator.buildFieldPositionMapIfNeeded(KeyGeneratorTestUtilities.structType);
keyGenerator.buildFieldSchemaInfoIfNeeded(KeyGeneratorTestUtilities.structType);
Row row = KeyGeneratorTestUtilities.getRow(record);
Assertions.assertEquals(keyGenerator.getRecordKey(row), "_row_key:key1,pii_col:pi");
Assertions.assertEquals(keyGenerator.getPartitionPath(row), "");

View File

@@ -94,7 +94,7 @@ public class TestNonpartitionedKeyGenerator extends KeyGeneratorTestUtilities {
public void testWrongRecordKeyField() {
NonpartitionedKeyGenerator keyGenerator = new NonpartitionedKeyGenerator(getWrongRecordKeyFieldProps());
Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord()));
Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldPositionMapIfNeeded(KeyGeneratorTestUtilities.structType));
Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldSchemaInfoIfNeeded(KeyGeneratorTestUtilities.structType));
}
@Test

View File

@@ -100,7 +100,7 @@ public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities {
public void testWrongRecordKeyField() {
SimpleKeyGenerator keyGenerator = new SimpleKeyGenerator(getWrongRecordKeyFieldProps());
Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord()));
Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldPositionMapIfNeeded(KeyGeneratorTestUtilities.structType));
Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldSchemaInfoIfNeeded(KeyGeneratorTestUtilities.structType));
}
@Test
@@ -116,7 +116,7 @@ public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities {
public void testComplexRecordKeyField() {
SimpleKeyGenerator keyGenerator = new SimpleKeyGenerator(getComplexRecordKeyProp());
Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord()));
Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldPositionMapIfNeeded(KeyGeneratorTestUtilities.structType));
Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldSchemaInfoIfNeeded(KeyGeneratorTestUtilities.structType));
}
@Test