diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala index 31432aeee..177de90f3 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala @@ -357,7 +357,7 @@ object AvroConversionHelper { val fieldNamesIterator = dataType.asInstanceOf[StructType].fieldNames.iterator val rowIterator = item.asInstanceOf[Row].toSeq.iterator - while (convertersIterator.hasNext) { + while (convertersIterator.hasNext && rowIterator.hasNext) { val converter = convertersIterator.next() record.put(fieldNamesIterator.next(), converter(rowIterator.next())) } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index 12366b681..43daba775 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -92,22 +92,43 @@ object HoodieSparkUtils extends SparkAdapterSupport { new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache) } - def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = { - val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, recordNamespace) - createRdd(df, avroSchema, structName, recordNamespace) + def createRdd(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean, latestTableSchema: + org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()): RDD[GenericRecord] = { + val dfWriteSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, recordNamespace) + var writeSchema : Schema = null; + var toReconcileSchema : Schema = null; + if (reconcileToLatestSchema && latestTableSchema.isPresent) { + // if reconcileToLatestSchema is set to true and latestTableSchema is present, then try to leverage latestTableSchema. + // this code path handles situations where records were serialized with an old schema, but callers wish to convert + // them to RDD[GenericRecord] using a different schema (an evolved schema, or the latest table schema) + writeSchema = dfWriteSchema + toReconcileSchema = latestTableSchema.get() + } else { + // there are paths where callers wish to use latestTableSchema (rather than the row's schema) to convert to + // RDD[GenericRecord]. So use latestTableSchema if present; if not available, fall back to the row's schema.
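+ // For instance, DeltaSync passes a user-provided target schema here with reconcile disabled, so that rows
+ // are converted with that schema instead of the one derived from the DataFrame.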
+ writeSchema = if (latestTableSchema.isPresent) { latestTableSchema.get() } else { dfWriteSchema } + } + createRddInternal(df, writeSchema, toReconcileSchema, structName, recordNamespace) } - def createRdd(df: DataFrame, avroSchema: Schema, structName: String, recordNamespace: String) + def createRddInternal(df: DataFrame, writeSchema: Schema, latestTableSchema: Schema, structName: String, recordNamespace: String) : RDD[GenericRecord] = { - // Use the Avro schema to derive the StructType which has the correct nullability information - val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType] - val encoder = RowEncoder.apply(dataType).resolveAndBind() + // Use the write Avro schema to derive the StructType, which has the correct nullability information + val writeDataType = SchemaConverters.toSqlType(writeSchema).dataType.asInstanceOf[StructType] + val encoder = RowEncoder.apply(writeDataType).resolveAndBind() val deserializer = sparkAdapter.createSparkRowSerDe(encoder) + // if records were serialized with an old schema, but an evolved schema was passed in via latestTableSchema, we need + // the latestTableSchema-equivalent datatype to be passed in to AvroConversionHelper.createConverterToAvro() + val reconciledDataType = + if (latestTableSchema != null) SchemaConverters.toSqlType(latestTableSchema).dataType.asInstanceOf[StructType] else writeDataType + // Note: deserializer.deserializeRow(row) is not capable of handling an evolved schema, i.e. if a Row was serialized + // with the old schema but the deserializer was created with an encoder for the evolved schema, deserialization fails. + // Hence we always deserialize with the same schema the rows were serialized with. df.queryExecution.toRdd.map(row => deserializer.deserializeRow(row)) .mapPartitions { records => if (records.isEmpty) Iterator.empty else { - val convertor = AvroConversionHelper.createConverterToAvro(dataType, structName, recordNamespace) + val convertor = AvroConversionHelper.createConverterToAvro(reconciledDataType, structName, recordNamespace) records.map { x => convertor(x).asInstanceOf[GenericRecord] } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index cb778e682..80883965d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -109,6 +109,16 @@ public class FSUtils { return getFs(path, conf); } + /** + * Checks if a table already exists under the given path. + * @param path base path of the table. + * @param fs instance of {@link FileSystem}. + * @return {@code true} if the table exists, {@code false} otherwise.
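+ * @throws IOException if the {@link FileSystem} existence check fails.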
+ */ + public static boolean isTableExists(String path, FileSystem fs) throws IOException { + return fs.exists(new Path(path + "/" + HoodieTableMetaClient.METAFOLDER_NAME)); + } + public static Path addSchemeIfLocalPath(String path) { Path providedPath = new Path(path); File localFile = new File(path); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 71a834e7b..6899b939c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -414,6 +414,13 @@ public class HoodieTableMetaClient implements Serializable { return fs.listStatus(metaPath, nameFilter); } + /** + * @return {@code true} if any commits are found, else {@code false}. + */ + public boolean isTimelineNonEmpty() { + return getCommitsTimeline().filterCompletedInstants().getInstants().collect(Collectors.toList()).size() > 0; + } + /** * Get the commit timeline visible for this table. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index 8d16e91c2..70b820c86 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -18,13 +18,6 @@ package org.apache.hudi.common.table; -import java.io.IOException; - -import org.apache.avro.Schema; -import org.apache.avro.Schema.Field; -import org.apache.avro.SchemaCompatibility; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileFormat; @@ -36,11 +29,18 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Functions.Function1; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.InvalidTableException; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.SchemaCompatibility; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.avro.AvroSchemaConverter; @@ -49,6 +49,8 @@ import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.MessageType; +import java.io.IOException; + /** * Helper class to read schema from data files and log files and to convert it between different formats. */ @@ -381,6 +383,37 @@ public class TableSchemaResolver { return isSchemaCompatible(new Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema)); } + /** + * Get latest schema either from incoming schema or table schema. + * @param writeSchema incoming batch's write schema. + * @param convertTableSchemaToAddNamespace {@code true} if table schema needs to be converted. {@code false} otherwise. 
+ * @param converterFn converter function to be applied to the table schema (e.g., to add a namespace). Each caller can decide if any conversion is required. + * @return the latest schema. + */ + public Schema getLatestSchema(Schema writeSchema, boolean convertTableSchemaToAddNamespace, + Function1 converterFn) { + Schema latestSchema = writeSchema; + try { + if (metaClient.isTimelineNonEmpty()) { + Schema tableSchema = getTableAvroSchemaWithoutMetadataFields(); + if (convertTableSchemaToAddNamespace && converterFn != null) { + tableSchema = converterFn.apply(tableSchema); + } + if (writeSchema.getFields().size() < tableSchema.getFields().size() && isSchemaCompatible(writeSchema, tableSchema)) { + // the incoming schema is a subset (an old schema) of the table schema, e.g., one of the + // ingestion pipelines is still producing events with the old schema + latestSchema = tableSchema; + LOG.debug("Using latest table schema to rewrite incoming records " + tableSchema.toString()); + } + } + } catch (IllegalArgumentException | InvalidTableException e) { + LOG.warn("Could not find any commits, falling back to using incoming batch's write schema"); + } catch (Exception e) { + LOG.warn("Unknown exception thrown " + e.getMessage() + ", falling back to using incoming batch's write schema"); + } + return latestSchema; + } + /** * Read the parquet schema from a parquet File. */ diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/SparkBasedReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/SparkBasedReader.java index 5318e93b9..32969f2cf 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/SparkBasedReader.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/SparkBasedReader.java @@ -18,15 +18,18 @@ package org.apache.hudi.integ.testsuite.reader; -import java.util.List; -import org.apache.avro.generic.GenericRecord; import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.utilities.schema.RowBasedSchemaProvider; + +import org.apache.avro.generic.GenericRecord; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; + +import java.util.List; + import scala.collection.JavaConverters; @@ -51,7 +54,7 @@ public class SparkBasedReader { return HoodieSparkUtils .createRdd(dataSet.toDF(), structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME), - nameSpace.orElse(RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE)) + nameSpace.orElse(RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE), false, Option.empty()) .toJavaRDD(); } @@ -63,7 +66,7 @@ public class SparkBasedReader { return HoodieSparkUtils .createRdd(dataSet.toDF(), structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME), - RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE) + RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE, false, Option.empty()) .toJavaRDD(); } @@ -73,10 +76,11 @@ Dataset dataSet = sparkSession.read() .orc((JavaConverters.asScalaIteratorConverter(listOfPaths.iterator()).asScala().toSeq())); - return HoodieSparkUtils - .createRdd(dataSet.toDF(), structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME), - RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE) - .toJavaRDD(); + return HoodieSparkUtils.createRdd(dataSet.toDF(), + structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME), + 
RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE, + false, Option.empty() + ).toJavaRDD(); } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 46aee6780..6e4b37722 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -308,6 +308,13 @@ object DataSourceWriteOptions { .defaultValue(classOf[HiveSyncTool].getName) .withDocumentation("Sync tool class name used to sync to metastore. Defaults to Hive.") + val RECONCILE_SCHEMA: ConfigProperty[Boolean] = ConfigProperty + .key("hoodie.datasource.write.reconcile.schema") + .defaultValue(false) + .withDocumentation("When a new batch of writes contains records with an older schema while the latest table " + + "schema has evolved, this config upgrades the records to the latest table schema (default values are " + + "injected into the missing fields). If disabled, such a write batch would fail.") + // HIVE SYNC SPECIFIC CONFIGS // NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes // unexpected issues with config getting reset diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java index f4483d830..605131746 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java @@ -27,6 +27,7 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.bootstrap.FileStatusUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.keygen.KeyGenerator; @@ -66,7 +67,8 @@ public class SparkParquetBootstrapDataProvider extends FullRecordBootstrapDataPr KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); String structName = tableName + "_record"; String namespace = "hoodie." 
+ tableName; - RDD genericRecords = HoodieSparkUtils.createRdd(inputDataset, structName, namespace); + RDD genericRecords = HoodieSparkUtils.createRdd(inputDataset, structName, namespace, false, + Option.empty()); return genericRecords.toJavaRDD().map(gr -> { String orderingVal = HoodieAvroUtils.getNestedFieldValAsString( gr, props.getString("hoodie.datasource.write.precombine.field"), false); diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 6a54931bc..0e4ad1791 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -17,6 +17,8 @@ package org.apache.hudi + +import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} @@ -25,9 +27,10 @@ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient} import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties} +import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType} import org.apache.hudi.common.table.timeline.HoodieActiveTimeline -import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils} import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP} import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig} @@ -158,13 +161,17 @@ object HoodieSparkSqlWriter { sparkContext.getConf.registerKryoClasses( Array(classOf[org.apache.avro.generic.GenericData], classOf[org.apache.avro.Schema])) - val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace) + var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace) + val reconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean + if (reconcileSchema) { + schema = getLatestTableSchema(fs, basePath, sparkContext, schema) + } sparkContext.getConf.registerAvroSchemas(schema) log.info(s"Registered avro schema : ${schema.toString(true)}") // Convert to RDD[HoodieRecord] - val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace) - + val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema, + org.apache.hudi.common.util.Option.of(schema)) val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean || operation.equals(WriteOperationType.UPSERT) || parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(), @@ -212,7 +219,8 @@ object HoodieSparkSqlWriter { classOf[org.apache.avro.Schema])) // Convert to RDD[HoodieKey] - val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace) + val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, + parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean) val hoodieKeysToDelete = genericRecords.map(gr => 
keyGenerator.getKey(gr)).toJavaRDD() if (!tableExists) { @@ -249,6 +257,25 @@ } } + /** + * Fetches the latest table schema, for batches whose incoming write schema may be older than the evolved table schema. + * + * @param fs instance of FileSystem. + * @param basePath base path. + * @param sparkContext instance of spark context. + * @param schema incoming record's schema. + * @return the latest table schema if the table exists and its schema is a compatible superset of the incoming one; the incoming schema otherwise. + */ + def getLatestTableSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext, schema: Schema): Schema = { + var latestSchema: Schema = schema + if (FSUtils.isTableExists(basePath.toString, fs)) { + val tableMetaClient = HoodieTableMetaClient.builder.setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build() + val tableSchemaResolver = new TableSchemaResolver(tableMetaClient) + latestSchema = tableSchemaResolver.getLatestSchema(schema, false, null); + } + latestSchema + } + def bootstrap(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala index 0fd299167..5128ab13f 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala @@ -17,19 +17,16 @@ package org.apache.hudi -import java.util.Properties - -import scala.collection.JavaConverters._ import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.common.config.HoodieMetadataConfig.{METADATA_ENABLE_PROP, METADATA_VALIDATE_PROP} import org.apache.hudi.common.config.{HoodieConfig, TypedProperties} - -import scala.collection.JavaConversions.mapAsJavaMap -import scala.collection.JavaConverters.mapAsScalaMapConverter -import org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP -import org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator} +import java.util.Properties +import scala.collection.JavaConversions.mapAsJavaMap +import scala.collection.JavaConverters.{mapAsScalaMapConverter, _} + /** * WriterUtils to assist in write path in Datasource and tests. 
*/ @@ -78,7 +75,8 @@ object HoodieWriterUtils { ASYNC_COMPACT_ENABLE.key -> ASYNC_COMPACT_ENABLE.defaultValue, INLINE_CLUSTERING_ENABLE.key -> INLINE_CLUSTERING_ENABLE.defaultValue, ASYNC_CLUSTERING_ENABLE.key -> ASYNC_CLUSTERING_ENABLE.defaultValue, - ENABLE_ROW_WRITER.key -> ENABLE_ROW_WRITER.defaultValue + ENABLE_ROW_WRITER.key -> ENABLE_ROW_WRITER.defaultValue, + RECONCILE_SCHEMA.key -> RECONCILE_SCHEMA.defaultValue.toString ) ++ DataSourceOptionsHelper.translateConfigurations(parameters) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala index bee776aee..8ec0583ef 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala @@ -23,7 +23,7 @@ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.client.SparkRDDWriteClient import org.apache.hudi.common.config.HoodieConfig import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, HoodieRecordPayload} -import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig} import org.apache.hudi.exception.HoodieException @@ -48,7 +48,6 @@ import java.util.{Collections, Date, UUID} import scala.collection.JavaConversions._ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { - var spark: SparkSession = _ var sc: SparkContext = _ var sqlContext: SQLContext = _ @@ -82,15 +81,11 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { } } - test("throw hoodie exception when there already exist a table with different name with Append Save mode") { - initSparkContext("test_append_mode") val path = java.nio.file.Files.createTempDirectory("hoodie_test_path") try { - val hoodieFooTableName = "hoodie_foo_tbl" - //create a new table val fooTableModifier = Map("path" -> path.toAbsolutePath.toString, HoodieWriteConfig.TABLE_NAME.key -> hoodieFooTableName, @@ -149,7 +144,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { }) def testBulkInsertWithSortMode(sortMode: BulkInsertSortMode, path: java.nio.file.Path, populateMetaFields : Boolean = true) : Unit = { - val hoodieFooTableName = "hoodie_foo_tbl" //create a new table val fooTableModifier = Map("path" -> path.toAbsolutePath.toString, @@ -193,17 +187,12 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2)) if (!populateMetaFields) { - assertEquals(0, actualDf.select(HoodieRecord.HOODIE_META_COLUMNS.get(0)).filter(entry => !(entry.mkString(",").equals(""))).count()) - assertEquals(0, actualDf.select(HoodieRecord.HOODIE_META_COLUMNS.get(1)).filter(entry => !(entry.mkString(",").equals(""))).count()) - assertEquals(0, actualDf.select(HoodieRecord.HOODIE_META_COLUMNS.get(2)).filter(entry => !(entry.mkString(",").equals(""))).count()) - assertEquals(0, actualDf.select(HoodieRecord.HOODIE_META_COLUMNS.get(3)).filter(entry => !(entry.mkString(",").equals(""))).count()) - assertEquals(0, actualDf.select(HoodieRecord.HOODIE_META_COLUMNS.get(4)).filter(entry => !(entry.mkString(",").equals(""))).count()) 
+ List(0, 1, 2, 3, 4).foreach(i => assertEquals(0, actualDf.select(HoodieRecord.HOODIE_META_COLUMNS.get(i)).filter(entry => !(entry.mkString(",").equals(""))).count())) } // remove metadata columns so that expected and actual DFs can be compared as is val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1)) .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3)) .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4)) - assert(df.except(trimmedDf).count() == 0) } @@ -405,7 +394,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { val path = java.nio.file.Files.createTempDirectory("hoodie_test_path") try { val hoodieFooTableName = "hoodie_foo_tbl" - //create a new table val fooTableModifier = Map("path" -> path.toAbsolutePath.toString, HoodieWriteConfig.TABLE_NAME.key -> hoodieFooTableName, HoodieWriteConfig.BASE_FILE_FORMAT.key -> baseFileFormat, @@ -433,7 +421,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { hoodieFooTableName, mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]) - // write to Hudi HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df, Option.empty, Option(client)) // Verify that asynchronous compaction is not scheduled @@ -479,12 +466,9 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { val srcPath = java.nio.file.Files.createTempDirectory("hoodie_bootstrap_source_path") try { - val hoodieFooTableName = "hoodie_foo_tbl" - val sourceDF = TestBootstrap.generateTestRawTripDataset(Instant.now.toEpochMilli, 0, 100, Collections.emptyList(), sc, spark.sqlContext) - // Write source data non-partitioned sourceDF.write .format("parquet") @@ -533,7 +517,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { val path = java.nio.file.Files.createTempDirectory("hoodie_test_path_schema_evol") try { val hoodieFooTableName = "hoodie_foo_tbl_schema_evolution_" + tableType - //create a new table val fooTableModifier = Map("path" -> path.toAbsolutePath.toString, HoodieWriteConfig.TABLE_NAME.key -> hoodieFooTableName, "hoodie.insert.shuffle.parallelism" -> "1", @@ -541,7 +524,9 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { DataSourceWriteOptions.TABLE_TYPE.key -> tableType, DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key", DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition", - DataSourceWriteOptions.KEYGENERATOR_CLASS.key -> "org.apache.hudi.keygen.SimpleKeyGenerator") + DataSourceWriteOptions.KEYGENERATOR_CLASS.key -> "org.apache.hudi.keygen.SimpleKeyGenerator", + DataSourceWriteOptions.RECONCILE_SCHEMA.key -> "true" + ) val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier) // generate the inserts @@ -550,7 +535,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { var records = DataSourceTestUtils.generateRandomRows(10) var recordsSeq = convertRowListToSeq(records) var df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType) - // write to Hudi HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1) val snapshotDF1 = spark.read.format("org.apache.hudi") @@ -565,10 +549,9 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { assert(df1.except(trimmedDf1).count() == 0) // issue updates so that log files are created for MOR table - var updates = DataSourceTestUtils.generateUpdates(records, 5); - var updatesSeq = convertRowListToSeq(updates) - 
var updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType) - // write updates to Hudi + val updates = DataSourceTestUtils.generateUpdates(records, 5); + val updatesSeq = convertRowListToSeq(updates) + val updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType) HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf) val snapshotDF2 = spark.read.format("org.apache.hudi") @@ -584,11 +567,11 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 0) // getting new schema with new column - schema = DataSourceTestUtils.getStructTypeExampleEvolvedSchema - structType = AvroConversionUtils.convertAvroSchemaToStructType(schema) + val evolSchema = DataSourceTestUtils.getStructTypeExampleEvolvedSchema + val evolStructType = AvroConversionUtils.convertAvroSchemaToStructType(evolSchema) records = DataSourceTestUtils.generateRandomRowsEvolvedSchema(5) recordsSeq = convertRowListToSeq(records) - val df3 = spark.createDataFrame(sc.parallelize(recordsSeq), structType) + val df3 = spark.createDataFrame(sc.parallelize(recordsSeq), evolStructType) // write to Hudi with new column HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df3) @@ -604,6 +587,25 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { // ensure 2nd batch of updates matches. assert(df3.intersect(trimmedDf3).except(df3).count() == 0) + // ingest new batch with old schema. + records = DataSourceTestUtils.generateRandomRows(10) + recordsSeq = convertRowListToSeq(records) + val df4 = spark.createDataFrame(sc.parallelize(recordsSeq), structType) + HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df4) + + val snapshotDF4 = spark.read.format("org.apache.hudi") + .load(path.toAbsolutePath.toString + "/*/*/*/*") + assertEquals(25, snapshotDF4.count()) + + val tableMetaClient = HoodieTableMetaClient.builder() + .setConf(spark.sparkContext.hadoopConfiguration) + .setBasePath(path.toAbsolutePath.toString) + .build() + val actualSchema = new TableSchemaResolver(tableMetaClient).getTableAvroSchemaWithoutMetadataFields + assertTrue(actualSchema != null) + val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(hoodieFooTableName) + val expectedSchema = AvroConversionUtils.convertStructTypeToAvroSchema(evolStructType, structName, nameSpace) + assertEquals(expectedSchema, actualSchema) } finally { spark.stop() FileUtils.deleteDirectory(path.toFile) @@ -613,8 +615,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { test("Test build sync config for spark sql") { initSparkContext("test build sync config") - val schema = DataSourceTestUtils.getStructTypeExampleSchema - val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema) val basePath = "/tmp/hoodie_test" val params = Map( "path" -> basePath, @@ -641,7 +641,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { test("Test build sync config for skip Ro Suffix vals") { initSparkContext("test build sync config for skip Ro suffix vals") - val schema = DataSourceTestUtils.getStructTypeExampleSchema val basePath = "/tmp/hoodie_test" val params = Map( "path" -> basePath, @@ -650,7 +649,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { ) val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params) val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters) - val buildSyncConfigMethod = 
HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", classOf[Path], classOf[HoodieConfig], classOf[SQLConf]) @@ -712,15 +710,14 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { .mode(SaveMode.Append).save(basePath) val currentCommits = spark.read.format("hudi").load(basePath).select("_hoodie_commit_time").take(1).map(_.getString(0)) - val incrementalKeyIdNum = spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + val incrementalKeyIdNum = spark.read.format("hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "0000") .option(DataSourceReadOptions.END_INSTANTTIME.key, currentCommits(0)) .load(basePath).select("keyid").orderBy("keyid").count assert(incrementalKeyIdNum == 1000) - // add bootstap test df.write.mode(SaveMode.Overwrite).save(baseBootStrapPath) - // boostrap table spark.emptyDataFrame.write.format("hudi") .options(options) .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP.key, baseBootStrapPath) @@ -736,7 +733,8 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { .mode(SaveMode.Append).save(basePath) val currentCommitsBootstrap = spark.read.format("hudi").load(basePath).select("_hoodie_commit_time").take(1).map(_.getString(0)) - val incrementalKeyIdNumBootstrap = spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + val incrementalKeyIdNumBootstrap = spark.read.format("hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "0000") .option(DataSourceReadOptions.END_INSTANTTIME.key, currentCommitsBootstrap(0)) .load(basePath).select("keyid").orderBy("keyid").count diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala index d5da67626..b86eade9b 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala @@ -18,16 +18,23 @@ package org.apache.hudi +import org.apache.avro.generic.GenericRecord + import java.io.File import java.nio.file.Paths - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.spark.sql.SparkSession -import org.junit.jupiter.api.Assertions.assertEquals +import org.apache.hudi.testutils.DataSourceTestUtils +import org.apache.spark.sql.avro.IncompatibleSchemaException +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} +import org.apache.spark.sql.{Row, SparkSession} +import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertNull, assertTrue, fail} import org.junit.jupiter.api.Test import org.junit.jupiter.api.io.TempDir +import java.util +import scala.collection.JavaConverters + class TestHoodieSparkUtils { @Test @@ -103,4 +110,124 @@ class TestHoodieSparkUtils { assertEquals(files.sortWith(_.toString < _.toString), indexedFilePaths.sortWith(_.toString < _.toString)) spark.stop() } + + @Test + def testCreateRddSchemaEvol(): Unit = { + val spark = SparkSession.builder + .appName("Hoodie Datasource test") + .master("local[2]") + .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer") + .getOrCreate + + val schema = DataSourceTestUtils.getStructTypeExampleSchema + val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema) + var records = DataSourceTestUtils.generateRandomRows(5) + var recordsSeq = convertRowListToSeq(records) + val df1 = spark.createDataFrame(spark.sparkContext.parallelize(recordsSeq), structType) + + var genRecRDD = HoodieSparkUtils.createRdd(df1, "test_struct_name", "test_namespace", true, + org.apache.hudi.common.util.Option.of(schema)) + genRecRDD.collect() + + val evolSchema = DataSourceTestUtils.getStructTypeExampleEvolvedSchema + records = DataSourceTestUtils.generateRandomRowsEvolvedSchema(5) + recordsSeq = convertRowListToSeq(records) + + genRecRDD = HoodieSparkUtils.createRdd(df1, "test_struct_name", "test_namespace", true, + org.apache.hudi.common.util.Option.of(evolSchema)) + genRecRDD.collect() + + // pass in evolved schema but with records serialized with old schema. should be able to convert with out any exception. + // Before https://github.com/apache/hudi/pull/2927, this will throw exception. + genRecRDD = HoodieSparkUtils.createRdd(df1, "test_struct_name", "test_namespace", true, + org.apache.hudi.common.util.Option.of(evolSchema)) + val genRecs = genRecRDD.collect() + // if this succeeds w/o throwing any exception, test succeeded. + assertEquals(genRecs.size, 5) + spark.stop() + } + + @Test + def testCreateRddWithNestedSchemas(): Unit = { + val spark = SparkSession.builder + .appName("Hoodie Datasource test") + .master("local[2]") + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .getOrCreate + + val innerStruct1 = new StructType().add("innerKey","string",false).add("innerValue", "long", true) + val structType1 = new StructType().add("key", "string", false) + .add("nonNullableInnerStruct",innerStruct1,false).add("nullableInnerStruct",innerStruct1,true) + val schema1 = AvroConversionUtils.convertStructTypeToAvroSchema(structType1, "test_struct_name", "test_namespace") + val records1 = Seq(Row("key1", Row("innerKey1_1", 1L), Row("innerKey1_2", 2L))) + + val df1 = spark.createDataFrame(spark.sparkContext.parallelize(records1), structType1) + val genRecRDD1 = HoodieSparkUtils.createRdd(df1, "test_struct_name", "test_namespace", true, + org.apache.hudi.common.util.Option.of(schema1)) + assert(schema1.equals(genRecRDD1.collect()(0).getSchema)) + + // create schema2 which has one addition column at the root level compared to schema1 + val structType2 = new StructType().add("key", "string", false) + .add("nonNullableInnerStruct",innerStruct1,false).add("nullableInnerStruct",innerStruct1,true) + .add("nullableInnerStruct2",innerStruct1,true) + val schema2 = AvroConversionUtils.convertStructTypeToAvroSchema(structType2, "test_struct_name", "test_namespace") + val records2 = Seq(Row("key2", Row("innerKey2_1", 2L), Row("innerKey2_2", 2L), Row("innerKey2_3", 2L))) + val df2 = spark.createDataFrame(spark.sparkContext.parallelize(records2), structType2) + val genRecRDD2 = HoodieSparkUtils.createRdd(df2, "test_struct_name", "test_namespace", true, + org.apache.hudi.common.util.Option.of(schema2)) + assert(schema2.equals(genRecRDD2.collect()(0).getSchema)) + + // send records1 with schema2. should succeed since the new column is nullable. 
+ val genRecRDD3 = HoodieSparkUtils.createRdd(df1, "test_struct_name", "test_namespace", true, + org.apache.hudi.common.util.Option.of(schema2)) + assert(genRecRDD3.collect()(0).getSchema.equals(schema2)) + genRecRDD3.foreach(entry => assertNull(entry.get("nullableInnerStruct2"))) + + val innerStruct3 = new StructType().add("innerKey","string",false).add("innerValue", "long", true) + .add("new_nested_col","string",true) + + // create a schema which has one additional nested column compared to schema1, which is nullable + val structType4 = new StructType().add("key", "string", false) + .add("nonNullableInnerStruct",innerStruct1,false).add("nullableInnerStruct",innerStruct3,true) + + val schema4 = AvroConversionUtils.convertStructTypeToAvroSchema(structType4, "test_struct_name", "test_namespace") + val records4 = Seq(Row("key2", Row("innerKey2_1", 2L), Row("innerKey2_2", 2L, "new_nested_col_val1"))) + val df4 = spark.createDataFrame(spark.sparkContext.parallelize(records4), structType4) + val genRecRDD4 = HoodieSparkUtils.createRdd(df4, "test_struct_name", "test_namespace", true, + org.apache.hudi.common.util.Option.of(schema4)) + assert(schema4.equals(genRecRDD4.collect()(0).getSchema)) + + // convert batch 1 with schema4. should succeed. + val genRecRDD5 = HoodieSparkUtils.createRdd(df1, "test_struct_name", "test_namespace", true, + org.apache.hudi.common.util.Option.of(schema4)) + assert(schema4.equals(genRecRDD5.collect()(0).getSchema)) + val genRec = genRecRDD5.collect()(0) + val nestedRec : GenericRecord = genRec.get("nullableInnerStruct").asInstanceOf[GenericRecord] + assertNull(nestedRec.get("new_nested_col")) + assertNotNull(nestedRec.get("innerKey")) + assertNotNull(nestedRec.get("innerValue")) + + val innerStruct4 = new StructType().add("innerKey","string",false).add("innerValue", "long", true) + .add("new_nested_col","string",false) + // create a schema which has one additional nested column compared to schema1, which is non-nullable + val structType6 = new StructType().add("key", "string", false) + .add("nonNullableInnerStruct",innerStruct1,false).add("nullableInnerStruct",innerStruct4,true) + + val schema6 = AvroConversionUtils.convertStructTypeToAvroSchema(structType6, "test_struct_name", "test_namespace") + // convert batch 1 with schema6. should fail since the omitted column is not nullable.
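+ // (Avro cannot supply a default for the non-nullable 'new_nested_col', so record conversion is expected to NPE)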
+ try { + val genRecRDD6 = HoodieSparkUtils.createRdd(df1, "test_struct_name", "test_namespace", true, + org.apache.hudi.common.util.Option.of(schema6)) + genRecRDD6.collect() + fail("createRdd should fail because the records are missing a column that is non-nullable in the passed-in schema") + } catch { + case e: Exception => + assertTrue(e.getCause.isInstanceOf[NullPointerException]) + assertTrue(e.getMessage.contains("null of string in field new_nested_col of")) + } + spark.stop() + } + + def convertRowListToSeq(inputList: util.List[Row]): Seq[Row] = + JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index c10bb8b04..9c182b408 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -17,19 +17,15 @@ package org.apache.hudi.functional -import java.sql.{Date, Timestamp} -import scala.collection.JavaConversions._ -import scala.collection.JavaConverters._ import org.apache.hudi.common.config.HoodieMetadataConfig -import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.table.timeline.HoodieInstant +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.testutils.HoodieTestDataGenerator -import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings -import org.apache.hudi.common.testutils.RawTripTestPayload.deleteRecordsToStrings +import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings} import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieUpsertException -import org.apache.hudi.keygen._ import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config +import org.apache.hudi.keygen._ import org.apache.hudi.testutils.HoodieClientTestBase import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} import org.apache.spark.sql._ @@ -42,6 +38,10 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{CsvSource, ValueSource} +import java.sql.{Date, Timestamp} +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + /** * Basic tests on the spark datasource for COW table. 
@@ -532,10 +532,8 @@ class TestCOWDataSource extends HoodieClientTestBase { var writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName) writer.partitionBy("current_ts") .save(basePath) - var recordsReadDF = spark.read.format("org.apache.hudi") .load(basePath + "/*/*") - assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("current_ts").cast("string")).count() == 0) // Specify fieldType as TIMESTAMP @@ -544,10 +542,8 @@ class TestCOWDataSource extends HoodieClientTestBase { .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS") .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd") .save(basePath) - recordsReadDF = spark.read.format("org.apache.hudi") .load(basePath + "/*/*") - val udf_date_format = udf((data: Long) => new DateTime(data).toString(DateTimeFormat.forPattern("yyyyMMdd"))) assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= udf_date_format(col("current_ts"))).count() == 0) @@ -557,7 +553,6 @@ class TestCOWDataSource extends HoodieClientTestBase { .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS") .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd") .save(basePath) - recordsReadDF = spark.read.format("org.apache.hudi") .load(basePath + "/*/*/*") assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= @@ -582,20 +577,16 @@ class TestCOWDataSource extends HoodieClientTestBase { var writer = getDataFrameWriter(classOf[SimpleKeyGenerator].getName) writer.partitionBy("driver") .save(basePath) - var recordsReadDF = spark.read.format("org.apache.hudi") .load(basePath + "/*/*") - assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("driver")).count() == 0) // Use the `driver,rider` field as the partition key, If no such field exists, the default value `default` is used writer = getDataFrameWriter(classOf[SimpleKeyGenerator].getName) writer.partitionBy("driver", "rider") .save(basePath) - recordsReadDF = spark.read.format("org.apache.hudi") .load(basePath + "/*/*") - assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("default")).count() == 0) } @@ -604,20 +595,16 @@ class TestCOWDataSource extends HoodieClientTestBase { var writer = getDataFrameWriter(classOf[ComplexKeyGenerator].getName) writer.partitionBy("driver") .save(basePath) - var recordsReadDF = spark.read.format("org.apache.hudi") .load(basePath + "/*/*") - assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("driver")).count() == 0) // Use the `driver`,`rider` field as the partition key writer = getDataFrameWriter(classOf[ComplexKeyGenerator].getName) writer.partitionBy("driver", "rider") .save(basePath) - recordsReadDF = spark.read.format("org.apache.hudi") .load(basePath + "/*/*") - assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= concat(col("driver"), lit("/"), col("rider"))).count() == 0) } @@ -649,7 +636,6 @@ class TestCOWDataSource extends HoodieClientTestBase { var writer = getDataFrameWriter(classOf[NonpartitionedKeyGenerator].getName) writer.partitionBy("") .save(basePath) - var recordsReadDF = spark.read.format("org.apache.hudi") .load(basePath + "/*") assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("")).count() == 0) @@ -658,7 +644,6 @@ class TestCOWDataSource extends HoodieClientTestBase { writer = getDataFrameWriter(classOf[NonpartitionedKeyGenerator].getName) writer.partitionBy("abc") .save(basePath) - recordsReadDF = spark.read.format("org.apache.hudi") .load(basePath + "/*") assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") 
=!= lit("")).count() == 0) @@ -717,9 +702,10 @@ class TestCOWDataSource extends HoodieClientTestBase { @Test def testSchemaEvolution(): Unit = { // open the schema validate - val opts = commonOpts ++ Map("hoodie.avro.schema.validate" -> "true") + val opts = commonOpts ++ Map("hoodie.avro.schema.validate" -> "true") ++ + Map(DataSourceWriteOptions.RECONCILE_SCHEMA.key() -> "true") // 1. write records with schema1 - val schema1 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, true):: + val schema1 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, false):: StructField("timestamp", IntegerType, true) :: StructField("partition", IntegerType, true)::Nil) val records1 = Seq(Row("1", "Andy", 1, 1), Row("2", "lisi", 1, 1), @@ -732,10 +718,9 @@ class TestCOWDataSource extends HoodieClientTestBase { .save(basePath) // 2. write records with schema2 add column age - val schema2 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, true) :: + val schema2 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, false) :: StructField("age", StringType, true) :: StructField("timestamp", IntegerType, true) :: StructField("partition", IntegerType, true)::Nil) - val records2 = Seq(Row("11", "Andy", "10", 1, 1), Row("22", "lisi", "11",1, 1), Row("33", "zhangsan", "12", 1, 1)) @@ -745,24 +730,25 @@ class TestCOWDataSource extends HoodieClientTestBase { .options(opts) .mode(SaveMode.Append) .save(basePath) - val recordsReadDF = spark.read.format("org.apache.hudi") .load(basePath + "/*/*") - val resultSchema = new StructType(recordsReadDF.schema.filter(p=> !p.name.startsWith("_hoodie")).toArray) - assertEquals(resultSchema, schema2) + val tableMetaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath).build() + val actualSchema = new TableSchemaResolver(tableMetaClient).getTableAvroSchemaWithoutMetadataFields + assertTrue(actualSchema != null) + val actualStructType = AvroConversionUtils.convertAvroSchemaToStructType(actualSchema) + assertEquals(actualStructType, schema2) - // 3. write records with schema3 delete column name + // 3. write records with schema4 by omitting a non nullable column(name). 
should fail try { - val schema3 = StructType(StructField("_row_key", StringType, true) :: + val schema4 = StructType(StructField("_row_key", StringType, true) :: StructField("age", StringType, true) :: StructField("timestamp", IntegerType, true) :: StructField("partition", IntegerType, true)::Nil) - - val records3 = Seq(Row("11", "10", 1, 1), + val records4 = Seq(Row("11", "10", 1, 1), Row("22", "11",1, 1), Row("33", "12", 1, 1)) - val rdd3 = jsc.parallelize(records3) - val recordsDF3 = spark.createDataFrame(rdd3, schema3) - recordsDF3.write.format("org.apache.hudi") + val rdd4 = jsc.parallelize(records4) + val recordsDF4 = spark.createDataFrame(rdd4, schema4) + recordsDF4.write.format("org.apache.hudi") .options(opts) .mode(SaveMode.Append) .save(basePath) @@ -777,19 +763,15 @@ class TestCOWDataSource extends HoodieClientTestBase { val opts = commonOpts ++ Map("hoodie.avro.schema.validate" -> "true") val schema1 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, true):: StructField("timestamp", IntegerType, true):: StructField("age", StringType, true) :: StructField("partition", IntegerType, true)::Nil) - val records = Array("{\"_row_key\":\"1\",\"name\":\"lisi\",\"timestamp\":1,\"partition\":1}", "{\"_row_key\":\"1\",\"name\":\"lisi\",\"timestamp\":1,\"partition\":1}") - val inputDF = spark.read.schema(schema1.toDDL).json(spark.sparkContext.parallelize(records, 2)) - inputDF.write.format("org.apache.hudi") .options(opts) .mode(SaveMode.Append) .save(basePath) val recordsReadDF = spark.read.format("org.apache.hudi") .load(basePath + "/*/*") - val resultSchema = new StructType(recordsReadDF.schema.filter(p=> !p.name.startsWith("_hoodie")).toArray) assertEquals(resultSchema, schema1) } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index 79a495e55..3771a7d34 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -24,7 +24,11 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.config.DFSPropertiesConfiguration; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.util.Functions.Function1; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; @@ -38,12 +42,12 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics; import org.apache.hudi.utilities.schema.DelegatingSchemaProvider; +import org.apache.hudi.utilities.schema.RowBasedSchemaProvider; import org.apache.hudi.utilities.schema.SchemaPostProcessor; import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor; import org.apache.hudi.utilities.schema.SparkAvroPostProcessor; -import org.apache.hudi.utilities.schema.RowBasedSchemaProvider; import org.apache.hudi.utilities.sources.Source; import 
org.apache.hudi.utilities.transform.ChainedTransformer; import org.apache.hudi.utilities.transform.Transformer; @@ -425,11 +429,53 @@ public class UtilHelpers { } public static SchemaProvider createRowBasedSchemaProvider(StructType structType, - TypedProperties cfg, JavaSparkContext jssc) { + TypedProperties cfg, JavaSparkContext jssc) { SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType); return wrapSchemaProviderWithPostProcessor(rowSchemaProvider, cfg, jssc, null); } + /** + * Creates a schema provider whose target schema is the latest available schema. + * + * @param structType spark data type of incoming batch. + * @param jssc instance of {@link JavaSparkContext}. + * @param fs instance of {@link FileSystem}. + * @param basePath base path of the table. + * @return the schema provider whose target schema refers to the latest schema (either the incoming schema or the table schema). + */ + public static SchemaProvider createLatestSchemaProvider(StructType structType, + JavaSparkContext jssc, FileSystem fs, String basePath) { + SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType); + Schema writeSchema = rowSchemaProvider.getTargetSchema(); + Schema latestTableSchema = writeSchema; + + try { + if (FSUtils.isTableExists(basePath, fs)) { + HoodieTableMetaClient tableMetaClient = HoodieTableMetaClient.builder().setConf(jssc.sc().hadoopConfiguration()).setBasePath(basePath).build(); + TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(tableMetaClient); + latestTableSchema = tableSchemaResolver.getLatestSchema(writeSchema, true, (Function1) v1 -> AvroConversionUtils.convertStructTypeToAvroSchema( + AvroConversionUtils.convertAvroSchemaToStructType(v1), RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME, + RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE)); + } + } catch (IOException e) { + LOG.warn("Could not fetch table schema. 
Falling back to writer schema"); + } + + final Schema finalLatestTableSchema = latestTableSchema; + return new SchemaProvider(new TypedProperties()) { + @Override + public Schema getSourceSchema() { + return rowSchemaProvider.getSourceSchema(); + } + + @Override + public Schema getTargetSchema() { + return finalLatestTableSchema; + } + }; + } + @FunctionalInterface public interface CheckedSupplier { T get() throws Throwable; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index fead1b364..9cef947b9 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -19,6 +19,7 @@ package org.apache.hudi.utilities.deltastreamer; import org.apache.hudi.DataSourceUtils; +import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.HoodieWriterUtils; import org.apache.hudi.avro.HoodieAvroUtils; @@ -48,6 +49,7 @@ import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodiePayloadConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieDeltaStreamerException; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HiveSyncTool; @@ -55,10 +57,9 @@ import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; import org.apache.hudi.sync.common.AbstractSyncTool; import org.apache.hudi.utilities.UtilHelpers; -import org.apache.hudi.exception.HoodieDeltaStreamerException; -import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Config; import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback; import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig; +import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Config; import org.apache.hudi.utilities.schema.DelegatingSchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.schema.SchemaSet; @@ -86,26 +87,26 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.function.Function; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Properties; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; import scala.collection.JavaConversions; import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP; import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE; -import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY; -import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY; import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING_PROP; import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_PROP; import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP; import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT_PROP; import static org.apache.hudi.config.HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP; +import static 
@@ -374,28 +375,40 @@ public class DeltaSync implements Serializable {
 
       Option<Dataset<Row>> transformed =
           dataAndCheckpoint.getBatch().map(data -> transformer.get().apply(jssc, sparkSession, data, props));
+      checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
+      boolean reconcileSchema = props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
       if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) {
         // If the target schema is specified through Avro schema,
         // pass in the schema for the Row-to-Avro conversion
         // to avoid nullability mismatch between Avro schema and Row schema
         avroRDDOptional = transformed
             .map(t -> HoodieSparkUtils.createRdd(
-                t, this.userProvidedSchemaProvider.getTargetSchema(),
-                HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD());
+                t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema,
+                Option.of(this.userProvidedSchemaProvider.getTargetSchema())
+            ).toJavaRDD());
         schemaProvider = this.userProvidedSchemaProvider;
       } else {
         // Use Transformed Row's schema if not overridden. If target schema is not specified
         // default to RowBasedSchemaProvider
         schemaProvider = transformed
-            .map(r -> (SchemaProvider) new DelegatingSchemaProvider(props, jssc,
-                dataAndCheckpoint.getSchemaProvider(),
-                UtilHelpers.createRowBasedSchemaProvider(r.schema(), props, jssc)))
+            .map(r -> {
+              // Determine the target schema provider; use the latest table schema when reconcileSchema is enabled.
+              SchemaProvider targetSchemaProvider = null;
+              if (reconcileSchema) {
+                targetSchemaProvider = UtilHelpers.createLatestSchemaProvider(r.schema(), jssc, fs, cfg.targetBasePath);
+              } else {
+                targetSchemaProvider = UtilHelpers.createRowBasedSchemaProvider(r.schema(), props, jssc);
+              }
+              return (SchemaProvider) new DelegatingSchemaProvider(props, jssc,
+                  dataAndCheckpoint.getSchemaProvider(), targetSchemaProvider); })
             .orElse(dataAndCheckpoint.getSchemaProvider());
         avroRDDOptional = transformed
             .map(t -> HoodieSparkUtils.createRdd(
-                t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD());
+                t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema,
+                Option.ofNullable(schemaProvider.getTargetSchema())
+            ).toJavaRDD());
       }
     } else {
       // Pull the data from the source & prepare the write
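At the Avro level, reconciling a record written with the old schema against an evolved target only works when the new fields are optional with defaults. An illustrative, patch-independent sketch of that promotion (plain Avro APIs, hypothetical field layout):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class ReconcileSketch {
  public static void main(String[] args) {
    Schema oldSchema = SchemaBuilder.record("triprec").fields()
        .requiredString("rider")
        .endRecord();
    Schema evolvedSchema = SchemaBuilder.record("triprec").fields()
        .requiredString("rider")
        .name("evoluted_optional_union_field").type().unionOf().nullType().and().stringType().endUnion().nullDefault()
        .endRecord();

    GenericRecord oldRec = new GenericData.Record(oldSchema);
    oldRec.put("rider", "rider-1");

    // Rebuild the old-schema record against the evolved schema; the new optional
    // field falls back to its null default.
    GenericRecord reconciled = new GenericData.Record(evolvedSchema);
    for (Schema.Field f : evolvedSchema.getFields()) {
      Object v = oldSchema.getField(f.name()) != null ? oldRec.get(f.name()) : null;
      reconciled.put(f.name(), v);
    }
    System.out.println(reconciled); // {"rider": "rider-1", "evoluted_optional_union_field": null}
  }
}
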
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 0d4259b1a..58225b37e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.utilities.deltastreamer;
 
+import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.async.AsyncClusteringService;
 import org.apache.hudi.async.AsyncCompactService;
 import org.apache.hudi.async.HoodieAsyncService;
@@ -120,13 +121,13 @@ public class HoodieDeltaStreamer implements Serializable {
                             Option<TypedProperties> props) throws IOException {
     // Resolving the properties first in a consistent way
     if (props.isPresent()) {
-      this.properties = props.get();
+      this.properties = setDefaults(props.get());
     } else if (cfg.propsFilePath.equals(Config.DEFAULT_DFS_SOURCE_PROPERTIES)) {
-      this.properties = UtilHelpers.getConfig(cfg.configs).getConfig();
+      this.properties = setDefaults(UtilHelpers.getConfig(cfg.configs).getConfig());
     } else {
-      this.properties = UtilHelpers.readConfig(
+      this.properties = setDefaults(UtilHelpers.readConfig(
           FSUtils.getFs(cfg.propsFilePath, jssc.hadoopConfiguration()),
-          new Path(cfg.propsFilePath), cfg.configs).getConfig();
+          new Path(cfg.propsFilePath), cfg.configs).getConfig());
     }
 
     if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) {
@@ -146,6 +147,13 @@ public class HoodieDeltaStreamer implements Serializable {
     deltaSyncService.ifPresent(ds -> ds.shutdown(false));
   }
 
+  private TypedProperties setDefaults(TypedProperties props) {
+    if (!props.containsKey(DataSourceWriteOptions.RECONCILE_SCHEMA().key())) {
+      props.setProperty(DataSourceWriteOptions.RECONCILE_SCHEMA().key(), DataSourceWriteOptions.RECONCILE_SCHEMA().defaultValue().toString());
+    }
+    return props;
+  }
+
   /**
    * Main method to start syncing.
   *
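setDefaults() above only seeds RECONCILE_SCHEMA when the key is absent, so an explicit user setting always wins. A sketch of that defaulting rule (withDefault is a hypothetical stand-in for the private helper):

import org.apache.hudi.common.config.TypedProperties;

public final class PropsDefaultingSketch {
  // Mirrors HoodieDeltaStreamer#setDefaults for an arbitrary key: seed the value
  // only when the user has not configured it.
  static TypedProperties withDefault(TypedProperties props, String key, String defaultValue) {
    if (!props.containsKey(key)) {
      props.setProperty(key, defaultValue);
    }
    return props;
  }

  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.datasource.write.reconcile.schema", "true"); // explicit user choice
    withDefault(props, "hoodie.datasource.write.reconcile.schema", "false");
    System.out.println(props.getBoolean("hoodie.datasource.write.reconcile.schema")); // true: override wins
  }
}
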
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
index e9367707e..1260acb1c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
@@ -74,9 +74,10 @@ public final class SourceFormatAdapter {
               // If the source schema is specified through Avro schema,
               // pass in the schema for the Row-to-Avro conversion
               // to avoid nullability mismatch between Avro schema and Row schema
-              ? HoodieSparkUtils.createRdd(rdd, r.getSchemaProvider().getSourceSchema(),
-                  HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD() : HoodieSparkUtils.createRdd(rdd,
-                  HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD();
+              ? HoodieSparkUtils.createRdd(rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, true,
+                  org.apache.hudi.common.util.Option.ofNullable(r.getSchemaProvider().getSourceSchema())
+              ).toJavaRDD() : HoodieSparkUtils.createRdd(rdd,
+                  HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, false, Option.empty()).toJavaRDD();
             })
             .orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java
index 9f71a7f11..34549885c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java
@@ -40,8 +40,8 @@ public class SparkAvroPostProcessor extends SchemaPostProcessor {
 
   @Override
   public Schema processSchema(Schema schema) {
-    return AvroConversionUtils.convertStructTypeToAvroSchema(
+    return schema != null ? AvroConversionUtils.convertStructTypeToAvroSchema(
         AvroConversionUtils.convertAvroSchemaToStructType(schema), RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME,
-        RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE);
+        RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE) : null;
   }
 }
\ No newline at end of file
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 0d721f5fe..4ec1f99b0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -18,10 +18,8 @@
 
 package org.apache.hudi.utilities.functional;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.util.ConcurrentModificationException;
-import java.util.concurrent.ExecutorService;
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.TypedProperties;
@@ -33,6 +31,7 @@ import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -49,6 +48,7 @@ import org.apache.hudi.utilities.DummySchemaProvider;
 import org.apache.hudi.utilities.HoodieClusteringJob;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
 import org.apache.hudi.utilities.sources.CsvDFSSource;
 import org.apache.hudi.utilities.sources.HoodieIncrSource;
 import org.apache.hudi.utilities.sources.InputBatch;
@@ -63,6 +63,7 @@ import
org.apache.hudi.utilities.testutils.sources.config.SourceConfigs; import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer; import org.apache.hudi.utilities.transform.Transformer; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -92,11 +93,15 @@ import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.ConcurrentModificationException; import java.util.List; import java.util.Properties; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -105,6 +110,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY; +import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE; +import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -115,6 +122,7 @@ import static org.junit.jupiter.api.Assertions.fail; /** * Basic tests against {@link HoodieDeltaStreamer}, by issuing bulk_inserts, upserts, inserts. Check counts at the end. */ + public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class); @@ -157,7 +165,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, boolean runSchedule, String scheduleAndExecute) { HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath, - clusteringInstantTime, runSchedule, scheduleAndExecute); + clusteringInstantTime, runSchedule, scheduleAndExecute); return new HoodieClusteringJob(jsc, scheduleClusteringConfig); } @@ -226,7 +234,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { cfg.payloadClassName = payloadClassName; } if (useSchemaProviderClass) { - cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName(); + cfg.schemaProviderClassName = defaultSchemaProviderClassName; } return cfg; } @@ -391,6 +399,23 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { return base; } + /** + * args for schema evolution test. 
+   *
+   * @return a stream of arguments covering each table type crossed with the useUserProvidedSchema and useSchemaPostProcessor flags.
+   */
+  private static Stream<Arguments> schemaEvolArgs() {
+    return Stream.of(
+        Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), true, true),
+        Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), true, false),
+        Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), false, true),
+        Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), false, false),
+        Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), true, true),
+        Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), true, false),
+        Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), false, true),
+        Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), false, false));
+  }
+
   private static Stream<Arguments> provideValidCliArgs() {
 
     HoodieDeltaStreamer.Config base = getBaseConfig();
@@ -425,41 +450,41 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
     conf.enableHiveSync = true;
     conf.configs = Arrays.asList(HOODIE_CONF_VALUE1, HOODIE_CONF_VALUE2);
-    String[] allConfig = new String[]{TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE, SOURCE_LIMIT_PARAM,
+    String[] allConfig = new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE, SOURCE_LIMIT_PARAM,
         SOURCE_LIMIT_VALUE, TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
         BASE_FILE_FORMAT_PARAM, BASE_FILE_FORMAT_VALUE, ENABLE_HIVE_SYNC_PARAM, HOODIE_CONF_PARAM, HOODIE_CONF_VALUE1,
         HOODIE_CONF_PARAM, HOODIE_CONF_VALUE2};
 
     return Stream.of(
-        // Base
-        Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
-            TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE}, base),
-        // String
-        Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
-            TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
-            BASE_FILE_FORMAT_PARAM, BASE_FILE_FORMAT_VALUE}, conf1),
-        // Integer
-        Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
-            TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
-            SOURCE_LIMIT_PARAM, SOURCE_LIMIT_VALUE}, conf2),
-        // Boolean
-        Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
-            TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
-            ENABLE_HIVE_SYNC_PARAM}, conf3),
-        // Array List 1
-        Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
-            TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
-            HOODIE_CONF_PARAM, HOODIE_CONF_VALUE1}, conf4),
-        // Array List with comma
-        Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
-            TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
-            HOODIE_CONF_PARAM, HOODIE_CONF_VALUE2}, conf5),
-        // Array list with multiple values
-        Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
-            TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
-            HOODIE_CONF_PARAM, HOODIE_CONF_VALUE1, HOODIE_CONF_PARAM, HOODIE_CONF_VALUE2}, conf6),
-        // All
-        Arguments.of(allConfig, conf)
+        // Base
+        Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
+            TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE}, base),
+        // String
+        Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
+            TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
+            BASE_FILE_FORMAT_PARAM, BASE_FILE_FORMAT_VALUE}, conf1),
+        // Integer
+        Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
+            TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, 
TARGET_TABLE_VALUE, + SOURCE_LIMIT_PARAM, SOURCE_LIMIT_VALUE}, conf2), + // Boolean + Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE, + TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE, + ENABLE_HIVE_SYNC_PARAM}, conf3), + // Array List 1 + Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE, + TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE, + HOODIE_CONF_PARAM, HOODIE_CONF_VALUE1}, conf4), + // Array List with comma + Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE, + TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE, + HOODIE_CONF_PARAM, HOODIE_CONF_VALUE2}, conf5), + // Array list with multiple values + Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE, + TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE, + HOODIE_CONF_PARAM, HOODIE_CONF_VALUE1, HOODIE_CONF_PARAM, HOODIE_CONF_VALUE2}, conf6), + // All + Arguments.of(allConfig, conf) ); } @@ -494,7 +519,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { @Test public void testPropsWithInvalidKeyGenerator() throws Exception { Exception e = assertThrows(IOException.class, () -> { - String tableBasePath = dfsBasePath + "/test_table"; + String tableBasePath = dfsBasePath + "/test_table_invalid_key_gen"; HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT, Collections.singletonList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_INVALID, false), jsc); @@ -548,8 +573,8 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { // Perform bootstrap with tableBasePath as source String bootstrapSourcePath = dfsBasePath + "/src_bootstrapped"; Dataset sourceDf = sqlContext.read() - .format("org.apache.hudi") - .load(tableBasePath + "/*/*.parquet"); + .format("org.apache.hudi") + .load(tableBasePath + "/*/*.parquet"); sourceDf.write().format("parquet").save(bootstrapSourcePath); String newDatasetBasePath = dfsBasePath + "/test_dataset_bootstrapped"; @@ -575,6 +600,83 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { assertTrue(fieldNames.containsAll(expectedFieldNames)); } + @ParameterizedTest + @MethodSource("schemaEvolArgs") + public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema, boolean useSchemaPostProcessor) throws Exception { + String tableBasePath = dfsBasePath + "/test_table_schema_evolution" + tableType + "_" + useUserProvidedSchema + "_" + useSchemaPostProcessor; + defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName(); + // Insert data produced with Schema A, pass Schema A + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, Collections.singletonList(TestIdentityTransformer.class.getName()), + PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType); + cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + dfsBasePath + "/source.avsc"); + cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + dfsBasePath + "/source.avsc"); + cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true"); + if (!useSchemaPostProcessor) { + cfg.configs.add(SparkAvroPostProcessor.Config.SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE + "=false"); + } + new HoodieDeltaStreamer(cfg, jsc).sync(); + TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*", sqlContext); + 
TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1); + + // Upsert data produced with Schema B, pass Schema B + cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, Collections.singletonList(TripsWithEvolvedOptionalFieldTransformer.class.getName()), + PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType); + cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + dfsBasePath + "/source.avsc"); + cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + dfsBasePath + "/source_evolved.avsc"); + cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true"); + if (!useSchemaPostProcessor) { + cfg.configs.add(SparkAvroPostProcessor.Config.SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE + "=false"); + } + new HoodieDeltaStreamer(cfg, jsc).sync(); + // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes. + TestHelpers.assertRecordCount(1450, tableBasePath + "/*/*", sqlContext); + TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2); + List counts = TestHelpers.countsPerCommit(tableBasePath + "/*/*", sqlContext); + assertEquals(1450, counts.stream().mapToLong(entry -> entry.getLong(1)).sum()); + + sqlContext.read().format("org.apache.hudi").load(tableBasePath + "/*/*").createOrReplaceTempView("tmp_trips"); + long recordCount = + sqlContext.sparkSession().sql("select * from tmp_trips where evoluted_optional_union_field is not NULL").count(); + assertEquals(950, recordCount); + + // Upsert data produced with Schema A, pass Schema B + if (!useUserProvidedSchema) { + defaultSchemaProviderClassName = TestFileBasedSchemaProviderNullTargetSchema.class.getName(); + } + cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, Collections.singletonList(TestIdentityTransformer.class.getName()), + PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType); + cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + dfsBasePath + "/source.avsc"); + if (useUserProvidedSchema) { + cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + dfsBasePath + "/source_evolved.avsc"); + } + if (!useSchemaPostProcessor) { + cfg.configs.add(SparkAvroPostProcessor.Config.SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE + "=false"); + } + cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true"); + new HoodieDeltaStreamer(cfg, jsc).sync(); + // again, 1000 new records, 500 are inserts, 450 are updates and 50 are deletes. 
+ TestHelpers.assertRecordCount(1900, tableBasePath + "/*/*", sqlContext); + TestHelpers.assertCommitMetadata("00002", tableBasePath, dfs, 3); + counts = TestHelpers.countsPerCommit(tableBasePath + "/*/*", sqlContext); + assertEquals(1900, counts.stream().mapToLong(entry -> entry.getLong(1)).sum()); + + TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(dfs.getConf()).build()); + Schema tableSchema = tableSchemaResolver.getTableAvroSchemaWithoutMetadataFields(); + assertNotNull(tableSchema); + + Schema expectedSchema = new Schema.Parser().parse(dfs.open(new Path(dfsBasePath + "/source_evolved.avsc"))); + if (!useUserProvidedSchema || useSchemaPostProcessor) { + expectedSchema = AvroConversionUtils.convertStructTypeToAvroSchema( + AvroConversionUtils.convertAvroSchemaToStructType(expectedSchema), HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE); + } + assertEquals(tableSchema, expectedSchema); + + // clean up and reinit + UtilitiesTestBase.Helpers.deleteFileFromDfs(FSUtils.getFs(cfg.targetBasePath, jsc.hadoopConfiguration()), dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE); + writeCommonPropsToFile(); + defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName(); + } + @Test public void testUpsertsCOWContinuousMode() throws Exception { testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow"); @@ -782,8 +884,8 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { } private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, int totalRecords, - HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer.Config cfgIngestionJob, HoodieDeltaStreamer backfillJob, - HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict) throws Exception { + HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer.Config cfgIngestionJob, HoodieDeltaStreamer backfillJob, + HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict) throws Exception { ExecutorService service = Executors.newFixedThreadPool(2); HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build(); HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); @@ -1262,7 +1364,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { } private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile, - String propsFileName, String parquetSourceRoot, boolean addCommonProps) throws IOException { + String propsFileName, String parquetSourceRoot, boolean addCommonProps) throws IOException { // Properties used for testing delta-streamer with Parquet source TypedProperties parquetProps = new TypedProperties(); @@ -1271,7 +1373,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { } parquetProps.setProperty("include", "base.properties"); - parquetProps.setProperty("hoodie.embed.timeline.server","false"); + parquetProps.setProperty("hoodie.embed.timeline.server", "false"); parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); if (useSchemaProvider) { @@ -1301,7 +1403,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { TypedProperties props = new TypedProperties(); populateAllCommonProps(props); props.setProperty("include", 
"base.properties"); - props.setProperty("hoodie.embed.timeline.server","false"); + props.setProperty("hoodie.embed.timeline.server", "false"); props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); props.setProperty("hoodie.deltastreamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT); @@ -1316,6 +1418,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { /** * Tests Deltastreamer with parquet dfs source and transitions to JsonKafkaSource. + * * @param autoResetToLatest true if auto reset value to be set to LATEST. false to leave it as default(i.e. EARLIEST) * @throws Exception */ @@ -1325,7 +1428,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { int parquetRecords = 10; prepareParquetDFSFiles(parquetRecords, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, true, HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA); - prepareParquetDFSSource(true, false,"source_uber.avsc", "target_uber.avsc", PROPS_FILENAME_TEST_PARQUET, + prepareParquetDFSSource(true, false, "source_uber.avsc", "target_uber.avsc", PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false); // delta streamer w/ parquest source String tableBasePath = dfsBasePath + "/test_dfs_to_kakfa" + testNum; @@ -1388,19 +1491,19 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", topicName); String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum; HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer( - TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), - Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false, - true, 100000, false, null, - null, "timestamp", String.valueOf(System.currentTimeMillis())), jsc); + TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), + Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false, + true, 100000, false, null, + null, "timestamp", String.valueOf(System.currentTimeMillis())), jsc); deltaStreamer.sync(); TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext); prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, false, topicName); deltaStreamer = new HoodieDeltaStreamer( - TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), - Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false, - true, 100000, false, null, null, - "timestamp", String.valueOf(System.currentTimeMillis())), jsc); + TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), + Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false, + true, 100000, false, null, null, + "timestamp", String.valueOf(System.currentTimeMillis())), jsc); deltaStreamer.sync(); TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath + "/*/*.parquet", sqlContext); } @@ -1717,8 +1820,36 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { @Override public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset rowDataset, - TypedProperties properties) { + TypedProperties properties) { return rowDataset; } } + + /** + * Add new field evoluted_optional_union_field with value of the field rider. 
+  /**
+   * Adds a new field, evoluted_optional_union_field, populated with the value of the rider field.
+   */
+  public static class TripsWithEvolvedOptionalFieldTransformer implements Transformer {
+
+    @Override
+    public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
+                              TypedProperties properties) {
+      return rowDataset.withColumn("evoluted_optional_union_field", functions.col("rider"));
+    }
+  }
+
+  /**
+   * {@link FilebasedSchemaProvider} to be used in tests where the target schema is null.
+   */
+  public static class TestFileBasedSchemaProviderNullTargetSchema extends FilebasedSchemaProvider {
+
+    public TestFileBasedSchemaProviderNullTargetSchema(TypedProperties props, JavaSparkContext jssc) {
+      super(props, jssc);
+    }
+
+    @Override
+    public Schema getTargetSchema() {
+      return null;
+    }
+  }
+
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java
index 95f729120..5a1cfc332 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java
@@ -22,6 +22,7 @@ import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 
 import org.apache.avro.Schema;
@@ -76,7 +77,7 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
   static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamerBase.class);
   public static KafkaTestUtils testUtils;
   protected static String topicName;
-
+  protected static String defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
   protected static int testNum = 1;
 
   @BeforeAll
@@ -94,6 +95,7 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
     UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs,
         dfsBasePath + "/sql-transformer.properties");
     UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_evolved.avsc", dfs, dfsBasePath + "/source_evolved.avsc");
     UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc", dfs, dfsBasePath + "/source-flattened.avsc");
     UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc");
     UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target-flattened.avsc", dfs, dfsBasePath + "/target-flattened.avsc");
@@ -107,22 +109,7 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
     UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/short_trip_uber_config.properties", dfs,
         dfsBasePath + "/config/short_trip_uber_config.properties");
     UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/clusteringjob.properties", dfs, dfsBasePath + "/clusteringjob.properties");
 
-    TypedProperties props = new TypedProperties();
-    props.setProperty("include", "sql-transformer.properties");
-    props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
-    props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
-    
props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); - props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc"); - props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc"); - - // Hive Configs - props.setProperty(DataSourceWriteOptions.HIVE_URL().key(), "jdbc:hive2://127.0.0.1:9999/"); - props.setProperty(DataSourceWriteOptions.HIVE_DATABASE().key(), "testdb1"); - props.setProperty(DataSourceWriteOptions.HIVE_TABLE().key(), "hive_trips"); - props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), "datestr"); - props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(), - MultiPartKeysValueExtractor.class.getName()); - UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE); + writeCommonPropsToFile(); // Properties used for the delta-streamer which incrementally pulls from upstream Hudi source table and writes to // downstream hudi table @@ -162,6 +149,25 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase { prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT); } + protected static void writeCommonPropsToFile() throws IOException { + TypedProperties props = new TypedProperties(); + props.setProperty("include", "sql-transformer.properties"); + props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName()); + props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); + props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); + props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc"); + props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc"); + + // Hive Configs + props.setProperty(DataSourceWriteOptions.HIVE_URL().key(), "jdbc:hive2://127.0.0.1:9999/"); + props.setProperty(DataSourceWriteOptions.HIVE_DATABASE().key(), "testdb1"); + props.setProperty(DataSourceWriteOptions.HIVE_TABLE().key(), "hive_trips"); + props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), "datestr"); + props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(), + MultiPartKeysValueExtractor.class.getName()); + UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE); + } + @BeforeEach public void setup() throws Exception { super.setup(); @@ -241,5 +247,4 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase { dataGenerator.generateInserts("000", numRecords)), new Path(path)); } } - } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java index 8a67582c4..8bff47522 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java @@ -243,6 +243,12 @@ public class UtilitiesTestBase { os.close(); } + public static void deleteFileFromDfs(FileSystem fs, String targetPath) throws IOException { + if (fs.exists(new Path(targetPath))) { + fs.delete(new Path(targetPath), true); + } + } + public static void savePropsToDFS(TypedProperties props, FileSystem fs, String targetPath) throws IOException { String[] lines = props.keySet().stream().map(k -> 
String.format("%s=%s", k, props.get(k))).toArray(String[]::new); saveStringsToDFS(lines, fs, targetPath); diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/source_evolved.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/source_evolved.avsc new file mode 100644 index 000000000..29a5499b7 --- /dev/null +++ b/hudi-utilities/src/test/resources/delta-streamer-config/source_evolved.avsc @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ + "type" : "record", + "name" : "triprec", + "fields" : [ + { + "name" : "timestamp", + "type" : "long" + }, { + "name" : "_row_key", + "type" : "string" + }, { + "name" : "partition_path", + "type" : "string" + }, { + "name" : "rider", + "type" : "string" + }, { + "name" : "driver", + "type" : "string" + }, { + "name" : "begin_lat", + "type" : "double" + }, { + "name" : "begin_lon", + "type" : "double" + }, { + "name" : "end_lat", + "type" : "double" + }, { + "name" : "end_lon", + "type" : "double" + }, { + "name" : "distance_in_meters", + "type" : "int" + }, { + "name" : "seconds_since_epoch", + "type" : "long" + }, { + "name" : "weight", + "type" : "float" + },{ + "name" : "nation", + "type" : "bytes" + },{ + "name" : "current_date", + "type" : { + "type" : "int", + "logicalType" : "date" + } + },{ + "name" : "current_ts", + "type" : { + "type" : "long" + } + },{ + "name" : "height", + "type" : { + "type" : "fixed", + "name" : "abc", + "size" : 5, + "logicalType" : "decimal", + "precision" : 10, + "scale": 6 + } + }, { + "name" :"city_to_state", + "type" : { + "type" : "map", + "values": "string" + } + }, + { + "name" : "fare", + "type" : { + "type" : "record", + "name" : "fare", + "fields" : [ + { + "name" : "amount", + "type" : "double" + }, + { + "name" : "currency", + "type" : "string" + } + ] + } + }, + { + "name" : "tip_history", + "type" : { + "type" : "array", + "items" : { + "type" : "record", + "name" : "tip_history", + "fields" : [ + { + "name" : "amount", + "type" : "double" + }, + { + "name" : "currency", + "type" : "string" + } + ] + } + } + }, + { + "name" : "_hoodie_is_deleted", + "type" : "boolean", + "default" : false + }, + { + "name": "evoluted_optional_union_field", + "type": [ + "null", + "string" + ], + "default": null + }] +}