diff --git a/docs/admin_guide.md b/docs/admin_guide.md index 9b8c9981e..f7fe91f8d 100644 --- a/docs/admin_guide.md +++ b/docs/admin_guide.md @@ -161,6 +161,33 @@ hoodie:trips->commit showfiles --commit 20161005165855 --sortBy "Partition Path" .... ``` + +#### FileSystem View + +Hudi views each partition as a collection of file-groups with each file-group containing a list of file-slices in commit +order (See Concepts). The below commands allow users to view the file-slices for a data-set. + +``` + hoodie:stock_ticks_mor->show fsview all + .... + _______________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________ + | Partition | FileId | Base-Instant | Data-File | Data-File Size| Num Delta Files| Total Delta File Size| Delta Files | + |==============================================================================================================================================================================================================================================================================================================================================================================================================| + | 2018/08/31| 111415c3-f26d-4639-86c8-f9956f245ac3| 20181002180759| hdfs://namenode:8020/user/hive/warehouse/stock_ticks_mor/2018/08/31/111415c3-f26d-4639-86c8-f9956f245ac3_0_20181002180759.parquet| 432.5 KB | 1 | 20.8 KB | [HoodieLogFile {hdfs://namenode:8020/user/hive/warehouse/stock_ticks_mor/2018/08/31/.111415c3-f26d-4639-86c8-f9956f245ac3_20181002180759.log.1}]| + + + + hoodie:stock_ticks_mor->show fsview latest --partitionPath "2018/08/31" + ...... + __________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________ + | Partition | FileId | Base-Instant | Data-File | Data-File Size| Num Delta Files| Total Delta Size| Delta Size - compaction scheduled| Delta Size - compaction unscheduled| Delta To Base Ratio - compaction scheduled| Delta To Base Ratio - compaction unscheduled| Delta Files - compaction scheduled | Delta Files - compaction unscheduled| + |=================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================| + | 2018/08/31| 111415c3-f26d-4639-86c8-f9956f245ac3| 20181002180759| hdfs://namenode:8020/user/hive/warehouse/stock_ticks_mor/2018/08/31/111415c3-f26d-4639-86c8-f9956f245ac3_0_20181002180759.parquet| 432.5 KB | 1 | 20.8 KB | 20.8 KB | 0.0 B | 0.0 B | 0.0 B | [HoodieLogFile {hdfs://namenode:8020/user/hive/warehouse/stock_ticks_mor/2018/08/31/.111415c3-f26d-4639-86c8-f9956f245ac3_20181002180759.log.1}]| [] | + + hoodie:stock_ticks_mor-> +``` + + #### Statistics Since Hoodie directly manages file sizes for HDFS dataset, it might be good to get an overall picture @@ -280,31 +307,76 @@ Description: Run Compaction for given instant time * compaction run - Run Compaction for given instant time ``` -##### Up-Coming CLI for Compaction - -In the next release, more useful CLI to revert/repair compaction schedules will be added. Here is a preview of them: +##### Validate Compaction Validating a compaction plan : Check if all the files necessary for compactions are present and are valid ``` -hoodie:trips->compaction validate --compactionInstant +hoodie:stock_ticks_mor->compaction validate --instant 20181005222611 +... + + COMPACTION PLAN VALID + + ___________________________________________________________________________________________________________________________________________________________________________________________________________________________ + | File Id | Base Instant Time| Base Data File | Num Delta Files| Valid| Error| + |==========================================================================================================================================================================================================================| + | 05320e98-9a57-4c38-b809-a6beaaeb36bd| 20181005222445 | hdfs://namenode:8020/user/hive/warehouse/stock_ticks_mor/2018/08/31/05320e98-9a57-4c38-b809-a6beaaeb36bd_0_20181005222445.parquet| 1 | true | | + + + +hoodie:stock_ticks_mor->compaction validate --instant 20181005222601 + + COMPACTION PLAN INVALID + + _______________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________ + | File Id | Base Instant Time| Base Data File | Num Delta Files| Valid| Error | + |=====================================================================================================================================================================================================================================================================================================| + | 05320e98-9a57-4c38-b809-a6beaaeb36bd| 20181005222445 | hdfs://namenode:8020/user/hive/warehouse/stock_ticks_mor/2018/08/31/05320e98-9a57-4c38-b809-a6beaaeb36bd_0_20181005222445.parquet| 1 | false| All log files specified in compaction operation is not present. Missing .... | + + ``` +##### NOTE + The following commands must be executed without any other writer/ingestion application running. Sometimes, it becomes necessary to remove a fileId from a compaction-plan inorder to speed-up or unblock compaction operation. Any new log-files that happened on this file after the compaction got scheduled will be safely renamed so that are preserved. Hudi provides the following CLI to support it + +##### UnScheduling Compaction + ``` hoodie:trips->compaction unscheduleFileId --fileId +.... +No File renames needed to unschedule file from pending compaction. Operation successful. + ``` In other cases, an entire compaction plan needs to be reverted. This is supported by the following CLI ``` hoodie:trips->compaction unschedule --compactionInstant +..... +No File renames needed to unschedule pending compaction. Operation successful. ``` +##### Repair Compaction + +The above compaction unscheduling operations could sometimes fail partially (e:g -> HDFS temporarily unavailable). With +partial failures, the compaction operation could become inconsistent with the state of file-slices. When you run +`compaction validate`, you can notice invalid compaction operations if there is one. In these cases, the repair +command comes to the rescue, it will rearrange the file-slices so that there is no loss and the file-slices are +consistent with the compaction plan + +``` +hoodie:stock_ticks_mor->compaction repair --instant 20181005222611 +...... +Compaction successfully repaired +..... +``` + + ## Metrics Once the Hoodie Client is configured with the right datasetname and environment for metrics, it produces the following graphite metrics, that aid in debugging hoodie datasets diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CompactionCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CompactionCommand.java index 866e9315b..f5e3c894d 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CompactionCommand.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CompactionCommand.java @@ -16,6 +16,10 @@ package com.uber.hoodie.cli.commands; +import static com.uber.hoodie.common.table.HoodieTimeline.COMPACTION_ACTION; + +import com.uber.hoodie.CompactionAdminClient.RenameOpResult; +import com.uber.hoodie.CompactionAdminClient.ValidationOpResult; import com.uber.hoodie.avro.model.HoodieCompactionOperation; import com.uber.hoodie.avro.model.HoodieCompactionPlan; import com.uber.hoodie.cli.HoodieCLI; @@ -32,14 +36,20 @@ import com.uber.hoodie.common.table.timeline.HoodieInstant.State; import com.uber.hoodie.common.util.AvroUtils; import com.uber.hoodie.exception.HoodieIOException; import java.io.IOException; +import java.io.ObjectInputStream; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.UUID; import java.util.function.Function; import java.util.stream.Collectors; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.launcher.SparkLauncher; @@ -53,7 +63,9 @@ import org.springframework.stereotype.Component; @Component public class CompactionCommand implements CommandMarker { - private static Logger log = LogManager.getLogger(HDFSParquetImportCommand.class); + private static Logger log = LogManager.getLogger(CompactionCommand.class); + + private static final String TMP_DIR = "/tmp/"; @CliAvailabilityIndicator({"compactions show all", "compaction show", "compaction run", "compaction schedule"}) public boolean isAvailable() { @@ -84,7 +96,7 @@ public class CompactionCommand implements CommandMarker { for (int i = 0; i < instants.size(); i++) { HoodieInstant instant = instants.get(i); HoodieCompactionPlan workload = null; - if (!instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)) { + if (!instant.getAction().equals(COMPACTION_ACTION)) { try { // This could be a completed compaction. Assume a compaction request file is present but skip if fails workload = AvroUtils.deserializeCompactionPlan( @@ -203,7 +215,8 @@ public class CompactionCommand implements CommandMarker { final String schemaFilePath, @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G", help = "Spark executor memory") final String sparkMemory, - @CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number of retries") final String retry, + @CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number of retries") + final String retry, @CliOption(key = "compactionInstant", mandatory = true, help = "Base path for the target hoodie dataset") final String compactionInstantTime) throws Exception { boolean initialized = HoodieCLI.initConf(); @@ -227,4 +240,272 @@ public class CompactionCommand implements CommandMarker { throw new Exception("Compactions can only be run for table type : MERGE_ON_READ"); } } + + private static String getTmpSerializerFile() { + return TMP_DIR + UUID.randomUUID().toString() + ".ser"; + } + + private T deSerializeOperationResult(String inputP, FileSystem fs) throws Exception { + Path inputPath = new Path(inputP); + FSDataInputStream fsDataInputStream = fs.open(inputPath); + ObjectInputStream in = new ObjectInputStream(fsDataInputStream); + try { + T result = (T) in.readObject(); + log.info("Result : " + result); + return result; + } finally { + in.close(); + fsDataInputStream.close(); + } + } + + @CliCommand(value = "compaction validate", help = "Validate Compaction") + public String validateCompaction( + @CliOption(key = "instant", mandatory = true, help = "Compaction Instant") String compactionInstant, + @CliOption(key = {"parallelism"}, unspecifiedDefaultValue = "3", help = "Parallelism") String parallelism, + @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master ") String master, + @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory, + @CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") Integer limit, + @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") String sortByField, + @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") boolean descending, + @CliOption(key = { + "headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") boolean headerOnly) + throws Exception { + boolean initialized = HoodieCLI.initConf(); + HoodieCLI.initFS(initialized); + + String outputPathStr = getTmpSerializerFile(); + Path outputPath = new Path(outputPathStr); + String output = null; + if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) { + try { + String sparkPropertiesPath = Utils.getDefaultPropertiesFile( + scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); + SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); + sparkLauncher.addAppArgs(SparkCommand.COMPACT_VALIDATE.toString(), + HoodieCLI.tableMetadata.getBasePath(), compactionInstant, outputPathStr, parallelism, master, + sparkMemory); + Process process = sparkLauncher.launch(); + InputStreamConsumer.captureOutput(process); + int exitCode = process.waitFor(); + if (exitCode != 0) { + return "Failed to validate compaction for " + compactionInstant; + } + List res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs); + boolean valid = res.stream().map(r -> r.isSuccess()).reduce(Boolean::logicalAnd).orElse(true); + String message = "\n\n\t COMPACTION PLAN " + (valid ? "VALID" : "INVALID") + "\n\n"; + List rows = new ArrayList<>(); + res.stream().forEach(r -> { + Comparable[] row = new Comparable[]{r.getOperation().getFileId(), + r.getOperation().getBaseInstantTime(), + r.getOperation().getDataFilePath().isPresent() ? r.getOperation().getDataFilePath().get() : "", + r.getOperation().getDeltaFilePaths().size(), r.isSuccess(), + r.getException().isPresent() ? r.getException().get().getMessage() : ""}; + rows.add(row); + }); + + Map> fieldNameToConverterMap = new HashMap<>(); + TableHeader header = new TableHeader() + .addTableHeaderField("File Id") + .addTableHeaderField("Base Instant Time") + .addTableHeaderField("Base Data File") + .addTableHeaderField("Num Delta Files") + .addTableHeaderField("Valid") + .addTableHeaderField("Error"); + + output = message + HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, + headerOnly, rows); + } finally { + // Delete tmp file used to serialize result + if (HoodieCLI.fs.exists(outputPath)) { + HoodieCLI.fs.delete(outputPath, false); + } + } + return output; + } else { + throw new Exception("Compactions can only be run for table type : MERGE_ON_READ"); + } + } + + @CliCommand(value = "compaction unschedule", help = "Unschedule Compaction") + public String unscheduleCompaction( + @CliOption(key = "instant", mandatory = true, help = "Compaction Instant") String compactionInstant, + @CliOption(key = {"parallelism"}, unspecifiedDefaultValue = "3", help = "Parallelism") String parallelism, + @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master ") String master, + @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory, + @CliOption(key = {"skipValidation"}, help = "skip validation", unspecifiedDefaultValue = "false") boolean skipV, + @CliOption(key = {"dryRun"}, help = "Dry Run Mode", unspecifiedDefaultValue = "false") boolean dryRun, + @CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") Integer limit, + @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") String sortByField, + @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") boolean descending, + @CliOption(key = { + "headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") boolean headerOnly) + throws Exception { + boolean initialized = HoodieCLI.initConf(); + HoodieCLI.initFS(initialized); + + String outputPathStr = getTmpSerializerFile(); + Path outputPath = new Path(outputPathStr); + String output = ""; + if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) { + try { + String sparkPropertiesPath = Utils.getDefaultPropertiesFile( + scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); + SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); + sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_PLAN.toString(), + HoodieCLI.tableMetadata.getBasePath(), compactionInstant, outputPathStr, parallelism, master, + sparkMemory, Boolean.valueOf(skipV).toString(), Boolean.valueOf(dryRun).toString()); + Process process = sparkLauncher.launch(); + InputStreamConsumer.captureOutput(process); + int exitCode = process.waitFor(); + if (exitCode != 0) { + return "Failed to unschedule compaction for " + compactionInstant; + } + List res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs); + output = getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, + "unschedule pending compaction"); + } finally { + // Delete tmp file used to serialize result + if (HoodieCLI.fs.exists(outputPath)) { + HoodieCLI.fs.delete(outputPath, false); + } + } + return output; + } else { + throw new Exception("Compactions can only be run for table type : MERGE_ON_READ"); + } + } + + @CliCommand(value = "compaction unscheduleFileId", help = "UnSchedule Compaction for a fileId") + public String unscheduleCompactFile( + @CliOption(key = "fileId", mandatory = true, help = "File Id") final String fileId, + @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master ") String master, + @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory, + @CliOption(key = {"skipValidation"}, help = "skip validation", unspecifiedDefaultValue = "false") boolean skipV, + @CliOption(key = {"dryRun"}, help = "Dry Run Mode", unspecifiedDefaultValue = "false") boolean dryRun, + @CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") Integer limit, + @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") String sortByField, + @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") boolean descending, + @CliOption(key = {"headeronly"}, help = "Header Only", unspecifiedDefaultValue = "false") boolean headerOnly) + throws Exception { + boolean initialized = HoodieCLI.initConf(); + HoodieCLI.initFS(initialized); + + String outputPathStr = getTmpSerializerFile(); + Path outputPath = new Path(outputPathStr); + String output = ""; + if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) { + try { + String sparkPropertiesPath = Utils.getDefaultPropertiesFile( + scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); + SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); + sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_FILE.toString(), + HoodieCLI.tableMetadata.getBasePath(), fileId, outputPathStr, "1", master, + sparkMemory, Boolean.valueOf(skipV).toString(), Boolean.valueOf(dryRun).toString()); + Process process = sparkLauncher.launch(); + InputStreamConsumer.captureOutput(process); + int exitCode = process.waitFor(); + if (exitCode != 0) { + return "Failed to unschedule compaction for file " + fileId; + } + List res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs); + output = getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, + "unschedule file from pending compaction"); + } finally { + // Delete tmp file used to serialize result + if (HoodieCLI.fs.exists(outputPath)) { + HoodieCLI.fs.delete(outputPath, false); + } + } + return output; + } else { + throw new Exception("Compactions can only be run for table type : MERGE_ON_READ"); + } + } + + @CliCommand(value = "compaction repair", help = "Renames the files to make them consistent with the timeline as " + + "dictated by Hoodie metadata. Use when compaction unschedule fails partially.") + public String repairCompaction( + @CliOption(key = "instant", mandatory = true, help = "Compaction Instant") String compactionInstant, + @CliOption(key = {"parallelism"}, unspecifiedDefaultValue = "3", help = "Parallelism") String parallelism, + @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master ") String master, + @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory, + @CliOption(key = {"dryRun"}, help = "Dry Run Mode", unspecifiedDefaultValue = "false") boolean dryRun, + @CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") Integer limit, + @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") String sortByField, + @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") boolean descending, + @CliOption(key = { + "headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") boolean headerOnly) + throws Exception { + boolean initialized = HoodieCLI.initConf(); + HoodieCLI.initFS(initialized); + String outputPathStr = getTmpSerializerFile(); + Path outputPath = new Path(outputPathStr); + String output = ""; + if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) { + try { + String sparkPropertiesPath = Utils.getDefaultPropertiesFile( + scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); + SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); + sparkLauncher.addAppArgs(SparkCommand.COMPACT_REPAIR.toString(), + HoodieCLI.tableMetadata.getBasePath(), compactionInstant, outputPathStr, parallelism, master, + sparkMemory, Boolean.valueOf(dryRun).toString()); + Process process = sparkLauncher.launch(); + InputStreamConsumer.captureOutput(process); + int exitCode = process.waitFor(); + if (exitCode != 0) { + return "Failed to unschedule compaction for " + compactionInstant; + } + List res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs); + output = getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, "repair compaction"); + } finally { + // Delete tmp file used to serialize result + if (HoodieCLI.fs.exists(outputPath)) { + HoodieCLI.fs.delete(outputPath, false); + } + } + return output; + } else { + throw new Exception("Compactions can only be run for table type : MERGE_ON_READ"); + } + } + + private String getRenamesToBePrinted(List res, Integer limit, + String sortByField, boolean descending, boolean headerOnly, String operation) { + + Optional result = res.stream().map(r -> r.isExecuted() && r.isSuccess()).reduce(Boolean::logicalAnd); + if (result.isPresent()) { + System.out.println("There were some file renames that needed to be done to " + operation); + + if (result.get()) { + System.out.println("All renames successfully completed to " + operation + " done !!"); + } else { + System.out.println("Some renames failed. DataSet could be in inconsistent-state. " + + "Try running compaction repair"); + } + + List rows = new ArrayList<>(); + res.stream().forEach(r -> { + Comparable[] row = new Comparable[] { + r.getOperation().fileId, r.getOperation().srcPath, r.getOperation().destPath, + r.isExecuted(), r.isSuccess(), r.getException().isPresent() ? r.getException().get().getMessage() : "" + }; + rows.add(row); + }); + + Map> fieldNameToConverterMap = new HashMap<>(); + TableHeader header = new TableHeader() + .addTableHeaderField("File Id") + .addTableHeaderField("Source File Path") + .addTableHeaderField("Destination File Path") + .addTableHeaderField("Rename Executed?") + .addTableHeaderField("Rename Succeeded?") + .addTableHeaderField("Error"); + + return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, + limit, headerOnly, rows); + } else { + return "No File renames needed to " + operation + ". Operation successful."; + } + } } \ No newline at end of file diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SparkMain.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SparkMain.java index bb9189ccb..165ae67de 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SparkMain.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SparkMain.java @@ -25,6 +25,8 @@ import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.io.compact.strategy.UnBoundedCompactionStrategy; import com.uber.hoodie.utilities.HDFSParquetImporter; +import com.uber.hoodie.utilities.HoodieCompactionAdminTool; +import com.uber.hoodie.utilities.HoodieCompactionAdminTool.Operation; import com.uber.hoodie.utilities.HoodieCompactor; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; @@ -39,6 +41,7 @@ public class SparkMain { */ enum SparkCommand { ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, + COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR } public static void main(String[] args) throws Exception { @@ -78,10 +81,32 @@ public class SparkMain { returnCode = compact(jsc, args[1], args[2], args[3], 1, "", args[4], 0, true); break; + case COMPACT_VALIDATE: + assert (args.length == 7); + doCompactValidate(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6]); + returnCode = 0; + break; + case COMPACT_REPAIR: + assert (args.length == 8); + doCompactRepair(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6], + Boolean.valueOf(args[7])); + returnCode = 0; + break; + case COMPACT_UNSCHEDULE_FILE: + assert (args.length == 9); + doCompactUnscheduleFile(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6], + Boolean.valueOf(args[7]), Boolean.valueOf(args[8])); + returnCode = 0; + break; + case COMPACT_UNSCHEDULE_PLAN: + assert (args.length == 9); + doCompactUnschedule(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6], + Boolean.valueOf(args[7]), Boolean.valueOf(args[8])); + returnCode = 0; + break; default: break; } - System.exit(returnCode); } @@ -103,6 +128,73 @@ public class SparkMain { return new HDFSParquetImporter(cfg).dataImport(jsc, retry); } + private static void doCompactValidate(JavaSparkContext jsc, String basePath, String compactionInstant, + String outputPath, int parallelism, String sparkMaster, String sparkMemory) throws Exception { + HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config(); + cfg.basePath = basePath; + cfg.operation = Operation.VALIDATE; + cfg.outputPath = outputPath; + cfg.compactionInstantTime = compactionInstant; + cfg.parallelism = parallelism; + if ((null != sparkMaster) && (!sparkMaster.isEmpty())) { + jsc.getConf().setMaster(sparkMaster); + } + jsc.getConf().set("spark.executor.memory", sparkMemory); + new HoodieCompactionAdminTool(cfg).run(jsc); + } + + private static void doCompactRepair(JavaSparkContext jsc, String basePath, String compactionInstant, + String outputPath, int parallelism, String sparkMaster, String sparkMemory, boolean dryRun) throws Exception { + HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config(); + cfg.basePath = basePath; + cfg.operation = Operation.REPAIR; + cfg.outputPath = outputPath; + cfg.compactionInstantTime = compactionInstant; + cfg.parallelism = parallelism; + cfg.dryRun = dryRun; + if ((null != sparkMaster) && (!sparkMaster.isEmpty())) { + jsc.getConf().setMaster(sparkMaster); + } + jsc.getConf().set("spark.executor.memory", sparkMemory); + new HoodieCompactionAdminTool(cfg).run(jsc); + } + + private static void doCompactUnschedule(JavaSparkContext jsc, String basePath, String compactionInstant, + String outputPath, int parallelism, String sparkMaster, String sparkMemory, boolean skipValidation, + boolean dryRun) throws Exception { + HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config(); + cfg.basePath = basePath; + cfg.operation = Operation.UNSCHEDULE_PLAN; + cfg.outputPath = outputPath; + cfg.compactionInstantTime = compactionInstant; + cfg.parallelism = parallelism; + cfg.dryRun = dryRun; + cfg.skipValidation = skipValidation; + if ((null != sparkMaster) && (!sparkMaster.isEmpty())) { + jsc.getConf().setMaster(sparkMaster); + } + jsc.getConf().set("spark.executor.memory", sparkMemory); + new HoodieCompactionAdminTool(cfg).run(jsc); + } + + private static void doCompactUnscheduleFile(JavaSparkContext jsc, String basePath, String fileId, + String outputPath, int parallelism, String sparkMaster, String sparkMemory, boolean skipValidation, + boolean dryRun) throws Exception { + HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config(); + cfg.basePath = basePath; + cfg.operation = Operation.UNSCHEDULE_FILE; + cfg.outputPath = outputPath; + cfg.fileId = fileId; + cfg.parallelism = parallelism; + cfg.dryRun = dryRun; + cfg.skipValidation = skipValidation; + if ((null != sparkMaster) && (!sparkMaster.isEmpty())) { + jsc.getConf().setMaster(sparkMaster); + } + jsc.getConf().set("spark.executor.memory", sparkMemory); + new HoodieCompactionAdminTool(cfg).run(jsc); + } + private static int compact(JavaSparkContext jsc, String basePath, String tableName, String compactionInstant, int parallelism, String schemaFile, String sparkMemory, int retry, boolean schedule) throws Exception { HoodieCompactor.Config cfg = new HoodieCompactor.Config(); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java new file mode 100644 index 000000000..2f6495f05 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java @@ -0,0 +1,546 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie; + +import static com.uber.hoodie.common.table.HoodieTimeline.COMPACTION_ACTION; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.uber.hoodie.avro.model.HoodieCompactionOperation; +import com.uber.hoodie.avro.model.HoodieCompactionPlan; +import com.uber.hoodie.common.model.CompactionOperation; +import com.uber.hoodie.common.model.FileSlice; +import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.model.HoodieLogFile; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.table.timeline.HoodieInstant.State; +import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; +import com.uber.hoodie.common.util.AvroUtils; +import com.uber.hoodie.common.util.CompactionUtils; +import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.common.util.collection.Pair; +import com.uber.hoodie.exception.HoodieException; +import com.uber.hoodie.exception.HoodieIOException; +import com.uber.hoodie.func.OperationResult; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; + +/** + * Client to perform admin operations related to compaction + */ +public class CompactionAdminClient implements Serializable { + + private static Logger log = LogManager.getLogger(CompactionAdminClient.class); + + private final transient JavaSparkContext jsc; + private final String basePath; + + public CompactionAdminClient(JavaSparkContext jsc, String basePath) { + this.jsc = jsc; + this.basePath = basePath; + } + + /** + * Validate all compaction operations in a compaction plan. Verifies the file-slices are consistent with corresponding + * compaction operations. + * + * @param metaClient Hoodie Table Meta Client + * @param compactionInstant Compaction Instant + */ + public List validateCompactionPlan(HoodieTableMetaClient metaClient, + String compactionInstant, int parallelism) throws IOException { + HoodieCompactionPlan plan = getCompactionPlan(metaClient, compactionInstant); + HoodieTableFileSystemView fsView = + new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline()); + + if (plan.getOperations() != null) { + List ops = plan.getOperations().stream() + .map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList()); + return jsc.parallelize(ops, parallelism).map(op -> { + try { + return validateCompactionOperation(metaClient, compactionInstant, op, Optional.of(fsView)); + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + }).collect(); + } + return new ArrayList<>(); + } + + /** + * Un-schedules compaction plan. Remove All compaction operation scheduled and re-arrange delta-files that were + * created after the compaction was scheduled. + * + * This operation MUST be executed with compactions and writer turned OFF. + * + * @param compactionInstant Compaction Instant + * @param skipValidation Skip validation step + * @param parallelism Parallelism + * @param dryRun Dry Run + */ + public List unscheduleCompactionPlan( + String compactionInstant, boolean skipValidation, int parallelism, boolean dryRun) throws Exception { + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + List> renameActions = + getRenamingActionsForUnschedulingCompactionPlan(metaClient, compactionInstant, parallelism, + Optional.absent(), skipValidation); + + List res = + runRenamingOps(metaClient, renameActions, parallelism, dryRun); + + java.util.Optional success = + res.stream().map(r -> (r.isExecuted() && r.isSuccess())).reduce(Boolean::logicalAnd); + Optional allSuccess = success.isPresent() ? Optional.of(success.get()) : Optional.absent(); + + // Only if all operations are successfully executed + if (!dryRun && allSuccess.isPresent() && allSuccess.get()) { + // Overwrite compaction request with empty compaction operations + HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, compactionInstant); + HoodieCompactionPlan newPlan = + HoodieCompactionPlan.newBuilder().setOperations(new ArrayList<>()).setExtraMetadata(plan.getExtraMetadata()) + .build(); + HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, compactionInstant); + Path inflightPath = new Path(metaClient.getMetaPath(), inflight.getFileName()); + if (metaClient.getFs().exists(inflightPath)) { + // We need to rollback data-files because of this inflight compaction before unscheduling + throw new IllegalStateException("Please rollback the inflight compaction before unscheduling"); + } + metaClient.getActiveTimeline().saveToCompactionRequested( + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, compactionInstant), + AvroUtils.serializeCompactionPlan(newPlan)); + } + return res; + } + + /** + * Remove a fileId from pending compaction. Removes the associated compaction operation and rename delta-files + * that were generated for that file-id after the compaction operation was scheduled. + * + * This operation MUST be executed with compactions and writer turned OFF. + * + * @param fileId FileId to be unscheduled + * @param skipValidation Skip validation + * @param dryRun Dry Run Mode + */ + public List unscheduleCompactionFileId(String fileId, + boolean skipValidation, boolean dryRun) throws Exception { + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + List> renameActions = + getRenamingActionsForUnschedulingCompactionForFileId(metaClient, fileId, Optional.absent(), skipValidation); + + List res = runRenamingOps(metaClient, renameActions, 1, dryRun); + + if (!dryRun && !res.isEmpty() && res.get(0).isExecuted() && res.get(0).isSuccess()) { + // Ready to remove this file-Id from compaction request + Pair compactionOperationWithInstant = + CompactionUtils.getAllPendingCompactionOperations(metaClient).get(fileId); + HoodieCompactionPlan plan = CompactionUtils + .getCompactionPlan(metaClient, compactionOperationWithInstant.getKey()); + List newOps = plan.getOperations().stream() + .filter(op -> !op.getFileId().equals(fileId)).collect(Collectors.toList()); + HoodieCompactionPlan newPlan = + HoodieCompactionPlan.newBuilder().setOperations(newOps).setExtraMetadata(plan.getExtraMetadata()).build(); + HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, + compactionOperationWithInstant.getLeft()); + Path inflightPath = new Path(metaClient.getMetaPath(), inflight.getFileName()); + if (metaClient.getFs().exists(inflightPath)) { + // revert if in inflight state + metaClient.getActiveTimeline().revertCompactionInflightToRequested(inflight); + } + metaClient.getActiveTimeline().saveToCompactionRequested( + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, compactionOperationWithInstant.getLeft()), + AvroUtils.serializeCompactionPlan(newPlan)); + } + return res; + } + + /** + * Renames delta files to make file-slices consistent with the timeline as dictated by Hoodie metadata. + * Use when compaction unschedule fails partially. + * + * This operation MUST be executed with compactions and writer turned OFF. + * @param compactionInstant Compaction Instant to be repaired + * @param dryRun Dry Run Mode + */ + public List repairCompaction(String compactionInstant, + int parallelism, boolean dryRun) throws Exception { + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + List validationResults = + validateCompactionPlan(metaClient, compactionInstant, parallelism); + List failed = validationResults.stream() + .filter(v -> !v.isSuccess()).collect(Collectors.toList()); + if (failed.isEmpty()) { + return new ArrayList<>(); + } + + final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, + metaClient.getCommitsAndCompactionTimeline()); + List> renameActions = failed.stream().flatMap(v -> + getRenamingActionsToAlignWithCompactionOperation(metaClient, compactionInstant, + v.getOperation(), Optional.of(fsView)).stream()).collect(Collectors.toList()); + return runRenamingOps(metaClient, renameActions, parallelism, dryRun); + } + + /** + * Construction Compaction Plan from compaction instant + */ + private static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, + String compactionInstant) throws IOException { + HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan( + metaClient.getActiveTimeline().getInstantAuxiliaryDetails( + HoodieTimeline.getCompactionRequestedInstant(compactionInstant)).get()); + return compactionPlan; + } + + /** + * Get Renaming actions to ensure the log-files of merged file-slices is aligned with compaction operation. This + * method is used to recover from failures during unschedule compaction operations. + * + * @param metaClient Hoodie Table Meta Client + * @param compactionInstant Compaction Instant + * @param op Compaction Operation + * @param fsViewOpt File System View + */ + protected static List> getRenamingActionsToAlignWithCompactionOperation( + HoodieTableMetaClient metaClient, String compactionInstant, CompactionOperation op, + Optional fsViewOpt) { + HoodieTableFileSystemView fileSystemView = fsViewOpt.isPresent() ? fsViewOpt.get() : + new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline()); + HoodieInstant lastInstant = metaClient.getCommitsAndCompactionTimeline().lastInstant().get(); + FileSlice merged = + fileSystemView.getLatestMergedFileSlicesBeforeOrOn(op.getPartitionPath(), lastInstant.getTimestamp()) + .filter(fs -> fs.getFileId().equals(op.getFileId())).findFirst().get(); + final int maxVersion = + op.getDeltaFilePaths().stream().map(lf -> FSUtils.getFileVersionFromLog(new Path(lf))) + .reduce((x, y) -> x > y ? x : y).map(x -> x).orElse(0); + List logFilesToBeMoved = + merged.getLogFiles().filter(lf -> lf.getLogVersion() > maxVersion).collect(Collectors.toList()); + return logFilesToBeMoved.stream().map(lf -> { + Preconditions.checkArgument(lf.getLogVersion() - maxVersion > 0, + "Expect new log version to be sane"); + HoodieLogFile newLogFile = new HoodieLogFile(new Path(lf.getPath().getParent(), + FSUtils.makeLogFileName(lf.getFileId(), "." + FSUtils.getFileExtensionFromLog(lf.getPath()), + compactionInstant, lf.getLogVersion() - maxVersion))); + return Pair.of(lf, newLogFile); + }).collect(Collectors.toList()); + } + + /** + * Rename log files. This is done for un-scheduling a pending compaction operation NOTE: Can only be used safely when + * no writer (ingestion/compaction) is running. + * + * @param metaClient Hoodie Table Meta-Client + * @param oldLogFile Old Log File + * @param newLogFile New Log File + */ + protected static void renameLogFile(HoodieTableMetaClient metaClient, HoodieLogFile oldLogFile, + HoodieLogFile newLogFile) throws IOException { + FileStatus[] statuses = metaClient.getFs().listStatus(oldLogFile.getPath()); + Preconditions.checkArgument(statuses.length == 1, "Only one status must be present"); + Preconditions.checkArgument(statuses[0].isFile(), "Source File must exist"); + Preconditions.checkArgument(oldLogFile.getPath().getParent().equals(newLogFile.getPath().getParent()), + "Log file must only be moved within the parent directory"); + metaClient.getFs().rename(oldLogFile.getPath(), newLogFile.getPath()); + } + + /** + * Check if a compaction operation is valid + * + * @param metaClient Hoodie Table Meta client + * @param compactionInstant Compaction Instant + * @param operation Compaction Operation + * @param fsViewOpt File System View + */ + private ValidationOpResult validateCompactionOperation(HoodieTableMetaClient metaClient, + String compactionInstant, CompactionOperation operation, Optional fsViewOpt) + throws IOException { + HoodieTableFileSystemView fileSystemView = fsViewOpt.isPresent() ? fsViewOpt.get() : + new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline()); + java.util.Optional lastInstant = metaClient.getCommitsAndCompactionTimeline().lastInstant(); + try { + if (lastInstant.isPresent()) { + java.util.Optional fileSliceOptional = + fileSystemView.getLatestUnCompactedFileSlices(operation.getPartitionPath()) + .filter(fs -> fs.getFileId().equals(operation.getFileId())).findFirst(); + if (fileSliceOptional.isPresent()) { + FileSlice fs = fileSliceOptional.get(); + java.util.Optional df = fs.getDataFile(); + if (operation.getDataFilePath().isPresent()) { + String expPath = metaClient.getFs().getFileStatus(new Path(operation.getDataFilePath().get())).getPath() + .toString(); + Preconditions.checkArgument(df.isPresent(), "Data File must be present. File Slice was : " + + fs + ", operation :" + operation); + Preconditions.checkArgument(df.get().getPath().equals(expPath), + "Base Path in operation is specified as " + expPath + " but got path " + df.get().getPath()); + } + Set logFilesInFileSlice = fs.getLogFiles().collect(Collectors.toSet()); + Set logFilesInCompactionOp = operation.getDeltaFilePaths().stream() + .map(dp -> { + try { + FileStatus[] fileStatuses = metaClient.getFs().listStatus(new Path(dp)); + Preconditions.checkArgument(fileStatuses.length == 1, "Expect only 1 file-status"); + return new HoodieLogFile(fileStatuses[0]); + } catch (FileNotFoundException fe) { + throw new CompactionValidationException(fe.getMessage()); + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } + }).collect(Collectors.toSet()); + Set missing = + logFilesInCompactionOp.stream().filter(lf -> !logFilesInFileSlice.contains(lf)) + .collect(Collectors.toSet()); + Preconditions.checkArgument(missing.isEmpty(), + "All log files specified in compaction operation is not present. Missing :" + missing + + ", Exp :" + logFilesInCompactionOp + ", Got :" + logFilesInFileSlice); + Set diff = + logFilesInFileSlice.stream().filter(lf -> !logFilesInCompactionOp.contains(lf)) + .collect(Collectors.toSet()); + Preconditions.checkArgument(diff.stream() + .filter(lf -> !lf.getBaseCommitTime().equals(compactionInstant)).count() == 0, + "There are some log-files which are neither specified in compaction plan " + + "nor present after compaction request instant. Some of these :" + diff); + } else { + throw new CompactionValidationException("Unable to find file-slice for file-id (" + operation.getFileId() + + " Compaction operation is invalid."); + } + } else { + throw new CompactionValidationException("Unable to find any committed instant. Compaction Operation may " + + "be pointing to stale file-slices"); + } + } catch (CompactionValidationException | IllegalArgumentException e) { + return new ValidationOpResult(operation, false, Optional.of(e)); + } + return new ValidationOpResult(operation, true, Optional.absent()); + } + + /** + * Execute Renaming operation + * + * @param metaClient HoodieTable MetaClient + * @param renameActions List of rename operations + */ + private List runRenamingOps(HoodieTableMetaClient metaClient, + List> renameActions, int parallelism, boolean dryRun) { + if (renameActions.isEmpty()) { + log.info("No renaming of log-files needed. Proceeding to removing file-id from compaction-plan"); + return new ArrayList<>(); + } else { + log.info("The following compaction renaming operations needs to be performed to un-schedule"); + if (!dryRun) { + return jsc.parallelize(renameActions, parallelism).map(lfPair -> { + try { + log.info("RENAME " + lfPair.getLeft().getPath() + " => " + lfPair.getRight().getPath()); + renameLogFile(metaClient, lfPair.getLeft(), lfPair.getRight()); + return new RenameOpResult(lfPair, true, Optional.absent()); + } catch (IOException e) { + log.error("Error renaming log file", e); + log.error("\n\n\n***NOTE Compaction is in inconsistent state. Try running \"compaction repair " + + lfPair.getLeft().getBaseCommitTime() + "\" to recover from failure ***\n\n\n"); + return new RenameOpResult(lfPair, false, Optional.of(e)); + } + }).collect(); + } else { + log.info("Dry-Run Mode activated for rename operations"); + return renameActions.parallelStream() + .map(lfPair -> new RenameOpResult(lfPair, false, false, Optional.absent())) + .collect(Collectors.toList()); + } + } + } + + /** + * Generate renaming actions for unscheduling a pending compaction plan. NOTE: Can only be used safely when no writer + * (ingestion/compaction) is running. + * + * @param metaClient Hoodie Table MetaClient + * @param compactionInstant Compaction Instant to be unscheduled + * @param fsViewOpt Cached File System View + * @param skipValidation Skip Validation + * @return list of pairs of log-files (old, new) and for each pair, rename must be done to successfully unschedule + * compaction. + */ + protected List> getRenamingActionsForUnschedulingCompactionPlan( + HoodieTableMetaClient metaClient, String compactionInstant, int parallelism, + Optional fsViewOpt, boolean skipValidation) throws IOException { + HoodieTableFileSystemView fsView = fsViewOpt.isPresent() ? fsViewOpt.get() : + new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline()); + HoodieCompactionPlan plan = getCompactionPlan(metaClient, compactionInstant); + if (plan.getOperations() != null) { + log.info("Number of Compaction Operations :" + plan.getOperations().size() + + " for instant :" + compactionInstant); + List ops = plan.getOperations().stream() + .map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList()); + return jsc.parallelize(ops, parallelism).flatMap(op -> { + try { + return getRenamingActionsForUnschedulingCompactionOperation(metaClient, compactionInstant, + op, Optional.of(fsView), skipValidation).iterator(); + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } catch (CompactionValidationException ve) { + throw new HoodieException(ve); + } + }).collect(); + } + log.warn("No operations for compaction instant : " + compactionInstant); + return new ArrayList<>(); + } + + /** + * Generate renaming actions for unscheduling a compaction operation NOTE: Can only be used safely when no writer + * (ingestion/compaction) is running. + * + * @param metaClient Hoodie Table MetaClient + * @param compactionInstant Compaction Instant + * @param operation Compaction Operation + * @param fsViewOpt Cached File System View + * @param skipValidation Skip Validation + * @return list of pairs of log-files (old, new) and for each pair, rename must be done to successfully unschedule + * compaction. + */ + public List> getRenamingActionsForUnschedulingCompactionOperation( + HoodieTableMetaClient metaClient, String compactionInstant, CompactionOperation operation, + Optional fsViewOpt, boolean skipValidation) throws IOException { + List> result = new ArrayList<>(); + HoodieTableFileSystemView fileSystemView = fsViewOpt.isPresent() ? fsViewOpt.get() : + new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline()); + if (!skipValidation) { + validateCompactionOperation(metaClient, compactionInstant, operation, Optional.of(fileSystemView)); + } + HoodieInstant lastInstant = metaClient.getCommitsAndCompactionTimeline().lastInstant().get(); + FileSlice merged = + fileSystemView.getLatestMergedFileSlicesBeforeOrOn(operation.getPartitionPath(), lastInstant.getTimestamp()) + .filter(fs -> fs.getFileId().equals(operation.getFileId())).findFirst().get(); + List logFilesToRepair = + merged.getLogFiles().filter(lf -> lf.getBaseCommitTime().equals(compactionInstant)) + .collect(Collectors.toList()); + logFilesToRepair.sort(HoodieLogFile.getBaseInstantAndLogVersionComparator().reversed()); + FileSlice fileSliceForCompaction = + fileSystemView.getLatestFileSlicesBeforeOrOn(operation.getPartitionPath(), operation.getBaseInstantTime()) + .filter(fs -> fs.getFileId().equals(operation.getFileId())).findFirst().get(); + int maxUsedVersion = + fileSliceForCompaction.getLogFiles().findFirst().map(lf -> lf.getLogVersion()) + .orElse(HoodieLogFile.LOGFILE_BASE_VERSION - 1); + String logExtn = fileSliceForCompaction.getLogFiles().findFirst().map(lf -> "." + lf.getFileExtension()) + .orElse(HoodieLogFile.DELTA_EXTENSION); + String parentPath = fileSliceForCompaction.getDataFile().map(df -> new Path(df.getPath()).getParent().toString()) + .orElse(fileSliceForCompaction.getLogFiles().findFirst().map(lf -> lf.getPath().getParent().toString()).get()); + for (HoodieLogFile toRepair : logFilesToRepair) { + int version = maxUsedVersion + 1; + HoodieLogFile newLf = new HoodieLogFile(new Path(parentPath, FSUtils.makeLogFileName(operation.getFileId(), + logExtn, operation.getBaseInstantTime(), version))); + result.add(Pair.of(toRepair, newLf)); + maxUsedVersion = version; + } + return result; + } + + /** + * Generate renaming actions for unscheduling a fileId from pending compaction. NOTE: Can only be used safely when no + * writer (ingestion/compaction) is running. + * + * @param metaClient Hoodie Table MetaClient + * @param fileId FileId to remove compaction + * @param fsViewOpt Cached File System View + * @param skipValidation Skip Validation + * @return list of pairs of log-files (old, new) and for each pair, rename must be done to successfully unschedule + * compaction. + */ + public List> getRenamingActionsForUnschedulingCompactionForFileId( + HoodieTableMetaClient metaClient, String fileId, Optional fsViewOpt, + boolean skipValidation) throws IOException { + Map> allPendingCompactions = + CompactionUtils.getAllPendingCompactionOperations(metaClient); + if (allPendingCompactions.containsKey(fileId)) { + Pair opWithInstant = allPendingCompactions.get(fileId); + return getRenamingActionsForUnschedulingCompactionOperation(metaClient, opWithInstant.getKey(), + CompactionOperation.convertFromAvroRecordInstance(opWithInstant.getValue()), fsViewOpt, skipValidation); + } + throw new HoodieException("FileId " + fileId + " not in pending compaction"); + } + + /** + * Holds Operation result for Renaming + */ + public static class RenameOpResult extends OperationResult { + + public RenameOpResult() { + } + + public RenameOpResult(Pair op, boolean success, + Optional exception) { + super(new RenameInfo(op.getKey().getFileId(), op.getKey().getPath().toString(), + op.getRight().getPath().toString()), success, exception); + } + + public RenameOpResult( + Pair op, boolean executed, boolean success, + Optional exception) { + super(new RenameInfo(op.getKey().getFileId(), op.getKey().getPath().toString(), + op.getRight().getPath().toString()), executed, success, exception); + } + } + + /** + * Holds Operation result for Renaming + */ + public static class ValidationOpResult extends OperationResult { + + public ValidationOpResult() { + } + + public ValidationOpResult( + CompactionOperation operation, boolean success, Optional exception) { + super(operation, success, exception); + } + } + + public static class RenameInfo implements Serializable { + + public String fileId; + public String srcPath; + public String destPath; + + public RenameInfo() { + } + + public RenameInfo(String fileId, String srcPath, String destPath) { + this.fileId = fileId; + this.srcPath = srcPath; + this.destPath = destPath; + } + } + + public static class CompactionValidationException extends RuntimeException { + + public CompactionValidationException(String msg) { + super(msg); + } + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/OperationResult.java b/hoodie-client/src/main/java/com/uber/hoodie/func/OperationResult.java new file mode 100644 index 000000000..38f368bb4 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/func/OperationResult.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.func; + +import com.google.common.base.Optional; +import java.io.Serializable; + +/** + * Holds Operation result. Used as a result container for Compaction Admin Client (running as part of Spark-launcher + * process) to communicate results back to Hoodie CLI process. + */ +public class OperationResult implements Serializable { + + private T operation; + private boolean executed; + private boolean success; + private Optional exception; + + public OperationResult() { + } + + public OperationResult(T operation, boolean success, Optional exception) { + this.operation = operation; + this.success = success; + this.exception = exception; + this.executed = true; + } + + public OperationResult(T operation, boolean executed, boolean success, Optional exception) { + this.operation = operation; + this.success = success; + this.exception = exception; + this.executed = executed; + } + + public T getOperation() { + return operation; + } + + public boolean isSuccess() { + return success; + } + + public boolean isExecuted() { + return executed; + } + + public Optional getException() { + return exception; + } + + @Override + public String toString() { + return "OperationResult{" + + "operation=" + operation + + ", executed=" + executed + + ", success=" + success + + ", exception=" + exception + + '}'; + } +} \ No newline at end of file diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestCompactionAdminClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestCompactionAdminClient.java new file mode 100644 index 000000000..a385f0947 --- /dev/null +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestCompactionAdminClient.java @@ -0,0 +1,363 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie; + +import static com.uber.hoodie.common.model.HoodieTestUtils.getDefaultHadoopConf; + +import com.google.common.base.Optional; +import com.uber.hoodie.CompactionAdminClient.ValidationOpResult; +import com.uber.hoodie.common.model.CompactionOperation; +import com.uber.hoodie.common.model.HoodieLogFile; +import com.uber.hoodie.common.model.HoodieTableType; +import com.uber.hoodie.common.model.HoodieTestUtils; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; +import com.uber.hoodie.common.util.CompactionTestUtils; +import com.uber.hoodie.common.util.CompactionUtils; +import com.uber.hoodie.common.util.collection.Pair; +import com.uber.hoodie.exception.HoodieException; +import com.uber.hoodie.exception.HoodieIOException; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestCompactionAdminClient extends TestHoodieClientBase { + + private HoodieTableMetaClient metaClient; + private CompactionAdminClient client; + + @Before + public void init() throws IOException { + super.init(); + metaClient = HoodieTestUtils.initTableType(getDefaultHadoopConf(), basePath, HoodieTableType.MERGE_ON_READ); + client = new CompactionAdminClient(jsc, basePath); + } + + @Override + public void tearDown() throws IOException { + super.tearDown(); + } + + @Test + public void testUnscheduleCompactionPlan() throws Exception { + int numEntriesPerInstant = 10; + CompactionTestUtils + .setupAndValidateCompactionOperations(metaClient, false, numEntriesPerInstant, numEntriesPerInstant, + numEntriesPerInstant, numEntriesPerInstant); + // THere are delta-commits after compaction instant + validateUnSchedulePlan(client, + "000", "001", numEntriesPerInstant, 2 * numEntriesPerInstant); + // THere are delta-commits after compaction instant + validateUnSchedulePlan(client, + "002", "003", numEntriesPerInstant, 2 * numEntriesPerInstant); + // THere are no delta-commits after compaction instant + validateUnSchedulePlan(client, + "004", "005", numEntriesPerInstant, 0); + // THere are no delta-commits after compaction instant + validateUnSchedulePlan(client, + "006", "007", numEntriesPerInstant, 0); + } + + @Test + public void testUnscheduleCompactionFileId() throws Exception { + int numEntriesPerInstant = 10; + CompactionTestUtils + .setupAndValidateCompactionOperations(metaClient, false, numEntriesPerInstant, numEntriesPerInstant, + numEntriesPerInstant, numEntriesPerInstant); + Map instantsWithOp = + Arrays.asList("001", "003", "005", "007").stream().map(instant -> { + try { + return Pair.of(instant, CompactionUtils.getCompactionPlan(metaClient, instant)); + } catch (IOException ioe) { + throw new HoodieException(ioe); + } + }).map(instantWithPlan -> instantWithPlan.getRight().getOperations().stream().map(op -> Pair.of( + instantWithPlan.getLeft(), CompactionOperation.convertFromAvroRecordInstance(op))).findFirst().get()) + .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); + // THere are delta-commits after compaction instant + validateUnScheduleFileId(client, + "000", "001", instantsWithOp.get("001"), 2); + // THere are delta-commits after compaction instant + validateUnScheduleFileId(client, + "002", "003", instantsWithOp.get("003"), 2); + // THere are no delta-commits after compaction instant + validateUnScheduleFileId(client, + "004", "005", instantsWithOp.get("005"), 0); + // THere are no delta-commits after compaction instant + validateUnScheduleFileId(client, + "006", "007", instantsWithOp.get("007"), 0); + } + + @Test + public void testRepairCompactionPlan() throws Exception { + int numEntriesPerInstant = 10; + CompactionTestUtils + .setupAndValidateCompactionOperations(metaClient,false, numEntriesPerInstant, numEntriesPerInstant, + numEntriesPerInstant, numEntriesPerInstant); + // THere are delta-commits after compaction instant + validateRepair("000", "001", numEntriesPerInstant, 2 * numEntriesPerInstant); + // THere are delta-commits after compaction instant + validateRepair("002", "003", numEntriesPerInstant, 2 * numEntriesPerInstant); + // THere are no delta-commits after compaction instant + validateRepair("004", "005", numEntriesPerInstant, 0); + // THere are no delta-commits after compaction instant + validateRepair("006", "007", numEntriesPerInstant, 0); + } + + private void validateRepair(String ingestionInstant, String compactionInstant, int numEntriesPerInstant, + int expNumRepairs) throws Exception { + List> renameFiles = + validateUnSchedulePlan(client, ingestionInstant, compactionInstant, numEntriesPerInstant, expNumRepairs, true); + metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true); + List result = client.validateCompactionPlan(metaClient, compactionInstant, 1); + if (expNumRepairs > 0) { + Assert.assertTrue("Expect some failures in validation", result.stream().filter(r -> !r.isSuccess()).count() > 0); + } + // Now repair + List> undoFiles = result.stream().flatMap(r -> + client.getRenamingActionsToAlignWithCompactionOperation(metaClient, + compactionInstant, r.getOperation(), Optional.absent()).stream()) + .map(rn -> { + try { + client.renameLogFile(metaClient, rn.getKey(), rn.getValue()); + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + return rn; + }).collect(Collectors.toList()); + Map renameFilesFromUndo = + undoFiles.stream().collect(Collectors.toMap(p -> p.getRight().getPath().toString(), + x -> x.getLeft().getPath().toString())); + Map expRenameFiles = + renameFiles.stream().collect(Collectors.toMap(p -> p.getLeft().getPath().toString(), + x -> x.getRight().getPath().toString())); + if (expNumRepairs > 0) { + Assert.assertFalse("Rename Files must be non-empty", renameFiles.isEmpty()); + } else { + Assert.assertTrue("Rename Files must be empty", renameFiles.isEmpty()); + } + expRenameFiles.entrySet().stream().forEach(r -> { + System.out.println("Key :" + r.getKey() + " renamed to " + r.getValue() + " rolled back to " + + renameFilesFromUndo.get(r.getKey())); + }); + + Assert.assertEquals("Undo must completely rollback renames", expRenameFiles, renameFilesFromUndo); + // Now expect validation to succeed + result = client.validateCompactionPlan(metaClient, compactionInstant, 1); + Assert.assertTrue("Expect no failures in validation", result.stream().filter(r -> !r.isSuccess()).count() == 0); + Assert.assertEquals("Expected Num Repairs", expNumRepairs, undoFiles.size()); + } + + /** + * Enssure compaction plan is valid + * @param compactionInstant Compaction Instant + * @throws Exception + */ + private void ensureValidCompactionPlan(String compactionInstant) throws Exception { + metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true); + // Ensure compaction-plan is good to begin with + List validationResults = client.validateCompactionPlan(metaClient, + compactionInstant, 1); + Assert.assertFalse("Some validations failed", + validationResults.stream().filter(v -> !v.isSuccess()).findAny().isPresent()); + } + + private void validateRenameFiles(List> renameFiles, + String ingestionInstant, String compactionInstant, HoodieTableFileSystemView fsView) { + // Ensure new names of log-files are on expected lines + Set uniqNewLogFiles = new HashSet<>(); + Set uniqOldLogFiles = new HashSet<>(); + + renameFiles.stream().forEach(lfPair -> { + Assert.assertFalse("Old Log File Names do not collide", uniqOldLogFiles.contains(lfPair.getKey())); + Assert.assertFalse("New Log File Names do not collide", uniqNewLogFiles.contains(lfPair.getValue())); + uniqOldLogFiles.add(lfPair.getKey()); + uniqNewLogFiles.add(lfPair.getValue()); + }); + + renameFiles.stream().forEach(lfPair -> { + HoodieLogFile oldLogFile = lfPair.getLeft(); + HoodieLogFile newLogFile = lfPair.getValue(); + Assert.assertEquals("Base Commit time is expected", ingestionInstant, newLogFile.getBaseCommitTime()); + Assert.assertEquals("Base Commit time is expected", compactionInstant, oldLogFile.getBaseCommitTime()); + Assert.assertEquals("File Id is expected", oldLogFile.getFileId(), newLogFile.getFileId()); + HoodieLogFile lastLogFileBeforeCompaction = + fsView.getLatestMergedFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], ingestionInstant) + .filter(fs -> fs.getFileId().equals(oldLogFile.getFileId())) + .map(fs -> fs.getLogFiles().findFirst().get()).findFirst().get(); + Assert.assertEquals("Log Version expected", + lastLogFileBeforeCompaction.getLogVersion() + oldLogFile.getLogVersion(), + newLogFile.getLogVersion()); + Assert.assertTrue("Log version does not collide", + newLogFile.getLogVersion() > lastLogFileBeforeCompaction.getLogVersion()); + }); + } + + /** + * Validate Unschedule operations + */ + private List> validateUnSchedulePlan(CompactionAdminClient client, + String ingestionInstant, String compactionInstant, int numEntriesPerInstant, int expNumRenames) + throws Exception { + return validateUnSchedulePlan(client, ingestionInstant, compactionInstant, numEntriesPerInstant, + expNumRenames, false); + } + + /** + * Validate Unschedule operations + */ + private List> validateUnSchedulePlan(CompactionAdminClient client, + String ingestionInstant, String compactionInstant, int numEntriesPerInstant, int expNumRenames, + boolean skipUnSchedule) throws Exception { + + ensureValidCompactionPlan(compactionInstant); + + // Check suggested rename operations + List> renameFiles = + client.getRenamingActionsForUnschedulingCompactionPlan(metaClient, compactionInstant, 1, + Optional.absent(), false); + metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true); + + // Log files belonging to file-slices created because of compaction request must be renamed + + Set gotLogFilesToBeRenamed = renameFiles.stream().map(p -> p.getLeft()).collect(Collectors.toSet()); + final HoodieTableFileSystemView fsView = + new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline()); + Set expLogFilesToBeRenamed = fsView.getLatestFileSlices(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0]) + .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)) + .flatMap(fs -> fs.getLogFiles()) + .collect(Collectors.toSet()); + Assert.assertEquals("Log files belonging to file-slices created because of compaction request must be renamed", + expLogFilesToBeRenamed, gotLogFilesToBeRenamed); + + if (skipUnSchedule) { + // Do the renaming only but do not touch the compaction plan - Needed for repair tests + renameFiles.stream().forEach(lfPair -> { + try { + client.renameLogFile(metaClient, lfPair.getLeft(), lfPair.getRight()); + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + }); + } else { + validateRenameFiles(renameFiles, ingestionInstant, compactionInstant, fsView); + } + + Map fileIdToCountsBeforeRenaming = + fsView.getLatestMergedFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant) + .filter(fs -> fs.getBaseInstantTime().equals(ingestionInstant)) + .map(fs -> Pair.of(fs.getFileId(), fs.getLogFiles().count())) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + + // Call the main unschedule API + + client.unscheduleCompactionPlan(compactionInstant, false, 1, false); + + metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true); + final HoodieTableFileSystemView newFsView = + new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline()); + // Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files + newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant) + .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)).forEach(fs -> { + Assert.assertFalse("No Data file must be present", fs.getDataFile().isPresent()); + Assert.assertTrue("No Log Files", fs.getLogFiles().count() == 0); + }); + + // Ensure same number of log-files before and after renaming per fileId + Map fileIdToCountsAfterRenaming = + newFsView.getAllFileGroups(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0]).flatMap(fg -> fg.getAllFileSlices()) + .filter(fs -> fs.getBaseInstantTime().equals(ingestionInstant)) + .map(fs -> Pair.of(fs.getFileId(), fs.getLogFiles().count())) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + + Assert.assertEquals("Each File Id has same number of log-files", + fileIdToCountsBeforeRenaming, fileIdToCountsAfterRenaming); + Assert.assertEquals("Not Empty", numEntriesPerInstant, fileIdToCountsAfterRenaming.size()); + Assert.assertEquals("Expected number of renames", expNumRenames, renameFiles.size()); + return renameFiles; + } + + /** + * Validate Unschedule operations + */ + private void validateUnScheduleFileId(CompactionAdminClient client, String ingestionInstant, + String compactionInstant, CompactionOperation op, int expNumRenames) throws Exception { + + ensureValidCompactionPlan(compactionInstant); + + // Check suggested rename operations + List> renameFiles = + client.getRenamingActionsForUnschedulingCompactionOperation(metaClient, compactionInstant, op, + Optional.absent(), false); + metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true); + + // Log files belonging to file-slices created because of compaction request must be renamed + + Set gotLogFilesToBeRenamed = renameFiles.stream().map(p -> p.getLeft()).collect(Collectors.toSet()); + final HoodieTableFileSystemView fsView = + new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline()); + Set expLogFilesToBeRenamed = fsView.getLatestFileSlices(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0]) + .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)) + .filter(fs -> fs.getFileId().equals(op.getFileId())) + .flatMap(fs -> fs.getLogFiles()) + .collect(Collectors.toSet()); + Assert.assertEquals("Log files belonging to file-slices created because of compaction request must be renamed", + expLogFilesToBeRenamed, gotLogFilesToBeRenamed); + validateRenameFiles(renameFiles, ingestionInstant, compactionInstant, fsView); + + Map fileIdToCountsBeforeRenaming = + fsView.getLatestMergedFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant) + .filter(fs -> fs.getBaseInstantTime().equals(ingestionInstant)) + .filter(fs -> fs.getFileId().equals(op.getFileId())) + .map(fs -> Pair.of(fs.getFileId(), fs.getLogFiles().count())) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + + // Call the main unschedule API + client.unscheduleCompactionFileId(op.getFileId(), false, false); + + metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true); + final HoodieTableFileSystemView newFsView = + new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline()); + // Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files + newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant) + .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)) + .filter(fs -> fs.getFileId().equals(op.getFileId())).forEach(fs -> { + Assert.assertFalse("No Data file must be present", fs.getDataFile().isPresent()); + Assert.assertTrue("No Log Files", fs.getLogFiles().count() == 0); + }); + + // Ensure same number of log-files before and after renaming per fileId + Map fileIdToCountsAfterRenaming = + newFsView.getAllFileGroups(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0]).flatMap(fg -> fg.getAllFileSlices()) + .filter(fs -> fs.getBaseInstantTime().equals(ingestionInstant)) + .filter(fs -> fs.getFileId().equals(op.getFileId())) + .map(fs -> Pair.of(fs.getFileId(), fs.getLogFiles().count())) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + + Assert.assertEquals("Each File Id has same number of log-files", + fileIdToCountsBeforeRenaming, fileIdToCountsAfterRenaming); + Assert.assertEquals("Not Empty", 1, fileIdToCountsAfterRenaming.size()); + Assert.assertEquals("Expected number of renames", expNumRenames, renameFiles.size()); + } +} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java index 6ce2aa8d3..dec7244db 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java @@ -33,12 +33,16 @@ import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; /** * Helper class to generate compaction plan from FileGroup/FileSlice abstraction */ public class CompactionUtils { + private static final Logger LOG = LogManager.getLogger(CompactionUtils.class); + /** * Generate compaction operation from file-slice * @@ -47,7 +51,7 @@ public class CompactionUtils { * @param metricsCaptureFunction Metrics Capture function * @return Compaction Operation */ - public static HoodieCompactionOperation buildFromFileSlice(String partitionPath, FileSlice fileSlice, + public static HoodieCompactionOperation buildFromFileSlice(String partitionPath, FileSlice fileSlice, Optional, Map>> metricsCaptureFunction) { HoodieCompactionOperation.Builder builder = HoodieCompactionOperation.newBuilder(); builder.setPartitionPath(partitionPath); @@ -114,16 +118,21 @@ public class CompactionUtils { metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants().collect(Collectors.toList()); return pendingCompactionInstants.stream().map(instant -> { try { - HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan( - metaClient.getActiveTimeline().getInstantAuxiliaryDetails( - HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get()); - return Pair.of(instant, compactionPlan); + return Pair.of(instant, getCompactionPlan(metaClient, instant.getTimestamp())); } catch (IOException e) { throw new HoodieException(e); } }).collect(Collectors.toList()); } + public static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, + String compactionInstant) throws IOException { + HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan( + metaClient.getActiveTimeline().getInstantAuxiliaryDetails( + HoodieTimeline.getCompactionRequestedInstant(compactionInstant)).get()); + return compactionPlan; + } + /** * Get all file-ids with pending Compaction operations and their target compaction instant time * diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/CompactionTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/CompactionTestUtils.java new file mode 100644 index 000000000..5fae7a3b3 --- /dev/null +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/CompactionTestUtils.java @@ -0,0 +1,207 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.util; + +import static com.uber.hoodie.common.model.HoodieTestUtils.DEFAULT_PARTITION_PATHS; +import static com.uber.hoodie.common.table.HoodieTimeline.COMPACTION_ACTION; +import static com.uber.hoodie.common.table.HoodieTimeline.DELTA_COMMIT_ACTION; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.uber.hoodie.avro.model.HoodieCompactionOperation; +import com.uber.hoodie.avro.model.HoodieCompactionPlan; +import com.uber.hoodie.common.model.FileSlice; +import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.model.HoodieLogFile; +import com.uber.hoodie.common.model.HoodieTestUtils; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.table.timeline.HoodieInstant.State; +import com.uber.hoodie.common.util.collection.Pair; +import com.uber.hoodie.exception.HoodieIOException; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import org.apache.hadoop.fs.Path; +import org.junit.Assert; + +public class CompactionTestUtils { + + public static Map> setupAndValidateCompactionOperations( + HoodieTableMetaClient metaClient, boolean inflight, + int numEntriesInPlan1, int numEntriesInPlan2, + int numEntriesInPlan3, int numEntriesInPlan4) throws IOException { + HoodieCompactionPlan plan1 = createCompactionPlan(metaClient, "000", "001", numEntriesInPlan1, true, true); + HoodieCompactionPlan plan2 = createCompactionPlan(metaClient, "002", "003", numEntriesInPlan2, false, true); + HoodieCompactionPlan plan3 = createCompactionPlan(metaClient, "004", "005", numEntriesInPlan3, true, false); + HoodieCompactionPlan plan4 = createCompactionPlan(metaClient, "006", "007", numEntriesInPlan4, false, false); + + if (inflight) { + scheduleInflightCompaction(metaClient, "001", plan1); + scheduleInflightCompaction(metaClient, "003", plan2); + scheduleInflightCompaction(metaClient, "005", plan3); + scheduleInflightCompaction(metaClient, "007", plan4); + } else { + scheduleCompaction(metaClient, "001", plan1); + scheduleCompaction(metaClient, "003", plan2); + scheduleCompaction(metaClient, "005", plan3); + scheduleCompaction(metaClient, "007", plan4); + } + + createDeltaCommit(metaClient, "000"); + createDeltaCommit(metaClient, "002"); + createDeltaCommit(metaClient, "004"); + createDeltaCommit(metaClient, "006"); + + Map baseInstantsToCompaction = + new ImmutableMap.Builder().put("000", "001").put("002", "003") + .put("004", "005").put("006", "007").build(); + List expectedNumEntries = + Arrays.asList(numEntriesInPlan1, numEntriesInPlan2, numEntriesInPlan3, numEntriesInPlan4); + List plans = new ImmutableList.Builder() + .add(plan1, plan2, plan3, plan4).build(); + IntStream.range(0, 4).boxed().forEach(idx -> { + if (expectedNumEntries.get(idx) > 0) { + Assert.assertEquals("check if plan " + idx + " has exp entries", + expectedNumEntries.get(idx).longValue(), plans.get(idx).getOperations().size()); + } else { + Assert.assertNull("Plan " + idx + " has null ops", plans.get(idx).getOperations()); + } + }); + + metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath(), true); + Map> pendingCompactionMap = + CompactionUtils.getAllPendingCompactionOperations(metaClient); + + Map> expPendingCompactionMap = + generateExpectedCompactionOperations(Arrays.asList(plan1, plan2, plan3, plan4), baseInstantsToCompaction); + + // Ensure Compaction operations are fine. + Assert.assertEquals(expPendingCompactionMap, pendingCompactionMap); + return expPendingCompactionMap; + } + + public static Map> generateExpectedCompactionOperations( + List plans, Map baseInstantsToCompaction) { + return plans.stream() + .flatMap(plan -> { + if (plan.getOperations() != null) { + return plan.getOperations().stream().map(op -> Pair.of(op.getFileId(), + Pair.of(baseInstantsToCompaction.get(op.getBaseInstantTime()), op))); + } + return Stream.empty(); + }).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + } + + public static void scheduleCompaction(HoodieTableMetaClient metaClient, + String instantTime, HoodieCompactionPlan compactionPlan) throws IOException { + metaClient.getActiveTimeline().saveToCompactionRequested( + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, instantTime), + AvroUtils.serializeCompactionPlan(compactionPlan)); + } + + public static void createDeltaCommit(HoodieTableMetaClient metaClient, String instantTime) throws IOException { + metaClient.getActiveTimeline().saveAsComplete( + new HoodieInstant(State.INFLIGHT, DELTA_COMMIT_ACTION, instantTime), Optional.empty()); + } + + public static void scheduleInflightCompaction(HoodieTableMetaClient metaClient, String instantTime, + HoodieCompactionPlan compactionPlan) throws IOException { + scheduleCompaction(metaClient, instantTime, compactionPlan); + metaClient.getActiveTimeline().transitionCompactionRequestedToInflight( + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, instantTime)); + } + + public static HoodieCompactionPlan createCompactionPlan(HoodieTableMetaClient metaClient, String instantId, + String compactionInstantId, int numFileIds, boolean createDataFile, + boolean deltaCommitsAfterCompactionRequests) { + List ops = IntStream.range(0, numFileIds).boxed().map(idx -> { + try { + String fileId = UUID.randomUUID().toString(); + if (createDataFile) { + HoodieTestUtils.createDataFile(metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0], instantId, fileId); + } + HoodieTestUtils.createNewLogFile(metaClient.getFs(), metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0], + instantId, fileId, Optional.of(1)); + HoodieTestUtils.createNewLogFile(metaClient.getFs(), metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0], + instantId, fileId, Optional.of(2)); + FileSlice slice = new FileSlice(instantId, fileId); + if (createDataFile) { + slice.setDataFile(new TestHoodieDataFile(metaClient.getBasePath() + "/" + DEFAULT_PARTITION_PATHS[0] + + "/" + FSUtils.makeDataFileName(instantId, 1, fileId))); + } + String logFilePath1 = HoodieTestUtils + .getLogFilePath(metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0], instantId, fileId, + Optional.of(1)); + String logFilePath2 = HoodieTestUtils + .getLogFilePath(metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0], instantId, fileId, + Optional.of(2)); + slice.addLogFile(new HoodieLogFile(new Path(logFilePath1))); + slice.addLogFile(new HoodieLogFile(new Path(logFilePath2))); + HoodieCompactionOperation op = + CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], slice, Optional.empty()); + if (deltaCommitsAfterCompactionRequests) { + HoodieTestUtils.createNewLogFile(metaClient.getFs(), metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0], + compactionInstantId, fileId, Optional.of(1)); + HoodieTestUtils.createNewLogFile(metaClient.getFs(), metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0], + compactionInstantId, fileId, Optional.of(2)); + } + return op; + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + }).collect(Collectors.toList()); + return new HoodieCompactionPlan(ops.isEmpty() ? null : ops, new HashMap<>()); + } + + public static class TestHoodieDataFile extends HoodieDataFile { + + private final String path; + + public TestHoodieDataFile(String path) { + super(null); + this.path = path; + } + + @Override + public String getPath() { + return path; + } + + @Override + public String getFileId() { + return UUID.randomUUID().toString(); + } + + @Override + public String getCommitTime() { + return "100"; + } + + @Override + public long getFileSize() { + return 0; + } + } +} diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java index 7557b5087..c65216260 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java @@ -17,32 +17,29 @@ package com.uber.hoodie.common.util; import static com.uber.hoodie.common.model.HoodieTestUtils.DEFAULT_PARTITION_PATHS; -import static com.uber.hoodie.common.table.HoodieTimeline.COMPACTION_ACTION; +import static com.uber.hoodie.common.model.HoodieTestUtils.getDefaultHadoopConf; +import static com.uber.hoodie.common.util.CompactionTestUtils.createCompactionPlan; +import static com.uber.hoodie.common.util.CompactionTestUtils.scheduleCompaction; +import static com.uber.hoodie.common.util.CompactionTestUtils.setupAndValidateCompactionOperations; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.uber.hoodie.avro.model.HoodieCompactionOperation; import com.uber.hoodie.avro.model.HoodieCompactionPlan; import com.uber.hoodie.common.model.FileSlice; -import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieLogFile; +import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.table.HoodieTableMetaClient; -import com.uber.hoodie.common.table.timeline.HoodieInstant; -import com.uber.hoodie.common.table.timeline.HoodieInstant.State; +import com.uber.hoodie.common.util.CompactionTestUtils.TestHoodieDataFile; import com.uber.hoodie.common.util.collection.Pair; -import com.uber.hoodie.exception.HoodieIOException; import java.io.IOException; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.UUID; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; -import java.util.stream.Stream; import org.apache.hadoop.fs.Path; import org.junit.Assert; import org.junit.Before; @@ -64,7 +61,8 @@ public class TestCompactionUtils { @Before public void init() throws IOException { - metaClient = HoodieTestUtils.init(tmpFolder.getRoot().getAbsolutePath()); + metaClient = HoodieTestUtils.initTableType(getDefaultHadoopConf(), + tmpFolder.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ); basePath = metaClient.getBasePath(); } @@ -156,12 +154,12 @@ public class TestCompactionUtils { @Test(expected = IllegalStateException.class) public void testGetAllPendingCompactionOperationsWithDupFileId() throws IOException { // Case where there is duplicate fileIds in compaction requests - HoodieCompactionPlan plan1 = createCompactionPlan("000", 10); - HoodieCompactionPlan plan2 = createCompactionPlan("001", 10); - scheduleCompaction("000", plan1); - scheduleCompaction("001", plan2); + HoodieCompactionPlan plan1 = createCompactionPlan(metaClient, "000", "001", 10, true, true); + HoodieCompactionPlan plan2 = createCompactionPlan(metaClient, "002", "003", 0, false, false); + scheduleCompaction(metaClient, "001", plan1); + scheduleCompaction(metaClient, "003", plan2); // schedule same plan again so that there will be duplicates - scheduleCompaction("003", plan1); + scheduleCompaction(metaClient, "005", plan1); metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true); Map> res = CompactionUtils.getAllPendingCompactionOperations(metaClient); @@ -170,114 +168,19 @@ public class TestCompactionUtils { @Test public void testGetAllPendingCompactionOperations() throws IOException { // Case where there are 4 compaction requests where 1 is empty. - testGetAllPendingCompactionOperations(false, 10, 10, 10, 0); + setupAndValidateCompactionOperations(metaClient, false, 10, 10, 10, 0); } @Test public void testGetAllPendingInflightCompactionOperations() throws IOException { // Case where there are 4 compaction requests where 1 is empty. All of them are marked inflight - testGetAllPendingCompactionOperations(true, 10, 10, 10, 0); + setupAndValidateCompactionOperations(metaClient, true, 10, 10, 10, 0); } @Test public void testGetAllPendingCompactionOperationsForEmptyCompactions() throws IOException { // Case where there are 4 compaction requests and all are empty. - testGetAllPendingCompactionOperations(false, 0, 0, 0, 0); - } - - private void testGetAllPendingCompactionOperations(boolean inflight, int numEntriesInPlan1, int numEntriesInPlan2, - int numEntriesInPlan3, int numEntriesInPlan4) throws IOException { - HoodieCompactionPlan plan1 = createCompactionPlan("000", numEntriesInPlan1); - HoodieCompactionPlan plan2 = createCompactionPlan("001", numEntriesInPlan2); - HoodieCompactionPlan plan3 = createCompactionPlan("002", numEntriesInPlan3); - HoodieCompactionPlan plan4 = createCompactionPlan("003", numEntriesInPlan4); - - if (inflight) { - scheduleInflightCompaction("000", plan1); - scheduleInflightCompaction("001", plan2); - scheduleInflightCompaction("002", plan3); - scheduleInflightCompaction("003", plan4); - } else { - scheduleCompaction("000", plan1); - scheduleCompaction("001", plan2); - scheduleCompaction("002", plan3); - scheduleCompaction("003", plan4); - } - - List expectedNumEntries = - Arrays.asList(numEntriesInPlan1, numEntriesInPlan2, numEntriesInPlan3, numEntriesInPlan4); - List plans = new ImmutableList.Builder() - .add(plan1, plan2, plan3, plan4).build(); - IntStream.range(0, 4).boxed().forEach(idx -> { - if (expectedNumEntries.get(idx) > 0) { - Assert.assertEquals("check if plan " + idx + " has exp entries", - expectedNumEntries.get(idx).longValue(), plans.get(idx).getOperations().size()); - } else { - Assert.assertNull("Plan " + idx + " has null ops", plans.get(idx).getOperations()); - } - }); - - metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true); - Map> pendingCompactionMap = - CompactionUtils.getAllPendingCompactionOperations(metaClient); - - Map> expPendingCompactionMap = - generateExpectedCompactionOperations(Arrays.asList(plan1, plan2, plan3, plan4)); - - // Ensure all the - Assert.assertEquals(expPendingCompactionMap, pendingCompactionMap); - } - - private Map> generateExpectedCompactionOperations( - List plans) { - return plans.stream() - .flatMap(plan -> { - if (plan.getOperations() != null) { - return plan.getOperations().stream().map(op -> Pair.of(op.getFileId(), - Pair.of(op.getBaseInstantTime(), op))); - } - return Stream.empty(); - }).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - } - - private void scheduleCompaction(String instantTime, HoodieCompactionPlan compactionPlan) throws IOException { - metaClient.getActiveTimeline().saveToCompactionRequested( - new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, instantTime), - AvroUtils.serializeCompactionPlan(compactionPlan)); - } - - private void scheduleInflightCompaction(String instantTime, HoodieCompactionPlan compactionPlan) throws IOException { - metaClient.getActiveTimeline().saveToCompactionRequested( - new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, instantTime), - AvroUtils.serializeCompactionPlan(compactionPlan)); - metaClient.getActiveTimeline().transitionCompactionRequestedToInflight( - new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, instantTime)); - } - - private HoodieCompactionPlan createCompactionPlan(String instantId, int numFileIds) { - List ops = IntStream.range(0, numFileIds).boxed().map(idx -> { - try { - String fileId = - HoodieTestUtils.createNewDataFile(basePath, DEFAULT_PARTITION_PATHS[0], instantId); - HoodieTestUtils.createNewLogFile(metaClient.getFs(), basePath, DEFAULT_PARTITION_PATHS[0], - instantId, fileId, Optional.of(1)); - HoodieTestUtils.createNewLogFile(metaClient.getFs(), basePath, DEFAULT_PARTITION_PATHS[0], - instantId, fileId, Optional.of(2)); - FileSlice slice = new FileSlice(instantId, fileId); - slice.setDataFile(new TestHoodieDataFile(HoodieTestUtils.createDataFile(basePath, DEFAULT_PARTITION_PATHS[0], - instantId, fileId))); - String logFilePath1 = HoodieTestUtils.getLogFilePath(basePath, DEFAULT_PARTITION_PATHS[0], instantId, fileId, - Optional.of(1)); - String logFilePath2 = HoodieTestUtils.getLogFilePath(basePath, DEFAULT_PARTITION_PATHS[0], instantId, fileId, - Optional.of(2)); - slice.addLogFile(new HoodieLogFile(new Path(logFilePath1))); - slice.addLogFile(new HoodieLogFile(new Path(logFilePath2))); - return CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], slice, Optional.empty()); - } catch (IOException e) { - throw new HoodieIOException(e.getMessage(), e); - } - }).collect(Collectors.toList()); - return new HoodieCompactionPlan(ops.isEmpty() ? null : ops, new HashMap<>()); + setupAndValidateCompactionOperations(metaClient, false, 0, 0, 0, 0); } /** @@ -315,35 +218,4 @@ public class TestCompactionUtils { }); Assert.assertEquals("Metrics set", metrics, op.getMetrics()); } - - - private static class TestHoodieDataFile extends HoodieDataFile { - - private final String path; - - public TestHoodieDataFile(String path) { - super(null); - this.path = path; - } - - @Override - public String getPath() { - return path; - } - - @Override - public String getFileId() { - return UUID.randomUUID().toString(); - } - - @Override - public String getCommitTime() { - return "100"; - } - - @Override - public long getFileSize() { - return 0; - } - } } diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactionAdminTool.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactionAdminTool.java new file mode 100644 index 000000000..0a225e91a --- /dev/null +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactionAdminTool.java @@ -0,0 +1,154 @@ +package com.uber.hoodie.utilities; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.uber.hoodie.CompactionAdminClient; +import com.uber.hoodie.CompactionAdminClient.RenameOpResult; +import com.uber.hoodie.CompactionAdminClient.ValidationOpResult; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.util.FSUtils; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.List; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.spark.api.java.JavaSparkContext; + +public class HoodieCompactionAdminTool { + + private final Config cfg; + + public HoodieCompactionAdminTool(Config cfg) { + this.cfg = cfg; + } + + /** + * + * @param args + * @throws Exception + */ + public static void main(String[] args) throws Exception { + final Config cfg = new Config(); + JCommander cmd = new JCommander(cfg, args); + if (cfg.help || args.length == 0) { + cmd.usage(); + System.exit(1); + } + HoodieCompactionAdminTool admin = new HoodieCompactionAdminTool(cfg); + admin.run(UtilHelpers.buildSparkContext("admin-compactor", cfg.sparkMaster, cfg.sparkMemory)); + } + + /** + * Executes one of compaction admin operations + */ + public void run(JavaSparkContext jsc) throws Exception { + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.basePath); + CompactionAdminClient admin = new CompactionAdminClient(jsc, cfg.basePath); + final FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration()); + if (cfg.outputPath != null && fs.exists(new Path(cfg.outputPath))) { + throw new IllegalStateException("Output File Path already exists"); + } + switch (cfg.operation) { + case VALIDATE: + List res = + admin.validateCompactionPlan(metaClient, cfg.compactionInstantTime, cfg.parallelism); + if (cfg.printOutput) { + printOperationResult("Result of Validation Operation :", res); + } + serializeOperationResult(fs, res); + break; + case UNSCHEDULE_FILE: + List r = + admin.unscheduleCompactionFileId(cfg.fileId, cfg.skipValidation, cfg.dryRun); + if (cfg.printOutput) { + System.out.println(r); + } + serializeOperationResult(fs, r); + break; + case UNSCHEDULE_PLAN: + List r2 = + admin.unscheduleCompactionPlan(cfg.compactionInstantTime, cfg.skipValidation, cfg.parallelism, cfg.dryRun); + if (cfg.printOutput) { + printOperationResult("Result of Unscheduling Compaction Plan :", r2); + } + serializeOperationResult(fs, r2); + break; + case REPAIR: + List r3 = + admin.repairCompaction(cfg.compactionInstantTime, cfg.parallelism, cfg.dryRun); + if (cfg.printOutput) { + printOperationResult("Result of Repair Operation :", r3); + } + serializeOperationResult(fs, r3); + break; + default: + throw new IllegalStateException("Not yet implemented !!"); + } + } + + private void serializeOperationResult(FileSystem fs, T result) throws Exception { + if ((cfg.outputPath != null) && (result != null)) { + Path outputPath = new Path(cfg.outputPath); + FSDataOutputStream fsout = fs.create(outputPath, true); + ObjectOutputStream out = new ObjectOutputStream(fsout); + out.writeObject(result); + out.close(); + fsout.close(); + } + } + + /** + * Print Operation Result + * + * @param initialLine Initial Line + * @param result Result + */ + private void printOperationResult(String initialLine, List result) { + System.out.println(initialLine); + for (T r : result) { + System.out.print(r); + } + } + + /** + * Operation Types + */ + public enum Operation { + VALIDATE, + UNSCHEDULE_PLAN, + UNSCHEDULE_FILE, + REPAIR + } + + /** + * Admin Configuration Options + */ + public static class Config implements Serializable { + + @Parameter(names = {"--operation", "-op"}, description = "Operation", required = true) + public Operation operation = Operation.VALIDATE; + @Parameter(names = {"--base-path", "-bp"}, description = "Base path for the dataset", required = true) + public String basePath = null; + @Parameter(names = {"--instant-time", "-in"}, description = "Compaction Instant time", required = false) + public String compactionInstantTime = null; + @Parameter(names = {"--file-id", "-id"}, description = "File Id", required = false) + public String fileId = null; + @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = false) + public int parallelism = 3; + @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = true) + public String sparkMaster = null; + @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true) + public String sparkMemory = null; + @Parameter(names = {"--dry-run", "-dr"}, description = "Dry Run Mode", required = false) + public boolean dryRun = false; + @Parameter(names = {"--skip-validation", "-sv"}, description = "Skip Validation", required = false) + public boolean skipValidation = false; + @Parameter(names = {"--output-path", "-ot"}, description = "Output Path", required = false) + public String outputPath = null; + @Parameter(names = {"--print-output", "-pt"}, description = "Print Output", required = false) + public boolean printOutput = true; + @Parameter(names = {"--help", "-h"}, help = true) + public Boolean help = false; + } +}