[HUDI-2582] Support concurrent key gen for different tables with row writer path (#3817)
Co-authored-by: yao.zhou <yao.zhou@linkflowtech.com>
This commit is contained in:
@@ -34,7 +34,6 @@ import org.apache.spark.sql.SQLContext;
|
|||||||
import org.apache.spark.sql.api.java.UDF1;
|
import org.apache.spark.sql.api.java.UDF1;
|
||||||
import org.apache.spark.sql.functions;
|
import org.apache.spark.sql.functions;
|
||||||
import org.apache.spark.sql.types.DataTypes;
|
import org.apache.spark.sql.types.DataTypes;
|
||||||
import org.apache.spark.sql.types.StructType;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
@@ -53,8 +52,8 @@ public class HoodieDatasetBulkInsertHelper {
|
|||||||
|
|
||||||
private static final Logger LOG = LogManager.getLogger(HoodieDatasetBulkInsertHelper.class);
|
private static final Logger LOG = LogManager.getLogger(HoodieDatasetBulkInsertHelper.class);
|
||||||
|
|
||||||
private static final String RECORD_KEY_UDF_FN = "hudi_recordkey_gen_function";
|
private static final String RECORD_KEY_UDF_FN = "hudi_recordkey_gen_function_";
|
||||||
private static final String PARTITION_PATH_UDF_FN = "hudi_partition_gen_function";
|
private static final String PARTITION_PATH_UDF_FN = "hudi_partition_gen_function_";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Prepares input hoodie spark dataset for bulk insert. It does the following steps.
|
* Prepares input hoodie spark dataset for bulk insert. It does the following steps.
|
||||||
@@ -79,18 +78,19 @@ public class HoodieDatasetBulkInsertHelper {
|
|||||||
properties.putAll(config.getProps());
|
properties.putAll(config.getProps());
|
||||||
String keyGeneratorClass = properties.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key());
|
String keyGeneratorClass = properties.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key());
|
||||||
BuiltinKeyGenerator keyGenerator = (BuiltinKeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, properties);
|
BuiltinKeyGenerator keyGenerator = (BuiltinKeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, properties);
|
||||||
StructType structTypeForUDF = rows.schema();
|
String tableName = properties.getString(HoodieWriteConfig.TBL_NAME.key());
|
||||||
|
String recordKeyUdfFn = RECORD_KEY_UDF_FN + tableName;
|
||||||
sqlContext.udf().register(RECORD_KEY_UDF_FN, (UDF1<Row, String>) keyGenerator::getRecordKey, DataTypes.StringType);
|
String partitionPathUdfFn = PARTITION_PATH_UDF_FN + tableName;
|
||||||
sqlContext.udf().register(PARTITION_PATH_UDF_FN, (UDF1<Row, String>) keyGenerator::getPartitionPath, DataTypes.StringType);
|
sqlContext.udf().register(recordKeyUdfFn, (UDF1<Row, String>) keyGenerator::getRecordKey, DataTypes.StringType);
|
||||||
|
sqlContext.udf().register(partitionPathUdfFn, (UDF1<Row, String>) keyGenerator::getPartitionPath, DataTypes.StringType);
|
||||||
|
|
||||||
final Dataset<Row> rowDatasetWithRecordKeys = rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD,
|
final Dataset<Row> rowDatasetWithRecordKeys = rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD,
|
||||||
callUDF(RECORD_KEY_UDF_FN, org.apache.spark.sql.functions.struct(
|
callUDF(recordKeyUdfFn, org.apache.spark.sql.functions.struct(
|
||||||
JavaConverters.collectionAsScalaIterableConverter(originalFields).asScala().toSeq())));
|
JavaConverters.collectionAsScalaIterableConverter(originalFields).asScala().toSeq())));
|
||||||
|
|
||||||
final Dataset<Row> rowDatasetWithRecordKeysAndPartitionPath =
|
final Dataset<Row> rowDatasetWithRecordKeysAndPartitionPath =
|
||||||
rowDatasetWithRecordKeys.withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD,
|
rowDatasetWithRecordKeys.withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD,
|
||||||
callUDF(PARTITION_PATH_UDF_FN,
|
callUDF(partitionPathUdfFn,
|
||||||
org.apache.spark.sql.functions.struct(
|
org.apache.spark.sql.functions.struct(
|
||||||
JavaConverters.collectionAsScalaIterableConverter(originalFields).asScala().toSeq())));
|
JavaConverters.collectionAsScalaIterableConverter(originalFields).asScala().toSeq())));
|
||||||
|
|
||||||
|
|||||||
@@ -50,6 +50,7 @@ import java.util.HashMap;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
@@ -89,9 +90,24 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
|
|||||||
structType = AvroConversionUtils.convertAvroSchemaToStructType(schema);
|
structType = AvroConversionUtils.convertAvroSchemaToStructType(schema);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBulkInsertHelperConcurrently() {
|
||||||
|
IntStream.range(0, 2).parallel().forEach(i -> {
|
||||||
|
if (i % 2 == 0) {
|
||||||
|
testBulkInsertHelperFor("_row_key");
|
||||||
|
} else {
|
||||||
|
testBulkInsertHelperFor("ts");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBulkInsertHelper() {
|
public void testBulkInsertHelper() {
|
||||||
HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(getPropsAllSet()).combineInput(false, false).build();
|
testBulkInsertHelperFor("_row_key");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testBulkInsertHelperFor(String recordKey) {
|
||||||
|
HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(getPropsAllSet(recordKey)).combineInput(false, false).build();
|
||||||
List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
|
List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
|
||||||
Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
|
Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
|
||||||
Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName",
|
Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName",
|
||||||
@@ -106,7 +122,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
|
|||||||
}
|
}
|
||||||
|
|
||||||
result.toJavaRDD().foreach(entry -> {
|
result.toJavaRDD().foreach(entry -> {
|
||||||
assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)).equals(entry.getAs("_row_key")));
|
assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)).equals(entry.getAs(recordKey).toString()));
|
||||||
assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).equals(entry.getAs("partition")));
|
assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).equals(entry.getAs("partition")));
|
||||||
assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)).equals(""));
|
assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)).equals(""));
|
||||||
assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).equals(""));
|
assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).equals(""));
|
||||||
@@ -148,7 +164,8 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
|
|||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@MethodSource("providePreCombineArgs")
|
@MethodSource("providePreCombineArgs")
|
||||||
public void testBulkInsertPreCombine(boolean enablePreCombine) {
|
public void testBulkInsertPreCombine(boolean enablePreCombine) {
|
||||||
HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(getPropsAllSet()).combineInput(enablePreCombine, enablePreCombine)
|
HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(getPropsAllSet("_row_key"))
|
||||||
|
.combineInput(enablePreCombine, enablePreCombine)
|
||||||
.withPreCombineField("ts").build();
|
.withPreCombineField("ts").build();
|
||||||
List<Row> inserts = DataSourceTestUtils.generateRandomRows(10);
|
List<Row> inserts = DataSourceTestUtils.generateRandomRows(10);
|
||||||
Dataset<Row> toUpdateDataset = sqlContext.createDataFrame(inserts.subList(0, 5), structType);
|
Dataset<Row> toUpdateDataset = sqlContext.createDataFrame(inserts.subList(0, 5), structType);
|
||||||
@@ -207,22 +224,27 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Map<String, String> getPropsAllSet() {
|
private Map<String, String> getPropsAllSet(String recordKey) {
|
||||||
return getProps(true, true, true, true);
|
return getProps(recordKey, true, true, true, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Map<String, String> getProps(boolean setAll, boolean setKeyGen, boolean setRecordKey, boolean setPartitionPath) {
|
private Map<String, String> getProps(boolean setAll, boolean setKeyGen, boolean setRecordKey, boolean setPartitionPath) {
|
||||||
|
return getProps("_row_key", setAll, setKeyGen, setRecordKey, setPartitionPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, String> getProps(String recordKey, boolean setAll, boolean setKeyGen, boolean setRecordKey, boolean setPartitionPath) {
|
||||||
Map<String, String> props = new HashMap<>();
|
Map<String, String> props = new HashMap<>();
|
||||||
if (setAll) {
|
if (setAll) {
|
||||||
props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), "org.apache.hudi.keygen.SimpleKeyGenerator");
|
props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), "org.apache.hudi.keygen.SimpleKeyGenerator");
|
||||||
props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
|
props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), recordKey);
|
||||||
props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition");
|
props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition");
|
||||||
|
props.put(HoodieWriteConfig.TBL_NAME.key(), recordKey + "_table");
|
||||||
} else {
|
} else {
|
||||||
if (setKeyGen) {
|
if (setKeyGen) {
|
||||||
props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), "org.apache.hudi.keygen.SimpleKeyGenerator");
|
props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), "org.apache.hudi.keygen.SimpleKeyGenerator");
|
||||||
}
|
}
|
||||||
if (setRecordKey) {
|
if (setRecordKey) {
|
||||||
props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
|
props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), recordKey);
|
||||||
}
|
}
|
||||||
if (setPartitionPath) {
|
if (setPartitionPath) {
|
||||||
props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition");
|
props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition");
|
||||||
|
|||||||
Reference in New Issue
Block a user