[HUDI-426] Bootstrap datasource integration (#1702)
This commit is contained in:
@@ -217,6 +217,10 @@
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-sql_${scala.binary.version}</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-avro_${scala.binary.version}</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.shell</groupId>
|
||||
|
||||
@@ -101,6 +101,11 @@
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-sql_${scala.binary.version}</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-avro_${scala.binary.version}</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<!-- Dropwizard Metrics -->
|
||||
<dependency>
|
||||
|
||||
@@ -18,17 +18,25 @@
|
||||
|
||||
package org.apache.hudi.client.bootstrap;
|
||||
|
||||
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||
import org.apache.hudi.avro.model.HoodieFileStatus;
|
||||
import org.apache.hudi.common.bootstrap.FileStatusUtils;
|
||||
import org.apache.hudi.common.util.ParquetUtils;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.parquet.schema.MessageType;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.spark.sql.avro.SchemaConverters;
|
||||
import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
|
||||
import org.apache.spark.sql.internal.SQLConf;
|
||||
import org.apache.spark.sql.types.StructType;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Bootstrap Schema Provider. Schema provided in config is used. If not available, use schema from Parquet
|
||||
@@ -50,7 +58,10 @@ public class BootstrapSchemaProvider {
|
||||
public final Schema getBootstrapSchema(JavaSparkContext jsc, List<Pair<String, List<HoodieFileStatus>>> partitions) {
|
||||
if (writeConfig.getSchema() != null) {
|
||||
// Use schema specified by user if set
|
||||
return Schema.parse(writeConfig.getSchema());
|
||||
Schema userSchema = Schema.parse(writeConfig.getSchema());
|
||||
if (!HoodieAvroUtils.getNullSchema().equals(userSchema)) {
|
||||
return userSchema;
|
||||
}
|
||||
}
|
||||
return getBootstrapSourceSchema(jsc, partitions);
|
||||
}
|
||||
@@ -64,14 +75,26 @@ public class BootstrapSchemaProvider {
|
||||
*/
|
||||
protected Schema getBootstrapSourceSchema(JavaSparkContext jsc,
|
||||
List<Pair<String, List<HoodieFileStatus>>> partitions) {
|
||||
return partitions.stream().flatMap(p -> p.getValue().stream())
|
||||
.map(fs -> {
|
||||
try {
|
||||
Path filePath = FileStatusUtils.toPath(fs.getPath());
|
||||
return ParquetUtils.readAvroSchema(jsc.hadoopConfiguration(), filePath);
|
||||
} catch (Exception ex) {
|
||||
return null;
|
||||
}
|
||||
}).filter(x -> x != null).findAny().get();
|
||||
MessageType parquetSchema = partitions.stream().flatMap(p -> p.getValue().stream()).map(fs -> {
|
||||
try {
|
||||
Path filePath = FileStatusUtils.toPath(fs.getPath());
|
||||
return ParquetUtils.readSchema(jsc.hadoopConfiguration(), filePath);
|
||||
} catch (Exception ex) {
|
||||
return null;
|
||||
}
|
||||
}).filter(Objects::nonNull).findAny()
|
||||
.orElseThrow(() -> new HoodieException("Could not determine schema from the data files."));
|
||||
|
||||
|
||||
ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(
|
||||
Boolean.parseBoolean(SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString()),
|
||||
Boolean.parseBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().defaultValueString()));
|
||||
StructType sparkSchema = converter.convert(parquetSchema);
|
||||
String tableName = HoodieAvroUtils.sanitizeName(writeConfig.getTableName());
|
||||
String structName = tableName + "_record";
|
||||
String recordNamespace = "hoodie." + tableName;
|
||||
|
||||
return SchemaConverters.toAvroType(sparkSchema, false, structName, recordNamespace);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ package org.apache.hudi.config;
|
||||
import org.apache.hudi.client.bootstrap.BootstrapMode;
|
||||
import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector;
|
||||
import org.apache.hudi.client.bootstrap.translator.IdentityBootstrapPartitionPathTranslator;
|
||||
import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
|
||||
import org.apache.hudi.common.config.DefaultHoodieConfig;
|
||||
|
||||
import java.io.File;
|
||||
@@ -52,6 +53,9 @@ public class HoodieBootstrapConfig extends DefaultHoodieConfig {
|
||||
public static final String DEFAULT_BOOTSTRAP_MODE_SELECTOR_REGEX = ".*";
|
||||
public static final String DEFAULT_BOOTSTRAP_MODE_SELECTOR_REGEX_MODE = BootstrapMode.METADATA_ONLY.name();
|
||||
|
||||
public static final String BOOTSTRAP_INDEX_CLASS_PROP = "hoodie.bootstrap.index.class";
|
||||
public static final String DEFAULT_BOOTSTRAP_INDEX_CLASS = HFileBootstrapIndex.class.getName();
|
||||
|
||||
public HoodieBootstrapConfig(Properties props) {
|
||||
super(props);
|
||||
}
|
||||
@@ -129,6 +133,8 @@ public class HoodieBootstrapConfig extends DefaultHoodieConfig {
|
||||
setDefaultOnCondition(props, !props.containsKey(BOOTSTRAP_MODE_SELECTOR_REGEX_MODE),
|
||||
BOOTSTRAP_MODE_SELECTOR_REGEX_MODE, DEFAULT_BOOTSTRAP_MODE_SELECTOR_REGEX_MODE);
|
||||
BootstrapMode.valueOf(props.getProperty(BOOTSTRAP_MODE_SELECTOR_REGEX_MODE));
|
||||
setDefaultOnCondition(props, !props.containsKey(BOOTSTRAP_INDEX_CLASS_PROP), BOOTSTRAP_INDEX_CLASS_PROP,
|
||||
DEFAULT_BOOTSTRAP_INDEX_CLASS);
|
||||
return config;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -136,6 +136,7 @@ public class BootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>>
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getSchemaToStoreInCommit() {
|
||||
return bootstrapSchema;
|
||||
}
|
||||
|
||||
@@ -69,6 +69,11 @@ public class HoodieAvroUtils {
|
||||
|
||||
private static ThreadLocal<BinaryDecoder> reuseDecoder = ThreadLocal.withInitial(() -> null);
|
||||
|
||||
// As per https://avro.apache.org/docs/current/spec.html#names
|
||||
private static String INVALID_AVRO_CHARS_IN_NAMES = "[^A-Za-z0-9_]";
|
||||
private static String INVALID_AVRO_FIRST_CHAR_IN_NAMES = "[^A-Za-z_]";
|
||||
private static String MASK_FOR_INVALID_CHARS_IN_NAMES = "__";
|
||||
|
||||
// All metadata fields are optional strings.
|
||||
public static final Schema METADATA_FIELD_SCHEMA =
|
||||
Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)));
|
||||
@@ -444,4 +449,21 @@ public class HoodieAvroUtils {
|
||||
}
|
||||
return fieldSchema.getLogicalType() == LogicalTypes.date();
|
||||
}
|
||||
|
||||
public static Schema getNullSchema() {
|
||||
return Schema.create(Schema.Type.NULL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sanitizes Name according to Avro rule for names.
|
||||
* Removes characters other than the ones mentioned in https://avro.apache.org/docs/current/spec.html#names .
|
||||
* @param name input name
|
||||
* @return sanitized name
|
||||
*/
|
||||
public static String sanitizeName(String name) {
|
||||
if (name.substring(0,1).matches(INVALID_AVRO_FIRST_CHAR_IN_NAMES)) {
|
||||
name = name.replaceFirst(INVALID_AVRO_FIRST_CHAR_IN_NAMES, MASK_FOR_INVALID_CHARS_IN_NAMES);
|
||||
}
|
||||
return name.replaceAll(INVALID_AVRO_CHARS_IN_NAMES, MASK_FOR_INVALID_CHARS_IN_NAMES);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hudi.common.model;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonAutoDetect;
|
||||
@@ -126,6 +127,18 @@ public class HoodieCommitMetadata implements Serializable {
|
||||
return fullPaths;
|
||||
}
|
||||
|
||||
public Map<HoodieFileGroupId, String> getFileGroupIdAndFullPaths(String basePath) {
|
||||
Map<HoodieFileGroupId, String> fileGroupIdToFullPaths = new HashMap<>();
|
||||
for (Map.Entry<String, List<HoodieWriteStat>> entry : getPartitionToWriteStats().entrySet()) {
|
||||
for (HoodieWriteStat stat : entry.getValue()) {
|
||||
HoodieFileGroupId fileGroupId = new HoodieFileGroupId(stat.getPartitionPath(), stat.getFileId());
|
||||
Path fullPath = new Path(basePath, stat.getPath());
|
||||
fileGroupIdToFullPaths.put(fileGroupId, fullPath.toString());
|
||||
}
|
||||
}
|
||||
return fileGroupIdToFullPaths;
|
||||
}
|
||||
|
||||
public String toJsonString() throws IOException {
|
||||
if (partitionToWriteStats.containsKey(null)) {
|
||||
LOG.info("partition path is null for " + partitionToWriteStats.get(null));
|
||||
|
||||
@@ -57,8 +57,9 @@ public class SparkParquetBootstrapDataProvider extends FullRecordBootstrapDataPr
|
||||
public JavaRDD<HoodieRecord> generateInputRecordRDD(String tableName, String sourceBasePath,
|
||||
List<Pair<String, List<HoodieFileStatus>>> partitionPathsWithFiles) {
|
||||
String[] filePaths = partitionPathsWithFiles.stream().map(Pair::getValue)
|
||||
.flatMap(f -> f.stream().map(fs -> FileStatusUtils.toPath(fs.getPath()).toUri().getPath()))
|
||||
.flatMap(f -> f.stream().map(fs -> FileStatusUtils.toPath(fs.getPath()).toString()))
|
||||
.toArray(String[]::new);
|
||||
|
||||
Dataset inputDataset = sparkSession.read().parquet(filePaths);
|
||||
try {
|
||||
KeyGenerator keyGenerator = DataSourceUtils.createKeyGenerator(props);
|
||||
|
||||
@@ -20,6 +20,7 @@ package org.apache.hudi
|
||||
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
|
||||
import org.apache.hudi.common.model.HoodieKey
|
||||
import org.apache.avro.Schema
|
||||
import org.apache.hudi.avro.HoodieAvroUtils
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.avro.SchemaConverters
|
||||
import org.apache.spark.sql.catalyst.encoders.RowEncoder
|
||||
@@ -90,4 +91,9 @@ object AvroConversionUtils {
|
||||
requiredFields.foreach(f => recordBuilder.set(f, record.get(positionIterator.next())))
|
||||
recordBuilder.build()
|
||||
}
|
||||
|
||||
def getAvroRecordNameAndNamespace(tableName: String): (String, String) = {
|
||||
val name = HoodieAvroUtils.sanitizeName(tableName)
|
||||
(s"${name}_record", s"hoodie.${name}")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,6 +59,8 @@ object DataSourceReadOptions {
|
||||
val REALTIME_PAYLOAD_COMBINE_OPT_VAL = "payload_combine"
|
||||
val DEFAULT_REALTIME_MERGE_OPT_VAL = REALTIME_PAYLOAD_COMBINE_OPT_VAL
|
||||
|
||||
val READ_PATHS_OPT_KEY = "hoodie.datasource.read.paths"
|
||||
|
||||
@Deprecated
|
||||
val VIEW_TYPE_OPT_KEY = "hoodie.datasource.view.type"
|
||||
@Deprecated
|
||||
@@ -138,6 +140,7 @@ object DataSourceWriteOptions {
|
||||
val INSERT_OPERATION_OPT_VAL = "insert"
|
||||
val UPSERT_OPERATION_OPT_VAL = "upsert"
|
||||
val DELETE_OPERATION_OPT_VAL = "delete"
|
||||
val BOOTSTRAP_OPERATION_OPT_VAL = "bootstrap"
|
||||
val DEFAULT_OPERATION_OPT_VAL = UPSERT_OPERATION_OPT_VAL
|
||||
|
||||
/**
|
||||
|
||||
@@ -17,15 +17,16 @@
|
||||
|
||||
package org.apache.hudi
|
||||
|
||||
import org.apache.hadoop.fs.Path
|
||||
import org.apache.hudi.DataSourceReadOptions._
|
||||
import org.apache.hudi.common.fs.FSUtils
|
||||
import org.apache.hudi.common.model.HoodieTableType
|
||||
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION_OPT_KEY}
|
||||
import org.apache.hudi.common.fs.FSUtils
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient
|
||||
import org.apache.hudi.exception.HoodieException
|
||||
import org.apache.hudi.hadoop.HoodieROTablePathFilter
|
||||
import org.apache.log4j.LogManager
|
||||
import org.apache.spark.sql.execution.datasources.DataSource
|
||||
import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand
|
||||
import org.apache.spark.sql.execution.streaming.Sink
|
||||
import org.apache.spark.sql.sources._
|
||||
import org.apache.spark.sql.streaming.OutputMode
|
||||
@@ -57,22 +58,39 @@ class DefaultSource extends RelationProvider
|
||||
val parameters = Map(QUERY_TYPE_OPT_KEY -> DEFAULT_QUERY_TYPE_OPT_VAL) ++ translateViewTypesToQueryTypes(optParams)
|
||||
|
||||
val path = parameters.get("path")
|
||||
if (path.isEmpty) {
|
||||
throw new HoodieException("'path' must be specified.")
|
||||
val readPathsStr = parameters.get(DataSourceReadOptions.READ_PATHS_OPT_KEY)
|
||||
if (path.isEmpty && readPathsStr.isEmpty) {
|
||||
throw new HoodieException(s"'path' or '$READ_PATHS_OPT_KEY' or both must be specified.")
|
||||
}
|
||||
|
||||
val fs = FSUtils.getFs(path.get, sqlContext.sparkContext.hadoopConfiguration)
|
||||
val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(Seq(path.get), fs)
|
||||
val readPaths = readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq())
|
||||
val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths
|
||||
|
||||
val fs = FSUtils.getFs(allPaths.head, sqlContext.sparkContext.hadoopConfiguration)
|
||||
val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)
|
||||
|
||||
val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
|
||||
log.info("Obtained hudi table path: " + tablePath)
|
||||
|
||||
val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath)
|
||||
val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
|
||||
log.info("Is bootstrapped table => " + isBootstrappedTable)
|
||||
|
||||
if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {
|
||||
val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath)
|
||||
if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
|
||||
new MergeOnReadSnapshotRelation(sqlContext, optParams, schema, globPaths, metaClient)
|
||||
if (isBootstrappedTable) {
|
||||
// Snapshot query is not supported for Bootstrapped MOR tables
|
||||
log.warn("Snapshot query is not supported for Bootstrapped Merge-on-Read tables." +
|
||||
" Falling back to Read Optimized query.")
|
||||
new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, optParams)
|
||||
} else {
|
||||
new MergeOnReadSnapshotRelation(sqlContext, optParams, schema, globPaths, metaClient)
|
||||
}
|
||||
} else {
|
||||
getBaseFileOnlyView(sqlContext, parameters, schema)
|
||||
getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient)
|
||||
}
|
||||
} else if(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) {
|
||||
getBaseFileOnlyView(sqlContext, parameters, schema)
|
||||
getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient)
|
||||
} else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
|
||||
new IncrementalRelation(sqlContext, tablePath, optParams, schema)
|
||||
} else {
|
||||
@@ -83,8 +101,8 @@ class DefaultSource extends RelationProvider
|
||||
/**
|
||||
* This DataSource API is used for writing the DataFrame at the destination. For now, we are returning a dummy
|
||||
* relation here because Spark does not really make use of the relation returned, and just returns an empty
|
||||
* dataset at [[SaveIntoDataSourceCommand.run()]]. This saves us the cost of creating and returning a parquet
|
||||
* relation here.
|
||||
* dataset at [[org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run()]]. This saves us the cost
|
||||
* of creating and returning a parquet relation here.
|
||||
*
|
||||
* TODO: Revisit to return a concrete relation here when we support CREATE TABLE AS for Hudi with DataSource API.
|
||||
* That is the only case where Spark seems to actually need a relation to be returned here
|
||||
@@ -101,7 +119,13 @@ class DefaultSource extends RelationProvider
|
||||
optParams: Map[String, String],
|
||||
df: DataFrame): BaseRelation = {
|
||||
val parameters = HoodieSparkSqlWriter.parametersWithWriteDefaults(optParams)
|
||||
HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df)
|
||||
|
||||
if (parameters(OPERATION_OPT_KEY).equals(BOOTSTRAP_OPERATION_OPT_VAL)) {
|
||||
HoodieSparkSqlWriter.bootstrap(sqlContext, mode, parameters, df)
|
||||
} else {
|
||||
HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df)
|
||||
}
|
||||
|
||||
new HoodieEmptyRelation(sqlContext, df.schema)
|
||||
}
|
||||
|
||||
@@ -120,23 +144,34 @@ class DefaultSource extends RelationProvider
|
||||
override def shortName(): String = "hudi"
|
||||
|
||||
private def getBaseFileOnlyView(sqlContext: SQLContext,
|
||||
optParams: Map[String, String],
|
||||
schema: StructType): BaseRelation = {
|
||||
optParams: Map[String, String],
|
||||
schema: StructType,
|
||||
extraReadPaths: Seq[String],
|
||||
isBootstrappedTable: Boolean,
|
||||
globPaths: Seq[Path],
|
||||
metaClient: HoodieTableMetaClient): BaseRelation = {
|
||||
log.warn("Loading Base File Only View.")
|
||||
// this is just effectively RO view only, where `path` can contain a mix of
|
||||
// non-hoodie/hoodie path files. set the path filter up
|
||||
sqlContext.sparkContext.hadoopConfiguration.setClass(
|
||||
"mapreduce.input.pathFilter.class",
|
||||
classOf[HoodieROTablePathFilter],
|
||||
classOf[org.apache.hadoop.fs.PathFilter])
|
||||
|
||||
log.info("Constructing hoodie (as parquet) data source with options :" + optParams)
|
||||
// simply return as a regular parquet relation
|
||||
DataSource.apply(
|
||||
sparkSession = sqlContext.sparkSession,
|
||||
userSpecifiedSchema = Option(schema),
|
||||
className = "parquet",
|
||||
options = optParams)
|
||||
.resolveRelation()
|
||||
if (isBootstrappedTable) {
|
||||
// For bootstrapped tables, use our custom Spark relation for querying
|
||||
new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, optParams)
|
||||
} else {
|
||||
// this is just effectively RO view only, where `path` can contain a mix of
|
||||
// non-hoodie/hoodie path files. set the path filter up
|
||||
sqlContext.sparkContext.hadoopConfiguration.setClass(
|
||||
"mapreduce.input.pathFilter.class",
|
||||
classOf[HoodieROTablePathFilter],
|
||||
classOf[org.apache.hadoop.fs.PathFilter])
|
||||
|
||||
log.info("Constructing hoodie (as parquet) data source with options :" + optParams)
|
||||
// simply return as a regular parquet relation
|
||||
DataSource.apply(
|
||||
sparkSession = sqlContext.sparkSession,
|
||||
paths = extraReadPaths,
|
||||
userSpecifiedSchema = Option(schema),
|
||||
className = "parquet",
|
||||
options = optParams)
|
||||
.resolveRelation()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,131 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi
|
||||
|
||||
import org.apache.spark.{Partition, TaskContext}
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.catalyst.InternalRow
|
||||
import org.apache.spark.sql.execution.datasources.PartitionedFile
|
||||
import org.apache.spark.sql.types.StructType
|
||||
import org.apache.spark.sql.vectorized.ColumnarBatch
|
||||
|
||||
class HoodieBootstrapRDD(@transient spark: SparkSession,
|
||||
dataReadFunction: PartitionedFile => Iterator[Any],
|
||||
skeletonReadFunction: PartitionedFile => Iterator[Any],
|
||||
regularReadFunction: PartitionedFile => Iterator[Any],
|
||||
dataSchema: StructType,
|
||||
skeletonSchema: StructType,
|
||||
requiredColumns: Array[String],
|
||||
tableState: HoodieBootstrapTableState)
|
||||
extends RDD[InternalRow](spark.sparkContext, Nil) {
|
||||
|
||||
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
|
||||
val bootstrapPartition = split.asInstanceOf[HoodieBootstrapPartition]
|
||||
|
||||
if (log.isDebugEnabled) {
|
||||
if (bootstrapPartition.split.skeletonFile.isDefined) {
|
||||
logDebug("Got Split => Index: " + bootstrapPartition.index + ", Data File: "
|
||||
+ bootstrapPartition.split.dataFile.filePath + ", Skeleton File: "
|
||||
+ bootstrapPartition.split.skeletonFile.get.filePath)
|
||||
} else {
|
||||
logDebug("Got Split => Index: " + bootstrapPartition.index + ", Data File: "
|
||||
+ bootstrapPartition.split.dataFile.filePath)
|
||||
}
|
||||
}
|
||||
|
||||
var partitionedFileIterator: Iterator[InternalRow] = null
|
||||
|
||||
if (bootstrapPartition.split.skeletonFile.isDefined) {
|
||||
// It is a bootstrap split. Check both skeleton and data files.
|
||||
if (dataSchema.isEmpty) {
|
||||
// No data column to fetch, hence fetch only from skeleton file
|
||||
partitionedFileIterator = read(bootstrapPartition.split.skeletonFile.get, skeletonReadFunction)
|
||||
} else if (skeletonSchema.isEmpty) {
|
||||
// No metadata column to fetch, hence fetch only from data file
|
||||
partitionedFileIterator = read(bootstrapPartition.split.dataFile, dataReadFunction)
|
||||
} else {
|
||||
// Fetch from both data and skeleton file, and merge
|
||||
val dataFileIterator = read(bootstrapPartition.split.dataFile, dataReadFunction)
|
||||
val skeletonFileIterator = read(bootstrapPartition.split.skeletonFile.get, skeletonReadFunction)
|
||||
partitionedFileIterator = merge(skeletonFileIterator, dataFileIterator)
|
||||
}
|
||||
} else {
|
||||
partitionedFileIterator = read(bootstrapPartition.split.dataFile, regularReadFunction)
|
||||
}
|
||||
partitionedFileIterator
|
||||
}
|
||||
|
||||
def merge(skeletonFileIterator: Iterator[InternalRow], dataFileIterator: Iterator[InternalRow])
|
||||
: Iterator[InternalRow] = {
|
||||
new Iterator[InternalRow] {
|
||||
override def hasNext: Boolean = dataFileIterator.hasNext && skeletonFileIterator.hasNext
|
||||
override def next(): InternalRow = {
|
||||
mergeInternalRow(skeletonFileIterator.next(), dataFileIterator.next())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def mergeInternalRow(skeletonRow: InternalRow, dataRow: InternalRow): InternalRow = {
|
||||
val skeletonArr = skeletonRow.copy().toSeq(skeletonSchema)
|
||||
val dataArr = dataRow.copy().toSeq(dataSchema)
|
||||
// We need to return it in the order requested
|
||||
val mergedArr = requiredColumns.map(col => {
|
||||
if (skeletonSchema.fieldNames.contains(col)) {
|
||||
val idx = skeletonSchema.fieldIndex(col)
|
||||
skeletonArr(idx)
|
||||
} else {
|
||||
val idx = dataSchema.fieldIndex(col)
|
||||
dataArr(idx)
|
||||
}
|
||||
})
|
||||
|
||||
logDebug("Merged data and skeleton values => " + mergedArr.mkString(","))
|
||||
val mergedRow = InternalRow.fromSeq(mergedArr)
|
||||
mergedRow
|
||||
}
|
||||
|
||||
def read(partitionedFile: PartitionedFile, readFileFunction: PartitionedFile => Iterator[Any])
|
||||
: Iterator[InternalRow] = {
|
||||
val fileIterator = readFileFunction(partitionedFile)
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
val rows = fileIterator.flatMap(_ match {
|
||||
case r: InternalRow => Seq(r)
|
||||
case b: ColumnarBatch => b.rowIterator().asScala
|
||||
})
|
||||
rows
|
||||
}
|
||||
|
||||
override protected def getPartitions: Array[Partition] = {
|
||||
tableState.files.zipWithIndex.map(file => {
|
||||
if (file._1.skeletonFile.isDefined) {
|
||||
logDebug("Forming partition with => Index: " + file._2 + ", Files: " + file._1.dataFile.filePath
|
||||
+ "," + file._1.skeletonFile.get.filePath)
|
||||
HoodieBootstrapPartition(file._2, file._1)
|
||||
} else {
|
||||
logDebug("Forming partition with => Index: " + file._2 + ", File: " + file._1.dataFile.filePath)
|
||||
HoodieBootstrapPartition(file._2, file._1)
|
||||
}
|
||||
}).toArray
|
||||
}
|
||||
}
|
||||
|
||||
case class HoodieBootstrapPartition(index: Int, split: HoodieBootstrapSplit) extends Partition
|
||||
@@ -0,0 +1,185 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi
|
||||
|
||||
import org.apache.hadoop.fs.Path
|
||||
import org.apache.hudi.common.model.HoodieBaseFile
|
||||
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
||||
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
|
||||
import org.apache.hudi.exception.HoodieException
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.catalyst.InternalRow
|
||||
import org.apache.spark.sql.execution.datasources.PartitionedFile
|
||||
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
|
||||
import org.apache.spark.sql.{Row, SQLContext}
|
||||
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
|
||||
import org.apache.spark.sql.types.StructType
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
/**
|
||||
* This is Spark relation that can be used for querying metadata/fully bootstrapped query hoodie tables, as well as
|
||||
* non-bootstrapped tables. It implements PrunedFilteredScan interface in order to support column pruning and filter
|
||||
* push-down. For metadata bootstrapped files, if we query columns from both metadata and actual data then it will
|
||||
* perform a merge of both to return the result.
|
||||
*
|
||||
* Caveat: Filter push-down does not work when querying both metadata and actual data columns over metadata
|
||||
* bootstrapped files, because then the metadata file and data file can return different number of rows causing errors
|
||||
* merging.
|
||||
*
|
||||
* @param _sqlContext Spark SQL Context
|
||||
* @param userSchema User specified schema in the datasource query
|
||||
* @param globPaths Globbed paths obtained from the user provided path for querying
|
||||
* @param metaClient Hoodie table meta client
|
||||
* @param optParams DataSource options passed by the user
|
||||
*/
|
||||
class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
|
||||
val userSchema: StructType,
|
||||
val globPaths: Seq[Path],
|
||||
val metaClient: HoodieTableMetaClient,
|
||||
val optParams: Map[String, String]) extends BaseRelation
|
||||
with PrunedFilteredScan with Logging {
|
||||
|
||||
val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema
|
||||
var dataSchema: StructType = _
|
||||
var fullSchema: StructType = _
|
||||
|
||||
val fileIndex: HoodieBootstrapFileIndex = buildFileIndex()
|
||||
|
||||
override def sqlContext: SQLContext = _sqlContext
|
||||
|
||||
override val needConversion: Boolean = false
|
||||
|
||||
override def schema: StructType = inferFullSchema()
|
||||
|
||||
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
|
||||
logInfo("Starting scan..")
|
||||
|
||||
// Compute splits
|
||||
val bootstrapSplits = fileIndex.files.map(hoodieBaseFile => {
|
||||
var skeletonFile: Option[PartitionedFile] = Option.empty
|
||||
var dataFile: PartitionedFile = null
|
||||
|
||||
if (hoodieBaseFile.getBootstrapBaseFile.isPresent) {
|
||||
skeletonFile = Option(PartitionedFile(InternalRow.empty, hoodieBaseFile.getPath, 0, hoodieBaseFile.getFileLen))
|
||||
dataFile = PartitionedFile(InternalRow.empty, hoodieBaseFile.getBootstrapBaseFile.get().getPath, 0,
|
||||
hoodieBaseFile.getBootstrapBaseFile.get().getFileLen)
|
||||
} else {
|
||||
dataFile = PartitionedFile(InternalRow.empty, hoodieBaseFile.getPath, 0, hoodieBaseFile.getFileLen)
|
||||
}
|
||||
HoodieBootstrapSplit(dataFile, skeletonFile)
|
||||
})
|
||||
val tableState = HoodieBootstrapTableState(bootstrapSplits)
|
||||
|
||||
// Get required schemas for column pruning
|
||||
var requiredDataSchema = StructType(Seq())
|
||||
var requiredSkeletonSchema = StructType(Seq())
|
||||
requiredColumns.foreach(col => {
|
||||
var field = dataSchema.find(_.name == col)
|
||||
if (field.isDefined) {
|
||||
requiredDataSchema = requiredDataSchema.add(field.get)
|
||||
} else {
|
||||
field = skeletonSchema.find(_.name == col)
|
||||
requiredSkeletonSchema = requiredSkeletonSchema.add(field.get)
|
||||
}
|
||||
})
|
||||
|
||||
// Prepare readers for reading data file and skeleton files
|
||||
val dataReadFunction = new ParquetFileFormat()
|
||||
.buildReaderWithPartitionValues(
|
||||
sparkSession = _sqlContext.sparkSession,
|
||||
dataSchema = dataSchema,
|
||||
partitionSchema = StructType(Seq.empty),
|
||||
requiredSchema = requiredDataSchema,
|
||||
filters = if (requiredSkeletonSchema.isEmpty) filters else Seq() ,
|
||||
options = Map.empty,
|
||||
hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
|
||||
)
|
||||
|
||||
val skeletonReadFunction = new ParquetFileFormat()
|
||||
.buildReaderWithPartitionValues(
|
||||
sparkSession = _sqlContext.sparkSession,
|
||||
dataSchema = skeletonSchema,
|
||||
partitionSchema = StructType(Seq.empty),
|
||||
requiredSchema = requiredSkeletonSchema,
|
||||
filters = if (requiredDataSchema.isEmpty) filters else Seq(),
|
||||
options = Map.empty,
|
||||
hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
|
||||
)
|
||||
|
||||
val regularReadFunction = new ParquetFileFormat()
|
||||
.buildReaderWithPartitionValues(
|
||||
sparkSession = _sqlContext.sparkSession,
|
||||
dataSchema = fullSchema,
|
||||
partitionSchema = StructType(Seq.empty),
|
||||
requiredSchema = StructType(requiredSkeletonSchema.fields ++ requiredDataSchema.fields),
|
||||
filters = filters,
|
||||
options = Map.empty,
|
||||
hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf())
|
||||
|
||||
val rdd = new HoodieBootstrapRDD(_sqlContext.sparkSession, dataReadFunction, skeletonReadFunction,
|
||||
regularReadFunction, requiredDataSchema, requiredSkeletonSchema, requiredColumns, tableState)
|
||||
rdd.asInstanceOf[RDD[Row]]
|
||||
}
|
||||
|
||||
def inferFullSchema(): StructType = {
|
||||
if (fullSchema == null) {
|
||||
logInfo("Inferring schema..")
|
||||
val schemaResolver = new TableSchemaResolver(metaClient)
|
||||
val tableSchema = schemaResolver.getTableAvroSchemaWithoutMetadataFields
|
||||
dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
|
||||
fullSchema = StructType(skeletonSchema.fields ++ dataSchema.fields)
|
||||
}
|
||||
fullSchema
|
||||
}
|
||||
|
||||
def buildFileIndex(): HoodieBootstrapFileIndex = {
|
||||
logInfo("Building file index..")
|
||||
val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(_sqlContext.sparkSession, globPaths)
|
||||
val fileStatuses = inMemoryFileIndex.allFiles()
|
||||
|
||||
if (fileStatuses.isEmpty) {
|
||||
throw new HoodieException("No files found for reading in user provided path.")
|
||||
}
|
||||
|
||||
val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitsTimeline
|
||||
.filterCompletedInstants, fileStatuses.toArray)
|
||||
val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList
|
||||
|
||||
if (log.isDebugEnabled) {
|
||||
latestFiles.foreach(file => {
|
||||
logDebug("Printing indexed files:")
|
||||
if (file.getBootstrapBaseFile.isPresent) {
|
||||
logDebug("Skeleton File: " + file.getPath + ", Data File: " + file.getBootstrapBaseFile.get().getPath)
|
||||
} else {
|
||||
logDebug("Regular Hoodie File: " + file.getPath)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
HoodieBootstrapFileIndex(latestFiles)
|
||||
}
|
||||
}
|
||||
|
||||
case class HoodieBootstrapFileIndex(files: List[HoodieBaseFile])
|
||||
|
||||
case class HoodieBootstrapTableState(files: List[HoodieBootstrapSplit])
|
||||
|
||||
case class HoodieBootstrapSplit(dataFile: PartitionedFile, skeletonFile: Option[PartitionedFile])
|
||||
@@ -34,11 +34,13 @@ import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType}
|
||||
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
|
||||
import org.apache.hudi.common.util.ReflectionUtils
|
||||
import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP, DEFAULT_BOOTSTRAP_INDEX_CLASS}
|
||||
import org.apache.hudi.config.HoodieWriteConfig
|
||||
import org.apache.hudi.exception.HoodieException
|
||||
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
|
||||
import org.apache.hudi.sync.common.AbstractSyncTool
|
||||
import org.apache.log4j.LogManager
|
||||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
|
||||
@@ -49,12 +51,13 @@ import scala.collection.mutable.ListBuffer
|
||||
private[hudi] object HoodieSparkSqlWriter {
|
||||
|
||||
private val log = LogManager.getLogger(getClass)
|
||||
private var tableExists: Boolean = false
|
||||
|
||||
def write(sqlContext: SQLContext,
|
||||
mode: SaveMode,
|
||||
parameters: Map[String, String],
|
||||
df: DataFrame,
|
||||
hoodieTableConfig: Option[HoodieTableConfig] = Option.empty,
|
||||
hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
|
||||
hoodieWriteClient: Option[HoodieWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
|
||||
asyncCompactionTriggerFn: Option[Function1[HoodieWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty
|
||||
)
|
||||
@@ -90,32 +93,23 @@ private[hudi] object HoodieSparkSqlWriter {
|
||||
}
|
||||
|
||||
val jsc = new JavaSparkContext(sparkContext)
|
||||
val basePath = new Path(parameters("path"))
|
||||
val basePath = new Path(path.get)
|
||||
val instantTime = HoodieActiveTimeline.createNewInstantTime()
|
||||
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
|
||||
var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
|
||||
var tableConfig : HoodieTableConfig = if (exists) {
|
||||
hoodieTableConfig.getOrElse(
|
||||
new HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get).getTableConfig)
|
||||
} else {
|
||||
null
|
||||
}
|
||||
tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
|
||||
var tableConfig = getHoodieTableConfig(sparkContext, path.get, hoodieTableConfigOpt)
|
||||
|
||||
if (mode == SaveMode.Ignore && exists) {
|
||||
if (mode == SaveMode.Ignore && tableExists) {
|
||||
log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
|
||||
(false, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
|
||||
} else {
|
||||
if (exists && mode == SaveMode.Append) {
|
||||
val existingTableName = tableConfig.getTableName
|
||||
if (!existingTableName.equals(tblName)) {
|
||||
throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath")
|
||||
}
|
||||
}
|
||||
// Handle various save modes
|
||||
handleSaveModes(mode, basePath, tableConfig, tblName, operation, fs)
|
||||
|
||||
val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
|
||||
if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
|
||||
// register classes & schemas
|
||||
val structName = s"${tblName}_record"
|
||||
val nameSpace = s"hoodie.${tblName}"
|
||||
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
|
||||
sparkContext.getConf.registerKryoClasses(
|
||||
Array(classOf[org.apache.avro.generic.GenericData],
|
||||
classOf[org.apache.avro.Schema]))
|
||||
@@ -134,23 +128,11 @@ private[hudi] object HoodieSparkSqlWriter {
|
||||
parameters(PAYLOAD_CLASS_OPT_KEY))
|
||||
}).toJavaRDD()
|
||||
|
||||
// Handle various save modes
|
||||
if (mode == SaveMode.ErrorIfExists && exists) {
|
||||
throw new HoodieException(s"hoodie table at $basePath already exists.")
|
||||
}
|
||||
|
||||
if (mode == SaveMode.Overwrite && exists) {
|
||||
log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
|
||||
fs.delete(basePath, true)
|
||||
exists = false
|
||||
}
|
||||
|
||||
// Create the table if not present
|
||||
if (!exists) {
|
||||
//FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated.
|
||||
val tableMetaClient = HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration,
|
||||
path.get, HoodieTableType.valueOf(tableType),
|
||||
tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null)
|
||||
if (!tableExists) {
|
||||
val tableMetaClient = HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get,
|
||||
HoodieTableType.valueOf(tableType), tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY),
|
||||
null.asInstanceOf[String])
|
||||
tableConfig = tableMetaClient.getTableConfig
|
||||
}
|
||||
|
||||
@@ -179,12 +161,6 @@ private[hudi] object HoodieSparkSqlWriter {
|
||||
val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
|
||||
(writeStatuses, client)
|
||||
} else {
|
||||
|
||||
// Handle save modes
|
||||
if (mode != SaveMode.Append) {
|
||||
throw new HoodieException(s"Append is the only save mode applicable for $operation operation")
|
||||
}
|
||||
|
||||
val structName = s"${tblName}_record"
|
||||
val nameSpace = s"hoodie.${tblName}"
|
||||
sparkContext.getConf.registerKryoClasses(
|
||||
@@ -196,7 +172,7 @@ private[hudi] object HoodieSparkSqlWriter {
|
||||
val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
|
||||
val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()
|
||||
|
||||
if (!exists) {
|
||||
if (!tableExists) {
|
||||
throw new HoodieException(s"hoodie table at $basePath does not exist")
|
||||
}
|
||||
|
||||
@@ -224,6 +200,56 @@ private[hudi] object HoodieSparkSqlWriter {
|
||||
}
|
||||
}
|
||||
|
||||
def bootstrap(sqlContext: SQLContext,
|
||||
mode: SaveMode,
|
||||
parameters: Map[String, String],
|
||||
df: DataFrame,
|
||||
hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty): Boolean = {
|
||||
|
||||
val sparkContext = sqlContext.sparkContext
|
||||
val path = parameters.getOrElse("path", throw new HoodieException("'path' must be set."))
|
||||
val tableName = parameters.getOrElse(HoodieWriteConfig.TABLE_NAME,
|
||||
throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}' must be set."))
|
||||
val tableType = parameters(TABLE_TYPE_OPT_KEY)
|
||||
val bootstrapBasePath = parameters.getOrElse(BOOTSTRAP_BASE_PATH_PROP,
|
||||
throw new HoodieException(s"'${BOOTSTRAP_BASE_PATH_PROP}' is required for '${BOOTSTRAP_OPERATION_OPT_VAL}'" +
|
||||
" operation'"))
|
||||
val bootstrapIndexClass = parameters.getOrDefault(BOOTSTRAP_INDEX_CLASS_PROP, DEFAULT_BOOTSTRAP_INDEX_CLASS)
|
||||
|
||||
var schema: String = null
|
||||
if (df.schema.nonEmpty) {
|
||||
val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
|
||||
schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, namespace).toString
|
||||
} else {
|
||||
schema = HoodieAvroUtils.getNullSchema.toString
|
||||
}
|
||||
|
||||
val basePath = new Path(path)
|
||||
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
|
||||
tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
|
||||
val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
|
||||
|
||||
// Handle various save modes
|
||||
if (mode == SaveMode.Ignore && tableExists) {
|
||||
log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
|
||||
false
|
||||
} else {
|
||||
handleSaveModes(mode, basePath, tableConfig, tableName, BOOTSTRAP_OPERATION_OPT_VAL, fs)
|
||||
}
|
||||
|
||||
if (!tableExists) {
|
||||
HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration, path,
|
||||
HoodieTableType.valueOf(tableType), tableName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY),
|
||||
null, bootstrapIndexClass, bootstrapBasePath)
|
||||
}
|
||||
|
||||
val jsc = new JavaSparkContext(sqlContext.sparkContext)
|
||||
val writeClient = DataSourceUtils.createHoodieClient(jsc, schema, path, tableName, mapAsJavaMap(parameters))
|
||||
writeClient.bootstrap(org.apache.hudi.common.util.Option.empty())
|
||||
val metaSyncSuccess = metaSync(parameters, basePath, jsc.hadoopConfiguration)
|
||||
metaSyncSuccess
|
||||
}
|
||||
|
||||
/**
|
||||
* Add default options for unspecified write options keys.
|
||||
*
|
||||
@@ -267,6 +293,31 @@ private[hudi] object HoodieSparkSqlWriter {
|
||||
props
|
||||
}
|
||||
|
||||
private def handleSaveModes(mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String,
|
||||
operation: String, fs: FileSystem): Unit = {
|
||||
if (mode == SaveMode.Append && tableExists) {
|
||||
val existingTableName = tableConfig.getTableName
|
||||
if (!existingTableName.equals(tableName)) {
|
||||
throw new HoodieException(s"hoodie table with name $existingTableName already exist at $tablePath")
|
||||
}
|
||||
}
|
||||
|
||||
if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
|
||||
if (mode == SaveMode.ErrorIfExists && tableExists) {
|
||||
throw new HoodieException(s"hoodie table at $tablePath already exists.")
|
||||
} else if (mode == SaveMode.Overwrite && tableExists) {
|
||||
log.warn(s"hoodie table at $tablePath already exists. Deleting existing data & overwriting with new data.")
|
||||
fs.delete(tablePath, true)
|
||||
tableExists = false
|
||||
}
|
||||
} else {
|
||||
// Delete Operation only supports Append mode
|
||||
if (mode != SaveMode.Append) {
|
||||
throw new HoodieException(s"Append is the only save mode applicable for $operation operation")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def syncHive(basePath: Path, fs: FileSystem, parameters: Map[String, String]): Boolean = {
|
||||
val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, parameters)
|
||||
val hiveConf: HiveConf = new HiveConf()
|
||||
@@ -403,4 +454,15 @@ private[hudi] object HoodieSparkSqlWriter {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
private def getHoodieTableConfig(sparkContext: SparkContext,
|
||||
tablePath: String,
|
||||
hoodieTableConfigOpt: Option[HoodieTableConfig]): HoodieTableConfig = {
|
||||
if (tableExists) {
|
||||
hoodieTableConfigOpt.getOrElse(
|
||||
new HoodieTableMetaClient(sparkContext.hadoopConfiguration, tablePath).getTableConfig)
|
||||
} else {
|
||||
null
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,9 +17,17 @@
|
||||
|
||||
package org.apache.hudi
|
||||
|
||||
|
||||
import com.google.common.collect.Lists
|
||||
import org.apache.avro.Schema
|
||||
import org.apache.hadoop.fs.GlobPattern
|
||||
import org.apache.hadoop.fs.Path
|
||||
import org.apache.hudi.avro.HoodieAvroUtils
|
||||
import org.apache.hudi.common.bootstrap.index.BootstrapIndex
|
||||
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieTableType}
|
||||
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline
|
||||
import org.apache.hudi.common.util.ParquetUtils
|
||||
import org.apache.hudi.config.HoodieWriteConfig
|
||||
import org.apache.hudi.exception.HoodieException
|
||||
import org.apache.hudi.table.HoodieTable
|
||||
@@ -28,8 +36,8 @@ import org.apache.hadoop.fs.GlobPattern
|
||||
import org.apache.log4j.LogManager
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
|
||||
import org.apache.spark.sql.types.StructType
|
||||
import org.apache.spark.sql.{Row, SQLContext}
|
||||
import org.apache.spark.sql.types.{StringType, StructField, StructType}
|
||||
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
|
||||
|
||||
import scala.collection.JavaConversions._
|
||||
import scala.collection.mutable
|
||||
@@ -47,8 +55,10 @@ class IncrementalRelation(val sqlContext: SQLContext,
|
||||
|
||||
private val log = LogManager.getLogger(classOf[IncrementalRelation])
|
||||
|
||||
private val metaClient =
|
||||
new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
|
||||
|
||||
val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema
|
||||
private val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
|
||||
|
||||
// MOR tables not supported yet
|
||||
if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
|
||||
throw new HoodieException("Incremental view not implemented yet, for merge-on-read tables")
|
||||
@@ -72,13 +82,16 @@ class IncrementalRelation(val sqlContext: SQLContext,
|
||||
optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, lastInstant.getTimestamp))
|
||||
.getInstants.iterator().toList
|
||||
|
||||
// use schema from latest metadata, if not present, read schema from the data file
|
||||
private val latestSchema = {
|
||||
val schemaUtil = new TableSchemaResolver(metaClient)
|
||||
val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields);
|
||||
AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
|
||||
// use schema from a file produced in the latest instant
|
||||
val latestSchema: StructType = {
|
||||
log.info("Inferring schema..")
|
||||
val schemaResolver = new TableSchemaResolver(metaClient)
|
||||
val tableSchema = schemaResolver.getTableAvroSchemaWithoutMetadataFields
|
||||
val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
|
||||
StructType(skeletonSchema.fields ++ dataSchema.fields)
|
||||
}
|
||||
|
||||
|
||||
private val filters = {
|
||||
if (optParams.contains(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY)) {
|
||||
val filterStr = optParams.getOrElse(
|
||||
@@ -93,36 +106,69 @@ class IncrementalRelation(val sqlContext: SQLContext,
|
||||
override def schema: StructType = latestSchema
|
||||
|
||||
override def buildScan(): RDD[Row] = {
|
||||
val fileIdToFullPath = mutable.HashMap[String, String]()
|
||||
val regularFileIdToFullPath = mutable.HashMap[String, String]()
|
||||
var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()
|
||||
|
||||
for (commit <- commitsToReturn) {
|
||||
val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
|
||||
.get, classOf[HoodieCommitMetadata])
|
||||
fileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
|
||||
|
||||
if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {
|
||||
metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
|
||||
} else {
|
||||
regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
|
||||
}
|
||||
}
|
||||
|
||||
if (metaBootstrapFileIdToFullPath.nonEmpty) {
|
||||
// filer out meta bootstrap files that have had more commits since metadata bootstrap
|
||||
metaBootstrapFileIdToFullPath = metaBootstrapFileIdToFullPath
|
||||
.filterNot(fileIdFullPath => regularFileIdToFullPath.contains(fileIdFullPath._1))
|
||||
}
|
||||
|
||||
val pathGlobPattern = optParams.getOrElse(
|
||||
DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY,
|
||||
DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)
|
||||
val filteredFullPath = if(!pathGlobPattern.equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)) {
|
||||
val globMatcher = new GlobPattern("*" + pathGlobPattern)
|
||||
fileIdToFullPath.filter(p => globMatcher.matches(p._2))
|
||||
} else {
|
||||
fileIdToFullPath
|
||||
val (filteredRegularFullPaths, filteredMetaBootstrapFullPaths) = {
|
||||
if(!pathGlobPattern.equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)) {
|
||||
val globMatcher = new GlobPattern("*" + pathGlobPattern)
|
||||
(regularFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values,
|
||||
metaBootstrapFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values)
|
||||
} else {
|
||||
(regularFileIdToFullPath.values, metaBootstrapFileIdToFullPath.values)
|
||||
}
|
||||
}
|
||||
// unset the path filter, otherwise if end_instant_time is not the latest instant, path filter set for RO view
|
||||
// will filter out all the files incorrectly.
|
||||
sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
|
||||
val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
|
||||
if (filteredFullPath.isEmpty) {
|
||||
if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
|
||||
sqlContext.sparkContext.emptyRDD[Row]
|
||||
} else {
|
||||
log.info("Additional Filters to be applied to incremental source are :" + filters)
|
||||
filters.foldLeft(sqlContext.read.options(sOpts)
|
||||
.schema(latestSchema)
|
||||
.parquet(filteredFullPath.values.toList: _*)
|
||||
.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp))
|
||||
.filter(String.format("%s <= '%s'",
|
||||
HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)))((e, f) => e.filter(f))
|
||||
.toDF().rdd
|
||||
|
||||
var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], latestSchema)
|
||||
|
||||
if (metaBootstrapFileIdToFullPath.nonEmpty) {
|
||||
df = sqlContext.sparkSession.read
|
||||
.format("hudi")
|
||||
.schema(latestSchema)
|
||||
.option(DataSourceReadOptions.READ_PATHS_OPT_KEY, filteredMetaBootstrapFullPaths.mkString(","))
|
||||
.load()
|
||||
}
|
||||
|
||||
if (regularFileIdToFullPath.nonEmpty)
|
||||
{
|
||||
df = df.union(sqlContext.read.options(sOpts)
|
||||
.schema(latestSchema)
|
||||
.parquet(filteredRegularFullPaths.toList: _*)
|
||||
.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
|
||||
commitsToReturn.head.getTimestamp))
|
||||
.filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
|
||||
commitsToReturn.last.getTimestamp)))
|
||||
}
|
||||
|
||||
filters.foldLeft(df)((e, f) => e.filter(f)).rdd
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,630 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.functional
|
||||
|
||||
import java.time.Instant
|
||||
import java.util.Collections
|
||||
|
||||
import collection.JavaConverters._
|
||||
import org.apache.hadoop.fs.FileSystem
|
||||
import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
|
||||
import org.apache.hudi.client.TestBootstrap
|
||||
import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
|
||||
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
|
||||
import org.apache.hudi.common.fs.FSUtils
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline
|
||||
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieCompactionConfig, HoodieWriteConfig}
|
||||
import org.apache.hudi.keygen.SimpleKeyGenerator
|
||||
import org.apache.spark.api.java.JavaSparkContext
|
||||
import org.apache.spark.sql.functions.{col, lit}
|
||||
import org.apache.spark.sql.{SaveMode, SparkSession}
|
||||
import org.junit.jupiter.api.Assertions.assertEquals
|
||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
|
||||
import org.junit.jupiter.api.io.TempDir
|
||||
|
||||
class TestDataSourceForBootstrap {
|
||||
|
||||
var spark: SparkSession = _
|
||||
val commonOpts = Map(
|
||||
HoodieWriteConfig.INSERT_PARALLELISM -> "4",
|
||||
HoodieWriteConfig.UPSERT_PARALLELISM -> "4",
|
||||
HoodieWriteConfig.DELETE_PARALLELISM -> "4",
|
||||
HoodieWriteConfig.BULKINSERT_PARALLELISM -> "4",
|
||||
HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM -> "4",
|
||||
HoodieBootstrapConfig.BOOTSTRAP_PARALLELISM -> "4",
|
||||
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
|
||||
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
|
||||
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
|
||||
HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
|
||||
)
|
||||
var basePath: String = _
|
||||
var srcPath: String = _
|
||||
var fs: FileSystem = _
|
||||
|
||||
@BeforeEach def initialize(@TempDir tempDir: java.nio.file.Path) {
|
||||
spark = SparkSession.builder
|
||||
.appName("Hoodie Datasource test")
|
||||
.master("local[2]")
|
||||
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
|
||||
.getOrCreate
|
||||
basePath = tempDir.toAbsolutePath.toString + "/base"
|
||||
srcPath = tempDir.toAbsolutePath.toString + "/src"
|
||||
fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
|
||||
}
|
||||
|
||||
@AfterEach def tearDown(): Unit ={
|
||||
// Close spark session
|
||||
if (spark != null) {
|
||||
spark.stop()
|
||||
spark = null
|
||||
}
|
||||
|
||||
// Close file system
|
||||
if (fs != null) {
|
||||
fs.close()
|
||||
fs = null
|
||||
}
|
||||
}
|
||||
|
||||
  @Test def testMetadataBootstrapCOWNonPartitioned(): Unit = {
    val timestamp = Instant.now.toEpochMilli
    val numRecords = 100
    val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)

    val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, Collections.emptyList(), jsc,
      spark.sqlContext)

    // Write source data non-partitioned
    sourceDF.write
      .format("parquet")
      .mode(SaveMode.Overwrite)
      .save(srcPath)

    // Perform bootstrap
    val bootstrapDF = spark.emptyDataFrame
    bootstrapDF.write
      .format("hudi")
      .options(commonOpts)
      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
      .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
      .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
      .mode(SaveMode.Overwrite)
      .save(basePath)

    val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
    assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1)

    // Read bootstrapped table and verify count
    var hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
    assertEquals(numRecords, hoodieROViewDF1.count())

    // Perform upsert
    val updateTimestamp = Instant.now.toEpochMilli
    val numRecordsUpdate = 10
    val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate,
      Collections.emptyList(), jsc, spark.sqlContext)

    updateDF.write
      .format("hudi")
      .options(commonOpts)
      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
      .mode(SaveMode.Append)
      .save(basePath)

    val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
    assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())

    // Read table after upsert and verify count
    hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
    assertEquals(numRecords, hoodieROViewDF1.count())
    assertEquals(numRecordsUpdate, hoodieROViewDF1.filter(s"timestamp == $updateTimestamp").count())

    // Incrementally pull only changes in the bootstrap commit, which would pull all the initial records written
    // during bootstrap
    val hoodieIncViewDF1 = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commitInstantTime1)
      .load(basePath)

    assertEquals(numRecords, hoodieIncViewDF1.count())
    var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
    assertEquals(1, countsPerCommit.length)
    assertEquals(commitInstantTime1, countsPerCommit(0).get(0))

    // Incrementally pull only changes in the latest commit, which would pull only the updated records in the
    // latest commit
    val hoodieIncViewDF2 = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
      .load(basePath)

    assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
    countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
    assertEquals(1, countsPerCommit.length)
    assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
  }
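
  /**
   * Metadata bootstrap of a COW table whose source data is hive-style partitioned (datestr=...),
   * then upsert with hive-style partitioning enabled and verify incremental pulls, including a
   * partition-scoped pull via INCR_PATH_GLOB.
   */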
  @Test def testMetadataBootstrapCOWHiveStylePartitioned(): Unit = {
    val timestamp = Instant.now.toEpochMilli
    val numRecords = 100
    val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
    val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)

    val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
      spark.sqlContext)

    // Write source data hive style partitioned
    sourceDF.write
      .partitionBy("datestr")
      .format("parquet")
      .mode(SaveMode.Overwrite)
      .save(srcPath)

    // Perform bootstrap
    val bootstrapDF = spark.emptyDataFrame
    bootstrapDF.write
      .format("hudi")
      .options(commonOpts)
      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
      .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
      .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.SimpleKeyGenerator")
      .mode(SaveMode.Overwrite)
      .save(basePath)

    val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
    assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1)

    // Read bootstrapped table and verify count
    val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
    assertEquals(numRecords, hoodieROViewDF1.count())

    // Perform upsert
    val updateTimestamp = Instant.now.toEpochMilli
    val numRecordsUpdate = 10
    val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava,
      jsc, spark.sqlContext)

    updateDF.write
      .format("hudi")
      .options(commonOpts)
      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
      // Required because source data is hive style partitioned
      .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
      .mode(SaveMode.Append)
      .save(basePath)

    val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
    assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())

    // Read table after upsert and verify count
    val hoodieROViewDF2 = spark.read.format("hudi").load(basePath + "/*")
    assertEquals(numRecords, hoodieROViewDF2.count())
    assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())

    // Incrementally pull only changes in the bootstrap commit, which would pull all the initial records written
    // during bootstrap
    val hoodieIncViewDF1 = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commitInstantTime1)
      .load(basePath)

    assertEquals(numRecords, hoodieIncViewDF1.count())
    var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
    assertEquals(1, countsPerCommit.length)
    assertEquals(commitInstantTime1, countsPerCommit(0).get(0))

    // Incrementally pull only changes in the latest commit, which would pull only the updated records in the
    // latest commit
    val hoodieIncViewDF2 = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
      .load(basePath)

    assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
    countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
    assertEquals(1, countsPerCommit.length)
    assertEquals(commitInstantTime2, countsPerCommit(0).get(0))

    // Pull the latest commit within certain partitions
    val hoodieIncViewDF3 = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
      .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/datestr=2020-04-02/*")
      .load(basePath)

    assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(),
      hoodieIncViewDF3.count())
  }
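
  /**
   * Metadata bootstrap of a COW table whose source partitions are written one directory per
   * partition value (no hive-style partitioning), then upsert and verify incremental pulls.
   */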
  @Test def testMetadataBootstrapCOWPartitioned(): Unit = {
    val timestamp = Instant.now.toEpochMilli
    val numRecords = 100
    val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
    val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)

    var sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
      spark.sqlContext)

    // Writing data for each partition instead of using partitionBy to avoid hive-style partitioning and hence
    // have partitioned columns stored in the data file
    partitionPaths.foreach(partitionPath => {
      sourceDF
        .filter(sourceDF("datestr").equalTo(lit(partitionPath)))
        .write
        .format("parquet")
        .mode(SaveMode.Overwrite)
        .save(srcPath + "/" + partitionPath)
    })

    // Perform bootstrap
    val bootstrapDF = spark.emptyDataFrame
    bootstrapDF.write
      .format("hudi")
      .options(commonOpts)
      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
      .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
      .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.SimpleKeyGenerator")
      .mode(SaveMode.Overwrite)
      .save(basePath)

    val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
    assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1)

    // Read bootstrapped table and verify count
    val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
    assertEquals(numRecords, hoodieROViewDF1.count())

    // Perform upsert
    val updateTimestamp = Instant.now.toEpochMilli
    val numRecordsUpdate = 10
    var updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava,
      jsc, spark.sqlContext)

    updateDF.write
      .format("hudi")
      .options(commonOpts)
      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
      .option("hoodie.upsert.shuffle.parallelism", "4")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
      .mode(SaveMode.Append)
      .save(basePath)

    val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
    assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())

    // Read table after upsert and verify count
    val hoodieROViewDF2 = spark.read.format("hudi").load(basePath + "/*")
    assertEquals(numRecords, hoodieROViewDF2.count())
    assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())

    // Incrementally pull only changes in the bootstrap commit, which would pull all the initial records written
    // during bootstrap
    val hoodieIncViewDF1 = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commitInstantTime1)
      .load(basePath)

    assertEquals(numRecords, hoodieIncViewDF1.count())
    var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
    assertEquals(1, countsPerCommit.length)
    assertEquals(commitInstantTime1, countsPerCommit(0).get(0))

    // Incrementally pull only changes in the latest commit, which would pull only the updated records in the
    // latest commit
    val hoodieIncViewDF2 = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
      .load(basePath)

    assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
    countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
    assertEquals(1, countsPerCommit.length)
    assertEquals(commitInstantTime2, countsPerCommit(0).get(0))

    // Pull the latest commit within certain partitions
    val hoodieIncViewDF3 = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
      .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2020-04-02/*")
      .load(basePath)

    assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(),
      hoodieIncViewDF3.count())
  }
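
  /**
   * Metadata bootstrap of a MOR table with inline compaction enabled on the follow-up upsert,
   * so the read-optimized view reflects the updated records after compaction.
   */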
  @Test def testMetadataBootstrapMORPartitionedInlineCompactionOn(): Unit = {
    val timestamp = Instant.now.toEpochMilli
    val numRecords = 100
    val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
    val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)

    val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
      spark.sqlContext)

    // Writing data for each partition instead of using partitionBy to avoid hive-style partitioning and hence
    // have partitioned columns stored in the data file
    partitionPaths.foreach(partitionPath => {
      sourceDF
        .filter(sourceDF("datestr").equalTo(lit(partitionPath)))
        .write
        .format("parquet")
        .mode(SaveMode.Overwrite)
        .save(srcPath + "/" + partitionPath)
    })

    // Perform bootstrap
    val bootstrapDF = spark.emptyDataFrame
    bootstrapDF.write
      .format("hudi")
      .options(commonOpts)
      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
      .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
      .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.SimpleKeyGenerator")
      .mode(SaveMode.Overwrite)
      .save(basePath)

    val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
    assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1)

    // Read bootstrapped table and verify count
    val hoodieROViewDF1 = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
        DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
      .load(basePath + "/*")
    assertEquals(numRecords, hoodieROViewDF1.count())

    // Perform upsert
    val updateTimestamp = Instant.now.toEpochMilli
    val numRecordsUpdate = 10
    val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava,
      jsc, spark.sqlContext)

    updateDF.write
      .format("hudi")
      .options(commonOpts)
      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
      .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true")
      .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "1")
      .mode(SaveMode.Append)
      .save(basePath)

    // Expect 2 new commits since meta bootstrap - delta commit and compaction commit (due to inline compaction)
    assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())

    // Read table after upsert and verify count. Since we have inline compaction enabled the RO view will have
    // the updated rows.
    val hoodieROViewDF2 = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
        DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
      .load(basePath + "/*")
    assertEquals(numRecords, hoodieROViewDF2.count())
    assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())
  }
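
  /**
   * Metadata bootstrap of a MOR table without inline compaction; the follow-up upsert only adds
   * a delta commit, so the read-optimized view does not yet see the updated records.
   */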
  @Test def testMetadataBootstrapMORPartitioned(): Unit = {
    val timestamp = Instant.now.toEpochMilli
    val numRecords = 100
    val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
    val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)

    val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
      spark.sqlContext)

    // Writing data for each partition instead of using partitionBy to avoid hive-style partitioning and hence
    // have partitioned columns stored in the data file
    partitionPaths.foreach(partitionPath => {
      sourceDF
        .filter(sourceDF("datestr").equalTo(lit(partitionPath)))
        .write
        .format("parquet")
        .mode(SaveMode.Overwrite)
        .save(srcPath + "/" + partitionPath)
    })

    // Perform bootstrap
    val bootstrapDF = spark.emptyDataFrame
    bootstrapDF.write
      .format("hudi")
      .options(commonOpts)
      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
      .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
      .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.SimpleKeyGenerator")
      .mode(SaveMode.Overwrite)
      .save(basePath)

    val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
    assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1)

    // Read bootstrapped table and verify count
    val hoodieROViewDF1 = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
        DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
      .load(basePath + "/*")
    assertEquals(numRecords, hoodieROViewDF1.count())

    // Perform upsert
    val updateTimestamp = Instant.now.toEpochMilli
    val numRecordsUpdate = 10
    val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate,
      partitionPaths.asJava, jsc, spark.sqlContext)

    updateDF.write
      .format("hudi")
      .options(commonOpts)
      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
      .mode(SaveMode.Append)
      .save(basePath)

    // Expect 1 new commit since meta bootstrap - delta commit (because inline compaction is off)
    assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())

    // Read table after upsert and verify count. Since we have inline compaction off the RO view will have
    // no updated rows.
    val hoodieROViewDF2 = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
        DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
      .load(basePath + "/*")
    assertEquals(numRecords, hoodieROViewDF2.count())
    assertEquals(0, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())
  }
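
  /**
   * Full-record bootstrap of a partitioned COW table using FullRecordBootstrapModeSelector and
   * SparkParquetBootstrapDataProvider, then upsert and verify incremental pulls.
   */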
  @Test def testFullBootstrapCOWPartitioned(): Unit = {
    val timestamp = Instant.now.toEpochMilli
    val numRecords = 100
    val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
    val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)

    val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
      spark.sqlContext)

    // Writing data for each partition instead of using partitionBy to avoid hive-style partitioning and hence
    // have partitioned columns stored in the data file
    partitionPaths.foreach(partitionPath => {
      sourceDF
        .filter(sourceDF("datestr").equalTo(lit(partitionPath)))
        .write
        .format("parquet")
        .mode(SaveMode.Overwrite)
        .save(srcPath + "/" + partitionPath)
    })

    // Perform bootstrap
    val bootstrapDF = spark.emptyDataFrame
    bootstrapDF.write
      .format("hudi")
      .options(commonOpts)
      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
      .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
      .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, classOf[SimpleKeyGenerator].getName)
      .option(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR, classOf[FullRecordBootstrapModeSelector].getName)
      .option(HoodieBootstrapConfig.FULL_BOOTSTRAP_INPUT_PROVIDER, classOf[SparkParquetBootstrapDataProvider].getName)
      .mode(SaveMode.Overwrite)
      .save(basePath)

    val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
    assertEquals(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS, commitInstantTime1)

    // Read bootstrapped table and verify count
    val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
    assertEquals(numRecords, hoodieROViewDF1.count())

    // Perform upsert
    val updateTimestamp = Instant.now.toEpochMilli
    val numRecordsUpdate = 10
    val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava,
      jsc, spark.sqlContext)

    updateDF.write
      .format("hudi")
      .options(commonOpts)
      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
      .mode(SaveMode.Append)
      .save(basePath)

    val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
    assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())

    // Read table after upsert and verify count
    val hoodieROViewDF2 = spark.read.format("hudi").load(basePath + "/*")
    assertEquals(numRecords, hoodieROViewDF2.count())
    assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())

    // Incrementally pull only changes in the bootstrap commit, which would pull all the initial records written
    // during bootstrap
    val hoodieIncViewDF1 = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commitInstantTime1)
      .load(basePath)

    assertEquals(numRecords, hoodieIncViewDF1.count())
    var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
    assertEquals(1, countsPerCommit.length)
    assertEquals(commitInstantTime1, countsPerCommit(0).get(0))

    // Incrementally pull only changes in the latest commit, which would pull only the updated records in the
    // latest commit
    val hoodieIncViewDF2 = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
      .load(basePath)

    assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
    countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
    assertEquals(1, countsPerCommit.length)
    assertEquals(commitInstantTime2, countsPerCommit(0).get(0))

    // Pull the latest commit within certain partitions
    val hoodieIncViewDF3 = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
      .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2020-04-02/*")
      .load(basePath)

    assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(),
      hoodieIncViewDF3.count())
  }
}

@@ -581,8 +581,10 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {

    // Perform bootstrap with tableBasePath as source
    String bootstrapSourcePath = dfsBasePath + "/src_bootstrapped";
    sqlContext.read().format("org.apache.hudi").load(tableBasePath + "/*/*.parquet").write().format("parquet")
        .save(bootstrapSourcePath);
    Dataset<Row> sourceDf = sqlContext.read()
        .format("org.apache.hudi")
        .load(tableBasePath + "/*/*.parquet");
    sourceDf.write().format("parquet").save(bootstrapSourcePath);

    String newDatasetBasePath = dfsBasePath + "/test_dataset_bootstrapped";
    cfg.runBootstrap = true;
@@ -600,12 +602,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
    assertEquals(1950, sqlContext.sql("select distinct _hoodie_record_key from bootstrapped").count());

    StructField[] fields = res.schema().fields();
    assertEquals(5, fields.length);
    assertEquals(HoodieRecord.COMMIT_TIME_METADATA_FIELD, fields[0].name());
    assertEquals(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, fields[1].name());
    assertEquals(HoodieRecord.RECORD_KEY_METADATA_FIELD, fields[2].name());
    assertEquals(HoodieRecord.PARTITION_PATH_METADATA_FIELD, fields[3].name());
    assertEquals(HoodieRecord.FILENAME_METADATA_FIELD, fields[4].name());
    List<String> fieldNames = Arrays.asList(res.schema().fieldNames());
    List<String> expectedFieldNames = Arrays.asList(sourceDf.schema().fieldNames());
    assertEquals(expectedFieldNames.size(), fields.length);
    assertTrue(fieldNames.containsAll(HoodieRecord.HOODIE_META_COLUMNS));
    assertTrue(fieldNames.containsAll(expectedFieldNames));
  }

  @Test