1
0

[HUDI-427] [HUDI-971] Implement CLI support for performing bootstrap (#1869)

* [HUDI-971] Clean partitions & fileIds returned by HFileBootstrapIndex
* [HUDI-427] Implement CLI support for performing bootstrap

Co-authored-by: Wenning Ding <wenningd@amazon.com>
Co-authored-by: Balaji Varadarajan <vbalaji@apache.org>
This commit is contained in:
wenningd
2020-08-08 12:37:29 -07:00
committed by GitHub
parent 5ee676e34f
commit 9fe2d2b14a
11 changed files with 448 additions and 28 deletions

View File

@@ -153,6 +153,13 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-utilities_${scala.binary.version}</artifactId>

View File

@@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.cli.commands;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
import org.apache.hudi.common.model.BootstrapFileMapping;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.util.Utils;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import scala.collection.JavaConverters;
/**
* CLI command to perform bootstrap action & display bootstrap index.
*/
@Component
public class BootstrapCommand implements CommandMarker {

  /**
   * Launches a spark-submit job (via {@link SparkLauncher}) that bootstraps an existing
   * source dataset at {@code srcPath} into a Hudi table at {@code targetPath}.
   *
   * <p>The positional app arguments added here must stay in sync with the argument
   * unpacking performed by the {@code BOOTSTRAP} case in {@code SparkMain}.
   *
   * @return a human-readable success/failure message for the shell user
   * @throws IOException          if the launcher fails to start the process
   * @throws InterruptedException if interrupted while waiting for the spark job
   * @throws URISyntaxException   if launcher configuration contains a malformed URI
   */
  @CliCommand(value = "bootstrap run", help = "Run a bootstrap action for current Hudi table")
  public String bootstrap(
      @CliOption(key = {"srcPath"}, mandatory = true, help = "Bootstrap source data path of the table") final String srcPath,
      @CliOption(key = {"targetPath"}, mandatory = true,
          help = "Base path for the target hoodie table") final String targetPath,
      @CliOption(key = {"tableName"}, mandatory = true, help = "Hoodie table name") final String tableName,
      @CliOption(key = {"tableType"}, mandatory = true, help = "Hoodie table type") final String tableType,
      @CliOption(key = {"rowKeyField"}, mandatory = true, help = "Record key columns for bootstrap data") final String rowKeyField,
      @CliOption(key = {"partitionPathField"}, unspecifiedDefaultValue = "",
          help = "Partition fields for bootstrap source data") final String partitionPathField,
      @CliOption(key = {"bootstrapIndexClass"}, unspecifiedDefaultValue = "org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex",
          help = "Bootstrap Index Class") final String bootstrapIndexClass,
      @CliOption(key = {"selectorClass"}, unspecifiedDefaultValue = "org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector",
          help = "Selector class for bootstrap") final String selectorClass,
      @CliOption(key = {"keyGeneratorClass"}, unspecifiedDefaultValue = "org.apache.hudi.keygen.SimpleKeyGenerator",
          help = "Key generator class for bootstrap") final String keyGeneratorClass,
      @CliOption(key = {"fullBootstrapInputProvider"}, unspecifiedDefaultValue = "org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider",
          help = "Class for Full bootstrap input provider") final String fullBootstrapInputProvider,
      @CliOption(key = {"schemaProviderClass"}, unspecifiedDefaultValue = "",
          help = "SchemaProvider to attach schemas to bootstrap source data") final String schemaProviderClass,
      @CliOption(key = {"payloadClass"}, unspecifiedDefaultValue = "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload",
          help = "Payload Class") final String payloadClass,
      @CliOption(key = {"parallelism"}, unspecifiedDefaultValue = "1500", help = "Bootstrap writer parallelism") final int parallelism,
      @CliOption(key = {"sparkMaster"}, unspecifiedDefaultValue = "", help = "Spark Master") String master,
      @CliOption(key = {"sparkMemory"}, unspecifiedDefaultValue = "4G", help = "Spark executor memory") final String sparkMemory,
      @CliOption(key = {"enableHiveSync"}, unspecifiedDefaultValue = "false", help = "Enable Hive sync") final Boolean enableHiveSync,
      @CliOption(key = {"propsFilePath"}, help = "path to properties file on localfs or dfs with configurations for hoodie client for importing",
          unspecifiedDefaultValue = "") final String propsFilePath,
      @CliOption(key = {"hoodieConfigs"}, help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
          unspecifiedDefaultValue = "") final String[] configs)
      throws IOException, InterruptedException, URISyntaxException {

    // Resolve spark-defaults from the environment so the launched job inherits cluster settings.
    String sparkPropertiesPath =
        Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
    SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);

    String cmd = SparkCommand.BOOTSTRAP.toString();
    // NOTE: positional order here is the contract with SparkMain's BOOTSTRAP handler.
    sparkLauncher.addAppArgs(cmd, master, sparkMemory, tableName, tableType, targetPath, srcPath, rowKeyField,
        partitionPathField, String.valueOf(parallelism), schemaProviderClass, bootstrapIndexClass, selectorClass,
        keyGeneratorClass, fullBootstrapInputProvider, payloadClass, String.valueOf(enableHiveSync), propsFilePath);
    UtilHelpers.validateAndAddProperties(configs, sparkLauncher);

    Process process = sparkLauncher.launch();
    InputStreamConsumer.captureOutput(process);
    int exitCode = process.waitFor();
    if (exitCode != 0) {
      return "Failed to bootstrap source data to Hudi dataset";
    }
    return "Bootstrapped source data as Hudi dataset";
  }

  /**
   * Displays the bootstrap index mapping (Hudi file group -> source file) for the connected table.
   *
   * <p>Filtering semantics: with {@code fileIds} a partition path is required; with only a
   * partition path all mappings for that partition are shown; with neither, mappings for every
   * indexed partition are shown.
   *
   * @throws IllegalStateException if {@code fileIds} is supplied without a partition path
   */
  @CliCommand(value = "bootstrap index showmapping", help = "Show bootstrap index mapping")
  public String showBootstrapIndexMapping(
      @CliOption(key = {"partitionPath"}, unspecifiedDefaultValue = "", help = "A valid partition path") String partitionPath,
      @CliOption(key = {"fileIds"}, unspecifiedDefaultValue = "", help = "Valid fileIds split by comma") String fileIds,
      @CliOption(key = {"limit"}, unspecifiedDefaultValue = "-1", help = "Limit rows to be displayed") Integer limit,
      @CliOption(key = {"sortBy"}, unspecifiedDefaultValue = "", help = "Sorting Field") final String sortByField,
      @CliOption(key = {"desc"}, unspecifiedDefaultValue = "false", help = "Ordering") final boolean descending,
      @CliOption(key = {"headeronly"}, unspecifiedDefaultValue = "false", help = "Print Header Only")
          final boolean headerOnly) {

    if (partitionPath.isEmpty() && !fileIds.isEmpty()) {
      throw new IllegalStateException("PartitionPath is mandatory when passing fileIds.");
    }

    BootstrapIndex.IndexReader indexReader = createBootstrapIndexReader();
    List<String> indexedPartitions = indexReader.getIndexedPartitionPaths();

    if (!partitionPath.isEmpty() && !indexedPartitions.contains(partitionPath)) {
      return partitionPath + " is not a valid indexed partition";
    }

    List<BootstrapFileMapping> mappingList = new ArrayList<>();
    if (!fileIds.isEmpty()) {
      // Look up only the requested file groups within the given partition.
      List<HoodieFileGroupId> fileGroupIds = Arrays.stream(fileIds.split(","))
          .map(fileId -> new HoodieFileGroupId(partitionPath, fileId)).collect(Collectors.toList());
      mappingList.addAll(indexReader.getSourceFileMappingForFileIds(fileGroupIds).values());
    } else if (!partitionPath.isEmpty()) {
      mappingList.addAll(indexReader.getSourceFileMappingForPartition(partitionPath));
    } else {
      // No filters: dump the mapping for every indexed partition.
      for (String part : indexedPartitions) {
        mappingList.addAll(indexReader.getSourceFileMappingForPartition(part));
      }
    }

    final List<Comparable[]> rows = convertBootstrapSourceFileMapping(mappingList);
    final TableHeader header = new TableHeader()
        .addTableHeaderField("Hudi Partition")
        .addTableHeaderField("FileId")
        .addTableHeaderField("Source File Base Path")
        .addTableHeaderField("Source File Partition")
        .addTableHeaderField("Source File Path");
    return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending,
        limit, headerOnly, rows);
  }

  /**
   * Lists all partitions recorded in the bootstrap index of the connected table.
   */
  @CliCommand(value = "bootstrap index showpartitions", help = "Show bootstrap indexed partitions")
  public String showBootstrapIndexPartitions() {

    BootstrapIndex.IndexReader indexReader = createBootstrapIndexReader();
    List<String> indexedPartitions = indexReader.getIndexedPartitionPaths();

    String[] header = new String[] {"Indexed partitions"};
    String[][] rows = new String[indexedPartitions.size()][1];
    for (int i = 0; i < indexedPartitions.size(); i++) {
      rows[i][0] = indexedPartitions.get(i);
    }
    return HoodiePrintHelper.print(header, rows);
  }

  /**
   * Opens a reader over the bootstrap index of the CLI-connected table.
   *
   * @throws HoodieException if the table has no usable bootstrap index
   */
  private BootstrapIndex.IndexReader createBootstrapIndexReader() {
    HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
    BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient);
    if (!index.useIndex()) {
      throw new HoodieException("This is not a bootstrapped Hudi table. Don't have any index info");
    }
    return index.createReader();
  }

  /**
   * Flattens bootstrap file mappings into printable rows matching the table header
   * used by {@code showBootstrapIndexMapping}.
   */
  private List<Comparable[]> convertBootstrapSourceFileMapping(List<BootstrapFileMapping> mappingList) {
    final List<Comparable[]> rows = new ArrayList<>();
    for (BootstrapFileMapping mapping : mappingList) {
      rows.add(new Comparable[] {mapping.getPartitionPath(), mapping.getFileId(),
          mapping.getBootstrapBasePath(), mapping.getBootstrapPartitionPath(), mapping.getBoostrapFileStatus().getPath().getUri()});
    }
    return rows;
  }
}

