diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 68d1f2dd3..86ea1f036 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.AvroOrcUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
 
@@ -47,6 +48,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.orc.TypeDescription;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -129,10 +131,12 @@ public class HoodieTestDataGenerator {
 
   public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
+  public static final TypeDescription ORC_SCHEMA = AvroOrcUtils.createOrcSchema(new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA));
   public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS = HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA);
   public static final Schema AVRO_SHORT_TRIP_SCHEMA = new Schema.Parser().parse(SHORT_TRIP_SCHEMA);
   public static final Schema AVRO_TRIP_SCHEMA = new Schema.Parser().parse(TRIP_SCHEMA);
+  public static final TypeDescription ORC_TRIP_SCHEMA = AvroOrcUtils.createOrcSchema(new Schema.Parser().parse(TRIP_SCHEMA));
   public static final Schema FLATTENED_AVRO_SCHEMA = new Schema.Parser().parse(TRIP_FLATTENED_SCHEMA);
 
   private static final Random RAND = new Random(46474747);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java
new file mode 100644
index 000000000..942bae89e
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * DFS Source that reads ORC data.
+ */
+public class ORCDFSSource extends RowSource {
+
+  private final DFSPathSelector pathSelector;
+
+  public ORCDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
+                      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+    this.pathSelector = DFSPathSelector.createSourceSelector(props, this.sparkContext.hadoopConfiguration());
+  }
+
+  @Override
+  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
+    Pair<Option<String>, String> selectPathsWithMaxModificationTime =
+        pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext, lastCkptStr, sourceLimit);
+    return selectPathsWithMaxModificationTime.getLeft()
+        .map(pathStr -> Pair.of(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight()))
+        .orElseGet(() -> Pair.of(Option.empty(), selectPathsWithMaxModificationTime.getRight()));
+  }
+
+  private Dataset<Row> fromFiles(String pathStr) {
+    return sparkSession.read().orc(pathStr.split(","));
+  }
+}
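
[Reviewer note, not part of the patch] A minimal sketch of driving the new source directly, assuming an existing JavaSparkContext (`jsc`) and SparkSession (`spark`); the DFS root path is a hypothetical stand-in. DFSPathSelector picks up new files under `hoodie.deltastreamer.source.dfs.root`, and the right element of the returned pair is the checkpoint handed back as `lastCkptStr` on the next poll:

    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.deltastreamer.source.dfs.root", "hdfs:///data/orc-landing"); // hypothetical path

    ORCDFSSource source = new ORCDFSSource(props, jsc, spark, null); // no schema provider
    // First poll: no prior checkpoint; sourceLimit caps how much new data is picked up in one batch.
    Pair<Option<Dataset<Row>>, String> batch = source.fetchNextBatch(Option.empty(), 1024 * 1024);
    if (batch.getLeft().isPresent()) {
      batch.getLeft().get().show(); // rows decoded by sparkSession.read().orc(...)
    }
    String nextCheckpoint = batch.getRight(); // max modification time seen by the path selector
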
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index fe1a1516d..6313ab7b0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -57,6 +57,7 @@ import org.apache.hudi.utilities.sources.HoodieIncrSource;
 import org.apache.hudi.utilities.sources.InputBatch;
 import org.apache.hudi.utilities.sources.JdbcSource;
 import org.apache.hudi.utilities.sources.JsonKafkaSource;
+import org.apache.hudi.utilities.sources.ORCDFSSource;
 import org.apache.hudi.utilities.sources.ParquetDFSSource;
 import org.apache.hudi.utilities.sources.TestDataSource;
 import org.apache.hudi.utilities.testutils.JdbcTestUtils;
@@ -122,6 +123,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 /**
  * Basic tests against {@link HoodieDeltaStreamer}, by issuing bulk_inserts, upserts, inserts. Check counts at the end.
@@ -1482,6 +1484,34 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
     testNum++;
   }
 
+  private void testORCDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
+    // prepare ORCDFSSource
+    TypedProperties orcProps = new TypedProperties();
+
+    // Properties used for testing delta-streamer with orc source
+    orcProps.setProperty("include", "base.properties");
+    orcProps.setProperty("hoodie.embed.timeline.server", "false");
+    orcProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
+    orcProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+    if (useSchemaProvider) {
+      orcProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + "source.avsc");
+      if (transformerClassNames != null) {
+        orcProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/" + "target.avsc");
+      }
+    }
+    orcProps.setProperty("hoodie.deltastreamer.source.dfs.root", ORC_SOURCE_ROOT);
+    UtilitiesTestBase.Helpers.savePropsToDFS(orcProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_ORC);
+
+    String tableBasePath = dfsBasePath + "/test_orc_source_table" + testNum;
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ORCDFSSource.class.getName(),
+            transformerClassNames, PROPS_FILENAME_TEST_ORC, false,
+            useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(ORC_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
+    testNum++;
+  }
+
   private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue, String topicName) throws IOException {
     // Properties used for testing delta-streamer with JsonKafka source
     TypedProperties props = new TypedProperties();
@@ -1622,6 +1652,12 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
     testParquetDFSSource(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
   }
 
+  @ParameterizedTest
+  @MethodSource("testORCDFSSource")
+  public void testORCDFSSourceWithoutSchemaProviderAndNoTransformer(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
+    testORCDFSSource(useSchemaProvider, transformerClassNames);
+  }
+
   private void prepareCsvDFSSource(
       boolean hasHeader, char sep, boolean useSchemaProvider, boolean hasTransformer) throws IOException {
     String sourceRoot = dfsBasePath + "/csvFiles";
@@ -1936,4 +1972,12 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
     }
   }
 
+  private static Stream<Arguments> testORCDFSSource() {
+    // arg1 boolean useSchemaProvider, arg2 List<String> transformerClassNames
+    return Stream.of(
+        arguments(false, null),
+        arguments(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()))
+    );
+  }
+
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java
index 5a1cfc332..51b51d865 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java
@@ -50,12 +50,16 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
   static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
   static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties";
   static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
+  static final String PROPS_FILENAME_TEST_ORC = "test-orc-dfs-source.properties";
   static final String PROPS_FILENAME_TEST_JSON_KAFKA = "test-json-kafka-dfs-source.properties";
   static final String PROPS_FILENAME_TEST_MULTI_WRITER = "test-multi-writer.properties";
   static final String FIRST_PARQUET_FILE_NAME = "1.parquet";
+  static final String FIRST_ORC_FILE_NAME = "1.orc";
   static String PARQUET_SOURCE_ROOT;
+  static String ORC_SOURCE_ROOT;
   static String JSON_KAFKA_SOURCE_ROOT;
   static final int PARQUET_NUM_RECORDS = 5;
+  static final int ORC_NUM_RECORDS = 5;
   static final int CSV_NUM_RECORDS = 3;
   static final int JSON_KAFKA_NUM_RECORDS = 5;
   String kafkaCheckpointType = "string";
@@ -84,6 +88,7 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
   public static void initClass() throws Exception {
     UtilitiesTestBase.initClass(true);
     PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+    ORC_SOURCE_ROOT = dfsBasePath + "/orcFiles";
     JSON_KAFKA_SOURCE_ROOT = dfsBasePath + "/jsonKafkaFiles";
     testUtils = new KafkaTestUtils();
     testUtils.setup();
@@ -147,6 +152,7 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
     UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs, dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1);
 
     prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
+    prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT);
   }
 
   protected static void writeCommonPropsToFile() throws IOException {
@@ -247,4 +253,27 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
           dataGenerator.generateInserts("000", numRecords)), new Path(path));
     }
   }
+
+  protected static void prepareORCDFSFiles(int numRecords) throws IOException {
+    prepareORCDFSFiles(numRecords, ORC_SOURCE_ROOT);
+  }
+
+  protected static void prepareORCDFSFiles(int numRecords, String baseORCPath) throws IOException {
+    prepareORCDFSFiles(numRecords, baseORCPath, FIRST_ORC_FILE_NAME, false, null, null);
+  }
+
+  protected static void prepareORCDFSFiles(int numRecords, String baseORCPath, String fileName, boolean useCustomSchema,
+                                           String schemaStr, Schema schema) throws IOException {
+    String path = baseORCPath + "/" + fileName;
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    if (useCustomSchema) {
+      Helpers.saveORCToDFS(Helpers.toGenericRecords(
+          dataGenerator.generateInsertsAsPerSchema("000", numRecords, schemaStr),
+          schema), new Path(path), HoodieTestDataGenerator.ORC_TRIP_SCHEMA);
+    } else {
+      Helpers.saveORCToDFS(Helpers.toGenericRecords(
+          dataGenerator.generateInserts("000", numRecords)), new Path(path));
+    }
+  }
 }
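
[Reviewer note, not part of the patch] The useCustomSchema branch above is not exercised by the new ORC test; a sketch of how a test could drive it, reusing the TRIP_SCHEMA constants already defined in HoodieTestDataGenerator (the record count and file name are illustrative):

    // Writes ORC data with the alternate trip schema; the writer is handed
    // HoodieTestDataGenerator.ORC_TRIP_SCHEMA via the useCustomSchema branch.
    prepareORCDFSFiles(10, ORC_SOURCE_ROOT, "2.orc", true,
        HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
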
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index 8bff47522..bb00d2fef 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
 import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
 import org.apache.hudi.common.testutils.minicluster.ZookeeperTestService;
+import org.apache.hudi.common.util.AvroOrcUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
@@ -57,6 +58,11 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.service.server.HiveServer2;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
 import org.apache.parquet.hadoop.ParquetWriter;
@@ -314,6 +320,27 @@ public class UtilitiesTestBase {
       }
     }
 
+    public static void saveORCToDFS(List<GenericRecord> records, Path targetFile) throws IOException {
+      saveORCToDFS(records, targetFile, HoodieTestDataGenerator.ORC_SCHEMA);
+    }
+
+    public static void saveORCToDFS(List<GenericRecord> records, Path targetFile, TypeDescription schema) throws IOException {
+      OrcFile.WriterOptions options = OrcFile.writerOptions(HoodieTestUtils.getDefaultHadoopConf()).setSchema(schema);
+      try (Writer writer = OrcFile.createWriter(targetFile, options)) {
+        VectorizedRowBatch batch = schema.createRowBatch();
+        for (GenericRecord record : records) {
+          addAvroRecord(batch, record, schema);
+          batch.size++;
+          if (batch.size % records.size() == 0 || batch.size == batch.getMaxSize()) {
+            writer.addRowBatch(batch);
+            batch.reset();
+            batch.size = 0;
+          }
+        }
+        writer.addRowBatch(batch);
+      }
+    }
+
     public static TypedProperties setupSchemaOnDFS() throws IOException {
       return setupSchemaOnDFS("delta-streamer-config", "source.avsc");
     }
@@ -364,5 +391,21 @@ public class UtilitiesTestBase {
     public static String[] jsonifyRecords(List<HoodieRecord> records) {
       return records.stream().map(Helpers::toJsonString).toArray(String[]::new);
     }
+
+    private static void addAvroRecord(
+        VectorizedRowBatch batch,
+        GenericRecord record,
+        TypeDescription orcSchema
+    ) {
+      for (int c = 0; c < batch.numCols; c++) {
+        ColumnVector colVector = batch.cols[c];
+        final String thisField = orcSchema.getFieldNames().get(c);
+        final TypeDescription type = orcSchema.getChildren().get(c);
+
+        Object fieldValue = record.get(thisField);
+        Schema.Field avroField = record.getSchema().getField(thisField);
+        AvroOrcUtils.addToVector(type, colVector, avroField.schema(), fieldValue, batch.size);
+      }
+    }
   }
 }
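
[Reviewer note, not part of the patch] saveORCToDFS and addAvroRecord above drive ORC's vectorized writer: rows accumulate in a VectorizedRowBatch, one ColumnVector per field, and are flushed with addRowBatch. A self-contained sketch of that API with a hand-built two-column schema; the file path and values are illustrative, and the nohive `org.apache.orc.storage` vector classes match the imports used above:

    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.OrcFile;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.Writer;
    import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
    import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
    import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

    public class OrcWriteSketch {
      public static void main(String[] args) throws Exception {
        TypeDescription schema = TypeDescription.fromString("struct<key:string,ts:bigint>");
        try (Writer writer = OrcFile.createWriter(new Path("/tmp/sample.orc"),
            OrcFile.writerOptions(new Configuration()).setSchema(schema))) {
          VectorizedRowBatch batch = schema.createRowBatch();
          for (int i = 0; i < 5; i++) {
            int row = batch.size++; // addAvroRecord above fills columns at index batch.size
            ((BytesColumnVector) batch.cols[0]).setVal(row, ("key-" + i).getBytes(StandardCharsets.UTF_8));
            ((LongColumnVector) batch.cols[1]).vector[row] = 1600000000L + i;
          }
          writer.addRowBatch(batch); // flush the remainder, as saveORCToDFS does after its loop
        }
      }
    }
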