1
0

[HUDI-1618] Fixing NPE with Parquet src in multi table delta streamer (#2577)

This commit is contained in:
Sivabalan Narayanan
2021-03-07 16:40:40 -05:00
committed by GitHub
parent 9437e0ddef
commit 5cf2f2618b
3 changed files with 127 additions and 19 deletions

View File

@@ -47,6 +47,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
/**
@@ -75,9 +76,9 @@ public class HoodieMultiTableDeltaStreamer {
FileSystem fs = FSUtils.getFs(commonPropsFile, jssc.hadoopConfiguration());
configFolder = configFolder.charAt(configFolder.length() - 1) == '/' ? configFolder.substring(0, configFolder.length() - 1) : configFolder;
checkIfPropsFileAndConfigFolderExist(commonPropsFile, configFolder, fs);
TypedProperties properties = UtilHelpers.readConfig(fs, new Path(commonPropsFile), new ArrayList<>()).getConfig();
TypedProperties commonProperties = UtilHelpers.readConfig(fs, new Path(commonPropsFile), new ArrayList<>()).getConfig();
//get the tables to be ingested and their corresponding config files from this properties instance
populateTableExecutionContextList(properties, configFolder, fs, config);
populateTableExecutionContextList(commonProperties, configFolder, fs, config);
}
private void checkIfPropsFileAndConfigFolderExist(String commonPropsFile, String configFolder, FileSystem fs) throws IOException {
@@ -147,7 +148,7 @@ public class HoodieMultiTableDeltaStreamer {
}
private void populateSchemaProviderProps(HoodieDeltaStreamer.Config cfg, TypedProperties typedProperties) {
if (cfg.schemaProviderClassName.equals(SchemaRegistryProvider.class.getName())) {
if (Objects.equals(cfg.schemaProviderClassName, SchemaRegistryProvider.class.getName())) {
String schemaRegistryBaseUrl = typedProperties.getString(Constants.SCHEMA_REGISTRY_BASE_URL_PROP);
String schemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP);
typedProperties.setProperty(Constants.SOURCE_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + schemaRegistrySuffix);

View File

@@ -118,8 +118,9 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
public static final String PROPS_INVALID_TABLE_CONFIG_FILE = "test-invalid-table-config.properties";
private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
private static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties";
private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
protected static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
private static final String PROPS_FILENAME_TEST_JSON_KAFKA = "test-json-kafka-dfs-source.properties";
private static final String FIRST_PARQUET_FILE_NAME = "1.parquet";
private static String PARQUET_SOURCE_ROOT;
private static String JSON_KAFKA_SOURCE_ROOT;
private static final int PARQUET_NUM_RECORDS = 5;
@@ -214,7 +215,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID);
TypedProperties props1 = new TypedProperties();
populateCommonProps(props1);
populateAllCommonProps(props1);
UtilitiesTestBase.Helpers.savePropsToDFS(props1, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE1);
TypedProperties properties = new TypedProperties();
@@ -226,7 +227,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs, dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1);
prepareParquetDFSFiles(PARQUET_NUM_RECORDS);
prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
}
private static void populateInvalidTableConfigFilePathProps(TypedProperties props) {
@@ -236,20 +237,30 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_uber_config.properties");
}
private static void populateCommonProps(TypedProperties props) {
private static void populateAllCommonProps(TypedProperties props) {
populateCommonProps(props);
populateCommonKafkaProps(props);
populateCommonHiveProps(props);
}
protected static void populateCommonProps(TypedProperties props) {
props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd");
props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber");
props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/uber_config.properties");
props.setProperty("hoodie.deltastreamer.ingestion.short_trip_db.dummy_table_short_trip.configFile", dfsBasePath + "/config/short_trip_uber_config.properties");
}
protected static void populateCommonKafkaProps(TypedProperties props) {
//Kafka source properties
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", "earliest");
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(5000));
}
protected static void populateCommonHiveProps(TypedProperties props) {
// Hive Configs
props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), "jdbc:hive2://127.0.0.1:9999/");
props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), "testdb2");
@@ -975,12 +986,16 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
}
private static void prepareParquetDFSFiles(int numRecords) throws IOException {
prepareParquetDFSFiles(numRecords, "1.parquet", false, null, null);
prepareParquetDFSFiles(numRecords, PARQUET_SOURCE_ROOT);
}
private static void prepareParquetDFSFiles(int numRecords, String fileName, boolean useCustomSchema,
protected static void prepareParquetDFSFiles(int numRecords, String baseParquetPath) throws IOException {
prepareParquetDFSFiles(numRecords, baseParquetPath, FIRST_PARQUET_FILE_NAME, false, null, null);
}
protected static void prepareParquetDFSFiles(int numRecords, String baseParquetPath, String fileName, boolean useCustomSchema,
String schemaStr, Schema schema) throws IOException {
String path = PARQUET_SOURCE_ROOT + "/" + fileName;
String path = baseParquetPath + "/" + fileName;
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
if (useCustomSchema) {
Helpers.saveParquetToDFS(Helpers.toGenericRecords(
@@ -1006,13 +1021,18 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException {
prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc",
PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT);
PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false);
}
private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
String propsFileName, String parquetSourceRoot) throws IOException {
String propsFileName, String parquetSourceRoot, boolean addCommonProps) throws IOException {
// Properties used for testing delta-streamer with Parquet source
TypedProperties parquetProps = new TypedProperties();
if (addCommonProps) {
populateCommonProps(parquetProps);
}
parquetProps.setProperty("include", "base.properties");
parquetProps.setProperty("hoodie.embed.timeline.server","false");
parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
@@ -1042,7 +1062,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue, String topicName) throws IOException {
// Properties used for testing delta-streamer with JsonKafka source
TypedProperties props = new TypedProperties();
populateCommonProps(props);
populateAllCommonProps(props);
props.setProperty("include", "base.properties");
props.setProperty("hoodie.embed.timeline.server","false");
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
@@ -1065,10 +1085,10 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
// prep parquet source
PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFilesDfsToKafka" + testNum;
int parquetRecords = 10;
prepareParquetDFSFiles(parquetRecords,"1.parquet", true, HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
prepareParquetDFSFiles(parquetRecords, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, true, HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
prepareParquetDFSSource(true, false,"source_uber.avsc", "target_uber.avsc", PROPS_FILENAME_TEST_PARQUET,
PARQUET_SOURCE_ROOT);
PARQUET_SOURCE_ROOT, false);
// delta streamer w/ parquest source
String tableBasePath = dfsBasePath + "/test_dfs_to_kakfa" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(

View File

@@ -26,7 +26,9 @@ import org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.TableExecutionContext;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -34,7 +36,9 @@ import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -43,19 +47,27 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
private static volatile Logger log = LogManager.getLogger(TestHoodieMultiTableDeltaStreamer.class);
private static final Random RANDOM = new Random();
static class TestHelpers {
static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync) {
return getConfig(fileName, configFolder, sourceClassName, enableHiveSync, true, "multi_table_dataset");
}
static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync,
boolean setSchemaProvider, String basePathPrefix) {
HoodieMultiTableDeltaStreamer.Config config = new HoodieMultiTableDeltaStreamer.Config();
config.configFolder = configFolder;
config.targetTableName = "dummy_table";
config.basePathPrefix = dfsBasePath + "/multi_table_dataset";
config.basePathPrefix = dfsBasePath + "/" + basePathPrefix;
config.propsFilePath = dfsBasePath + "/" + fileName;
config.tableType = "COPY_ON_WRITE";
config.sourceClassName = sourceClassName;
config.sourceOrderingField = "timestamp";
config.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
if (setSchemaProvider) {
config.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
}
config.enableHiveSync = enableHiveSync;
return config;
}
@@ -117,7 +129,7 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
}
@Test //0 corresponds to fg
public void testMultiTableExecution() throws IOException {
public void testMultiTableExecutionWithKafkaSource() throws IOException {
//create topics for each table
String topicName1 = "topic" + testNum++;
String topicName2 = "topic" + testNum;
@@ -128,7 +140,7 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
testUtils.sendMessages(topicName2, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1,dfsBasePath + "/config", JsonKafkaSource.class.getName(), false);
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", JsonKafkaSource.class.getName(), false);
HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
List<TableExecutionContext> executionContexts = streamer.getTableExecutionContexts();
TypedProperties properties = executionContexts.get(1).getProperties();
@@ -160,4 +172,79 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2 + "/*/*.parquet", sqlContext);
testNum++;
}
@Test
public void testMultiTableExecutionWithParquetSource() throws IOException {
// ingest test data to 2 parquet source paths
String parquetSourceRoot1 = dfsBasePath + "/parquetSrcPath1/";
prepareParquetDFSFiles(10, parquetSourceRoot1);
String parquetSourceRoot2 = dfsBasePath + "/parquetSrcPath2/";
prepareParquetDFSFiles(5, parquetSourceRoot2);
// add only common props. later we can add per table props
String parquetPropsFile = populateCommonPropsAndWriteToFile();
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(parquetPropsFile, dfsBasePath + "/config", ParquetDFSSource.class.getName(), false,
false, "multi_table_parquet");
HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
List<TableExecutionContext> executionContexts = streamer.getTableExecutionContexts();
// fetch per parquet source props and add per table properties
ingestPerParquetSourceProps(executionContexts, Arrays.asList(new String[] {parquetSourceRoot1, parquetSourceRoot2}));
String targetBasePath1 = executionContexts.get(0).getConfig().targetBasePath;
String targetBasePath2 = executionContexts.get(1).getConfig().targetBasePath;
// sync and verify
syncAndVerify(streamer, targetBasePath1, targetBasePath2, 10, 5);
int totalTable1Records = 10;
int totalTable2Records = 5;
// ingest multiple rounds and verify
for (int i = 0; i < 3; i++) {
int table1Records = 10 + RANDOM.nextInt(100);
int table2Records = 15 + RANDOM.nextInt(100);
prepareParquetDFSFiles(table1Records, parquetSourceRoot1, (i + 2) + ".parquet", false, null, null);
prepareParquetDFSFiles(table2Records, parquetSourceRoot2, (i + 2) + ".parquet", false, null, null);
totalTable1Records += table1Records;
totalTable2Records += table2Records;
// sync and verify
syncAndVerify(streamer, targetBasePath1, targetBasePath2, totalTable1Records, totalTable2Records);
}
}
private String populateCommonPropsAndWriteToFile() throws IOException {
TypedProperties commonProps = new TypedProperties();
populateCommonProps(commonProps);
UtilitiesTestBase.Helpers.savePropsToDFS(commonProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET);
return PROPS_FILENAME_TEST_PARQUET;
}
private TypedProperties getParquetProps(String parquetSourceRoot) {
TypedProperties props = new TypedProperties();
props.setProperty("include", "base.properties");
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
props.setProperty("hoodie.deltastreamer.source.dfs.root", parquetSourceRoot);
return props;
}
private void ingestPerParquetSourceProps(List<TableExecutionContext> executionContexts, List<String> parquetSourceRoots) {
int counter = 0;
for (String parquetSourceRoot : parquetSourceRoots) {
TypedProperties properties = executionContexts.get(counter).getProperties();
TypedProperties parquetProps = getParquetProps(parquetSourceRoot);
parquetProps.forEach((k, v) -> {
properties.setProperty(k.toString(), v.toString());
});
executionContexts.get(counter).setProperties(properties);
counter++;
}
}
private void syncAndVerify(HoodieMultiTableDeltaStreamer streamer, String targetBasePath1, String targetBasePath2, long table1ExpectedRecords, long table2ExpectedRecords) {
streamer.sync();
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(table1ExpectedRecords, targetBasePath1 + "/*/*.parquet", sqlContext);
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(table2ExpectedRecords, targetBasePath2 + "/*/*.parquet", sqlContext);
}
}