diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml index ec15cce76..8e80c8499 100644 --- a/hudi-cli/pom.xml +++ b/hudi-cli/pom.xml @@ -153,6 +153,13 @@ test test-jar + + org.apache.hudi + hudi-spark_${scala.binary.version} + ${project.version} + test + test-jar + org.apache.hudi hudi-utilities_${scala.binary.version} diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/BootstrapCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/BootstrapCommand.java new file mode 100644 index 000000000..2db49a25a --- /dev/null +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/BootstrapCommand.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.cli.commands; + +import org.apache.hudi.cli.HoodieCLI; +import org.apache.hudi.cli.HoodiePrintHelper; +import org.apache.hudi.cli.TableHeader; +import org.apache.hudi.cli.commands.SparkMain.SparkCommand; +import org.apache.hudi.cli.utils.InputStreamConsumer; +import org.apache.hudi.cli.utils.SparkUtil; +import org.apache.hudi.common.bootstrap.index.BootstrapIndex; +import org.apache.hudi.common.model.BootstrapFileMapping; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.utilities.UtilHelpers; + +import org.apache.spark.launcher.SparkLauncher; +import org.apache.spark.util.Utils; +import org.springframework.shell.core.CommandMarker; +import org.springframework.shell.core.annotation.CliCommand; +import org.springframework.shell.core.annotation.CliOption; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.stream.Collectors; + +import scala.collection.JavaConverters; + +/** + * CLI command to perform bootstrap action & display bootstrap index. 
+ */ +@Component +public class BootstrapCommand implements CommandMarker { + + @CliCommand(value = "bootstrap run", help = "Run a bootstrap action for current Hudi table") + public String bootstrap( + @CliOption(key = {"srcPath"}, mandatory = true, help = "Bootstrap source data path of the table") final String srcPath, + @CliOption(key = {"targetPath"}, mandatory = true, + help = "Base path for the target hoodie table") final String targetPath, + @CliOption(key = {"tableName"}, mandatory = true, help = "Hoodie table name") final String tableName, + @CliOption(key = {"tableType"}, mandatory = true, help = "Hoodie table type") final String tableType, + @CliOption(key = {"rowKeyField"}, mandatory = true, help = "Record key columns for bootstrap data") final String rowKeyField, + @CliOption(key = {"partitionPathField"}, unspecifiedDefaultValue = "", + help = "Partition fields for bootstrap source data") final String partitionPathField, + @CliOption(key = {"bootstrapIndexClass"}, unspecifiedDefaultValue = "org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex", + help = "Bootstrap Index Class") final String bootstrapIndexClass, + @CliOption(key = {"selectorClass"}, unspecifiedDefaultValue = "org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector", + help = "Selector class for bootstrap") final String selectorClass, + @CliOption(key = {"keyGeneratorClass"}, unspecifiedDefaultValue = "org.apache.hudi.keygen.SimpleKeyGenerator", + help = "Key generator class for bootstrap") final String keyGeneratorClass, + @CliOption(key = {"fullBootstrapInputProvider"}, unspecifiedDefaultValue = "org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider", + help = "Class for Full bootstrap input provider") final String fullBootstrapInputProvider, + @CliOption(key = {"schemaProviderClass"}, unspecifiedDefaultValue = "", + help = "SchemaProvider to attach schemas to bootstrap source data") final String schemaProviderClass, + @CliOption(key = {"payloadClass"}, 
unspecifiedDefaultValue = "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload", + help = "Payload Class") final String payloadClass, + @CliOption(key = {"parallelism"}, unspecifiedDefaultValue = "1500", help = "Bootstrap writer parallelism") final int parallelism, + @CliOption(key = {"sparkMaster"}, unspecifiedDefaultValue = "", help = "Spark Master") String master, + @CliOption(key = {"sparkMemory"}, unspecifiedDefaultValue = "4G", help = "Spark executor memory") final String sparkMemory, + @CliOption(key = {"enableHiveSync"}, unspecifiedDefaultValue = "false", help = "Enable Hive sync") final Boolean enableHiveSync, + @CliOption(key = {"propsFilePath"}, help = "path to properties file on localfs or dfs with configurations for hoodie client for importing", + unspecifiedDefaultValue = "") final String propsFilePath, + @CliOption(key = {"hoodieConfigs"}, help = "Any configuration that can be set in the properties file can be passed here in the form of an array", + unspecifiedDefaultValue = "") final String[] configs) + throws IOException, InterruptedException, URISyntaxException { + + String sparkPropertiesPath = + Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala()); + + SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); + + String cmd = SparkCommand.BOOTSTRAP.toString(); + + sparkLauncher.addAppArgs(cmd, master, sparkMemory, tableName, tableType, targetPath, srcPath, rowKeyField, + partitionPathField, String.valueOf(parallelism), schemaProviderClass, bootstrapIndexClass, selectorClass, + keyGeneratorClass, fullBootstrapInputProvider, payloadClass, String.valueOf(enableHiveSync), propsFilePath); + UtilHelpers.validateAndAddProperties(configs, sparkLauncher); + Process process = sparkLauncher.launch(); + InputStreamConsumer.captureOutput(process); + int exitCode = process.waitFor(); + if (exitCode != 0) { + return "Failed to bootstrap source data to Hudi dataset"; + } + return "Bootstrapped 
source data as Hudi dataset"; + } + + @CliCommand(value = "bootstrap index showmapping", help = "Show bootstrap index mapping") + public String showBootstrapIndexMapping( + @CliOption(key = {"partitionPath"}, unspecifiedDefaultValue = "", help = "A valid partition path") String partitionPath, + @CliOption(key = {"fileIds"}, unspecifiedDefaultValue = "", help = "Valid fileIds split by comma") String fileIds, + @CliOption(key = {"limit"}, unspecifiedDefaultValue = "-1", help = "Limit rows to be displayed") Integer limit, + @CliOption(key = {"sortBy"}, unspecifiedDefaultValue = "", help = "Sorting Field") final String sortByField, + @CliOption(key = {"desc"}, unspecifiedDefaultValue = "false", help = "Ordering") final boolean descending, + @CliOption(key = {"headeronly"}, unspecifiedDefaultValue = "false", help = "Print Header Only") + final boolean headerOnly) { + + if (partitionPath.isEmpty() && !fileIds.isEmpty()) { + throw new IllegalStateException("PartitionPath is mandatory when passing fileIds."); + } + + BootstrapIndex.IndexReader indexReader = createBootstrapIndexReader(); + List indexedPartitions = indexReader.getIndexedPartitionPaths(); + + if (!partitionPath.isEmpty() && !indexedPartitions.contains(partitionPath)) { + return partitionPath + " is not an valid indexed partition"; + } + + List mappingList = new ArrayList<>(); + if (!fileIds.isEmpty()) { + List fileGroupIds = Arrays.stream(fileIds.split(",")) + .map(fileId -> new HoodieFileGroupId(partitionPath, fileId)).collect(Collectors.toList()); + mappingList.addAll(indexReader.getSourceFileMappingForFileIds(fileGroupIds).values()); + } else if (!partitionPath.isEmpty()) { + mappingList.addAll(indexReader.getSourceFileMappingForPartition(partitionPath)); + } else { + for (String part : indexedPartitions) { + mappingList.addAll(indexReader.getSourceFileMappingForPartition(part)); + } + } + + final List rows = convertBootstrapSourceFileMapping(mappingList); + final TableHeader header = new TableHeader() + 
.addTableHeaderField("Hudi Partition") + .addTableHeaderField("FileId") + .addTableHeaderField("Source File Base Path") + .addTableHeaderField("Source File Partition") + .addTableHeaderField("Source File Path"); + + return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, + limit, headerOnly, rows); + } + + @CliCommand(value = "bootstrap index showpartitions", help = "Show bootstrap indexed partitions") + public String showBootstrapIndexPartitions() { + + BootstrapIndex.IndexReader indexReader = createBootstrapIndexReader(); + List indexedPartitions = indexReader.getIndexedPartitionPaths(); + + String[] header = new String[] {"Indexed partitions"}; + String[][] rows = new String[indexedPartitions.size()][1]; + for (int i = 0; i < indexedPartitions.size(); i++) { + rows[i][0] = indexedPartitions.get(i); + } + return HoodiePrintHelper.print(header, rows); + } + + private BootstrapIndex.IndexReader createBootstrapIndexReader() { + HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient(); + BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient); + if (!index.useIndex()) { + throw new HoodieException("This is not a bootstrapped Hudi table. 
Don't have any index info"); + } + return index.createReader(); + } + + private List convertBootstrapSourceFileMapping(List mappingList) { + final List rows = new ArrayList<>(); + for (BootstrapFileMapping mapping : mappingList) { + rows.add(new Comparable[] {mapping.getPartitionPath(), mapping.getFileId(), + mapping.getBootstrapBasePath(), mapping.getBootstrapPartitionPath(), mapping.getBoostrapFileStatus().getPath().getUri()}); + } + return rows; + } +} diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java index 8f7aa8017..5f9509840 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java @@ -18,12 +18,15 @@ package org.apache.hudi.cli.commands; +import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.cli.DedupeSparkJob; import org.apache.hudi.cli.utils.SparkUtil; import org.apache.hudi.client.HoodieWriteClient; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.config.HoodieBootstrapConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieSavepointException; @@ -35,11 +38,16 @@ import org.apache.hudi.utilities.HoodieCleaner; import org.apache.hudi.utilities.HoodieCompactionAdminTool; import org.apache.hudi.utilities.HoodieCompactionAdminTool.Operation; import org.apache.hudi.utilities.HoodieCompactor; +import org.apache.hudi.utilities.UtilHelpers; +import org.apache.hudi.utilities.deltastreamer.BootstrapExecutor; +import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; +import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; import 
org.apache.spark.sql.SQLContext; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -55,7 +63,7 @@ public class SparkMain { * Commands. */ enum SparkCommand { - ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, + BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLEAN, DELETE_SAVEPOINT } @@ -164,6 +172,19 @@ public class SparkMain { assert (args.length == 5); returnCode = deleteSavepoint(jsc, args[3], args[4]); break; + case BOOTSTRAP: + assert (args.length >= 18); + propsFilePath = null; + if (!StringUtils.isNullOrEmpty(args[17])) { + propsFilePath = args[17]; + } + configs = new ArrayList<>(); + if (args.length > 18) { + configs.addAll(Arrays.asList(args).subList(18, args.length)); + } + returnCode = doBootstrap(jsc, args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10], + args[11], args[12], args[13], args[14], args[15], args[16], propsFilePath, configs); + break; default: break; } @@ -174,7 +195,7 @@ public class SparkMain { List masterContained = Arrays.asList(SparkCommand.COMPACT_VALIDATE, SparkCommand.COMPACT_REPAIR, SparkCommand.COMPACT_UNSCHEDULE_PLAN, SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN, SparkCommand.IMPORT, SparkCommand.UPSERT, SparkCommand.DEDUPLICATE, SparkCommand.SAVEPOINT, - SparkCommand.DELETE_SAVEPOINT, SparkCommand.ROLLBACK_TO_SAVEPOINT, SparkCommand.ROLLBACK); + SparkCommand.DELETE_SAVEPOINT, SparkCommand.ROLLBACK_TO_SAVEPOINT, SparkCommand.ROLLBACK, SparkCommand.BOOTSTRAP); return masterContained.contains(command); } @@ -281,6 +302,36 @@ public class SparkMain { return 0; } + private static int doBootstrap(JavaSparkContext jsc, String tableName, String tableType, String basePath, + String sourcePath, String recordKeyCols, String partitionFields, String 
parallelism, String schemaProviderClass, + String bootstrapIndexClass, String selectorClass, String keyGeneratorClass, String fullBootstrapInputProvider, + String payloadClassName, String enableHiveSync, String propsFilePath, List configs) throws IOException { + + TypedProperties properties = propsFilePath == null ? UtilHelpers.buildProperties(configs) + : UtilHelpers.readConfig(FSUtils.getFs(propsFilePath, jsc.hadoopConfiguration()), new Path(propsFilePath), configs).getConfig(); + + properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, sourcePath); + properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, keyGeneratorClass); + properties.setProperty(HoodieBootstrapConfig.FULL_BOOTSTRAP_INPUT_PROVIDER, fullBootstrapInputProvider); + properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_PARALLELISM, parallelism); + properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR, selectorClass); + properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), recordKeyCols); + properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), partitionFields); + + HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); + cfg.targetTableName = tableName; + cfg.targetBasePath = basePath; + cfg.tableType = tableType; + cfg.schemaProviderClassName = schemaProviderClass; + cfg.bootstrapIndexClass = bootstrapIndexClass; + cfg.payloadClassName = payloadClassName; + cfg.enableHiveSync = Boolean.valueOf(enableHiveSync); + + new BootstrapExecutor(cfg, jsc, FSUtils.getFs(basePath, jsc.hadoopConfiguration()), + jsc.hadoopConfiguration(), properties).execute(); + return 0; + } + private static int rollback(JavaSparkContext jsc, String instantTime, String basePath) throws Exception { HoodieWriteClient client = createHoodieClient(jsc, basePath); if (client.rollback(instantTime)) { diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestBootstrapCommand.java 
b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestBootstrapCommand.java new file mode 100644 index 000000000..7ac1d61b0 --- /dev/null +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestBootstrapCommand.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.cli.integ; + +import org.apache.hudi.cli.HoodieCLI; +import org.apache.hudi.cli.HoodiePrintHelper; +import org.apache.hudi.cli.commands.TableCommand; +import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest; +import org.apache.hudi.client.TestBootstrap; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.shell.core.CommandResult; + +import java.io.File; +import java.io.IOException; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test class of {@link org.apache.hudi.cli.commands.BootstrapCommand}. + */ +public class ITTestBootstrapCommand extends AbstractShellIntegrationTest { + + private static final int NUM_OF_RECORDS = 100; + private static final String PARTITION_FIELD = "datestr"; + private static final String RECORD_KEY_FIELD = "_row_key"; + + private String tableName; + private String sourcePath; + private String tablePath; + private List partitions; + + @BeforeEach + public void init() { + String srcName = "source"; + tableName = "test-table"; + sourcePath = basePath + File.separator + srcName; + tablePath = basePath + File.separator + tableName; + + // generate test data + partitions = Arrays.asList("2018", "2019", "2020"); + double timestamp = new Double(Instant.now().toEpochMilli()).longValue(); + for (int i = 0; i < partitions.size(); i++) { + Dataset df = TestBootstrap.generateTestRawTripDataset(timestamp, + i * NUM_OF_RECORDS, i * NUM_OF_RECORDS + NUM_OF_RECORDS, null, jsc, sqlContext); + df.write().parquet(sourcePath + File.separator + PARTITION_FIELD + "=" + partitions.get(i)); + } + } + + 
/** + * Test case for command 'bootstrap'. + */ + @Test + public void testBootstrapRunCommand() throws IOException { + // test bootstrap run command + String cmdStr = String.format( + "bootstrap run --targetPath %s --tableName %s --tableType %s --srcPath %s --rowKeyField %s --partitionPathField %s --sparkMaster %s", + tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), sourcePath, RECORD_KEY_FIELD, PARTITION_FIELD, "local"); + CommandResult cr = getShell().executeCommand(cmdStr); + assertTrue(cr.isSuccess()); + + // Connect & check Hudi table exist + new TableCommand().connect(tablePath, TimelineLayoutVersion.VERSION_1, false, 2000, 300000, 7); + metaClient = HoodieCLI.getTableMetaClient(); + assertEquals(1, metaClient.getActiveTimeline().getCommitsTimeline().countInstants(), "Should have 1 commit."); + + // test "bootstrap index showpartitions" + CommandResult crForIndexedPartitions = getShell().executeCommand("bootstrap index showpartitions"); + assertTrue(crForIndexedPartitions.isSuccess()); + + String[] header = new String[] {"Indexed partitions"}; + String[][] rows = new String[partitions.size()][1]; + for (int i = 0; i < partitions.size(); i++) { + rows[i][0] = PARTITION_FIELD + "=" + partitions.get(i); + } + String expect = HoodiePrintHelper.print(header, rows); + expect = removeNonWordAndStripSpace(expect); + String got = removeNonWordAndStripSpace(crForIndexedPartitions.getResult().toString()); + assertEquals(expect, got); + + // test "bootstrap index showMapping" + CommandResult crForIndexedMapping = getShell().executeCommand("bootstrap index showmapping"); + assertTrue(crForIndexedMapping.isSuccess()); + + CommandResult crForIndexedMappingWithPartition = getShell().executeCommand(String.format( + "bootstrap index showmapping --partitionPath %s=%s", PARTITION_FIELD, partitions.get(0))); + assertTrue(crForIndexedMappingWithPartition.isSuccess()); + } +} diff --git 
a/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapCommitActionExecutor.java index 57b41ef40..d4be2b3ea 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapCommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapCommitActionExecutor.java @@ -193,7 +193,7 @@ public class BootstrapCommitActionExecutor> } /** - * Perform Metadata Bootstrap. + * Perform Full Bootstrap. * @param partitionFilesList List of partitions and files within that partitions */ protected Option fullBootstrap(List>> partitionFilesList) { diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index b80eaba2a..09ff604f6 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -522,8 +522,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { secondClient.rollback(commitTime1); allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath()); // After rollback, there should be no base file with the failed commit time - assertEquals(0, Arrays.stream(allFiles) - .filter(file -> file.getPath().getName().contains(commitTime1)).count()); + List remainingFiles = Arrays.stream(allFiles).filter(file -> file.getPath().getName() + .contains(commitTime1)).map(fileStatus -> fileStatus.getPath().toString()).collect(Collectors.toList()); + assertEquals(0, remainingFiles.size(), "There files should have been rolled-back " + + "when rolling back commit " + commitTime1 + " but are still remaining. 
Files: " + remainingFiles); dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles, basePath); assertEquals(200, recordsRead.size()); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/BootstrapIndex.java b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/BootstrapIndex.java index daeac06d0..08d7f86ae 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/BootstrapIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/BootstrapIndex.java @@ -71,14 +71,15 @@ public abstract class BootstrapIndex implements Serializable { } /** - * Check if bootstrap Index is present and ensures readable. + * Check if bootstrap Index is physically present. It does not guarantee the validity of the index. + * To ensure an index is valid, use useIndex() API. */ protected abstract boolean isPresent(); /** * Bootstrap Index Reader Interface. */ - public abstract static class IndexReader implements Serializable, AutoCloseable { + public abstract static class IndexReader implements Serializable, AutoCloseable { protected final HoodieTableMetaClient metaClient; @@ -102,7 +103,7 @@ public abstract class BootstrapIndex implements Serializable { * Return list file-ids indexed. * @return */ - public abstract List getIndexedFileIds(); + public abstract List getIndexedFileGroupIds(); /** * Lookup bootstrap index by partition. 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java index b9f745442..7dc0f69d2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; @@ -57,6 +58,7 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -79,6 +81,13 @@ public class HFileBootstrapIndex extends BootstrapIndex { public static final String BOOTSTRAP_INDEX_FILE_ID = "00000000-0000-0000-0000-000000000000-0"; + private static final String PARTITION_KEY_PREFIX = "part"; + private static final String FILE_ID_KEY_PREFIX = "fileid"; + private static final String KEY_VALUE_SEPARATOR = "="; + private static final String KEY_PARTS_SEPARATOR = ";"; + // This is part of the suffix that HFile appends to every key + private static final String HFILE_CELL_KEY_SUFFIX_PART = "//LATEST_TIMESTAMP/Put/vlen"; + // Additional Metadata written to HFiles. public static final byte[] INDEX_INFO_KEY = Bytes.toBytes("INDEX_INFO"); @@ -96,12 +105,44 @@ public class HFileBootstrapIndex extends BootstrapIndex { } } + /** + * Returns partition-key to be used in HFile. 
+ * @param partition Partition-Path + * @return + */ private static String getPartitionKey(String partition) { - return "part=" + partition; + return getKeyValueString(PARTITION_KEY_PREFIX, partition); } + /** + * Returns file group key to be used in HFile. + * @param fileGroupId File Group Id. + * @return + */ private static String getFileGroupKey(HoodieFileGroupId fileGroupId) { - return "part=" + fileGroupId.getPartitionPath() + ";fileid=" + fileGroupId.getFileId(); + return getPartitionKey(fileGroupId.getPartitionPath()) + KEY_PARTS_SEPARATOR + + getKeyValueString(FILE_ID_KEY_PREFIX, fileGroupId.getFileId()); + } + + private static String getPartitionFromKey(String key) { + String[] parts = key.split("=", 2); + ValidationUtils.checkArgument(parts[0].equals(PARTITION_KEY_PREFIX)); + return parts[1]; + } + + private static String getFileIdFromKey(String key) { + String[] parts = key.split("=", 2); + ValidationUtils.checkArgument(parts[0].equals(FILE_ID_KEY_PREFIX)); + return parts[1]; + } + + private static HoodieFileGroupId getFileGroupFromKey(String key) { + String[] parts = key.split(KEY_PARTS_SEPARATOR, 2); + return new HoodieFileGroupId(getPartitionFromKey(parts[0]), getFileIdFromKey(parts[1])); + } + + private static String getKeyValueString(String key, String value) { + return key + KEY_VALUE_SEPARATOR + value; } private static Path partitionIndexPath(HoodieTableMetaClient metaClient) { @@ -116,6 +157,17 @@ public class HFileBootstrapIndex extends BootstrapIndex { HoodieFileFormat.HFILE.getFileExtension())); } + /** + * HFile stores cell key in the format example : "2020/03/18//LATEST_TIMESTAMP/Put/vlen=3692/seqid=0". + * This API returns only the user key part from it. 
+ * @param cellKey HFile Cell Key + * @return + */ + private static String getUserKeyFromCellKey(String cellKey) { + int hfileSuffixBeginIndex = cellKey.lastIndexOf(HFILE_CELL_KEY_SUFFIX_PART); + return cellKey.substring(0, hfileSuffixBeginIndex); + } + /** * Helper method to create HFile Reader. * @@ -160,7 +212,7 @@ public class HFileBootstrapIndex extends BootstrapIndex { } @Override - protected boolean isPresent() { + public boolean isPresent() { return isPresent; } @@ -240,21 +292,21 @@ public class HFileBootstrapIndex extends BootstrapIndex { @Override public List getIndexedPartitionPaths() { HFileScanner scanner = partitionIndexReader().getScanner(true, true); - return getAllKeys(scanner); + return getAllKeys(scanner, HFileBootstrapIndex::getPartitionFromKey); } @Override - public List getIndexedFileIds() { + public List getIndexedFileGroupIds() { HFileScanner scanner = fileIdIndexReader().getScanner(true, true); - return getAllKeys(scanner); + return getAllKeys(scanner, HFileBootstrapIndex::getFileGroupFromKey); } - private List getAllKeys(HFileScanner scanner) { - List keys = new ArrayList<>(); + private List getAllKeys(HFileScanner scanner, Function converter) { + List keys = new ArrayList<>(); try { boolean available = scanner.seekTo(); while (available) { - keys.add(CellUtil.getCellKeyAsString(scanner.getKeyValue())); + keys.add(converter.apply(getUserKeyFromCellKey(CellUtil.getCellKeyAsString(scanner.getKeyValue())))); available = scanner.next(); } } catch (IOException ioe) { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/bootstrap/TestBootstrapIndex.java b/hudi-common/src/test/java/org/apache/hudi/common/bootstrap/TestBootstrapIndex.java index d0dfd939f..428bd8cb9 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/bootstrap/TestBootstrapIndex.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/bootstrap/TestBootstrapIndex.java @@ -34,10 +34,12 @@ import org.apache.hadoop.fs.permission.FsAction; import 
java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -50,17 +52,18 @@ import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Unit Tests for Bootstrap Index. */ public class TestBootstrapIndex extends HoodieCommonTestHarness { - private static String[] PARTITIONS = {"2020/03/18", "2020/03/19", "2020/03/20", "2020/03/21"}; - private static String BOOTSTRAP_BASE_PATH = "/tmp/source/parquet_tables/table1"; + private static final String[] PARTITIONS = {"2020/03/18", "2020/03/19", "2020/03/20", "2020/03/21"}; + private static final Set PARTITION_SET = Arrays.stream(PARTITIONS).collect(Collectors.toSet()); + private static final String BOOTSTRAP_BASE_PATH = "/tmp/source/parquet_tables/table1"; @BeforeEach - public void init() throws IOException { initMetaClient(); } @@ -127,11 +130,14 @@ public class TestBootstrapIndex extends HoodieCommonTestHarness { private void validateBootstrapIndex(Map> bootstrapMapping) { BootstrapIndex index = new HFileBootstrapIndex(metaClient); try (BootstrapIndex.IndexReader reader = index.createReader()) { - List partitions = reader.getIndexedPartitionPaths(); - assertEquals(bootstrapMapping.size(), partitions.size()); - long expNumFileGroupKeys = bootstrapMapping.values().stream().flatMap(x -> x.stream()).count(); - long gotNumFileGroupKeys = reader.getIndexedFileIds().size(); + List indexedPartitions = reader.getIndexedPartitionPaths(); + assertEquals(bootstrapMapping.size(), indexedPartitions.size()); + indexedPartitions.forEach(partition -> assertTrue(PARTITION_SET.contains(partition))); + long 
expNumFileGroupKeys = bootstrapMapping.values().stream().flatMap(Collection::stream).count(); + List fileGroupIds = reader.getIndexedFileGroupIds(); + long gotNumFileGroupKeys = fileGroupIds.size(); assertEquals(expNumFileGroupKeys, gotNumFileGroupKeys); + fileGroupIds.forEach(fgId -> assertTrue(PARTITION_SET.contains(fgId.getPartitionPath()))); bootstrapMapping.entrySet().stream().forEach(e -> { List gotMapping = reader.getSourceFileMappingForPartition(e.getKey()); diff --git a/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java b/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java index 4e1984c98..b53c7d887 100644 --- a/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java +++ b/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java @@ -158,7 +158,7 @@ public class TestBootstrap extends HoodieClientTestBase { public Schema generateNewDataSetAndReturnSchema(double timestamp, int numRecords, List partitionPaths, String srcPath) throws Exception { boolean isPartitioned = partitionPaths != null && !partitionPaths.isEmpty(); - Dataset df = generateTestRawTripDataset(timestamp, numRecords, partitionPaths, jsc, sqlContext); + Dataset df = generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths, jsc, sqlContext); df.printSchema(); if (isPartitioned) { df.write().partitionBy("datestr").format("parquet").mode(SaveMode.Overwrite).save(srcPath); @@ -550,11 +550,11 @@ public class TestBootstrap extends HoodieClientTestBase { return builder; } - private static Dataset generateTestRawTripDataset(double timestamp, int numRecords, List partitionPaths, + public static Dataset generateTestRawTripDataset(double timestamp, int from, int to, List partitionPaths, JavaSparkContext jsc, SQLContext sqlContext) { boolean isPartitioned = partitionPaths != null && !partitionPaths.isEmpty(); final List records = new ArrayList<>(); - IntStream.range(0, numRecords).forEach(i -> { + IntStream.range(from, to).forEach(i -> { String 
id = "" + i; records.add(generateGenericRecord("trip_" + id, "rider_" + id, "driver_" + id, timestamp, false, false).toString()); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index 1dffd07ba..053119646 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.config.DFSPropertiesConfiguration; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; @@ -100,7 +101,7 @@ public class UtilHelpers { public static SchemaProvider createSchemaProvider(String schemaProviderClass, TypedProperties cfg, JavaSparkContext jssc) throws IOException { try { - return schemaProviderClass == null ? null + return StringUtils.isNullOrEmpty(schemaProviderClass) ? null : (SchemaProvider) ReflectionUtils.loadClass(schemaProviderClass, cfg, jssc); } catch (Throwable e) { throw new IOException("Could not load schema provider class " + schemaProviderClass, e);