diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java
index c96d216e1..b3acf444a 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java
@@ -34,7 +34,6 @@
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.api.java.UDF1;
 import org.apache.spark.sql.functions;
 import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructType;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -53,8 +52,8 @@ public class HoodieDatasetBulkInsertHelper {
 
   private static final Logger LOG = LogManager.getLogger(HoodieDatasetBulkInsertHelper.class);
 
-  private static final String RECORD_KEY_UDF_FN = "hudi_recordkey_gen_function";
-  private static final String PARTITION_PATH_UDF_FN = "hudi_partition_gen_function";
+  private static final String RECORD_KEY_UDF_FN = "hudi_recordkey_gen_function_";
+  private static final String PARTITION_PATH_UDF_FN = "hudi_partition_gen_function_";
 
   /**
    * Prepares input hoodie spark dataset for bulk insert. It does the following steps.
@@ -79,18 +78,19 @@ public class HoodieDatasetBulkInsertHelper {
     properties.putAll(config.getProps());
     String keyGeneratorClass = properties.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key());
     BuiltinKeyGenerator keyGenerator = (BuiltinKeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, properties);
-    StructType structTypeForUDF = rows.schema();
-
-    sqlContext.udf().register(RECORD_KEY_UDF_FN, (UDF1<Row, String>) keyGenerator::getRecordKey, DataTypes.StringType);
-    sqlContext.udf().register(PARTITION_PATH_UDF_FN, (UDF1<Row, String>) keyGenerator::getPartitionPath, DataTypes.StringType);
+    String tableName = properties.getString(HoodieWriteConfig.TBL_NAME.key());
+    String recordKeyUdfFn = RECORD_KEY_UDF_FN + tableName;
+    String partitionPathUdfFn = PARTITION_PATH_UDF_FN + tableName;
+    sqlContext.udf().register(recordKeyUdfFn, (UDF1<Row, String>) keyGenerator::getRecordKey, DataTypes.StringType);
+    sqlContext.udf().register(partitionPathUdfFn, (UDF1<Row, String>) keyGenerator::getPartitionPath, DataTypes.StringType);
 
     final Dataset<Row> rowDatasetWithRecordKeys = rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD,
-        callUDF(RECORD_KEY_UDF_FN, org.apache.spark.sql.functions.struct(
+        callUDF(recordKeyUdfFn, org.apache.spark.sql.functions.struct(
            JavaConverters.collectionAsScalaIterableConverter(originalFields).asScala().toSeq())));
 
     final Dataset<Row> rowDatasetWithRecordKeysAndPartitionPath = rowDatasetWithRecordKeys.withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD,
-        callUDF(PARTITION_PATH_UDF_FN,
+        callUDF(partitionPathUdfFn,
            org.apache.spark.sql.functions.struct(
                JavaConverters.collectionAsScalaIterableConverter(originalFields).asScala().toSeq())));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
index abcc24739..610122cb3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
@@ -50,6 +50,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import scala.Tuple2;
@@ -89,9 +90,24 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
     structType = AvroConversionUtils.convertAvroSchemaToStructType(schema);
   }
 
+  @Test
+  public void testBulkInsertHelperConcurrently() {
+    IntStream.range(0, 2).parallel().forEach(i -> {
+      if (i % 2 == 0) {
+        testBulkInsertHelperFor("_row_key");
+      } else {
+        testBulkInsertHelperFor("ts");
+      }
+    });
+  }
+
   @Test
   public void testBulkInsertHelper() {
-    HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(getPropsAllSet()).combineInput(false, false).build();
+    testBulkInsertHelperFor("_row_key");
+  }
+
+  private void testBulkInsertHelperFor(String recordKey) {
+    HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(getPropsAllSet(recordKey)).combineInput(false, false).build();
     List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName",
@@ -106,7 +122,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
     }
 
     result.toJavaRDD().foreach(entry -> {
-      assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)).equals(entry.getAs("_row_key")));
+      assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)).equals(entry.getAs(recordKey).toString()));
       assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).equals(entry.getAs("partition")));
       assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)).equals(""));
       assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).equals(""));
@@ -148,7 +164,8 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
   @ParameterizedTest
   @MethodSource("providePreCombineArgs")
   public void testBulkInsertPreCombine(boolean enablePreCombine) {
-    HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(getPropsAllSet()).combineInput(enablePreCombine, enablePreCombine)
+    HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(getPropsAllSet("_row_key"))
+        .combineInput(enablePreCombine, enablePreCombine)
         .withPreCombineField("ts").build();
     List<Row> inserts = DataSourceTestUtils.generateRandomRows(10);
     Dataset<Row> toUpdateDataset = sqlContext.createDataFrame(inserts.subList(0, 5), structType);
@@ -207,22 +224,27 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
     }
   }
 
-  private Map<String, String> getPropsAllSet() {
-    return getProps(true, true, true, true);
+  private Map<String, String> getPropsAllSet(String recordKey) {
+    return getProps(recordKey, true, true, true, true);
   }
 
   private Map<String, String> getProps(boolean setAll, boolean setKeyGen, boolean setRecordKey, boolean setPartitionPath) {
+    return getProps("_row_key", setAll, setKeyGen, setRecordKey, setPartitionPath);
+  }
+
+  private Map<String, String> getProps(String recordKey, boolean setAll, boolean setKeyGen, boolean setRecordKey, boolean setPartitionPath) {
     Map<String, String> props = new HashMap<>();
     if (setAll) {
       props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), "org.apache.hudi.keygen.SimpleKeyGenerator");
-      props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
+      props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), recordKey);
       props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition");
+      props.put(HoodieWriteConfig.TBL_NAME.key(), recordKey + "_table");
     } else {
       if (setKeyGen) {
         props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), "org.apache.hudi.keygen.SimpleKeyGenerator");
       }
       if (setRecordKey) {
-        props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
+        props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), recordKey);
       }
       if (setPartitionPath) {
         props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition");
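
Reviewer sketch, not part of the patch: the rename above matters because Spark keeps exactly one function per registered UDF name, so two bulk-insert writers sharing a SparkSession/SQLContext previously overwrote each other's key-generator UDF; suffixing the name with the table name keeps the registrations independent. The table names "tbl_a"/"tbl_b", the class name, and the dummy key lambdas below are hypothetical and only illustrate the registration behaviour.

// UdfNameCollisionSketch.java -- standalone illustration, assumes a local Spark master.
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class UdfNameCollisionSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[2]").appName("udf-name-collision").getOrCreate();

    // Old behaviour: both writers register under the same fixed name, so the second
    // registration silently replaces the first and writer A ends up using B's key generator.
    spark.udf().register("hudi_recordkey_gen_function",
        (UDF1<Row, String>) row -> "key-for-table-a", DataTypes.StringType);
    spark.udf().register("hudi_recordkey_gen_function",
        (UDF1<Row, String>) row -> "key-for-table-b", DataTypes.StringType);

    // New behaviour: the table name is appended to the UDF name, so concurrent writers
    // keep separate registrations and never observe each other's function.
    spark.udf().register("hudi_recordkey_gen_function_tbl_a",
        (UDF1<Row, String>) row -> "key-for-table-a", DataTypes.StringType);
    spark.udf().register("hudi_recordkey_gen_function_tbl_b",
        (UDF1<Row, String>) row -> "key-for-table-b", DataTypes.StringType);

    spark.stop();
  }
}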