1
0

[HUDI-76] Add CSV Source support for Hudi Delta Streamer

This commit is contained in:
Y Ethan Guo
2020-01-18 23:11:14 -08:00
committed by Y Ethan Guo
parent 23afe7a487
commit cf765df606
10 changed files with 597 additions and 33 deletions

View File

@@ -74,20 +74,30 @@ public class HoodieTestDataGenerator {
public static final String[] DEFAULT_PARTITION_PATHS = public static final String[] DEFAULT_PARTITION_PATHS =
{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH}; {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH};
public static final int DEFAULT_PARTITION_DEPTH = 3; public static final int DEFAULT_PARTITION_DEPTH = 3;
public static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ " public static final String TRIP_SCHEMA_PREFIX = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"}," + "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"}," + "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
+ "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"}," + "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
+ "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"}," + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},";
+ "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [" public static final String TRIP_SCHEMA_SUFFIX = "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
+ "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}}," public static final String FARE_NESTED_SCHEMA = "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": ["
+ "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}"; + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},";
public static final String FARE_FLATTENED_SCHEMA = "{\"name\": \"fare\", \"type\": \"double\"},"
+ "{\"name\": \"currency\", \"type\": \"string\"},";
public static final String TRIP_EXAMPLE_SCHEMA =
TRIP_SCHEMA_PREFIX + FARE_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
public static final String TRIP_FLATTENED_SCHEMA =
TRIP_SCHEMA_PREFIX + FARE_FLATTENED_SCHEMA + TRIP_SCHEMA_SUFFIX;
public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString(); public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double," public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,"
+ "struct<amount:double,currency:string>,boolean"; + "struct<amount:double,currency:string>,boolean";
public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA); public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS = public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS =
HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA); HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA);
public static final Schema FLATTENED_AVRO_SCHEMA = new Schema.Parser().parse(TRIP_FLATTENED_SCHEMA);
private static final Random RAND = new Random(46474747); private static final Random RAND = new Random(46474747);
@@ -115,10 +125,33 @@ public class HoodieTestDataGenerator {
} }
/** /**
* Generates a new avro record of the above schema format, retaining the key if optionally provided. * Generates a new avro record of the above nested schema format,
* retaining the key if optionally provided.
*
* @param key Hoodie key.
* @param commitTime Commit time to use.
* @return Raw paylaod of a test record.
* @throws IOException
*/ */
public static TestRawTripPayload generateRandomValue(HoodieKey key, String commitTime) throws IOException { public static TestRawTripPayload generateRandomValue(HoodieKey key, String commitTime) throws IOException {
GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0); return generateRandomValue(key, commitTime, false);
}
/**
* Generates a new avro record with the specified schema (nested or flattened),
* retaining the key if optionally provided.
*
* @param key Hoodie key.
* @param commitTime Commit time to use.
* @param isFlattened whether the schema of the record should be flattened.
* @return Raw paylaod of a test record.
* @throws IOException
*/
public static TestRawTripPayload generateRandomValue(
HoodieKey key, String commitTime, boolean isFlattened) throws IOException {
GenericRecord rec = generateGenericRecord(
key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0,
false, isFlattened);
return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA); return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
} }
@@ -127,7 +160,7 @@ public class HoodieTestDataGenerator {
*/ */
public static TestRawTripPayload generateRandomDeleteValue(HoodieKey key, String commitTime) throws IOException { public static TestRawTripPayload generateRandomDeleteValue(HoodieKey key, String commitTime) throws IOException {
GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0, GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0,
true); true, false);
return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA); return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
} }
@@ -141,12 +174,13 @@ public class HoodieTestDataGenerator {
public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName, public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
double timestamp) { double timestamp) {
return generateGenericRecord(rowKey, riderName, driverName, timestamp, false); return generateGenericRecord(rowKey, riderName, driverName, timestamp, false, false);
} }
public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName, public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
double timestamp, boolean isDeleteRecord) { double timestamp, boolean isDeleteRecord,
GenericRecord rec = new GenericData.Record(AVRO_SCHEMA); boolean isFlattened) {
GenericRecord rec = new GenericData.Record(isFlattened ? FLATTENED_AVRO_SCHEMA : AVRO_SCHEMA);
rec.put("_row_key", rowKey); rec.put("_row_key", rowKey);
rec.put("timestamp", timestamp); rec.put("timestamp", timestamp);
rec.put("rider", riderName); rec.put("rider", riderName);
@@ -156,10 +190,15 @@ public class HoodieTestDataGenerator {
rec.put("end_lat", RAND.nextDouble()); rec.put("end_lat", RAND.nextDouble());
rec.put("end_lon", RAND.nextDouble()); rec.put("end_lon", RAND.nextDouble());
if (isFlattened) {
rec.put("fare", RAND.nextDouble() * 100);
rec.put("currency", "USD");
} else {
GenericRecord fareRecord = new GenericData.Record(AVRO_SCHEMA.getField("fare").schema()); GenericRecord fareRecord = new GenericData.Record(AVRO_SCHEMA.getField("fare").schema());
fareRecord.put("amount", RAND.nextDouble() * 100); fareRecord.put("amount", RAND.nextDouble() * 100);
fareRecord.put("currency", "USD"); fareRecord.put("currency", "USD");
rec.put("fare", fareRecord); rec.put("fare", fareRecord);
}
if (isDeleteRecord) { if (isDeleteRecord) {
rec.put("_hoodie_is_deleted", true); rec.put("_hoodie_is_deleted", true);
@@ -230,16 +269,31 @@ public class HoodieTestDataGenerator {
} }
/** /**
* Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys. * Generates new inserts with nested schema, uniformly across the partition paths above.
* It also updates the list of existing keys.
*/ */
public List<HoodieRecord> generateInserts(String commitTime, Integer n) { public List<HoodieRecord> generateInserts(String commitTime, Integer n) {
return generateInsertsStream(commitTime, n).collect(Collectors.toList()); return generateInserts(commitTime, n, false);
}
/**
* Generates new inserts, uniformly across the partition paths above.
* It also updates the list of existing keys.
*
* @param commitTime Commit time to use.
* @param n Number of records.
* @param isFlattened whether the schema of the generated record is flattened
* @return List of {@link HoodieRecord}s
*/
public List<HoodieRecord> generateInserts(String commitTime, Integer n, boolean isFlattened) {
return generateInsertsStream(commitTime, n, isFlattened).collect(Collectors.toList());
} }
/** /**
* Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys. * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys.
*/ */
public Stream<HoodieRecord> generateInsertsStream(String commitTime, Integer n) { public Stream<HoodieRecord> generateInsertsStream(
String commitTime, Integer n, boolean isFlattened) {
int currSize = getNumExistingKeys(); int currSize = getNumExistingKeys();
return IntStream.range(0, n).boxed().map(i -> { return IntStream.range(0, n).boxed().map(i -> {
@@ -251,7 +305,7 @@ public class HoodieTestDataGenerator {
existingKeys.put(currSize + i, kp); existingKeys.put(currSize + i, kp);
numExistingKeys++; numExistingKeys++;
try { try {
return new HoodieRecord(key, generateRandomValue(key, commitTime)); return new HoodieRecord(key, generateRandomValue(key, commitTime, isFlattened));
} catch (IOException e) { } catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e); throw new HoodieIOException(e.getMessage(), e);
} }

View File

@@ -137,6 +137,11 @@
<groupId>com.fasterxml.jackson.module</groupId> <groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId> <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
<version>${fasterxml.version}</version>
</dependency>
<!-- Parquet --> <!-- Parquet -->
<dependency> <dependency>

View File

@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.avro.SchemaConverters;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.List;
/**
* Reads data from CSV files on DFS as the data source.
*
* Internally, we use Spark to read CSV files thus any limitation of Spark CSV also applies here
* (e.g., limited support for nested schema).
*
* You can set the CSV-specific configs in the format of hoodie.deltastreamer.csv.*
* that are Spark compatible to deal with CSV files in Hudi. The supported options are:
*
* "sep", "encoding", "quote", "escape", "charToEscapeQuoteEscaping", "comment",
* "header", "enforceSchema", "inferSchema", "samplingRatio", "ignoreLeadingWhiteSpace",
* "ignoreTrailingWhiteSpace", "nullValue", "emptyValue", "nanValue", "positiveInf",
* "negativeInf", "dateFormat", "timestampFormat", "maxColumns", "maxCharsPerColumn",
* "mode", "columnNameOfCorruptRecord", "multiLine"
*
* Detailed information of these CSV options can be found at:
* https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameReader.html#csv-scala.collection.Seq-
*
* If the source Avro schema is provided through the {@link org.apache.hudi.utilities.schema.FilebasedSchemaProvider}
* using "hoodie.deltastreamer.schemaprovider.source.schema.file" config, the schema is
* passed to the CSV reader without inferring the schema from the CSV file.
*/
public class CsvDFSSource extends RowSource {
// CsvSource config prefix
public static final String CSV_SRC_CONFIG_PREFIX = "hoodie.deltastreamer.csv.";
// CSV-specific configurations to pass in from Hudi to Spark
public static final List<String> CSV_CONFIG_KEYS = Arrays.asList(
"sep", "encoding", "quote", "escape", "charToEscapeQuoteEscaping", "comment",
"header", "enforceSchema", "inferSchema", "samplingRatio", "ignoreLeadingWhiteSpace",
"ignoreTrailingWhiteSpace", "nullValue", "emptyValue", "nanValue", "positiveInf",
"negativeInf", "dateFormat", "timestampFormat", "maxColumns", "maxCharsPerColumn",
"mode", "columnNameOfCorruptRecord", "multiLine"
);
private final DFSPathSelector pathSelector;
private final StructType sourceSchema;
public CsvDFSSource(TypedProperties props,
JavaSparkContext sparkContext,
SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
this.pathSelector = new DFSPathSelector(props, sparkContext.hadoopConfiguration());
if (schemaProvider != null) {
sourceSchema = (StructType) SchemaConverters.toSqlType(schemaProvider.getSourceSchema())
.dataType();
} else {
sourceSchema = null;
}
}
@Override
protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr,
long sourceLimit) {
Pair<Option<String>, String> selPathsWithMaxModificationTime =
pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, sourceLimit);
return Pair.of(fromFiles(
selPathsWithMaxModificationTime.getLeft()), selPathsWithMaxModificationTime.getRight());
}
/**
* Reads the CSV files and parsed the lines into {@link Dataset} of {@link Row}.
*
* @param pathStr The list of file paths, separated by ','.
* @return {@link Dataset} of {@link Row} containing the records.
*/
private Option<Dataset<Row>> fromFiles(Option<String> pathStr) {
if (pathStr.isPresent()) {
DataFrameReader dataFrameReader = sparkSession.read().format("csv");
CSV_CONFIG_KEYS.forEach(optionKey -> {
String configPropName = CSV_SRC_CONFIG_PREFIX + optionKey;
String value = props.getString(configPropName, null);
// Pass down the Hudi CSV configs to Spark DataFrameReader
if (value != null) {
dataFrameReader.option(optionKey, value);
}
});
if (sourceSchema != null) {
// Source schema is specified, pass it to the reader
dataFrameReader.schema(sourceSchema);
}
dataFrameReader.option("inferSchema", Boolean.toString(sourceSchema == null));
return Option.of(dataFrameReader.load(pathStr.get().split(",")));
} else {
return Option.empty();
}
}
}

View File

@@ -19,8 +19,8 @@
package org.apache.hudi.utilities; package org.apache.hudi.utilities;
import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator; import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -42,6 +42,7 @@ import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.CsvDFSSource;
import org.apache.hudi.utilities.sources.DistributedTestDataSource; import org.apache.hudi.utilities.sources.DistributedTestDataSource;
import org.apache.hudi.utilities.sources.HoodieIncrSource; import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.InputBatch; import org.apache.hudi.utilities.sources.InputBatch;
@@ -60,6 +61,7 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row; import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SQLContext;
@@ -98,12 +100,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
private static final Random RANDOM = new Random(); private static final Random RANDOM = new Random();
private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties"; private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties"; private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
private static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties";
private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties"; private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
private static final String PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles"; private static final String PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
private static final int PARQUET_NUM_RECORDS = 5; private static final int PARQUET_NUM_RECORDS = 5;
private static final int CSV_NUM_RECORDS = 3;
private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class); private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);
private static int parquetTestNum = 1; private static int testNum = 1;
@BeforeClass @BeforeClass
public static void initClass() throws Exception { public static void initClass() throws Exception {
@@ -114,7 +118,9 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs, UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs,
dfsBasePath + "/sql-transformer.properties"); dfsBasePath + "/sql-transformer.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc"); UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc", dfs, dfsBasePath + "/source-flattened.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc"); UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target-flattened.avsc", dfs, dfsBasePath + "/target-flattened.avsc");
TypedProperties props = new TypedProperties(); TypedProperties props = new TypedProperties();
props.setProperty("include", "sql-transformer.properties"); props.setProperty("include", "sql-transformer.properties");
@@ -197,12 +203,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass,
String payloadClassName, String tableType) { String payloadClassName, String tableType) {
return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassName, propsFilename, enableHiveSync, return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassName, propsFilename, enableHiveSync,
useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType); useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType, "timestamp");
} }
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String sourceClassName, static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String sourceClassName,
String transformerClassName, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, String transformerClassName, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType) { int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField) {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = basePath; cfg.targetBasePath = basePath;
cfg.targetTableName = "hoodie_trips"; cfg.targetTableName = "hoodie_trips";
@@ -211,7 +217,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
cfg.transformerClassName = transformerClassName; cfg.transformerClassName = transformerClassName;
cfg.operation = op; cfg.operation = op;
cfg.enableHiveSync = enableHiveSync; cfg.enableHiveSync = enableHiveSync;
cfg.sourceOrderingField = "timestamp"; cfg.sourceOrderingField = sourceOrderingField;
cfg.propsFilePath = dfsBasePath + "/" + propsFilename; cfg.propsFilePath = dfsBasePath + "/" + propsFilename;
cfg.sourceLimit = sourceLimit; cfg.sourceLimit = sourceLimit;
if (updatePayloadClass) { if (updatePayloadClass) {
@@ -653,7 +659,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
if (useSchemaProvider) { if (useSchemaProvider) {
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc"); parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
if (hasTransformer) { if (hasTransformer) {
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/target.avsc"); parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
} }
} }
parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", PARQUET_SOURCE_ROOT); parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", PARQUET_SOURCE_ROOT);
@@ -663,14 +669,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
private void testParquetDFSSource(boolean useSchemaProvider, String transformerClassName) throws Exception { private void testParquetDFSSource(boolean useSchemaProvider, String transformerClassName) throws Exception {
prepareParquetDFSSource(useSchemaProvider, transformerClassName != null); prepareParquetDFSSource(useSchemaProvider, transformerClassName != null);
String tableBasePath = dfsBasePath + "/test_parquet_table" + parquetTestNum; String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer( HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, Operation.INSERT, ParquetDFSSource.class.getName(), TestHelpers.makeConfig(tableBasePath, Operation.INSERT, ParquetDFSSource.class.getName(),
transformerClassName, PROPS_FILENAME_TEST_PARQUET, false, transformerClassName, PROPS_FILENAME_TEST_PARQUET, false,
useSchemaProvider, 100000, false, null, null), jsc); useSchemaProvider, 100000, false, null, null, "timestamp"), jsc);
deltaStreamer.sync(); deltaStreamer.sync();
TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
parquetTestNum++; testNum++;
} }
@Test @Test
@@ -693,6 +699,146 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
testParquetDFSSource(true, TripsWithDistanceTransformer.class.getName()); testParquetDFSSource(true, TripsWithDistanceTransformer.class.getName());
} }
private void prepareCsvDFSSource(
boolean hasHeader, char sep, boolean useSchemaProvider, boolean hasTransformer) throws IOException {
String sourceRoot = dfsBasePath + "/csvFiles";
String recordKeyField = (hasHeader || useSchemaProvider) ? "_row_key" : "_c0";
// Properties used for testing delta-streamer with CSV source
TypedProperties csvProps = new TypedProperties();
csvProps.setProperty("include", "base.properties");
csvProps.setProperty("hoodie.datasource.write.recordkey.field", recordKeyField);
csvProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
if (useSchemaProvider) {
csvProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source-flattened.avsc");
if (hasTransformer) {
csvProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target-flattened.avsc");
}
}
csvProps.setProperty("hoodie.deltastreamer.source.dfs.root", sourceRoot);
if (sep != ',') {
if (sep == '\t') {
csvProps.setProperty("hoodie.deltastreamer.csv.sep", "\\t");
} else {
csvProps.setProperty("hoodie.deltastreamer.csv.sep", Character.toString(sep));
}
}
if (hasHeader) {
csvProps.setProperty("hoodie.deltastreamer.csv.header", Boolean.toString(hasHeader));
}
UtilitiesTestBase.Helpers.savePropsToDFS(csvProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_CSV);
String path = sourceRoot + "/1.csv";
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
UtilitiesTestBase.Helpers.saveCsvToDFS(
hasHeader, sep,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", CSV_NUM_RECORDS, true)),
dfs, path);
}
private void testCsvDFSSource(
boolean hasHeader, char sep, boolean useSchemaProvider, String transformerClassName) throws Exception {
prepareCsvDFSSource(hasHeader, sep, useSchemaProvider, transformerClassName != null);
String tableBasePath = dfsBasePath + "/test_csv_table" + testNum;
String sourceOrderingField = (hasHeader || useSchemaProvider) ? "timestamp" : "_c0";
HoodieDeltaStreamer deltaStreamer =
new HoodieDeltaStreamer(TestHelpers.makeConfig(
tableBasePath, Operation.INSERT, CsvDFSSource.class.getName(),
transformerClassName, PROPS_FILENAME_TEST_CSV, false,
useSchemaProvider, 1000, false, null, null, sourceOrderingField), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(CSV_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
testNum++;
}
@Test
public void testCsvDFSSourceWithHeaderWithoutSchemaProviderAndNoTransformer() throws Exception {
// The CSV files have header, the columns are separated by ',', the default separator
// No schema provider is specified, no transformer is applied
// In this case, the source schema comes from the inferred schema of the CSV files
testCsvDFSSource(true, ',', false, null);
}
@Test
public void testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndNoTransformer() throws Exception {
// The CSV files have header, the columns are separated by '\t',
// which is passed in through the Hudi CSV properties
// No schema provider is specified, no transformer is applied
// In this case, the source schema comes from the inferred schema of the CSV files
testCsvDFSSource(true, '\t', false, null);
}
@Test
public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndNoTransformer() throws Exception {
// The CSV files have header, the columns are separated by '\t'
// File schema provider is used, no transformer is applied
// In this case, the source schema comes from the source Avro schema file
testCsvDFSSource(true, '\t', true, null);
}
@Test
public void testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndWithTransformer() throws Exception {
// The CSV files have header, the columns are separated by '\t'
// No schema provider is specified, transformer is applied
// In this case, the source schema comes from the inferred schema of the CSV files.
// Target schema is determined based on the Dataframe after transformation
testCsvDFSSource(true, '\t', false, TripsWithDistanceTransformer.class.getName());
}
@Test
public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndTransformer() throws Exception {
// The CSV files have header, the columns are separated by '\t'
// File schema provider is used, transformer is applied
// In this case, the source and target schema come from the Avro schema files
testCsvDFSSource(true, '\t', true, TripsWithDistanceTransformer.class.getName());
}
@Test
public void testCsvDFSSourceNoHeaderWithoutSchemaProviderAndNoTransformer() throws Exception {
// The CSV files do not have header, the columns are separated by '\t',
// which is passed in through the Hudi CSV properties
// No schema provider is specified, no transformer is applied
// In this case, the source schema comes from the inferred schema of the CSV files
// No CSV header and no schema provider at the same time are not recommended
// as the column names are not informative
testCsvDFSSource(false, '\t', false, null);
}
@Test
public void testCsvDFSSourceNoHeaderWithSchemaProviderAndNoTransformer() throws Exception {
// The CSV files do not have header, the columns are separated by '\t'
// File schema provider is used, no transformer is applied
// In this case, the source schema comes from the source Avro schema file
testCsvDFSSource(false, '\t', true, null);
}
@Test
public void testCsvDFSSourceNoHeaderWithoutSchemaProviderAndWithTransformer() throws Exception {
// The CSV files do not have header, the columns are separated by '\t'
// No schema provider is specified, transformer is applied
// In this case, the source schema comes from the inferred schema of the CSV files.
// Target schema is determined based on the Dataframe after transformation
// No CSV header and no schema provider at the same time are not recommended,
// as the transformer behavior may be unexpected
try {
testCsvDFSSource(false, '\t', false, TripsWithDistanceTransformer.class.getName());
fail("Should error out when doing the transformation.");
} catch (AnalysisException e) {
LOG.error("Expected error during transformation", e);
assertTrue(e.getMessage().contains("cannot resolve '`begin_lat`' given input columns:"));
}
}
@Test
public void testCsvDFSSourceNoHeaderWithSchemaProviderAndTransformer() throws Exception {
// The CSV files do not have header, the columns are separated by '\t'
// File schema provider is used, transformer is applied
// In this case, the source and target schema come from the Avro schema files
testCsvDFSSource(false, '\t', true, TripsWithDistanceTransformer.class.getName());
}
/** /**
* UDF to calculate Haversine distance. * UDF to calculate Haversine distance.
*/ */

View File

@@ -27,11 +27,20 @@ import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties; import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveClient; import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.hive.util.HiveTestService; import org.apache.hudi.hive.util.HiveTestService;
import org.apache.hudi.utilities.sources.TestDataSource; import org.apache.hudi.utilities.sources.TestDataSource;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord; import org.apache.avro.generic.IndexedRecord;
@@ -42,6 +51,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.server.HiveServer2; import org.apache.hive.service.server.HiveServer2;
import org.apache.parquet.avro.AvroParquetWriter; import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SQLContext;
@@ -56,6 +66,7 @@ import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.io.PrintStream; import java.io.PrintStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
/** /**
@@ -72,6 +83,7 @@ public class UtilitiesTestBase {
protected transient SparkSession sparkSession = null; protected transient SparkSession sparkSession = null;
protected transient SQLContext sqlContext; protected transient SQLContext sqlContext;
protected static HiveServer2 hiveServer; protected static HiveServer2 hiveServer;
private static ObjectMapper mapper = new ObjectMapper();
@BeforeClass @BeforeClass
public static void initClass() throws Exception { public static void initClass() throws Exception {
@@ -193,9 +205,47 @@ public class UtilitiesTestBase {
os.close(); os.close();
} }
/**
* Converts the json records into CSV format and writes to a file.
*
* @param hasHeader whether the CSV file should have a header line.
* @param sep the column separator to use.
* @param lines the records in JSON format.
* @param fs {@link FileSystem} instance.
* @param targetPath File path.
* @throws IOException
*/
public static void saveCsvToDFS(
boolean hasHeader, char sep,
String[] lines, FileSystem fs, String targetPath) throws IOException {
Builder csvSchemaBuilder = CsvSchema.builder();
ArrayNode arrayNode = mapper.createArrayNode();
Arrays.stream(lines).forEachOrdered(
line -> {
try {
arrayNode.add(mapper.readValue(line, ObjectNode.class));
} catch (IOException e) {
throw new HoodieIOException(
"Error converting json records into CSV format: " + e.getMessage());
}
});
arrayNode.get(0).fieldNames().forEachRemaining(csvSchemaBuilder::addColumn);
ObjectWriter csvObjWriter = new CsvMapper()
.writerFor(JsonNode.class)
.with(csvSchemaBuilder.setUseHeader(hasHeader).setColumnSeparator(sep).build());
PrintStream os = new PrintStream(fs.create(new Path(targetPath), true));
csvObjWriter.writeValue(os, arrayNode);
os.flush();
os.close();
}
public static void saveParquetToDFS(List<GenericRecord> records, Path targetFile) throws IOException { public static void saveParquetToDFS(List<GenericRecord> records, Path targetFile) throws IOException {
try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(targetFile) try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(targetFile)
.withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) { .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA)
.withConf(HoodieTestUtils.getDefaultHadoopConf())
.withWriteMode(Mode.OVERWRITE)
.build()) {
for (GenericRecord record : records) { for (GenericRecord record : records) {
writer.write(record); writer.write(record);
} }
@@ -203,9 +253,13 @@ public class UtilitiesTestBase {
} }
public static TypedProperties setupSchemaOnDFS() throws IOException { public static TypedProperties setupSchemaOnDFS() throws IOException {
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc"); return setupSchemaOnDFS("source.avsc");
}
public static TypedProperties setupSchemaOnDFS(String filename) throws IOException {
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/" + filename, dfs, dfsBasePath + "/" + filename);
TypedProperties props = new TypedProperties(); TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc"); props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + filename);
return props; return props;
} }

View File

@@ -123,7 +123,7 @@ public abstract class AbstractBaseTestSource extends AvroSource {
updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates) updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates)
.map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator)); .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
} }
Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(commitTime, numInserts) Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(commitTime, numInserts, false)
.map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator)); .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
return Stream.concat(deleteStream, Stream.concat(updateStream, insertStream)); return Stream.concat(deleteStream, Stream.concat(updateStream, insertStream));
} }

