From e4a2d98f79f920ea60346fb78ef4b5af1f874afd Mon Sep 17 00:00:00 2001 From: Udit Mehrotra Date: Sun, 9 Aug 2020 14:06:13 -0700 Subject: [PATCH] [HUDI-426] Bootstrap datasource integration (#1702) --- hudi-cli/pom.xml | 4 + hudi-client/pom.xml | 5 + .../bootstrap/BootstrapSchemaProvider.java | 43 +- .../hudi/config/HoodieBootstrapConfig.java | 6 + .../BootstrapCommitActionExecutor.java | 1 + .../org/apache/hudi/avro/HoodieAvroUtils.java | 22 + .../common/model/HoodieCommitMetadata.java | 13 + .../SparkParquetBootstrapDataProvider.java | 3 +- .../org/apache/hudi/AvroConversionUtils.scala | 6 + .../org/apache/hudi/DataSourceOptions.scala | 3 + .../scala/org/apache/hudi/DefaultSource.scala | 93 ++- .../org/apache/hudi/HoodieBootstrapRDD.scala | 131 ++++ .../apache/hudi/HoodieBootstrapRelation.scala | 185 +++++ .../apache/hudi/HoodieSparkSqlWriter.scala | 144 ++-- .../org/apache/hudi/IncrementalRelation.scala | 94 ++- .../TestDataSourceForBootstrap.scala | 630 ++++++++++++++++++ .../functional/TestHoodieDeltaStreamer.java | 17 +- 17 files changed, 1287 insertions(+), 113 deletions(-) create mode 100644 hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala create mode 100644 hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala create mode 100644 hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml index 8e80c8499..52b90f445 100644 --- a/hudi-cli/pom.xml +++ b/hudi-cli/pom.xml @@ -217,6 +217,10 @@ org.apache.spark spark-sql_${scala.binary.version} + + org.apache.spark + spark-avro_${scala.binary.version} + org.springframework.shell diff --git a/hudi-client/pom.xml b/hudi-client/pom.xml index 5524cad80..0f0cd213e 100644 --- a/hudi-client/pom.xml +++ b/hudi-client/pom.xml @@ -101,6 +101,11 @@ org.apache.spark spark-sql_${scala.binary.version} + + org.apache.spark + spark-avro_${scala.binary.version} + provided + diff --git a/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapSchemaProvider.java b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapSchemaProvider.java index 61e29c2cf..2018ca9ba 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapSchemaProvider.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapSchemaProvider.java @@ -18,17 +18,25 @@ package org.apache.hudi.client.bootstrap; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieFileStatus; import org.apache.hudi.common.bootstrap.FileStatusUtils; import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.parquet.schema.MessageType; import org.apache.spark.api.java.JavaSparkContext; import org.apache.avro.Schema; import org.apache.hadoop.fs.Path; +import org.apache.spark.sql.avro.SchemaConverters; +import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.types.StructType; import java.util.List; +import java.util.Objects; /** * Bootstrap Schema Provider. Schema provided in config is used. 
If not available, use schema from Parquet @@ -50,7 +58,10 @@ public class BootstrapSchemaProvider { public final Schema getBootstrapSchema(JavaSparkContext jsc, List>> partitions) { if (writeConfig.getSchema() != null) { // Use schema specified by user if set - return Schema.parse(writeConfig.getSchema()); + Schema userSchema = Schema.parse(writeConfig.getSchema()); + if (!HoodieAvroUtils.getNullSchema().equals(userSchema)) { + return userSchema; + } } return getBootstrapSourceSchema(jsc, partitions); } @@ -64,14 +75,26 @@ public class BootstrapSchemaProvider { */ protected Schema getBootstrapSourceSchema(JavaSparkContext jsc, List>> partitions) { - return partitions.stream().flatMap(p -> p.getValue().stream()) - .map(fs -> { - try { - Path filePath = FileStatusUtils.toPath(fs.getPath()); - return ParquetUtils.readAvroSchema(jsc.hadoopConfiguration(), filePath); - } catch (Exception ex) { - return null; - } - }).filter(x -> x != null).findAny().get(); + MessageType parquetSchema = partitions.stream().flatMap(p -> p.getValue().stream()).map(fs -> { + try { + Path filePath = FileStatusUtils.toPath(fs.getPath()); + return ParquetUtils.readSchema(jsc.hadoopConfiguration(), filePath); + } catch (Exception ex) { + return null; + } + }).filter(Objects::nonNull).findAny() + .orElseThrow(() -> new HoodieException("Could not determine schema from the data files.")); + + + ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter( + Boolean.parseBoolean(SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString()), + Boolean.parseBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().defaultValueString())); + StructType sparkSchema = converter.convert(parquetSchema); + String tableName = HoodieAvroUtils.sanitizeName(writeConfig.getTableName()); + String structName = tableName + "_record"; + String recordNamespace = "hoodie." 
+ tableName; + + return SchemaConverters.toAvroType(sparkSchema, false, structName, recordNamespace); } + } diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java index ebfaed0d3..783b34a97 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java @@ -21,6 +21,7 @@ package org.apache.hudi.config; import org.apache.hudi.client.bootstrap.BootstrapMode; import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector; import org.apache.hudi.client.bootstrap.translator.IdentityBootstrapPartitionPathTranslator; +import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex; import org.apache.hudi.common.config.DefaultHoodieConfig; import java.io.File; @@ -52,6 +53,9 @@ public class HoodieBootstrapConfig extends DefaultHoodieConfig { public static final String DEFAULT_BOOTSTRAP_MODE_SELECTOR_REGEX = ".*"; public static final String DEFAULT_BOOTSTRAP_MODE_SELECTOR_REGEX_MODE = BootstrapMode.METADATA_ONLY.name(); + public static final String BOOTSTRAP_INDEX_CLASS_PROP = "hoodie.bootstrap.index.class"; + public static final String DEFAULT_BOOTSTRAP_INDEX_CLASS = HFileBootstrapIndex.class.getName(); + public HoodieBootstrapConfig(Properties props) { super(props); } @@ -129,6 +133,8 @@ public class HoodieBootstrapConfig extends DefaultHoodieConfig { setDefaultOnCondition(props, !props.containsKey(BOOTSTRAP_MODE_SELECTOR_REGEX_MODE), BOOTSTRAP_MODE_SELECTOR_REGEX_MODE, DEFAULT_BOOTSTRAP_MODE_SELECTOR_REGEX_MODE); BootstrapMode.valueOf(props.getProperty(BOOTSTRAP_MODE_SELECTOR_REGEX_MODE)); + setDefaultOnCondition(props, !props.containsKey(BOOTSTRAP_INDEX_CLASS_PROP), BOOTSTRAP_INDEX_CLASS_PROP, + DEFAULT_BOOTSTRAP_INDEX_CLASS); return config; } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapCommitActionExecutor.java index d4be2b3ea..3b7c50112 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapCommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapCommitActionExecutor.java @@ -136,6 +136,7 @@ public class BootstrapCommitActionExecutor> } } + @Override protected String getSchemaToStoreInCommit() { return bootstrapSchema; } diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index c6374d7bb..6da6739f1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -69,6 +69,11 @@ public class HoodieAvroUtils { private static ThreadLocal reuseDecoder = ThreadLocal.withInitial(() -> null); + // As per https://avro.apache.org/docs/current/spec.html#names + private static String INVALID_AVRO_CHARS_IN_NAMES = "[^A-Za-z0-9_]"; + private static String INVALID_AVRO_FIRST_CHAR_IN_NAMES = "[^A-Za-z_]"; + private static String MASK_FOR_INVALID_CHARS_IN_NAMES = "__"; + // All metadata fields are optional strings. 
public static final Schema METADATA_FIELD_SCHEMA = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))); @@ -444,4 +449,21 @@ public class HoodieAvroUtils { } return fieldSchema.getLogicalType() == LogicalTypes.date(); } + + public static Schema getNullSchema() { + return Schema.create(Schema.Type.NULL); + } + + /** + * Sanitizes Name according to Avro rule for names. + * Removes characters other than the ones mentioned in https://avro.apache.org/docs/current/spec.html#names . + * @param name input name + * @return sanitized name + */ + public static String sanitizeName(String name) { + if (name.substring(0,1).matches(INVALID_AVRO_FIRST_CHAR_IN_NAMES)) { + name = name.replaceFirst(INVALID_AVRO_FIRST_CHAR_IN_NAMES, MASK_FOR_INVALID_CHARS_IN_NAMES); + } + return name.replaceAll(INVALID_AVRO_CHARS_IN_NAMES, MASK_FOR_INVALID_CHARS_IN_NAMES); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java index 1d5f238a1..d1e2cc6ad 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.model; +import org.apache.hadoop.fs.Path; import org.apache.hudi.common.fs.FSUtils; import com.fasterxml.jackson.annotation.JsonAutoDetect; @@ -126,6 +127,18 @@ public class HoodieCommitMetadata implements Serializable { return fullPaths; } + public Map getFileGroupIdAndFullPaths(String basePath) { + Map fileGroupIdToFullPaths = new HashMap<>(); + for (Map.Entry> entry : getPartitionToWriteStats().entrySet()) { + for (HoodieWriteStat stat : entry.getValue()) { + HoodieFileGroupId fileGroupId = new HoodieFileGroupId(stat.getPartitionPath(), stat.getFileId()); + Path fullPath = new Path(basePath, stat.getPath()); + fileGroupIdToFullPaths.put(fileGroupId, fullPath.toString()); + } + } + return fileGroupIdToFullPaths; + } + public String toJsonString() throws IOException { if (partitionToWriteStats.containsKey(null)) { LOG.info("partition path is null for " + partitionToWriteStats.get(null)); diff --git a/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java b/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java index 32e523004..3a5a79647 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java +++ b/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java @@ -57,8 +57,9 @@ public class SparkParquetBootstrapDataProvider extends FullRecordBootstrapDataPr public JavaRDD generateInputRecordRDD(String tableName, String sourceBasePath, List>> partitionPathsWithFiles) { String[] filePaths = partitionPathsWithFiles.stream().map(Pair::getValue) - .flatMap(f -> f.stream().map(fs -> FileStatusUtils.toPath(fs.getPath()).toUri().getPath())) + .flatMap(f -> f.stream().map(fs -> FileStatusUtils.toPath(fs.getPath()).toString())) .toArray(String[]::new); + Dataset inputDataset = sparkSession.read().parquet(filePaths); try { KeyGenerator keyGenerator = DataSourceUtils.createKeyGenerator(props); diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index e6d6c55fe..70a135624 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala 
+++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -20,6 +20,7 @@ package org.apache.hudi import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} import org.apache.hudi.common.model.HoodieKey import org.apache.avro.Schema +import org.apache.hudi.avro.HoodieAvroUtils import org.apache.spark.rdd.RDD import org.apache.spark.sql.avro.SchemaConverters import org.apache.spark.sql.catalyst.encoders.RowEncoder @@ -90,4 +91,9 @@ object AvroConversionUtils { requiredFields.foreach(f => recordBuilder.set(f, record.get(positionIterator.next()))) recordBuilder.build() } + + def getAvroRecordNameAndNamespace(tableName: String): (String, String) = { + val name = HoodieAvroUtils.sanitizeName(tableName) + (s"${name}_record", s"hoodie.${name}") + } } diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 917bfed1d..ec51cb6db 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -59,6 +59,8 @@ object DataSourceReadOptions { val REALTIME_PAYLOAD_COMBINE_OPT_VAL = "payload_combine" val DEFAULT_REALTIME_MERGE_OPT_VAL = REALTIME_PAYLOAD_COMBINE_OPT_VAL + val READ_PATHS_OPT_KEY = "hoodie.datasource.read.paths" + @Deprecated val VIEW_TYPE_OPT_KEY = "hoodie.datasource.view.type" @Deprecated @@ -138,6 +140,7 @@ object DataSourceWriteOptions { val INSERT_OPERATION_OPT_VAL = "insert" val UPSERT_OPERATION_OPT_VAL = "upsert" val DELETE_OPERATION_OPT_VAL = "delete" + val BOOTSTRAP_OPERATION_OPT_VAL = "bootstrap" val DEFAULT_OPERATION_OPT_VAL = UPSERT_OPERATION_OPT_VAL /** diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala index 10be30591..6f4216928 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -17,15 +17,16 @@ package org.apache.hudi +import org.apache.hadoop.fs.Path import org.apache.hudi.DataSourceReadOptions._ -import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.HoodieTableType +import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION_OPT_KEY} +import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.HoodieROTablePathFilter import org.apache.log4j.LogManager import org.apache.spark.sql.execution.datasources.DataSource -import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand import org.apache.spark.sql.execution.streaming.Sink import org.apache.spark.sql.sources._ import org.apache.spark.sql.streaming.OutputMode @@ -57,22 +58,39 @@ class DefaultSource extends RelationProvider val parameters = Map(QUERY_TYPE_OPT_KEY -> DEFAULT_QUERY_TYPE_OPT_VAL) ++ translateViewTypesToQueryTypes(optParams) val path = parameters.get("path") - if (path.isEmpty) { - throw new HoodieException("'path' must be specified.") + val readPathsStr = parameters.get(DataSourceReadOptions.READ_PATHS_OPT_KEY) + if (path.isEmpty && readPathsStr.isEmpty) { + throw new HoodieException(s"'path' or '$READ_PATHS_OPT_KEY' or both must be specified.") } - val fs = FSUtils.getFs(path.get, sqlContext.sparkContext.hadoopConfiguration) - val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(Seq(path.get), fs) + val readPaths = 
readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq()) + val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths + + val fs = FSUtils.getFs(allPaths.head, sqlContext.sparkContext.hadoopConfiguration) + val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs) + val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray) + log.info("Obtained hudi table path: " + tablePath) + + val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath) + val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent + log.info("Is bootstrapped table => " + isBootstrappedTable) + if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) { - val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath) if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) { - new MergeOnReadSnapshotRelation(sqlContext, optParams, schema, globPaths, metaClient) + if (isBootstrappedTable) { + // Snapshot query is not supported for Bootstrapped MOR tables + log.warn("Snapshot query is not supported for Bootstrapped Merge-on-Read tables." + + " Falling back to Read Optimized query.") + new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, optParams) + } else { + new MergeOnReadSnapshotRelation(sqlContext, optParams, schema, globPaths, metaClient) + } } else { - getBaseFileOnlyView(sqlContext, parameters, schema) + getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient) } } else if(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) { - getBaseFileOnlyView(sqlContext, parameters, schema) + getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient) } else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) { new IncrementalRelation(sqlContext, tablePath, optParams, schema) } else { @@ -83,8 +101,8 @@ class DefaultSource extends RelationProvider /** * This DataSource API is used for writing the DataFrame at the destination. For now, we are returning a dummy * relation here because Spark does not really make use of the relation returned, and just returns an empty - * dataset at [[SaveIntoDataSourceCommand.run()]]. This saves us the cost of creating and returning a parquet - * relation here. + * dataset at [[org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run()]]. This saves us the cost + * of creating and returning a parquet relation here. * * TODO: Revisit to return a concrete relation here when we support CREATE TABLE AS for Hudi with DataSource API. 
* That is the only case where Spark seems to actually need a relation to be returned here @@ -101,7 +119,13 @@ class DefaultSource extends RelationProvider optParams: Map[String, String], df: DataFrame): BaseRelation = { val parameters = HoodieSparkSqlWriter.parametersWithWriteDefaults(optParams) - HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df) + + if (parameters(OPERATION_OPT_KEY).equals(BOOTSTRAP_OPERATION_OPT_VAL)) { + HoodieSparkSqlWriter.bootstrap(sqlContext, mode, parameters, df) + } else { + HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df) + } + new HoodieEmptyRelation(sqlContext, df.schema) } @@ -120,23 +144,34 @@ class DefaultSource extends RelationProvider override def shortName(): String = "hudi" private def getBaseFileOnlyView(sqlContext: SQLContext, - optParams: Map[String, String], - schema: StructType): BaseRelation = { + optParams: Map[String, String], + schema: StructType, + extraReadPaths: Seq[String], + isBootstrappedTable: Boolean, + globPaths: Seq[Path], + metaClient: HoodieTableMetaClient): BaseRelation = { log.warn("Loading Base File Only View.") - // this is just effectively RO view only, where `path` can contain a mix of - // non-hoodie/hoodie path files. set the path filter up - sqlContext.sparkContext.hadoopConfiguration.setClass( - "mapreduce.input.pathFilter.class", - classOf[HoodieROTablePathFilter], - classOf[org.apache.hadoop.fs.PathFilter]) - log.info("Constructing hoodie (as parquet) data source with options :" + optParams) - // simply return as a regular parquet relation - DataSource.apply( - sparkSession = sqlContext.sparkSession, - userSpecifiedSchema = Option(schema), - className = "parquet", - options = optParams) - .resolveRelation() + if (isBootstrappedTable) { + // For bootstrapped tables, use our custom Spark relation for querying + new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, optParams) + } else { + // this is just effectively RO view only, where `path` can contain a mix of + // non-hoodie/hoodie path files. set the path filter up + sqlContext.sparkContext.hadoopConfiguration.setClass( + "mapreduce.input.pathFilter.class", + classOf[HoodieROTablePathFilter], + classOf[org.apache.hadoop.fs.PathFilter]) + + log.info("Constructing hoodie (as parquet) data source with options :" + optParams) + // simply return as a regular parquet relation + DataSource.apply( + sparkSession = sqlContext.sparkSession, + paths = extraReadPaths, + userSpecifiedSchema = Option(schema), + className = "parquet", + options = optParams) + .resolveRelation() + } } } diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala new file mode 100644 index 000000000..a522db6af --- /dev/null +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.vectorized.ColumnarBatch + +class HoodieBootstrapRDD(@transient spark: SparkSession, + dataReadFunction: PartitionedFile => Iterator[Any], + skeletonReadFunction: PartitionedFile => Iterator[Any], + regularReadFunction: PartitionedFile => Iterator[Any], + dataSchema: StructType, + skeletonSchema: StructType, + requiredColumns: Array[String], + tableState: HoodieBootstrapTableState) + extends RDD[InternalRow](spark.sparkContext, Nil) { + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + val bootstrapPartition = split.asInstanceOf[HoodieBootstrapPartition] + + if (log.isDebugEnabled) { + if (bootstrapPartition.split.skeletonFile.isDefined) { + logDebug("Got Split => Index: " + bootstrapPartition.index + ", Data File: " + + bootstrapPartition.split.dataFile.filePath + ", Skeleton File: " + + bootstrapPartition.split.skeletonFile.get.filePath) + } else { + logDebug("Got Split => Index: " + bootstrapPartition.index + ", Data File: " + + bootstrapPartition.split.dataFile.filePath) + } + } + + var partitionedFileIterator: Iterator[InternalRow] = null + + if (bootstrapPartition.split.skeletonFile.isDefined) { + // It is a bootstrap split. Check both skeleton and data files. 
+ if (dataSchema.isEmpty) { + // No data column to fetch, hence fetch only from skeleton file + partitionedFileIterator = read(bootstrapPartition.split.skeletonFile.get, skeletonReadFunction) + } else if (skeletonSchema.isEmpty) { + // No metadata column to fetch, hence fetch only from data file + partitionedFileIterator = read(bootstrapPartition.split.dataFile, dataReadFunction) + } else { + // Fetch from both data and skeleton file, and merge + val dataFileIterator = read(bootstrapPartition.split.dataFile, dataReadFunction) + val skeletonFileIterator = read(bootstrapPartition.split.skeletonFile.get, skeletonReadFunction) + partitionedFileIterator = merge(skeletonFileIterator, dataFileIterator) + } + } else { + partitionedFileIterator = read(bootstrapPartition.split.dataFile, regularReadFunction) + } + partitionedFileIterator + } + + def merge(skeletonFileIterator: Iterator[InternalRow], dataFileIterator: Iterator[InternalRow]) + : Iterator[InternalRow] = { + new Iterator[InternalRow] { + override def hasNext: Boolean = dataFileIterator.hasNext && skeletonFileIterator.hasNext + override def next(): InternalRow = { + mergeInternalRow(skeletonFileIterator.next(), dataFileIterator.next()) + } + } + } + + def mergeInternalRow(skeletonRow: InternalRow, dataRow: InternalRow): InternalRow = { + val skeletonArr = skeletonRow.copy().toSeq(skeletonSchema) + val dataArr = dataRow.copy().toSeq(dataSchema) + // We need to return it in the order requested + val mergedArr = requiredColumns.map(col => { + if (skeletonSchema.fieldNames.contains(col)) { + val idx = skeletonSchema.fieldIndex(col) + skeletonArr(idx) + } else { + val idx = dataSchema.fieldIndex(col) + dataArr(idx) + } + }) + + logDebug("Merged data and skeleton values => " + mergedArr.mkString(",")) + val mergedRow = InternalRow.fromSeq(mergedArr) + mergedRow + } + + def read(partitionedFile: PartitionedFile, readFileFunction: PartitionedFile => Iterator[Any]) + : Iterator[InternalRow] = { + val fileIterator = readFileFunction(partitionedFile) + + import scala.collection.JavaConverters._ + + val rows = fileIterator.flatMap(_ match { + case r: InternalRow => Seq(r) + case b: ColumnarBatch => b.rowIterator().asScala + }) + rows + } + + override protected def getPartitions: Array[Partition] = { + tableState.files.zipWithIndex.map(file => { + if (file._1.skeletonFile.isDefined) { + logDebug("Forming partition with => Index: " + file._2 + ", Files: " + file._1.dataFile.filePath + + "," + file._1.skeletonFile.get.filePath) + HoodieBootstrapPartition(file._2, file._1) + } else { + logDebug("Forming partition with => Index: " + file._2 + ", File: " + file._1.dataFile.filePath) + HoodieBootstrapPartition(file._2, file._1) + } + }).toArray + } +} + +case class HoodieBootstrapPartition(index: Int, split: HoodieBootstrapSplit) extends Partition diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala new file mode 100644 index 000000000..a1e9947ca --- /dev/null +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import org.apache.hadoop.fs.Path +import org.apache.hudi.common.model.HoodieBaseFile +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.hudi.exception.HoodieException +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan} +import org.apache.spark.sql.types.StructType + +import scala.collection.JavaConverters._ + +/** + * This is Spark relation that can be used for querying metadata/fully bootstrapped query hoodie tables, as well as + * non-bootstrapped tables. It implements PrunedFilteredScan interface in order to support column pruning and filter + * push-down. For metadata bootstrapped files, if we query columns from both metadata and actual data then it will + * perform a merge of both to return the result. + * + * Caveat: Filter push-down does not work when querying both metadata and actual data columns over metadata + * bootstrapped files, because then the metadata file and data file can return different number of rows causing errors + * merging. 
+ * + * @param _sqlContext Spark SQL Context + * @param userSchema User specified schema in the datasource query + * @param globPaths Globbed paths obtained from the user provided path for querying + * @param metaClient Hoodie table meta client + * @param optParams DataSource options passed by the user + */ +class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext, + val userSchema: StructType, + val globPaths: Seq[Path], + val metaClient: HoodieTableMetaClient, + val optParams: Map[String, String]) extends BaseRelation + with PrunedFilteredScan with Logging { + + val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema + var dataSchema: StructType = _ + var fullSchema: StructType = _ + + val fileIndex: HoodieBootstrapFileIndex = buildFileIndex() + + override def sqlContext: SQLContext = _sqlContext + + override val needConversion: Boolean = false + + override def schema: StructType = inferFullSchema() + + override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { + logInfo("Starting scan..") + + // Compute splits + val bootstrapSplits = fileIndex.files.map(hoodieBaseFile => { + var skeletonFile: Option[PartitionedFile] = Option.empty + var dataFile: PartitionedFile = null + + if (hoodieBaseFile.getBootstrapBaseFile.isPresent) { + skeletonFile = Option(PartitionedFile(InternalRow.empty, hoodieBaseFile.getPath, 0, hoodieBaseFile.getFileLen)) + dataFile = PartitionedFile(InternalRow.empty, hoodieBaseFile.getBootstrapBaseFile.get().getPath, 0, + hoodieBaseFile.getBootstrapBaseFile.get().getFileLen) + } else { + dataFile = PartitionedFile(InternalRow.empty, hoodieBaseFile.getPath, 0, hoodieBaseFile.getFileLen) + } + HoodieBootstrapSplit(dataFile, skeletonFile) + }) + val tableState = HoodieBootstrapTableState(bootstrapSplits) + + // Get required schemas for column pruning + var requiredDataSchema = StructType(Seq()) + var requiredSkeletonSchema = StructType(Seq()) + requiredColumns.foreach(col => { + var field = dataSchema.find(_.name == col) + if (field.isDefined) { + requiredDataSchema = requiredDataSchema.add(field.get) + } else { + field = skeletonSchema.find(_.name == col) + requiredSkeletonSchema = requiredSkeletonSchema.add(field.get) + } + }) + + // Prepare readers for reading data file and skeleton files + val dataReadFunction = new ParquetFileFormat() + .buildReaderWithPartitionValues( + sparkSession = _sqlContext.sparkSession, + dataSchema = dataSchema, + partitionSchema = StructType(Seq.empty), + requiredSchema = requiredDataSchema, + filters = if (requiredSkeletonSchema.isEmpty) filters else Seq() , + options = Map.empty, + hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf() + ) + + val skeletonReadFunction = new ParquetFileFormat() + .buildReaderWithPartitionValues( + sparkSession = _sqlContext.sparkSession, + dataSchema = skeletonSchema, + partitionSchema = StructType(Seq.empty), + requiredSchema = requiredSkeletonSchema, + filters = if (requiredDataSchema.isEmpty) filters else Seq(), + options = Map.empty, + hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf() + ) + + val regularReadFunction = new ParquetFileFormat() + .buildReaderWithPartitionValues( + sparkSession = _sqlContext.sparkSession, + dataSchema = fullSchema, + partitionSchema = StructType(Seq.empty), + requiredSchema = StructType(requiredSkeletonSchema.fields ++ requiredDataSchema.fields), + filters = filters, + options = Map.empty, + hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()) + + val rdd = new 
HoodieBootstrapRDD(_sqlContext.sparkSession, dataReadFunction, skeletonReadFunction, + regularReadFunction, requiredDataSchema, requiredSkeletonSchema, requiredColumns, tableState) + rdd.asInstanceOf[RDD[Row]] + } + + def inferFullSchema(): StructType = { + if (fullSchema == null) { + logInfo("Inferring schema..") + val schemaResolver = new TableSchemaResolver(metaClient) + val tableSchema = schemaResolver.getTableAvroSchemaWithoutMetadataFields + dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema) + fullSchema = StructType(skeletonSchema.fields ++ dataSchema.fields) + } + fullSchema + } + + def buildFileIndex(): HoodieBootstrapFileIndex = { + logInfo("Building file index..") + val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(_sqlContext.sparkSession, globPaths) + val fileStatuses = inMemoryFileIndex.allFiles() + + if (fileStatuses.isEmpty) { + throw new HoodieException("No files found for reading in user provided path.") + } + + val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitsTimeline + .filterCompletedInstants, fileStatuses.toArray) + val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList + + if (log.isDebugEnabled) { + latestFiles.foreach(file => { + logDebug("Printing indexed files:") + if (file.getBootstrapBaseFile.isPresent) { + logDebug("Skeleton File: " + file.getPath + ", Data File: " + file.getBootstrapBaseFile.get().getPath) + } else { + logDebug("Regular Hoodie File: " + file.getPath) + } + }) + } + + HoodieBootstrapFileIndex(latestFiles) + } +} + +case class HoodieBootstrapFileIndex(files: List[HoodieBaseFile]) + +case class HoodieBootstrapTableState(files: List[HoodieBootstrapSplit]) + +case class HoodieBootstrapSplit(dataFile: PartitionedFile, skeletonFile: Option[PartitionedFile]) diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index d82481a80..d66981ebd 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -34,11 +34,13 @@ import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType} import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.table.timeline.HoodieActiveTimeline import org.apache.hudi.common.util.ReflectionUtils +import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP, DEFAULT_BOOTSTRAP_INDEX_CLASS} import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieException import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool} import org.apache.hudi.sync.common.AbstractSyncTool import org.apache.log4j.LogManager +import org.apache.spark.SparkContext import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} @@ -49,12 +51,13 @@ import scala.collection.mutable.ListBuffer private[hudi] object HoodieSparkSqlWriter { private val log = LogManager.getLogger(getClass) + private var tableExists: Boolean = false def write(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], df: DataFrame, - hoodieTableConfig: Option[HoodieTableConfig] = Option.empty, + hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty, hoodieWriteClient: Option[HoodieWriteClient[HoodieRecordPayload[Nothing]]] = 
Option.empty, asyncCompactionTriggerFn: Option[Function1[HoodieWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty ) @@ -90,32 +93,23 @@ private[hudi] object HoodieSparkSqlWriter { } val jsc = new JavaSparkContext(sparkContext) - val basePath = new Path(parameters("path")) + val basePath = new Path(path.get) val instantTime = HoodieActiveTimeline.createNewInstantTime() val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration) - var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)) - var tableConfig : HoodieTableConfig = if (exists) { - hoodieTableConfig.getOrElse( - new HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get).getTableConfig) - } else { - null - } + tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)) + var tableConfig = getHoodieTableConfig(sparkContext, path.get, hoodieTableConfigOpt) - if (mode == SaveMode.Ignore && exists) { + if (mode == SaveMode.Ignore && tableExists) { log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.") (false, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig) } else { - if (exists && mode == SaveMode.Append) { - val existingTableName = tableConfig.getTableName - if (!existingTableName.equals(tblName)) { - throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath") - } - } + // Handle various save modes + handleSaveModes(mode, basePath, tableConfig, tblName, operation, fs) + val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) = if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) { // register classes & schemas - val structName = s"${tblName}_record" - val nameSpace = s"hoodie.${tblName}" + val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName) sparkContext.getConf.registerKryoClasses( Array(classOf[org.apache.avro.generic.GenericData], classOf[org.apache.avro.Schema])) @@ -134,23 +128,11 @@ private[hudi] object HoodieSparkSqlWriter { parameters(PAYLOAD_CLASS_OPT_KEY)) }).toJavaRDD() - // Handle various save modes - if (mode == SaveMode.ErrorIfExists && exists) { - throw new HoodieException(s"hoodie table at $basePath already exists.") - } - - if (mode == SaveMode.Overwrite && exists) { - log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.") - fs.delete(basePath, true) - exists = false - } - // Create the table if not present - if (!exists) { - //FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated. 
- val tableMetaClient = HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration, - path.get, HoodieTableType.valueOf(tableType), - tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null) + if (!tableExists) { + val tableMetaClient = HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, + HoodieTableType.valueOf(tableType), tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), + null.asInstanceOf[String]) tableConfig = tableMetaClient.getTableConfig } @@ -179,12 +161,6 @@ private[hudi] object HoodieSparkSqlWriter { val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation) (writeStatuses, client) } else { - - // Handle save modes - if (mode != SaveMode.Append) { - throw new HoodieException(s"Append is the only save mode applicable for $operation operation") - } - val structName = s"${tblName}_record" val nameSpace = s"hoodie.${tblName}" sparkContext.getConf.registerKryoClasses( @@ -196,7 +172,7 @@ private[hudi] object HoodieSparkSqlWriter { val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace) val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD() - if (!exists) { + if (!tableExists) { throw new HoodieException(s"hoodie table at $basePath does not exist") } @@ -224,6 +200,56 @@ private[hudi] object HoodieSparkSqlWriter { } } + def bootstrap(sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + df: DataFrame, + hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty): Boolean = { + + val sparkContext = sqlContext.sparkContext + val path = parameters.getOrElse("path", throw new HoodieException("'path' must be set.")) + val tableName = parameters.getOrElse(HoodieWriteConfig.TABLE_NAME, + throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}' must be set.")) + val tableType = parameters(TABLE_TYPE_OPT_KEY) + val bootstrapBasePath = parameters.getOrElse(BOOTSTRAP_BASE_PATH_PROP, + throw new HoodieException(s"'${BOOTSTRAP_BASE_PATH_PROP}' is required for '${BOOTSTRAP_OPERATION_OPT_VAL}'" + + " operation'")) + val bootstrapIndexClass = parameters.getOrDefault(BOOTSTRAP_INDEX_CLASS_PROP, DEFAULT_BOOTSTRAP_INDEX_CLASS) + + var schema: String = null + if (df.schema.nonEmpty) { + val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName) + schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, namespace).toString + } else { + schema = HoodieAvroUtils.getNullSchema.toString + } + + val basePath = new Path(path) + val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration) + tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)) + val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt) + + // Handle various save modes + if (mode == SaveMode.Ignore && tableExists) { + log.warn(s"hoodie table at $basePath already exists. 
Ignoring & not performing actual writes.") + false + } else { + handleSaveModes(mode, basePath, tableConfig, tableName, BOOTSTRAP_OPERATION_OPT_VAL, fs) + } + + if (!tableExists) { + HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration, path, + HoodieTableType.valueOf(tableType), tableName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), + null, bootstrapIndexClass, bootstrapBasePath) + } + + val jsc = new JavaSparkContext(sqlContext.sparkContext) + val writeClient = DataSourceUtils.createHoodieClient(jsc, schema, path, tableName, mapAsJavaMap(parameters)) + writeClient.bootstrap(org.apache.hudi.common.util.Option.empty()) + val metaSyncSuccess = metaSync(parameters, basePath, jsc.hadoopConfiguration) + metaSyncSuccess + } + /** * Add default options for unspecified write options keys. * @@ -267,6 +293,31 @@ private[hudi] object HoodieSparkSqlWriter { props } + private def handleSaveModes(mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String, + operation: String, fs: FileSystem): Unit = { + if (mode == SaveMode.Append && tableExists) { + val existingTableName = tableConfig.getTableName + if (!existingTableName.equals(tableName)) { + throw new HoodieException(s"hoodie table with name $existingTableName already exist at $tablePath") + } + } + + if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) { + if (mode == SaveMode.ErrorIfExists && tableExists) { + throw new HoodieException(s"hoodie table at $tablePath already exists.") + } else if (mode == SaveMode.Overwrite && tableExists) { + log.warn(s"hoodie table at $tablePath already exists. Deleting existing data & overwriting with new data.") + fs.delete(tablePath, true) + tableExists = false + } + } else { + // Delete Operation only supports Append mode + if (mode != SaveMode.Append) { + throw new HoodieException(s"Append is the only save mode applicable for $operation operation") + } + } + } + private def syncHive(basePath: Path, fs: FileSystem, parameters: Map[String, String]): Boolean = { val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, parameters) val hiveConf: HiveConf = new HiveConf() @@ -403,4 +454,15 @@ private[hudi] object HoodieSparkSqlWriter { false } } + + private def getHoodieTableConfig(sparkContext: SparkContext, + tablePath: String, + hoodieTableConfigOpt: Option[HoodieTableConfig]): HoodieTableConfig = { + if (tableExists) { + hoodieTableConfigOpt.getOrElse( + new HoodieTableMetaClient(sparkContext.hadoopConfiguration, tablePath).getTableConfig) + } else { + null + } + } } diff --git a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala index 338a54eda..e894b0653 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -17,9 +17,17 @@ package org.apache.hudi + +import com.google.common.collect.Lists +import org.apache.avro.Schema +import org.apache.hadoop.fs.GlobPattern +import org.apache.hadoop.fs.Path import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.common.bootstrap.index.BootstrapIndex import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieTableType} import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.timeline.HoodieTimeline +import org.apache.hudi.common.util.ParquetUtils import org.apache.hudi.config.HoodieWriteConfig import 
org.apache.hudi.exception.HoodieException import org.apache.hudi.table.HoodieTable @@ -28,8 +36,8 @@ import org.apache.hadoop.fs.GlobPattern import org.apache.log4j.LogManager import org.apache.spark.rdd.RDD import org.apache.spark.sql.sources.{BaseRelation, TableScan} -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.{DataFrame, Row, SQLContext} import scala.collection.JavaConversions._ import scala.collection.mutable @@ -47,8 +55,10 @@ class IncrementalRelation(val sqlContext: SQLContext, private val log = LogManager.getLogger(classOf[IncrementalRelation]) - private val metaClient = - new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true) + + val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema + private val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true) + // MOR tables not supported yet if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) { throw new HoodieException("Incremental view not implemented yet, for merge-on-read tables") @@ -72,13 +82,16 @@ class IncrementalRelation(val sqlContext: SQLContext, optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, lastInstant.getTimestamp)) .getInstants.iterator().toList - // use schema from latest metadata, if not present, read schema from the data file - private val latestSchema = { - val schemaUtil = new TableSchemaResolver(metaClient) - val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields); - AvroConversionUtils.convertAvroSchemaToStructType(tableSchema) + // use schema from a file produced in the latest instant + val latestSchema: StructType = { + log.info("Inferring schema..") + val schemaResolver = new TableSchemaResolver(metaClient) + val tableSchema = schemaResolver.getTableAvroSchemaWithoutMetadataFields + val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema) + StructType(skeletonSchema.fields ++ dataSchema.fields) } + private val filters = { if (optParams.contains(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY)) { val filterStr = optParams.getOrElse( @@ -93,36 +106,69 @@ class IncrementalRelation(val sqlContext: SQLContext, override def schema: StructType = latestSchema override def buildScan(): RDD[Row] = { - val fileIdToFullPath = mutable.HashMap[String, String]() + val regularFileIdToFullPath = mutable.HashMap[String, String]() + var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]() + for (commit <- commitsToReturn) { val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit) .get, classOf[HoodieCommitMetadata]) - fileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap + + if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) { + metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap + } else { + regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap + } } + + if (metaBootstrapFileIdToFullPath.nonEmpty) { + // filer out meta bootstrap files that have had more commits since metadata bootstrap + metaBootstrapFileIdToFullPath = metaBootstrapFileIdToFullPath + .filterNot(fileIdFullPath => regularFileIdToFullPath.contains(fileIdFullPath._1)) + } + val pathGlobPattern = optParams.getOrElse( DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, 
DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL) - val filteredFullPath = if(!pathGlobPattern.equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)) { - val globMatcher = new GlobPattern("*" + pathGlobPattern) - fileIdToFullPath.filter(p => globMatcher.matches(p._2)) - } else { - fileIdToFullPath + val (filteredRegularFullPaths, filteredMetaBootstrapFullPaths) = { + if(!pathGlobPattern.equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)) { + val globMatcher = new GlobPattern("*" + pathGlobPattern) + (regularFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values, + metaBootstrapFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values) + } else { + (regularFileIdToFullPath.values, metaBootstrapFileIdToFullPath.values) + } } // unset the path filter, otherwise if end_instant_time is not the latest instant, path filter set for RO view // will filter out all the files incorrectly. sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class") val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path")) - if (filteredFullPath.isEmpty) { + if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) { sqlContext.sparkContext.emptyRDD[Row] } else { log.info("Additional Filters to be applied to incremental source are :" + filters) - filters.foldLeft(sqlContext.read.options(sOpts) - .schema(latestSchema) - .parquet(filteredFullPath.values.toList: _*) - .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp)) - .filter(String.format("%s <= '%s'", - HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)))((e, f) => e.filter(f)) - .toDF().rdd + + var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], latestSchema) + + if (metaBootstrapFileIdToFullPath.nonEmpty) { + df = sqlContext.sparkSession.read + .format("hudi") + .schema(latestSchema) + .option(DataSourceReadOptions.READ_PATHS_OPT_KEY, filteredMetaBootstrapFullPaths.mkString(",")) + .load() + } + + if (regularFileIdToFullPath.nonEmpty) + { + df = df.union(sqlContext.read.options(sOpts) + .schema(latestSchema) + .parquet(filteredRegularFullPaths.toList: _*) + .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, + commitsToReturn.head.getTimestamp)) + .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, + commitsToReturn.last.getTimestamp))) + } + + filters.foldLeft(df)((e, f) => e.filter(f)).rdd } } } diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala new file mode 100644 index 000000000..f24e5ad5b --- /dev/null +++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala @@ -0,0 +1,630 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional + +import java.time.Instant +import java.util.Collections + +import collection.JavaConverters._ +import org.apache.hadoop.fs.FileSystem +import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider +import org.apache.hudi.client.TestBootstrap +import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.table.timeline.HoodieTimeline +import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieCompactionConfig, HoodieWriteConfig} +import org.apache.hudi.keygen.SimpleKeyGenerator +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.sql.functions.{col, lit} +import org.apache.spark.sql.{SaveMode, SparkSession} +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.junit.jupiter.api.io.TempDir + +class TestDataSourceForBootstrap { + + var spark: SparkSession = _ + val commonOpts = Map( + HoodieWriteConfig.INSERT_PARALLELISM -> "4", + HoodieWriteConfig.UPSERT_PARALLELISM -> "4", + HoodieWriteConfig.DELETE_PARALLELISM -> "4", + HoodieWriteConfig.BULKINSERT_PARALLELISM -> "4", + HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM -> "4", + HoodieBootstrapConfig.BOOTSTRAP_PARALLELISM -> "4", + DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key", + DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition", + DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp", + HoodieWriteConfig.TABLE_NAME -> "hoodie_test" + ) + var basePath: String = _ + var srcPath: String = _ + var fs: FileSystem = _ + + @BeforeEach def initialize(@TempDir tempDir: java.nio.file.Path) { + spark = SparkSession.builder + .appName("Hoodie Datasource test") + .master("local[2]") + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .getOrCreate + basePath = tempDir.toAbsolutePath.toString + "/base" + srcPath = tempDir.toAbsolutePath.toString + "/src" + fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration) + } + + @AfterEach def tearDown(): Unit ={ + // Close spark session + if (spark != null) { + spark.stop() + spark = null + } + + // Close file system + if (fs != null) { + fs.close() + fs = null + } + } + + @Test def testMetadataBootstrapCOWNonPartitioned(): Unit = { + val timestamp = Instant.now.toEpochMilli + val numRecords = 100 + val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext) + + val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, Collections.emptyList(), jsc, + spark.sqlContext) + + // Write source data non-partitioned + sourceDF.write + .format("parquet") + .mode(SaveMode.Overwrite) + .save(srcPath) + + // Perform bootstrap + val bootstrapDF = spark.emptyDataFrame + bootstrapDF.write + .format("hudi") + .options(commonOpts) + .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test") + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL) 
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key") + .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath) + .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.NonpartitionedKeyGenerator") + .mode(SaveMode.Overwrite) + .save(basePath) + + val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath) + assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1) + + // Read bootstrapped table and verify count + var hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*") + assertEquals(numRecords, hoodieROViewDF1.count()) + + // Perform upsert + val updateTimestamp = Instant.now.toEpochMilli + val numRecordsUpdate = 10 + val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, + Collections.emptyList(), jsc, spark.sqlContext) + + updateDF.write + .format("hudi") + .options(commonOpts) + .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test") + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL) + .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key") + .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp") + .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator") + .mode(SaveMode.Append) + .save(basePath) + + val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath) + assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size()) + + // Read table after upsert and verify count + hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*") + assertEquals(numRecords, hoodieROViewDF1.count()) + assertEquals(numRecordsUpdate, hoodieROViewDF1.filter(s"timestamp == $updateTimestamp").count()) + + // incrementally pull only changes in the bootstrap commit, which would pull all the initial records written + // during bootstrap + val hoodieIncViewDF1 = spark.read.format("hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000") + .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commitInstantTime1) + .load(basePath) + + assertEquals(numRecords, hoodieIncViewDF1.count()) + var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect(); + assertEquals(1, countsPerCommit.length) + assertEquals(commitInstantTime1, countsPerCommit(0).get(0)) + + // incrementally pull only changes in the latest commit, which would pull only the updated records in the + // latest commit + val hoodieIncViewDF2 = spark.read.format("hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1) + .load(basePath); + + assertEquals(numRecordsUpdate, hoodieIncViewDF2.count()) + countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect(); + assertEquals(1, countsPerCommit.length) + assertEquals(commitInstantTime2, countsPerCommit(0).get(0)) + } + + @Test def testMetadataBootstrapCOWHiveStylePartitioned(): Unit = { + val timestamp = Instant.now.toEpochMilli + val numRecords = 100 + val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03") + val jsc = 
JavaSparkContext.fromSparkContext(spark.sparkContext) + + val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc, + spark.sqlContext) + + // Write source data hive style partitioned + sourceDF.write + .partitionBy("datestr") + .format("parquet") + .mode(SaveMode.Overwrite) + .save(srcPath) + + // Perform bootstrap + val bootstrapDF = spark.emptyDataFrame + bootstrapDF.write + .format("hudi") + .options(commonOpts) + .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test") + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key") + .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath) + .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.SimpleKeyGenerator") + .mode(SaveMode.Overwrite) + .save(basePath) + + val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath) + assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1) + + // Read bootstrapped table and verify count + val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*") + assertEquals(numRecords, hoodieROViewDF1.count()) + + // Perform upsert + val updateTimestamp = Instant.now.toEpochMilli + val numRecordsUpdate = 10 + val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava, + jsc, spark.sqlContext) + + updateDF.write + .format("hudi") + .options(commonOpts) + .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test") + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL) + .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key") + .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp") + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr") + // Required because source data is hive style partitioned + .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true") + .mode(SaveMode.Append) + .save(basePath) + + val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath) + assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size()) + + // Read table after upsert and verify count + val hoodieROViewDF2 = spark.read.format("hudi").load(basePath + "/*") + assertEquals(numRecords, hoodieROViewDF2.count()) + assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count()) + + // incrementally pull only changes in the bootstrap commit, which would pull all the initial records written + // during bootstrap + val hoodieIncViewDF1 = spark.read.format("hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000") + .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commitInstantTime1) + .load(basePath) + + assertEquals(numRecords, hoodieIncViewDF1.count()) + var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect(); + assertEquals(1, countsPerCommit.length) + assertEquals(commitInstantTime1, countsPerCommit(0).get(0)) + + // incrementally pull only changes in the latest commit, which would pull only the updated records in the + // latest commit + val hoodieIncViewDF2 = spark.read.format("hudi") + 
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1) + .load(basePath); + + assertEquals(numRecordsUpdate, hoodieIncViewDF2.count()) + countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect(); + assertEquals(1, countsPerCommit.length) + assertEquals(commitInstantTime2, countsPerCommit(0).get(0)) + + // pull the latest commit within certain partitions + val hoodieIncViewDF3 = spark.read.format("hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1) + .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/datestr=2020-04-02/*") + .load(basePath) + + assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(), + hoodieIncViewDF3.count()) + } + + @Test def testMetadataBootstrapCOWPartitioned(): Unit = { + val timestamp = Instant.now.toEpochMilli + val numRecords = 100 + val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03") + val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext) + + var sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc, + spark.sqlContext) + + // Writing data for each partition instead of using partitionBy to avoid hive-style partitioning and hence + // have partitioned columns stored in the data file + partitionPaths.foreach(partitionPath => { + sourceDF + .filter(sourceDF("datestr").equalTo(lit(partitionPath))) + .write + .format("parquet") + .mode(SaveMode.Overwrite) + .save(srcPath + "/" + partitionPath) + }) + + // Perform bootstrap + val bootstrapDF = spark.emptyDataFrame + bootstrapDF.write + .format("hudi") + .options(commonOpts) + .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test") + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key") + .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath) + .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.SimpleKeyGenerator") + .mode(SaveMode.Overwrite) + .save(basePath) + + val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath) + assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1) + + // Read bootstrapped table and verify count + val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*") + assertEquals(numRecords, hoodieROViewDF1.count()) + + // Perform upsert + val updateTimestamp = Instant.now.toEpochMilli + val numRecordsUpdate = 10 + var updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava, + jsc, spark.sqlContext) + + updateDF.write + .format("hudi") + .options(commonOpts) + .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test") + .option("hoodie.upsert.shuffle.parallelism", "4") + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL) + .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key") + .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp") + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr") + .mode(SaveMode.Append) + .save(basePath) + + val 
commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath) + assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size()) + + // Read table after upsert and verify count + val hoodieROViewDF2 = spark.read.format("hudi").load(basePath + "/*") + assertEquals(numRecords, hoodieROViewDF2.count()) + assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count()) + + // incrementally pull only changes in the bootstrap commit, which would pull all the initial records written + // during bootstrap + val hoodieIncViewDF1 = spark.read.format("hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000") + .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commitInstantTime1) + .load(basePath) + + assertEquals(numRecords, hoodieIncViewDF1.count()) + var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect(); + assertEquals(1, countsPerCommit.length) + assertEquals(commitInstantTime1, countsPerCommit(0).get(0)) + + // incrementally pull only changes in the latest commit, which would pull only the updated records in the + // latest commit + val hoodieIncViewDF2 = spark.read.format("hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1) + .load(basePath); + + assertEquals(numRecordsUpdate, hoodieIncViewDF2.count()) + countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect(); + assertEquals(1, countsPerCommit.length) + assertEquals(commitInstantTime2, countsPerCommit(0).get(0)) + + // pull the latest commit within certain partitions + val hoodieIncViewDF3 = spark.read.format("hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1) + .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2020-04-02/*") + .load(basePath) + + assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(), + hoodieIncViewDF3.count()) + } + + @Test def testMetadataBootstrapMORPartitionedInlineCompactionOn(): Unit = { + val timestamp = Instant.now.toEpochMilli + val numRecords = 100 + val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03") + val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext) + + val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc, + spark.sqlContext) + + // Writing data for each partition instead of using partitionBy to avoid hive-style partitioning and hence + // have partitioned columns stored in the data file + partitionPaths.foreach(partitionPath => { + sourceDF + .filter(sourceDF("datestr").equalTo(lit(partitionPath))) + .write + .format("parquet") + .mode(SaveMode.Overwrite) + .save(srcPath + "/" + partitionPath) + }) + + // Perform bootstrap + val bootstrapDF = spark.emptyDataFrame + bootstrapDF.write + .format("hudi") + .options(commonOpts) + .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test") + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) + .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, 
"_row_key") + .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath) + .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.SimpleKeyGenerator") + .mode(SaveMode.Overwrite) + .save(basePath) + + val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath) + assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1) + + // Read bootstrapped table and verify count + val hoodieROViewDF1 = spark.read.format("hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, + DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) + .load(basePath + "/*") + assertEquals(numRecords, hoodieROViewDF1.count()) + + // Perform upsert + val updateTimestamp = Instant.now.toEpochMilli + val numRecordsUpdate = 10 + val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava, + jsc, spark.sqlContext) + + updateDF.write + .format("hudi") + .options(commonOpts) + .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test") + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) + .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key") + .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp") + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr") + .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true") + .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "1") + .mode(SaveMode.Append) + .save(basePath) + + // Expect 2 new commits since meta bootstrap - delta commit and compaction commit (due to inline compaction) + assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size()) + + // Read table after upsert and verify count. Since we have inline compaction enabled the RO view will have + // the updated rows. 
+ val hoodieROViewDF2 = spark.read.format("hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, + DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) + .load(basePath + "/*") + assertEquals(numRecords, hoodieROViewDF2.count()) + assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count()) + } + + @Test def testMetadataBootstrapMORPartitioned(): Unit = { + val timestamp = Instant.now.toEpochMilli + val numRecords = 100 + val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03") + val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext) + + val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc, + spark.sqlContext) + + // Writing data for each partition instead of using partitionBy to avoid hive-style partitioning and hence + // have partitioned columns stored in the data file + partitionPaths.foreach(partitionPath => { + sourceDF + .filter(sourceDF("datestr").equalTo(lit(partitionPath))) + .write + .format("parquet") + .mode(SaveMode.Overwrite) + .save(srcPath + "/" + partitionPath) + }) + + // Perform bootstrap + val bootstrapDF = spark.emptyDataFrame + bootstrapDF.write + .format("hudi") + .options(commonOpts) + .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test") + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) + .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key") + .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath) + .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.SimpleKeyGenerator") + .mode(SaveMode.Overwrite) + .save(basePath) + + val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath) + assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1) + + // Read bootstrapped table and verify count + val hoodieROViewDF1 = spark.read.format("hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, + DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) + .load(basePath + "/*") + assertEquals(numRecords, hoodieROViewDF1.count()) + + // Perform upsert + val updateTimestamp = Instant.now.toEpochMilli + val numRecordsUpdate = 10 + val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, + partitionPaths.asJava, jsc, spark.sqlContext) + + updateDF.write + .format("hudi") + .options(commonOpts) + .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test") + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) + .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key") + .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp") + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr") + .mode(SaveMode.Append) + .save(basePath) + + // Expect 1 new commit since meta bootstrap - delta commit (because inline compaction is off) + assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size()) + + // Read table after upsert and verify count. Since we have inline compaction off the RO view will have + // no updated rows. 
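+    // (Without compaction the upsert only writes delta log files; the read-optimized query below still serves the
+    // bootstrapped base files, so the updated records are not visible in it.)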
+ val hoodieROViewDF2 = spark.read.format("hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, + DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) + .load(basePath + "/*") + assertEquals(numRecords, hoodieROViewDF2.count()) + assertEquals(0, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count()) + } + + @Test def testFullBootstrapCOWPartitioned(): Unit = { + val timestamp = Instant.now.toEpochMilli + val numRecords = 100 + val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03") + val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext) + + val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc, + spark.sqlContext) + + // Writing data for each partition instead of using partitionBy to avoid hive-style partitioning and hence + // have partitioned columns stored in the data file + partitionPaths.foreach(partitionPath => { + sourceDF + .filter(sourceDF("datestr").equalTo(lit(partitionPath))) + .write + .format("parquet") + .mode(SaveMode.Overwrite) + .save(srcPath + "/" + partitionPath) + }) + + // Perform bootstrap + val bootstrapDF = spark.emptyDataFrame + bootstrapDF.write + .format("hudi") + .options(commonOpts) + .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test") + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key") + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr") + .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp") + .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath) + .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, classOf[SimpleKeyGenerator].getName) + .option(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR, classOf[FullRecordBootstrapModeSelector].getName) + .option(HoodieBootstrapConfig.FULL_BOOTSTRAP_INPUT_PROVIDER, classOf[SparkParquetBootstrapDataProvider].getName) + .mode(SaveMode.Overwrite) + .save(basePath) + + val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath) + assertEquals(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS, commitInstantTime1) + + // Read bootstrapped table and verify count + val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*") + assertEquals(numRecords, hoodieROViewDF1.count()) + + // Perform upsert + val updateTimestamp = Instant.now.toEpochMilli + val numRecordsUpdate = 10 + val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava, + jsc, spark.sqlContext) + + updateDF.write + .format("hudi") + .options(commonOpts) + .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test") + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL) + .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key") + .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp") + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr") + .mode(SaveMode.Append) + .save(basePath) + + val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath) + assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size()) + + // Read table after upsert and verify count + val hoodieROViewDF2 = spark.read.format("hudi").load(basePath + "/*") + assertEquals(numRecords, hoodieROViewDF2.count()) + 
assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())
+
+    // incrementally pull only changes in the bootstrap commit, which would pull all the initial records written
+    // during bootstrap
+    val hoodieIncViewDF1 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
+      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commitInstantTime1)
+      .load(basePath)
+
+    assertEquals(numRecords, hoodieIncViewDF1.count())
+    var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
+    assertEquals(1, countsPerCommit.length)
+    assertEquals(commitInstantTime1, countsPerCommit(0).get(0))
+
+    // incrementally pull only changes in the latest commit, which would pull only the updated records in the
+    // latest commit
+    val hoodieIncViewDF2 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+      .load(basePath)
+
+    assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
+    countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
+    assertEquals(1, countsPerCommit.length)
+    assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
+
+    // pull the latest commit within certain partitions
+    val hoodieIncViewDF3 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+      .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2020-04-02/*")
+      .load(basePath)
+
+    assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(),
+      hoodieIncViewDF3.count())
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 5c7ac9319..6577c09ad 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -581,8 +581,10 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
 
     // Perform bootstrap with tableBasePath as source
     String bootstrapSourcePath = dfsBasePath + "/src_bootstrapped";
-    sqlContext.read().format("org.apache.hudi").load(tableBasePath + "/*/*.parquet").write().format("parquet")
-        .save(bootstrapSourcePath);
+    Dataset<Row> sourceDf = sqlContext.read()
+        .format("org.apache.hudi")
+        .load(tableBasePath + "/*/*.parquet");
+    sourceDf.write().format("parquet").save(bootstrapSourcePath);
 
     String newDatasetBasePath = dfsBasePath + "/test_dataset_bootstrapped";
     cfg.runBootstrap = true;
@@ -600,12 +602,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     assertEquals(1950, sqlContext.sql("select distinct _hoodie_record_key from bootstrapped").count());
 
     StructField[] fields = res.schema().fields();
-    assertEquals(5, fields.length);
-    assertEquals(HoodieRecord.COMMIT_TIME_METADATA_FIELD, fields[0].name());
-    assertEquals(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, fields[1].name());
-    assertEquals(HoodieRecord.RECORD_KEY_METADATA_FIELD, fields[2].name());
-    assertEquals(HoodieRecord.PARTITION_PATH_METADATA_FIELD, fields[3].name());
-    assertEquals(HoodieRecord.FILENAME_METADATA_FIELD, fields[4].name());
+    List<String> fieldNames = Arrays.asList(res.schema().fieldNames());
+    List<String> expectedFieldNames = Arrays.asList(sourceDf.schema().fieldNames());
+    assertEquals(expectedFieldNames.size(), fields.length);
+    assertTrue(fieldNames.containsAll(HoodieRecord.HOODIE_META_COLUMNS));
+    assertTrue(fieldNames.containsAll(expectedFieldNames));
   }
 
   @Test