[HUDI-40] Add parquet support for the Delta Streamer (#949)
commit ed745dfdbf
parent 7381b66194
committed by vinoth chandar
@@ -28,6 +28,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.sources.AvroSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonSource;
import org.apache.hudi.utilities.sources.ParquetSource;
import org.apache.hudi.utilities.sources.RowSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
@@ -59,6 +60,8 @@ public final class SourceFormatAdapter {
    switch (source.getSourceType()) {
      case AVRO:
        return ((AvroSource) source).fetchNext(lastCkptStr, sourceLimit);
      case PARQUET:
        return ((ParquetSource) source).fetchNext(lastCkptStr, sourceLimit);
      case JSON: {
        InputBatch<JavaRDD<String>> r = ((JsonSource) source).fetchNext(lastCkptStr, sourceLimit);
        AvroConvertor convertor = new AvroConvertor(r.getSchemaProvider().getSourceSchema());
@@ -99,6 +102,18 @@ public final class SourceFormatAdapter {
                .orElse(null)),
            r.getCheckpointForNextBatch(), r.getSchemaProvider());
      }
      case PARQUET: {
        InputBatch<JavaRDD<GenericRecord>> r = ((ParquetSource) source).fetchNext(lastCkptStr, sourceLimit);
        Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
        return new InputBatch<>(
            Option
                .ofNullable(
                    r.getBatch()
                        .map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
                            source.getSparkSession()))
                        .orElse(null)),
            r.getCheckpointForNextBatch(), r.getSchemaProvider());
      }
      case JSON: {
        InputBatch<JavaRDD<String>> r = ((JsonSource) source).fetchNext(lastCkptStr, sourceLimit);
        Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
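Note: for orientation, a minimal sketch of how the new PARQUET branch gets exercised. This mirrors the test added further down; props, jsc, sparkSession and schemaProvider are assumed to be set up the same way the test does it.

    SourceFormatAdapter adapter =
        new SourceFormatAdapter(new ParquetDFSSource(props, jsc, sparkSession, schemaProvider));
    // Avro path: GenericRecords come straight out of AvroParquetInputFormat
    InputBatch<JavaRDD<GenericRecord>> avroBatch =
        adapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
    // Row path: the adapter converts the RDD<GenericRecord> into a Dataset<Row> via AvroConversionUtils
    InputBatch<Dataset<Row>> rowBatch =
        adapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);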
@@ -0,0 +1,60 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.utilities.sources;

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
import org.apache.parquet.avro.AvroParquetInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

/**
 * DFS Source that reads parquet data
 */
public class ParquetDFSSource extends ParquetSource {

  private final DFSPathSelector pathSelector;

  public ParquetDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
      SchemaProvider schemaProvider) {
    super(props, sparkContext, sparkSession, schemaProvider);
    this.pathSelector = new DFSPathSelector(props, this.sparkContext.hadoopConfiguration());
  }

  @Override
  protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
    Pair<Option<String>, String> selectPathsWithMaxModificationTime =
        pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, sourceLimit);
    return selectPathsWithMaxModificationTime.getLeft()
        .map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight()))
        .orElseGet(() -> new InputBatch<>(Option.empty(), selectPathsWithMaxModificationTime.getRight()));
  }

  private JavaRDD<GenericRecord> fromFiles(String pathStr) {
    JavaPairRDD<Void, GenericRecord> avroRDD = sparkContext.newAPIHadoopFile(pathStr, AvroParquetInputFormat.class,
        Void.class, GenericRecord.class, sparkContext.hadoopConfiguration());
    return avroRDD.values();
  }
}
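Note: a small sketch of the checkpoint behaviour this source inherits from DFSPathSelector. The hoodie.deltastreamer.source.dfs.root key is the one used by the test below; the path and variable names here are illustrative, not part of this change.

    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.deltastreamer.source.dfs.root", "/tmp/parquetFiles"); // illustrative path
    ParquetSource source = new ParquetDFSSource(props, jsc, sparkSession, schemaProvider);
    // First fetch with no checkpoint selects every parquet file under the root
    InputBatch<JavaRDD<GenericRecord>> first = source.fetchNext(Option.empty(), Long.MAX_VALUE);
    // The checkpoint is the max modification time among the files just read
    String ckpt = first.getCheckpointForNextBatch();
    // Re-fetching with that checkpoint yields an empty batch until newer files land, as the test asserts
    InputBatch<JavaRDD<GenericRecord>> next = source.fetchNext(Option.of(ckpt), Long.MAX_VALUE);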
@@ -0,0 +1,34 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.utilities.sources;

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public abstract class ParquetSource extends Source<JavaRDD<GenericRecord>> {

  public ParquetSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
      SchemaProvider schemaProvider) {
    super(props, sparkContext, sparkSession, schemaProvider, SourceType.PARQUET);
  }
}
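Note: ParquetSource only fixes SourceType.PARQUET and the JavaRDD<GenericRecord> payload type; any other parquet-backed source would subclass it the way ParquetDFSSource does. A hypothetical minimal subclass, purely for illustration (class name and behaviour are made up, not part of this commit; imports are the same ones ParquetDFSSource uses above):

    // Hypothetical example only -- reads a fixed path and reuses the incoming checkpoint.
    public class FixedPathParquetSource extends ParquetSource {

      private final String path;

      public FixedPathParquetSource(TypedProperties props, JavaSparkContext sparkContext,
          SparkSession sparkSession, SchemaProvider schemaProvider, String path) {
        super(props, sparkContext, sparkSession, schemaProvider);
        this.path = path;
      }

      @Override
      protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
        // Read the fixed path as Avro GenericRecords, exactly like ParquetDFSSource#fromFiles
        JavaPairRDD<Void, GenericRecord> avroRDD = sparkContext.newAPIHadoopFile(path,
            AvroParquetInputFormat.class, Void.class, GenericRecord.class, sparkContext.hadoopConfiguration());
        // Hand back the same checkpoint so repeated fetches stay idempotent
        return new InputBatch<>(Option.of(avroRDD.values()), lastCkptStr.orElse(""));
      }
    }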
@@ -34,7 +34,7 @@ public abstract class Source<T> implements Serializable {
  protected static volatile Logger log = LogManager.getLogger(Source.class);

  public enum SourceType {
    JSON, AVRO, ROW
    JSON, AVRO, ROW, PARQUET
  }

  protected transient TypedProperties props;
@@ -23,23 +23,31 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.server.HiveServer2;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.TestRawTripPayload;
import org.apache.hudi.common.minicluster.HdfsTestService;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.hive.util.HiveTestService;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
@@ -178,6 +186,15 @@ public class UtilitiesTestBase {
      os.close();
    }

    public static void saveParquetToDFS(List<GenericRecord> records, Path targetFile) throws IOException {
      try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(targetFile)
          .withSchema(HoodieTestDataGenerator.avroSchema).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {
        for (GenericRecord record : records) {
          writer.write(record);
        }
      }
    }

    public static TypedProperties setupSchemaOnDFS() throws IOException {
      UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
      TypedProperties props = new TypedProperties();
@@ -185,6 +202,24 @@ public class UtilitiesTestBase {
      return props;
    }

    public static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, HoodieTestDataGenerator dataGenerator) {
      try {
        Option<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.avroSchema);
        return (GenericRecord) recordOpt.get();
      } catch (IOException e) {
        return null;
      }
    }

    public static List<GenericRecord> toGenericRecords(List<HoodieRecord> hoodieRecords,
        HoodieTestDataGenerator dataGenerator) {
      List<GenericRecord> records = new ArrayList<GenericRecord>();
      for (HoodieRecord hoodieRecord : hoodieRecords) {
        records.add(toGenericRecord(hoodieRecord, dataGenerator));
      }
      return records;
    }

    public static String toJsonString(HoodieRecord hr) {
      try {
        return ((TestRawTripPayload) hr.getData()).getJsonData();
@@ -19,10 +19,15 @@
package org.apache.hudi.utilities.sources;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
@@ -41,7 +46,7 @@ import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Basic tests against all subclasses of {@link JsonDFSSource}
 * Basic tests against all subclasses of {@link JsonDFSSource} and {@link ParquetDFSSource}
 */
public class TestDFSSource extends UtilitiesTestBase {

@@ -82,11 +87,17 @@ public class TestDFSSource extends UtilitiesTestBase {
    assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
    UtilitiesTestBase.Helpers.saveStringsToDFS(Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 100)), dfs,
        dfsBasePath + "/jsonFiles/1.json");
    assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), 10).getBatch());
    InputBatch<JavaRDD<GenericRecord>> fetch1 = jsonSource.fetchNewDataInAvroFormat(Option.empty(), 1000000);
    // Test respecting sourceLimit
    int sourceLimit = 10;
    RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(dfsBasePath + "/jsonFiles/1.json"), true);
    FileStatus file1Status = files.next();
    assertTrue(file1Status.getLen() > sourceLimit);
    assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), sourceLimit).getBatch());
    // Test json -> Avro
    InputBatch<JavaRDD<GenericRecord>> fetch1 = jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
    assertEquals(100, fetch1.getBatch().get().count());
    // Test json -> Row format
    InputBatch<Dataset<Row>> fetch1AsRows = jsonSource.fetchNewDataInRowFormat(Option.empty(), 1000000);
    InputBatch<Dataset<Row>> fetch1AsRows = jsonSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
    assertEquals(100, fetch1AsRows.getBatch().get().count());
    // Test Avro -> Row format
    Dataset<Row> fetch1Rows = AvroConversionUtils.createDataFrame(JavaRDD.toRDD(fetch1.getBatch().get()),
@@ -113,5 +124,69 @@ public class TestDFSSource extends UtilitiesTestBase {
    InputBatch<JavaRDD<GenericRecord>> fetch4 =
        jsonSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
    assertEquals(Option.empty(), fetch4.getBatch());

    // 5. Extract from the beginning
    InputBatch<JavaRDD<GenericRecord>> fetch5 = jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
    assertEquals(10100, fetch5.getBatch().get().count());
  }

  @Test
  public void testParquetDFSSource() throws IOException {
    dfs.mkdirs(new Path(dfsBasePath + "/parquetFiles"));
    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();

    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsBasePath + "/parquetFiles");
    ParquetSource parquetDFSSource = new ParquetDFSSource(props, jsc, sparkSession, schemaProvider);
    SourceFormatAdapter parquetSource = new SourceFormatAdapter(parquetDFSSource);

    // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
    assertEquals(Option.empty(), parquetSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
    List<GenericRecord> batch1 = Helpers.toGenericRecords(dataGenerator.generateInserts("000", 100), dataGenerator);
    Path file1 = new Path(dfsBasePath + "/parquetFiles", "1.parquet");
    Helpers.saveParquetToDFS(batch1, file1);
    // Test respecting sourceLimit
    int sourceLimit = 10;
    RemoteIterator<LocatedFileStatus> files = dfs.listFiles(file1, true);
    FileStatus file1Status = files.next();
    assertTrue(file1Status.getLen() > sourceLimit);
    assertEquals(Option.empty(), parquetSource.fetchNewDataInAvroFormat(Option.empty(), sourceLimit).getBatch());
    // Test parquet -> Avro
    InputBatch<JavaRDD<GenericRecord>> fetch1 = parquetSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
    assertEquals(100, fetch1.getBatch().get().count());
    // Test parquet -> Row
    InputBatch<Dataset<Row>> fetch1AsRows = parquetSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
    assertEquals(100, fetch1AsRows.getBatch().get().count());

    // 2. Produce new data, extract new data
    List<GenericRecord> batch2 = Helpers.toGenericRecords(dataGenerator.generateInserts("001", 10000), dataGenerator);
    Path file2 = new Path(dfsBasePath + "/parquetFiles", "2.parquet");
    Helpers.saveParquetToDFS(batch2, file2);
    // Test parquet -> Avro
    InputBatch<JavaRDD<GenericRecord>> fetch2 =
        parquetSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
    assertEquals(10000, fetch2.getBatch().get().count());
    // Test parquet -> Row
    InputBatch<Dataset<Row>> fetch2AsRows =
        parquetSource.fetchNewDataInRowFormat(Option.of(fetch1AsRows.getCheckpointForNextBatch()), Long.MAX_VALUE);
    assertEquals(10000, fetch2AsRows.getBatch().get().count());

    // 3. Extract with previous checkpoint => gives same data back (idempotent)
    InputBatch<Dataset<Row>> fetch3AsRows =
        parquetSource.fetchNewDataInRowFormat(Option.of(fetch1AsRows.getCheckpointForNextBatch()), Long.MAX_VALUE);
    assertEquals(10000, fetch3AsRows.getBatch().get().count());
    assertEquals(fetch2AsRows.getCheckpointForNextBatch(), fetch3AsRows.getCheckpointForNextBatch());
    fetch3AsRows.getBatch().get().registerTempTable("test_dfs_table");
    Dataset<Row> rowDataset = new SQLContext(jsc.sc()).sql("select * from test_dfs_table");
    assertEquals(10000, rowDataset.count());

    // 4. Extract with latest checkpoint => no new data returned
    InputBatch<JavaRDD<GenericRecord>> fetch4 =
        parquetSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
    assertEquals(Option.empty(), fetch4.getBatch());

    // 5. Extract from the beginning
    InputBatch<JavaRDD<GenericRecord>> fetch5 = parquetSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
    assertEquals(10100, fetch5.getBatch().get().count());
  }
}