diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
index ac898a8e6..143a5123c 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
@@ -18,20 +18,25 @@
package org.apache.hudi.cli.commands;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliCommand;
@@ -40,25 +45,44 @@ import org.springframework.stereotype.Component;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* CLI commands to operate on the Metadata Table.
+ *
+ *
+ * Example:
+ * The default spark.master conf is set to yarn. If you are running on a local deployment,
+ * we can set the spark master to local using set conf command.
+ * > set --conf SPARK_MASTER=local
+ *
+ * Connect to the table
+ * > connect --path {path to hudi table}
+ *
+ * Run metadata commands
+ * > metadata list-partitions
*/
@Component
public class MetadataCommand implements CommandMarker {
- private JavaSparkContext jsc;
+ private static final Logger LOG = LogManager.getLogger(MetadataCommand.class);
private static String metadataBaseDirectory;
+ private JavaSparkContext jsc;
/**
* Sets the directory to store/read Metadata Table.
- *
+ *
* This can be used to store the metadata table away from the dataset directory.
- * - Useful for testing as well as for using via the HUDI CLI so that the actual dataset is not written to.
- * - Useful for testing Metadata Table performance and operations on existing datasets before enabling.
+ * - Useful for testing as well as for using via the HUDI CLI so that the actual dataset is not written to.
+ * - Useful for testing Metadata Table performance and operations on existing datasets before enabling.
*/
public static void setMetadataBaseDirectory(String metadataDir) {
ValidationUtils.checkState(metadataBaseDirectory == null,
@@ -75,8 +99,7 @@ public class MetadataCommand implements CommandMarker {
@CliCommand(value = "metadata set", help = "Set options for Metadata Table")
public String set(@CliOption(key = {"metadataDir"},
- help = "Directory to read/write metadata table (can be different from dataset)", unspecifiedDefaultValue = "")
- final String metadataDir) {
+ help = "Directory to read/write metadata table (can be different from dataset)", unspecifiedDefaultValue = "") final String metadataDir) {
if (!metadataDir.isEmpty()) {
setMetadataBaseDirectory(metadataDir);
}
@@ -152,16 +175,22 @@ public class MetadataCommand implements CommandMarker {
config, HoodieCLI.basePath, "/tmp");
Map stats = metadata.stats();
- StringBuffer out = new StringBuffer("\n");
- out.append(String.format("Base path: %s\n", getMetadataTableBasePath(HoodieCLI.basePath)));
+ final List rows = new ArrayList<>();
for (Map.Entry entry : stats.entrySet()) {
- out.append(String.format("%s: %s\n", entry.getKey(), entry.getValue()));
+ Comparable[] row = new Comparable[2];
+ row[0] = entry.getKey();
+ row[1] = entry.getValue();
+ rows.add(row);
}
- return out.toString();
+ TableHeader header = new TableHeader()
+ .addTableHeaderField("stat key")
+ .addTableHeaderField("stat value");
+ return HoodiePrintHelper.print(header, new HashMap<>(), "",
+ false, Integer.MAX_VALUE, false, rows);
}
- @CliCommand(value = "metadata list-partitions", help = "Print a list of all partitions from the metadata")
+ @CliCommand(value = "metadata list-partitions", help = "List all partitions from metadata")
public String listPartitions() throws IOException {
HoodieCLI.getTableMetaClient();
initJavaSparkContext();
@@ -169,55 +198,150 @@ public class MetadataCommand implements CommandMarker {
HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(new HoodieSparkEngineContext(jsc), config,
HoodieCLI.basePath, "/tmp");
- StringBuffer out = new StringBuffer("\n");
if (!metadata.enabled()) {
- out.append("=== Metadata Table not initilized. Using file listing to get list of partitions. ===\n\n");
+ return "[ERROR] Metadata Table not enabled/initialized\n\n";
}
- long t1 = System.currentTimeMillis();
+ HoodieTimer timer = new HoodieTimer().startTimer();
List partitions = metadata.getAllPartitionPaths();
- long t2 = System.currentTimeMillis();
+ LOG.debug("Took " + timer.endTimer() + " ms");
- int[] count = {0};
- partitions.stream().sorted((p1, p2) -> p2.compareTo(p1)).forEach(p -> {
- out.append(p);
- if (++count[0] % 15 == 0) {
- out.append("\n");
- } else {
- out.append(", ");
- }
+ final List rows = new ArrayList<>();
+ partitions.stream().sorted(Comparator.reverseOrder()).forEach(p -> {
+ Comparable[] row = new Comparable[1];
+ row[0] = p;
+ rows.add(row);
});
- out.append(String.format("\n\n=== List of partitions retrieved in %.2fsec ===", (t2 - t1) / 1000.0));
-
- return out.toString();
+ TableHeader header = new TableHeader().addTableHeaderField("partition");
+ return HoodiePrintHelper.print(header, new HashMap<>(), "",
+ false, Integer.MAX_VALUE, false, rows);
}
@CliCommand(value = "metadata list-files", help = "Print a list of all files in a partition from the metadata")
public String listFiles(
- @CliOption(key = {"partition"}, help = "Name of the partition to list files", mandatory = true)
- final String partition) throws IOException {
+ @CliOption(key = {"partition"}, help = "Name of the partition to list files", mandatory = true) final String partition) throws IOException {
HoodieCLI.getTableMetaClient();
HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
- HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath, "/tmp");
+ HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(
+ new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath, "/tmp");
- StringBuffer out = new StringBuffer("\n");
if (!metaReader.enabled()) {
- out.append("=== Metadata Table not initialized. Using file listing to get list of files in partition. ===\n\n");
+ return "[ERROR] Metadata Table not enabled/initialized\n\n";
}
- long t1 = System.currentTimeMillis();
+ HoodieTimer timer = new HoodieTimer().startTimer();
FileStatus[] statuses = metaReader.getAllFilesInPartition(new Path(HoodieCLI.basePath, partition));
- long t2 = System.currentTimeMillis();
+ LOG.debug("Took " + timer.endTimer() + " ms");
- Arrays.stream(statuses).sorted((p1, p2) -> p2.getPath().getName().compareTo(p1.getPath().getName())).forEach(p -> {
- out.append("\t" + p.getPath().getName());
- out.append("\n");
+ final List rows = new ArrayList<>();
+ Arrays.stream(statuses).sorted((p1, p2) -> p2.getPath().getName().compareTo(p1.getPath().getName())).forEach(f -> {
+ Comparable[] row = new Comparable[1];
+      row[0] = f.getPath().getName();
+ rows.add(row);
});
- out.append(String.format("\n=== Files in partition retrieved in %.2fsec ===", (t2 - t1) / 1000.0));
+ TableHeader header = new TableHeader().addTableHeaderField("file path");
+ return HoodiePrintHelper.print(header, new HashMap<>(), "",
+ false, Integer.MAX_VALUE, false, rows);
+ }
- return out.toString();
+ @CliCommand(value = "metadata validate-files", help = "Validate all files in all partitions from the metadata")
+ public String validateFiles(
+ @CliOption(key = {"verbose"}, help = "Print all file details", unspecifiedDefaultValue = "false") final boolean verbose) throws IOException {
+ HoodieCLI.getTableMetaClient();
+ HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
+ HoodieBackedTableMetadata metadataReader = new HoodieBackedTableMetadata(
+ new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath, "/tmp");
+
+ if (!metadataReader.enabled()) {
+ return "[ERROR] Metadata Table not enabled/initialized\n\n";
+ }
+
+ HoodieMetadataConfig fsConfig = HoodieMetadataConfig.newBuilder().enable(false).build();
+ HoodieBackedTableMetadata fsMetaReader = new HoodieBackedTableMetadata(
+ new HoodieLocalEngineContext(HoodieCLI.conf), fsConfig, HoodieCLI.basePath, "/tmp");
+
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ List metadataPartitions = metadataReader.getAllPartitionPaths();
+ LOG.debug("Listing partitions Took " + timer.endTimer() + " ms");
+ List fsPartitions = fsMetaReader.getAllPartitionPaths();
+ Collections.sort(fsPartitions);
+ Collections.sort(metadataPartitions);
+
+ Set allPartitions = new HashSet<>();
+ allPartitions.addAll(fsPartitions);
+ allPartitions.addAll(metadataPartitions);
+
+ if (!fsPartitions.equals(metadataPartitions)) {
+ LOG.error("FS partition listing is not matching with metadata partition listing!");
+ LOG.error("All FS partitions: " + Arrays.toString(fsPartitions.toArray()));
+ LOG.error("All Metadata partitions: " + Arrays.toString(metadataPartitions.toArray()));
+ }
+
+ final List rows = new ArrayList<>();
+ for (String partition : allPartitions) {
+ Map fileStatusMap = new HashMap<>();
+ Map metadataFileStatusMap = new HashMap<>();
+ FileStatus[] metadataStatuses = metadataReader.getAllFilesInPartition(new Path(HoodieCLI.basePath, partition));
+ Arrays.stream(metadataStatuses).forEach(entry -> metadataFileStatusMap.put(entry.getPath().getName(), entry));
+ FileStatus[] fsStatuses = fsMetaReader.getAllFilesInPartition(new Path(HoodieCLI.basePath, partition));
+ Arrays.stream(fsStatuses).forEach(entry -> fileStatusMap.put(entry.getPath().getName(), entry));
+
+ Set allFiles = new HashSet<>();
+ allFiles.addAll(fileStatusMap.keySet());
+ allFiles.addAll(metadataFileStatusMap.keySet());
+
+ for (String file : allFiles) {
+ Comparable[] row = new Comparable[6];
+ row[0] = partition;
+ FileStatus fsFileStatus = fileStatusMap.get(file);
+ FileStatus metaFileStatus = metadataFileStatusMap.get(file);
+ row[1] = file;
+ row[2] = fsFileStatus != null;
+ row[3] = metaFileStatus != null;
+ row[4] = (fsFileStatus != null) ? fsFileStatus.getLen() : 0;
+ row[5] = (metaFileStatus != null) ? metaFileStatus.getLen() : 0;
+ rows.add(row);
+ }
+
+ if (metadataStatuses.length != fsStatuses.length) {
+ LOG.error(" FS and metadata files count not matching for " + partition + ". FS files count " + fsStatuses.length + ", metadata base files count "
+ + metadataStatuses.length);
+ }
+
+ for (Map.Entry entry : fileStatusMap.entrySet()) {
+ if (!metadataFileStatusMap.containsKey(entry.getKey())) {
+ LOG.error("FS file not found in metadata " + entry.getKey());
+ } else {
+ if (entry.getValue().getLen() != metadataFileStatusMap.get(entry.getKey()).getLen()) {
+ LOG.error(" FS file size mismatch " + entry.getKey() + ", size equality "
+ + (entry.getValue().getLen() == metadataFileStatusMap.get(entry.getKey()).getLen())
+ + ". FS size " + entry.getValue().getLen() + ", metadata size "
+ + metadataFileStatusMap.get(entry.getKey()).getLen());
+ }
+ }
+ }
+ for (Map.Entry entry : metadataFileStatusMap.entrySet()) {
+ if (!fileStatusMap.containsKey(entry.getKey())) {
+ LOG.error("Metadata file not found in FS " + entry.getKey());
+ } else {
+ if (entry.getValue().getLen() != fileStatusMap.get(entry.getKey()).getLen()) {
+ LOG.error(" Metadata file size mismatch " + entry.getKey() + ", size equality "
+ + (entry.getValue().getLen() == fileStatusMap.get(entry.getKey()).getLen())
+ + ". Metadata size " + entry.getValue().getLen() + ", FS size "
+                + fileStatusMap.get(entry.getKey()).getLen());
+ }
+ }
+ }
+ }
+ TableHeader header = new TableHeader().addTableHeaderField("Partition")
+ .addTableHeaderField("File Name")
+ .addTableHeaderField(" IsPresent in FS ")
+ .addTableHeaderField(" IsPresent in Metadata")
+ .addTableHeaderField(" FS size")
+ .addTableHeaderField(" Metadata size");
+ return HoodiePrintHelper.print(header, new HashMap<>(), "", false, Integer.MAX_VALUE, false, rows);
}
private HoodieWriteConfig getWriteConfig() {
@@ -227,7 +351,7 @@ public class MetadataCommand implements CommandMarker {
private void initJavaSparkContext() {
if (jsc == null) {
- jsc = SparkUtil.initJavaSparkConf("HoodieClI");
+ jsc = SparkUtil.initJavaSparkConf(SparkUtil.getDefaultConf("HoodieCLI", Option.empty()));
}
}
 }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkEnvCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkEnvCommand.java
index 7969808e2..a3d78d126 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkEnvCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkEnvCommand.java
@@ -43,6 +43,7 @@ public class SparkEnvCommand implements CommandMarker {
throw new IllegalArgumentException("Illegal set parameter, please use like [set --conf SPARK_HOME=/usr/etc/spark]");
}
env.put(map[0].trim(), map[1].trim());
+ System.setProperty(map[0].trim(), map[1].trim());
}
@CliCommand(value = "show envs all", help = "Show spark launcher envs")
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
index cf853b4c5..45e048755 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
@@ -34,6 +34,7 @@ import java.io.File;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.Objects;
+import java.util.Properties;
/**
* Utility functions dealing with Spark.
@@ -62,28 +63,30 @@ public class SparkUtil {
return sparkLauncher;
}
- public static JavaSparkContext initJavaSparkConf(String name) {
- return initJavaSparkConf(name, Option.empty(), Option.empty());
- }
+ /**
+ * Get the default spark configuration.
+ *
+ * @param appName - Spark application name
+ * @param sparkMaster - Spark master node name
+ * @return Spark configuration
+ */
+ public static SparkConf getDefaultConf(final String appName, final Option sparkMaster) {
+ final Properties properties = System.getProperties();
+ SparkConf sparkConf = new SparkConf().setAppName(appName);
- public static JavaSparkContext initJavaSparkConf(String name, Option master,
- Option executorMemory) {
- SparkConf sparkConf = new SparkConf().setAppName(name);
-
- String defMaster = master.orElse(sparkConf.getenv(HoodieCliSparkConfig.CLI_SPARK_MASTER));
- if ((null == defMaster) || (defMaster.isEmpty())) {
- sparkConf.setMaster(DEFAULT_SPARK_MASTER);
- } else {
- sparkConf.setMaster(defMaster);
+ // Configure the sparkMaster
+ String sparkMasterNode = DEFAULT_SPARK_MASTER;
+ if (properties.getProperty(HoodieCliSparkConfig.CLI_SPARK_MASTER) != null) {
+ sparkMasterNode = properties.getProperty(HoodieCliSparkConfig.CLI_SPARK_MASTER);
}
+ sparkMasterNode = sparkMaster.orElse(sparkMasterNode);
+ sparkConf.setMaster(sparkMasterNode);
- sparkConf.set(HoodieCliSparkConfig.CLI_SERIALIZER, "org.apache.spark.serializer.KryoSerializer");
+ // Configure driver
sparkConf.set(HoodieCliSparkConfig.CLI_DRIVER_MAX_RESULT_SIZE, "2g");
sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_OVERWRITE, "true");
- sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_ENABLED, "true");
- if (executorMemory.isPresent()) {
- sparkConf.set(HoodieCliSparkConfig.CLI_EXECUTOR_MEMORY, executorMemory.get());
- }
+ sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_ENABLED, "false");
+ sparkConf.set(HoodieCliSparkConfig.CLI_SERIALIZER, "org.apache.spark.serializer.KryoSerializer");
// Configure hadoop conf
sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESS, "true");
@@ -91,10 +94,28 @@ public class SparkUtil {
sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_CODEC, "org.apache.hadoop.io.compress.GzipCodec");
sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_TYPE, "BLOCK");
+ return sparkConf;
+ }
+
+ public static JavaSparkContext initJavaSparkConf(String name) {
+ return initJavaSparkConf(name, Option.empty(), Option.empty());
+ }
+
+ public static JavaSparkContext initJavaSparkConf(String name, Option master, Option executorMemory) {
+ SparkConf sparkConf = getDefaultConf(name, master);
+ if (executorMemory.isPresent()) {
+ sparkConf.set(HoodieCliSparkConfig.CLI_EXECUTOR_MEMORY, executorMemory.get());
+ }
+
+ return initJavaSparkConf(sparkConf);
+ }
+
+ public static JavaSparkContext initJavaSparkConf(SparkConf sparkConf) {
SparkRDDWriteClient.registerClasses(sparkConf);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
jsc.hadoopConfiguration().setBoolean(HoodieCliSparkConfig.CLI_PARQUET_ENABLE_SUMMARY_METADATA, false);
FSUtils.prepareHadoopConf(jsc.hadoopConfiguration());
return jsc;
}
+
}