1
0

[HUDI-1615] Fixing null schema in bulk_insert row writer path (#2653)

* [HUDI-1615] Avoid passing in null schema from row writing/deltastreamer
* Fixing null schema in bulk insert row writer path
* Fixing tests

Co-authored-by: vc <vinoth@apache.org>
This commit is contained in:
Sivabalan Narayanan
2021-03-16 12:44:11 -04:00
committed by GitHub
parent 16864aee14
commit b038623ed3
8 changed files with 66 additions and 16 deletions

View File

@@ -125,7 +125,8 @@
"name":"extraMetadata",
"type":["null", {
"type":"map",
"values":"string"
"values":"string",
"default": null
}],
"default": null
},

View File

@@ -66,7 +66,7 @@ public class CommitUtils {
if (extraMetadata.isPresent()) {
extraMetadata.get().forEach(commitMetadata::addMetadata);
}
commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaToStoreInCommit);
commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaToStoreInCommit == null ? "" : schemaToStoreInCommit);
commitMetadata.setOperationType(operationType);
return commitMetadata;
}

View File

@@ -40,6 +40,7 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import java.util.HashMap;
import java.util.List;
/**
@@ -77,7 +78,7 @@ public class DataSourceInternalWriterHelper {
public void commit(List<HoodieWriteStat> writeStatList) {
try {
writeClient.commitStats(instantTime, writeStatList, Option.empty(),
writeClient.commitStats(instantTime, writeStatList, Option.of(new HashMap<>()),
DataSourceUtils.getCommitActionType(operationType, metaClient.getTableType()));
} catch (Exception ioe) {
throw new HoodieException(ioe.getMessage(), ioe);

View File

@@ -296,30 +296,38 @@ private[hudi] object HoodieSparkSqlWriter {
basePath: Path,
path: Option[String],
instantTime: String): (Boolean, common.util.Option[String]) = {
val structName = s"${tblName}_record"
val nameSpace = s"hoodie.${tblName}"
val writeConfig = DataSourceUtils.createHoodieConfig(null, path.get, tblName, mapAsJavaMap(parameters))
val sparkContext = sqlContext.sparkContext
// register classes & schemas
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
sparkContext.getConf.registerAvroSchemas(schema)
log.info(s"Registered avro schema : ${schema.toString(true)}")
val params = parameters.updated(HoodieWriteConfig.AVRO_SCHEMA, schema.toString)
val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path.get, tblName, mapAsJavaMap(params))
val hoodieDF = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace)
if (SPARK_VERSION.startsWith("2.")) {
hoodieDF.write.format("org.apache.hudi.internal")
.option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
.options(parameters)
.options(params)
.save()
} else if (SPARK_VERSION.startsWith("3.")) {
hoodieDF.write.format("org.apache.hudi.spark3.internal")
.option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
.option(HoodieWriteConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL, hoodieDF.schema.toDDL)
.options(parameters)
.options(params)
.mode(SaveMode.Append)
.save()
} else {
throw new HoodieException("Bulk insert using row writer is not supported with current Spark version."
+ " To use row writer please switch to spark 2 or spark 3")
}
val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
val metaSyncEnabled = parameters.get(META_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
val hiveSyncEnabled = params.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
val metaSyncEnabled = params.get(META_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
val syncHiveSucess = if (hiveSyncEnabled || metaSyncEnabled) {
metaSync(parameters, basePath, sqlContext.sparkContext.hadoopConfiguration)
metaSync(params, basePath, sqlContext.sparkContext.hadoopConfiguration)
} else {
true
}

View File

@@ -125,6 +125,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
"hoodie.bulkinsert.shuffle.parallelism" -> "4",
DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY -> "true",
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator")
@@ -232,6 +233,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
"hoodie.bulkinsert.shuffle.parallelism" -> "4",
DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY -> "true",
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator")

View File

@@ -18,9 +18,8 @@
package org.apache.hudi.functional
import java.sql.{Date, Timestamp}
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
@@ -28,7 +27,7 @@ import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen._
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, concat, lit, udf}
import org.apache.spark.sql.types._
@@ -86,6 +85,43 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
}
/**
* Test for https://issues.apache.org/jira/browse/HUDI-1615. Null schema in the BulkInsert row writer flow.
* This was reported by a customer when archival kicked in, because the schema in the commit metadata is not
* set for the bulk_insert row writer flow.
* In this test, we trigger a round of bulk_inserts and set archive related configs to be minimal. So, after 4 rounds,
* archival should kick in and 2 commits should be archived. If schema is valid, no exception will be thrown. If not,
* NPE will be thrown.
*/
@Test
def testArchivalWithBulkInsert(): Unit = {
var structType : StructType = null
for (i <- 1 to 4) {
val records = recordsToStrings(dataGen.generateInserts("%05d".format(i), 100)).toList
val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
structType = inputDF.schema
inputDF.write.format("hudi")
.options(commonOpts)
.option("hoodie.keep.min.commits", "1")
.option("hoodie.keep.max.commits", "2")
.option("hoodie.cleaner.commits.retained", "0")
.option("hoodie.datasource.write.row.writer.enable", "true")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.mode(if (i == 0) SaveMode.Overwrite else SaveMode.Append)
.save(basePath)
}
val tableMetaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath).build()
val actualSchema = new TableSchemaResolver(tableMetaClient).getTableAvroSchemaWithoutMetadataFields
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(commonOpts(HoodieWriteConfig.TABLE_NAME))
spark.sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
val schema = AvroConversionUtils.convertStructTypeToAvroSchema(structType, structName, nameSpace)
assertTrue(actualSchema != null)
assertEquals(schema, actualSchema)
}
@ParameterizedTest
//TODO(metadata): Needs HUDI-1459 to be fixed
//@ValueSource(booleans = Array(true, false))

View File

@@ -60,7 +60,8 @@ public class DefaultSource extends BaseDefaultSource implements DataSourceV2,
String instantTime = options.get(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY).get();
String path = options.get("path").get();
String tblName = options.get(HoodieWriteConfig.TABLE_NAME).get();
HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(null, path, tblName, options.asMap());
// 1st arg to createHoodieConfig is not really required to be set, but passing it anyway.
HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(options.get(HoodieWriteConfig.AVRO_SCHEMA).get(), path, tblName, options.asMap());
return Optional.of(new HoodieDataSourceInternalWriter(instantTime, config, schema, getSparkSession(),
getConfiguration()));
}

View File

@@ -47,7 +47,8 @@ public class DefaultSource extends BaseDefaultSource implements TableProvider {
String instantTime = properties.get(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY);
String path = properties.get("path");
String tblName = properties.get(HoodieWriteConfig.TABLE_NAME);
HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(null, path, tblName, properties);
// 1st arg to createHoodieConfig is not really required to be set, but passing it anyway.
HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(properties.get(HoodieWriteConfig.AVRO_SCHEMA), path, tblName, properties);
return new HoodieDataSourceInternalTable(instantTime, config, schema, getSparkSession(),
getConfiguration());
}