From b038623ed3318404f9bc4707f005a9fc458c0adf Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Tue, 16 Mar 2021 12:44:11 -0400 Subject: [PATCH] [HUDI 1615] Fixing null schema in bulk_insert row writer path (#2653) * [HUDI-1615] Avoid passing in null schema from row writing/deltastreamer * Fixing null schema in bulk insert row writer path * Fixing tests Co-authored-by: vc --- .../src/main/avro/HoodieCommitMetadata.avsc | 3 +- .../apache/hudi/common/util/CommitUtils.java | 2 +- .../DataSourceInternalWriterHelper.java | 3 +- .../apache/hudi/HoodieSparkSqlWriter.scala | 24 +++++++---- .../HoodieSparkSqlWriterSuite.scala | 2 + .../hudi/functional/TestCOWDataSource.scala | 42 +++++++++++++++++-- .../apache/hudi/internal/DefaultSource.java | 3 +- .../hudi/spark3/internal/DefaultSource.java | 3 +- 8 files changed, 66 insertions(+), 16 deletions(-) diff --git a/hudi-common/src/main/avro/HoodieCommitMetadata.avsc b/hudi-common/src/main/avro/HoodieCommitMetadata.avsc index b7e736945..0f2a563d9 100644 --- a/hudi-common/src/main/avro/HoodieCommitMetadata.avsc +++ b/hudi-common/src/main/avro/HoodieCommitMetadata.avsc @@ -125,7 +125,8 @@ "name":"extraMetadata", "type":["null", { "type":"map", - "values":"string" + "values":"string", + "default": null }], "default": null }, diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java index 7b4c7c5ca..1081d9c1f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java @@ -66,7 +66,7 @@ public class CommitUtils { if (extraMetadata.isPresent()) { extraMetadata.get().forEach(commitMetadata::addMetadata); } - commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaToStoreInCommit); + commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaToStoreInCommit == null ? 
"" : schemaToStoreInCommit); commitMetadata.setOperationType(operationType); return commitMetadata; } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java index e45f99a76..e8041636b 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java @@ -40,6 +40,7 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.StructType; +import java.util.HashMap; import java.util.List; /** @@ -77,7 +78,7 @@ public class DataSourceInternalWriterHelper { public void commit(List writeStatList) { try { - writeClient.commitStats(instantTime, writeStatList, Option.empty(), + writeClient.commitStats(instantTime, writeStatList, Option.of(new HashMap<>()), DataSourceUtils.getCommitActionType(operationType, metaClient.getTableType())); } catch (Exception ioe) { throw new HoodieException(ioe.getMessage(), ioe); diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 1f1dc4d42..4adcf242d 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -296,30 +296,38 @@ private[hudi] object HoodieSparkSqlWriter { basePath: Path, path: Option[String], instantTime: String): (Boolean, common.util.Option[String]) = { - val structName = s"${tblName}_record" - val nameSpace = s"hoodie.${tblName}" - val writeConfig = DataSourceUtils.createHoodieConfig(null, path.get, tblName, mapAsJavaMap(parameters)) + val sparkContext = sqlContext.sparkContext + // register classes & schemas + val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName) + sparkContext.getConf.registerKryoClasses( + Array(classOf[org.apache.avro.generic.GenericData], + classOf[org.apache.avro.Schema])) + val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace) + sparkContext.getConf.registerAvroSchemas(schema) + log.info(s"Registered avro schema : ${schema.toString(true)}") + val params = parameters.updated(HoodieWriteConfig.AVRO_SCHEMA, schema.toString) + val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path.get, tblName, mapAsJavaMap(params)) val hoodieDF = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace) if (SPARK_VERSION.startsWith("2.")) { hoodieDF.write.format("org.apache.hudi.internal") .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime) - .options(parameters) + .options(params) .save() } else if (SPARK_VERSION.startsWith("3.")) { hoodieDF.write.format("org.apache.hudi.spark3.internal") .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime) .option(HoodieWriteConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL, hoodieDF.schema.toDDL) - .options(parameters) + .options(params) .mode(SaveMode.Append) .save() } else { throw new HoodieException("Bulk insert using row writer is not supported with current Spark version." 
+ " To use row writer please switch to spark 2 or spark 3") } - val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) - val metaSyncEnabled = parameters.get(META_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) + val hiveSyncEnabled = params.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) + val metaSyncEnabled = params.get(META_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) val syncHiveSucess = if (hiveSyncEnabled || metaSyncEnabled) { - metaSync(parameters, basePath, sqlContext.sparkContext.hadoopConfiguration) + metaSync(params, basePath, sqlContext.sparkContext.hadoopConfiguration) } else { true } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala index 41a45b2b1..bbaeea108 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala @@ -125,6 +125,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName, "hoodie.bulkinsert.shuffle.parallelism" -> "4", DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL, + DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY -> "true", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition", DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator") @@ -232,6 +233,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName, "hoodie.bulkinsert.shuffle.parallelism" -> "4", DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL, + DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY -> "true", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition", DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator") diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 856cc008d..b671bc62a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -18,9 +18,8 @@ package org.apache.hudi.functional import java.sql.{Date, Timestamp} - import org.apache.hudi.common.config.HoodieMetadataConfig -import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.table.timeline.HoodieInstant import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings @@ -28,7 +27,7 @@ import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.keygen._ import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config import org.apache.hudi.testutils.HoodieClientTestBase -import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} +import 
org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, concat, lit, udf}
 import org.apache.spark.sql.types._
@@ -86,6 +85,43 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
   }
 
+  /**
+   * Test for https://issues.apache.org/jira/browse/HUDI-1615: null schema in the bulk_insert row writer flow.
+   * This was reported by a customer when archival kicked in, because the schema in the commit metadata is not set
+   * in the bulk_insert row writer flow.
+   * In this test, we trigger 4 rounds of bulk_insert with minimal archival configs, so that after 4 rounds archival
+   * kicks in and 2 commits get archived. If the schema stored in the commit metadata is valid, no exception is
+   * thrown; if not, archival fails with an NPE.
+   */
+  @Test
+  def testArchivalWithBulkInsert(): Unit = {
+    var structType : StructType = null
+    for (i <- 1 to 4) {
+      val records = recordsToStrings(dataGen.generateInserts("%05d".format(i), 100)).toList
+      val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+      structType = inputDF.schema
+      inputDF.write.format("hudi")
+        .options(commonOpts)
+        .option("hoodie.keep.min.commits", "1")
+        .option("hoodie.keep.max.commits", "2")
+        .option("hoodie.cleaner.commits.retained", "0")
+        .option("hoodie.datasource.write.row.writer.enable", "true")
+        .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+        .mode(if (i == 1) SaveMode.Overwrite else SaveMode.Append)
+        .save(basePath)
+    }
+
+    val tableMetaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath).build()
+    val actualSchema = new TableSchemaResolver(tableMetaClient).getTableAvroSchemaWithoutMetadataFields
+    val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(commonOpts(HoodieWriteConfig.TABLE_NAME))
+    spark.sparkContext.getConf.registerKryoClasses(
+      Array(classOf[org.apache.avro.generic.GenericData],
+        classOf[org.apache.avro.Schema]))
+    val schema = AvroConversionUtils.convertStructTypeToAvroSchema(structType, structName, nameSpace)
+    assertTrue(actualSchema != null)
+    assertEquals(schema, actualSchema)
+  }
+
   @ParameterizedTest
   //TODO(metadata): Needs HUDI-1459 to be fixed
   //@ValueSource(booleans = Array(true, false))
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
index 526f0ce47..ebbd9508e 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
+++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
@@ -60,7 +60,8 @@ public class DefaultSource extends BaseDefaultSource implements DataSourceV2,
     String instantTime = options.get(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY).get();
     String path = options.get("path").get();
     String tblName = options.get(HoodieWriteConfig.TABLE_NAME).get();
-    HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(null, path, tblName, options.asMap());
+    // The 1st arg to createHoodieConfig is not strictly required here, but we pass the schema anyway.
+    HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(options.get(HoodieWriteConfig.AVRO_SCHEMA).get(), path, tblName, options.asMap());
     return Optional.of(new HoodieDataSourceInternalWriter(instantTime, config, schema, getSparkSession(),
         getConfiguration()));
   }
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
index d59b5ad5c..04cf0cc28 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
+++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
@@ -47,7 +47,8 @@ public class DefaultSource extends BaseDefaultSource implements TableProvider {
     String instantTime = properties.get(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY);
     String path = properties.get("path");
     String tblName = properties.get(HoodieWriteConfig.TABLE_NAME);
-    HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(null, path, tblName, properties);
+    // The 1st arg to createHoodieConfig is not strictly required here, but we pass the schema anyway.
+    HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(properties.get(HoodieWriteConfig.AVRO_SCHEMA), path, tblName, properties);
     return new HoodieDataSourceInternalTable(instantTime, config, schema, getSparkSession(),
         getConfiguration());
   }
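
As a usage illustration (not part of the patch): a minimal sketch, assuming a Hudi-enabled Spark session and placeholder table/field names and base path, of bulk inserting through the row writer and then resolving the table schema from the commit metadata that this patch now populates. The option keys mirror the ones used in the test above.

import org.apache.avro.Schema
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

// Hypothetical helper: bulk insert a DataFrame through the row writer path and
// resolve the schema recorded in the commit metadata afterwards.
def bulkInsertAndResolveSchema(spark: SparkSession, df: DataFrame, basePath: String): Schema = {
  df.write.format("hudi")
    .option(HoodieWriteConfig.TABLE_NAME, "row_writer_tbl")                 // placeholder table name
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
    .option(DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY, "true")       // route through the row writer
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")     // placeholder field names
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition")
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
    .mode(SaveMode.Append)
    .save(basePath)

  // TableSchemaResolver reads the schema from the latest commit metadata on the
  // timeline; archival depends on this being a non-null, parseable schema.
  val metaClient = HoodieTableMetaClient.builder()
    .setConf(spark.sparkContext.hadoopConfiguration)
    .setBasePath(basePath)
    .build()
  new TableSchemaResolver(metaClient).getTableAvroSchemaWithoutMetadataFields
}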