1
0

[HUDI-4085] Fixing flakiness with parquet empty batch tests in TestHoodieDeltaStreamer (#5559)

This commit is contained in:
Sivabalan Narayanan
2022-05-11 16:02:54 -04:00
committed by GitHub
parent 7f0c1f3ddf
commit b10ca7e69f
3 changed files with 31 additions and 9 deletions

View File

@@ -30,7 +30,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor; import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.TestParquetDFSSourceEmptyBatch; import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase; import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.avro.Schema; import org.apache.avro.Schema;
@@ -192,7 +192,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
@BeforeEach @BeforeEach
public void setup() throws Exception { public void setup() throws Exception {
super.setup(); super.setup();
TestParquetDFSSourceEmptyBatch.returnEmptyBatch = false; TestDataSource.returnEmptyBatch = false;
} }
@AfterAll @AfterAll

View File

@@ -1509,9 +1509,13 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
testUtils.sendMessages(topicName, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", numRecords, HoodieTestDataGenerator.TRIP_SCHEMA))); testUtils.sendMessages(topicName, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", numRecords, HoodieTestDataGenerator.TRIP_SCHEMA)));
} }
private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException { private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String emptyBatchParam) throws IOException {
prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc",
PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false); PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, "partition_path", emptyBatchParam);
}
private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException {
prepareParquetDFSSource(useSchemaProvider, hasTransformer, "");
} }
private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile, private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
@@ -1520,9 +1524,15 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
"partition_path"); "partition_path");
} }
private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
String propsFileName, String parquetSourceRoot, boolean addCommonProps, String partitionPath) throws IOException {
prepareParquetDFSSource(useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot, addCommonProps,
partitionPath, "");
}
private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile, private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
String propsFileName, String parquetSourceRoot, boolean addCommonProps, String propsFileName, String parquetSourceRoot, boolean addCommonProps,
String partitionPath) throws IOException { String partitionPath, String emptyBatchParam) throws IOException {
// Properties used for testing delta-streamer with Parquet source // Properties used for testing delta-streamer with Parquet source
TypedProperties parquetProps = new TypedProperties(); TypedProperties parquetProps = new TypedProperties();
@@ -1541,6 +1551,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
} }
} }
parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", parquetSourceRoot); parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", parquetSourceRoot);
if (!StringUtils.isNullOrEmpty(emptyBatchParam)) {
parquetProps.setProperty(TestParquetDFSSourceEmptyBatch.RETURN_EMPTY_BATCH, emptyBatchParam);
}
UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + propsFileName); UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + propsFileName);
} }
@@ -1549,7 +1562,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
} }
private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch) throws Exception { private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch) throws Exception {
prepareParquetDFSSource(useSchemaProvider, transformerClassNames != null); prepareParquetDFSSource(useSchemaProvider, transformerClassNames != null, testEmptyBatch ? "1" : "");
String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum; String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer( HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, testEmptyBatch ? TestParquetDFSSourceEmptyBatch.class.getName() TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, testEmptyBatch ? TestParquetDFSSourceEmptyBatch.class.getName()
@@ -1563,7 +1576,6 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
if (testEmptyBatch) { if (testEmptyBatch) {
prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null); prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
// parquet source to return empty batch // parquet source to return empty batch
TestParquetDFSSourceEmptyBatch.returnEmptyBatch = true;
deltaStreamer.sync(); deltaStreamer.sync();
// since we mimicked an empty batch, total records should be the same as after the first sync(). // since we mimicked an empty batch, total records should be the same as after the first sync().
TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, sqlContext); TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, sqlContext);

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -29,19 +30,28 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row; import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class TestParquetDFSSourceEmptyBatch extends ParquetDFSSource { public class TestParquetDFSSourceEmptyBatch extends ParquetDFSSource {
public static boolean returnEmptyBatch; public static String RETURN_EMPTY_BATCH = "test.dfs.source.return.empty.batches.for";
public static String DEFAULT_RETURN_EMPTY_BATCH = "";
public List<Integer> emptyBatches;
private int counter = 0;
public TestParquetDFSSourceEmptyBatch(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, public TestParquetDFSSourceEmptyBatch(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) { SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider); super(props, sparkContext, sparkSession, schemaProvider);
String[] emptyBatchesStr = props.getString(RETURN_EMPTY_BATCH, DEFAULT_RETURN_EMPTY_BATCH).split(",");
this.emptyBatches = Arrays.stream(emptyBatchesStr).filter(entry -> !StringUtils.isNullOrEmpty(entry)).map(entry -> Integer.parseInt(entry)).collect(Collectors.toList());
} }
@Override @Override
public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) { public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
Pair<Option<Dataset<Row>>, String> toReturn = super.fetchNextBatch(lastCkptStr, sourceLimit); Pair<Option<Dataset<Row>>, String> toReturn = super.fetchNextBatch(lastCkptStr, sourceLimit);
if (returnEmptyBatch) { if (emptyBatches.contains(counter++)) {
return Pair.of(Option.empty(), toReturn.getRight()); return Pair.of(Option.empty(), toReturn.getRight());
} }
return toReturn; return toReturn;