From 445208a0d20b457daeeb5f70995302c92dd19f31 Mon Sep 17 00:00:00 2001 From: Manoj Govindassamy Date: Fri, 26 Nov 2021 13:44:16 -0800 Subject: [PATCH] [HUDI-2845] Metadata CLI - files/partition file listing fix and new validate option (#4092) - Co-authored-by: Sivabalan Narayanan --- .../hudi/cli/commands/MetadataCommand.java | 208 ++++++++++++++---- .../hudi/cli/commands/SparkEnvCommand.java | 1 + .../org/apache/hudi/cli/utils/SparkUtil.java | 55 +++-- 3 files changed, 205 insertions(+), 59 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java index ac898a8e6..143a5123c 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java @@ -18,20 +18,25 @@ package org.apache.hudi.cli.commands; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.hudi.cli.HoodieCLI; +import org.apache.hudi.cli.HoodiePrintHelper; +import org.apache.hudi.cli.TableHeader; import org.apache.hudi.cli.utils.SparkUtil; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.metadata.HoodieBackedTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; import org.springframework.shell.core.CommandMarker; import org.springframework.shell.core.annotation.CliCommand; @@ -40,25 +45,44 @@ import org.springframework.stereotype.Component; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; /** * CLI commands to operate on the Metadata Table. + *
+ *
+ * Example:
+ * The default spark.master conf is set to yarn. If you are running on a local deployment,
+ * you can set the spark master to local using the set conf command.
+ * > set --conf SPARK_MASTER=local
+ *
+ * Connect to the table + * > connect --path {path to hudi table} + *
+ * Run metadata commands + * > metadata list-partitions */ @Component public class MetadataCommand implements CommandMarker { - private JavaSparkContext jsc; + private static final Logger LOG = LogManager.getLogger(MetadataCommand.class); private static String metadataBaseDirectory; + private JavaSparkContext jsc; /** * Sets the directory to store/read Metadata Table. - * + *
    * This can be used to store the metadata table away from the dataset directory.
-   *  - Useful for testing as well as for using via the HUDI CLI so that the actual dataset is not written to.
-   *  - Useful for testing Metadata Table performance and operations on existing datasets before enabling.
+   * - Useful for testing as well as for using via the HUDI CLI so that the actual dataset is not written to.
+   * - Useful for testing Metadata Table performance and operations on existing datasets before enabling.
    */
   public static void setMetadataBaseDirectory(String metadataDir) {
     ValidationUtils.checkState(metadataBaseDirectory == null,
@@ -75,8 +99,7 @@ public class MetadataCommand implements CommandMarker {
 
   @CliCommand(value = "metadata set", help = "Set options for Metadata Table")
   public String set(@CliOption(key = {"metadataDir"},
-      help = "Directory to read/write metadata table (can be different from dataset)", unspecifiedDefaultValue = "")
-          final String metadataDir) {
+      help = "Directory to read/write metadata table (can be different from dataset)", unspecifiedDefaultValue = "") final String metadataDir) {
     if (!metadataDir.isEmpty()) {
       setMetadataBaseDirectory(metadataDir);
     }
@@ -152,16 +175,22 @@ public class MetadataCommand implements CommandMarker {
         config, HoodieCLI.basePath, "/tmp");
     Map<String, String> stats = metadata.stats();
-    StringBuffer out = new StringBuffer("\n");
-    out.append(String.format("Base path: %s\n", getMetadataTableBasePath(HoodieCLI.basePath)));
+    final List<Comparable[]> rows = new ArrayList<>();
     for (Map.Entry<String, String> entry : stats.entrySet()) {
-      out.append(String.format("%s: %s\n", entry.getKey(), entry.getValue()));
+      Comparable[] row = new Comparable[2];
+      row[0] = entry.getKey();
+      row[1] = entry.getValue();
+      rows.add(row);
     }
-    return out.toString();
+    TableHeader header = new TableHeader()
+        .addTableHeaderField("stat key")
+        .addTableHeaderField("stat value");
+    return HoodiePrintHelper.print(header, new HashMap<>(), "",
+        false, Integer.MAX_VALUE, false, rows);
   }
 
-  @CliCommand(value = "metadata list-partitions", help = "Print a list of all partitions from the metadata")
+  @CliCommand(value = "metadata list-partitions", help = "List all partitions from metadata")
   public String listPartitions() throws IOException {
     HoodieCLI.getTableMetaClient();
     initJavaSparkContext();
@@ -169,55 +198,150 @@ public class MetadataCommand implements CommandMarker {
     HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(new HoodieSparkEngineContext(jsc),
         config, HoodieCLI.basePath, "/tmp");
 
-    StringBuffer out = new StringBuffer("\n");
     if (!metadata.enabled()) {
-      out.append("=== Metadata Table not initilized. Using file listing to get list of partitions. ===\n\n");
+      return "[ERROR] Metadata Table not enabled/initialized\n\n";
     }
-    long t1 = System.currentTimeMillis();
+    HoodieTimer timer = new HoodieTimer().startTimer();
     List<String> partitions = metadata.getAllPartitionPaths();
-    long t2 = System.currentTimeMillis();
+    LOG.debug("Took " + timer.endTimer() + " ms");
 
-    int[] count = {0};
-    partitions.stream().sorted((p1, p2) -> p2.compareTo(p1)).forEach(p -> {
-      out.append(p);
-      if (++count[0] % 15 == 0) {
-        out.append("\n");
-      } else {
-        out.append(", ");
-      }
+    final List<Comparable[]> rows = new ArrayList<>();
+    partitions.stream().sorted(Comparator.reverseOrder()).forEach(p -> {
+      Comparable[] row = new Comparable[1];
+      row[0] = p;
+      rows.add(row);
     });
-    out.append(String.format("\n\n=== List of partitions retrieved in %.2fsec ===", (t2 - t1) / 1000.0));
-
-    return out.toString();
+    TableHeader header = new TableHeader().addTableHeaderField("partition");
+    return HoodiePrintHelper.print(header, new HashMap<>(), "",
+        false, Integer.MAX_VALUE, false, rows);
   }
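The stats and list-partitions commands above (and list-files/validate-files below) now share one output pattern: collect Comparable[] rows, then render them through TableHeader and HoodiePrintHelper instead of hand-assembling a StringBuffer. A minimal sketch of that pattern, reusing the exact print(...) call shape from this patch; the SketchTable class and its sample partition names are illustrative only, not part of the change:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;

    import org.apache.hudi.cli.HoodiePrintHelper;
    import org.apache.hudi.cli.TableHeader;

    public class SketchTable {
      public static void main(String[] args) {
        // One Comparable[] per output row, exactly like listPartitions() builds them.
        final List<Comparable[]> rows = new ArrayList<>();
        for (String p : Arrays.asList("2021/11/26", "2021/11/25")) {
          rows.add(new Comparable[] {p});
        }
        TableHeader header = new TableHeader().addTableHeaderField("partition");
        // Same argument shape as the calls in this patch: no converter map,
        // no sort column, Integer.MAX_VALUE row limit.
        System.out.println(HoodiePrintHelper.print(header, new HashMap<>(), "",
            false, Integer.MAX_VALUE, false, rows));
      }
    }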
 
   @CliCommand(value = "metadata list-files", help = "Print a list of all files in a partition from the metadata")
   public String listFiles(
-      @CliOption(key = {"partition"}, help = "Name of the partition to list files", mandatory = true)
-          final String partition) throws IOException {
+      @CliOption(key = {"partition"}, help = "Name of the partition to list files", mandatory = true) final String partition) throws IOException {
     HoodieCLI.getTableMetaClient();
     HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
-    HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath, "/tmp");
+    HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(
+        new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath, "/tmp");
 
-    StringBuffer out = new StringBuffer("\n");
     if (!metaReader.enabled()) {
-      out.append("=== Metadata Table not initialized. Using file listing to get list of files in partition. ===\n\n");
+      return "[ERROR] Metadata Table not enabled/initialized\n\n";
     }
 
-    long t1 = System.currentTimeMillis();
+    HoodieTimer timer = new HoodieTimer().startTimer();
     FileStatus[] statuses = metaReader.getAllFilesInPartition(new Path(HoodieCLI.basePath, partition));
-    long t2 = System.currentTimeMillis();
+    LOG.debug("Took " + timer.endTimer() + " ms");
 
-    Arrays.stream(statuses).sorted((p1, p2) -> p2.getPath().getName().compareTo(p1.getPath().getName())).forEach(p -> {
-      out.append("\t" + p.getPath().getName());
-      out.append("\n");
+    final List<Comparable[]> rows = new ArrayList<>();
+    Arrays.stream(statuses).sorted((p1, p2) -> p2.getPath().getName().compareTo(p1.getPath().getName())).forEach(f -> {
+      Comparable[] row = new Comparable[1];
+      row[0] = f;
+      rows.add(row);
     });
-    out.append(String.format("\n=== Files in partition retrieved in %.2fsec ===", (t2 - t1) / 1000.0));
+    TableHeader header = new TableHeader().addTableHeaderField("file path");
+    return HoodiePrintHelper.print(header, new HashMap<>(), "",
+        false, Integer.MAX_VALUE, false, rows);
+  }
 
-    return out.toString();
+  @CliCommand(value = "metadata validate-files", help = "Validate all files in all partitions from the metadata")
+  public String validateFiles(
+      @CliOption(key = {"verbose"}, help = "Print all file details", unspecifiedDefaultValue = "false") final boolean verbose) throws IOException {
+    HoodieCLI.getTableMetaClient();
+    HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
+    HoodieBackedTableMetadata metadataReader = new HoodieBackedTableMetadata(
+        new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath, "/tmp");
+
+    if (!metadataReader.enabled()) {
+      return "[ERROR] Metadata Table not enabled/initialized\n\n";
+    }
+
+    HoodieMetadataConfig fsConfig = HoodieMetadataConfig.newBuilder().enable(false).build();
+    HoodieBackedTableMetadata fsMetaReader = new HoodieBackedTableMetadata(
+        new HoodieLocalEngineContext(HoodieCLI.conf), fsConfig, HoodieCLI.basePath, "/tmp");
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    List<String> metadataPartitions = metadataReader.getAllPartitionPaths();
+    LOG.debug("Listing partitions took " + timer.endTimer() + " ms");
+    List<String> fsPartitions = fsMetaReader.getAllPartitionPaths();
+    Collections.sort(fsPartitions);
+    Collections.sort(metadataPartitions);
+
+    Set<String> allPartitions = new HashSet<>();
+    allPartitions.addAll(fsPartitions);
+    allPartitions.addAll(metadataPartitions);
+
+    if (!fsPartitions.equals(metadataPartitions)) {
+      LOG.error("FS partition listing is not matching with metadata partition listing!");
+      LOG.error("All FS partitions: " + Arrays.toString(fsPartitions.toArray()));
+      LOG.error("All Metadata partitions: " + Arrays.toString(metadataPartitions.toArray()));
+    }
+
+    final List<Comparable[]> rows = new ArrayList<>();
+    for (String partition : allPartitions) {
+      Map<String, FileStatus> fileStatusMap = new HashMap<>();
+      Map<String, FileStatus> metadataFileStatusMap = new HashMap<>();
+      FileStatus[] metadataStatuses = metadataReader.getAllFilesInPartition(new Path(HoodieCLI.basePath, partition));
+      Arrays.stream(metadataStatuses).forEach(entry -> metadataFileStatusMap.put(entry.getPath().getName(), entry));
+      FileStatus[] fsStatuses = fsMetaReader.getAllFilesInPartition(new Path(HoodieCLI.basePath, partition));
+      Arrays.stream(fsStatuses).forEach(entry -> fileStatusMap.put(entry.getPath().getName(), entry));
+
+      Set<String> allFiles = new HashSet<>();
+      allFiles.addAll(fileStatusMap.keySet());
+      allFiles.addAll(metadataFileStatusMap.keySet());
+
+      for (String file : allFiles) {
+        Comparable[] row = new Comparable[6];
+        row[0] = partition;
+        FileStatus fsFileStatus = fileStatusMap.get(file);
+        FileStatus metaFileStatus = metadataFileStatusMap.get(file);
+        row[1] = file;
+        row[2] = fsFileStatus != null;
+        row[3] = metaFileStatus != null;
+        row[4] = (fsFileStatus != null) ? fsFileStatus.getLen() : 0;
+        row[5] = (metaFileStatus != null) ? metaFileStatus.getLen() : 0;
+        rows.add(row);
+      }
+
+      if (metadataStatuses.length != fsStatuses.length) {
+        LOG.error(" FS and metadata files count not matching for " + partition + ". FS files count " + fsStatuses.length + ", metadata base files count "
+            + metadataStatuses.length);
+      }
+
+      for (Map.Entry<String, FileStatus> entry : fileStatusMap.entrySet()) {
+        if (!metadataFileStatusMap.containsKey(entry.getKey())) {
+          LOG.error("FS file not found in metadata " + entry.getKey());
+        } else {
+          if (entry.getValue().getLen() != metadataFileStatusMap.get(entry.getKey()).getLen()) {
+            LOG.error(" FS file size mismatch " + entry.getKey() + ", size equality "
+                + (entry.getValue().getLen() == metadataFileStatusMap.get(entry.getKey()).getLen())
+                + ". FS size " + entry.getValue().getLen() + ", metadata size "
+                + metadataFileStatusMap.get(entry.getKey()).getLen());
+          }
+        }
+      }
+      for (Map.Entry<String, FileStatus> entry : metadataFileStatusMap.entrySet()) {
+        if (!fileStatusMap.containsKey(entry.getKey())) {
+          LOG.error("Metadata file not found in FS " + entry.getKey());
+        } else {
+          if (entry.getValue().getLen() != fileStatusMap.get(entry.getKey()).getLen()) {
+            LOG.error(" Metadata file size mismatch " + entry.getKey() + ", size equality "
+                + (entry.getValue().getLen() == fileStatusMap.get(entry.getKey()).getLen())
+                + ". Metadata size " + entry.getValue().getLen() + ", FS size "
+                + fileStatusMap.get(entry.getKey()).getLen());
          }
+        }
+      }
+    }
+    TableHeader header = new TableHeader().addTableHeaderField("Partition")
+        .addTableHeaderField("File Name")
+        .addTableHeaderField(" IsPresent in FS ")
+        .addTableHeaderField(" IsPresent in Metadata")
+        .addTableHeaderField(" FS size")
+        .addTableHeaderField(" Metadata size");
+    return HoodiePrintHelper.print(header, new HashMap<>(), "", false, Integer.MAX_VALUE, false, rows);
   }
 
   private HoodieWriteConfig getWriteConfig() {
@@ -227,7 +351,7 @@ public class MetadataCommand implements CommandMarker {
 
   private void initJavaSparkContext() {
     if (jsc == null) {
-      jsc = SparkUtil.initJavaSparkConf("HoodieClI");
+      jsc = SparkUtil.initJavaSparkConf(SparkUtil.getDefaultConf("HoodieCLI", Option.empty()));
     }
   }
-}
+}
\ No newline at end of file
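At its core, the new validate-files command builds two name-to-FileStatus maps per partition, one from the metadata table reader and one from a direct file-system listing, then walks the union of names looking for presence and size differences. A condensed sketch of that reconciliation step, with plain Long-valued maps standing in for the Hadoop FileStatus listings (the ListingDiff class is illustrative):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class ListingDiff {
      // Compares two name -> length listings the way validateFiles() does:
      // iterate the union of file names and flag presence/size differences.
      static void reconcile(Map<String, Long> fsListing, Map<String, Long> metadataListing) {
        Set<String> allFiles = new HashSet<>(fsListing.keySet());
        allFiles.addAll(metadataListing.keySet());
        for (String file : allFiles) {
          Long fsLen = fsListing.get(file);          // null => missing on FS
          Long metaLen = metadataListing.get(file);  // null => missing in metadata
          if (fsLen == null) {
            System.out.println(file + ": only in metadata");
          } else if (metaLen == null) {
            System.out.println(file + ": only on FS");
          } else if (!fsLen.equals(metaLen)) {
            System.out.println(file + ": size mismatch, FS=" + fsLen + " metadata=" + metaLen);
          }
        }
      }
    }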
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkEnvCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkEnvCommand.java
index 7969808e2..a3d78d126 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkEnvCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkEnvCommand.java
@@ -43,6 +43,7 @@ public class SparkEnvCommand implements CommandMarker {
       throw new IllegalArgumentException("Illegal set parameter, please use like [set --conf SPARK_HOME=/usr/etc/spark]");
     }
     env.put(map[0].trim(), map[1].trim());
+    System.setProperty(map[0].trim(), map[1].trim());
   }
 
   @CliCommand(value = "show envs all", help = "Show spark launcher envs")
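The single added line in SparkEnvCommand is what ties `set --conf SPARK_MASTER=local` to the Spark context created later: the value is now mirrored into JVM system properties, where the new SparkUtil.getDefaultConf() looks it up. A rough sketch of the round trip, assuming HoodieCliSparkConfig.CLI_SPARK_MASTER resolves to the literal key "SPARK_MASTER" and that the fallback default is yarn (per the class javadoc above); both are assumptions of this sketch:

    public class SketchSparkMaster {
      public static void main(String[] args) {
        // What SparkEnvCommand.set() now does in addition to updating its env map.
        System.setProperty("SPARK_MASTER", "local");

        // What getDefaultConf() effectively does with it (simplified):
        // prefer the system property, otherwise keep the default master.
        String master = System.getProperty("SPARK_MASTER");
        if (master == null) {
          master = "yarn"; // stands in for DEFAULT_SPARK_MASTER
        }
        System.out.println("spark.master = " + master); // prints "local"
      }
    }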
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
index cf853b4c5..45e048755 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
@@ -34,6 +34,7 @@ import java.io.File;
 import java.net.URISyntaxException;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Properties;
 
 /**
  * Utility functions dealing with Spark.
@@ -62,28 +63,30 @@ public class SparkUtil {
     return sparkLauncher;
   }
 
-  public static JavaSparkContext initJavaSparkConf(String name) {
-    return initJavaSparkConf(name, Option.empty(), Option.empty());
-  }
+  /**
+   * Get the default spark configuration.
+   *
+   * @param appName     - Spark application name
+   * @param sparkMaster - Spark master node name
+   * @return Spark configuration
+   */
+  public static SparkConf getDefaultConf(final String appName, final Option<String> sparkMaster) {
+    final Properties properties = System.getProperties();
+    SparkConf sparkConf = new SparkConf().setAppName(appName);
 
-  public static JavaSparkContext initJavaSparkConf(String name, Option<String> master,
-                                                   Option<String> executorMemory) {
-    SparkConf sparkConf = new SparkConf().setAppName(name);
-
-    String defMaster = master.orElse(sparkConf.getenv(HoodieCliSparkConfig.CLI_SPARK_MASTER));
-    if ((null == defMaster) || (defMaster.isEmpty())) {
-      sparkConf.setMaster(DEFAULT_SPARK_MASTER);
-    } else {
-      sparkConf.setMaster(defMaster);
+    // Configure the sparkMaster
+    String sparkMasterNode = DEFAULT_SPARK_MASTER;
+    if (properties.getProperty(HoodieCliSparkConfig.CLI_SPARK_MASTER) != null) {
+      sparkMasterNode = properties.getProperty(HoodieCliSparkConfig.CLI_SPARK_MASTER);
     }
+    sparkMasterNode = sparkMaster.orElse(sparkMasterNode);
+    sparkConf.setMaster(sparkMasterNode);
 
-    sparkConf.set(HoodieCliSparkConfig.CLI_SERIALIZER, "org.apache.spark.serializer.KryoSerializer");
+    // Configure driver
     sparkConf.set(HoodieCliSparkConfig.CLI_DRIVER_MAX_RESULT_SIZE, "2g");
     sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_OVERWRITE, "true");
-    sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_ENABLED, "true");
-    if (executorMemory.isPresent()) {
-      sparkConf.set(HoodieCliSparkConfig.CLI_EXECUTOR_MEMORY, executorMemory.get());
-    }
+    sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_ENABLED, "false");
+    sparkConf.set(HoodieCliSparkConfig.CLI_SERIALIZER, "org.apache.spark.serializer.KryoSerializer");
 
     // Configure hadoop conf
     sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESS, "true");
@@ -91,10 +94,28 @@ public class SparkUtil {
     sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_CODEC, "org.apache.hadoop.io.compress.GzipCodec");
     sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_TYPE, "BLOCK");
 
+    return sparkConf;
+  }
+
+  public static JavaSparkContext initJavaSparkConf(String name) {
+    return initJavaSparkConf(name, Option.empty(), Option.empty());
+  }
+
+  public static JavaSparkContext initJavaSparkConf(String name, Option<String> master, Option<String> executorMemory) {
+    SparkConf sparkConf = getDefaultConf(name, master);
+    if (executorMemory.isPresent()) {
+      sparkConf.set(HoodieCliSparkConfig.CLI_EXECUTOR_MEMORY, executorMemory.get());
+    }
+
+    return initJavaSparkConf(sparkConf);
+  }
+
+  public static JavaSparkContext initJavaSparkConf(SparkConf sparkConf) {
     SparkRDDWriteClient.registerClasses(sparkConf);
     JavaSparkContext jsc = new JavaSparkContext(sparkConf);
     jsc.hadoopConfiguration().setBoolean(HoodieCliSparkConfig.CLI_PARQUET_ENABLE_SUMMARY_METADATA, false);
     FSUtils.prepareHadoopConf(jsc.hadoopConfiguration());
     return jsc;
   }
+
 }
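After the refactor, getDefaultConf() is the single place the CLI defaults are assembled, and all three initJavaSparkConf() overloads converge on the SparkConf-taking one. A usage sketch of the new entry points as the patch leaves them; the SketchSparkUtilUsage class and the local[2] master override are illustrative, not part of the change:

    import org.apache.hudi.cli.utils.SparkUtil;
    import org.apache.hudi.common.util.Option;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class SketchSparkUtilUsage {
      public static void main(String[] args) {
        // Assemble the shared CLI defaults, overriding the master explicitly.
        SparkConf conf = SparkUtil.getDefaultConf("HoodieCLI", Option.of("local[2]"));

        // Hand the prepared conf to the new SparkConf-based overload,
        // mirroring what MetadataCommand.initJavaSparkContext() now does.
        JavaSparkContext jsc = SparkUtil.initJavaSparkConf(conf);
        System.out.println("spark.master = " + jsc.getConf().get("spark.master"));
        jsc.stop();
      }
    }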