[HUDI-2277] HoodieDeltaStreamer reading ORC files directly using ORCDFSSource (#3413)

* add ORCDFSSource to support reading ORC files into Hudi format && add UTs

* remove unused import

* simplify test

* code review

* code review

* code review

* code review

* code review

* code review

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
zhangyue19921010 authored 2021-09-29 23:54:12 +08:00, committed by GitHub
parent 2aa660f99d
commit dd1bd62684
5 changed files with 176 additions and 0 deletions
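
In practice the new source is wired into a HoodieDeltaStreamer run through the usual source properties. A minimal sketch of the source-side settings, mirroring the new test below; the path and partition field value here are illustrative assumptions, not values from this commit:

import org.apache.hudi.common.config.TypedProperties;

public class OrcSourcePropsSketch {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // Root directory holding the ORC files to ingest (illustrative path).
    props.setProperty("hoodie.deltastreamer.source.dfs.root", "hdfs:///data/orcFiles");
    props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
    props.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
    // The source class itself is supplied to the delta streamer as
    // org.apache.hudi.utilities.sources.ORCDFSSource (see TestHelpers.makeConfig in the test below).
    System.out.println(props);
  }
}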

HoodieTestDataGenerator.java

@@ -31,6 +31,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.AvroOrcUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
@@ -47,6 +48,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.orc.TypeDescription;
import java.io.IOException;
import java.io.Serializable;
@@ -129,10 +131,12 @@ public class HoodieTestDataGenerator {
public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
public static final TypeDescription ORC_SCHEMA = AvroOrcUtils.createOrcSchema(new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA));
public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS =
HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA);
public static final Schema AVRO_SHORT_TRIP_SCHEMA = new Schema.Parser().parse(SHORT_TRIP_SCHEMA);
public static final Schema AVRO_TRIP_SCHEMA = new Schema.Parser().parse(TRIP_SCHEMA);
public static final TypeDescription ORC_TRIP_SCHEMA = AvroOrcUtils.createOrcSchema(new Schema.Parser().parse(TRIP_SCHEMA));
public static final Schema FLATTENED_AVRO_SCHEMA = new Schema.Parser().parse(TRIP_FLATTENED_SCHEMA);
private static final Random RAND = new Random(46474747);
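
The new ORC_SCHEMA and ORC_TRIP_SCHEMA constants lean on AvroOrcUtils.createOrcSchema to derive an ORC TypeDescription from an Avro schema. A minimal sketch of that conversion, using an illustrative schema rather than the repo's TRIP_EXAMPLE_SCHEMA:

import org.apache.avro.Schema;
import org.apache.hudi.common.util.AvroOrcUtils;
import org.apache.orc.TypeDescription;

public class AvroToOrcSchemaSketch {
  public static void main(String[] args) {
    // Illustrative Avro record schema (an assumption for this sketch, not the repo's trip schema).
    Schema avro = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"trip\",\"fields\":["
            + "{\"name\":\"_row_key\",\"type\":\"string\"},"
            + "{\"name\":\"timestamp\",\"type\":\"long\"}]}");
    // Derive the ORC schema used by the ORC writer in the test utilities;
    // the result should be roughly struct<_row_key:string,timestamp:bigint>.
    TypeDescription orc = AvroOrcUtils.createOrcSchema(avro);
    System.out.println(orc);
  }
}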

ORCDFSSource.java (new file)

@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* DFS Source that reads ORC data.
*/
public class ORCDFSSource extends RowSource {
private final DFSPathSelector pathSelector;
public ORCDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
this.pathSelector = DFSPathSelector.createSourceSelector(props, this.sparkContext.hadoopConfiguration());
}
@Override
public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
Pair<Option<String>, String> selectPathsWithMaxModificationTime =
pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext, lastCkptStr, sourceLimit);
return selectPathsWithMaxModificationTime.getLeft()
.map(pathStr -> Pair.of(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight()))
.orElseGet(() -> Pair.of(Option.empty(), selectPathsWithMaxModificationTime.getRight()));
}
private Dataset<Row> fromFiles(String pathStr) {
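// The path selector returns a comma-separated list of file paths; Spark's ORC reader accepts them as varargs.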
return sparkSession.read().orc(pathStr.split(","));
}
}
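
For reference, a minimal sketch of exercising the source on its own. It assumes a local SparkSession and ORC files under an illustrative root path; passing no schema provider mirrors the test's useSchemaProvider=false path:

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.sources.ORCDFSSource;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class OrcDfsSourceSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[2]").appName("orc-dfs-source-sketch").getOrCreate();
    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    TypedProperties props = new TypedProperties();
    // Root directory holding the ORC files to ingest (illustrative path).
    props.setProperty("hoodie.deltastreamer.source.dfs.root", "file:///tmp/orcFiles");

    // No schema provider, as in the useSchemaProvider=false test case.
    ORCDFSSource source = new ORCDFSSource(props, jsc, spark, null);

    // First sync: no prior checkpoint, read newly arrived files up to the given byte limit.
    Pair<Option<Dataset<Row>>, String> batch = source.fetchNextBatch(Option.empty(), 1024 * 1024);
    if (batch.getLeft().isPresent()) {
      batch.getLeft().get().show(5);
    }
    // The right side is the new checkpoint (max modification time) to hand to the next fetch.
    System.out.println("checkpoint: " + batch.getRight());

    spark.stop();
  }
}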

TestHoodieDeltaStreamer.java

@@ -57,6 +57,7 @@ import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JdbcSource;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.ORCDFSSource;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.testutils.JdbcTestUtils;
@@ -122,6 +123,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.params.provider.Arguments.arguments;
/**
* Basic tests against {@link HoodieDeltaStreamer}, by issuing bulk_inserts, upserts, inserts. Check counts at the end.
@@ -1482,6 +1484,34 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
testNum++;
}
private void testORCDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
// prepare ORCDFSSource
TypedProperties orcProps = new TypedProperties();
// Properties used for testing delta-streamer with orc source
orcProps.setProperty("include", "base.properties");
orcProps.setProperty("hoodie.embed.timeline.server","false");
orcProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
orcProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
if (useSchemaProvider) {
orcProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + "source.avsc");
if (transformerClassNames != null) {
orcProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/" + "target.avsc");
}
}
orcProps.setProperty("hoodie.deltastreamer.source.dfs.root", ORC_SOURCE_ROOT);
UtilitiesTestBase.Helpers.savePropsToDFS(orcProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_ORC);
String tableBasePath = dfsBasePath + "/test_orc_source_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ORCDFSSource.class.getName(),
transformerClassNames, PROPS_FILENAME_TEST_ORC, false,
useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(ORC_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
testNum++;
}
private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue, String topicName) throws IOException {
// Properties used for testing delta-streamer with JsonKafka source
TypedProperties props = new TypedProperties();
@@ -1622,6 +1652,12 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
testParquetDFSSource(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
}
@ParameterizedTest
@MethodSource("testORCDFSSource")
public void testORCDFSSourceWithoutSchemaProviderAndNoTransformer(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
testORCDFSSource(useSchemaProvider, transformerClassNames);
}
private void prepareCsvDFSSource(
boolean hasHeader, char sep, boolean useSchemaProvider, boolean hasTransformer) throws IOException {
String sourceRoot = dfsBasePath + "/csvFiles";
@@ -1936,4 +1972,12 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
}
}
private static Stream<Arguments> testORCDFSSource() {
// arg1 boolean useSchemaProvider, arg2 List<String> transformerClassNames
return Stream.of(
arguments(false, null),
arguments(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()))
);
}
}

TestHoodieDeltaStreamerBase.java

@@ -50,12 +50,16 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties";
static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
static final String PROPS_FILENAME_TEST_ORC = "test-orc-dfs-source.properties";
static final String PROPS_FILENAME_TEST_JSON_KAFKA = "test-json-kafka-dfs-source.properties";
static final String PROPS_FILENAME_TEST_MULTI_WRITER = "test-multi-writer.properties";
static final String FIRST_PARQUET_FILE_NAME = "1.parquet";
static final String FIRST_ORC_FILE_NAME = "1.orc";
static String PARQUET_SOURCE_ROOT;
static String ORC_SOURCE_ROOT;
static String JSON_KAFKA_SOURCE_ROOT;
static final int PARQUET_NUM_RECORDS = 5;
static final int ORC_NUM_RECORDS = 5;
static final int CSV_NUM_RECORDS = 3;
static final int JSON_KAFKA_NUM_RECORDS = 5;
String kafkaCheckpointType = "string";
@@ -84,6 +88,7 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
public static void initClass() throws Exception {
UtilitiesTestBase.initClass(true);
PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
ORC_SOURCE_ROOT = dfsBasePath + "/orcFiles";
JSON_KAFKA_SOURCE_ROOT = dfsBasePath + "/jsonKafkaFiles";
testUtils = new KafkaTestUtils();
testUtils.setup();
@@ -147,6 +152,7 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs, dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1);
prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT);
}
protected static void writeCommonPropsToFile() throws IOException {
@@ -247,4 +253,27 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
dataGenerator.generateInserts("000", numRecords)), new Path(path));
}
}
protected static void prepareORCDFSFiles(int numRecords) throws IOException {
prepareORCDFSFiles(numRecords, ORC_SOURCE_ROOT);
}
protected static void prepareORCDFSFiles(int numRecords, String baseORCPath) throws IOException {
prepareORCDFSFiles(numRecords, baseORCPath, FIRST_ORC_FILE_NAME, false, null, null);
}
protected static void prepareORCDFSFiles(int numRecords, String baseORCPath, String fileName, boolean useCustomSchema,
String schemaStr, Schema schema) throws IOException {
String path = baseORCPath + "/" + fileName;
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
if (useCustomSchema) {
Helpers.saveORCToDFS(Helpers.toGenericRecords(
dataGenerator.generateInsertsAsPerSchema("000", numRecords, schemaStr),
schema), new Path(path), HoodieTestDataGenerator.ORC_TRIP_SCHEMA);
} else {
Helpers.saveORCToDFS(Helpers.toGenericRecords(
dataGenerator.generateInserts("000", numRecords)), new Path(path));
}
}
}

UtilitiesTestBase.java

@@ -28,6 +28,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hudi.common.testutils.minicluster.ZookeeperTestService;
import org.apache.hudi.common.util.AvroOrcUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
@@ -57,6 +58,11 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.server.HiveServer2;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.parquet.hadoop.ParquetWriter;
@@ -314,6 +320,27 @@ public class UtilitiesTestBase {
}
}
public static void saveORCToDFS(List<GenericRecord> records, Path targetFile) throws IOException {
saveORCToDFS(records, targetFile, HoodieTestDataGenerator.ORC_SCHEMA);
}
public static void saveORCToDFS(List<GenericRecord> records, Path targetFile, TypeDescription schema) throws IOException {
OrcFile.WriterOptions options = OrcFile.writerOptions(HoodieTestUtils.getDefaultHadoopConf()).setSchema(schema);
try (Writer writer = OrcFile.createWriter(targetFile, options)) {
VectorizedRowBatch batch = schema.createRowBatch();
for (GenericRecord record : records) {
addAvroRecord(batch, record, schema);
batch.size++;
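// Flush once all records have been buffered, or when the vectorized batch reaches capacity.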
if (batch.size % records.size() == 0 || batch.size == batch.getMaxSize()) {
writer.addRowBatch(batch);
batch.reset();
batch.size = 0;
}
}
writer.addRowBatch(batch);
}
}
public static TypedProperties setupSchemaOnDFS() throws IOException {
return setupSchemaOnDFS("delta-streamer-config", "source.avsc");
}
@@ -364,5 +391,21 @@ public class UtilitiesTestBase {
public static String[] jsonifyRecords(List<HoodieRecord> records) {
return records.stream().map(Helpers::toJsonString).toArray(String[]::new);
}
private static void addAvroRecord(
VectorizedRowBatch batch,
GenericRecord record,
TypeDescription orcSchema
) {
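// For each ORC column, look up the matching Avro field by name and append its value at the current row index (batch.size).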
for (int c = 0; c < batch.numCols; c++) {
ColumnVector colVector = batch.cols[c];
final String thisField = orcSchema.getFieldNames().get(c);
final TypeDescription type = orcSchema.getChildren().get(c);
Object fieldValue = record.get(thisField);
Schema.Field avroField = record.getSchema().getField(thisField);
AvroOrcUtils.addToVector(type, colVector, avroField.schema(), fieldValue, batch.size);
}
}
}
}