From 4f74a84607d46249e9bb6e1397246f8dc076b390 Mon Sep 17 00:00:00 2001 From: Gary Li Date: Fri, 7 Aug 2020 00:28:14 -0700 Subject: [PATCH] [HUDI-69] Support Spark Datasource for MOR table - RDD approach (#1848) - This PR implements Spark Datasource for MOR table in the RDD approach. - Implemented SnapshotRelation - Implemented HudiMergeOnReadRDD - Implemented separate Iterator to handle merge and unmerge record reader. - Added TestMORDataSource to verify this feature. - Clean up test file name, add tests for mixed query type tests - We can now revert the change made in DefaultSource Co-authored-by: Vinoth Chandar --- .../hudi/testutils/HoodieClientTestBase.java | 2 +- .../hudi/common/table/log/LogReaderUtils.java | 8 +- .../testutils/HoodieTestDataGenerator.java | 15 + .../AbstractRealtimeRecordReader.java | 9 - .../RealtimeCompactedRecordReader.java | 2 +- .../RealtimeUnmergedRecordReader.java | 11 +- .../utils/HoodieRealtimeInputFormatUtils.java | 18 +- .../HoodieRealtimeRecordReaderUtils.java | 13 + .../org/apache/hudi/AvroConversionUtils.scala | 14 +- .../org/apache/hudi/DataSourceOptions.scala | 8 + .../scala/org/apache/hudi/DefaultSource.scala | 56 ++- ...lation.scala => HoodieEmptyRelation.scala} | 4 +- .../apache/hudi/HoodieMergeOnReadRDD.scala | 274 ++++++++++++ ...parkUtils.scala => HoodieSparkUtils.scala} | 4 +- .../org/apache/hudi/IncrementalRelation.scala | 7 +- .../hudi/MergeOnReadSnapshotRelation.scala | 151 +++++++ .../org/apache/hudi/client/TestBootstrap.java | 10 +- ...Utils.scala => TestHoodieSparkUtils.scala} | 15 +- .../hudi/functional/TestCOWDataSource.scala | 197 +++++++++ .../hudi/functional/TestDataSource.scala | 337 --------------- .../hudi/functional/TestMORDataSource.scala | 391 ++++++++++++++++++ .../functional/TestStructuredStreaming.scala | 180 ++++++++ 22 files changed, 1317 insertions(+), 409 deletions(-) rename hudi-spark/src/main/scala/org/apache/hudi/{HudiEmptyRelation.scala => HoodieEmptyRelation.scala} (90%) create mode 100644 hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala rename hudi-spark/src/main/scala/org/apache/hudi/{HudiSparkUtils.scala => HoodieSparkUtils.scala} (96%) create mode 100644 hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala rename hudi-spark/src/test/scala/org/apache/hudi/{TestHudiSparkUtils.scala => TestHoodieSparkUtils.scala} (89%) create mode 100644 hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala delete mode 100644 hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala create mode 100644 hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala create mode 100644 hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java index 667181c0b..203cc54f5 100644 --- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java @@ -71,7 +71,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; */ public class HoodieClientTestBase extends HoodieClientTestHarness { - private static final Logger LOG = LogManager.getLogger(HoodieClientTestBase.class); + protected static final Logger LOG = LogManager.getLogger(HoodieClientTestBase.class); @BeforeEach public void setUp() throws Exception { diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java index 0662e68e1..ffc4b8582 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java @@ -29,9 +29,9 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.avro.Schema; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobConf; import java.io.IOException; import java.util.List; @@ -62,15 +62,15 @@ public class LogReaderUtils { return writerSchema; } - public static Schema readLatestSchemaFromLogFiles(String basePath, List deltaFilePaths, JobConf jobConf) + public static Schema readLatestSchemaFromLogFiles(String basePath, List deltaFilePaths, Configuration config) throws IOException { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jobConf, basePath); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(config, basePath); List deltaPaths = deltaFilePaths.stream().map(s -> new HoodieLogFile(new Path(s))) .sorted(HoodieLogFile.getReverseLogFileComparator()).map(s -> s.getPath().toString()) .collect(Collectors.toList()); if (deltaPaths.size() > 0) { for (String logPath : deltaPaths) { - FileSystem fs = FSUtils.getFs(logPath, jobConf); + FileSystem fs = FSUtils.getFs(logPath, config); Schema schemaFromLogFile = readSchemaFromLogFileInReverse(fs, metaClient.getActiveTimeline(), new Path(logPath)); if (schemaFromLogFile != null) { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java index 6b6074b13..90b15d023 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java @@ -634,6 +634,10 @@ public class HoodieTestDataGenerator { return generateUniqueUpdatesStream(instantTime, n, TRIP_EXAMPLE_SCHEMA).collect(Collectors.toList()); } + public List generateUniqueUpdatesAsPerSchema(String instantTime, Integer n, String schemaStr) { + return generateUniqueUpdatesStream(instantTime, n, schemaStr).collect(Collectors.toList()); + } + /** * Generates deduped delete of keys previously inserted, randomly distributed across the keys above. * @@ -745,6 +749,17 @@ public class HoodieTestDataGenerator { return result.stream(); } + /** + * Generates deduped delete records previously inserted, randomly distributed across the keys above. 
+ * + * @param instantTime Commit Timestamp + * @param n Number of unique records + * @return List of hoodie records for delete + */ + public List generateUniqueDeleteRecords(String instantTime, Integer n) { + return generateUniqueDeleteRecordStream(instantTime, n).collect(Collectors.toList()); + } + public boolean deleteExistingKeyIfPresent(HoodieKey key) { Map existingKeys = existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA); Integer numExistingKeys = numKeysBySchema.get(TRIP_EXAMPLE_SCHEMA); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java index 65c416c8b..050b91add 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java @@ -23,7 +23,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.LogReaderUtils; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.InputSplitUtils; -import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; import org.apache.avro.Schema; @@ -148,12 +147,4 @@ public abstract class AbstractRealtimeRecordReader { public Schema getHiveSchema() { return hiveSchema; } - - public long getMaxCompactionMemoryInBytes() { - // jobConf.getMemoryForMapTask() returns in MB - return (long) Math - .ceil(Double.parseDouble(jobConf.get(HoodieRealtimeConfig.COMPACTION_MEMORY_FRACTION_PROP, - HoodieRealtimeConfig.DEFAULT_COMPACTION_MEMORY_FRACTION)) - * jobConf.getMemoryForMapTask() * 1024 * 1024L); - } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java index 78925c358..042199fd5 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java @@ -69,7 +69,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader split.getDeltaLogPaths(), usesCustomPayload ? getWriterSchema() : getReaderSchema(), split.getMaxCommitTime(), - getMaxCompactionMemoryInBytes(), + HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf), Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)), false, jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java index c06bff26e..76de84bd9 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java @@ -63,7 +63,7 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader * clients to consume. 
* * @param split File split - * @param job Job Configuration + * @param jobConf Job Configuration * @param realReader Parquet Reader */ public RealtimeUnmergedRecordReader(RealtimeSplit split, JobConf job, @@ -72,14 +72,15 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader this.parquetReader = new SafeParquetRecordReaderWrapper(realReader); // Iterator for consuming records from parquet file this.parquetRecordsIterator = new RecordReaderValueIterator<>(this.parquetReader); - this.executor = new BoundedInMemoryExecutor<>(getMaxCompactionMemoryInBytes(), getParallelProducers(), + this.executor = new BoundedInMemoryExecutor<>( + HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf), getParallelProducers(), Option.empty(), x -> x, new DefaultSizeEstimator<>()); // Consumer of this record reader this.iterator = this.executor.getQueue().iterator(); - this.logRecordScanner = new HoodieUnMergedLogRecordScanner(FSUtils.getFs(split.getPath().toString(), jobConf), + this.logRecordScanner = new HoodieUnMergedLogRecordScanner(FSUtils.getFs(split.getPath().toString(), this.jobConf), split.getBasePath(), split.getDeltaLogPaths(), getReaderSchema(), split.getMaxCommitTime(), - Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)), - false, jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), record -> { + Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)), + false, this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), record -> { // convert Hoodie log record to Hadoop AvroWritable and buffer GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema()).get(); ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema()); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java index cc46d96c2..346d7a011 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java @@ -20,6 +20,7 @@ package org.apache.hudi.hadoop.utils; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -34,7 +35,6 @@ import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit; import org.apache.hudi.hadoop.realtime.RealtimeBootstrapBaseFileSplit; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; @@ -119,15 +119,15 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils { } // Return parquet file with a list of log files in the same file group. 
- public static Map> groupLogsByBaseFile(Configuration conf, Stream fileStatuses) { - Map> partitionsToParquetSplits = - fileStatuses.collect(Collectors.groupingBy(file -> file.getPath().getParent())); + public static Map> groupLogsByBaseFile(Configuration conf, List fileStatuses) { + Map> partitionsToParquetSplits = + fileStatuses.stream().collect(Collectors.groupingBy(file -> file.getFileStatus().getPath().getParent())); // TODO(vc): Should we handle also non-hoodie splits here? Map partitionsToMetaClient = getTableMetaClientByBasePath(conf, partitionsToParquetSplits.keySet()); // for all unique split parents, obtain all delta files based on delta commit timeline, // grouped on file id - Map> resultMap = new HashMap<>(); + Map> resultMap = new HashMap<>(); partitionsToParquetSplits.keySet().forEach(partitionPath -> { // for each partition path obtain the data & log file groupings, then map back to inputsplits HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath); @@ -144,15 +144,15 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils { .orElse(Stream.empty()); // subgroup splits again by file id & match with log files. - Map> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream() - .collect(Collectors.groupingBy(file -> FSUtils.getFileId(file.getPath().getName()))); + Map> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream() + .collect(Collectors.groupingBy(file -> FSUtils.getFileId(file.getFileStatus().getPath().getName()))); latestFileSlices.forEach(fileSlice -> { - List dataFileSplits = groupedInputSplits.get(fileSlice.getFileId()); + List dataFileSplits = groupedInputSplits.get(fileSlice.getFileId()); dataFileSplits.forEach(split -> { try { List logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()); - resultMap.put(split.getPath().toString(), logFilePaths); + resultMap.put(split, logFilePaths); } catch (Exception e) { throw new HoodieException("Error creating hoodie real time split ", e); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java index e925c1899..871f7224d 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java @@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.avro.LogicalTypes; @@ -43,6 +44,7 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; import java.io.IOException; import java.nio.ByteBuffer; @@ -69,6 +71,17 @@ public class HoodieRealtimeRecordReaderUtils { } } + /** + * get the max compaction memory in bytes from JobConf. 
+ */ + public static long getMaxCompactionMemoryInBytes(JobConf jobConf) { + // jobConf.getMemoryForMapTask() returns in MB + return (long) Math + .ceil(Double.parseDouble(jobConf.get(HoodieRealtimeConfig.COMPACTION_MEMORY_FRACTION_PROP, + HoodieRealtimeConfig.DEFAULT_COMPACTION_MEMORY_FRACTION)) + * jobConf.getMemoryForMapTask() * 1024 * 1024L); + } + /** * Prints a JSON representation of the ArrayWritable for easier debuggability. */ diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index bdb89559a..e6d6c55fe 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -17,7 +17,7 @@ package org.apache.hudi -import org.apache.avro.generic.GenericRecord +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} import org.apache.hudi.common.model.HoodieKey import org.apache.avro.Schema import org.apache.spark.rdd.RDD @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} +import scala.collection.JavaConverters._ object AvroConversionUtils { @@ -78,4 +79,15 @@ object AvroConversionUtils { def convertAvroSchemaToStructType(avroSchema: Schema): StructType = { SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType] } + + def buildAvroRecordBySchema(record: IndexedRecord, + requiredSchema: Schema, + requiredPos: List[Int], + recordBuilder: GenericRecordBuilder): GenericRecord = { + val requiredFields = requiredSchema.getFields.asScala + assert(requiredFields.length == requiredPos.length) + val positionIterator = requiredPos.iterator + requiredFields.foreach(f => recordBuilder.set(f, record.get(positionIterator.next()))) + recordBuilder.build() + } } diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 4d94463dc..917bfed1d 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -51,6 +51,14 @@ object DataSourceReadOptions { val QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental" val DEFAULT_QUERY_TYPE_OPT_VAL: String = QUERY_TYPE_SNAPSHOT_OPT_VAL + /** + * For Snapshot query on merge on read table. Use this key to define the payload class. 
+ */ + val REALTIME_MERGE_OPT_KEY = "hoodie.datasource.merge.type" + val REALTIME_SKIP_MERGE_OPT_VAL = "skip_merge" + val REALTIME_PAYLOAD_COMBINE_OPT_VAL = "payload_combine" + val DEFAULT_REALTIME_MERGE_OPT_VAL = REALTIME_PAYLOAD_COMBINE_OPT_VAL + @Deprecated val VIEW_TYPE_OPT_KEY = "hoodie.datasource.view.type" @Deprecated diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala index e26c1c86f..10be30591 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -18,8 +18,9 @@ package org.apache.hudi import org.apache.hudi.DataSourceReadOptions._ +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.HoodieTableType import org.apache.hudi.common.table.HoodieTableMetaClient -import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.HoodieROTablePathFilter import org.apache.log4j.LogManager @@ -60,26 +61,20 @@ class DefaultSource extends RelationProvider throw new HoodieException("'path' must be specified.") } + val fs = FSUtils.getFs(path.get, sqlContext.sparkContext.hadoopConfiguration) + val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(Seq(path.get), fs) + val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray) if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) { - // this is just effectively RO view only, where `path` can contain a mix of - // non-hoodie/hoodie path files. set the path filter up - sqlContext.sparkContext.hadoopConfiguration.setClass( - "mapreduce.input.pathFilter.class", - classOf[HoodieROTablePathFilter], - classOf[org.apache.hadoop.fs.PathFilter]) - - log.info("Constructing hoodie (as parquet) data source with options :" + parameters) - log.warn("Snapshot view not supported yet via data source, for MERGE_ON_READ tables. 
" + - "Please query the Hive table registered using Spark SQL.") - // simply return as a regular parquet relation - DataSource.apply( - sparkSession = sqlContext.sparkSession, - userSpecifiedSchema = Option(schema), - className = "parquet", - options = parameters) - .resolveRelation() + val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath) + if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) { + new MergeOnReadSnapshotRelation(sqlContext, optParams, schema, globPaths, metaClient) + } else { + getBaseFileOnlyView(sqlContext, parameters, schema) + } + } else if(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) { + getBaseFileOnlyView(sqlContext, parameters, schema) } else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) { - new IncrementalRelation(sqlContext, path.get, optParams, schema) + new IncrementalRelation(sqlContext, tablePath, optParams, schema) } else { throw new HoodieException("Invalid query type :" + parameters(QUERY_TYPE_OPT_KEY)) } @@ -107,7 +102,7 @@ class DefaultSource extends RelationProvider df: DataFrame): BaseRelation = { val parameters = HoodieSparkSqlWriter.parametersWithWriteDefaults(optParams) HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df) - new HudiEmptyRelation(sqlContext, df.schema) + new HoodieEmptyRelation(sqlContext, df.schema) } override def createSink(sqlContext: SQLContext, @@ -123,4 +118,25 @@ class DefaultSource extends RelationProvider } override def shortName(): String = "hudi" + + private def getBaseFileOnlyView(sqlContext: SQLContext, + optParams: Map[String, String], + schema: StructType): BaseRelation = { + log.warn("Loading Base File Only View.") + // this is just effectively RO view only, where `path` can contain a mix of + // non-hoodie/hoodie path files. 
set the path filter up + sqlContext.sparkContext.hadoopConfiguration.setClass( + "mapreduce.input.pathFilter.class", + classOf[HoodieROTablePathFilter], + classOf[org.apache.hadoop.fs.PathFilter]) + + log.info("Constructing hoodie (as parquet) data source with options :" + optParams) + // simply return as a regular parquet relation + DataSource.apply( + sparkSession = sqlContext.sparkSession, + userSpecifiedSchema = Option(schema), + className = "parquet", + options = optParams) + .resolveRelation() + } } diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HudiEmptyRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieEmptyRelation.scala similarity index 90% rename from hudi-spark/src/main/scala/org/apache/hudi/HudiEmptyRelation.scala rename to hudi-spark/src/main/scala/org/apache/hudi/HoodieEmptyRelation.scala index 8ddbe4650..46429934f 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HudiEmptyRelation.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieEmptyRelation.scala @@ -28,8 +28,8 @@ import org.apache.spark.sql.types.StructType * @param sqlContext Spark SQL Context * @param userSchema Users data schema */ -class HudiEmptyRelation(val sqlContext: SQLContext, - val userSchema: StructType) extends BaseRelation { +class HoodieEmptyRelation(val sqlContext: SQLContext, + val userSchema: StructType) extends BaseRelation { override def schema: StructType = userSchema } diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala new file mode 100644 index 000000000..f272084b5 --- /dev/null +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hadoop.config.HoodieRealtimeConfig +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS + +import org.apache.avro.Schema +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} +import org.apache.hadoop.conf.Configuration +import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.avro.{AvroDeserializer, AvroSerializer} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.vectorized.ColumnarBatch + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.Try + +case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSplit) extends Partition + +class HoodieMergeOnReadRDD(@transient sc: SparkContext, + @transient config: Configuration, + fullSchemaFileReader: PartitionedFile => Iterator[Any], + requiredSchemaFileReader: PartitionedFile => Iterator[Any], + tableState: HoodieMergeOnReadTableState) + extends RDD[InternalRow](sc, Nil) { + + private val confBroadcast = sc.broadcast(new SerializableWritable(config)) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + val mergeParquetPartition = split.asInstanceOf[HoodieMergeOnReadPartition] + mergeParquetPartition.split match { + case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty => + read(mergeParquetPartition.split.dataFile, requiredSchemaFileReader) + case skipMergeSplit if skipMergeSplit.mergeType + .equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) => + skipMergeFileIterator( + skipMergeSplit, + read(mergeParquetPartition.split.dataFile, requiredSchemaFileReader), + getConfig + ) + case payloadCombineSplit if payloadCombineSplit.mergeType + .equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) => + payloadCombineFileIterator( + payloadCombineSplit, + read(mergeParquetPartition.split.dataFile, fullSchemaFileReader), + getConfig + ) + case _ => throw new HoodieException(s"Unable to select an Iterator to read the Hoodie MOR File Split for " + + s"file path: ${mergeParquetPartition.split.dataFile.filePath}" + + s"log paths: ${mergeParquetPartition.split.logPaths.toString}" + + s"hoodie table path: ${mergeParquetPartition.split.tablePath}" + + s"spark partition Index: ${mergeParquetPartition.index}" + + s"merge type: ${mergeParquetPartition.split.mergeType}") + } + } + + override protected def getPartitions: Array[Partition] = { + tableState + .hoodieRealtimeFileSplits + .zipWithIndex + .map(file => HoodieMergeOnReadPartition(file._2, file._1)).toArray + } + + private def getConfig: Configuration = { + val conf = confBroadcast.value.value + HoodieMergeOnReadRDD.CONFIG_INSTANTIATION_LOCK.synchronized { + new Configuration(conf) + } + } + + private def read(partitionedFile: PartitionedFile, + readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = { + val fileIterator = readFileFunction(partitionedFile) + val rows = fileIterator.flatMap(_ match { + case r: InternalRow => Seq(r) + case b: ColumnarBatch => b.rowIterator().asScala + }) + rows + } + + private def 
skipMergeFileIterator(split: HoodieMergeOnReadFileSplit, + baseFileIterator: Iterator[InternalRow], + config: Configuration): Iterator[InternalRow] = + new Iterator[InternalRow] { + private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema) + private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema) + private val requiredFieldPosition = + tableState.requiredStructSchema + .map(f => tableAvroSchema.getField(f.name).pos()).toList + private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema) + private val deserializer = new AvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema) + private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema) + private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords + private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala + + private var recordToLoad: InternalRow = _ + + @scala.annotation.tailrec + override def hasNext: Boolean = { + if (baseFileIterator.hasNext) { + recordToLoad = baseFileIterator.next() + true + } else { + if (logRecordsKeyIterator.hasNext) { + val curAvrokey = logRecordsKeyIterator.next() + val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema) + if (!curAvroRecord.isPresent) { + // delete record found, skipping + this.hasNext + } else { + val requiredAvroRecord = AvroConversionUtils + .buildAvroRecordBySchema(curAvroRecord.get(), requiredAvroSchema, requiredFieldPosition, recordBuilder) + recordToLoad = unsafeProjection(deserializer.deserialize(requiredAvroRecord).asInstanceOf[InternalRow]) + true + } + } else { + false + } + } + } + + override def next(): InternalRow = { + recordToLoad + } + } + + private def payloadCombineFileIterator(split: HoodieMergeOnReadFileSplit, + baseFileIterator: Iterator[InternalRow], + config: Configuration): Iterator[InternalRow] = + new Iterator[InternalRow] { + private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema) + private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema) + private val requiredFieldPosition = + tableState.requiredStructSchema + .map(f => tableAvroSchema.getField(f.name).pos()).toList + private val serializer = new AvroSerializer(tableState.tableStructSchema, tableAvroSchema, false) + private val requiredDeserializer = new AvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema) + private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema) + private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema) + private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords + private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala + private val keyToSkip = mutable.Set.empty[String] + + private var recordToLoad: InternalRow = _ + + @scala.annotation.tailrec + override def hasNext: Boolean = { + if (baseFileIterator.hasNext) { + val curRow = baseFileIterator.next() + val curKey = curRow.getString(HOODIE_RECORD_KEY_COL_POS) + if (logRecords.containsKey(curKey)) { + // duplicate key found, merging + keyToSkip.add(curKey) + val mergedAvroRecord = mergeRowWithLog(curRow, curKey) + if (!mergedAvroRecord.isPresent) { + // deleted + this.hasNext + } else { + // load merged record as InternalRow with required schema + val requiredAvroRecord = AvroConversionUtils + .buildAvroRecordBySchema( + mergedAvroRecord.get(), + requiredAvroSchema, + 
requiredFieldPosition, + recordBuilder + ) + recordToLoad = unsafeProjection(requiredDeserializer + .deserialize(requiredAvroRecord).asInstanceOf[InternalRow]) + true + } + } else { + // No merge needed, load current row with required schema + recordToLoad = unsafeProjection(createRowWithRequiredSchema(curRow)) + true + } + } else { + if (logRecordsKeyIterator.hasNext) { + val curKey = logRecordsKeyIterator.next() + if (keyToSkip.contains(curKey)) { + this.hasNext + } else { + val insertAvroRecord = + logRecords.get(curKey).getData.getInsertValue(tableAvroSchema) + if (!insertAvroRecord.isPresent) { + // stand alone delete record, skipping + this.hasNext + } else { + val requiredAvroRecord = AvroConversionUtils + .buildAvroRecordBySchema( + insertAvroRecord.get(), + requiredAvroSchema, + requiredFieldPosition, + recordBuilder + ) + recordToLoad = unsafeProjection(requiredDeserializer + .deserialize(requiredAvroRecord).asInstanceOf[InternalRow]) + true + } + } + } else { + false + } + } + } + + override def next(): InternalRow = recordToLoad + + private def createRowWithRequiredSchema(row: InternalRow): InternalRow = { + val rowToReturn = new SpecificInternalRow(tableState.requiredStructSchema) + val posIterator = requiredFieldPosition.iterator + var curIndex = 0 + tableState.requiredStructSchema.foreach( + f => { + val curPos = posIterator.next() + val curField = row.get(curPos, f.dataType) + rowToReturn.update(curIndex, curField) + curIndex = curIndex + 1 + } + ) + rowToReturn + } + + private def mergeRowWithLog(curRow: InternalRow, curKey: String) = { + val historyAvroRecord = serializer.serialize(curRow).asInstanceOf[GenericRecord] + logRecords.get(curKey).getData.combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema) + } + } +} + +private object HoodieMergeOnReadRDD { + val CONFIG_INSTANTIATION_LOCK = new Object() + + def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: Configuration): HoodieMergedLogRecordScanner = { + val fs = FSUtils.getFs(split.tablePath, config) + new HoodieMergedLogRecordScanner( + fs, + split.tablePath, + split.logPaths.get.asJava, + logSchema, + split.latestCommit, + split.maxCompactionMemoryInBytes, + Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, + HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean).getOrElse(false), + false, + config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, + HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), + config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, + HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)) + } +} diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HudiSparkUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala similarity index 96% rename from hudi-spark/src/main/scala/org/apache/hudi/HudiSparkUtils.scala rename to hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index 861de1458..26babd834 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HudiSparkUtils.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -27,9 +27,9 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType} import scala.collection.JavaConverters._ -object HudiSparkUtils { +object HoodieSparkUtils { - def getHudiMetadataSchema: StructType = { + def getMetaSchema: StructType = { StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => { StructField(col, StringType, nullable = true) })) diff --git 
a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala index 436895bda..338a54eda 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -17,14 +17,14 @@ package org.apache.hudi -import org.apache.hadoop.fs.GlobPattern -import org.apache.hadoop.fs.Path import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieTableType} import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieException import org.apache.hudi.table.HoodieTable + +import org.apache.hadoop.fs.GlobPattern import org.apache.log4j.LogManager import org.apache.spark.rdd.RDD import org.apache.spark.sql.sources.{BaseRelation, TableScan} @@ -47,7 +47,8 @@ class IncrementalRelation(val sqlContext: SQLContext, private val log = LogManager.getLogger(classOf[IncrementalRelation]) - private val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true) + private val metaClient = + new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true) // MOR tables not supported yet if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) { throw new HoodieException("Incremental view not implemented yet, for merge-on-read tables") diff --git a/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala new file mode 100644 index 000000000..c1a6acdb0 --- /dev/null +++ b/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hudi.common.model.HoodieBaseFile +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes + +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.mapred.JobConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan} +import org.apache.spark.sql.types.StructType + +import scala.collection.JavaConverters._ + +case class HoodieMergeOnReadFileSplit(dataFile: PartitionedFile, + logPaths: Option[List[String]], + latestCommit: String, + tablePath: String, + maxCompactionMemoryInBytes: Long, + mergeType: String) + +case class HoodieMergeOnReadTableState(tableStructSchema: StructType, + requiredStructSchema: StructType, + tableAvroSchema: String, + requiredAvroSchema: String, + hoodieRealtimeFileSplits: List[HoodieMergeOnReadFileSplit]) + +class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, + val optParams: Map[String, String], + val userSchema: StructType, + val globPaths: Seq[Path], + val metaClient: HoodieTableMetaClient) + extends BaseRelation with PrunedFilteredScan with Logging { + + private val conf = sqlContext.sparkContext.hadoopConfiguration + private val jobConf = new JobConf(conf) + // use schema from latest metadata, if not present, read schema from the data file + private val schemaUtil = new TableSchemaResolver(metaClient) + private val tableAvroSchema = schemaUtil.getTableAvroSchema + private val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema) + private val mergeType = optParams.getOrElse( + DataSourceReadOptions.REALTIME_MERGE_OPT_KEY, + DataSourceReadOptions.DEFAULT_REALTIME_MERGE_OPT_VAL) + private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf) + private val fileIndex = buildFileIndex() + + override def schema: StructType = tableStructSchema + + override def needConversion: Boolean = false + + override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { + log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}") + log.debug(s" buildScan filters = ${filters.mkString(",")}") + var requiredStructSchema = StructType(Seq()) + requiredColumns.foreach(col => { + val field = tableStructSchema.find(_.name == col) + if (field.isDefined) { + requiredStructSchema = requiredStructSchema.add(field.get) + } + }) + val requiredAvroSchema = AvroConversionUtils + .convertStructTypeToAvroSchema(requiredStructSchema, tableAvroSchema.getName, tableAvroSchema.getNamespace) + val hoodieTableState = HoodieMergeOnReadTableState( + tableStructSchema, + requiredStructSchema, + tableAvroSchema.toString, + requiredAvroSchema.toString, + fileIndex + ) + val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues( + sparkSession = sqlContext.sparkSession, + dataSchema = tableStructSchema, + partitionSchema 
= StructType(Nil), + requiredSchema = tableStructSchema, + filters = Seq(), + options = optParams, + hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf() + ) + val requiredSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues( + sparkSession = sqlContext.sparkSession, + dataSchema = tableStructSchema, + partitionSchema = StructType(Nil), + requiredSchema = requiredStructSchema, + filters = filters, + options = optParams, + hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf() + ) + + // Follow the implementation of Spark internal HadoopRDD to handle the broadcast configuration. + FileSystem.getLocal(jobConf) + SparkHadoopUtil.get.addCredentials(jobConf) + val rdd = new HoodieMergeOnReadRDD( + sqlContext.sparkContext, + jobConf, + fullSchemaParquetReader, + requiredSchemaParquetReader, + hoodieTableState + ) + rdd.asInstanceOf[RDD[Row]] + } + + def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = { + val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths) + val fileStatuses = inMemoryFileIndex.allFiles() + if (fileStatuses.isEmpty) { + throw new HoodieException("No files found for reading in user provided path.") + } + + val fsView = new HoodieTableFileSystemView(metaClient, + metaClient.getActiveTimeline.getCommitsTimeline + .filterCompletedInstants, fileStatuses.toArray) + val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList + val latestCommit = fsView.getLastInstant.get().getTimestamp + val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, latestFiles.asJava).asScala + val fileSplits = fileGroup.map(kv => { + val baseFile = kv._1 + val logPaths = if (kv._2.isEmpty) Option.empty else Option(kv._2.asScala.toList) + val partitionedFile = PartitionedFile(InternalRow.empty, baseFile.getPath, 0, baseFile.getFileLen) + HoodieMergeOnReadFileSplit(partitionedFile, logPaths, latestCommit, + metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType) + }).toList + fileSplits + } +} diff --git a/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java b/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java index 0aa8ca45b..4e1984c98 100644 --- a/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java +++ b/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java @@ -130,14 +130,7 @@ public class TestBootstrap extends HoodieClientTestBase { public void setUp() throws Exception { bootstrapBasePath = tmpFolder.toAbsolutePath().toString() + "/data"; initPath(); - spark = SparkSession.builder() - .appName("Bootstrap test") - .master("local[2]") - .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .getOrCreate(); - jsc = new JavaSparkContext(spark.sparkContext()); - sqlContext = spark.sqlContext(); - hadoopConf = spark.sparkContext().hadoopConfiguration(); + initSparkContexts(); initTestDataGenerator(); initMetaClient(); // initialize parquet input format @@ -146,6 +139,7 @@ public class TestBootstrap extends HoodieClientTestBase { @AfterEach public void tearDown() throws IOException { + cleanupSparkContexts(); cleanupClients(); cleanupTestDataGenerator(); } diff --git a/hudi-spark/src/test/scala/org/apache/hudi/TestHudiSparkUtils.scala b/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala similarity index 89% rename from hudi-spark/src/test/scala/org/apache/hudi/TestHudiSparkUtils.scala rename to hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala 
index 6b1a178da..d5da67626 100644 --- a/hudi-spark/src/test/scala/org/apache/hudi/TestHudiSparkUtils.scala +++ b/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala @@ -28,7 +28,7 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.io.TempDir -class TestHudiSparkUtils { +class TestHoodieSparkUtils { @Test def testGlobPaths(@TempDir tempDir: File): Unit = { @@ -48,29 +48,29 @@ class TestHudiSparkUtils { files.foreach(file => new File(file.toUri).createNewFile()) var paths = Seq(tempDir.getAbsolutePath + "/*") - var globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths, + var globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths, new Path(paths.head).getFileSystem(new Configuration())) assertEquals(folders.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString)) paths = Seq(tempDir.getAbsolutePath + "/*/*") - globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths, + globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths, new Path(paths.head).getFileSystem(new Configuration())) assertEquals(files.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString)) paths = Seq(tempDir.getAbsolutePath + "/folder1/*") - globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths, + globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths, new Path(paths.head).getFileSystem(new Configuration())) assertEquals(Seq(files(0), files(1)).sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString)) paths = Seq(tempDir.getAbsolutePath + "/folder2/*") - globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths, + globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths, new Path(paths.head).getFileSystem(new Configuration())) assertEquals(Seq(files(2), files(3)).sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString)) paths = Seq(tempDir.getAbsolutePath + "/folder1/*", tempDir.getAbsolutePath + "/folder2/*") - globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths, + globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths, new Path(paths.head).getFileSystem(new Configuration())) assertEquals(files.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString)) } @@ -98,8 +98,9 @@ class TestHudiSparkUtils { folders.foreach(folder => new File(folder.toUri).mkdir()) files.foreach(file => new File(file.toUri).createNewFile()) - val index = HudiSparkUtils.createInMemoryFileIndex(spark, Seq(folders(0), folders(1))) + val index = HoodieSparkUtils.createInMemoryFileIndex(spark, Seq(folders(0), folders(1))) val indexedFilePaths = index.allFiles().map(fs => fs.getPath) assertEquals(files.sortWith(_.toString < _.toString), indexedFilePaths.sortWith(_.toString < _.toString)) + spark.stop() } } diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala new file mode 100644 index 000000000..c1e45c359 --- /dev/null +++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.testutils.HoodieClientTestBase +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} +import org.apache.log4j.LogManager +import org.apache.spark.sql._ +import org.apache.spark.sql.functions.col +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} + +import scala.collection.JavaConversions._ + +/** + * Basic tests on the spark datasource for COW table. + */ +class TestCOWDataSource extends HoodieClientTestBase { + private val log = LogManager.getLogger(getClass) + var spark: SparkSession = null + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key", + DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition", + DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp", + HoodieWriteConfig.TABLE_NAME -> "hoodie_test" + ) + + @BeforeEach override def setUp() { + initPath() + initSparkContexts() + spark = sqlContext.sparkSession + initTestDataGenerator() + initFileSystem() + } + + @AfterEach override def tearDown() = { + cleanupSparkContexts() + cleanupTestDataGenerator() + cleanupFileSystem() + } + + @Test def testShortNameStorage() { + // Insert Operation + val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList + val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2)) + inputDF.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .mode(SaveMode.Overwrite) + .save(basePath) + + assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) + } + + @Test def testCopyOnWriteStorage() { + // Insert Operation + val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDF1.write.format("org.apache.hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .mode(SaveMode.Overwrite) + .save(basePath) + + assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) + val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath) + + // Snapshot query + val snapshotDF1 = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*") + assertEquals(100, snapshotDF1.count()) + + val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList + val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count() + + // Upsert Operation + 
inputDF2.write.format("org.apache.hudi") + .options(commonOpts) + .mode(SaveMode.Append) + .save(basePath) + + val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath) + assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size()) + + // Snapshot Query + val snapshotDF2 = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*/*/*") + assertEquals(100, snapshotDF2.count()) // still 100, since we only updated + + // Read Incremental Query + // we have 2 commits, try pulling the first commit (which is not the latest) + val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").get(0) + val hoodieIncViewDF1 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000") + .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, firstCommit) + .load(basePath) + assertEquals(100, hoodieIncViewDF1.count()) // 100 initial inserts must be pulled + var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect() + assertEquals(1, countsPerCommit.length) + assertEquals(firstCommit, countsPerCommit(0).get(0)) + + // Upsert an empty dataFrame + val emptyRecords = recordsToStrings(dataGen.generateUpdates("002", 0)).toList + val emptyDF = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1)) + emptyDF.write.format("org.apache.hudi") + .options(commonOpts) + .mode(SaveMode.Append) + .save(basePath) + + // pull the latest commit + val hoodieIncViewDF2 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1) + .load(basePath) + + assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // 100 records must be pulled + countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect() + assertEquals(1, countsPerCommit.length) + assertEquals(commitInstantTime2, countsPerCommit(0).get(0)) + + // pull the latest commit within certain partitions + val hoodieIncViewDF3 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1) + .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2016/*/*/*") + .load(basePath) + assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2016")).count(), hoodieIncViewDF3.count()) + + val timeTravelDF = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000") + .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, firstCommit) + .load(basePath) + assertEquals(100, timeTravelDF.count()) // 100 initial inserts must be pulled + } + + @Test def testDropInsertDup(): Unit = { + val insert1Cnt = 10 + val insert2DupKeyCnt = 9 + val insert2NewKeyCnt = 2 + + val totalUniqueKeyToGenerate = insert1Cnt + insert2NewKeyCnt + val allRecords = dataGen.generateInserts("001", totalUniqueKeyToGenerate) + val inserts1 = allRecords.subList(0, insert1Cnt) + val inserts2New = dataGen.generateSameKeyInserts("002", allRecords.subList(insert1Cnt, insert1Cnt + insert2NewKeyCnt)) + val inserts2Dup = dataGen.generateSameKeyInserts("002", 
inserts1.subList(0, insert2DupKeyCnt)) + + val records1 = recordsToStrings(inserts1).toList + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDF1.write.format("org.apache.hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .mode(SaveMode.Overwrite) + .save(basePath) + val hoodieROViewDF1 = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*/*/*") + assertEquals(insert1Cnt, hoodieROViewDF1.count()) + + val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath) + val records2 = recordsToStrings(inserts2Dup ++ inserts2New).toList + val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + inputDF2.write.format("org.apache.hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY, "true") + .mode(SaveMode.Append) + .save(basePath) + val hoodieROViewDF2 = spark.read.format("org.apache.hudi") + .load(basePath + "/*/*/*/*") + assertEquals(hoodieROViewDF2.count(), totalUniqueKeyToGenerate) + + val hoodieIncViewDF2 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1) + .load(basePath) + assertEquals(hoodieIncViewDF2.count(), insert2NewKeyCnt) + } +} diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala deleted file mode 100644 index 48582c18e..000000000 --- a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala +++ /dev/null @@ -1,337 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.functional - - -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.table.HoodieTableMetaClient -import org.apache.hudi.common.testutils.HoodieTestDataGenerator -import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings -import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.exception.TableNotFoundException -import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} -import org.apache.log4j.LogManager -import org.apache.spark.sql._ -import org.apache.spark.sql.functions.col -import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime} -import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} -import org.junit.jupiter.api.io.TempDir -import org.junit.jupiter.api.{BeforeEach, Test} - -import scala.collection.JavaConversions._ -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, Future} - -/** - * Basic tests on the spark datasource - */ -class TestDataSource { - private val log = LogManager.getLogger(getClass) - - var spark: SparkSession = null - var dataGen: HoodieTestDataGenerator = null - val commonOpts = Map( - "hoodie.insert.shuffle.parallelism" -> "4", - "hoodie.upsert.shuffle.parallelism" -> "4", - DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key", - DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition", - DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp", - HoodieWriteConfig.TABLE_NAME -> "hoodie_test" - ) - var basePath: String = null - var fs: FileSystem = null - - @BeforeEach def initialize(@TempDir tempDir: java.nio.file.Path) { - spark = SparkSession.builder - .appName("Hoodie Datasource test") - .master("local[2]") - .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .getOrCreate - dataGen = new HoodieTestDataGenerator() - basePath = tempDir.toAbsolutePath.toString - fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration) - } - - @Test def testShortNameStorage() { - // Insert Operation - val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList - val inputDF: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records, 2)) - inputDF.write.format("hudi") - .options(commonOpts) - .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) - .mode(SaveMode.Overwrite) - .save(basePath) - - assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) - } - - @Test def testCopyOnWriteStorage() { - // Insert Operation - val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList - val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2)) - inputDF1.write.format("org.apache.hudi") - .options(commonOpts) - .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) - .mode(SaveMode.Overwrite) - .save(basePath) - - assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) - val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath) - - // Read RO View - val hoodieROViewDF1 = spark.read.format("org.apache.hudi") - .load(basePath + "/*/*/*/*"); - assertEquals(100, hoodieROViewDF1.count()) - - val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList - val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 
2)) - val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count() - - // Upsert Operation - inputDF2.write.format("org.apache.hudi") - .options(commonOpts) - .mode(SaveMode.Append) - .save(basePath) - - val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath) - assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size()) - - // Read RO View - val hoodieROViewDF2 = spark.read.format("org.apache.hudi") - .load(basePath + "/*/*/*/*"); - assertEquals(100, hoodieROViewDF2.count()) // still 100, since we only updated - - // Read Incremental View - // we have 2 commits, try pulling the first commit (which is not the latest) - val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").get(0); - val hoodieIncViewDF1 = spark.read.format("org.apache.hudi") - .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) - .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000") - .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, firstCommit) - .load(basePath); - assertEquals(100, hoodieIncViewDF1.count()) // 100 initial inserts must be pulled - var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect(); - assertEquals(1, countsPerCommit.length) - assertEquals(firstCommit, countsPerCommit(0).get(0)) - - // Upsert an empty dataFrame - val emptyRecords = recordsToStrings(dataGen.generateUpdates("002", 0)).toList - val emptyDF: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1)) - emptyDF.write.format("org.apache.hudi") - .options(commonOpts) - .mode(SaveMode.Append) - .save(basePath) - - // pull the latest commit - val hoodieIncViewDF2 = spark.read.format("org.apache.hudi") - .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) - .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1) - .load(basePath); - - assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // 100 records must be pulled - countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect(); - assertEquals(1, countsPerCommit.length) - assertEquals(commitInstantTime2, countsPerCommit(0).get(0)) - - // pull the latest commit within certain partitions - val hoodieIncViewDF3 = spark.read.format("org.apache.hudi") - .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) - .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1) - .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2016/*/*/*") - .load(basePath); - assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2016")).count(), hoodieIncViewDF3.count()) - } - - @Test def testMergeOnReadStorage() { - // Bulk Insert Operation - val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList - val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2)) - inputDF1.write.format("org.apache.hudi") - .options(commonOpts) - .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same - .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) - .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) - .mode(SaveMode.Overwrite) - .save(basePath) - - assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) - - // Read RO View - val 
hoodieROViewDF1 = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*") - assertEquals(100, hoodieROViewDF1.count()) // still 100, since we only updated - } - - @Test def testDropInsertDup(): Unit = { - val insert1Cnt = 10 - val insert2DupKeyCnt = 9 - val insert2NewKeyCnt = 2 - - val totalUniqueKeyToGenerate = insert1Cnt + insert2NewKeyCnt - val allRecords = dataGen.generateInserts("001", totalUniqueKeyToGenerate) - val inserts1 = allRecords.subList(0, insert1Cnt) - val inserts2New = dataGen.generateSameKeyInserts("002", allRecords.subList(insert1Cnt, insert1Cnt + insert2NewKeyCnt)) - val inserts2Dup = dataGen.generateSameKeyInserts("002", inserts1.subList(0, insert2DupKeyCnt)) - - val records1 = recordsToStrings(inserts1).toList - val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2)) - inputDF1.write.format("org.apache.hudi") - .options(commonOpts) - .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) - .mode(SaveMode.Overwrite) - .save(basePath) - val hoodieROViewDF1 = spark.read.format("org.apache.hudi") - .load(basePath + "/*/*/*/*") - assertEquals(insert1Cnt, hoodieROViewDF1.count()) - - val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath) - val records2 = recordsToStrings(inserts2Dup ++ inserts2New).toList - val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2)) - inputDF2.write.format("org.apache.hudi") - .options(commonOpts) - .option(DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY, "true") - .mode(SaveMode.Append) - .save(basePath) - val hoodieROViewDF2 = spark.read.format("org.apache.hudi") - .load(basePath + "/*/*/*/*") - assertEquals(hoodieROViewDF2.count(), totalUniqueKeyToGenerate) - - val hoodieIncViewDF2 = spark.read.format("org.apache.hudi") - .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) - .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1) - .load(basePath) - assertEquals(hoodieIncViewDF2.count(), insert2NewKeyCnt) - } - - @Test - def testStructuredStreaming(): Unit = { - fs.delete(new Path(basePath), true) - val sourcePath = basePath + "/source" - val destPath = basePath + "/dest" - fs.mkdirs(new Path(sourcePath)) - - // First chunk of data - val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList - val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2)) - - // Second chunk of data - val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList - val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2)) - val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count() - - // define the source of streaming - val streamingInput = - spark.readStream - .schema(inputDF1.schema) - .json(sourcePath) - - val f1 = Future { - println("streaming starting") - //'writeStream' can be called only on streaming Dataset/DataFrame - streamingInput - .writeStream - .format("org.apache.hudi") - .options(commonOpts) - .trigger(new ProcessingTime(100)) - .option("checkpointLocation", basePath + "/checkpoint") - .outputMode(OutputMode.Append) - .start(destPath) - .awaitTermination(10000) - println("streaming ends") - } - - val f2 = Future { - inputDF1.write.mode(SaveMode.Append).json(sourcePath) - // wait for spark streaming to process one microbatch - val currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5); - 
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000")) - val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, destPath) - // Read RO View - val hoodieROViewDF1 = spark.read.format("org.apache.hudi") - .load(destPath + "/*/*/*/*") - assert(hoodieROViewDF1.count() == 100) - - inputDF2.write.mode(SaveMode.Append).json(sourcePath) - // wait for spark streaming to process one microbatch - waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5); - val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, destPath) - assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size()) - // Read RO View - val hoodieROViewDF2 = spark.read.format("org.apache.hudi") - .load(destPath + "/*/*/*/*") - assertEquals(100, hoodieROViewDF2.count()) // still 100, since we only updated - - - // Read Incremental View - // we have 2 commits, try pulling the first commit (which is not the latest) - val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").get(0) - val hoodieIncViewDF1 = spark.read.format("org.apache.hudi") - .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) - .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000") - .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, firstCommit) - .load(destPath) - assertEquals(100, hoodieIncViewDF1.count()) - // 100 initial inserts must be pulled - var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect() - assertEquals(1, countsPerCommit.length) - assertEquals(firstCommit, countsPerCommit(0).get(0)) - - // pull the latest commit - val hoodieIncViewDF2 = spark.read.format("org.apache.hudi") - .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) - .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1) - .load(destPath) - - assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // 100 records must be pulled - countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect() - assertEquals(1, countsPerCommit.length) - assertEquals(commitInstantTime2, countsPerCommit(0).get(0)) - } - Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf) - } - - @throws[InterruptedException] - private def waitTillAtleastNCommits(fs: FileSystem, tablePath: String, - numCommits: Int, timeoutSecs: Int, sleepSecsAfterEachRun: Int): Int = { - val beginTime = System.currentTimeMillis - var currTime = beginTime - val timeoutMsecs = timeoutSecs * 1000 - var numInstants = 0 - var success: Boolean = false - while ({!success && (currTime - beginTime) < timeoutMsecs}) try { - val timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, tablePath) - log.info("Timeline :" + timeline.getInstants.toArray) - if (timeline.countInstants >= numCommits) { - numInstants = timeline.countInstants - success = true - } - val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath, true) - } catch { - case te: TableNotFoundException => - log.info("Got table not found exception. 
Retrying") - } finally { - Thread.sleep(sleepSecsAfterEachRun * 1000) - currTime = System.currentTimeMillis - } - if (!success) { - throw new IllegalStateException("Timed-out waiting for " + numCommits + " commits to appear in " + tablePath) - } - numInstants - } -} diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala new file mode 100644 index 000000000..5938ee54e --- /dev/null +++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.testutils.HoodieTestDataGenerator +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} +import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings +import org.apache.hudi.testutils.HoodieClientTestBase +import org.apache.log4j.LogManager +import org.apache.spark.sql._ +import org.apache.spark.sql.functions._ +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} + +import scala.collection.JavaConversions._ + +/** + * Tests on Spark DataSource for MOR table. 
+ */ +class TestMORDataSource extends HoodieClientTestBase { + + var spark: SparkSession = null + private val log = LogManager.getLogger(classOf[TestMORDataSource]) + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key", + DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition", + DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp", + HoodieWriteConfig.TABLE_NAME -> "hoodie_test" + ) + + @BeforeEach override def setUp() { + initPath() + initSparkContexts() + spark = sqlContext.sparkSession + initTestDataGenerator() + initFileSystem() + } + + @AfterEach override def tearDown() = { + cleanupSparkContexts() + cleanupTestDataGenerator() + cleanupFileSystem() + } + + @Test def testMergeOnReadStorage() { + + val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration) + // Bulk Insert Operation + val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList + val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDF1.write.format("org.apache.hudi") + .options(commonOpts) + .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) + .mode(SaveMode.Overwrite) + .save(basePath) + + assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) + + // Read RO View + val hudiRODF1 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) + .load(basePath + "/*/*/*/*") + assertEquals(100, hudiRODF1.count()) // still 100, since we only updated + val insertCommitTime = HoodieDataSourceHelpers.latestCommit(fs, basePath) + val insertCommitTimes = hudiRODF1.select("_hoodie_commit_time").distinct().collectAsList().map(r => r.getString(0)).toList + assertEquals(List(insertCommitTime), insertCommitTimes) + + // Upsert operation + val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList + val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + inputDF2.write.format("org.apache.hudi") + .options(commonOpts) + .mode(SaveMode.Append) + .save(basePath) + + // Read Snapshot query + val updateCommitTime = HoodieDataSourceHelpers.latestCommit(fs, basePath) + val hudiSnapshotDF2 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .load(basePath + "/*/*/*/*") + val updateCommitTimes = hudiSnapshotDF2.select("_hoodie_commit_time").distinct().collectAsList().map(r => r.getString(0)).toList + assertEquals(List(updateCommitTime), updateCommitTimes) + } + + @Test def testCount() { + // First Operation: + // Producing parquet files to three default partitions. + // SNAPSHOT view on MOR table with parquet files only. 
+ val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDF1.write.format("org.apache.hudi") + .options(commonOpts) + .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) + .mode(SaveMode.Overwrite) + .save(basePath) + assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) + val hudiSnapshotDF1 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .load(basePath + "/*/*/*/*") + assertEquals(100, hudiSnapshotDF1.count()) // still 100, since we only updated + + // Second Operation: + // Upsert the update to the default partitions with duplicate records. Produced a log file for each parquet. + // SNAPSHOT view should read the log files only with the latest commit time. + val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList + val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + inputDF2.write.format("org.apache.hudi") + .options(commonOpts) + .mode(SaveMode.Append) + .save(basePath) + val hudiSnapshotDF2 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .load(basePath + "/*/*/*/*") + assertEquals(100, hudiSnapshotDF2.count()) // still 100, since we only updated + val commit1Time = hudiSnapshotDF1.select("_hoodie_commit_time").head().get(0).toString + val commit2Time = hudiSnapshotDF2.select("_hoodie_commit_time").head().get(0).toString + assertEquals(hudiSnapshotDF2.select("_hoodie_commit_time").distinct().count(), 1) + assertTrue(commit2Time > commit1Time) + assertEquals(100, hudiSnapshotDF2.join(hudiSnapshotDF1, Seq("_hoodie_record_key"), "left").count()) + + // Unmerge + val hudiSnapshotSkipMergeDF2 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .option(DataSourceReadOptions.REALTIME_MERGE_OPT_KEY, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) + .load(basePath + "/*/*/*/*") + assertEquals(200, hudiSnapshotSkipMergeDF2.count()) + assertEquals(100, hudiSnapshotSkipMergeDF2.select("_hoodie_record_key").distinct().count()) + assertEquals(200, hudiSnapshotSkipMergeDF2.join(hudiSnapshotDF2, Seq("_hoodie_record_key"), "left").count()) + + // Test Read Optimized Query on MOR table + val hudiRODF2 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) + .load(basePath + "/*/*/*/*") + assertEquals(100, hudiRODF2.count()) + + // Third Operation: + // Upsert another update to the default partitions with 50 duplicate records. Produced the second log file for each parquet. + // SNAPSHOT view should read the latest log files. 
+ val records3 = recordsToStrings(dataGen.generateUniqueUpdates("003", 50)).toList + val inputDF3: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records3, 2)) + inputDF3.write.format("org.apache.hudi") + .options(commonOpts) + .mode(SaveMode.Append) + .save(basePath) + val hudiSnapshotDF3 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .load(basePath + "/*/*/*/*") + // still 100, because we only updated the existing records + assertEquals(100, hudiSnapshotDF3.count()) + + // 50 from commit2, 50 from commit3 + assertEquals(hudiSnapshotDF3.select("_hoodie_commit_time").distinct().count(), 2) + assertEquals(50, hudiSnapshotDF3.filter(col("_hoodie_commit_time") > commit2Time).count()) + assertEquals(50, + hudiSnapshotDF3.join(hudiSnapshotDF2, Seq("_hoodie_record_key", "_hoodie_commit_time"), "inner").count()) + + // Fourth Operation: + // Insert records to a new partition. Produced a new parquet file. + // SNAPSHOT view should read the latest log files from the default partition and parquet from the new partition. + val partitionPaths = new Array[String](1) + partitionPaths.update(0, "2020/01/10") + val newDataGen = new HoodieTestDataGenerator(partitionPaths) + val records4 = recordsToStrings(newDataGen.generateInserts("004", 100)).toList + val inputDF4: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records4, 2)) + inputDF4.write.format("org.apache.hudi") + .options(commonOpts) + .mode(SaveMode.Append) + .save(basePath) + val hudiSnapshotDF4 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .load(basePath + "/*/*/*/*") + // 200, because we insert 100 records to a new partition + assertEquals(200, hudiSnapshotDF4.count()) + assertEquals(100, + hudiSnapshotDF1.join(hudiSnapshotDF4, Seq("_hoodie_record_key"), "inner").count()) + + // Fifth Operation: + // Upsert records to the new partition. Produced a newer version of parquet file. + // SNAPSHOT view should read the latest log files from the default partition + // and the latest parquet from the new partition. + val records5 = recordsToStrings(newDataGen.generateUpdates("005", 100)).toList + val inputDF5: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records5, 2)) + inputDF5.write.format("org.apache.hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .mode(SaveMode.Append) + .save(basePath) + val hudiSnapshotDF5 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .load(basePath + "/*/*/*/*") + assertEquals(200, hudiSnapshotDF5.count()) + } + + @Test + def testPayloadDelete() { + // First Operation: + // Producing parquet files to three default partitions. + // SNAPSHOT view on MOR table with parquet files only. 
+ val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDF1.write.format("org.apache.hudi") + .options(commonOpts) + .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) + .mode(SaveMode.Overwrite) + .save(basePath) + assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) + val hudiSnapshotDF1 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .load(basePath + "/*/*/*/*") + assertEquals(100, hudiSnapshotDF1.count()) // still 100, since we only updated + + // Second Operation: + // Upsert 50 delete records + // Snapshot view should only read 50 records + val records2 = recordsToStrings(dataGen.generateUniqueDeleteRecords("002", 50)).toList + val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + inputDF2.write.format("org.apache.hudi") + .options(commonOpts) + .mode(SaveMode.Append) + .save(basePath) + val hudiSnapshotDF2 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .load(basePath + "/*/*/*/*") + assertEquals(50, hudiSnapshotDF2.count()) // 50 records were deleted + assertEquals(hudiSnapshotDF2.select("_hoodie_commit_time").distinct().count(), 1) + val commit1Time = hudiSnapshotDF1.select("_hoodie_commit_time").head().get(0).toString + val commit2Time = hudiSnapshotDF2.select("_hoodie_commit_time").head().get(0).toString + assertTrue(commit1Time.equals(commit2Time)) + assertEquals(50, hudiSnapshotDF2.join(hudiSnapshotDF1, Seq("_hoodie_record_key"), "left").count()) + + // unmerge query, skip the delete records + val hudiSnapshotDF2Unmerge = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .option(DataSourceReadOptions.REALTIME_MERGE_OPT_KEY, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) + .load(basePath + "/*/*/*/*") + assertEquals(100, hudiSnapshotDF2Unmerge.count()) + + // Third Operation: + // Upsert 50 delete records to delete the rest + // Snapshot view should read 0 records + val records3 = recordsToStrings(dataGen.generateUniqueDeleteRecords("003", 50)).toList + val inputDF3: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records3, 2)) + inputDF3.write.format("org.apache.hudi") + .options(commonOpts) + .mode(SaveMode.Append) + .save(basePath) + val hudiSnapshotDF3 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .load(basePath + "/*/*/*/*") + assertEquals(0, hudiSnapshotDF3.count()) // 100 records were deleted, 0 records to load + } + + @Test + def testPrunedFiltered() { + // First Operation: + // Producing parquet files to three default partitions. + // SNAPSHOT view on MOR table with parquet files only. 
+ val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDF1.write.format("org.apache.hudi") + .options(commonOpts) + .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) + .mode(SaveMode.Overwrite) + .save(basePath) + val hudiSnapshotDF1 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .load(basePath + "/*/*/*/*") + assertEquals(100, hudiSnapshotDF1.count()) + // select nested columns with order different from the actual schema + assertEquals("amount,currency,tip_history,_hoodie_commit_seqno", + hudiSnapshotDF1 + .select("fare.amount", "fare.currency", "tip_history", "_hoodie_commit_seqno") + .orderBy(desc("_hoodie_commit_seqno")) + .columns.mkString(",")) + + // Second Operation: + // Upsert 50 update records + // Snapshot view should read 100 records + val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 50)) + .toList + val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + inputDF2.write.format("org.apache.hudi") + .options(commonOpts) + .mode(SaveMode.Append) + .save(basePath) + val hudiSnapshotDF2 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .load(basePath + "/*/*/*/*") + + val commit1Time = hudiSnapshotDF1.select("_hoodie_commit_time").head().get(0).toString + + // filter first commit and only read log records + assertEquals(50, hudiSnapshotDF2.select("_hoodie_commit_seqno", "fare.amount", "fare.currency", "tip_history") + .filter(col("_hoodie_commit_time") > commit1Time).count()) + + // select nested columns with order different from the actual schema + assertEquals("amount,currency,tip_history,_hoodie_commit_seqno", + hudiSnapshotDF2 + .select("fare.amount", "fare.currency", "tip_history", "_hoodie_commit_seqno") + .orderBy(desc("_hoodie_commit_seqno")) + .columns.mkString(",")) + + // Correctly loading types + val sampleRow = hudiSnapshotDF2 + .select("begin_lat", "current_date", "fare.currency", "tip_history", "nation") + .orderBy(desc("_hoodie_commit_time")) + .head() + assertEquals(sampleRow.getDouble(0), sampleRow.get(0)) + assertEquals(sampleRow.getLong(1), sampleRow.get(1)) + assertEquals(sampleRow.getString(2), sampleRow.get(2)) + assertEquals(sampleRow.getSeq(3), sampleRow.get(3)) + assertEquals(sampleRow.getStruct(4), sampleRow.get(4)) + + // make sure show() works + hudiSnapshotDF1.show(1) + hudiSnapshotDF2.show(1) + } + + @Test + def testVectorizedReader() { + spark.conf.set("spark.sql.parquet.enableVectorizedReader", true) + assertTrue(spark.conf.get("spark.sql.parquet.enableVectorizedReader").toBoolean) + // Vectorized Reader will only be triggered with AtomicType schema, + // i.e. everything that is not null, UDTs, arrays, structs, and maps. 
+ val schema = HoodieTestDataGenerator.SHORT_TRIP_SCHEMA + val records1 = recordsToStrings(dataGen.generateInsertsAsPerSchema("001", 100, schema)).toList + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDF1.write.format("org.apache.hudi") + .options(commonOpts) + .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) + .mode(SaveMode.Overwrite) + .save(basePath) + val hudiSnapshotDF1 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .load(basePath + "/*/*/*/*") + assertEquals(100, hudiSnapshotDF1.count()) + + val records2 = recordsToStrings(dataGen.generateUniqueUpdatesAsPerSchema("002", 50, schema)) + .toList + val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + inputDF2.write.format("org.apache.hudi") + .options(commonOpts) + .mode(SaveMode.Append) + .save(basePath) + val hudiSnapshotDF2 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .load(basePath + "/*/*/*/*") + assertEquals(100, hudiSnapshotDF2.count()) + + // loading correct type + val sampleRow = hudiSnapshotDF2 + .select("fare", "driver", "_hoodie_is_deleted") + .head() + assertEquals(sampleRow.getDouble(0), sampleRow.get(0)) + assertEquals(sampleRow.getString(1), sampleRow.get(1)) + assertEquals(sampleRow.getBoolean(2), sampleRow.get(2)) + + // test show() + hudiSnapshotDF1.show(1) + hudiSnapshotDF2.show(1) + } +} diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala new file mode 100644 index 000000000..011573b8b --- /dev/null +++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.functional + +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.exception.TableNotFoundException +import org.apache.hudi.testutils.HoodieClientTestBase +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} +import org.apache.log4j.LogManager +import org.apache.spark.sql._ +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime} +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} + +import scala.collection.JavaConversions._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} + +/** + * Basic tests on the spark datasource for structured streaming sink + */ +class TestStructuredStreaming extends HoodieClientTestBase { + private val log = LogManager.getLogger(getClass) + var spark: SparkSession = null + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key", + DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition", + DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp", + HoodieWriteConfig.TABLE_NAME -> "hoodie_test" + ) + + @BeforeEach override def setUp() { + initPath() + initSparkContexts() + spark = sqlContext.sparkSession + initTestDataGenerator() + initFileSystem() + } + + @AfterEach override def tearDown() = { + cleanupSparkContexts() + cleanupTestDataGenerator() + cleanupFileSystem() + } + + @Test + def testStructuredStreaming(): Unit = { + fs.delete(new Path(basePath), true) + val sourcePath = basePath + "/source" + val destPath = basePath + "/dest" + fs.mkdirs(new Path(sourcePath)) + + // First chunk of data + val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + + // Second chunk of data + val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList + val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count() + + // define the source of streaming + val streamingInput = + spark.readStream + .schema(inputDF1.schema) + .json(sourcePath) + + val f1 = Future { + println("streaming starting") + //'writeStream' can be called only on streaming Dataset/DataFrame + streamingInput + .writeStream + .format("org.apache.hudi") + .options(commonOpts) + .trigger(new ProcessingTime(100)) + .option("checkpointLocation", basePath + "/checkpoint") + .outputMode(OutputMode.Append) + .start(destPath) + .awaitTermination(10000) + println("streaming ends") + } + + val f2 = Future { + inputDF1.write.mode(SaveMode.Append).json(sourcePath) + // wait for spark streaming to process one microbatch + val currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5) + assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000")) + val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, destPath) + // Read RO View + val hoodieROViewDF1 = spark.read.format("org.apache.hudi") + .load(destPath + "/*/*/*/*") + assert(hoodieROViewDF1.count() == 100) + + inputDF2.write.mode(SaveMode.Append).json(sourcePath) + 
// wait for spark streaming to process one microbatch + waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5) + val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, destPath) + assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size()) + // Read RO View + val hoodieROViewDF2 = spark.read.format("org.apache.hudi") + .load(destPath + "/*/*/*/*") + assertEquals(100, hoodieROViewDF2.count()) // still 100, since we only updated + + + // Read Incremental View + // we have 2 commits, try pulling the first commit (which is not the latest) + val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").get(0) + val hoodieIncViewDF1 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000") + .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, firstCommit) + .load(destPath) + assertEquals(100, hoodieIncViewDF1.count()) + // 100 initial inserts must be pulled + var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect() + assertEquals(1, countsPerCommit.length) + assertEquals(firstCommit, countsPerCommit(0).get(0)) + + // pull the latest commit + val hoodieIncViewDF2 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1) + .load(destPath) + + assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // 100 records must be pulled + countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect() + assertEquals(1, countsPerCommit.length) + assertEquals(commitInstantTime2, countsPerCommit(0).get(0)) + } + Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf) + } + + @throws[InterruptedException] + private def waitTillAtleastNCommits(fs: FileSystem, tablePath: String, + numCommits: Int, timeoutSecs: Int, sleepSecsAfterEachRun: Int) = { + val beginTime = System.currentTimeMillis + var currTime = beginTime + val timeoutMsecs = timeoutSecs * 1000 + var numInstants = 0 + var success = false + while ({!success && (currTime - beginTime) < timeoutMsecs}) try { + val timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, tablePath) + log.info("Timeline :" + timeline.getInstants.toArray) + if (timeline.countInstants >= numCommits) { + numInstants = timeline.countInstants + success = true + } + val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath, true) + } catch { + case te: TableNotFoundException => + log.info("Got table not found exception. Retrying") + } finally { + Thread.sleep(sleepSecsAfterEachRun * 1000) + currTime = System.currentTimeMillis + } + if (!success) throw new IllegalStateException("Timed-out waiting for " + numCommits + " commits to appear in " + tablePath) + numInstants + } +}
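
For reference, a minimal sketch of the reader-side usage the new MOR tests above exercise, assuming a MOR table previously written through the Hudi datasource. The table location and SparkSession settings below are placeholders, not part of the patch; the option keys and values are the DataSourceReadOptions used in TestMORDataSource.

import org.apache.hudi.DataSourceReadOptions
import org.apache.spark.sql.SparkSession

object MorReadSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical location of a MOR table written with the Hudi datasource; substitute your own.
    val basePath = "file:///tmp/hoodie_test"
    val spark = SparkSession.builder
      .appName("mor-read-sketch")
      .master("local[2]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    // Snapshot query: merges the base parquet files with their log files.
    val snapshotDF = spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
      .load(basePath + "/*/*/*/*")

    // Read optimized query: reads only the base parquet files and ignores log files.
    val readOptimizedDF = spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
      .load(basePath + "/*/*/*/*")

    // Skip-merge snapshot query: returns base and log records without merging,
    // so a key updated in the log shows up once per version (see the 200-row assertion in testCount).
    val unmergedDF = spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
      .option(DataSourceReadOptions.REALTIME_MERGE_OPT_KEY, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL)
      .load(basePath + "/*/*/*/*")

    println(s"snapshot=${snapshotDF.count()} readOptimized=${readOptimizedDF.count()} unmerged=${unmergedDF.count()}")
    spark.stop()
  }
}

The "/*/*/*/*" glob mirrors the three-level partitioning produced by HoodieTestDataGenerator in these tests; a table with a different partition layout would need a matching glob.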