
[HUDI-552] Fix the schema mismatch in Row-to-Avro conversion (#1246)

commit d0ee95ed16 (parent 3f4966ddea)
Author: Y Ethan Guo
Date:   2020-01-18 16:40:56 -08:00
Committed by: vinoth chandar

4 changed files with 104 additions and 8 deletions

AvroConversionUtils.scala

@@ -31,12 +31,14 @@ object AvroConversionUtils {
   def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
     val avroSchema = convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
-    createRdd(df, avroSchema.toString, structName, recordNamespace)
+    createRdd(df, avroSchema, structName, recordNamespace)
   }

-  def createRdd(df: DataFrame, avroSchemaAsJsonString: String, structName: String, recordNamespace: String)
+  def createRdd(df: DataFrame, avroSchema: Schema, structName: String, recordNamespace: String)
     : RDD[GenericRecord] = {
-    val dataType = df.schema
+    // Use the Avro schema to derive the StructType which has the correct nullability information
+    val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
+    val avroSchemaAsJsonString = avroSchema.toString
     val encoder = RowEncoder.apply(dataType).resolveAndBind()
     df.queryExecution.toRdd.map(encoder.fromRow)
       .mapPartitions { records =>
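For context, a minimal Scala sketch (not part of this commit; the record schema here is made up) of why the StructType is now derived from the Avro schema: SchemaConverters.toSqlType preserves per-field nullability from Avro unions with null, whereas a DataFrame read from a source such as Parquet typically reports every column as nullable, so an encoder bound to df.schema can disagree with the Avro schema used to serialize the records.

    import org.apache.avro.Schema
    import org.apache.spark.sql.avro.SchemaConverters
    import org.apache.spark.sql.types.StructType

    // Hypothetical record schema: "key" is required, "rider" is optional.
    val avroJson =
      """{"type": "record", "name": "rec", "fields": [
        |  {"name": "key", "type": "string"},
        |  {"name": "rider", "type": ["null", "string"], "default": null}
        |]}""".stripMargin
    val avroSchema = new Schema.Parser().parse(avroJson)

    // Deriving the Catalyst type from the Avro schema keeps the intended
    // nullability, which df.schema may not reflect.
    val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
    dataType.fields.foreach(f => println(s"${f.name}: nullable=${f.nullable}"))
    // prints: key: nullable=false
    //         rider: nullable=true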

DeltaSync.java

@@ -290,8 +290,19 @@ public class DeltaSync implements Serializable {
       Option<Dataset<Row>> transformed =
           dataAndCheckpoint.getBatch().map(data -> transformer.apply(jssc, sparkSession, data, props));
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
-      avroRDDOptional = transformed
-          .map(t -> AvroConversionUtils.createRdd(t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD());
+      if (this.schemaProvider != null && this.schemaProvider.getTargetSchema() != null) {
+        // If the target schema is specified through Avro schema,
+        // pass in the schema for the Row-to-Avro conversion
+        // to avoid nullability mismatch between Avro schema and Row schema
+        avroRDDOptional = transformed
+            .map(t -> AvroConversionUtils.createRdd(
+                t, this.schemaProvider.getTargetSchema(),
+                HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD());
+      } else {
+        avroRDDOptional = transformed
+            .map(t -> AvroConversionUtils.createRdd(
+                t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD());
+      }
-      // Use Transformed Row's schema if not overridden
+      // Use Transformed Row's schema if not overridden. If target schema is not specified
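To make the new branch concrete, a hedged sketch of the schema-aware call path; the helper name, struct name, and namespace below are illustrative stand-ins, not DeltaSync's actual constants.

    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericRecord
    import org.apache.hudi.AvroConversionUtils
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.DataFrame

    // `df` stands in for the transformer output; `targetSchemaJson` for the
    // schema provider's target schema (e.g. the contents of target.avsc).
    def rowsToAvro(df: DataFrame, targetSchemaJson: String): RDD[GenericRecord] = {
      val targetSchema = new Schema.Parser().parse(targetSchemaJson)
      // New overload: the Row encoder binds to targetSchema's nullability
      // instead of df.schema's (often all-nullable) view of the data.
      AvroConversionUtils.createRdd(df, targetSchema, "hoodie_record_struct", "hoodie.record.namespace")
    }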

SourceFormatAdapter.java

@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.deltastreamer;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.sources.AvroSource;
 import org.apache.hudi.utilities.sources.InputBatch;
 import org.apache.hudi.utilities.sources.JsonSource;

@@ -64,7 +65,17 @@ public final class SourceFormatAdapter {
       case ROW: {
         InputBatch<Dataset<Row>> r = ((RowSource) source).fetchNext(lastCkptStr, sourceLimit);
         return new InputBatch<>(Option.ofNullable(r.getBatch().map(
-            rdd -> (AvroConversionUtils.createRdd(rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()))
+            rdd -> (
+                (r.getSchemaProvider() instanceof FilebasedSchemaProvider)
+                    // If the source schema is specified through Avro schema,
+                    // pass in the schema for the Row-to-Avro conversion
+                    // to avoid nullability mismatch between Avro schema and Row schema
+                    ? AvroConversionUtils.createRdd(
+                        rdd, r.getSchemaProvider().getSourceSchema(),
+                        HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
+                    : AvroConversionUtils.createRdd(
+                        rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
+            ))
             .orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
       }
       default:
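Design note: gating on r.getSchemaProvider() instanceof FilebasedSchemaProvider presumably limits the schema-aware path to the one provider whose source schema is explicitly user-supplied as an .avsc file; every other row source keeps inferring the conversion schema from the DataFrame, leaving prior behavior untouched.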

TestHoodieDeltaStreamer.java

@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities;

 import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;

@@ -44,6 +45,7 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.DistributedTestDataSource;
 import org.apache.hudi.utilities.sources.HoodieIncrSource;
 import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
 import org.apache.hudi.utilities.sources.TestDataSource;
 import org.apache.hudi.utilities.sources.config.TestSourceConfig;
 import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;

@@ -96,8 +98,13 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   private static final Random RANDOM = new Random();
   private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
   private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
+  private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
+  private static final String PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+  private static final int PARQUET_NUM_RECORDS = 5;
   private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);
+  private static int parquetTestNum = 1;

   @BeforeClass
   public static void initClass() throws Exception {
     UtilitiesTestBase.initClass(true);

@@ -146,6 +153,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
     invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
     UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID);
+
+    prepareParquetDFSFiles(PARQUET_NUM_RECORDS);
   }

   @AfterClass
@@ -186,17 +195,24 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
         String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass,
         String payloadClassName, String tableType) {
+      return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassName, propsFilename, enableHiveSync,
+          useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType);
+    }
+
+    static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String sourceClassName,
+        String transformerClassName, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
+        int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType) {
       HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
       cfg.targetBasePath = basePath;
       cfg.targetTableName = "hoodie_trips";
       cfg.tableType = tableType == null ? "COPY_ON_WRITE" : tableType;
-      cfg.sourceClassName = TestDataSource.class.getName();
+      cfg.sourceClassName = sourceClassName;
       cfg.transformerClassName = transformerClassName;
       cfg.operation = op;
       cfg.enableHiveSync = enableHiveSync;
       cfg.sourceOrderingField = "timestamp";
       cfg.propsFilePath = dfsBasePath + "/" + propsFilename;
-      cfg.sourceLimit = 1000;
+      cfg.sourceLimit = sourceLimit;
       if (updatePayloadClass) {
         cfg.payloadClassName = payloadClassName;
       }
@@ -620,6 +636,62 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     Assert.assertEquals(1000, c);
   }

+  private static void prepareParquetDFSFiles(int numRecords) throws IOException {
+    String path = PARQUET_SOURCE_ROOT + "/1.parquet";
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    Helpers.saveParquetToDFS(Helpers.toGenericRecords(
+        dataGenerator.generateInserts("000", numRecords), dataGenerator), new Path(path));
+  }
+
+  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException {
+    // Properties used for testing delta-streamer with Parquet source
+    TypedProperties parquetProps = new TypedProperties();
+    parquetProps.setProperty("include", "base.properties");
+    parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
+    parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+    if (useSchemaProvider) {
+      parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
+      if (hasTransformer) {
+        parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
+      }
+    }
+    parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", PARQUET_SOURCE_ROOT);
+
+    UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET);
+  }
+
+  private void testParquetDFSSource(boolean useSchemaProvider, String transformerClassName) throws Exception {
+    prepareParquetDFSSource(useSchemaProvider, transformerClassName != null);
+    String tableBasePath = dfsBasePath + "/test_parquet_table" + parquetTestNum;
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, Operation.INSERT, ParquetDFSSource.class.getName(),
+            transformerClassName, PROPS_FILENAME_TEST_PARQUET, false,
+            useSchemaProvider, 100000, false, null, null), jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
+    parquetTestNum++;
+  }
+
+  @Test
+  public void testParquetDFSSourceWithoutSchemaProviderAndNoTransformer() throws Exception {
+    testParquetDFSSource(false, null);
+  }
+
+  @Test
+  public void testParquetDFSSourceWithoutSchemaProviderAndTransformer() throws Exception {
+    testParquetDFSSource(false, TripsWithDistanceTransformer.class.getName());
+  }
+
+  @Test
+  public void testParquetDFSSourceWithSourceSchemaFileAndNoTransformer() throws Exception {
+    testParquetDFSSource(true, null);
+  }
+
+  @Test
+  public void testParquetDFSSourceWithSchemaFilesAndTransformer() throws Exception {
+    testParquetDFSSource(true, TripsWithDistanceTransformer.class.getName());
+  }
+
   /**
    * UDF to calculate Haversine distance.
    */