
[HUDI-1129] Improving schema evolution support in hudi (#2927)

* Adding support to ingest records with an old schema after the table's schema has evolved

* Rebasing against latest master

- Trimming test file to be < 800 lines
- Renaming config names

* Addressing feedback

Co-authored-by: Vinoth Chandar <vinoth@apache.org>
Authored by: Sivabalan Narayanan
Date: 2021-08-10 12:15:37 -04:00
Committed by: GitHub
parent 73d898322b
commit 1196736185
22 changed files with 778 additions and 213 deletions
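For context, a minimal sketch (not part of the diff) of how a Spark datasource writer might opt into this behavior once the change is in place; the table path and the record key / precombine fields below are hypothetical:

    // Assumes a Hudi table at /tmp/hudi_trips whose schema has already evolved, and an
    // incoming DataFrame `oldSchemaDf` that is still produced with the older schema.
    oldSchemaDf.write.format("hudi")
      .option("hoodie.table.name", "hudi_trips")
      .option("hoodie.datasource.write.recordkey.field", "uuid")
      .option("hoodie.datasource.write.precombine.field", "ts")
      // option added by this change: rewrite old-schema records against the latest table schema
      .option("hoodie.datasource.write.reconcile.schema", "true")
      .mode("append")
      .save("/tmp/hudi_trips")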


@@ -357,7 +357,7 @@ object AvroConversionHelper {
      val fieldNamesIterator = dataType.asInstanceOf[StructType].fieldNames.iterator
      val rowIterator = item.asInstanceOf[Row].toSeq.iterator
-     while (convertersIterator.hasNext) {
+     while (convertersIterator.hasNext && rowIterator.hasNext) {
        val converter = convertersIterator.next()
        record.put(fieldNamesIterator.next(), converter(rowIterator.next()))
      }


@@ -92,22 +92,43 @@ object HoodieSparkUtils extends SparkAdapterSupport {
    new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache)
  }

- def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
-   val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
-   createRdd(df, avroSchema, structName, recordNamespace)
+ def createRdd(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean,
+               latestTableSchema: org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()): RDD[GenericRecord] = {
+   val dfWriteSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
+   var writeSchema: Schema = null
+   var toReconcileSchema: Schema = null
+   if (reconcileToLatestSchema && latestTableSchema.isPresent) {
+     // if reconcileToLatestSchema is set to true and latestTableSchema is present, then try to leverage latestTableSchema.
+     // this code path handles situations where records are serialized with the old schema, but callers wish to convert
+     // to RDD[GenericRecord] using a different schema (could be an evolved schema or the latest table schema)
+     writeSchema = dfWriteSchema
+     toReconcileSchema = latestTableSchema.get()
+   } else {
+     // there are paths where callers wish to use latestTableSchema to convert to RDD[GenericRecord] and not use the
+     // row's schema. So use latestTableSchema if present; if not available, fall back to using the row's schema.
+     writeSchema = if (latestTableSchema.isPresent) latestTableSchema.get() else dfWriteSchema
+   }
+   createRddInternal(df, writeSchema, toReconcileSchema, structName, recordNamespace)
  }

- def createRdd(df: DataFrame, avroSchema: Schema, structName: String, recordNamespace: String)
+ def createRddInternal(df: DataFrame, writeSchema: Schema, latestTableSchema: Schema, structName: String, recordNamespace: String)
  : RDD[GenericRecord] = {
-   // Use the Avro schema to derive the StructType which has the correct nullability information
-   val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
-   val encoder = RowEncoder.apply(dataType).resolveAndBind()
+   // Use the write avro schema to derive the StructType which has the correct nullability information
+   val writeDataType = SchemaConverters.toSqlType(writeSchema).dataType.asInstanceOf[StructType]
+   val encoder = RowEncoder.apply(writeDataType).resolveAndBind()
    val deserializer = sparkAdapter.createSparkRowSerDe(encoder)
+   // if records were serialized with the old schema, but an evolved schema was passed in as latestTableSchema, we need
+   // the latestTableSchema-equivalent data type to be passed in to AvroConversionHelper.createConverterToAvro()
+   val reconciledDataType =
+     if (latestTableSchema != null) SchemaConverters.toSqlType(latestTableSchema).dataType.asInstanceOf[StructType] else writeDataType
+   // Note: deserializer.deserializeRow(row) is not capable of handling an evolved schema, i.e. if a Row was serialized
+   // with the old schema but the deserializer was created with an encoder for the evolved schema, deserialization fails.
+   // Hence we always need to deserialize with the same schema the rows were serialized with.
    df.queryExecution.toRdd.map(row => deserializer.deserializeRow(row))
      .mapPartitions { records =>
        if (records.isEmpty) Iterator.empty
        else {
-         val convertor = AvroConversionHelper.createConverterToAvro(dataType, structName, recordNamespace)
+         val convertor = AvroConversionHelper.createConverterToAvro(reconciledDataType, structName, recordNamespace)
          records.map { x => convertor(x).asInstanceOf[GenericRecord] }
        }
      }
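For illustration, a minimal sketch of how the reworked entry point might be invoked (assuming the caller imports or sits next to HoodieSparkUtils); `oldSchemaDf` and the schema resolution are placeholders:

    import org.apache.avro.Schema
    import org.apache.hudi.common.util.{Option => HOption}

    // `tableSchema` would typically be the evolved table schema, e.g. resolved via TableSchemaResolver.
    val tableSchema: Schema = ??? // placeholder
    val genericRecords = HoodieSparkUtils.createRdd(
      oldSchemaDf,                        // DataFrame whose rows still carry the older schema (placeholder)
      "hoodie_record_struct",             // struct name (placeholder)
      "hoodie.record.namespace",          // record namespace (placeholder)
      reconcileToLatestSchema = true,     // rewrite rows against the evolved schema passed in below
      latestTableSchema = HOption.of(tableSchema))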


@@ -109,6 +109,16 @@ public class FSUtils {
    return getFs(path, conf);
  }

+ /**
+  * Check if table already exists in the given path.
+  * @param path base path of the table.
+  * @param fs instance of {@link FileSystem}.
+  * @return {@code true} if table exists. {@code false} otherwise.
+  */
+ public static boolean isTableExists(String path, FileSystem fs) throws IOException {
+   return fs.exists(new Path(path + "/" + HoodieTableMetaClient.METAFOLDER_NAME));
+ }
+
  public static Path addSchemeIfLocalPath(String path) {
    Path providedPath = new Path(path);
    File localFile = new File(path);


@@ -414,6 +414,13 @@ public class HoodieTableMetaClient implements Serializable {
    return fs.listStatus(metaPath, nameFilter);
  }

+ /**
+  * @return {@code true} if any commits are found, else {@code false}.
+  */
+ public boolean isTimelineNonEmpty() {
+   return getCommitsTimeline().filterCompletedInstants().getInstants().collect(Collectors.toList()).size() > 0;
+ }
+
  /**
   * Get the commit timeline visible for this table.
   */


@@ -18,13 +18,6 @@
package org.apache.hudi.common.table;

-import java.io.IOException;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.SchemaCompatibility;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -36,11 +29,18 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Functions.Function1;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.InvalidTableException;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
@@ -49,6 +49,8 @@ import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;

+import java.io.IOException;

/**
 * Helper class to read schema from data files and log files and to convert it between different formats.
 */
@@ -381,6 +383,37 @@ public class TableSchemaResolver {
    return isSchemaCompatible(new Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema));
  }

+ /**
+  * Get the latest schema, either from the incoming write schema or from the table schema.
+  *
+  * @param writeSchema incoming batch's write schema.
+  * @param convertTableSchemaToAddNamespace {@code true} if the table schema needs to be converted, {@code false} otherwise.
+  * @param converterFn converter function to be applied to the table schema (e.g. to add a namespace). Each caller can decide if any conversion is required.
+  * @return the latest schema.
+  */
+ public Schema getLatestSchema(Schema writeSchema, boolean convertTableSchemaToAddNamespace,
+     Function1<Schema, Schema> converterFn) {
+   Schema latestSchema = writeSchema;
+   try {
+     if (metaClient.isTimelineNonEmpty()) {
+       Schema tableSchema = getTableAvroSchemaWithoutMetadataFields();
+       if (convertTableSchemaToAddNamespace && converterFn != null) {
+         tableSchema = converterFn.apply(tableSchema);
+       }
+       if (writeSchema.getFields().size() < tableSchema.getFields().size() && isSchemaCompatible(writeSchema, tableSchema)) {
+         // the incoming schema is a subset (old schema) of the table schema, e.g. one of the
+         // ingestion pipelines is still producing events with the old schema
+         latestSchema = tableSchema;
+         LOG.debug("Using latest table schema to rewrite incoming records " + tableSchema.toString());
+       }
+     }
+   } catch (IllegalArgumentException | InvalidTableException e) {
+     LOG.warn("Could not find any commits, falling back to using incoming batch's write schema");
+   } catch (Exception e) {
+     LOG.warn("Unknown exception thrown " + e.getMessage() + ", Falling back to using incoming batch's write schema");
+   }
+   return latestSchema;
+ }
+
  /**
   * Read the parquet schema from a parquet File.
   */
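For illustration, a minimal Scala sketch of how a writer-side caller might use this helper; it mirrors the usage added to HoodieSparkSqlWriter later in this diff, and the base path / configuration are placeholders:

    import org.apache.avro.Schema
    import org.apache.hadoop.conf.Configuration
    import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}

    // Returns the latest table schema when the incoming batch schema is an older, compatible subset.
    def resolveWriteSchema(incomingSchema: Schema, basePath: String, hadoopConf: Configuration): Schema = {
      val metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build()
      // no namespace conversion needed in this sketch, so pass `false` and a null converter
      new TableSchemaResolver(metaClient).getLatestSchema(incomingSchema, false, null)
    }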


@@ -18,15 +18,18 @@
package org.apache.hudi.integ.testsuite.reader;

-import java.util.List;
-import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
+import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import java.util.List;
import scala.collection.JavaConverters;
@@ -51,7 +54,7 @@ public class SparkBasedReader {
    return HoodieSparkUtils
        .createRdd(dataSet.toDF(), structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME),
-           nameSpace.orElse(RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE))
+           nameSpace.orElse(RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE), false, Option.empty())
        .toJavaRDD();
  }
@@ -63,7 +66,7 @@ public class SparkBasedReader {
    return HoodieSparkUtils
        .createRdd(dataSet.toDF(), structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME),
-           RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE)
+           RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE, false, Option.empty())
        .toJavaRDD();
  }
@@ -73,10 +76,11 @@ public class SparkBasedReader {
    Dataset<Row> dataSet = sparkSession.read()
        .orc((JavaConverters.asScalaIteratorConverter(listOfPaths.iterator()).asScala().toSeq()));
-   return HoodieSparkUtils
-       .createRdd(dataSet.toDF(), structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME),
-           RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE)
-       .toJavaRDD();
+   return HoodieSparkUtils.createRdd(dataSet.toDF(),
+       structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME),
+       RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE,
+       false, Option.empty()
+   ).toJavaRDD();
  }
}


@@ -308,6 +308,13 @@ object DataSourceWriteOptions {
    .defaultValue(classOf[HiveSyncTool].getName)
    .withDocumentation("Sync tool class name used to sync to metastore. Defaults to Hive.")

+ val RECONCILE_SCHEMA: ConfigProperty[Boolean] = ConfigProperty
+   .key("hoodie.datasource.write.reconcile.schema")
+   .defaultValue(false)
+   .withDocumentation("When a new batch of writes contains records with an old schema while the table schema has "
+     + "evolved, this config will upgrade the records to the latest table schema (default values will be "
+     + "injected for missing fields). If not, the write batch would fail.")
+
  // HIVE SYNC SPECIFIC CONFIGS
  // NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes
  // unexpected issues with config getting reset


@@ -27,6 +27,7 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.bootstrap.FileStatusUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.keygen.KeyGenerator;
@@ -66,7 +67,8 @@ public class SparkParquetBootstrapDataProvider extends FullRecordBootstrapDataPr
    KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
    String structName = tableName + "_record";
    String namespace = "hoodie." + tableName;
-   RDD<GenericRecord> genericRecords = HoodieSparkUtils.createRdd(inputDataset, structName, namespace);
+   RDD<GenericRecord> genericRecords = HoodieSparkUtils.createRdd(inputDataset, structName, namespace, false,
+       Option.empty());
    return genericRecords.toJavaRDD().map(gr -> {
      String orderingVal = HoodieAvroUtils.getNestedFieldValAsString(
          gr, props.getString("hoodie.datasource.write.precombine.field"), false);


@@ -17,6 +17,8 @@
package org.apache.hudi

+import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -25,9 +27,10 @@ import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP}
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
@@ -158,13 +161,17 @@ object HoodieSparkSqlWriter {
          sparkContext.getConf.registerKryoClasses(
            Array(classOf[org.apache.avro.generic.GenericData],
              classOf[org.apache.avro.Schema]))
-         val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
+         var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
+         val reconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
+         if (reconcileSchema) {
+           schema = getLatestTableSchema(fs, basePath, sparkContext, schema)
+         }
          sparkContext.getConf.registerAvroSchemas(schema)
          log.info(s"Registered avro schema : ${schema.toString(true)}")

          // Convert to RDD[HoodieRecord]
-         val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
+         val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
+           org.apache.hudi.common.util.Option.of(schema))
          val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
            operation.equals(WriteOperationType.UPSERT) ||
            parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),
@@ -212,7 +219,8 @@ object HoodieSparkSqlWriter {
              classOf[org.apache.avro.Schema]))

          // Convert to RDD[HoodieKey]
-         val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace)
+         val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace,
+           parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean)
          val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()

          if (!tableExists) {
@@ -249,6 +257,25 @@ object HoodieSparkSqlWriter {
    }
  }

+ /**
+  * Fetches the latest table schema, for cases where the incoming records' write schema is old while the table schema has evolved.
+  *
+  * @param fs instance of FileSystem.
+  * @param basePath base path.
+  * @param sparkContext instance of spark context.
+  * @param schema incoming records' schema.
+  * @return the latest table schema if available and compatible, else the incoming schema.
+  */
+ def getLatestTableSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext, schema: Schema): Schema = {
+   var latestSchema: Schema = schema
+   if (FSUtils.isTableExists(basePath.toString, fs)) {
+     val tableMetaClient = HoodieTableMetaClient.builder.setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build()
+     val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
+     latestSchema = tableSchemaResolver.getLatestSchema(schema, false, null)
+   }
+   latestSchema
+ }
+
  def bootstrap(sqlContext: SQLContext,
                mode: SaveMode,
                parameters: Map[String, String],


@@ -17,19 +17,16 @@
package org.apache.hudi

-import java.util.Properties
-import scala.collection.JavaConverters._
import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.config.HoodieMetadataConfig.{METADATA_ENABLE_PROP, METADATA_VALIDATE_PROP}
import org.apache.hudi.common.config.{HoodieConfig, TypedProperties}
-import scala.collection.JavaConversions.mapAsJavaMap
-import scala.collection.JavaConverters.mapAsScalaMapConverter
-import org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP
-import org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator}

+import java.util.Properties
+import scala.collection.JavaConversions.mapAsJavaMap
+import scala.collection.JavaConverters.{mapAsScalaMapConverter, _}

/**
 * WriterUtils to assist in write path in Datasource and tests.
 */
@@ -78,7 +75,8 @@ object HoodieWriterUtils {
      ASYNC_COMPACT_ENABLE.key -> ASYNC_COMPACT_ENABLE.defaultValue,
      INLINE_CLUSTERING_ENABLE.key -> INLINE_CLUSTERING_ENABLE.defaultValue,
      ASYNC_CLUSTERING_ENABLE.key -> ASYNC_CLUSTERING_ENABLE.defaultValue,
-     ENABLE_ROW_WRITER.key -> ENABLE_ROW_WRITER.defaultValue
+     ENABLE_ROW_WRITER.key -> ENABLE_ROW_WRITER.defaultValue,
+     RECONCILE_SCHEMA.key -> RECONCILE_SCHEMA.defaultValue.toString
    ) ++ DataSourceOptionsHelper.translateConfigurations(parameters)
  }


@@ -23,7 +23,7 @@ import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.config.HoodieConfig
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, HoodieRecordPayload}
-import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieException
@@ -48,7 +48,6 @@ import java.util.{Collections, Date, UUID}
import scala.collection.JavaConversions._

class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
  var spark: SparkSession = _
  var sc: SparkContext = _
  var sqlContext: SQLContext = _
@@ -82,15 +81,11 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
    }
  }

  test("throw hoodie exception when there already exist a table with different name with Append Save mode") {
    initSparkContext("test_append_mode")
    val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
    try {
      val hoodieFooTableName = "hoodie_foo_tbl"
      //create a new table
      val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
        HoodieWriteConfig.TABLE_NAME.key -> hoodieFooTableName,
@@ -149,7 +144,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
  })

  def testBulkInsertWithSortMode(sortMode: BulkInsertSortMode, path: java.nio.file.Path, populateMetaFields : Boolean = true) : Unit = {
    val hoodieFooTableName = "hoodie_foo_tbl"
    //create a new table
    val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
@@ -193,17 +187,12 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
    val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
    if (!populateMetaFields) {
-     assertEquals(0, actualDf.select(HoodieRecord.HOODIE_META_COLUMNS.get(0)).filter(entry => !(entry.mkString(",").equals(""))).count())
-     assertEquals(0, actualDf.select(HoodieRecord.HOODIE_META_COLUMNS.get(1)).filter(entry => !(entry.mkString(",").equals(""))).count())
-     assertEquals(0, actualDf.select(HoodieRecord.HOODIE_META_COLUMNS.get(2)).filter(entry => !(entry.mkString(",").equals(""))).count())
-     assertEquals(0, actualDf.select(HoodieRecord.HOODIE_META_COLUMNS.get(3)).filter(entry => !(entry.mkString(",").equals(""))).count())
-     assertEquals(0, actualDf.select(HoodieRecord.HOODIE_META_COLUMNS.get(4)).filter(entry => !(entry.mkString(",").equals(""))).count())
+     List(0, 1, 2, 3, 4).foreach(i => assertEquals(0, actualDf.select(HoodieRecord.HOODIE_META_COLUMNS.get(i)).filter(entry => !(entry.mkString(",").equals(""))).count()))
    }
    // remove metadata columns so that expected and actual DFs can be compared as is
    val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
      .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
      .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
    assert(df.except(trimmedDf).count() == 0)
  }
@@ -405,7 +394,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
    val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
    try {
      val hoodieFooTableName = "hoodie_foo_tbl"
-     //create a new table
      val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
        HoodieWriteConfig.TABLE_NAME.key -> hoodieFooTableName,
        HoodieWriteConfig.BASE_FILE_FORMAT.key -> baseFileFormat,
@@ -433,7 +421,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
        hoodieFooTableName,
        mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])

-     // write to Hudi
      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df, Option.empty,
        Option(client))

      // Verify that asynchronous compaction is not scheduled
@@ -479,12 +466,9 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
    val srcPath = java.nio.file.Files.createTempDirectory("hoodie_bootstrap_source_path")
    try {
      val hoodieFooTableName = "hoodie_foo_tbl"
      val sourceDF = TestBootstrap.generateTestRawTripDataset(Instant.now.toEpochMilli, 0, 100, Collections.emptyList(), sc,
        spark.sqlContext)

      // Write source data non-partitioned
      sourceDF.write
        .format("parquet")
@@ -533,7 +517,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
    val path = java.nio.file.Files.createTempDirectory("hoodie_test_path_schema_evol")
    try {
      val hoodieFooTableName = "hoodie_foo_tbl_schema_evolution_" + tableType
-     //create a new table
      val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
        HoodieWriteConfig.TABLE_NAME.key -> hoodieFooTableName,
        "hoodie.insert.shuffle.parallelism" -> "1",
@@ -541,7 +524,9 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
        DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
        DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
        DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
-       DataSourceWriteOptions.KEYGENERATOR_CLASS.key -> "org.apache.hudi.keygen.SimpleKeyGenerator")
+       DataSourceWriteOptions.KEYGENERATOR_CLASS.key -> "org.apache.hudi.keygen.SimpleKeyGenerator",
+       DataSourceWriteOptions.RECONCILE_SCHEMA.key -> "true"
+     )
      val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)

      // generate the inserts
@@ -550,7 +535,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
      var records = DataSourceTestUtils.generateRandomRows(10)
      var recordsSeq = convertRowListToSeq(records)
      var df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
-     // write to Hudi
      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1)

      val snapshotDF1 = spark.read.format("org.apache.hudi")
@@ -565,10 +549,9 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
      assert(df1.except(trimmedDf1).count() == 0)

      // issue updates so that log files are created for MOR table
-     var updates = DataSourceTestUtils.generateUpdates(records, 5);
-     var updatesSeq = convertRowListToSeq(updates)
-     var updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType)
-     // write updates to Hudi
+     val updates = DataSourceTestUtils.generateUpdates(records, 5);
+     val updatesSeq = convertRowListToSeq(updates)
+     val updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType)
      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf)

      val snapshotDF2 = spark.read.format("org.apache.hudi")
@@ -584,11 +567,11 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
      assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 0)

      // getting new schema with new column
-     schema = DataSourceTestUtils.getStructTypeExampleEvolvedSchema
-     structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+     val evolSchema = DataSourceTestUtils.getStructTypeExampleEvolvedSchema
+     val evolStructType = AvroConversionUtils.convertAvroSchemaToStructType(evolSchema)
      records = DataSourceTestUtils.generateRandomRowsEvolvedSchema(5)
      recordsSeq = convertRowListToSeq(records)
-     val df3 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+     val df3 = spark.createDataFrame(sc.parallelize(recordsSeq), evolStructType)

      // write to Hudi with new column
      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df3)
@@ -604,6 +587,25 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
      // ensure 2nd batch of updates matches.
      assert(df3.intersect(trimmedDf3).except(df3).count() == 0)

+     // ingest new batch with old schema.
+     records = DataSourceTestUtils.generateRandomRows(10)
+     recordsSeq = convertRowListToSeq(records)
+     val df4 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+     HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df4)
+
+     val snapshotDF4 = spark.read.format("org.apache.hudi")
+       .load(path.toAbsolutePath.toString + "/*/*/*/*")
+     assertEquals(25, snapshotDF4.count())
+
+     val tableMetaClient = HoodieTableMetaClient.builder()
+       .setConf(spark.sparkContext.hadoopConfiguration)
+       .setBasePath(path.toAbsolutePath.toString)
+       .build()
+     val actualSchema = new TableSchemaResolver(tableMetaClient).getTableAvroSchemaWithoutMetadataFields
+     assertTrue(actualSchema != null)
+     val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(hoodieFooTableName)
+     val expectedSchema = AvroConversionUtils.convertStructTypeToAvroSchema(evolStructType, structName, nameSpace)
+     assertEquals(expectedSchema, actualSchema)
    } finally {
      spark.stop()
      FileUtils.deleteDirectory(path.toFile)
@@ -613,8 +615,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
test("Test build sync config for spark sql") { test("Test build sync config for spark sql") {
initSparkContext("test build sync config") initSparkContext("test build sync config")
val schema = DataSourceTestUtils.getStructTypeExampleSchema
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
val basePath = "/tmp/hoodie_test" val basePath = "/tmp/hoodie_test"
val params = Map( val params = Map(
"path" -> basePath, "path" -> basePath,
@@ -641,7 +641,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
test("Test build sync config for skip Ro Suffix vals") { test("Test build sync config for skip Ro Suffix vals") {
initSparkContext("test build sync config for skip Ro suffix vals") initSparkContext("test build sync config for skip Ro suffix vals")
val schema = DataSourceTestUtils.getStructTypeExampleSchema
val basePath = "/tmp/hoodie_test" val basePath = "/tmp/hoodie_test"
val params = Map( val params = Map(
"path" -> basePath, "path" -> basePath,
@@ -650,7 +649,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
    )
    val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params)
    val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters)
    val buildSyncConfigMethod =
      HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", classOf[Path],
        classOf[HoodieConfig], classOf[SQLConf])
@@ -712,15 +710,14 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
      .mode(SaveMode.Append).save(basePath)
    val currentCommits = spark.read.format("hudi").load(basePath).select("_hoodie_commit_time").take(1).map(_.getString(0))
-   val incrementalKeyIdNum = spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+   val incrementalKeyIdNum = spark.read.format("hudi")
+     .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "0000")
      .option(DataSourceReadOptions.END_INSTANTTIME.key, currentCommits(0))
      .load(basePath).select("keyid").orderBy("keyid").count
    assert(incrementalKeyIdNum == 1000)

-   // add bootstap test
    df.write.mode(SaveMode.Overwrite).save(baseBootStrapPath)
-   // boostrap table
    spark.emptyDataFrame.write.format("hudi")
      .options(options)
      .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP.key, baseBootStrapPath)
@@ -736,7 +733,8 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
      .mode(SaveMode.Append).save(basePath)
    val currentCommitsBootstrap = spark.read.format("hudi").load(basePath).select("_hoodie_commit_time").take(1).map(_.getString(0))
-   val incrementalKeyIdNumBootstrap = spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+   val incrementalKeyIdNumBootstrap = spark.read.format("hudi")
+     .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "0000")
      .option(DataSourceReadOptions.END_INSTANTTIME.key, currentCommitsBootstrap(0))
      .load(basePath).select("keyid").orderBy("keyid").count


@@ -18,16 +18,23 @@
package org.apache.hudi

+import org.apache.avro.generic.GenericRecord
+
import java.io.File
import java.nio.file.Paths
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.SparkSession
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.apache.hudi.testutils.DataSourceTestUtils
+import org.apache.spark.sql.avro.IncompatibleSchemaException
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.{Row, SparkSession}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertNull, assertTrue, fail}
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.io.TempDir

+import java.util
+import scala.collection.JavaConverters

class TestHoodieSparkUtils {

  @Test
@@ -103,4 +110,124 @@ class TestHoodieSparkUtils {
    assertEquals(files.sortWith(_.toString < _.toString), indexedFilePaths.sortWith(_.toString < _.toString))
    spark.stop()
  }

+ @Test
+ def testCreateRddSchemaEvol(): Unit = {
+   val spark = SparkSession.builder
+     .appName("Hoodie Datasource test")
+     .master("local[2]")
+     .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+     .getOrCreate
+
+   val schema = DataSourceTestUtils.getStructTypeExampleSchema
+   val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+   var records = DataSourceTestUtils.generateRandomRows(5)
+   var recordsSeq = convertRowListToSeq(records)
+   val df1 = spark.createDataFrame(spark.sparkContext.parallelize(recordsSeq), structType)
+
+   var genRecRDD = HoodieSparkUtils.createRdd(df1, "test_struct_name", "test_namespace", true,
+     org.apache.hudi.common.util.Option.of(schema))
+   genRecRDD.collect()
+
+   val evolSchema = DataSourceTestUtils.getStructTypeExampleEvolvedSchema
+   records = DataSourceTestUtils.generateRandomRowsEvolvedSchema(5)
+   recordsSeq = convertRowListToSeq(records)
+   genRecRDD = HoodieSparkUtils.createRdd(df1, "test_struct_name", "test_namespace", true,
+     org.apache.hudi.common.util.Option.of(evolSchema))
+   genRecRDD.collect()
+
+   // pass in the evolved schema, but with records serialized with the old schema. should be able to convert without any exception.
+   // Before https://github.com/apache/hudi/pull/2927, this would throw an exception.
+   genRecRDD = HoodieSparkUtils.createRdd(df1, "test_struct_name", "test_namespace", true,
+     org.apache.hudi.common.util.Option.of(evolSchema))
+   val genRecs = genRecRDD.collect()
+   // if this succeeds w/o throwing any exception, the test succeeded.
+   assertEquals(genRecs.size, 5)
+   spark.stop()
+ }
+
+ @Test
+ def testCreateRddWithNestedSchemas(): Unit = {
+   val spark = SparkSession.builder
+     .appName("Hoodie Datasource test")
+     .master("local[2]")
+     .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+     .getOrCreate
+
+   val innerStruct1 = new StructType().add("innerKey", "string", false).add("innerValue", "long", true)
+   val structType1 = new StructType().add("key", "string", false)
+     .add("nonNullableInnerStruct", innerStruct1, false).add("nullableInnerStruct", innerStruct1, true)
+   val schema1 = AvroConversionUtils.convertStructTypeToAvroSchema(structType1, "test_struct_name", "test_namespace")
+   val records1 = Seq(Row("key1", Row("innerKey1_1", 1L), Row("innerKey1_2", 2L)))
+   val df1 = spark.createDataFrame(spark.sparkContext.parallelize(records1), structType1)
+   val genRecRDD1 = HoodieSparkUtils.createRdd(df1, "test_struct_name", "test_namespace", true,
+     org.apache.hudi.common.util.Option.of(schema1))
+   assert(schema1.equals(genRecRDD1.collect()(0).getSchema))
+
+   // create schema2 which has one additional column at the root level compared to schema1
+   val structType2 = new StructType().add("key", "string", false)
+     .add("nonNullableInnerStruct", innerStruct1, false).add("nullableInnerStruct", innerStruct1, true)
+     .add("nullableInnerStruct2", innerStruct1, true)
+   val schema2 = AvroConversionUtils.convertStructTypeToAvroSchema(structType2, "test_struct_name", "test_namespace")
+   val records2 = Seq(Row("key2", Row("innerKey2_1", 2L), Row("innerKey2_2", 2L), Row("innerKey2_3", 2L)))
+   val df2 = spark.createDataFrame(spark.sparkContext.parallelize(records2), structType2)
+   val genRecRDD2 = HoodieSparkUtils.createRdd(df2, "test_struct_name", "test_namespace", true,
+     org.apache.hudi.common.util.Option.of(schema2))
+   assert(schema2.equals(genRecRDD2.collect()(0).getSchema))
+
+   // send records1 with schema2. should succeed since the new column is nullable.
+   val genRecRDD3 = HoodieSparkUtils.createRdd(df1, "test_struct_name", "test_namespace", true,
+     org.apache.hudi.common.util.Option.of(schema2))
+   assert(genRecRDD3.collect()(0).getSchema.equals(schema2))
+   genRecRDD3.foreach(entry => assertNull(entry.get("nullableInnerStruct2")))
+
+   val innerStruct3 = new StructType().add("innerKey", "string", false).add("innerValue", "long", true)
+     .add("new_nested_col", "string", true)
+   // create a schema which has one additional nested column compared to schema1, which is nullable
+   val structType4 = new StructType().add("key", "string", false)
+     .add("nonNullableInnerStruct", innerStruct1, false).add("nullableInnerStruct", innerStruct3, true)
+   val schema4 = AvroConversionUtils.convertStructTypeToAvroSchema(structType4, "test_struct_name", "test_namespace")
+   val records4 = Seq(Row("key2", Row("innerKey2_1", 2L), Row("innerKey2_2", 2L, "new_nested_col_val1")))
+   val df4 = spark.createDataFrame(spark.sparkContext.parallelize(records4), structType4)
+   val genRecRDD4 = HoodieSparkUtils.createRdd(df4, "test_struct_name", "test_namespace", true,
+     org.apache.hudi.common.util.Option.of(schema4))
+   assert(schema4.equals(genRecRDD4.collect()(0).getSchema))
+
+   // convert batch 1 with schema4. should succeed.
+   val genRecRDD5 = HoodieSparkUtils.createRdd(df1, "test_struct_name", "test_namespace", true,
+     org.apache.hudi.common.util.Option.of(schema4))
+   assert(schema4.equals(genRecRDD5.collect()(0).getSchema))
+   val genRec = genRecRDD5.collect()(0)
+   val nestedRec: GenericRecord = genRec.get("nullableInnerStruct").asInstanceOf[GenericRecord]
+   assertNull(nestedRec.get("new_nested_col"))
+   assertNotNull(nestedRec.get("innerKey"))
+   assertNotNull(nestedRec.get("innerValue"))
+
+   val innerStruct4 = new StructType().add("innerKey", "string", false).add("innerValue", "long", true)
+     .add("new_nested_col", "string", false)
+   // create a schema which has one additional nested column compared to schema1, which is non nullable
+   val structType6 = new StructType().add("key", "string", false)
+     .add("nonNullableInnerStruct", innerStruct1, false).add("nullableInnerStruct", innerStruct4, true)
+   val schema6 = AvroConversionUtils.convertStructTypeToAvroSchema(structType6, "test_struct_name", "test_namespace")
+   // convert batch 1 with schema6. should fail since the missing nested column is not nullable.
+   try {
+     val genRecRDD6 = HoodieSparkUtils.createRdd(df1, "test_struct_name", "test_namespace", true,
+       org.apache.hudi.common.util.Option.of(schema6))
+     genRecRDD6.collect()
+     fail("createRdd should fail, because records don't have a column which is not nullable in the passed in schema")
+   } catch {
+     case e: Exception =>
+       e.getCause.asInstanceOf[NullPointerException]
+       assertTrue(e.getMessage.contains("null of string in field new_nested_col of"))
+   }
+   spark.stop()
+ }
+
+ def convertRowListToSeq(inputList: util.List[Row]): Seq[Row] =
+   JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq
}


@@ -17,19 +17,15 @@
package org.apache.hudi.functional

-import java.sql.{Date, Timestamp}
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
-import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
-import org.apache.hudi.common.testutils.RawTripTestPayload.deleteRecordsToStrings
+import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieUpsertException
-import org.apache.hudi.keygen._
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config
+import org.apache.hudi.keygen._
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.spark.sql._
@@ -42,6 +38,10 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}

+import java.sql.{Date, Timestamp}
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._

/**
 * Basic tests on the spark datasource for COW table.
@@ -532,10 +532,8 @@ class TestCOWDataSource extends HoodieClientTestBase {
    var writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName)
    writer.partitionBy("current_ts")
      .save(basePath)
    var recordsReadDF = spark.read.format("org.apache.hudi")
      .load(basePath + "/*/*")
    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("current_ts").cast("string")).count() == 0)

    // Specify fieldType as TIMESTAMP
@@ -544,10 +542,8 @@ class TestCOWDataSource extends HoodieClientTestBase {
      .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS")
      .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd")
      .save(basePath)
    recordsReadDF = spark.read.format("org.apache.hudi")
      .load(basePath + "/*/*")
    val udf_date_format = udf((data: Long) => new DateTime(data).toString(DateTimeFormat.forPattern("yyyyMMdd")))
    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= udf_date_format(col("current_ts"))).count() == 0)
@@ -557,7 +553,6 @@ class TestCOWDataSource extends HoodieClientTestBase {
      .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS")
      .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd")
      .save(basePath)
    recordsReadDF = spark.read.format("org.apache.hudi")
      .load(basePath + "/*/*/*")
    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!=
@@ -582,20 +577,16 @@ class TestCOWDataSource extends HoodieClientTestBase {
    var writer = getDataFrameWriter(classOf[SimpleKeyGenerator].getName)
    writer.partitionBy("driver")
      .save(basePath)
    var recordsReadDF = spark.read.format("org.apache.hudi")
      .load(basePath + "/*/*")
    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("driver")).count() == 0)

    // Use the `driver,rider` field as the partition key, If no such field exists, the default value `default` is used
    writer = getDataFrameWriter(classOf[SimpleKeyGenerator].getName)
    writer.partitionBy("driver", "rider")
      .save(basePath)
    recordsReadDF = spark.read.format("org.apache.hudi")
      .load(basePath + "/*/*")
    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("default")).count() == 0)
  }
@@ -604,20 +595,16 @@ class TestCOWDataSource extends HoodieClientTestBase {
    var writer = getDataFrameWriter(classOf[ComplexKeyGenerator].getName)
    writer.partitionBy("driver")
      .save(basePath)
    var recordsReadDF = spark.read.format("org.apache.hudi")
      .load(basePath + "/*/*")
    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("driver")).count() == 0)

    // Use the `driver`,`rider` field as the partition key
    writer = getDataFrameWriter(classOf[ComplexKeyGenerator].getName)
    writer.partitionBy("driver", "rider")
      .save(basePath)
    recordsReadDF = spark.read.format("org.apache.hudi")
      .load(basePath + "/*/*")
    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= concat(col("driver"), lit("/"), col("rider"))).count() == 0)
  }
@@ -649,7 +636,6 @@ class TestCOWDataSource extends HoodieClientTestBase {
    var writer = getDataFrameWriter(classOf[NonpartitionedKeyGenerator].getName)
    writer.partitionBy("")
      .save(basePath)
    var recordsReadDF = spark.read.format("org.apache.hudi")
      .load(basePath + "/*")
    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("")).count() == 0)
@@ -658,7 +644,6 @@ class TestCOWDataSource extends HoodieClientTestBase {
writer = getDataFrameWriter(classOf[NonpartitionedKeyGenerator].getName) writer = getDataFrameWriter(classOf[NonpartitionedKeyGenerator].getName)
writer.partitionBy("abc") writer.partitionBy("abc")
.save(basePath) .save(basePath)
recordsReadDF = spark.read.format("org.apache.hudi") recordsReadDF = spark.read.format("org.apache.hudi")
.load(basePath + "/*") .load(basePath + "/*")
assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("")).count() == 0) assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("")).count() == 0)
@@ -717,9 +702,10 @@ class TestCOWDataSource extends HoodieClientTestBase {
@Test def testSchemaEvolution(): Unit = { @Test def testSchemaEvolution(): Unit = {
// enable schema validation // enable schema validation
val opts = commonOpts ++ Map("hoodie.avro.schema.validate" -> "true") val opts = commonOpts ++ Map("hoodie.avro.schema.validate" -> "true") ++
Map(DataSourceWriteOptions.RECONCILE_SCHEMA.key() -> "true")
// 1. write records with schema1 // 1. write records with schema1
val schema1 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, true):: val schema1 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, false)::
StructField("timestamp", IntegerType, true) :: StructField("partition", IntegerType, true)::Nil) StructField("timestamp", IntegerType, true) :: StructField("partition", IntegerType, true)::Nil)
val records1 = Seq(Row("1", "Andy", 1, 1), val records1 = Seq(Row("1", "Andy", 1, 1),
Row("2", "lisi", 1, 1), Row("2", "lisi", 1, 1),
@@ -732,10 +718,9 @@ class TestCOWDataSource extends HoodieClientTestBase {
.save(basePath) .save(basePath)
// 2. write records with schema2 add column age // 2. write records with schema2 add column age
val schema2 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, true) :: val schema2 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, false) ::
StructField("age", StringType, true) :: StructField("timestamp", IntegerType, true) :: StructField("age", StringType, true) :: StructField("timestamp", IntegerType, true) ::
StructField("partition", IntegerType, true)::Nil) StructField("partition", IntegerType, true)::Nil)
val records2 = Seq(Row("11", "Andy", "10", 1, 1), val records2 = Seq(Row("11", "Andy", "10", 1, 1),
Row("22", "lisi", "11",1, 1), Row("22", "lisi", "11",1, 1),
Row("33", "zhangsan", "12", 1, 1)) Row("33", "zhangsan", "12", 1, 1))
@@ -745,24 +730,25 @@ class TestCOWDataSource extends HoodieClientTestBase {
.options(opts) .options(opts)
.mode(SaveMode.Append) .mode(SaveMode.Append)
.save(basePath) .save(basePath)
val recordsReadDF = spark.read.format("org.apache.hudi") val recordsReadDF = spark.read.format("org.apache.hudi")
.load(basePath + "/*/*") .load(basePath + "/*/*")
val resultSchema = new StructType(recordsReadDF.schema.filter(p=> !p.name.startsWith("_hoodie")).toArray) val tableMetaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath).build()
assertEquals(resultSchema, schema2) val actualSchema = new TableSchemaResolver(tableMetaClient).getTableAvroSchemaWithoutMetadataFields
assertTrue(actualSchema != null)
val actualStructType = AvroConversionUtils.convertAvroSchemaToStructType(actualSchema)
assertEquals(actualStructType, schema2)
// 3. write records with schema3 delete column name // 3. write records with schema4 by omitting a non-nullable column (name); should fail
try { try {
val schema3 = StructType(StructField("_row_key", StringType, true) :: val schema4 = StructType(StructField("_row_key", StringType, true) ::
StructField("age", StringType, true) :: StructField("timestamp", IntegerType, true) :: StructField("age", StringType, true) :: StructField("timestamp", IntegerType, true) ::
StructField("partition", IntegerType, true)::Nil) StructField("partition", IntegerType, true)::Nil)
val records4 = Seq(Row("11", "10", 1, 1),
val records3 = Seq(Row("11", "10", 1, 1),
Row("22", "11",1, 1), Row("22", "11",1, 1),
Row("33", "12", 1, 1)) Row("33", "12", 1, 1))
val rdd3 = jsc.parallelize(records3) val rdd4 = jsc.parallelize(records4)
val recordsDF3 = spark.createDataFrame(rdd3, schema3) val recordsDF4 = spark.createDataFrame(rdd4, schema4)
recordsDF3.write.format("org.apache.hudi") recordsDF4.write.format("org.apache.hudi")
.options(opts) .options(opts)
.mode(SaveMode.Append) .mode(SaveMode.Append)
.save(basePath) .save(basePath)
@@ -777,19 +763,15 @@ class TestCOWDataSource extends HoodieClientTestBase {
val opts = commonOpts ++ Map("hoodie.avro.schema.validate" -> "true") val opts = commonOpts ++ Map("hoodie.avro.schema.validate" -> "true")
val schema1 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, true):: val schema1 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, true)::
StructField("timestamp", IntegerType, true):: StructField("age", StringType, true) :: StructField("partition", IntegerType, true)::Nil) StructField("timestamp", IntegerType, true):: StructField("age", StringType, true) :: StructField("partition", IntegerType, true)::Nil)
val records = Array("{\"_row_key\":\"1\",\"name\":\"lisi\",\"timestamp\":1,\"partition\":1}", val records = Array("{\"_row_key\":\"1\",\"name\":\"lisi\",\"timestamp\":1,\"partition\":1}",
"{\"_row_key\":\"1\",\"name\":\"lisi\",\"timestamp\":1,\"partition\":1}") "{\"_row_key\":\"1\",\"name\":\"lisi\",\"timestamp\":1,\"partition\":1}")
val inputDF = spark.read.schema(schema1.toDDL).json(spark.sparkContext.parallelize(records, 2)) val inputDF = spark.read.schema(schema1.toDDL).json(spark.sparkContext.parallelize(records, 2))
inputDF.write.format("org.apache.hudi") inputDF.write.format("org.apache.hudi")
.options(opts) .options(opts)
.mode(SaveMode.Append) .mode(SaveMode.Append)
.save(basePath) .save(basePath)
val recordsReadDF = spark.read.format("org.apache.hudi") val recordsReadDF = spark.read.format("org.apache.hudi")
.load(basePath + "/*/*") .load(basePath + "/*/*")
val resultSchema = new StructType(recordsReadDF.schema.filter(p=> !p.name.startsWith("_hoodie")).toArray) val resultSchema = new StructType(recordsReadDF.schema.filter(p=> !p.name.startsWith("_hoodie")).toArray)
assertEquals(resultSchema, schema1) assertEquals(resultSchema, schema1)
} }
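For orientation, here is a minimal Scala sketch (not part of the patch; the table name, base path and sample row are illustrative assumptions) of a datasource write that opts into schema reconciliation with the same options exercised by the test above:

import org.apache.hudi.DataSourceWriteOptions
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[2]").appName("reconcile-sketch").getOrCreate()

// A batch carrying the evolved schema (nullable "age" column added on top of the original fields).
val schema = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, false) ::
  StructField("age", StringType, true) :: StructField("timestamp", IntegerType, true) ::
  StructField("partition", IntegerType, true) :: Nil)
val df = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Row("44", "wangwu", "13", 2, 1))), schema)

df.write.format("org.apache.hudi")
  .option("hoodie.table.name", "hoodie_test")                         // assumed table name
  .option("hoodie.datasource.write.recordkey.field", "_row_key")
  .option("hoodie.datasource.write.partitionpath.field", "partition")
  .option("hoodie.datasource.write.precombine.field", "timestamp")
  .option("hoodie.avro.schema.validate", "true")
  .option(DataSourceWriteOptions.RECONCILE_SCHEMA.key(), "true")      // reconcile against the table's latest schema
  .mode(SaveMode.Append)
  .save("/tmp/hoodie_test")                                           // assumed base path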
View File
@@ -24,7 +24,11 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.DFSPropertiesConfiguration; import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.Functions.Function1;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.StringUtils;
@@ -38,12 +42,12 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider; import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider; import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaPostProcessor; import org.apache.hudi.utilities.schema.SchemaPostProcessor;
import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config; import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config;
import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor; import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor;
import org.apache.hudi.utilities.schema.SparkAvroPostProcessor; import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.hudi.utilities.sources.Source; import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.transform.ChainedTransformer; import org.apache.hudi.utilities.transform.ChainedTransformer;
import org.apache.hudi.utilities.transform.Transformer; import org.apache.hudi.utilities.transform.Transformer;
@@ -425,11 +429,53 @@ public class UtilHelpers {
} }
public static SchemaProvider createRowBasedSchemaProvider(StructType structType, public static SchemaProvider createRowBasedSchemaProvider(StructType structType,
TypedProperties cfg, JavaSparkContext jssc) { TypedProperties cfg, JavaSparkContext jssc) {
SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType); SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType);
return wrapSchemaProviderWithPostProcessor(rowSchemaProvider, cfg, jssc, null); return wrapSchemaProviderWithPostProcessor(rowSchemaProvider, cfg, jssc, null);
} }
/**
* Create a schema provider whose target schema is the latest available schema.
*
* @param structType spark data type of incoming batch.
* @param jssc instance of {@link JavaSparkContext}.
* @param fs instance of {@link FileSystem}.
* @param basePath base path of the table.
* @return the schema provider whose target schema refers to the latest schema (either the incoming schema or the table schema).
*/
public static SchemaProvider createLatestSchemaProvider(StructType structType,
JavaSparkContext jssc, FileSystem fs, String basePath) {
SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType);
Schema writeSchema = rowSchemaProvider.getTargetSchema();
Schema latestTableSchema = writeSchema;
try {
if (FSUtils.isTableExists(basePath, fs)) {
HoodieTableMetaClient tableMetaClient = HoodieTableMetaClient.builder().setConf(jssc.sc().hadoopConfiguration()).setBasePath(basePath).build();
TableSchemaResolver
tableSchemaResolver = new TableSchemaResolver(tableMetaClient);
latestTableSchema = tableSchemaResolver.getLatestSchema(writeSchema, true, (Function1<Schema, Schema>) v1 -> AvroConversionUtils.convertStructTypeToAvroSchema(
AvroConversionUtils.convertAvroSchemaToStructType(v1), RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME,
RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE));
}
} catch (IOException e) {
LOG.warn("Could not fetch table schema. Falling back to writer schema");
}
final Schema finalLatestTableSchema = latestTableSchema;
return new SchemaProvider(new TypedProperties()) {
@Override
public Schema getSourceSchema() {
return rowSchemaProvider.getSourceSchema();
}
@Override
public Schema getTargetSchema() {
return finalLatestTableSchema;
}
};
}
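A hedged usage sketch of the helper above, written in Scala for brevity; the base path and the incoming struct are assumptions. The returned provider keeps the row-derived source schema and reports the table's latest schema, when the table already exists, as the target:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.hudi.utilities.UtilHelpers

val jssc = new JavaSparkContext(new SparkConf().setMaster("local[2]").setAppName("latest-schema-sketch"))
val basePath = "/tmp/hoodie_test"                                   // assumed table location
val fs = FileSystem.get(new Path(basePath).toUri, jssc.hadoopConfiguration())
// Schema of the incoming (possibly older) batch.
val incomingStruct = StructType(StructField("_row_key", StringType, true) :: Nil)

val provider = UtilHelpers.createLatestSchemaProvider(incomingStruct, jssc, fs, basePath)
// Source schema stays row-derived; target schema is the table's latest schema when the table exists,
// otherwise it falls back to the schema derived from the incoming rows.
val writeWithSchema = provider.getTargetSchema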
@FunctionalInterface @FunctionalInterface
public interface CheckedSupplier<T> { public interface CheckedSupplier<T> {
T get() throws Throwable; T get() throws Throwable;
View File
@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.deltastreamer; package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.DataSourceUtils; import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.HoodieWriterUtils; import org.apache.hudi.HoodieWriterUtils;
import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.HoodieAvroUtils;
@@ -48,6 +49,7 @@ import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodiePayloadConfig; import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieDeltaStreamerException;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool; import org.apache.hudi.hive.HiveSyncTool;
@@ -55,10 +57,9 @@ import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.sync.common.AbstractSyncTool; import org.apache.hudi.sync.common.AbstractSyncTool;
import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.exception.HoodieDeltaStreamerException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Config;
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback; import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback;
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig; import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Config;
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider; import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaSet; import org.apache.hudi.utilities.schema.SchemaSet;
@@ -86,26 +87,26 @@ import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.function.Function;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import scala.collection.JavaConversions; import scala.collection.JavaConversions;
import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP; import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP;
import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE; import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY;
import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING_PROP; import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING_PROP;
import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_PROP; import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_PROP;
import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP; import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP;
import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT_PROP; import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT_PROP;
import static org.apache.hudi.config.HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP; import static org.apache.hudi.config.HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE; import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME; import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
@@ -365,7 +366,7 @@ public class DeltaSync implements Serializable {
final Option<JavaRDD<GenericRecord>> avroRDDOptional; final Option<JavaRDD<GenericRecord>> avroRDDOptional;
final String checkpointStr; final String checkpointStr;
final SchemaProvider schemaProvider; SchemaProvider schemaProvider;
if (transformer.isPresent()) { if (transformer.isPresent()) {
// Transformation is needed. Fetch New rows in Row Format, apply transformation and then convert them // Transformation is needed. Fetch New rows in Row Format, apply transformation and then convert them
// to generic records for writing // to generic records for writing
@@ -374,28 +375,40 @@ public class DeltaSync implements Serializable {
Option<Dataset<Row>> transformed = Option<Dataset<Row>> transformed =
dataAndCheckpoint.getBatch().map(data -> transformer.get().apply(jssc, sparkSession, data, props)); dataAndCheckpoint.getBatch().map(data -> transformer.get().apply(jssc, sparkSession, data, props));
checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch(); checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
boolean reconcileSchema = props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) { if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) {
// If the target schema is specified through Avro schema, // If the target schema is specified through Avro schema,
// pass in the schema for the Row-to-Avro conversion // pass in the schema for the Row-to-Avro conversion
// to avoid nullability mismatch between Avro schema and Row schema // to avoid nullability mismatch between Avro schema and Row schema
avroRDDOptional = transformed avroRDDOptional = transformed
.map(t -> HoodieSparkUtils.createRdd( .map(t -> HoodieSparkUtils.createRdd(
t, this.userProvidedSchemaProvider.getTargetSchema(), t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema,
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()); Option.of(this.userProvidedSchemaProvider.getTargetSchema())
).toJavaRDD());
schemaProvider = this.userProvidedSchemaProvider; schemaProvider = this.userProvidedSchemaProvider;
} else { } else {
// Use Transformed Row's schema if not overridden. If target schema is not specified // Use Transformed Row's schema if not overridden. If target schema is not specified
// default to RowBasedSchemaProvider // default to RowBasedSchemaProvider
schemaProvider = schemaProvider =
transformed transformed
.map(r -> (SchemaProvider) new DelegatingSchemaProvider(props, jssc, .map(r -> {
dataAndCheckpoint.getSchemaProvider(), // determine the targetSchemaProvider. use latestTableSchema if reconcileSchema is enabled.
UtilHelpers.createRowBasedSchemaProvider(r.schema(), props, jssc))) SchemaProvider targetSchemaProvider = null;
if (reconcileSchema) {
targetSchemaProvider = UtilHelpers.createLatestSchemaProvider(r.schema(), jssc, fs, cfg.targetBasePath);
} else {
targetSchemaProvider = UtilHelpers.createRowBasedSchemaProvider(r.schema(), props, jssc);
}
return (SchemaProvider) new DelegatingSchemaProvider(props, jssc,
dataAndCheckpoint.getSchemaProvider(), targetSchemaProvider); })
.orElse(dataAndCheckpoint.getSchemaProvider()); .orElse(dataAndCheckpoint.getSchemaProvider());
avroRDDOptional = transformed avroRDDOptional = transformed
.map(t -> HoodieSparkUtils.createRdd( .map(t -> HoodieSparkUtils.createRdd(
t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()); t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema,
Option.ofNullable(schemaProvider.getTargetSchema())
).toJavaRDD());
} }
} else { } else {
// Pull the data from the source & prepare the write // Pull the data from the source & prepare the write
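To make the call sites above concrete, a small Scala sketch (the wrapper function and its inputs are assumptions) of converting a transformed batch into Avro records with the extended createRdd signature:

import org.apache.avro.Schema
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider.{HOODIE_RECORD_NAMESPACE, HOODIE_RECORD_STRUCT_NAME}
import org.apache.spark.sql.DataFrame

// Assumed inputs: the transformed batch, the reconcile flag, and a possibly-null target schema.
def toAvroRecords(transformed: DataFrame, reconcileSchema: Boolean, targetSchema: Schema) =
  HoodieSparkUtils.createRdd(
    transformed, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE,
    reconcileSchema, org.apache.hudi.common.util.Option.ofNullable(targetSchema)
  ).toJavaRDD()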
View File
@@ -18,6 +18,7 @@
package org.apache.hudi.utilities.deltastreamer; package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.async.AsyncClusteringService; import org.apache.hudi.async.AsyncClusteringService;
import org.apache.hudi.async.AsyncCompactService; import org.apache.hudi.async.AsyncCompactService;
import org.apache.hudi.async.HoodieAsyncService; import org.apache.hudi.async.HoodieAsyncService;
@@ -120,13 +121,13 @@ public class HoodieDeltaStreamer implements Serializable {
Option<TypedProperties> props) throws IOException { Option<TypedProperties> props) throws IOException {
// Resolving the properties first in a consistent way // Resolving the properties first in a consistent way
if (props.isPresent()) { if (props.isPresent()) {
this.properties = props.get(); this.properties = setDefaults(props.get());
} else if (cfg.propsFilePath.equals(Config.DEFAULT_DFS_SOURCE_PROPERTIES)) { } else if (cfg.propsFilePath.equals(Config.DEFAULT_DFS_SOURCE_PROPERTIES)) {
this.properties = UtilHelpers.getConfig(cfg.configs).getConfig(); this.properties = setDefaults(UtilHelpers.getConfig(cfg.configs).getConfig());
} else { } else {
this.properties = UtilHelpers.readConfig( this.properties = setDefaults(UtilHelpers.readConfig(
FSUtils.getFs(cfg.propsFilePath, jssc.hadoopConfiguration()), FSUtils.getFs(cfg.propsFilePath, jssc.hadoopConfiguration()),
new Path(cfg.propsFilePath), cfg.configs).getConfig(); new Path(cfg.propsFilePath), cfg.configs).getConfig());
} }
if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) { if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) {
@@ -146,6 +147,13 @@ public class HoodieDeltaStreamer implements Serializable {
deltaSyncService.ifPresent(ds -> ds.shutdown(false)); deltaSyncService.ifPresent(ds -> ds.shutdown(false));
} }
private TypedProperties setDefaults(TypedProperties props) {
if (!props.containsKey(DataSourceWriteOptions.RECONCILE_SCHEMA().key())) {
props.setProperty(DataSourceWriteOptions.RECONCILE_SCHEMA().key(), DataSourceWriteOptions.RECONCILE_SCHEMA().defaultValue().toString());
}
return props;
}
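A short sketch (assumed value) of supplying the flag explicitly, in which case setDefaults() above leaves the caller's choice untouched:

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.config.TypedProperties

val props = new TypedProperties()
// If the key is already present, setDefaults() keeps it; otherwise it fills in the default value.
props.setProperty(DataSourceWriteOptions.RECONCILE_SCHEMA.key(), "true")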
/** /**
* Main method to start syncing. * Main method to start syncing.
* *
View File
@@ -74,9 +74,10 @@ public final class SourceFormatAdapter {
// If the source schema is specified through Avro schema, // If the source schema is specified through Avro schema,
// pass in the schema for the Row-to-Avro conversion // pass in the schema for the Row-to-Avro conversion
// to avoid nullability mismatch between Avro schema and Row schema // to avoid nullability mismatch between Avro schema and Row schema
? HoodieSparkUtils.createRdd(rdd, r.getSchemaProvider().getSourceSchema(), ? HoodieSparkUtils.createRdd(rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, true,
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD() : HoodieSparkUtils.createRdd(rdd, org.apache.hudi.common.util.Option.ofNullable(r.getSchemaProvider().getSourceSchema())
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD(); ).toJavaRDD() : HoodieSparkUtils.createRdd(rdd,
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, false, Option.empty()).toJavaRDD();
}) })
.orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider()); .orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
} }
View File
@@ -40,8 +40,8 @@ public class SparkAvroPostProcessor extends SchemaPostProcessor {
@Override @Override
public Schema processSchema(Schema schema) { public Schema processSchema(Schema schema) {
return AvroConversionUtils.convertStructTypeToAvroSchema( return schema != null ? AvroConversionUtils.convertStructTypeToAvroSchema(
AvroConversionUtils.convertAvroSchemaToStructType(schema), RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME, AvroConversionUtils.convertAvroSchemaToStructType(schema), RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME,
RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE); RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE) : null;
} }
} }
View File
@@ -18,10 +18,8 @@
package org.apache.hudi.utilities.functional; package org.apache.hudi.utilities.functional;
import java.sql.Connection; import org.apache.hudi.AvroConversionUtils;
import java.sql.DriverManager; import org.apache.hudi.DataSourceWriteOptions;
import java.util.ConcurrentModificationException;
import java.util.concurrent.ExecutorService;
import org.apache.hudi.common.config.DFSPropertiesConfiguration; import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.config.TypedProperties;
@@ -33,6 +31,7 @@ import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -49,6 +48,7 @@ import org.apache.hudi.utilities.DummySchemaProvider;
import org.apache.hudi.utilities.HoodieClusteringJob; import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
import org.apache.hudi.utilities.sources.CsvDFSSource; import org.apache.hudi.utilities.sources.CsvDFSSource;
import org.apache.hudi.utilities.sources.HoodieIncrSource; import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.InputBatch; import org.apache.hudi.utilities.sources.InputBatch;
@@ -63,6 +63,7 @@ import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer; import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
import org.apache.hudi.utilities.transform.Transformer; import org.apache.hudi.utilities.transform.Transformer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@@ -92,11 +93,15 @@ import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource; import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException; import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@@ -105,6 +110,8 @@ import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY; import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -115,6 +122,7 @@ import static org.junit.jupiter.api.Assertions.fail;
/** /**
* Basic tests against {@link HoodieDeltaStreamer}, by issuing bulk_inserts, upserts, inserts. Check counts at the end. * Basic tests against {@link HoodieDeltaStreamer}, by issuing bulk_inserts, upserts, inserts. Check counts at the end.
*/ */
public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class); private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);
@@ -157,7 +165,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, boolean runSchedule, String scheduleAndExecute) { protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, boolean runSchedule, String scheduleAndExecute) {
HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath, HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
clusteringInstantTime, runSchedule, scheduleAndExecute); clusteringInstantTime, runSchedule, scheduleAndExecute);
return new HoodieClusteringJob(jsc, scheduleClusteringConfig); return new HoodieClusteringJob(jsc, scheduleClusteringConfig);
} }
@@ -226,7 +234,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
cfg.payloadClassName = payloadClassName; cfg.payloadClassName = payloadClassName;
} }
if (useSchemaProviderClass) { if (useSchemaProviderClass) {
cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName(); cfg.schemaProviderClassName = defaultSchemaProviderClassName;
} }
return cfg; return cfg;
} }
@@ -391,6 +399,23 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
return base; return base;
} }
/**
* Arguments for the schema evolution test: table type, whether a user-provided target schema is used,
* and whether the Spark Avro schema post processor is enabled.
*
* @return the argument combinations to run the test with.
*/
private static Stream<Arguments> schemaEvolArgs() {
return Stream.of(
Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), true, true),
Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), true, false),
Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), false, true),
Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), false, false),
Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), true, true),
Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), true, false),
Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), false, true),
Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), false, false));
}
private static Stream<Arguments> provideValidCliArgs() { private static Stream<Arguments> provideValidCliArgs() {
HoodieDeltaStreamer.Config base = getBaseConfig(); HoodieDeltaStreamer.Config base = getBaseConfig();
@@ -425,41 +450,41 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
conf.enableHiveSync = true; conf.enableHiveSync = true;
conf.configs = Arrays.asList(HOODIE_CONF_VALUE1, HOODIE_CONF_VALUE2); conf.configs = Arrays.asList(HOODIE_CONF_VALUE1, HOODIE_CONF_VALUE2);
String[] allConfig = new String[]{TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE, SOURCE_LIMIT_PARAM, String[] allConfig = new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE, SOURCE_LIMIT_PARAM,
SOURCE_LIMIT_VALUE, TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE, SOURCE_LIMIT_VALUE, TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
BASE_FILE_FORMAT_PARAM, BASE_FILE_FORMAT_VALUE, ENABLE_HIVE_SYNC_PARAM, HOODIE_CONF_PARAM, HOODIE_CONF_VALUE1, BASE_FILE_FORMAT_PARAM, BASE_FILE_FORMAT_VALUE, ENABLE_HIVE_SYNC_PARAM, HOODIE_CONF_PARAM, HOODIE_CONF_VALUE1,
HOODIE_CONF_PARAM, HOODIE_CONF_VALUE2}; HOODIE_CONF_PARAM, HOODIE_CONF_VALUE2};
return Stream.of( return Stream.of(
// Base // Base
Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE, Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE}, base), TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE}, base),
// String // String
Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE, Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE, TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
BASE_FILE_FORMAT_PARAM, BASE_FILE_FORMAT_VALUE}, conf1), BASE_FILE_FORMAT_PARAM, BASE_FILE_FORMAT_VALUE}, conf1),
// Integer // Integer
Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE, Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE, TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
SOURCE_LIMIT_PARAM, SOURCE_LIMIT_VALUE}, conf2), SOURCE_LIMIT_PARAM, SOURCE_LIMIT_VALUE}, conf2),
// Boolean // Boolean
Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE, Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE, TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
ENABLE_HIVE_SYNC_PARAM}, conf3), ENABLE_HIVE_SYNC_PARAM}, conf3),
// Array List 1 // Array List 1
Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE, Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE, TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
HOODIE_CONF_PARAM, HOODIE_CONF_VALUE1}, conf4), HOODIE_CONF_PARAM, HOODIE_CONF_VALUE1}, conf4),
// Array List with comma // Array List with comma
Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE, Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE, TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
HOODIE_CONF_PARAM, HOODIE_CONF_VALUE2}, conf5), HOODIE_CONF_PARAM, HOODIE_CONF_VALUE2}, conf5),
// Array list with multiple values // Array list with multiple values
Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE, Arguments.of(new String[] {TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE,
TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE, TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE,
HOODIE_CONF_PARAM, HOODIE_CONF_VALUE1, HOODIE_CONF_PARAM, HOODIE_CONF_VALUE2}, conf6), HOODIE_CONF_PARAM, HOODIE_CONF_VALUE1, HOODIE_CONF_PARAM, HOODIE_CONF_VALUE2}, conf6),
// All // All
Arguments.of(allConfig, conf) Arguments.of(allConfig, conf)
); );
} }
@@ -494,7 +519,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
@Test @Test
public void testPropsWithInvalidKeyGenerator() throws Exception { public void testPropsWithInvalidKeyGenerator() throws Exception {
Exception e = assertThrows(IOException.class, () -> { Exception e = assertThrows(IOException.class, () -> {
String tableBasePath = dfsBasePath + "/test_table"; String tableBasePath = dfsBasePath + "/test_table_invalid_key_gen";
HoodieDeltaStreamer deltaStreamer = HoodieDeltaStreamer deltaStreamer =
new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT, new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT,
Collections.singletonList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_INVALID, false), jsc); Collections.singletonList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_INVALID, false), jsc);
@@ -548,8 +573,8 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
// Perform bootstrap with tableBasePath as source // Perform bootstrap with tableBasePath as source
String bootstrapSourcePath = dfsBasePath + "/src_bootstrapped"; String bootstrapSourcePath = dfsBasePath + "/src_bootstrapped";
Dataset<Row> sourceDf = sqlContext.read() Dataset<Row> sourceDf = sqlContext.read()
.format("org.apache.hudi") .format("org.apache.hudi")
.load(tableBasePath + "/*/*.parquet"); .load(tableBasePath + "/*/*.parquet");
sourceDf.write().format("parquet").save(bootstrapSourcePath); sourceDf.write().format("parquet").save(bootstrapSourcePath);
String newDatasetBasePath = dfsBasePath + "/test_dataset_bootstrapped"; String newDatasetBasePath = dfsBasePath + "/test_dataset_bootstrapped";
@@ -575,6 +600,83 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
assertTrue(fieldNames.containsAll(expectedFieldNames)); assertTrue(fieldNames.containsAll(expectedFieldNames));
} }
@ParameterizedTest
@MethodSource("schemaEvolArgs")
public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema, boolean useSchemaPostProcessor) throws Exception {
String tableBasePath = dfsBasePath + "/test_table_schema_evolution" + tableType + "_" + useUserProvidedSchema + "_" + useSchemaPostProcessor;
defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
// Insert data produced with Schema A, pass Schema A
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, Collections.singletonList(TestIdentityTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + dfsBasePath + "/source.avsc");
cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + dfsBasePath + "/source.avsc");
cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
if (!useSchemaPostProcessor) {
cfg.configs.add(SparkAvroPostProcessor.Config.SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE + "=false");
}
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*", sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
// Upsert data produced with Schema B, pass Schema B
cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, Collections.singletonList(TripsWithEvolvedOptionalFieldTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + dfsBasePath + "/source.avsc");
cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + dfsBasePath + "/source_evolved.avsc");
cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
if (!useSchemaPostProcessor) {
cfg.configs.add(SparkAvroPostProcessor.Config.SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE + "=false");
}
new HoodieDeltaStreamer(cfg, jsc).sync();
// out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
TestHelpers.assertRecordCount(1450, tableBasePath + "/*/*", sqlContext);
TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
List<Row> counts = TestHelpers.countsPerCommit(tableBasePath + "/*/*", sqlContext);
assertEquals(1450, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
sqlContext.read().format("org.apache.hudi").load(tableBasePath + "/*/*").createOrReplaceTempView("tmp_trips");
long recordCount =
sqlContext.sparkSession().sql("select * from tmp_trips where evoluted_optional_union_field is not NULL").count();
assertEquals(950, recordCount);
// Upsert data produced with Schema A, pass Schema B
if (!useUserProvidedSchema) {
defaultSchemaProviderClassName = TestFileBasedSchemaProviderNullTargetSchema.class.getName();
}
cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, Collections.singletonList(TestIdentityTransformer.class.getName()),
PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + dfsBasePath + "/source.avsc");
if (useUserProvidedSchema) {
cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + dfsBasePath + "/source_evolved.avsc");
}
if (!useSchemaPostProcessor) {
cfg.configs.add(SparkAvroPostProcessor.Config.SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE + "=false");
}
cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
new HoodieDeltaStreamer(cfg, jsc).sync();
// again, 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
TestHelpers.assertRecordCount(1900, tableBasePath + "/*/*", sqlContext);
TestHelpers.assertCommitMetadata("00002", tableBasePath, dfs, 3);
counts = TestHelpers.countsPerCommit(tableBasePath + "/*/*", sqlContext);
assertEquals(1900, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(dfs.getConf()).build());
Schema tableSchema = tableSchemaResolver.getTableAvroSchemaWithoutMetadataFields();
assertNotNull(tableSchema);
Schema expectedSchema = new Schema.Parser().parse(dfs.open(new Path(dfsBasePath + "/source_evolved.avsc")));
if (!useUserProvidedSchema || useSchemaPostProcessor) {
expectedSchema = AvroConversionUtils.convertStructTypeToAvroSchema(
AvroConversionUtils.convertAvroSchemaToStructType(expectedSchema), HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE);
}
assertEquals(tableSchema, expectedSchema);
// clean up and reinit
UtilitiesTestBase.Helpers.deleteFileFromDfs(FSUtils.getFs(cfg.targetBasePath, jsc.hadoopConfiguration()), dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE);
writeCommonPropsToFile();
defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
}
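For reference, a hedged Scala sketch (base path and Hadoop configuration are assumptions) of reading back the table's committed schema the same way the assertions above do:

import org.apache.hadoop.conf.Configuration
import org.apache.hudi.AvroConversionUtils
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}

val tableBasePath = "/tmp/test_table_schema_evolution"   // assumed table location
val metaClient = HoodieTableMetaClient.builder()
  .setConf(new Configuration())
  .setBasePath(tableBasePath)
  .build()
// Latest table schema without the _hoodie_* metadata fields, as asserted in the test.
val tableSchema = new TableSchemaResolver(metaClient).getTableAvroSchemaWithoutMetadataFields
// Optionally compare against a Spark StructType form of the expected schema.
val tableStruct = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)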
@Test @Test
public void testUpsertsCOWContinuousMode() throws Exception { public void testUpsertsCOWContinuousMode() throws Exception {
testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow"); testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow");
@@ -782,8 +884,8 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
} }
private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, int totalRecords, private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, int totalRecords,
HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer.Config cfgIngestionJob, HoodieDeltaStreamer backfillJob, HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer.Config cfgIngestionJob, HoodieDeltaStreamer backfillJob,
HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict) throws Exception { HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict) throws Exception {
ExecutorService service = Executors.newFixedThreadPool(2); ExecutorService service = Executors.newFixedThreadPool(2);
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build(); HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
@@ -1262,7 +1364,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
} }
private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile, private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
String propsFileName, String parquetSourceRoot, boolean addCommonProps) throws IOException { String propsFileName, String parquetSourceRoot, boolean addCommonProps) throws IOException {
// Properties used for testing delta-streamer with Parquet source // Properties used for testing delta-streamer with Parquet source
TypedProperties parquetProps = new TypedProperties(); TypedProperties parquetProps = new TypedProperties();
@@ -1271,7 +1373,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
} }
parquetProps.setProperty("include", "base.properties"); parquetProps.setProperty("include", "base.properties");
parquetProps.setProperty("hoodie.embed.timeline.server","false"); parquetProps.setProperty("hoodie.embed.timeline.server", "false");
parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
if (useSchemaProvider) { if (useSchemaProvider) {
@@ -1301,7 +1403,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
TypedProperties props = new TypedProperties(); TypedProperties props = new TypedProperties();
populateAllCommonProps(props); populateAllCommonProps(props);
props.setProperty("include", "base.properties"); props.setProperty("include", "base.properties");
props.setProperty("hoodie.embed.timeline.server","false"); props.setProperty("hoodie.embed.timeline.server", "false");
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
props.setProperty("hoodie.deltastreamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT); props.setProperty("hoodie.deltastreamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT);
@@ -1316,6 +1418,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
/** /**
* Tests Deltastreamer with parquet dfs source and transitions to JsonKafkaSource. * Tests Deltastreamer with parquet dfs source and transitions to JsonKafkaSource.
*
* @param autoResetToLatest true if auto reset value to be set to LATEST. false to leave it as default(i.e. EARLIEST) * @param autoResetToLatest true if auto reset value to be set to LATEST. false to leave it as default(i.e. EARLIEST)
* @throws Exception * @throws Exception
*/ */
@@ -1325,7 +1428,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
int parquetRecords = 10; int parquetRecords = 10;
prepareParquetDFSFiles(parquetRecords, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, true, HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA); prepareParquetDFSFiles(parquetRecords, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, true, HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
prepareParquetDFSSource(true, false,"source_uber.avsc", "target_uber.avsc", PROPS_FILENAME_TEST_PARQUET, prepareParquetDFSSource(true, false, "source_uber.avsc", "target_uber.avsc", PROPS_FILENAME_TEST_PARQUET,
PARQUET_SOURCE_ROOT, false); PARQUET_SOURCE_ROOT, false);
// delta streamer w/ parquet source // delta streamer w/ parquet source
String tableBasePath = dfsBasePath + "/test_dfs_to_kakfa" + testNum; String tableBasePath = dfsBasePath + "/test_dfs_to_kakfa" + testNum;
@@ -1388,19 +1491,19 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", topicName); prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", topicName);
String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum; String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer( HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false, Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
true, 100000, false, null, true, 100000, false, null,
null, "timestamp", String.valueOf(System.currentTimeMillis())), jsc); null, "timestamp", String.valueOf(System.currentTimeMillis())), jsc);
deltaStreamer.sync(); deltaStreamer.sync();
TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, false, topicName); prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, false, topicName);
deltaStreamer = new HoodieDeltaStreamer( deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false, Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
true, 100000, false, null, null, true, 100000, false, null, null,
"timestamp", String.valueOf(System.currentTimeMillis())), jsc); "timestamp", String.valueOf(System.currentTimeMillis())), jsc);
deltaStreamer.sync(); deltaStreamer.sync();
TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath + "/*/*.parquet", sqlContext);
} }
@@ -1717,8 +1820,36 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
@Override @Override
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
TypedProperties properties) { TypedProperties properties) {
return rowDataset; return rowDataset;
} }
} }
/**
* Adds a new field evoluted_optional_union_field, populated from the rider field.
*/
public static class TripsWithEvolvedOptionalFieldTransformer implements Transformer {
@Override
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
TypedProperties properties) {
return rowDataset.withColumn("evoluted_optional_union_field", functions.col("rider"));
}
}
/**
* {@link FilebasedSchemaProvider} to be used in tests where target schema is null.
*/
public static class TestFileBasedSchemaProviderNullTargetSchema extends FilebasedSchemaProvider {
public TestFileBasedSchemaProviderNullTargetSchema(TypedProperties props, JavaSparkContext jssc) {
super(props, jssc);
}
@Override
public Schema getTargetSchema() {
return null;
}
}
} }
View File
@@ -22,6 +22,7 @@ import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.hive.MultiPartKeysValueExtractor; import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase; import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.avro.Schema; import org.apache.avro.Schema;
@@ -76,7 +77,7 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamerBase.class); static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamerBase.class);
public static KafkaTestUtils testUtils; public static KafkaTestUtils testUtils;
protected static String topicName; protected static String topicName;
protected static String defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
protected static int testNum = 1; protected static int testNum = 1;
@BeforeAll @BeforeAll
@@ -94,6 +95,7 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs, UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs,
dfsBasePath + "/sql-transformer.properties"); dfsBasePath + "/sql-transformer.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc"); UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_evolved.avsc", dfs, dfsBasePath + "/source_evolved.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc", dfs, dfsBasePath + "/source-flattened.avsc"); UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc", dfs, dfsBasePath + "/source-flattened.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc"); UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target-flattened.avsc", dfs, dfsBasePath + "/target-flattened.avsc"); UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target-flattened.avsc", dfs, dfsBasePath + "/target-flattened.avsc");
@@ -107,22 +109,7 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/short_trip_uber_config.properties", dfs, dfsBasePath + "/config/short_trip_uber_config.properties"); UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/short_trip_uber_config.properties", dfs, dfsBasePath + "/config/short_trip_uber_config.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/clusteringjob.properties", dfs, dfsBasePath + "/clusteringjob.properties"); UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/clusteringjob.properties", dfs, dfsBasePath + "/clusteringjob.properties");
TypedProperties props = new TypedProperties(); writeCommonPropsToFile();
props.setProperty("include", "sql-transformer.properties");
props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
// Hive Configs
props.setProperty(DataSourceWriteOptions.HIVE_URL().key(), "jdbc:hive2://127.0.0.1:9999/");
props.setProperty(DataSourceWriteOptions.HIVE_DATABASE().key(), "testdb1");
props.setProperty(DataSourceWriteOptions.HIVE_TABLE().key(), "hive_trips");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), "datestr");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(),
MultiPartKeysValueExtractor.class.getName());
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE);
// Properties used for the delta-streamer which incrementally pulls from upstream Hudi source table and writes to // Properties used for the delta-streamer which incrementally pulls from upstream Hudi source table and writes to
// downstream hudi table // downstream hudi table
@@ -162,6 +149,25 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
}
protected static void writeCommonPropsToFile() throws IOException {
TypedProperties props = new TypedProperties();
props.setProperty("include", "sql-transformer.properties");
props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
// Hive Configs
props.setProperty(DataSourceWriteOptions.HIVE_URL().key(), "jdbc:hive2://127.0.0.1:9999/");
props.setProperty(DataSourceWriteOptions.HIVE_DATABASE().key(), "testdb1");
props.setProperty(DataSourceWriteOptions.HIVE_TABLE().key(), "hive_trips");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), "datestr");
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(),
MultiPartKeysValueExtractor.class.getName());
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE);
}
@BeforeEach
public void setup() throws Exception {
super.setup();
@@ -241,5 +247,4 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
dataGenerator.generateInserts("000", numRecords)), new Path(path));
}
}
}
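The source_evolved.avsc copied to DFS above gives evolved-schema test runs a schema file to point the delta streamer's schema provider at. A minimal sketch of such an override, reusing the property keys from writeCommonPropsToFile(); the override itself is illustrative and not shown in this diff:

// Hypothetical props for an evolved-schema test run; keys and paths mirror the ones written above.
TypedProperties evolvedProps = new TypedProperties();
evolvedProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_evolved.avsc");
evolvedProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/source_evolved.avsc");
UtilitiesTestBase.Helpers.savePropsToDFS(evolvedProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE);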

View File

@@ -243,6 +243,12 @@ public class UtilitiesTestBase {
os.close();
}
public static void deleteFileFromDfs(FileSystem fs, String targetPath) throws IOException {
if (fs.exists(new Path(targetPath))) {
fs.delete(new Path(targetPath), true);
}
}
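This new helper lets a test replace a file that was already written to DFS. A sketch of the intended call site, assuming it sits in the same Helpers class as savePropsToDFS (the usage below is an assumption, not part of this diff):

// Hypothetical usage: drop the previously written props file, then write a fresh copy in its place.
UtilitiesTestBase.Helpers.deleteFileFromDfs(dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE);
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE);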
public static void savePropsToDFS(TypedProperties props, FileSystem fs, String targetPath) throws IOException {
String[] lines = props.keySet().stream().map(k -> String.format("%s=%s", k, props.get(k))).toArray(String[]::new);
saveStringsToDFS(lines, fs, targetPath);

View File

@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"type" : "record",
"name" : "triprec",
"fields" : [
{
"name" : "timestamp",
"type" : "long"
}, {
"name" : "_row_key",
"type" : "string"
}, {
"name" : "partition_path",
"type" : "string"
}, {
"name" : "rider",
"type" : "string"
}, {
"name" : "driver",
"type" : "string"
}, {
"name" : "begin_lat",
"type" : "double"
}, {
"name" : "begin_lon",
"type" : "double"
}, {
"name" : "end_lat",
"type" : "double"
}, {
"name" : "end_lon",
"type" : "double"
}, {
"name" : "distance_in_meters",
"type" : "int"
}, {
"name" : "seconds_since_epoch",
"type" : "long"
}, {
"name" : "weight",
"type" : "float"
},{
"name" : "nation",
"type" : "bytes"
},{
"name" : "current_date",
"type" : {
"type" : "int",
"logicalType" : "date"
}
},{
"name" : "current_ts",
"type" : {
"type" : "long"
}
},{
"name" : "height",
"type" : {
"type" : "fixed",
"name" : "abc",
"size" : 5,
"logicalType" : "decimal",
"precision" : 10,
"scale": 6
}
}, {
"name" :"city_to_state",
"type" : {
"type" : "map",
"values": "string"
}
},
{
"name" : "fare",
"type" : {
"type" : "record",
"name" : "fare",
"fields" : [
{
"name" : "amount",
"type" : "double"
},
{
"name" : "currency",
"type" : "string"
}
]
}
},
{
"name" : "tip_history",
"type" : {
"type" : "array",
"items" : {
"type" : "record",
"name" : "tip_history",
"fields" : [
{
"name" : "amount",
"type" : "double"
},
{
"name" : "currency",
"type" : "string"
}
]
}
}
},
{
"name" : "_hoodie_is_deleted",
"type" : "boolean",
"default" : false
},
{
"name": "evoluted_optional_union_field",
"type": [
"null",
"string"
],
"default": null
}]
}
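For context, this evolved schema presumably differs from source.avsc mainly by the trailing evoluted_optional_union_field, an optional union with a null default, which is what allows records serialized with the old schema to still be read. A minimal sketch of that resolution in plain Avro, with both writer and reader schemas supplied; the file paths and the standalone reader class are illustrative, not part of this commit:

import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;

public class EvolvedSchemaReadSketch {
  public static void main(String[] args) throws IOException {
    // Writer schema: what the old records were serialized with; reader schema: the evolved one above.
    Schema writerSchema = new Schema.Parser().parse(new File("source.avsc"));
    Schema readerSchema = new Schema.Parser().parse(new File("source_evolved.avsc"));
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(writerSchema, readerSchema);
    try (DataFileReader<GenericRecord> reader = new DataFileReader<>(new File("old_records.avro"), datumReader)) {
      for (GenericRecord record : reader) {
        // Old-schema records resolve against the evolved schema; the added field falls back to its null default.
        System.out.println(record.get("evoluted_optional_union_field"));
      }
    }
  }
}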