View File

@@ -56,6 +56,7 @@ public abstract class AbstractDFSSourceTestBase extends UtilitiesTestBase {
String dfsRoot; String dfsRoot;
String fileSuffix; String fileSuffix;
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
boolean useFlattenedSchema = false;
@BeforeClass @BeforeClass
public static void initClass() throws Exception { public static void initClass() throws Exception {
@@ -105,7 +106,7 @@ public abstract class AbstractDFSSourceTestBase extends UtilitiesTestBase {
*/ */
Path generateOneFile(String filename, String commitTime, int n) throws IOException { Path generateOneFile(String filename, String commitTime, int n) throws IOException {
Path path = new Path(dfsRoot, filename + fileSuffix); Path path = new Path(dfsRoot, filename + fileSuffix);
writeNewDataToFile(dataGenerator.generateInserts(commitTime, n), path); writeNewDataToFile(dataGenerator.generateInserts(commitTime, n, useFlattenedSchema), path);
return path; return path;
} }

View File

@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.utilities.UtilitiesTestBase;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import java.io.IOException;
import java.util.List;
/**
* Basic tests for {@link CsvDFSSource}.
*/
public class TestCsvDFSSource extends AbstractDFSSourceTestBase {
@Before
public void setup() throws Exception {
super.setup();
this.dfsRoot = dfsBasePath + "/jsonFiles";
this.fileSuffix = ".json";
this.useFlattenedSchema = true;
this.schemaProvider = new FilebasedSchemaProvider(
Helpers.setupSchemaOnDFS("source-flattened.avsc"), jsc);
}
@Override
Source prepareDFSSource() {
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
props.setProperty("hoodie.deltastreamer.csv.header", Boolean.toString(true));
props.setProperty("hoodie.deltastreamer.csv.sep", "\t");
return new CsvDFSSource(props, jsc, sparkSession, schemaProvider);
}
@Override
void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException {
UtilitiesTestBase.Helpers.saveCsvToDFS(
true, '\t', Helpers.jsonifyRecords(records), dfs, path.toString());
}
}

View File

@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"type" : "record",
"name" : "triprec",
"fields" : [
{
"name" : "timestamp",
"type" : "double"
}, {
"name" : "_row_key",
"type" : "string"
}, {
"name" : "rider",
"type" : "string"
}, {
"name" : "driver",
"type" : "string"
}, {
"name" : "begin_lat",
"type" : "double"
}, {
"name" : "begin_lon",
"type" : "double"
}, {
"name" : "end_lat",
"type" : "double"
}, {
"name" : "end_lon",
"type" : "double"
}, {
"name" : "fare",
"type" : "double"
}, {
"name" : "currency",
"type" : "string"
}, {
"name" : "_hoodie_is_deleted",
"type" : "boolean",
"default" : false
} ]
}

View File

@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"type" : "record",
"name" : "triprec",
"fields" : [
{
"name" : "timestamp",
"type" : "double"
}, {
"name" : "_row_key",
"type" : "string"
}, {
"name" : "rider",
"type" : "string"
}, {
"name" : "driver",
"type" : "string"
}, {
"name" : "begin_lat",
"type" : "double"
}, {
"name" : "begin_lon",
"type" : "double"
}, {
"name" : "end_lat",
"type" : "double"
}, {
"name" : "end_lon",
"type" : "double"
}, {
"name" : "fare",
"type" : "double"
}, {
"name" : "currency",
"type" : "string"
}, {
"name" : "_hoodie_is_deleted",
"type" : "boolean",
"default" : false
}, {
"name" : "haversine_distance",
"type" : "double"
}]
}