View File

@@ -18,12 +18,15 @@
package org.apache.hudi.cli.commands;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.cli.DedupeSparkJob;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieBootstrapConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieSavepointException;
@@ -35,11 +38,16 @@ import org.apache.hudi.utilities.HoodieCleaner;
import org.apache.hudi.utilities.HoodieCompactionAdminTool;
import org.apache.hudi.utilities.HoodieCompactionAdminTool.Operation;
import org.apache.hudi.utilities.HoodieCompactor;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.deltastreamer.BootstrapExecutor;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -55,7 +63,7 @@ public class SparkMain {
* Commands.
*/
enum SparkCommand {
ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN,
BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN,
COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLEAN, DELETE_SAVEPOINT
}
@@ -164,6 +172,19 @@ public class SparkMain {
assert (args.length == 5);
returnCode = deleteSavepoint(jsc, args[3], args[4]);
break;
case BOOTSTRAP:
assert (args.length >= 18);
propsFilePath = null;
if (!StringUtils.isNullOrEmpty(args[17])) {
propsFilePath = args[17];
}
configs = new ArrayList<>();
if (args.length > 18) {
configs.addAll(Arrays.asList(args).subList(18, args.length));
}
returnCode = doBootstrap(jsc, args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10],
args[11], args[12], args[13], args[14], args[15], args[16], propsFilePath, configs);
break;
default:
break;
}
@@ -174,7 +195,7 @@ public class SparkMain {
List<SparkCommand> masterContained = Arrays.asList(SparkCommand.COMPACT_VALIDATE, SparkCommand.COMPACT_REPAIR,
SparkCommand.COMPACT_UNSCHEDULE_PLAN, SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN,
SparkCommand.IMPORT, SparkCommand.UPSERT, SparkCommand.DEDUPLICATE, SparkCommand.SAVEPOINT,
SparkCommand.DELETE_SAVEPOINT, SparkCommand.ROLLBACK_TO_SAVEPOINT, SparkCommand.ROLLBACK);
SparkCommand.DELETE_SAVEPOINT, SparkCommand.ROLLBACK_TO_SAVEPOINT, SparkCommand.ROLLBACK, SparkCommand.BOOTSTRAP);
return masterContained.contains(command);
}
@@ -281,6 +302,36 @@ public class SparkMain {
return 0;
}
/**
 * Executes the bootstrap action inside the launched Spark application.
 *
 * <p>Builds the writer {@link TypedProperties} either from the inline {@code configs}
 * alone (when no props file was given) or from {@code propsFilePath} overlaid with
 * {@code configs}, then delegates to {@link BootstrapExecutor}.
 *
 * @param propsFilePath path to a properties file, or {@code null} when not supplied
 * @return 0 on success (failures surface as exceptions from the executor)
 * @throws IOException if the properties file or filesystem cannot be accessed
 */
