[HUDI-2845] Metadata CLI - files/partition file listing fix and new validate option (#4092)
- Co-authored-by: Sivabalan Narayanan <n.siva.b@gmail.com>
This commit is contained in:
committed by
GitHub
parent
d1e83e4ba0
commit
445208a0d2
@@ -18,20 +18,25 @@
|
||||
|
||||
package org.apache.hudi.cli.commands;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.cli.HoodieCLI;
|
||||
import org.apache.hudi.cli.HoodiePrintHelper;
|
||||
import org.apache.hudi.cli.TableHeader;
|
||||
import org.apache.hudi.cli.utils.SparkUtil;
|
||||
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
||||
import org.apache.hudi.common.config.HoodieMetadataConfig;
|
||||
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
|
||||
import org.apache.hudi.common.util.HoodieTimer;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
|
||||
import org.apache.hudi.metadata.HoodieTableMetadata;
|
||||
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.springframework.shell.core.CommandMarker;
|
||||
import org.springframework.shell.core.annotation.CliCommand;
|
||||
@@ -40,25 +45,44 @@ import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* CLI commands to operate on the Metadata Table.
|
||||
* <p>
|
||||
* <p>
|
||||
* Example:
|
||||
* The default spark.master conf is set to yarn. If you are running on a local deployment,
|
||||
* we can set the spark master to local using set conf command.
|
||||
* > set --conf SPARK_MASTER=local
|
||||
* <p>
|
||||
* Connect to the table
|
||||
* > connect --path {path to hudi table}
|
||||
* <p>
|
||||
* Run metadata commands
|
||||
* > metadata list-partitions
|
||||
*/
|
||||
@Component
|
||||
public class MetadataCommand implements CommandMarker {
|
||||
|
||||
private JavaSparkContext jsc;
|
||||
private static final Logger LOG = LogManager.getLogger(MetadataCommand.class);
|
||||
private static String metadataBaseDirectory;
|
||||
private JavaSparkContext jsc;
|
||||
|
||||
/**
|
||||
* Sets the directory to store/read Metadata Table.
|
||||
*
|
||||
* <p>
|
||||
* This can be used to store the metadata table away from the dataset directory.
|
||||
* - Useful for testing as well as for using via the HUDI CLI so that the actual dataset is not written to.
|
||||
* - Useful for testing Metadata Table performance and operations on existing datasets before enabling.
|
||||
* - Useful for testing as well as for using via the HUDI CLI so that the actual dataset is not written to.
|
||||
* - Useful for testing Metadata Table performance and operations on existing datasets before enabling.
|
||||
*/
|
||||
public static void setMetadataBaseDirectory(String metadataDir) {
|
||||
ValidationUtils.checkState(metadataBaseDirectory == null,
|
||||
@@ -75,8 +99,7 @@ public class MetadataCommand implements CommandMarker {
|
||||
|
||||
@CliCommand(value = "metadata set", help = "Set options for Metadata Table")
|
||||
public String set(@CliOption(key = {"metadataDir"},
|
||||
help = "Directory to read/write metadata table (can be different from dataset)", unspecifiedDefaultValue = "")
|
||||
final String metadataDir) {
|
||||
help = "Directory to read/write metadata table (can be different from dataset)", unspecifiedDefaultValue = "") final String metadataDir) {
|
||||
if (!metadataDir.isEmpty()) {
|
||||
setMetadataBaseDirectory(metadataDir);
|
||||
}
|
||||
@@ -152,16 +175,22 @@ public class MetadataCommand implements CommandMarker {
|
||||
config, HoodieCLI.basePath, "/tmp");
|
||||
Map<String, String> stats = metadata.stats();
|
||||
|
||||
StringBuffer out = new StringBuffer("\n");
|
||||
out.append(String.format("Base path: %s\n", getMetadataTableBasePath(HoodieCLI.basePath)));
|
||||
final List<Comparable[]> rows = new ArrayList<>();
|
||||
for (Map.Entry<String, String> entry : stats.entrySet()) {
|
||||
out.append(String.format("%s: %s\n", entry.getKey(), entry.getValue()));
|
||||
Comparable[] row = new Comparable[2];
|
||||
row[0] = entry.getKey();
|
||||
row[1] = entry.getValue();
|
||||
rows.add(row);
|
||||
}
|
||||
|
||||
return out.toString();
|
||||
TableHeader header = new TableHeader()
|
||||
.addTableHeaderField("stat key")
|
||||
.addTableHeaderField("stat value");
|
||||
return HoodiePrintHelper.print(header, new HashMap<>(), "",
|
||||
false, Integer.MAX_VALUE, false, rows);
|
||||
}
|
||||
|
||||
@CliCommand(value = "metadata list-partitions", help = "Print a list of all partitions from the metadata")
|
||||
@CliCommand(value = "metadata list-partitions", help = "List all partitions from metadata")
|
||||
public String listPartitions() throws IOException {
|
||||
HoodieCLI.getTableMetaClient();
|
||||
initJavaSparkContext();
|
||||
@@ -169,55 +198,150 @@ public class MetadataCommand implements CommandMarker {
|
||||
HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(new HoodieSparkEngineContext(jsc), config,
|
||||
HoodieCLI.basePath, "/tmp");
|
||||
|
||||
StringBuffer out = new StringBuffer("\n");
|
||||
if (!metadata.enabled()) {
|
||||
out.append("=== Metadata Table not initilized. Using file listing to get list of partitions. ===\n\n");
|
||||
return "[ERROR] Metadata Table not enabled/initialized\n\n";
|
||||
}
|
||||
|
||||
long t1 = System.currentTimeMillis();
|
||||
HoodieTimer timer = new HoodieTimer().startTimer();
|
||||
List<String> partitions = metadata.getAllPartitionPaths();
|
||||
long t2 = System.currentTimeMillis();
|
||||
LOG.debug("Took " + timer.endTimer() + " ms");
|
||||
|
||||
int[] count = {0};
|
||||
partitions.stream().sorted((p1, p2) -> p2.compareTo(p1)).forEach(p -> {
|
||||
out.append(p);
|
||||
if (++count[0] % 15 == 0) {
|
||||
out.append("\n");
|
||||
} else {
|
||||
out.append(", ");
|
||||
}
|
||||
final List<Comparable[]> rows = new ArrayList<>();
|
||||
partitions.stream().sorted(Comparator.reverseOrder()).forEach(p -> {
|
||||
Comparable[] row = new Comparable[1];
|
||||
row[0] = p;
|
||||
rows.add(row);
|
||||
});
|
||||
|
||||
out.append(String.format("\n\n=== List of partitions retrieved in %.2fsec ===", (t2 - t1) / 1000.0));
|
||||
|
||||
return out.toString();
|
||||
TableHeader header = new TableHeader().addTableHeaderField("partition");
|
||||
return HoodiePrintHelper.print(header, new HashMap<>(), "",
|
||||
false, Integer.MAX_VALUE, false, rows);
|
||||
}
|
||||
|
||||
@CliCommand(value = "metadata list-files", help = "Print a list of all files in a partition from the metadata")
|
||||
public String listFiles(
|
||||
@CliOption(key = {"partition"}, help = "Name of the partition to list files", mandatory = true)
|
||||
final String partition) throws IOException {
|
||||
@CliOption(key = {"partition"}, help = "Name of the partition to list files", mandatory = true) final String partition) throws IOException {
|
||||
HoodieCLI.getTableMetaClient();
|
||||
HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
|
||||
HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath, "/tmp");
|
||||
HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(
|
||||
new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath, "/tmp");
|
||||
|
||||
StringBuffer out = new StringBuffer("\n");
|
||||
if (!metaReader.enabled()) {
|
||||
out.append("=== Metadata Table not initialized. Using file listing to get list of files in partition. ===\n\n");
|
||||
return "[ERROR] Metadata Table not enabled/initialized\n\n";
|
||||
}
|
||||
|
||||
long t1 = System.currentTimeMillis();
|
||||
HoodieTimer timer = new HoodieTimer().startTimer();
|
||||
FileStatus[] statuses = metaReader.getAllFilesInPartition(new Path(HoodieCLI.basePath, partition));
|
||||
long t2 = System.currentTimeMillis();
|
||||
LOG.debug("Took " + timer.endTimer() + " ms");
|
||||
|
||||
Arrays.stream(statuses).sorted((p1, p2) -> p2.getPath().getName().compareTo(p1.getPath().getName())).forEach(p -> {
|
||||
out.append("\t" + p.getPath().getName());
|
||||
out.append("\n");
|
||||
final List<Comparable[]> rows = new ArrayList<>();
|
||||
Arrays.stream(statuses).sorted((p1, p2) -> p2.getPath().getName().compareTo(p1.getPath().getName())).forEach(f -> {
|
||||
Comparable[] row = new Comparable[1];
|
||||
row[0] = f;
|
||||
rows.add(row);
|
||||
});
|
||||
|
||||
out.append(String.format("\n=== Files in partition retrieved in %.2fsec ===", (t2 - t1) / 1000.0));
|
||||
TableHeader header = new TableHeader().addTableHeaderField("file path");
|
||||
return HoodiePrintHelper.print(header, new HashMap<>(), "",
|
||||
false, Integer.MAX_VALUE, false, rows);
|
||||
}
|
||||
|
||||
return out.toString();
|
||||
@CliCommand(value = "metadata validate-files", help = "Validate all files in all partitions from the metadata")
|
||||
public String validateFiles(
|
||||
@CliOption(key = {"verbose"}, help = "Print all file details", unspecifiedDefaultValue = "false") final boolean verbose) throws IOException {
|
||||
HoodieCLI.getTableMetaClient();
|
||||
HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
|
||||
HoodieBackedTableMetadata metadataReader = new HoodieBackedTableMetadata(
|
||||
new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath, "/tmp");
|
||||
|
||||
if (!metadataReader.enabled()) {
|
||||
return "[ERROR] Metadata Table not enabled/initialized\n\n";
|
||||
}
|
||||
|
||||
HoodieMetadataConfig fsConfig = HoodieMetadataConfig.newBuilder().enable(false).build();
|
||||
HoodieBackedTableMetadata fsMetaReader = new HoodieBackedTableMetadata(
|
||||
new HoodieLocalEngineContext(HoodieCLI.conf), fsConfig, HoodieCLI.basePath, "/tmp");
|
||||
|
||||
HoodieTimer timer = new HoodieTimer().startTimer();
|
||||
List<String> metadataPartitions = metadataReader.getAllPartitionPaths();
|
||||
LOG.debug("Listing partitions Took " + timer.endTimer() + " ms");
|
||||
List<String> fsPartitions = fsMetaReader.getAllPartitionPaths();
|
||||
Collections.sort(fsPartitions);
|
||||
Collections.sort(metadataPartitions);
|
||||
|
||||
Set<String> allPartitions = new HashSet<>();
|
||||
allPartitions.addAll(fsPartitions);
|
||||
allPartitions.addAll(metadataPartitions);
|
||||
|
||||
if (!fsPartitions.equals(metadataPartitions)) {
|
||||
LOG.error("FS partition listing is not matching with metadata partition listing!");
|
||||
LOG.error("All FS partitions: " + Arrays.toString(fsPartitions.toArray()));
|
||||
LOG.error("All Metadata partitions: " + Arrays.toString(metadataPartitions.toArray()));
|
||||
}
|
||||
|
||||
final List<Comparable[]> rows = new ArrayList<>();
|
||||
for (String partition : allPartitions) {
|
||||
Map<String, FileStatus> fileStatusMap = new HashMap<>();
|
||||
Map<String, FileStatus> metadataFileStatusMap = new HashMap<>();
|
||||
FileStatus[] metadataStatuses = metadataReader.getAllFilesInPartition(new Path(HoodieCLI.basePath, partition));
|
||||
Arrays.stream(metadataStatuses).forEach(entry -> metadataFileStatusMap.put(entry.getPath().getName(), entry));
|
||||
FileStatus[] fsStatuses = fsMetaReader.getAllFilesInPartition(new Path(HoodieCLI.basePath, partition));
|
||||
Arrays.stream(fsStatuses).forEach(entry -> fileStatusMap.put(entry.getPath().getName(), entry));
|
||||
|
||||
Set<String> allFiles = new HashSet<>();
|
||||
allFiles.addAll(fileStatusMap.keySet());
|
||||
allFiles.addAll(metadataFileStatusMap.keySet());
|
||||
|
||||
for (String file : allFiles) {
|
||||
Comparable[] row = new Comparable[6];
|
||||
row[0] = partition;
|
||||
FileStatus fsFileStatus = fileStatusMap.get(file);
|
||||
FileStatus metaFileStatus = metadataFileStatusMap.get(file);
|
||||
row[1] = file;
|
||||
row[2] = fsFileStatus != null;
|
||||
row[3] = metaFileStatus != null;
|
||||
row[4] = (fsFileStatus != null) ? fsFileStatus.getLen() : 0;
|
||||
row[5] = (metaFileStatus != null) ? metaFileStatus.getLen() : 0;
|
||||
rows.add(row);
|
||||
}
|
||||
|
||||
if (metadataStatuses.length != fsStatuses.length) {
|
||||
LOG.error(" FS and metadata files count not matching for " + partition + ". FS files count " + fsStatuses.length + ", metadata base files count "
|
||||
+ metadataStatuses.length);
|
||||
}
|
||||
|
||||
for (Map.Entry<String, FileStatus> entry : fileStatusMap.entrySet()) {
|
||||
if (!metadataFileStatusMap.containsKey(entry.getKey())) {
|
||||
LOG.error("FS file not found in metadata " + entry.getKey());
|
||||
} else {
|
||||
if (entry.getValue().getLen() != metadataFileStatusMap.get(entry.getKey()).getLen()) {
|
||||
LOG.error(" FS file size mismatch " + entry.getKey() + ", size equality "
|
||||
+ (entry.getValue().getLen() == metadataFileStatusMap.get(entry.getKey()).getLen())
|
||||
+ ". FS size " + entry.getValue().getLen() + ", metadata size "
|
||||
+ metadataFileStatusMap.get(entry.getKey()).getLen());
|
||||
}
|
||||
}
|
||||
}
|
||||
for (Map.Entry<String, FileStatus> entry : metadataFileStatusMap.entrySet()) {
|
||||
if (!fileStatusMap.containsKey(entry.getKey())) {
|
||||
LOG.error("Metadata file not found in FS " + entry.getKey());
|
||||
} else {
|
||||
if (entry.getValue().getLen() != fileStatusMap.get(entry.getKey()).getLen()) {
|
||||
LOG.error(" Metadata file size mismatch " + entry.getKey() + ", size equality "
|
||||
+ (entry.getValue().getLen() == fileStatusMap.get(entry.getKey()).getLen())
|
||||
+ ". Metadata size " + entry.getValue().getLen() + ", FS size "
|
||||
+ metadataFileStatusMap.get(entry.getKey()).getLen());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
TableHeader header = new TableHeader().addTableHeaderField("Partition")
|
||||
.addTableHeaderField("File Name")
|
||||
.addTableHeaderField(" IsPresent in FS ")
|
||||
.addTableHeaderField(" IsPresent in Metadata")
|
||||
.addTableHeaderField(" FS size")
|
||||
.addTableHeaderField(" Metadata size");
|
||||
return HoodiePrintHelper.print(header, new HashMap<>(), "", false, Integer.MAX_VALUE, false, rows);
|
||||
}
|
||||
|
||||
private HoodieWriteConfig getWriteConfig() {
|
||||
@@ -227,7 +351,7 @@ public class MetadataCommand implements CommandMarker {
|
||||
|
||||
private void initJavaSparkContext() {
|
||||
if (jsc == null) {
|
||||
jsc = SparkUtil.initJavaSparkConf("HoodieClI");
|
||||
jsc = SparkUtil.initJavaSparkConf(SparkUtil.getDefaultConf("HoodieCLI", Option.empty()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -43,6 +43,7 @@ public class SparkEnvCommand implements CommandMarker {
|
||||
throw new IllegalArgumentException("Illegal set parameter, please use like [set --conf SPARK_HOME=/usr/etc/spark]");
|
||||
}
|
||||
env.put(map[0].trim(), map[1].trim());
|
||||
System.setProperty(map[0].trim(), map[1].trim());
|
||||
}
|
||||
|
||||
@CliCommand(value = "show envs all", help = "Show spark launcher envs")
|
||||
|
||||
@@ -34,6 +34,7 @@ import java.io.File;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Utility functions dealing with Spark.
|
||||
@@ -62,28 +63,30 @@ public class SparkUtil {
|
||||
return sparkLauncher;
|
||||
}
|
||||
|
||||
public static JavaSparkContext initJavaSparkConf(String name) {
|
||||
return initJavaSparkConf(name, Option.empty(), Option.empty());
|
||||
}
|
||||
/**
|
||||
* Get the default spark configuration.
|
||||
*
|
||||
* @param appName - Spark application name
|
||||
* @param sparkMaster - Spark master node name
|
||||
* @return Spark configuration
|
||||
*/
|
||||
public static SparkConf getDefaultConf(final String appName, final Option<String> sparkMaster) {
|
||||
final Properties properties = System.getProperties();
|
||||
SparkConf sparkConf = new SparkConf().setAppName(appName);
|
||||
|
||||
public static JavaSparkContext initJavaSparkConf(String name, Option<String> master,
|
||||
Option<String> executorMemory) {
|
||||
SparkConf sparkConf = new SparkConf().setAppName(name);
|
||||
|
||||
String defMaster = master.orElse(sparkConf.getenv(HoodieCliSparkConfig.CLI_SPARK_MASTER));
|
||||
if ((null == defMaster) || (defMaster.isEmpty())) {
|
||||
sparkConf.setMaster(DEFAULT_SPARK_MASTER);
|
||||
} else {
|
||||
sparkConf.setMaster(defMaster);
|
||||
// Configure the sparkMaster
|
||||
String sparkMasterNode = DEFAULT_SPARK_MASTER;
|
||||
if (properties.getProperty(HoodieCliSparkConfig.CLI_SPARK_MASTER) != null) {
|
||||
sparkMasterNode = properties.getProperty(HoodieCliSparkConfig.CLI_SPARK_MASTER);
|
||||
}
|
||||
sparkMasterNode = sparkMaster.orElse(sparkMasterNode);
|
||||
sparkConf.setMaster(sparkMasterNode);
|
||||
|
||||
sparkConf.set(HoodieCliSparkConfig.CLI_SERIALIZER, "org.apache.spark.serializer.KryoSerializer");
|
||||
// Configure driver
|
||||
sparkConf.set(HoodieCliSparkConfig.CLI_DRIVER_MAX_RESULT_SIZE, "2g");
|
||||
sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_OVERWRITE, "true");
|
||||
sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_ENABLED, "true");
|
||||
if (executorMemory.isPresent()) {
|
||||
sparkConf.set(HoodieCliSparkConfig.CLI_EXECUTOR_MEMORY, executorMemory.get());
|
||||
}
|
||||
sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_ENABLED, "false");
|
||||
sparkConf.set(HoodieCliSparkConfig.CLI_SERIALIZER, "org.apache.spark.serializer.KryoSerializer");
|
||||
|
||||
// Configure hadoop conf
|
||||
sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESS, "true");
|
||||
@@ -91,10 +94,28 @@ public class SparkUtil {
|
||||
sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_CODEC, "org.apache.hadoop.io.compress.GzipCodec");
|
||||
sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_TYPE, "BLOCK");
|
||||
|
||||
return sparkConf;
|
||||
}
|
||||
|
||||
public static JavaSparkContext initJavaSparkConf(String name) {
|
||||
return initJavaSparkConf(name, Option.empty(), Option.empty());
|
||||
}
|
||||
|
||||
public static JavaSparkContext initJavaSparkConf(String name, Option<String> master, Option<String> executorMemory) {
|
||||
SparkConf sparkConf = getDefaultConf(name, master);
|
||||
if (executorMemory.isPresent()) {
|
||||
sparkConf.set(HoodieCliSparkConfig.CLI_EXECUTOR_MEMORY, executorMemory.get());
|
||||
}
|
||||
|
||||
return initJavaSparkConf(sparkConf);
|
||||
}
|
||||
|
||||
public static JavaSparkContext initJavaSparkConf(SparkConf sparkConf) {
|
||||
SparkRDDWriteClient.registerClasses(sparkConf);
|
||||
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
|
||||
jsc.hadoopConfiguration().setBoolean(HoodieCliSparkConfig.CLI_PARQUET_ENABLE_SUMMARY_METADATA, false);
|
||||
FSUtils.prepareHadoopConf(jsc.hadoopConfiguration());
|
||||
return jsc;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user