
[HUDI-3595] Fixing NULL schema provider for empty batch (#5002)

Sivabalan Narayanan
2022-03-10 19:52:55 -08:00
committed by GitHub
parent fa5e75068e
commit 9dc6df5dca
3 changed files with 80 additions and 2 deletions

CommitUtils.java

@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.exception.HoodieException;
import org.apache.avro.Schema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -40,6 +41,7 @@ import java.util.Map;
public class CommitUtils {
private static final Logger LOG = LogManager.getLogger(CommitUtils.class);
private static final String NULL_SCHEMA_STR = Schema.create(Schema.Type.NULL).toString();
/**
* Gets the commit action type for given write operation and table type.
@@ -84,7 +86,8 @@ public class CommitUtils {
if (extraMetadata.isPresent()) {
extraMetadata.get().forEach(commitMetadata::addMetadata);
}
- commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaToStoreInCommit == null ? "" : schemaToStoreInCommit);
+ commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, (schemaToStoreInCommit == null || schemaToStoreInCommit.equals(NULL_SCHEMA_STR))
+     ? "" : schemaToStoreInCommit);
commitMetadata.setOperationType(operationType);
return commitMetadata;
}
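Context for the guard above: Avro serializes its NULL schema to the literal JSON string "null", which is what a schema provider can hand back for an empty batch; if that string lands in commit metadata, later schema lookups resolve to a useless schema. A minimal, self-contained sketch (not part of the patch; the class name is hypothetical) of what the NULL_SCHEMA_STR check normalizes:

import org.apache.avro.Schema;

public class NullSchemaGuardSketch {
  public static void main(String[] args) {
    // Avro's NULL schema serializes to the JSON string "null" (quotes included).
    String nullSchemaStr = Schema.create(Schema.Type.NULL).toString();
    System.out.println(nullSchemaStr); // prints: "null"

    // The guard in CommitUtils: treat a missing schema and the NULL schema
    // string the same way, storing "" in the commit metadata.
    String schemaToStoreInCommit = nullSchemaStr; // stand-in for an empty batch's schema
    String stored = (schemaToStoreInCommit == null || schemaToStoreInCommit.equals(nullSchemaStr))
        ? "" : schemaToStoreInCommit;
    System.out.println(stored.isEmpty()); // true
  }
}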

TestHoodieDeltaStreamer.java

@@ -70,6 +70,7 @@ import org.apache.hudi.utilities.sources.ORCDFSSource;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.hudi.utilities.sources.SqlSource;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.sources.TestParquetDFSSourceEmptyBatch;
import org.apache.hudi.utilities.testutils.JdbcTestUtils;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hudi.utilities.testutils.sources.DistributedTestDataSource;
@@ -130,6 +131,7 @@ import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -1420,15 +1422,34 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
}
private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
testParquetDFSSource(useSchemaProvider, transformerClassNames, false);
}
private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch) throws Exception {
prepareParquetDFSSource(useSchemaProvider, transformerClassNames != null);
String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
- TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
+ TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, testEmptyBatch ? TestParquetDFSSourceEmptyBatch.class.getName()
+     : ParquetDFSSource.class.getName(),
transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
testNum++;
if (testEmptyBatch) {
prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
// make the parquet source return an empty batch
TestParquetDFSSourceEmptyBatch.returnEmptyBatch = true;
deltaStreamer.sync();
// since we mimicked an empty batch, the total record count should match the first sync().
TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
// validate that schema resolution fetches a valid schema from the last-but-one commit.
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
assertNotEquals(tableSchemaResolver.getTableAvroSchema().toString(), Schema.create(Schema.Type.NULL).toString());
}
}
private void testORCDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
@@ -1584,6 +1605,11 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
testParquetDFSSource(false, null);
}
@Test
public void testParquetDFSSourceForEmptyBatch() throws Exception {
testParquetDFSSource(false, null, true);
}
@Test
public void testParquetDFSSourceWithoutSchemaProviderAndTransformer() throws Exception {
testParquetDFSSource(false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));

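A caveat worth recording for the empty-batch assertion in testParquetDFSSource: assertNotEquals only guards against the regression when both arguments have the same type, because an Avro Schema instance never equals a String, so a mixed-type comparison passes vacuously. A runnable sketch of the pitfall (hypothetical class name; only Avro on the classpath):

import org.apache.avro.Schema;

public class AssertNotEqualsPitfall {
  public static void main(String[] args) {
    Schema nullSchema = Schema.create(Schema.Type.NULL);
    String nullSchemaStr = nullSchema.toString(); // "null"

    // Mixed types: a Schema never equals a String, so this "not equals" check
    // would pass even when the resolved schema IS the NULL schema.
    System.out.println(!nullSchema.equals(nullSchemaStr)); // always true

    // Like types: comparing string forms can actually catch the regression.
    System.out.println(!nullSchema.toString().equals(nullSchemaStr)); // false: the check can fail when it should
  }
}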
TestParquetDFSSourceEmptyBatch.java (new file)

@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class TestParquetDFSSourceEmptyBatch extends ParquetDFSSource {

  // When true, fetchNextBatch returns no data while still advancing the
  // checkpoint, mimicking a source that finds nothing new to ingest.
  public static boolean returnEmptyBatch;

  public TestParquetDFSSourceEmptyBatch(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
                                        SchemaProvider schemaProvider) {
    super(props, sparkContext, sparkSession, schemaProvider);
  }

  @Override
  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
    Pair<Option<Dataset<Row>>, String> toReturn = super.fetchNextBatch(lastCkptStr, sourceLimit);
    if (returnEmptyBatch) {
      // drop the data but keep the checkpoint so the pipeline still commits
      return Pair.of(Option.empty(), toReturn.getRight());
    }
    return toReturn;
  }
}
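Usage note: the toggle is a static field because the streamer instantiates its source reflectively from the class name, so the test never holds an instance to configure (an observation about the design, not stated in the patch). A sketch of the flip-and-reset pattern, assuming a deltaStreamer already built against this source class as in the test above:

// flip the source into empty-batch mode for the next sync
TestParquetDFSSourceEmptyBatch.returnEmptyBatch = true;
deltaStreamer.sync(); // a commit is still created, just with no new records
// reset so subsequent tests see real batches again
TestParquetDFSSourceEmptyBatch.returnEmptyBatch = false;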