diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml
index 8e80c8499..52b90f445 100644
--- a/hudi-cli/pom.xml
+++ b/hudi-cli/pom.xml
@@ -217,6 +217,10 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-avro_${scala.binary.version}</artifactId>
+ </dependency>
<groupId>org.springframework.shell</groupId>
diff --git a/hudi-client/pom.xml b/hudi-client/pom.xml
index 5524cad80..0f0cd213e 100644
--- a/hudi-client/pom.xml
+++ b/hudi-client/pom.xml
@@ -101,6 +101,11 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-avro_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapSchemaProvider.java b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapSchemaProvider.java
index 61e29c2cf..2018ca9ba 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapSchemaProvider.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapSchemaProvider.java
@@ -18,17 +18,25 @@
package org.apache.hudi.client.bootstrap;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.common.bootstrap.FileStatusUtils;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.parquet.schema.MessageType;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
+import org.apache.spark.sql.avro.SchemaConverters;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.StructType;
import java.util.List;
+import java.util.Objects;
/**
* Bootstrap Schema Provider. Schema provided in config is used. If not available, use schema from Parquet
@@ -50,7 +58,10 @@ public class BootstrapSchemaProvider {
public final Schema getBootstrapSchema(JavaSparkContext jsc, List<Pair<String, List<HoodieFileStatus>>> partitions) {
if (writeConfig.getSchema() != null) {
// Use schema specified by user if set
- return Schema.parse(writeConfig.getSchema());
+ Schema userSchema = Schema.parse(writeConfig.getSchema());
+ if (!HoodieAvroUtils.getNullSchema().equals(userSchema)) {
+ return userSchema;
+ }
}
return getBootstrapSourceSchema(jsc, partitions);
}
@@ -64,14 +75,26 @@ public class BootstrapSchemaProvider {
*/
protected Schema getBootstrapSourceSchema(JavaSparkContext jsc,
List<Pair<String, List<HoodieFileStatus>>> partitions) {
- return partitions.stream().flatMap(p -> p.getValue().stream())
- .map(fs -> {
- try {
- Path filePath = FileStatusUtils.toPath(fs.getPath());
- return ParquetUtils.readAvroSchema(jsc.hadoopConfiguration(), filePath);
- } catch (Exception ex) {
- return null;
- }
- }).filter(x -> x != null).findAny().get();
+ MessageType parquetSchema = partitions.stream().flatMap(p -> p.getValue().stream()).map(fs -> {
+ try {
+ Path filePath = FileStatusUtils.toPath(fs.getPath());
+ return ParquetUtils.readSchema(jsc.hadoopConfiguration(), filePath);
+ } catch (Exception ex) {
+ return null;
+ }
+ }).filter(Objects::nonNull).findAny()
+ .orElseThrow(() -> new HoodieException("Could not determine schema from the data files."));
+
+
+ ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(
+ Boolean.parseBoolean(SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString()),
+ Boolean.parseBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().defaultValueString()));
+ StructType sparkSchema = converter.convert(parquetSchema);
+ String tableName = HoodieAvroUtils.sanitizeName(writeConfig.getTableName());
+ String structName = tableName + "_record";
+ String recordNamespace = "hoodie." + tableName;
+
+ return SchemaConverters.toAvroType(sparkSchema, false, structName, recordNamespace);
}
+
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
index ebfaed0d3..783b34a97 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
@@ -21,6 +21,7 @@ package org.apache.hudi.config;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector;
import org.apache.hudi.client.bootstrap.translator.IdentityBootstrapPartitionPathTranslator;
+import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
import org.apache.hudi.common.config.DefaultHoodieConfig;
import java.io.File;
@@ -52,6 +53,9 @@ public class HoodieBootstrapConfig extends DefaultHoodieConfig {
public static final String DEFAULT_BOOTSTRAP_MODE_SELECTOR_REGEX = ".*";
public static final String DEFAULT_BOOTSTRAP_MODE_SELECTOR_REGEX_MODE = BootstrapMode.METADATA_ONLY.name();
+ public static final String BOOTSTRAP_INDEX_CLASS_PROP = "hoodie.bootstrap.index.class";
+ public static final String DEFAULT_BOOTSTRAP_INDEX_CLASS = HFileBootstrapIndex.class.getName();
+
public HoodieBootstrapConfig(Properties props) {
super(props);
}
@@ -129,6 +133,8 @@ public class HoodieBootstrapConfig extends DefaultHoodieConfig {
setDefaultOnCondition(props, !props.containsKey(BOOTSTRAP_MODE_SELECTOR_REGEX_MODE),
BOOTSTRAP_MODE_SELECTOR_REGEX_MODE, DEFAULT_BOOTSTRAP_MODE_SELECTOR_REGEX_MODE);
BootstrapMode.valueOf(props.getProperty(BOOTSTRAP_MODE_SELECTOR_REGEX_MODE));
+ setDefaultOnCondition(props, !props.containsKey(BOOTSTRAP_INDEX_CLASS_PROP), BOOTSTRAP_INDEX_CLASS_PROP,
+ DEFAULT_BOOTSTRAP_INDEX_CLASS);
return config;
}
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapCommitActionExecutor.java
index d4be2b3ea..3b7c50112 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapCommitActionExecutor.java
@@ -136,6 +136,7 @@ public class BootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>>
}
}
+ @Override
protected String getSchemaToStoreInCommit() {
return bootstrapSchema;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index c6374d7bb..6da6739f1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -69,6 +69,11 @@ public class HoodieAvroUtils {
private static ThreadLocal<BinaryDecoder> reuseDecoder = ThreadLocal.withInitial(() -> null);
+ // As per https://avro.apache.org/docs/current/spec.html#names
+ private static String INVALID_AVRO_CHARS_IN_NAMES = "[^A-Za-z0-9_]";
+ private static String INVALID_AVRO_FIRST_CHAR_IN_NAMES = "[^A-Za-z_]";
+ private static String MASK_FOR_INVALID_CHARS_IN_NAMES = "__";
+
// All metadata fields are optional strings.
public static final Schema METADATA_FIELD_SCHEMA =
Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)));
@@ -444,4 +449,21 @@ public class HoodieAvroUtils {
}
return fieldSchema.getLogicalType() == LogicalTypes.date();
}
+
+ public static Schema getNullSchema() {
+ return Schema.create(Schema.Type.NULL);
+ }
+
+ /**
+ * Sanitizes a name so that it conforms to the Avro naming rules
+ * (https://avro.apache.org/docs/current/spec.html#names) by masking invalid characters with "__".
+ * @param name input name
+ * @return sanitized name
+ */
+ public static String sanitizeName(String name) {
+ if (name.substring(0,1).matches(INVALID_AVRO_FIRST_CHAR_IN_NAMES)) {
+ name = name.replaceFirst(INVALID_AVRO_FIRST_CHAR_IN_NAMES, MASK_FOR_INVALID_CHARS_IN_NAMES);
+ }
+ return name.replaceAll(INVALID_AVRO_CHARS_IN_NAMES, MASK_FOR_INVALID_CHARS_IN_NAMES);
+ }
}
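
For illustration, a minimal sketch (not part of the patch) of how the new sanitizeName helper feeds into the Avro record name and namespace derived from the table name, matching the convention used by BootstrapSchemaProvider and AvroConversionUtils.getAvroRecordNameAndNamespace in this patch; the table name here is hypothetical:

import org.apache.hudi.avro.HoodieAvroUtils

// Hypothetical table name containing characters that are invalid in Avro names
// (a leading digit and a dash); each offending character is masked with "__".
val tableName = "1trip-data"
val sanitized = HoodieAvroUtils.sanitizeName(tableName)   // "__trip__data"
val structName = s"${sanitized}_record"                   // "__trip__data_record"
val recordNamespace = s"hoodie.$sanitized"                // "hoodie.__trip__data"
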
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index 1d5f238a1..d1e2cc6ad 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.model;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
@@ -126,6 +127,18 @@ public class HoodieCommitMetadata implements Serializable {
return fullPaths;
}
+ public Map<HoodieFileGroupId, String> getFileGroupIdAndFullPaths(String basePath) {
+ Map<HoodieFileGroupId, String> fileGroupIdToFullPaths = new HashMap<>();
+ for (Map.Entry<String, List<HoodieWriteStat>> entry : getPartitionToWriteStats().entrySet()) {
+ for (HoodieWriteStat stat : entry.getValue()) {
+ HoodieFileGroupId fileGroupId = new HoodieFileGroupId(stat.getPartitionPath(), stat.getFileId());
+ Path fullPath = new Path(basePath, stat.getPath());
+ fileGroupIdToFullPaths.put(fileGroupId, fullPath.toString());
+ }
+ }
+ return fileGroupIdToFullPaths;
+ }
+
public String toJsonString() throws IOException {
if (partitionToWriteStats.containsKey(null)) {
LOG.info("partition path is null for " + partitionToWriteStats.get(null));
diff --git a/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java b/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java
index 32e523004..3a5a79647 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java
@@ -57,8 +57,9 @@ public class SparkParquetBootstrapDataProvider extends FullRecordBootstrapDataPr
public JavaRDD<HoodieRecord> generateInputRecordRDD(String tableName, String sourceBasePath,
List<Pair<String, List<HoodieFileStatus>>> partitionPathsWithFiles) {
String[] filePaths = partitionPathsWithFiles.stream().map(Pair::getValue)
- .flatMap(f -> f.stream().map(fs -> FileStatusUtils.toPath(fs.getPath()).toUri().getPath()))
+ .flatMap(f -> f.stream().map(fs -> FileStatusUtils.toPath(fs.getPath()).toString()))
.toArray(String[]::new);
+
Dataset<Row> inputDataset = sparkSession.read().parquet(filePaths);
try {
KeyGenerator keyGenerator = DataSourceUtils.createKeyGenerator(props);
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index e6d6c55fe..70a135624 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -20,6 +20,7 @@ package org.apache.hudi
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
import org.apache.hudi.common.model.HoodieKey
import org.apache.avro.Schema
+import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.encoders.RowEncoder
@@ -90,4 +91,9 @@ object AvroConversionUtils {
requiredFields.foreach(f => recordBuilder.set(f, record.get(positionIterator.next())))
recordBuilder.build()
}
+
+ def getAvroRecordNameAndNamespace(tableName: String): (String, String) = {
+ val name = HoodieAvroUtils.sanitizeName(tableName)
+ (s"${name}_record", s"hoodie.${name}")
+ }
}
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 917bfed1d..ec51cb6db 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -59,6 +59,8 @@ object DataSourceReadOptions {
val REALTIME_PAYLOAD_COMBINE_OPT_VAL = "payload_combine"
val DEFAULT_REALTIME_MERGE_OPT_VAL = REALTIME_PAYLOAD_COMBINE_OPT_VAL
+ val READ_PATHS_OPT_KEY = "hoodie.datasource.read.paths"
+
@Deprecated
val VIEW_TYPE_OPT_KEY = "hoodie.datasource.view.type"
@Deprecated
@@ -138,6 +140,7 @@ object DataSourceWriteOptions {
val INSERT_OPERATION_OPT_VAL = "insert"
val UPSERT_OPERATION_OPT_VAL = "upsert"
val DELETE_OPERATION_OPT_VAL = "delete"
+ val BOOTSTRAP_OPERATION_OPT_VAL = "bootstrap"
val DEFAULT_OPERATION_OPT_VAL = UPSERT_OPERATION_OPT_VAL
/**
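
For context, a hedged sketch of how the new bootstrap write operation is intended to be invoked through the DataFrame writer, mirroring the test added later in this patch; spark, srcPath and basePath are assumed to be defined by the caller:

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
import org.apache.spark.sql.SaveMode

// Metadata-bootstrap an existing parquet dataset at srcPath into a Hudi table at basePath.
// An empty DataFrame is enough, since the data is taken from the bootstrap base path.
spark.emptyDataFrame.write
  .format("hudi")
  .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
  .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
  .mode(SaveMode.Overwrite)
  .save(basePath)
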
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index 10be30591..6f4216928 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -17,15 +17,16 @@
package org.apache.hudi
+import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceReadOptions._
-import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION_OPT_KEY}
+import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.log4j.LogManager
import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
@@ -57,22 +58,39 @@ class DefaultSource extends RelationProvider
val parameters = Map(QUERY_TYPE_OPT_KEY -> DEFAULT_QUERY_TYPE_OPT_VAL) ++ translateViewTypesToQueryTypes(optParams)
val path = parameters.get("path")
- if (path.isEmpty) {
- throw new HoodieException("'path' must be specified.")
+ val readPathsStr = parameters.get(DataSourceReadOptions.READ_PATHS_OPT_KEY)
+ if (path.isEmpty && readPathsStr.isEmpty) {
+ throw new HoodieException(s"'path' or '$READ_PATHS_OPT_KEY' or both must be specified.")
}
- val fs = FSUtils.getFs(path.get, sqlContext.sparkContext.hadoopConfiguration)
- val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(Seq(path.get), fs)
+ val readPaths = readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq())
+ val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths
+
+ val fs = FSUtils.getFs(allPaths.head, sqlContext.sparkContext.hadoopConfiguration)
+ val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)
+
val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
+ log.info("Obtained hudi table path: " + tablePath)
+
+ val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath)
+ val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
+ log.info("Is bootstrapped table => " + isBootstrappedTable)
+
if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {
- val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath)
if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
- new MergeOnReadSnapshotRelation(sqlContext, optParams, schema, globPaths, metaClient)
+ if (isBootstrappedTable) {
+ // Snapshot query is not supported for Bootstrapped MOR tables
+ log.warn("Snapshot query is not supported for Bootstrapped Merge-on-Read tables." +
+ " Falling back to Read Optimized query.")
+ new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, optParams)
+ } else {
+ new MergeOnReadSnapshotRelation(sqlContext, optParams, schema, globPaths, metaClient)
+ }
} else {
- getBaseFileOnlyView(sqlContext, parameters, schema)
+ getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient)
}
} else if(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) {
- getBaseFileOnlyView(sqlContext, parameters, schema)
+ getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient)
} else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
new IncrementalRelation(sqlContext, tablePath, optParams, schema)
} else {
@@ -83,8 +101,8 @@ class DefaultSource extends RelationProvider
/**
* This DataSource API is used for writing the DataFrame at the destination. For now, we are returning a dummy
* relation here because Spark does not really make use of the relation returned, and just returns an empty
- * dataset at [[SaveIntoDataSourceCommand.run()]]. This saves us the cost of creating and returning a parquet
- * relation here.
+ * dataset at [[org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run()]]. This saves us the cost
+ * of creating and returning a parquet relation here.
*
* TODO: Revisit to return a concrete relation here when we support CREATE TABLE AS for Hudi with DataSource API.
* That is the only case where Spark seems to actually need a relation to be returned here
@@ -101,7 +119,13 @@ class DefaultSource extends RelationProvider
optParams: Map[String, String],
df: DataFrame): BaseRelation = {
val parameters = HoodieSparkSqlWriter.parametersWithWriteDefaults(optParams)
- HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df)
+
+ if (parameters(OPERATION_OPT_KEY).equals(BOOTSTRAP_OPERATION_OPT_VAL)) {
+ HoodieSparkSqlWriter.bootstrap(sqlContext, mode, parameters, df)
+ } else {
+ HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df)
+ }
+
new HoodieEmptyRelation(sqlContext, df.schema)
}
@@ -120,23 +144,34 @@ class DefaultSource extends RelationProvider
override def shortName(): String = "hudi"
private def getBaseFileOnlyView(sqlContext: SQLContext,
- optParams: Map[String, String],
- schema: StructType): BaseRelation = {
+ optParams: Map[String, String],
+ schema: StructType,
+ extraReadPaths: Seq[String],
+ isBootstrappedTable: Boolean,
+ globPaths: Seq[Path],
+ metaClient: HoodieTableMetaClient): BaseRelation = {
log.warn("Loading Base File Only View.")
- // this is just effectively RO view only, where `path` can contain a mix of
- // non-hoodie/hoodie path files. set the path filter up
- sqlContext.sparkContext.hadoopConfiguration.setClass(
- "mapreduce.input.pathFilter.class",
- classOf[HoodieROTablePathFilter],
- classOf[org.apache.hadoop.fs.PathFilter])
- log.info("Constructing hoodie (as parquet) data source with options :" + optParams)
- // simply return as a regular parquet relation
- DataSource.apply(
- sparkSession = sqlContext.sparkSession,
- userSpecifiedSchema = Option(schema),
- className = "parquet",
- options = optParams)
- .resolveRelation()
+ if (isBootstrappedTable) {
+ // For bootstrapped tables, use our custom Spark relation for querying
+ new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, optParams)
+ } else {
+ // this is just effectively RO view only, where `path` can contain a mix of
+ // non-hoodie/hoodie path files. set the path filter up
+ sqlContext.sparkContext.hadoopConfiguration.setClass(
+ "mapreduce.input.pathFilter.class",
+ classOf[HoodieROTablePathFilter],
+ classOf[org.apache.hadoop.fs.PathFilter])
+
+ log.info("Constructing hoodie (as parquet) data source with options :" + optParams)
+ // simply return as a regular parquet relation
+ DataSource.apply(
+ sparkSession = sqlContext.sparkSession,
+ paths = extraReadPaths,
+ userSpecifiedSchema = Option(schema),
+ className = "parquet",
+ options = optParams)
+ .resolveRelation()
+ }
}
}
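
A hedged sketch of the new read-paths option handled above, in the spirit of how IncrementalRelation (later in this patch) uses it; filePaths is an assumed sequence of fully qualified base-file paths:

import org.apache.hudi.DataSourceReadOptions

// Read specific Hudi base files through the datasource without passing a 'path',
// supplying them as a comma-separated list via hoodie.datasource.read.paths.
val df = spark.read
  .format("hudi")
  .option(DataSourceReadOptions.READ_PATHS_OPT_KEY, filePaths.mkString(","))
  .load()
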
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala
new file mode 100644
index 000000000..a522db6af
--- /dev/null
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+class HoodieBootstrapRDD(@transient spark: SparkSession,
+ dataReadFunction: PartitionedFile => Iterator[Any],
+ skeletonReadFunction: PartitionedFile => Iterator[Any],
+ regularReadFunction: PartitionedFile => Iterator[Any],
+ dataSchema: StructType,
+ skeletonSchema: StructType,
+ requiredColumns: Array[String],
+ tableState: HoodieBootstrapTableState)
+ extends RDD[InternalRow](spark.sparkContext, Nil) {
+
+ override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+ val bootstrapPartition = split.asInstanceOf[HoodieBootstrapPartition]
+
+ if (log.isDebugEnabled) {
+ if (bootstrapPartition.split.skeletonFile.isDefined) {
+ logDebug("Got Split => Index: " + bootstrapPartition.index + ", Data File: "
+ + bootstrapPartition.split.dataFile.filePath + ", Skeleton File: "
+ + bootstrapPartition.split.skeletonFile.get.filePath)
+ } else {
+ logDebug("Got Split => Index: " + bootstrapPartition.index + ", Data File: "
+ + bootstrapPartition.split.dataFile.filePath)
+ }
+ }
+
+ var partitionedFileIterator: Iterator[InternalRow] = null
+
+ if (bootstrapPartition.split.skeletonFile.isDefined) {
+ // It is a bootstrap split. Check both skeleton and data files.
+ if (dataSchema.isEmpty) {
+ // No data column to fetch, hence fetch only from skeleton file
+ partitionedFileIterator = read(bootstrapPartition.split.skeletonFile.get, skeletonReadFunction)
+ } else if (skeletonSchema.isEmpty) {
+ // No metadata column to fetch, hence fetch only from data file
+ partitionedFileIterator = read(bootstrapPartition.split.dataFile, dataReadFunction)
+ } else {
+ // Fetch from both data and skeleton file, and merge
+ val dataFileIterator = read(bootstrapPartition.split.dataFile, dataReadFunction)
+ val skeletonFileIterator = read(bootstrapPartition.split.skeletonFile.get, skeletonReadFunction)
+ partitionedFileIterator = merge(skeletonFileIterator, dataFileIterator)
+ }
+ } else {
+ partitionedFileIterator = read(bootstrapPartition.split.dataFile, regularReadFunction)
+ }
+ partitionedFileIterator
+ }
+
+ def merge(skeletonFileIterator: Iterator[InternalRow], dataFileIterator: Iterator[InternalRow])
+ : Iterator[InternalRow] = {
+ new Iterator[InternalRow] {
+ override def hasNext: Boolean = dataFileIterator.hasNext && skeletonFileIterator.hasNext
+ override def next(): InternalRow = {
+ mergeInternalRow(skeletonFileIterator.next(), dataFileIterator.next())
+ }
+ }
+ }
+
+ def mergeInternalRow(skeletonRow: InternalRow, dataRow: InternalRow): InternalRow = {
+ val skeletonArr = skeletonRow.copy().toSeq(skeletonSchema)
+ val dataArr = dataRow.copy().toSeq(dataSchema)
+ // We need to return it in the order requested
+ val mergedArr = requiredColumns.map(col => {
+ if (skeletonSchema.fieldNames.contains(col)) {
+ val idx = skeletonSchema.fieldIndex(col)
+ skeletonArr(idx)
+ } else {
+ val idx = dataSchema.fieldIndex(col)
+ dataArr(idx)
+ }
+ })
+
+ logDebug("Merged data and skeleton values => " + mergedArr.mkString(","))
+ val mergedRow = InternalRow.fromSeq(mergedArr)
+ mergedRow
+ }
+
+ def read(partitionedFile: PartitionedFile, readFileFunction: PartitionedFile => Iterator[Any])
+ : Iterator[InternalRow] = {
+ val fileIterator = readFileFunction(partitionedFile)
+
+ import scala.collection.JavaConverters._
+
+ val rows = fileIterator.flatMap(_ match {
+ case r: InternalRow => Seq(r)
+ case b: ColumnarBatch => b.rowIterator().asScala
+ })
+ rows
+ }
+
+ override protected def getPartitions: Array[Partition] = {
+ tableState.files.zipWithIndex.map(file => {
+ if (file._1.skeletonFile.isDefined) {
+ logDebug("Forming partition with => Index: " + file._2 + ", Files: " + file._1.dataFile.filePath
+ + "," + file._1.skeletonFile.get.filePath)
+ HoodieBootstrapPartition(file._2, file._1)
+ } else {
+ logDebug("Forming partition with => Index: " + file._2 + ", File: " + file._1.dataFile.filePath)
+ HoodieBootstrapPartition(file._2, file._1)
+ }
+ }).toArray
+ }
+}
+
+case class HoodieBootstrapPartition(index: Int, split: HoodieBootstrapSplit) extends Partition
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
new file mode 100644
index 000000000..a1e9947ca
--- /dev/null
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+/**
+ * This is a Spark relation that can be used for querying metadata/fully bootstrapped Hudi tables, as well as
+ * non-bootstrapped tables. It implements the PrunedFilteredScan interface in order to support column pruning and
+ * filter push-down. For metadata-bootstrapped files, if the query selects columns from both the metadata and the
+ * actual data, the two are merged to produce the result.
+ *
+ * Caveat: Filter push-down does not work when querying both metadata and actual data columns over metadata
+ * bootstrapped files, because the metadata file and the data file can then return different numbers of rows,
+ * causing errors while merging.
+ *
+ * @param _sqlContext Spark SQL Context
+ * @param userSchema User specified schema in the datasource query
+ * @param globPaths Globbed paths obtained from the user provided path for querying
+ * @param metaClient Hoodie table meta client
+ * @param optParams DataSource options passed by the user
+ */
+class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
+ val userSchema: StructType,
+ val globPaths: Seq[Path],
+ val metaClient: HoodieTableMetaClient,
+ val optParams: Map[String, String]) extends BaseRelation
+ with PrunedFilteredScan with Logging {
+
+ val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema
+ var dataSchema: StructType = _
+ var fullSchema: StructType = _
+
+ val fileIndex: HoodieBootstrapFileIndex = buildFileIndex()
+
+ override def sqlContext: SQLContext = _sqlContext
+
+ override val needConversion: Boolean = false
+
+ override def schema: StructType = inferFullSchema()
+
+ override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+ logInfo("Starting scan..")
+
+ // Compute splits
+ val bootstrapSplits = fileIndex.files.map(hoodieBaseFile => {
+ var skeletonFile: Option[PartitionedFile] = Option.empty
+ var dataFile: PartitionedFile = null
+
+ if (hoodieBaseFile.getBootstrapBaseFile.isPresent) {
+ skeletonFile = Option(PartitionedFile(InternalRow.empty, hoodieBaseFile.getPath, 0, hoodieBaseFile.getFileLen))
+ dataFile = PartitionedFile(InternalRow.empty, hoodieBaseFile.getBootstrapBaseFile.get().getPath, 0,
+ hoodieBaseFile.getBootstrapBaseFile.get().getFileLen)
+ } else {
+ dataFile = PartitionedFile(InternalRow.empty, hoodieBaseFile.getPath, 0, hoodieBaseFile.getFileLen)
+ }
+ HoodieBootstrapSplit(dataFile, skeletonFile)
+ })
+ val tableState = HoodieBootstrapTableState(bootstrapSplits)
+
+ // Get required schemas for column pruning
+ var requiredDataSchema = StructType(Seq())
+ var requiredSkeletonSchema = StructType(Seq())
+ requiredColumns.foreach(col => {
+ var field = dataSchema.find(_.name == col)
+ if (field.isDefined) {
+ requiredDataSchema = requiredDataSchema.add(field.get)
+ } else {
+ field = skeletonSchema.find(_.name == col)
+ requiredSkeletonSchema = requiredSkeletonSchema.add(field.get)
+ }
+ })
+
+ // Prepare readers for reading data file and skeleton files
+ val dataReadFunction = new ParquetFileFormat()
+ .buildReaderWithPartitionValues(
+ sparkSession = _sqlContext.sparkSession,
+ dataSchema = dataSchema,
+ partitionSchema = StructType(Seq.empty),
+ requiredSchema = requiredDataSchema,
+ filters = if (requiredSkeletonSchema.isEmpty) filters else Seq(),
+ options = Map.empty,
+ hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
+ )
+
+ val skeletonReadFunction = new ParquetFileFormat()
+ .buildReaderWithPartitionValues(
+ sparkSession = _sqlContext.sparkSession,
+ dataSchema = skeletonSchema,
+ partitionSchema = StructType(Seq.empty),
+ requiredSchema = requiredSkeletonSchema,
+ filters = if (requiredDataSchema.isEmpty) filters else Seq(),
+ options = Map.empty,
+ hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
+ )
+
+ val regularReadFunction = new ParquetFileFormat()
+ .buildReaderWithPartitionValues(
+ sparkSession = _sqlContext.sparkSession,
+ dataSchema = fullSchema,
+ partitionSchema = StructType(Seq.empty),
+ requiredSchema = StructType(requiredSkeletonSchema.fields ++ requiredDataSchema.fields),
+ filters = filters,
+ options = Map.empty,
+ hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf())
+
+ val rdd = new HoodieBootstrapRDD(_sqlContext.sparkSession, dataReadFunction, skeletonReadFunction,
+ regularReadFunction, requiredDataSchema, requiredSkeletonSchema, requiredColumns, tableState)
+ rdd.asInstanceOf[RDD[Row]]
+ }
+
+ def inferFullSchema(): StructType = {
+ if (fullSchema == null) {
+ logInfo("Inferring schema..")
+ val schemaResolver = new TableSchemaResolver(metaClient)
+ val tableSchema = schemaResolver.getTableAvroSchemaWithoutMetadataFields
+ dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+ fullSchema = StructType(skeletonSchema.fields ++ dataSchema.fields)
+ }
+ fullSchema
+ }
+
+ def buildFileIndex(): HoodieBootstrapFileIndex = {
+ logInfo("Building file index..")
+ val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(_sqlContext.sparkSession, globPaths)
+ val fileStatuses = inMemoryFileIndex.allFiles()
+
+ if (fileStatuses.isEmpty) {
+ throw new HoodieException("No files found for reading in user provided path.")
+ }
+
+ val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitsTimeline
+ .filterCompletedInstants, fileStatuses.toArray)
+ val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList
+
+ if (log.isDebugEnabled) {
+ latestFiles.foreach(file => {
+ logDebug("Printing indexed files:")
+ if (file.getBootstrapBaseFile.isPresent) {
+ logDebug("Skeleton File: " + file.getPath + ", Data File: " + file.getBootstrapBaseFile.get().getPath)
+ } else {
+ logDebug("Regular Hoodie File: " + file.getPath)
+ }
+ })
+ }
+
+ HoodieBootstrapFileIndex(latestFiles)
+ }
+}
+
+case class HoodieBootstrapFileIndex(files: List[HoodieBaseFile])
+
+case class HoodieBootstrapTableState(files: List[HoodieBootstrapSplit])
+
+case class HoodieBootstrapSplit(dataFile: PartitionedFile, skeletonFile: Option[PartitionedFile])
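
As a usage sketch (column names follow the test data later in this patch and are otherwise assumptions), a query that selects both a metadata column and a data column over a metadata-bootstrapped table, which exercises the skeleton/data merge path implemented above:

// Selecting _hoodie_commit_time (metadata, from the skeleton file) together with
// _row_key and timestamp (data columns) forces HoodieBootstrapRDD to merge rows
// from the skeleton file and the bootstrap source file.
spark.read.format("hudi")
  .load(basePath + "/*")
  .select("_hoodie_commit_time", "_row_key", "timestamp")
  .show()
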
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index d82481a80..d66981ebd 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -34,11 +34,13 @@ import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.util.ReflectionUtils
+import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP, DEFAULT_BOOTSTRAP_INDEX_CLASS}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
import org.apache.hudi.sync.common.AbstractSyncTool
import org.apache.log4j.LogManager
+import org.apache.spark.SparkContext
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
@@ -49,12 +51,13 @@ import scala.collection.mutable.ListBuffer
private[hudi] object HoodieSparkSqlWriter {
private val log = LogManager.getLogger(getClass)
+ private var tableExists: Boolean = false
def write(sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
df: DataFrame,
- hoodieTableConfig: Option[HoodieTableConfig] = Option.empty,
+ hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
hoodieWriteClient: Option[HoodieWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
asyncCompactionTriggerFn: Option[Function1[HoodieWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty
)
@@ -90,32 +93,23 @@ private[hudi] object HoodieSparkSqlWriter {
}
val jsc = new JavaSparkContext(sparkContext)
- val basePath = new Path(parameters("path"))
+ val basePath = new Path(path.get)
val instantTime = HoodieActiveTimeline.createNewInstantTime()
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
- var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
- var tableConfig : HoodieTableConfig = if (exists) {
- hoodieTableConfig.getOrElse(
- new HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get).getTableConfig)
- } else {
- null
- }
+ tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
+ var tableConfig = getHoodieTableConfig(sparkContext, path.get, hoodieTableConfigOpt)
- if (mode == SaveMode.Ignore && exists) {
+ if (mode == SaveMode.Ignore && tableExists) {
log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
(false, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
} else {
- if (exists && mode == SaveMode.Append) {
- val existingTableName = tableConfig.getTableName
- if (!existingTableName.equals(tblName)) {
- throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath")
- }
- }
+ // Handle various save modes
+ handleSaveModes(mode, basePath, tableConfig, tblName, operation, fs)
+
val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
// register classes & schemas
- val structName = s"${tblName}_record"
- val nameSpace = s"hoodie.${tblName}"
+ val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
@@ -134,23 +128,11 @@ private[hudi] object HoodieSparkSqlWriter {
parameters(PAYLOAD_CLASS_OPT_KEY))
}).toJavaRDD()
- // Handle various save modes
- if (mode == SaveMode.ErrorIfExists && exists) {
- throw new HoodieException(s"hoodie table at $basePath already exists.")
- }
-
- if (mode == SaveMode.Overwrite && exists) {
- log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
- fs.delete(basePath, true)
- exists = false
- }
-
// Create the table if not present
- if (!exists) {
- //FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated.
- val tableMetaClient = HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration,
- path.get, HoodieTableType.valueOf(tableType),
- tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null)
+ if (!tableExists) {
+ val tableMetaClient = HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get,
+ HoodieTableType.valueOf(tableType), tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY),
+ null.asInstanceOf[String])
tableConfig = tableMetaClient.getTableConfig
}
@@ -179,12 +161,6 @@ private[hudi] object HoodieSparkSqlWriter {
val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
(writeStatuses, client)
} else {
-
- // Handle save modes
- if (mode != SaveMode.Append) {
- throw new HoodieException(s"Append is the only save mode applicable for $operation operation")
- }
-
val structName = s"${tblName}_record"
val nameSpace = s"hoodie.${tblName}"
sparkContext.getConf.registerKryoClasses(
@@ -196,7 +172,7 @@ private[hudi] object HoodieSparkSqlWriter {
val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()
- if (!exists) {
+ if (!tableExists) {
throw new HoodieException(s"hoodie table at $basePath does not exist")
}
@@ -224,6 +200,56 @@ private[hudi] object HoodieSparkSqlWriter {
}
}
+ def bootstrap(sqlContext: SQLContext,
+ mode: SaveMode,
+ parameters: Map[String, String],
+ df: DataFrame,
+ hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty): Boolean = {
+
+ val sparkContext = sqlContext.sparkContext
+ val path = parameters.getOrElse("path", throw new HoodieException("'path' must be set."))
+ val tableName = parameters.getOrElse(HoodieWriteConfig.TABLE_NAME,
+ throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}' must be set."))
+ val tableType = parameters(TABLE_TYPE_OPT_KEY)
+ val bootstrapBasePath = parameters.getOrElse(BOOTSTRAP_BASE_PATH_PROP,
+ throw new HoodieException(s"'${BOOTSTRAP_BASE_PATH_PROP}' is required for '${BOOTSTRAP_OPERATION_OPT_VAL}'" +
+ " operation'"))
+ val bootstrapIndexClass = parameters.getOrDefault(BOOTSTRAP_INDEX_CLASS_PROP, DEFAULT_BOOTSTRAP_INDEX_CLASS)
+
+ var schema: String = null
+ if (df.schema.nonEmpty) {
+ val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
+ schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, namespace).toString
+ } else {
+ schema = HoodieAvroUtils.getNullSchema.toString
+ }
+
+ val basePath = new Path(path)
+ val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
+ tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
+ val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
+
+ // Handle various save modes
+ if (mode == SaveMode.Ignore && tableExists) {
+ log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
+ false
+ } else {
+ handleSaveModes(mode, basePath, tableConfig, tableName, BOOTSTRAP_OPERATION_OPT_VAL, fs)
+ }
+
+ if (!tableExists) {
+ HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration, path,
+ HoodieTableType.valueOf(tableType), tableName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY),
+ null, bootstrapIndexClass, bootstrapBasePath)
+ }
+
+ val jsc = new JavaSparkContext(sqlContext.sparkContext)
+ val writeClient = DataSourceUtils.createHoodieClient(jsc, schema, path, tableName, mapAsJavaMap(parameters))
+ writeClient.bootstrap(org.apache.hudi.common.util.Option.empty())
+ val metaSyncSuccess = metaSync(parameters, basePath, jsc.hadoopConfiguration)
+ metaSyncSuccess
+ }
+
/**
* Add default options for unspecified write options keys.
*
@@ -267,6 +293,31 @@ private[hudi] object HoodieSparkSqlWriter {
props
}
+ private def handleSaveModes(mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String,
+ operation: String, fs: FileSystem): Unit = {
+ if (mode == SaveMode.Append && tableExists) {
+ val existingTableName = tableConfig.getTableName
+ if (!existingTableName.equals(tableName)) {
+ throw new HoodieException(s"hoodie table with name $existingTableName already exist at $tablePath")
+ }
+ }
+
+ if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
+ if (mode == SaveMode.ErrorIfExists && tableExists) {
+ throw new HoodieException(s"hoodie table at $tablePath already exists.")
+ } else if (mode == SaveMode.Overwrite && tableExists) {
+ log.warn(s"hoodie table at $tablePath already exists. Deleting existing data & overwriting with new data.")
+ fs.delete(tablePath, true)
+ tableExists = false
+ }
+ } else {
+ // Delete Operation only supports Append mode
+ if (mode != SaveMode.Append) {
+ throw new HoodieException(s"Append is the only save mode applicable for $operation operation")
+ }
+ }
+ }
+
private def syncHive(basePath: Path, fs: FileSystem, parameters: Map[String, String]): Boolean = {
val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, parameters)
val hiveConf: HiveConf = new HiveConf()
@@ -403,4 +454,15 @@ private[hudi] object HoodieSparkSqlWriter {
false
}
}
+
+ private def getHoodieTableConfig(sparkContext: SparkContext,
+ tablePath: String,
+ hoodieTableConfigOpt: Option[HoodieTableConfig]): HoodieTableConfig = {
+ if (tableExists) {
+ hoodieTableConfigOpt.getOrElse(
+ new HoodieTableMetaClient(sparkContext.hadoopConfiguration, tablePath).getTableConfig)
+ } else {
+ null
+ }
+ }
}
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 338a54eda..e894b0653 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -17,9 +17,17 @@
package org.apache.hudi
+
+import com.google.common.collect.Lists
+import org.apache.avro.Schema
+import org.apache.hadoop.fs.GlobPattern
+import org.apache.hadoop.fs.Path
import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.bootstrap.index.BootstrapIndex
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieTableType}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.common.util.ParquetUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.table.HoodieTable
@@ -28,8 +36,8 @@ import org.apache.hadoop.fs.GlobPattern
import org.apache.log4j.LogManager
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import scala.collection.JavaConversions._
import scala.collection.mutable
@@ -47,8 +55,10 @@ class IncrementalRelation(val sqlContext: SQLContext,
private val log = LogManager.getLogger(classOf[IncrementalRelation])
- private val metaClient =
- new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
+
+ val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema
+ private val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
+
// MOR tables not supported yet
if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
throw new HoodieException("Incremental view not implemented yet, for merge-on-read tables")
@@ -72,13 +82,16 @@ class IncrementalRelation(val sqlContext: SQLContext,
optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, lastInstant.getTimestamp))
.getInstants.iterator().toList
- // use schema from latest metadata, if not present, read schema from the data file
- private val latestSchema = {
- val schemaUtil = new TableSchemaResolver(metaClient)
- val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields);
- AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+ // use schema from a file produced in the latest instant
+ val latestSchema: StructType = {
+ log.info("Inferring schema..")
+ val schemaResolver = new TableSchemaResolver(metaClient)
+ val tableSchema = schemaResolver.getTableAvroSchemaWithoutMetadataFields
+ val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+ StructType(skeletonSchema.fields ++ dataSchema.fields)
}
+
private val filters = {
if (optParams.contains(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY)) {
val filterStr = optParams.getOrElse(
@@ -93,36 +106,69 @@ class IncrementalRelation(val sqlContext: SQLContext,
override def schema: StructType = latestSchema
override def buildScan(): RDD[Row] = {
- val fileIdToFullPath = mutable.HashMap[String, String]()
+ val regularFileIdToFullPath = mutable.HashMap[String, String]()
+ var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()
+
for (commit <- commitsToReturn) {
val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
.get, classOf[HoodieCommitMetadata])
- fileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
+
+ if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {
+ metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
+ } else {
+ regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
+ }
}
+
+ if (metaBootstrapFileIdToFullPath.nonEmpty) {
+ // filter out meta bootstrap files that have had more commits since metadata bootstrap
+ metaBootstrapFileIdToFullPath = metaBootstrapFileIdToFullPath
+ .filterNot(fileIdFullPath => regularFileIdToFullPath.contains(fileIdFullPath._1))
+ }
+
val pathGlobPattern = optParams.getOrElse(
DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY,
DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)
- val filteredFullPath = if(!pathGlobPattern.equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)) {
- val globMatcher = new GlobPattern("*" + pathGlobPattern)
- fileIdToFullPath.filter(p => globMatcher.matches(p._2))
- } else {
- fileIdToFullPath
+ val (filteredRegularFullPaths, filteredMetaBootstrapFullPaths) = {
+ if(!pathGlobPattern.equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)) {
+ val globMatcher = new GlobPattern("*" + pathGlobPattern)
+ (regularFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values,
+ metaBootstrapFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values)
+ } else {
+ (regularFileIdToFullPath.values, metaBootstrapFileIdToFullPath.values)
+ }
}
// unset the path filter, otherwise if end_instant_time is not the latest instant, path filter set for RO view
// will filter out all the files incorrectly.
sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
- if (filteredFullPath.isEmpty) {
+ if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
sqlContext.sparkContext.emptyRDD[Row]
} else {
log.info("Additional Filters to be applied to incremental source are :" + filters)
- filters.foldLeft(sqlContext.read.options(sOpts)
- .schema(latestSchema)
- .parquet(filteredFullPath.values.toList: _*)
- .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp))
- .filter(String.format("%s <= '%s'",
- HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)))((e, f) => e.filter(f))
- .toDF().rdd
+
+ var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], latestSchema)
+
+ if (metaBootstrapFileIdToFullPath.nonEmpty) {
+ df = sqlContext.sparkSession.read
+ .format("hudi")
+ .schema(latestSchema)
+ .option(DataSourceReadOptions.READ_PATHS_OPT_KEY, filteredMetaBootstrapFullPaths.mkString(","))
+ .load()
+ }
+
+ if (regularFileIdToFullPath.nonEmpty) {
+ df = df.union(sqlContext.read.options(sOpts)
+ .schema(latestSchema)
+ .parquet(filteredRegularFullPaths.toList: _*)
+ .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+ commitsToReturn.head.getTimestamp))
+ .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+ commitsToReturn.last.getTimestamp)))
+ }
+
+ filters.foldLeft(df)((e, f) => e.filter(f)).rdd
}
}
}
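
For reference, a hedged sketch of an incremental pull over a bootstrapped table, mirroring the test below; commitInstantTime1 is assumed to be the metadata bootstrap instant:

import org.apache.hudi.DataSourceReadOptions

// Incrementally pull only the changes in the bootstrap commit; with "000" as the begin
// instant this returns all records written during the metadata bootstrap.
val incDF = spark.read.format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
  .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commitInstantTime1)
  .load(basePath)
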
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
new file mode 100644
index 000000000..f24e5ad5b
--- /dev/null
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -0,0 +1,630 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import java.time.Instant
+import java.util.Collections
+
+import collection.JavaConverters._
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
+import org.apache.hudi.client.TestBootstrap
+import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.keygen.SimpleKeyGenerator
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.functions.{col, lit}
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.io.TempDir
+
+class TestDataSourceForBootstrap {
+
+ var spark: SparkSession = _
+ val commonOpts = Map(
+ HoodieWriteConfig.INSERT_PARALLELISM -> "4",
+ HoodieWriteConfig.UPSERT_PARALLELISM -> "4",
+ HoodieWriteConfig.DELETE_PARALLELISM -> "4",
+ HoodieWriteConfig.BULKINSERT_PARALLELISM -> "4",
+ HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM -> "4",
+ HoodieBootstrapConfig.BOOTSTRAP_PARALLELISM -> "4",
+ DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+ DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
+ HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
+ )
+ var basePath: String = _
+ var srcPath: String = _
+ var fs: FileSystem = _
+
+ @BeforeEach def initialize(@TempDir tempDir: java.nio.file.Path) {
+ spark = SparkSession.builder
+ .appName("Hoodie Datasource test")
+ .master("local[2]")
+ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .getOrCreate
+ basePath = tempDir.toAbsolutePath.toString + "/base"
+ srcPath = tempDir.toAbsolutePath.toString + "/src"
+ fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
+ }
+
+ @AfterEach def tearDown(): Unit ={
+ // Close spark session
+ if (spark != null) {
+ spark.stop()
+ spark = null
+ }
+
+ // Close file system
+ if (fs != null) {
+ fs.close()
+ fs = null
+ }
+ }
+
+ @Test def testMetadataBootstrapCOWNonPartitioned(): Unit = {
+ val timestamp = Instant.now.toEpochMilli
+ val numRecords = 100
+ val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
+
+ val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, Collections.emptyList(), jsc,
+ spark.sqlContext)
+
+ // Write source data non-partitioned
+ sourceDF.write
+ .format("parquet")
+ .mode(SaveMode.Overwrite)
+ .save(srcPath)
+
+ // Perform bootstrap
+ val bootstrapDF = spark.emptyDataFrame
+ bootstrapDF.write
+ .format("hudi")
+ .options(commonOpts)
+ .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
+ .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
+ .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+ assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
+
+ // Read bootstrapped table and verify count
+ var hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
+ assertEquals(numRecords, hoodieROViewDF1.count())
+
+ // Perform upsert
+ val updateTimestamp = Instant.now.toEpochMilli
+ val numRecordsUpdate = 10
+ val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate,
+ Collections.emptyList(), jsc, spark.sqlContext)
+
+ updateDF.write
+ .format("hudi")
+ .options(commonOpts)
+ .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
+ .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
+ .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+ assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())
+
+ // Read table after upsert and verify count
+ hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
+ assertEquals(numRecords, hoodieROViewDF1.count())
+ assertEquals(numRecordsUpdate, hoodieROViewDF1.filter(s"timestamp == $updateTimestamp").count())
+
+ // incrementally pull only changes in the bootstrap commit, which would pull all the initial records written
+ // during bootstrap
+ val hoodieIncViewDF1 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
+ .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commitInstantTime1)
+ .load(basePath)
+
+ assertEquals(numRecords, hoodieIncViewDF1.count())
+ var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
+ assertEquals(1, countsPerCommit.length)
+ assertEquals(commitInstantTime1, countsPerCommit(0).get(0))
+
+ // incrementally pull only changes in the latest commit, which would pull only the updated records in the
+ // latest commit
+ val hoodieIncViewDF2 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+ .load(basePath)
+
+ assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
+ countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
+ assertEquals(1, countsPerCommit.length)
+ assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
+ }
+
+ @Test def testMetadataBootstrapCOWHiveStylePartitioned(): Unit = {
+ val timestamp = Instant.now.toEpochMilli
+ val numRecords = 100
+ val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
+ val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
+
+ val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
+ spark.sqlContext)
+
+ // Write source data hive style partitioned
+ sourceDF.write
+ .partitionBy("datestr")
+ .format("parquet")
+ .mode(SaveMode.Overwrite)
+ .save(srcPath)
+
+ // Perform bootstrap
+ val bootstrapDF = spark.emptyDataFrame
+ bootstrapDF.write
+ .format("hudi")
+ .options(commonOpts)
+ .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
+ .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
+ .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.SimpleKeyGenerator")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+ assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
+
+ // Read bootstrapped table and verify count
+ val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
+ assertEquals(numRecords, hoodieROViewDF1.count())
+
+ // Perform upsert
+ val updateTimestamp = Instant.now.toEpochMilli
+ val numRecordsUpdate = 10
+ val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava,
+ jsc, spark.sqlContext)
+
+ updateDF.write
+ .format("hudi")
+ .options(commonOpts)
+ .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
+ .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
+ .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
+ // Required because the source data is hive-style partitioned
+ .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+ assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())
+
+ // Read table after upsert and verify count
+ val hoodieROViewDF2 = spark.read.format("hudi").load(basePath + "/*")
+ assertEquals(numRecords, hoodieROViewDF2.count())
+ assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())
+
+ // incrementally pull only changes in the bootstrap commit, which would pull all the initial records written
+ // during bootstrap
+ val hoodieIncViewDF1 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
+ .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commitInstantTime1)
+ .load(basePath)
+
+ assertEquals(numRecords, hoodieIncViewDF1.count())
+ var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
+ assertEquals(1, countsPerCommit.length)
+ assertEquals(commitInstantTime1, countsPerCommit(0).get(0))
+
+ // incrementally pull only changes in the latest commit, which would pull only the updated records in the
+ // latest commit
+ val hoodieIncViewDF2 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+ .load(basePath)
+
+ assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
+ countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
+ assertEquals(1, countsPerCommit.length)
+ assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
+
+ // pull the latest commit within certain partitions
+ val hoodieIncViewDF3 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+ .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/datestr=2020-04-02/*")
+ .load(basePath)
+
+ assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(),
+ hoodieIncViewDF3.count())
+ }
+
+ @Test def testMetadataBootstrapCOWPartitioned(): Unit = {
+ val timestamp = Instant.now.toEpochMilli
+ val numRecords = 100
+ val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
+ val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
+
+ val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
+ spark.sqlContext)
+
+ // Write each partition's data separately instead of using partitionBy, so the layout is not hive-style
+ // partitioned and the partition column stays stored in the data files
+ partitionPaths.foreach(partitionPath => {
+ sourceDF
+ .filter(sourceDF("datestr").equalTo(lit(partitionPath)))
+ .write
+ .format("parquet")
+ .mode(SaveMode.Overwrite)
+ .save(srcPath + "/" + partitionPath)
+ })
+
+ // Perform bootstrap
+ val bootstrapDF = spark.emptyDataFrame
+ bootstrapDF.write
+ .format("hudi")
+ .options(commonOpts)
+ .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
+ .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
+ .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.SimpleKeyGenerator")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+ assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
+
+ // Read bootstrapped table and verify count
+ val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
+ assertEquals(numRecords, hoodieROViewDF1.count())
+
+ // Perform upsert
+ val updateTimestamp = Instant.now.toEpochMilli
+ val numRecordsUpdate = 10
+ val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava,
+ jsc, spark.sqlContext)
+
+ updateDF.write
+ .format("hudi")
+ .options(commonOpts)
+ .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
+ .option("hoodie.upsert.shuffle.parallelism", "4")
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
+ .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
+ .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+ assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())
+
+ // Read table after upsert and verify count
+ val hoodieROViewDF2 = spark.read.format("hudi").load(basePath + "/*")
+ assertEquals(numRecords, hoodieROViewDF2.count())
+ assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())
+
+ // incrementally pull only changes in the bootstrap commit, which would pull all the initial records written
+ // during bootstrap
+ val hoodieIncViewDF1 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
+ .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commitInstantTime1)
+ .load(basePath)
+
+ assertEquals(numRecords, hoodieIncViewDF1.count())
+ var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
+ assertEquals(1, countsPerCommit.length)
+ assertEquals(commitInstantTime1, countsPerCommit(0).get(0))
+
+ // incrementally pull only changes in the latest commit, which would pull only the updated records in the
+ // latest commit
+ val hoodieIncViewDF2 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+ .load(basePath)
+
+ assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
+ countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
+ assertEquals(1, countsPerCommit.length)
+ assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
+
+ // pull the latest commit within certain partitions
+ val hoodieIncViewDF3 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+ .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2020-04-02/*")
+ .load(basePath)
+
+ assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(),
+ hoodieIncViewDF3.count())
+ }
+
+ @Test def testMetadataBootstrapMORPartitionedInlineCompactionOn(): Unit = {
+ val timestamp = Instant.now.toEpochMilli
+ val numRecords = 100
+ val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
+ val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
+
+ val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
+ spark.sqlContext)
+
+ // Write each partition's data separately instead of using partitionBy, so the layout is not hive-style
+ // partitioned and the partition column stays stored in the data files
+ partitionPaths.foreach(partitionPath => {
+ sourceDF
+ .filter(sourceDF("datestr").equalTo(lit(partitionPath)))
+ .write
+ .format("parquet")
+ .mode(SaveMode.Overwrite)
+ .save(srcPath + "/" + partitionPath)
+ })
+
+ // Perform bootstrap
+ val bootstrapDF = spark.emptyDataFrame
+ bootstrapDF.write
+ .format("hudi")
+ .options(commonOpts)
+ .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
+ .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
+ .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.SimpleKeyGenerator")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+ assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
+
+ // Read bootstrapped table and verify count
+ val hoodieROViewDF1 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
+ DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+ .load(basePath + "/*")
+ assertEquals(numRecords, hoodieROViewDF1.count())
+
+ // Perform upsert
+ val updateTimestamp = Instant.now.toEpochMilli
+ val numRecordsUpdate = 10
+ val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava,
+ jsc, spark.sqlContext)
+
+ updateDF.write
+ .format("hudi")
+ .options(commonOpts)
+ .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
+ .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
+ .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
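+ // Compact inline after every delta commit so the read-optimized view reflects the updates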
+ .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true")
+ .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "1")
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ // Expect 2 new commits since meta bootstrap - delta commit and compaction commit (due to inline compaction)
+ assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())
+
+ // Read table after upsert and verify count. Since inline compaction is enabled, the RO view will contain
+ // the updated rows.
+ val hoodieROViewDF2 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
+ DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+ .load(basePath + "/*")
+ assertEquals(numRecords, hoodieROViewDF2.count())
+ assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())
+ }
+
+ @Test def testMetadataBootstrapMORPartitioned(): Unit = {
+ val timestamp = Instant.now.toEpochMilli
+ val numRecords = 100
+ val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
+ val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
+
+ val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
+ spark.sqlContext)
+
+ // Write each partition's data separately instead of using partitionBy, so the layout is not hive-style
+ // partitioned and the partition column stays stored in the data files
+ partitionPaths.foreach(partitionPath => {
+ sourceDF
+ .filter(sourceDF("datestr").equalTo(lit(partitionPath)))
+ .write
+ .format("parquet")
+ .mode(SaveMode.Overwrite)
+ .save(srcPath + "/" + partitionPath)
+ })
+
+ // Perform bootstrap
+ val bootstrapDF = spark.emptyDataFrame
+ bootstrapDF.write
+ .format("hudi")
+ .options(commonOpts)
+ .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
+ .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
+ .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.SimpleKeyGenerator")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+ assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
+
+ // Read bootstrapped table and verify count
+ val hoodieROViewDF1 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
+ DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+ .load(basePath + "/*")
+ assertEquals(numRecords, hoodieROViewDF1.count())
+
+ // Perform upsert
+ val updateTimestamp = Instant.now.toEpochMilli
+ val numRecordsUpdate = 10
+ val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate,
+ partitionPaths.asJava, jsc, spark.sqlContext)
+
+ updateDF.write
+ .format("hudi")
+ .options(commonOpts)
+ .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
+ .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
+ .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ // Expect 1 new commit since meta bootstrap - delta commit (because inline compaction is off)
+ assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())
+
+ // Read table after upsert and verify count. Since inline compaction is off, the RO view will not contain
+ // the updated rows.
+ val hoodieROViewDF2 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
+ DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+ .load(basePath + "/*")
+ assertEquals(numRecords, hoodieROViewDF2.count())
+ assertEquals(0, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())
+ }
+
+ @Test def testFullBootstrapCOWPartitioned(): Unit = {
+ val timestamp = Instant.now.toEpochMilli
+ val numRecords = 100
+ val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
+ val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
+
+ val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
+ spark.sqlContext)
+
+ // Write each partition's data separately instead of using partitionBy, so the layout is not hive-style
+ // partitioned and the partition column stays stored in the data files
+ partitionPaths.foreach(partitionPath => {
+ sourceDF
+ .filter(sourceDF("datestr").equalTo(lit(partitionPath)))
+ .write
+ .format("parquet")
+ .mode(SaveMode.Overwrite)
+ .save(srcPath + "/" + partitionPath)
+ })
+
+ // Perform bootstrap
+ val bootstrapDF = spark.emptyDataFrame
+ bootstrapDF.write
+ .format("hudi")
+ .options(commonOpts)
+ .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
+ .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
+ .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
+ .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
+ .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, classOf[SimpleKeyGenerator].getName)
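+ // Use full-record bootstrap, which rewrites the source records into the Hudi table instead of only linking metadata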
+ .option(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR, classOf[FullRecordBootstrapModeSelector].getName)
+ .option(HoodieBootstrapConfig.FULL_BOOTSTRAP_INPUT_PROVIDER, classOf[SparkParquetBootstrapDataProvider].getName)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+ assertEquals(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
+
+ // Read bootstrapped table and verify count
+ val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
+ assertEquals(numRecords, hoodieROViewDF1.count())
+
+ // Perform upsert
+ val updateTimestamp = Instant.now.toEpochMilli
+ val numRecordsUpdate = 10
+ val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava,
+ jsc, spark.sqlContext)
+
+ updateDF.write
+ .format("hudi")
+ .options(commonOpts)
+ .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
+ .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
+ .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+ assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())
+
+ // Read table after upsert and verify count
+ val hoodieROViewDF2 = spark.read.format("hudi").load(basePath + "/*")
+ assertEquals(numRecords, hoodieROViewDF2.count())
+ assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())
+
+ // incrementally pull only changes in the bootstrap commit, which would pull all the initial records written
+ // during bootstrap
+ val hoodieIncViewDF1 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
+ .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commitInstantTime1)
+ .load(basePath)
+
+ assertEquals(numRecords, hoodieIncViewDF1.count())
+ var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
+ assertEquals(1, countsPerCommit.length)
+ assertEquals(commitInstantTime1, countsPerCommit(0).get(0))
+
+ // incrementally pull only changes in the latest commit, which would pull only the updated records in the
+ // latest commit
+ val hoodieIncViewDF2 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+ .load(basePath)
+
+ assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
+ countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
+ assertEquals(1, countsPerCommit.length)
+ assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
+
+ // pull the latest commit within certain partitions
+ val hoodieIncViewDF3 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+ .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2020-04-02/*")
+ .load(basePath)
+
+ assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(),
+ hoodieIncViewDF3.count())
+ }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 5c7ac9319..6577c09ad 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -581,8 +581,10 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
// Perform bootstrap with tableBasePath as source
String bootstrapSourcePath = dfsBasePath + "/src_bootstrapped";
- sqlContext.read().format("org.apache.hudi").load(tableBasePath + "/*/*.parquet").write().format("parquet")
- .save(bootstrapSourcePath);
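+ // Keep a handle on the source dataframe so its schema can be compared with the bootstrapped table below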
+ Dataset<Row> sourceDf = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(tableBasePath + "/*/*.parquet");
+ sourceDf.write().format("parquet").save(bootstrapSourcePath);
String newDatasetBasePath = dfsBasePath + "/test_dataset_bootstrapped";
cfg.runBootstrap = true;
@@ -600,12 +602,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
assertEquals(1950, sqlContext.sql("select distinct _hoodie_record_key from bootstrapped").count());
StructField[] fields = res.schema().fields();
- assertEquals(5, fields.length);
- assertEquals(HoodieRecord.COMMIT_TIME_METADATA_FIELD, fields[0].name());
- assertEquals(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, fields[1].name());
- assertEquals(HoodieRecord.RECORD_KEY_METADATA_FIELD, fields[2].name());
- assertEquals(HoodieRecord.PARTITION_PATH_METADATA_FIELD, fields[3].name());
- assertEquals(HoodieRecord.FILENAME_METADATA_FIELD, fields[4].name());
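+ // The bootstrapped table schema should contain the Hudi metadata columns plus all source columns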
+ List<String> fieldNames = Arrays.asList(res.schema().fieldNames());
+ List<String> expectedFieldNames = Arrays.asList(sourceDf.schema().fieldNames());
+ assertEquals(expectedFieldNames.size(), fields.length);
+ assertTrue(fieldNames.containsAll(HoodieRecord.HOODIE_META_COLUMNS));
+ assertTrue(fieldNames.containsAll(expectedFieldNames));
}
@Test