private static int doBootstrap(JavaSparkContext jsc, String tableName, String tableType, String basePath,
    String sourcePath, String recordKeyCols, String partitionFields, String parallelism, String schemaProviderClass,
    String bootstrapIndexClass, String selectorClass, String keyGeneratorClass, String fullBootstrapInputProvider,
    String payloadClassName, String enableHiveSync, String propsFilePath, List<String> configs) throws IOException {

  TypedProperties properties = propsFilePath == null ? UtilHelpers.buildProperties(configs)
      : UtilHelpers.readConfig(FSUtils.getFs(propsFilePath, jsc.hadoopConfiguration()), new Path(propsFilePath), configs).getConfig();

  // Wire the CLI arguments into the bootstrap-specific write configs.
  properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, sourcePath);
  properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, keyGeneratorClass);
  properties.setProperty(HoodieBootstrapConfig.FULL_BOOTSTRAP_INPUT_PROVIDER, fullBootstrapInputProvider);
  properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_PARALLELISM, parallelism);
  properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR, selectorClass);
  properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), recordKeyCols);
  properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), partitionFields);

  HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
  cfg.targetTableName = tableName;
  cfg.targetBasePath = basePath;
  cfg.tableType = tableType;
  cfg.schemaProviderClassName = schemaProviderClass;
  cfg.bootstrapIndexClass = bootstrapIndexClass;
  cfg.payloadClassName = payloadClassName;
  // parseBoolean avoids the needless boxing of Boolean.valueOf; same truth semantics.
  cfg.enableHiveSync = Boolean.parseBoolean(enableHiveSync);

  new BootstrapExecutor(cfg, jsc, FSUtils.getFs(basePath, jsc.hadoopConfiguration()),
      jsc.hadoopConfiguration(), properties).execute();
  return 0;
}
private static int rollback(JavaSparkContext jsc, String instantTime, String basePath) throws Exception {
HoodieWriteClient client = createHoodieClient(jsc, basePath);
if (client.rollback(instantTime)) {

View File

@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.cli.integ;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.commands.TableCommand;
import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
import org.apache.hudi.client.TestBootstrap;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.shell.core.CommandResult;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test class of {@link org.apache.hudi.cli.commands.BootstrapCommand}.
*/
public class ITTestBootstrapCommand extends AbstractShellIntegrationTest {

  private static final int NUM_OF_RECORDS = 100;
  private static final String PARTITION_FIELD = "datestr";
  private static final String RECORD_KEY_FIELD = "_row_key";

  // test-table name used both as CLI arg and target directory name
  private String tableName;
  // parquet source dataset root
  private String sourcePath;
  // target Hudi table base path
  private String tablePath;
  // partition values written under PARTITION_FIELD
  private List<String> partitions;

  /**
   * Generates a partitioned parquet source dataset for bootstrap to consume.
   */
  @BeforeEach
  public void init() {
    String srcName = "source";
    tableName = "test-table";
    sourcePath = basePath + File.separator + srcName;
    tablePath = basePath + File.separator + tableName;

    // generate test data
    partitions = Arrays.asList("2018", "2019", "2020");
    // Direct widening conversion; avoids the deprecated Double(double) boxing
    // constructor and a pointless long -> Double -> long round-trip.
    double timestamp = Instant.now().toEpochMilli();
    for (int i = 0; i < partitions.size(); i++) {
      Dataset<Row> df = TestBootstrap.generateTestRawTripDataset(timestamp,
          i * NUM_OF_RECORDS, i * NUM_OF_RECORDS + NUM_OF_RECORDS, null, jsc, sqlContext);
      df.write().parquet(sourcePath + File.separator + PARTITION_FIELD + "=" + partitions.get(i));
    }
  }

  /**
   * Test case for command 'bootstrap': runs the bootstrap, then verifies the resulting
   * table's commit timeline and the 'bootstrap index' show commands.
   */
  @Test
  public void testBootstrapRunCommand() throws IOException {
    // test bootstrap run command
    String cmdStr = String.format(
        "bootstrap run --targetPath %s --tableName %s --tableType %s --srcPath %s --rowKeyField %s --partitionPathField %s --sparkMaster %s",
        tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), sourcePath, RECORD_KEY_FIELD, PARTITION_FIELD, "local");
    CommandResult cr = getShell().executeCommand(cmdStr);
    assertTrue(cr.isSuccess());

    // Connect & check Hudi table exist
    new TableCommand().connect(tablePath, TimelineLayoutVersion.VERSION_1, false, 2000, 300000, 7);
    metaClient = HoodieCLI.getTableMetaClient();
    assertEquals(1, metaClient.getActiveTimeline().getCommitsTimeline().countInstants(), "Should have 1 commit.");

    // test "bootstrap index showpartitions"
    CommandResult crForIndexedPartitions = getShell().executeCommand("bootstrap index showpartitions");
    assertTrue(crForIndexedPartitions.isSuccess());

    String[] header = new String[] {"Indexed partitions"};
    String[][] rows = new String[partitions.size()][1];
    for (int i = 0; i < partitions.size(); i++) {
      rows[i][0] = PARTITION_FIELD + "=" + partitions.get(i);
    }
    String expect = HoodiePrintHelper.print(header, rows);
    expect = removeNonWordAndStripSpace(expect);
    String got = removeNonWordAndStripSpace(crForIndexedPartitions.getResult().toString());
    assertEquals(expect, got);

    // test "bootstrap index showmapping"
    CommandResult crForIndexedMapping = getShell().executeCommand("bootstrap index showmapping");
    assertTrue(crForIndexedMapping.isSuccess());

    CommandResult crForIndexedMappingWithPartition = getShell().executeCommand(String.format(
        "bootstrap index showmapping --partitionPath %s=%s", PARTITION_FIELD, partitions.get(0)));
    assertTrue(crForIndexedMappingWithPartition.isSuccess());
  }
}