[HUDI-704] Add test for RepairsCommand (#1554)
This commit is contained in:
@@ -72,4 +72,13 @@ public class HoodieTableHeaderFields {
|
||||
public static final String HEADER_DELTA_BASE_UNSCHEDULED = "Delta To Base Ratio" + COMPACTION_UNSCHEDULED_SUFFIX;
|
||||
public static final String HEADER_DELTA_FILES_SCHEDULED = "Delta Files" + COMPACTION_SCHEDULED_SUFFIX;
|
||||
public static final String HEADER_DELTA_FILES_UNSCHEDULED = "Delta Files" + COMPACTION_UNSCHEDULED_SUFFIX;
|
||||
|
||||
/**
|
||||
* Header fields for the tables printed by the repair commands.
|
||||
*/
|
||||
public static final String HEADER_METADATA_PRESENT = "Metadata Present?";
|
||||
public static final String HEADER_REPAIR_ACTION = "Action";
|
||||
public static final String HEADER_HOODIE_PROPERTY = "Property";
|
||||
public static final String HEADER_OLD_VALUE = "Old Value";
|
||||
public static final String HEADER_NEW_VALUE = "New Value";
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ package org.apache.hudi.cli.commands;
|
||||
|
||||
import org.apache.hudi.cli.HoodieCLI;
|
||||
import org.apache.hudi.cli.HoodiePrintHelper;
|
||||
import org.apache.hudi.cli.HoodieTableHeaderFields;
|
||||
import org.apache.hudi.cli.utils.InputStreamConsumer;
|
||||
import org.apache.hudi.cli.utils.SparkUtil;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
@@ -30,12 +31,15 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.util.CleanerUtils;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.common.util.StringUtils;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.spark.launcher.SparkLauncher;
|
||||
import org.apache.spark.util.Utils;
|
||||
import org.springframework.shell.core.CommandMarker;
|
||||
import org.springframework.shell.core.annotation.CliCommand;
|
||||
import org.springframework.shell.core.annotation.CliOption;
|
||||
import org.springframework.stereotype.Component;
|
||||
import scala.collection.JavaConverters;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
@@ -55,6 +59,7 @@ import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME
|
||||
public class RepairsCommand implements CommandMarker {
|
||||
|
||||
private static final Logger LOG = Logger.getLogger(RepairsCommand.class);
|
||||
public static final String DEDUPLICATE_RETURN_PREFIX = "Deduplicated files placed in: ";
|
||||
|
||||
@CliCommand(value = "repair deduplicate",
|
||||
help = "De-duplicate a partition path contains duplicates & produce repaired files to replace with")
|
||||
@@ -64,19 +69,35 @@ public class RepairsCommand implements CommandMarker {
|
||||
@CliOption(key = {"repairedOutputPath"}, help = "Location to place the repaired files",
|
||||
mandatory = true) final String repairedOutputPath,
|
||||
@CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path",
|
||||
mandatory = true) final String sparkPropertiesPath)
|
||||
unspecifiedDefaultValue = "") String sparkPropertiesPath,
|
||||
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
|
||||
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
|
||||
help = "Spark executor memory") final String sparkMemory,
|
||||
@CliOption(key = {"dryrun"},
|
||||
help = "Should we actually remove duplicates or just run and store result to repairedOutputPath",
|
||||
unspecifiedDefaultValue = "true") final boolean dryRun)
|
||||
throws Exception {
|
||||
if (StringUtils.isNullOrEmpty(sparkPropertiesPath)) {
|
||||
sparkPropertiesPath =
|
||||
Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
|
||||
}
|
||||
|
||||
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
|
||||
sparkLauncher.addAppArgs(SparkMain.SparkCommand.DEDUPLICATE.toString(), duplicatedPartitionPath, repairedOutputPath,
|
||||
HoodieCLI.getTableMetaClient().getBasePath());
|
||||
sparkLauncher.addAppArgs(SparkMain.SparkCommand.DEDUPLICATE.toString(), master, sparkMemory,
|
||||
duplicatedPartitionPath, repairedOutputPath, HoodieCLI.getTableMetaClient().getBasePath(),
|
||||
String.valueOf(dryRun));
|
||||
Process process = sparkLauncher.launch();
|
||||
InputStreamConsumer.captureOutput(process);
|
||||
int exitCode = process.waitFor();
|
||||
|
||||
if (exitCode != 0) {
|
||||
return "Deduplicated files placed in: " + repairedOutputPath;
|
||||
return "Deduplication failed!";
|
||||
}
|
||||
if (dryRun) {
|
||||
return DEDUPLICATE_RETURN_PREFIX + repairedOutputPath;
|
||||
} else {
|
||||
return DEDUPLICATE_RETURN_PREFIX + duplicatedPartitionPath;
|
||||
}
|
||||
return "Deduplication failed ";
|
||||
}
|
||||
|
||||
@CliCommand(value = "repair addpartitionmeta", help = "Add partition metadata to a table, if not present")
|
||||
@@ -106,12 +127,14 @@ public class RepairsCommand implements CommandMarker {
|
||||
HoodiePartitionMetadata partitionMetadata =
|
||||
new HoodiePartitionMetadata(HoodieCLI.fs, latestCommit, basePath, partitionPath);
|
||||
partitionMetadata.trySave(0);
|
||||
row[2] = "Repaired";
|
||||
}
|
||||
}
|
||||
rows[ind++] = row;
|
||||
}
|
||||
|
||||
return HoodiePrintHelper.print(new String[] {"Partition Path", "Metadata Present?", "Action"}, rows);
|
||||
return HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
|
||||
HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
|
||||
}
|
||||
|
||||
@CliCommand(value = "repair overwrite-hoodie-props", help = "Overwrite hoodie.properties with provided file. Risky operation. Proceed with caution!")
|
||||
@@ -140,7 +163,8 @@ public class RepairsCommand implements CommandMarker {
|
||||
};
|
||||
rows[ind++] = row;
|
||||
}
|
||||
return HoodiePrintHelper.print(new String[] {"Property", "Old Value", "New Value"}, rows);
|
||||
return HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_HOODIE_PROPERTY,
|
||||
HoodieTableHeaderFields.HEADER_OLD_VALUE, HoodieTableHeaderFields.HEADER_NEW_VALUE}, rows);
|
||||
}
|
||||
|
||||
@CliCommand(value = "repair corrupted clean files", help = "repair corrupted clean files")
|
||||
|
||||
@@ -73,8 +73,8 @@ public class SparkMain {
|
||||
returnCode = rollback(jsc, args[1], args[2]);
|
||||
break;
|
||||
case DEDUPLICATE:
|
||||
assert (args.length == 4);
|
||||
returnCode = deduplicatePartitionPath(jsc, args[1], args[2], args[3]);
|
||||
assert (args.length == 7);
|
||||
returnCode = deduplicatePartitionPath(jsc, args[3], args[4], args[5], args[6]);
|
||||
break;
|
||||
case ROLLBACK_TO_SAVEPOINT:
|
||||
assert (args.length == 3);
|
||||
@@ -162,7 +162,8 @@ public class SparkMain {
|
||||
|
||||
/**
 * Tells whether {@code command} is one of the commands collected in the {@code masterContained} list below.
 *
 * <p>NOTE(review): judging by the name, these are presumably the commands whose argument list carries the
 * Spark master; the caller is not visible here, so confirm before relying on that.
 *
 * @param command the Spark command to look up
 * @return {@code true} if the list contains {@code command}, {@code false} otherwise
 */
private static boolean sparkMasterContained(SparkCommand command) {
|
||||
// NOTE(review): this listing shows two variants of the list's continuation line; the first closes the list
// after CLEAN, the second keeps it open so that DEDUPLICATE is appended as well.
List<SparkCommand> masterContained = Arrays.asList(SparkCommand.COMPACT_VALIDATE, SparkCommand.COMPACT_REPAIR,
|
||||
SparkCommand.COMPACT_UNSCHEDULE_PLAN, SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN);
|
||||
SparkCommand.COMPACT_UNSCHEDULE_PLAN, SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN,
|
||||
SparkCommand.DEDUPLICATE);
|
||||
return masterContained.contains(command);
|
||||
}
|
||||
|
||||
@@ -263,10 +264,10 @@ public class SparkMain {
|
||||
}
|
||||
|
||||
/**
 * Builds a {@code DedupeSparkJob} for the given paths and runs {@code fixDuplicates} on it.
 *
 * <p>NOTE(review): this listing shows two variants of the signature tail and of the {@code fixDuplicates}
 * call; the variant taking {@code dryRun} forwards the parsed flag instead of the hard-coded {@code true}.
 *
 * @param jsc Spark context; used to create the {@code SQLContext} and to supply the Hadoop configuration
 * @param duplicatedPartitionPath partition path handed to the dedupe job
 * @param repairedOutputPath output location handed to the dedupe job
 * @param basePath table base path; also used to resolve the file system
 * @param dryRun flag as a string, parsed with {@link Boolean#parseBoolean(String)}, so anything other than
 *               "true" (ignoring case) is treated as {@code false}
 * @return always {@code 0}
 */
private static int deduplicatePartitionPath(JavaSparkContext jsc, String duplicatedPartitionPath,
|
||||
String repairedOutputPath, String basePath) {
|
||||
String repairedOutputPath, String basePath, String dryRun) {
|
||||
DedupeSparkJob job = new DedupeSparkJob(basePath, duplicatedPartitionPath, repairedOutputPath, new SQLContext(jsc),
|
||||
FSUtils.getFs(basePath, jsc.hadoopConfiguration()));
|
||||
// Variant without the dryRun parameter: the flag is fixed to true.
job.fixDuplicates(true);
|
||||
// Variant with the dryRun parameter: the flag comes from the caller-supplied string.
job.fixDuplicates(Boolean.parseBoolean(dryRun));
|
||||
// No failure code is produced here; the method returns 0 whenever fixDuplicates returns normally.
return 0;
|
||||
}
|
||||
|
||||
|
||||
@@ -46,6 +46,10 @@ object SparkHelpers {
|
||||
HoodieIndexConfig.DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_INDEX_FILTER_TYPE);
|
||||
val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter)
|
||||
val parquetConfig: HoodieParquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf, HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO.toDouble)
|
||||
|
||||
// Set the current thread's context class loader on the Hadoop config; otherwise loading 'HoodieWrapperFileSystem' fails with a ClassNotFoundException.
|
||||
parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)
|
||||
|
||||
val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](instantTime, destinationFile, parquetConfig, schema, new SparkTaskContextSupplier())
|
||||
for (rec <- sourceRecords) {
|
||||
val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
|
||||
|
||||
Reference in New Issue
Block a user