
[HUDI-3504] Support bootstrap command based on Call Produce Command (#5977)

ForwardXu authored on 2022-06-27 13:06:50 +08:00, committed by GitHub
parent 8f4e2a189e
commit 26c967bac6
7 changed files with 740 additions and 0 deletions
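
The three procedures added here are invoked through Spark SQL's CALL syntax. A minimal sketch of the end-to-end flow, with placeholder table and path names (the argument shapes mirror TestBootstrapProcedure at the end of this commit):

// Placeholder names: 'trips', '/tmp/hudi/trips', '/data/parquet/trips'.
spark.sql(
  """call run_bootstrap(
    |table => 'trips',
    |tableType => 'COPY_ON_WRITE',
    |bootstrapPath => '/data/parquet/trips',
    |basePath => '/tmp/hudi/trips',
    |rowKeyField => '_row_key',
    |partitionPathField => 'datestr')""".stripMargin).show()

// Inspect the bootstrap index written by the run.
spark.sql("call show_bootstrap_partitions(table => 'trips')").show()
spark.sql("call show_bootstrap_mapping(table => 'trips')").show()

run_bootstrap returns a single status row (0 on success, -1 on failure); the two show_* procedures read the bootstrap index that the run creates.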

BootstrapExecutorUtils.java

@@ -0,0 +1,254 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.cli;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;

import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
/**
 * Performs bootstrap from a non-Hudi source.
 */
public class BootstrapExecutorUtils implements Serializable {

  private static final Logger LOG = LogManager.getLogger(BootstrapExecutorUtils.class);

  /**
   * Config.
   */
  private final Config cfg;

  /**
   * Spark context.
   */
  private final transient JavaSparkContext jssc;

  /**
   * Bag of properties with source, hoodie client, key generator etc.
   */
  private final TypedProperties props;

  /**
   * Hadoop Configuration.
   */
  private final Configuration configuration;

  /**
   * Bootstrap Configuration.
   */
  private final HoodieWriteConfig bootstrapConfig;

  /**
   * FileSystem instance.
   */
  private final transient FileSystem fs;

  private final String bootstrapBasePath;

  public static final String CHECKPOINT_KEY = HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY;

  /**
   * Bootstrap Executor.
   *
   * @param cfg        DeltaStreamer Config
   * @param jssc       Java Spark Context
   * @param fs         File System
   * @param conf       Hadoop Configuration
   * @param properties Bootstrap Writer Properties
   * @throws IOException on IO failure
   */
  public BootstrapExecutorUtils(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
                                TypedProperties properties) throws IOException {
    this.cfg = cfg;
    this.jssc = jssc;
    this.fs = fs;
    this.configuration = conf;
    this.props = properties;

    ValidationUtils.checkArgument(properties.containsKey(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key()),
        HoodieTableConfig.BOOTSTRAP_BASE_PATH.key() + " must be specified.");
    this.bootstrapBasePath = properties.getString(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key());

    // Add more defaults if full bootstrap requested
    this.props.putIfAbsent(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key(),
        DataSourceWriteOptions.PAYLOAD_CLASS_NAME().defaultValue());

    // Schema provider that supplies the schema for reading the input and writing out the target table.
    SchemaProvider schemaProvider = createSchemaProvider(cfg.schemaProviderClass, props, jssc);

    HoodieWriteConfig.Builder builder =
        HoodieWriteConfig.newBuilder().withPath(cfg.basePath)
            .withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build())
            .forTable(cfg.tableName)
            .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
            .withAutoCommit(true)
            .withProps(props);

    if (null != schemaProvider && null != schemaProvider.getTargetSchema()) {
      builder = builder.withSchema(schemaProvider.getTargetSchema().toString());
    }
    this.bootstrapConfig = builder.build();
    LOG.info("Created bootstrap executor with configs : " + bootstrapConfig.getProps());
  }
  public static SchemaProvider createSchemaProvider(String schemaProviderClass, TypedProperties cfg,
                                                    JavaSparkContext jssc) throws IOException {
    try {
      return StringUtils.isNullOrEmpty(schemaProviderClass) ? null
          : (SchemaProvider) ReflectionUtils.loadClass(schemaProviderClass, cfg, jssc);
    } catch (Throwable e) {
      throw new IOException("Could not load schema provider class " + schemaProviderClass, e);
    }
  }

  /**
   * Executes Bootstrap.
   */
  public void execute() throws IOException {
    initializeTable();

    try (SparkRDDWriteClient bootstrapClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jssc), bootstrapConfig)) {
      HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
      checkpointCommitMetadata.put(CHECKPOINT_KEY, Config.checkpoint);
      bootstrapClient.bootstrap(Option.of(checkpointCommitMetadata));
      syncHive();
    }
  }

  /**
   * Sync to Hive.
   */
  private void syncHive() {
    if (cfg.enableHiveSync) {
      TypedProperties metaProps = new TypedProperties();
      metaProps.putAll(props);
      metaProps.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), cfg.basePath);
      metaProps.put(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.key(), cfg.baseFileFormat);
      if (props.getBoolean(HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.key(), HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.defaultValue())) {
        metaProps.put(HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC_SPEC.key(),
            HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()),
                props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())));
      }

      new HiveSyncTool(metaProps, configuration, fs).syncHoodieTable();
    }
  }

  private void initializeTable() throws IOException {
    Path basePath = new Path(cfg.basePath);
    if (fs.exists(basePath)) {
      if (cfg.bootstrapOverwrite) {
        LOG.warn("Target base path already exists, overwriting it");
        fs.delete(basePath, true);
      } else {
        throw new HoodieException("Target base path already exists at " + cfg.basePath
            + ". Cannot bootstrap data on top of an existing table.");
      }
    }

    HoodieTableMetaClient.withPropertyBuilder()
        .fromProperties(props)
        .setTableType(cfg.tableType)
        .setTableName(cfg.tableName)
        .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
        .setPayloadClassName(cfg.payloadClass)
        .setBaseFileFormat(cfg.baseFileFormat)
        .setBootstrapIndexClass(cfg.bootstrapIndexClass)
        .setBootstrapBasePath(bootstrapBasePath)
        .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.basePath);
  }

  public static class Config {
    private String tableName;
    private String tableType;
    private String basePath;
    private String baseFileFormat;
    private String bootstrapIndexClass;
    private String schemaProviderClass;
    private String payloadClass;
    private Boolean enableHiveSync;
    private Boolean bootstrapOverwrite;

    public static String checkpoint = null;

    public void setTableName(String tableName) {
      this.tableName = tableName;
    }

    public void setTableType(String tableType) {
      this.tableType = tableType;
    }

    public void setBasePath(String basePath) {
      this.basePath = basePath;
    }

    public void setBaseFileFormat(String baseFileFormat) {
      this.baseFileFormat = baseFileFormat;
    }

    public void setBootstrapIndexClass(String bootstrapIndexClass) {
      this.bootstrapIndexClass = bootstrapIndexClass;
    }

    public void setSchemaProviderClass(String schemaProviderClass) {
      this.schemaProviderClass = schemaProviderClass;
    }

    public void setPayloadClass(String payloadClass) {
      this.payloadClass = payloadClass;
    }

    public void setEnableHiveSync(Boolean enableHiveSync) {
      this.enableHiveSync = enableHiveSync;
    }

    public void setBootstrapOverwrite(Boolean bootstrapOverwrite) {
      this.bootstrapOverwrite = bootstrapOverwrite;
    }
  }
}
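
For callers outside the SQL procedure, a minimal sketch of driving BootstrapExecutorUtils directly. All table names and paths are illustrative, `jsc` is assumed to be an existing JavaSparkContext, and the property bag must carry HoodieTableConfig.BOOTSTRAP_BASE_PATH or the constructor throws:

// A sketch only: mirrors what RunBootstrapProcedure below assembles before calling execute().
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.cli.BootstrapExecutorUtils
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.HoodieTableConfig

val props = new TypedProperties()
// Required: where the existing (non-Hudi) data lives, plus the record key field.
props.setProperty(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key, "/data/parquet/trips")
props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD.key, "_row_key")

val cfg = new BootstrapExecutorUtils.Config()
cfg.setTableName("trips")
cfg.setTableType("COPY_ON_WRITE")
cfg.setBasePath("/tmp/hudi/trips")
cfg.setBaseFileFormat("PARQUET")
cfg.setBootstrapIndexClass("org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex")
cfg.setPayloadClass("org.apache.hudi.common.model.OverwriteWithLatestAvroPayload")
cfg.setSchemaProviderClass("")
cfg.setEnableHiveSync(false)
cfg.setBootstrapOverwrite(true)

val hadoopConf = jsc.hadoopConfiguration
new BootstrapExecutorUtils(cfg, jsc, FSUtils.getFs("/tmp/hudi/trips", hadoopConf), hadoopConf, props).execute()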

SchemaProvider.java

@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.cli;

import org.apache.avro.Schema;
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.spark.api.java.JavaSparkContext;

import java.io.Serializable;

/**
 * Class to provide schema for reading data and also writing into a Hoodie table,
 * used by deltastreamer (runs over Spark).
 */
@PublicAPIClass(maturity = ApiMaturityLevel.STABLE)
public abstract class SchemaProvider implements Serializable {

  protected TypedProperties config;

  protected JavaSparkContext jssc;

  public SchemaProvider(TypedProperties props) {
    this(props, null);
  }

  protected SchemaProvider(TypedProperties props, JavaSparkContext jssc) {
    this.config = props;
    this.jssc = jssc;
  }

  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
  public abstract Schema getSourceSchema();

  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
  public Schema getTargetSchema() {
    // by default, use source schema as target for hoodie table as well
    return getSourceSchema();
  }
}
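
Only getSourceSchema is abstract; getTargetSchema falls back to it. A hypothetical provider (not part of this commit) that parses an Avro schema literal out of the property bag could look like the sketch below; the property key "bootstrap.schema.literal" and the class name are made up for illustration, while the two-arg constructor matches what BootstrapExecutorUtils.createSchemaProvider instantiates reflectively:

import org.apache.avro.Schema
import org.apache.hudi.cli.SchemaProvider
import org.apache.hudi.common.config.TypedProperties
import org.apache.spark.api.java.JavaSparkContext

// Hypothetical provider: reads an Avro schema literal from "bootstrap.schema.literal".
class InlineSchemaProvider(props: TypedProperties, jssc: JavaSparkContext)
  extends SchemaProvider(props, jssc) {

  override def getSourceSchema: Schema =
    new Schema.Parser().parse(props.getString("bootstrap.schema.literal"))
  // getTargetSchema is inherited and defaults to the source schema.
}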

HoodieProcedures.scala

@@ -55,6 +55,9 @@ object HoodieProcedures {
    mapBuilder.put(StatsWriteAmplificationProcedure.NAME, StatsWriteAmplificationProcedure.builder)
    mapBuilder.put(StatsFileSizeProcedure.NAME, StatsFileSizeProcedure.builder)
    mapBuilder.put(HdfsParquetImportProcedure.NAME, HdfsParquetImportProcedure.builder)
    mapBuilder.put(RunBootstrapProcedure.NAME, RunBootstrapProcedure.builder)
    mapBuilder.put(ShowBootstrapMappingProcedure.NAME, ShowBootstrapMappingProcedure.builder)
    mapBuilder.put(ShowBootstrapPartitionsProcedure.NAME, ShowBootstrapPartitionsProcedure.builder)
    mapBuilder.build
  }
}
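
Each entry maps a SQL-visible procedure name to a Supplier that yields a fresh builder, and `CALL <name>(...)` resolution goes through this map. A simplified sketch of what the registration amounts to for the new procedures:

// spark.sql("call run_bootstrap(...)") resolves NAME -> builder -> procedure instance.
val procedure = RunBootstrapProcedure.builder.get().build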

RunBootstrapProcedure.scala

@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures

import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.cli.BootstrapExecutorUtils
import org.apache.hudi.cli.HDFSParquetImporterUtils.{buildProperties, readConfig}
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.config.HoodieBootstrapConfig
import org.apache.hudi.keygen.constant.KeyGeneratorType
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}

import java.util
import java.util.Locale
import java.util.function.Supplier

class RunBootstrapProcedure extends BaseProcedure with ProcedureBuilder with Logging {
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.required(1, "tableType", DataTypes.StringType, None),
    ProcedureParameter.required(2, "bootstrapPath", DataTypes.StringType, None),
    ProcedureParameter.required(3, "basePath", DataTypes.StringType, None),
    ProcedureParameter.required(4, "rowKeyField", DataTypes.StringType, None),
    ProcedureParameter.optional(5, "baseFileFormat", DataTypes.StringType, "PARQUET"),
    ProcedureParameter.optional(6, "partitionPathField", DataTypes.StringType, ""),
    ProcedureParameter.optional(7, "bootstrapIndexClass", DataTypes.StringType, "org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex"),
    ProcedureParameter.optional(8, "selectorClass", DataTypes.StringType, "org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector"),
    ProcedureParameter.optional(9, "keyGeneratorClass", DataTypes.StringType, "org.apache.hudi.keygen.SimpleKeyGenerator"),
    ProcedureParameter.optional(10, "fullBootstrapInputProvider", DataTypes.StringType, "org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider"),
    ProcedureParameter.optional(11, "schemaProviderClass", DataTypes.StringType, ""),
    ProcedureParameter.optional(12, "payloadClass", DataTypes.StringType, "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"),
    ProcedureParameter.optional(13, "parallelism", DataTypes.IntegerType, 1500),
    ProcedureParameter.optional(14, "enableHiveSync", DataTypes.BooleanType, false),
    ProcedureParameter.optional(15, "propsFilePath", DataTypes.StringType, ""),
    ProcedureParameter.optional(16, "bootstrapOverwrite", DataTypes.BooleanType, false)
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("status", DataTypes.IntegerType, nullable = true, Metadata.empty))
  )

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = OUTPUT_TYPE

  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
    val tableType = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
    val bootstrapPath = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
    val basePath = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[String]
    val rowKeyField = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[String]
    val baseFileFormat = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[String]
    val partitionPathField = getArgValueOrDefault(args, PARAMETERS(6)).get.asInstanceOf[String]
    val bootstrapIndexClass = getArgValueOrDefault(args, PARAMETERS(7)).get.asInstanceOf[String]
    val selectorClass = getArgValueOrDefault(args, PARAMETERS(8)).get.asInstanceOf[String]
    val keyGeneratorClass = getArgValueOrDefault(args, PARAMETERS(9)).get.asInstanceOf[String]
    val fullBootstrapInputProvider = getArgValueOrDefault(args, PARAMETERS(10)).get.asInstanceOf[String]
    val schemaProviderClass = getArgValueOrDefault(args, PARAMETERS(11)).get.asInstanceOf[String]
    val payloadClass = getArgValueOrDefault(args, PARAMETERS(12)).get.asInstanceOf[String]
    val parallelism = getArgValueOrDefault(args, PARAMETERS(13)).get.asInstanceOf[Int]
    val enableHiveSync = getArgValueOrDefault(args, PARAMETERS(14)).get.asInstanceOf[Boolean]
    val propsFilePath = getArgValueOrDefault(args, PARAMETERS(15)).get.asInstanceOf[String]
    val bootstrapOverwrite = getArgValueOrDefault(args, PARAMETERS(16)).get.asInstanceOf[Boolean]

    val configs: util.List[String] = new util.ArrayList[String]
    val properties: TypedProperties =
      if (propsFilePath == null || propsFilePath.isEmpty) buildProperties(configs)
      else readConfig(jsc.hadoopConfiguration, new Path(propsFilePath), configs).getProps(true)

    properties.setProperty(HoodieBootstrapConfig.BASE_PATH.key, bootstrapPath)

    // A KeyGeneratorType name (e.g. SIMPLE) sets the keygen type; anything else is
    // treated as a fully qualified key generator class name.
    if (!StringUtils.isNullOrEmpty(keyGeneratorClass) && KeyGeneratorType.getNames.contains(keyGeneratorClass.toUpperCase(Locale.ROOT))) {
      properties.setProperty(HoodieBootstrapConfig.KEYGEN_TYPE.key, keyGeneratorClass.toUpperCase(Locale.ROOT))
    } else {
      properties.setProperty(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, keyGeneratorClass)
    }
    properties.setProperty(HoodieBootstrapConfig.FULL_BOOTSTRAP_INPUT_PROVIDER_CLASS_NAME.key, fullBootstrapInputProvider)
    properties.setProperty(HoodieBootstrapConfig.PARALLELISM_VALUE.key, parallelism.toString)
    properties.setProperty(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key, selectorClass)
    properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD.key, rowKeyField)
    properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, partitionPathField)

    val fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration)

    val cfg = new BootstrapExecutorUtils.Config()
    cfg.setTableName(tableName.get.asInstanceOf[String])
    cfg.setTableType(tableType)
    cfg.setBasePath(basePath)
    cfg.setBaseFileFormat(baseFileFormat)
    cfg.setBootstrapIndexClass(bootstrapIndexClass)
    cfg.setSchemaProviderClass(schemaProviderClass)
    cfg.setPayloadClass(payloadClass)
    cfg.setEnableHiveSync(enableHiveSync)
    cfg.setBootstrapOverwrite(bootstrapOverwrite)

    try {
      new BootstrapExecutorUtils(cfg, jsc, fs, jsc.hadoopConfiguration, properties).execute()
      Seq(Row(0))
    } catch {
      case e: Exception =>
        logWarning("Run bootstrap failed", e)
        Seq(Row(-1))
    }
  }

  override def build = new RunBootstrapProcedure()
}

object RunBootstrapProcedure {
  val NAME = "run_bootstrap"

  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get() = new RunBootstrapProcedure
  }
}
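
As the branch in call() shows, keyGeneratorClass accepts either a KeyGeneratorType name (stored under HoodieBootstrapConfig.KEYGEN_TYPE) or a fully qualified class name (stored under HoodieBootstrapConfig.KEYGEN_CLASS_NAME). A sketch of both forms, with placeholder tables and paths:

// Enum-name form: "SIMPLE" matches KeyGeneratorType.getNames, so the type key is set.
spark.sql(
  """call run_bootstrap(table => 't1', tableType => 'COPY_ON_WRITE',
    |bootstrapPath => '/data/src1', basePath => '/tmp/t1',
    |rowKeyField => 'id', keyGeneratorClass => 'SIMPLE')""".stripMargin)

// Class-name form: anything outside the enum is passed through as a class name.
spark.sql(
  """call run_bootstrap(table => 't2', tableType => 'COPY_ON_WRITE',
    |bootstrapPath => '/data/src2', basePath => '/tmp/t2',
    |rowKeyField => 'id',
    |keyGeneratorClass => 'org.apache.hudi.keygen.ComplexKeyGenerator')""".stripMargin)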

ShowBootstrapMappingProcedure.scala

@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures

import org.apache.hudi.common.bootstrap.index.BootstrapIndex
import org.apache.hudi.common.model.{BootstrapFileMapping, HoodieFileGroupId}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}

import java.util
import java.util.function.Supplier

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

class ShowBootstrapMappingProcedure extends BaseProcedure with ProcedureBuilder {
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.optional(1, "partitionPath", DataTypes.StringType, ""),
    ProcedureParameter.optional(2, "fileIds", DataTypes.StringType, ""),
    ProcedureParameter.optional(3, "limit", DataTypes.IntegerType, 10),
    ProcedureParameter.optional(4, "sortBy", DataTypes.StringType, "partition"),
    ProcedureParameter.optional(5, "desc", DataTypes.BooleanType, false)
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("fileid", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("source_basepath", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("source_partition", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("source_file", DataTypes.StringType, nullable = true, Metadata.empty))
  )

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = OUTPUT_TYPE

  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
    val partitionPath = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
    val fileIds = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
    val limit = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[Int]
    val sortBy = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[String]
    val desc = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[Boolean]

    val basePath: String = getBasePath(tableName)
    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build

    if (partitionPath.isEmpty && fileIds.nonEmpty) {
      throw new IllegalStateException("PartitionPath is mandatory when passing fileIds.")
    }

    val indexReader = createBootstrapIndexReader(metaClient)
    val indexedPartitions = indexReader.getIndexedPartitionPaths

    if (partitionPath.nonEmpty && !indexedPartitions.contains(partitionPath)) {
      throw new HoodieException(partitionPath + " is not a valid indexed partition")
    }

    val mappingList: util.ArrayList[BootstrapFileMapping] = new util.ArrayList[BootstrapFileMapping]
    if (fileIds.nonEmpty) {
      val fileGroupIds = fileIds.split(",").toList.map((fileId: String) => new HoodieFileGroupId(partitionPath, fileId)).asJava
      mappingList.addAll(indexReader.getSourceFileMappingForFileIds(fileGroupIds).values)
    } else if (partitionPath.nonEmpty) {
      mappingList.addAll(indexReader.getSourceFileMappingForPartition(partitionPath))
    } else {
      for (part <- indexedPartitions) {
        mappingList.addAll(indexReader.getSourceFileMappingForPartition(part))
      }
    }

    val rows: java.util.List[Row] = mappingList
      .map(mapping => Row(mapping.getPartitionPath, mapping.getFileId, mapping.getBootstrapBasePath,
        mapping.getBootstrapPartitionPath, mapping.getBootstrapFileStatus.getPath.getUri)).toList

    val df = spark.createDataFrame(rows, OUTPUT_TYPE)
    if (desc) {
      df.orderBy(df(sortBy).desc).limit(limit).collect()
    } else {
      df.orderBy(df(sortBy).asc).limit(limit).collect()
    }
  }

  private def createBootstrapIndexReader(metaClient: HoodieTableMetaClient) = {
    val index = BootstrapIndex.getBootstrapIndex(metaClient)
    if (!index.useIndex) throw new HoodieException("This is not a bootstrapped Hudi table. There is no index info.")
    index.createReader
  }

  override def build: Procedure = new ShowBootstrapMappingProcedure()
}

object ShowBootstrapMappingProcedure {
  val NAME = "show_bootstrap_mapping"

  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get() = new ShowBootstrapMappingProcedure
  }
}
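
Everything except table is optional; fileIds additionally requires partitionPath, and sortBy must name one of the five output columns. A usage sketch with placeholder table, partition, and file-group values:

// Top five mappings of one partition, sorted by source file path descending.
spark.sql(
  """call show_bootstrap_mapping(table => 'trips',
    |partitionPath => 'datestr=2020', sortBy => 'source_file',
    |desc => true, limit => 5)""".stripMargin).show()

// Mappings for specific file groups inside that partition.
spark.sql(
  """call show_bootstrap_mapping(table => 'trips',
    |partitionPath => 'datestr=2020', fileIds => 'fg-1,fg-2')""".stripMargin).show()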

ShowBootstrapPartitionsProcedure.scala

@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures

import org.apache.hudi.common.bootstrap.index.BootstrapIndex
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}

import java.util.function.Supplier

class ShowBootstrapPartitionsProcedure extends BaseProcedure with ProcedureBuilder {
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None)
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("indexed_partitions", DataTypes.StringType, nullable = true, Metadata.empty))
  )

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = OUTPUT_TYPE

  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
    val basePath: String = getBasePath(tableName)
    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build

    val indexReader = createBootstrapIndexReader(metaClient)
    val indexedPartitions = indexReader.getIndexedPartitionPaths
    indexedPartitions.stream().toArray.map(r => Row(r)).toList
  }

  private def createBootstrapIndexReader(metaClient: HoodieTableMetaClient) = {
    val index = BootstrapIndex.getBootstrapIndex(metaClient)
    if (!index.useIndex) throw new HoodieException("This is not a bootstrapped Hudi table. There is no index info.")
    index.createReader
  }

  override def build = new ShowBootstrapPartitionsProcedure()
}

object ShowBootstrapPartitionsProcedure {
  val NAME = "show_bootstrap_partitions"

  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get() = new ShowBootstrapPartitionsProcedure
  }
}

TestBootstrapProcedure.scala

@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.procedure

import org.apache.hadoop.fs.Path
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.functional.TestBootstrap
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
import org.apache.spark.sql.{Dataset, Row}

import java.time.Instant
import java.util

class TestBootstrapProcedure extends HoodieSparkSqlTestBase {

  test("Test Call run_bootstrap Procedure") {
    withTempDir { tmp =>
      val NUM_OF_RECORDS = 100
      val PARTITION_FIELD = "datestr"
      val RECORD_KEY_FIELD = "_row_key"

      val tableName = generateTableName
      val basePath = s"${tmp.getCanonicalPath}"
      val srcName: String = "source"
      val sourcePath = basePath + Path.SEPARATOR + srcName
      val tablePath = basePath + Path.SEPARATOR + tableName
      val jsc = new JavaSparkContext(spark.sparkContext)

      // generate test data
      val partitions = util.Arrays.asList("2018", "2019", "2020")
      val timestamp: Long = Instant.now.toEpochMilli
      for (i <- 0 until partitions.size) {
        val df: Dataset[Row] = TestBootstrap.generateTestRawTripDataset(
          timestamp, i * NUM_OF_RECORDS, i * NUM_OF_RECORDS + NUM_OF_RECORDS, null, jsc, spark.sqlContext)
        df.write.parquet(sourcePath + Path.SEPARATOR + PARTITION_FIELD + "=" + partitions.get(i))
      }

      // run bootstrap
      checkAnswer(
        s"""call run_bootstrap(
           |table => '$tableName',
           |basePath => '$tablePath',
           |tableType => '${HoodieTableType.COPY_ON_WRITE.name}',
           |bootstrapPath => '$sourcePath',
           |rowKeyField => '$RECORD_KEY_FIELD',
           |partitionPathField => '$PARTITION_FIELD',
           |bootstrapOverwrite => true)""".stripMargin) {
        Seq(0)
      }

      // create table
      spark.sql(
        s"""
           |create table $tableName using hudi
           |location '$tablePath'
           |tblproperties(primaryKey = '$RECORD_KEY_FIELD')
           |""".stripMargin)

      // show the bootstrap index's partitions
      var result = spark.sql(s"""call show_bootstrap_partitions(table => '$tableName')""".stripMargin).collect()
      assertResult(3) {
        result.length
      }

      // show the bootstrap index's file mappings
      result = spark.sql(
        s"""call show_bootstrap_mapping(table => '$tableName')""".stripMargin).collect()
      assertResult(3) {
        result.length
      }
    }
  }
}