1
0

[HUDI-4072] Fix NULL schema for empty batches in deltastreamer (#5543)

This commit is contained in:
Sivabalan Narayanan
2022-05-13 08:26:47 -04:00
committed by GitHub
parent a704e3740c
commit 5c4813f101
3 changed files with 52 additions and 13 deletions

View File

@@ -37,6 +37,7 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -77,6 +78,7 @@ import org.apache.hudi.utilities.transform.Transformer;
import com.codahale.metrics.Timer;
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -794,7 +796,7 @@ public class DeltaSync implements Serializable {
.withProps(props);
if (schema != null) {
builder.withSchema(schema.toString());
builder.withSchema(getSchemaForWriteConfig(schema).toString());
}
HoodieWriteConfig config = builder.build();
@@ -829,6 +831,25 @@ public class DeltaSync implements Serializable {
return config;
}
/**
 * Resolves the schema to embed in the {@code HoodieWriteConfig}.
 *
 * <p>When the incoming batch is empty, the schema provider hands back the sentinel
 * {@link InputBatch#NULL_SCHEMA} (Avro {@code Schema.Type.NULL}). Writing that into the
 * config would corrupt the commit metadata, so in that case the latest table schema is
 * fetched from the existing commit metadata instead and used as the write schema.
 *
 * @param targetSchema schema reported by the schema provider for the current batch;
 *                     may be the NULL sentinel, or {@code null} (returned unchanged)
 * @return {@code targetSchema} when it is a real schema, otherwise the table schema
 *         resolved from the last commit
 * @throws HoodieException if the table schema cannot be resolved from commit metadata
 */
private Schema getSchemaForWriteConfig(Schema targetSchema) {
  Schema newWriteSchema = targetSchema;
  try {
    if (targetSchema != null) {
      // Both reader->writer directions compatible with the NULL schema means the
      // target schema IS the NULL sentinel, i.e. this batch produced no schema.
      if (SchemaCompatibility.checkReaderWriterCompatibility(targetSchema, InputBatch.NULL_SCHEMA).getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE
          && SchemaCompatibility.checkReaderWriterCompatibility(InputBatch.NULL_SCHEMA, targetSchema).getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE) {
        // target schema is null. fetch schema from commit metadata and use it
        HoodieTableMetaClient meta = HoodieTableMetaClient.builder()
            .setConf(new Configuration(fs.getConf()))
            .setBasePath(cfg.targetBasePath)
            .setPayloadClassName(cfg.payloadClassName)
            .build();
        TableSchemaResolver schemaResolver = new TableSchemaResolver(meta);
        newWriteSchema = schemaResolver.getTableAvroSchema(false);
      }
    }
    return newWriteSchema;
  } catch (Exception e) {
    // Chain the cause: swallowing it would hide why schema resolution failed.
    throw new HoodieException("Failed to fetch schema from table.", e);
  }
}
/**
* Register Avro Schemas.
*

View File

@@ -28,6 +28,7 @@ import org.apache.spark.api.java.JavaSparkContext;
public class InputBatch<T> {
public static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);
private final Option<T> batch;
private final String checkpointForNextBatch;
private final SchemaProvider schemaProvider;
@@ -69,7 +70,7 @@ public class InputBatch<T> {
@Override
public Schema getSourceSchema() {
return Schema.create(Schema.Type.NULL);
return NULL_SCHEMA;
}
}
}

View File

@@ -1518,12 +1518,6 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
prepareParquetDFSSource(useSchemaProvider, hasTransformer, "");
}
/**
 * Convenience overload that prepares the parquet DFS source using the default
 * partition-path field name {@code "partition_path"}; all other arguments are
 * forwarded unchanged to the full overload.
 *
 * @throws IOException if writing the source properties/files fails
 */
private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
                                     String propsFileName, String parquetSourceRoot, boolean addCommonProps) throws IOException {
  prepareParquetDFSSource(useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot,
      addCommonProps, "partition_path");
}
private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
String propsFileName, String parquetSourceRoot, boolean addCommonProps, String partitionPath) throws IOException {
prepareParquetDFSSource(useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot, addCommonProps,
@@ -1562,7 +1556,13 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
}
private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch) throws Exception {
prepareParquetDFSSource(useSchemaProvider, transformerClassNames != null, testEmptyBatch ? "1" : "");
PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFilesDfs" + testNum;
int parquetRecordsCount = 10;
boolean hasTransformer = transformerClassNames != null && !transformerClassNames.isEmpty();
prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null);
prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
PARQUET_SOURCE_ROOT, false, "partition_path", testEmptyBatch ? "1" : "");
String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, testEmptyBatch ? TestParquetDFSSourceEmptyBatch.class.getName()
@@ -1570,21 +1570,38 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, sqlContext);
testNum++;
TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
if (testEmptyBatch) {
prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
// parquet source to return empty batch
deltaStreamer.sync();
// since we mimic'ed empty batch, total records should be same as first sync().
TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, sqlContext);
TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
// validate table schema fetches valid schema from last but one commit.
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
assertNotEquals(tableSchemaResolver.getTableAvroSchema(), Schema.create(Schema.Type.NULL).toString());
}
// proceed w/ non empty batch.
prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "3.parquet", false, null, null);
deltaStreamer.sync();
TestHelpers.assertRecordCount(parquetRecordsCount + 100, tableBasePath, sqlContext);
// validate commit metadata for all completed commits to have valid schema in extra metadata.
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants().forEach(entry -> assertValidSchemaInCommitMetadata(entry, metaClient));
testNum++;
}
/**
 * Asserts that the commit metadata of the given completed instant carries a
 * non-empty schema entry (under {@link HoodieCommitMetadata#SCHEMA_KEY}), i.e.
 * the NULL schema from an empty batch was not persisted.
 *
 * @param instant    completed commit instant whose metadata is inspected
 * @param metaClient meta client used to read the instant details from the timeline
 * @throws HoodieException if the serialized commit metadata cannot be parsed
 */
private void assertValidSchemaInCommitMetadata(HoodieInstant instant, HoodieTableMetaClient metaClient) {
  try {
    HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
        .fromBytes(metaClient.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
    assertFalse(StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY)));
  } catch (IOException ioException) {
    // Chain the IOException so a parse failure is debuggable, not just announced.
    throw new HoodieException("Failed to parse commit metadata for " + instant, ioException);
  }
}
private void testORCDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {