1
0

Compaction validate, unschedule and repair

This commit is contained in:
Balaji Varadarajan
2018-10-03 10:39:10 -07:00
committed by vinoth chandar
parent d904fe69ca
commit 07324e7a20
10 changed files with 1830 additions and 157 deletions

View File

@@ -161,6 +161,33 @@ hoodie:trips->commit showfiles --commit 20161005165855 --sortBy "Partition Path"
....
```
#### FileSystem View
Hudi views each partition as a collection of file-groups with each file-group containing a list of file-slices in commit
order (See Concepts). The below commands allow users to view the file-slices for a data-set.
```
hoodie:stock_ticks_mor->show fsview all
....
_______________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________
| Partition | FileId | Base-Instant | Data-File | Data-File Size| Num Delta Files| Total Delta File Size| Delta Files |
|==============================================================================================================================================================================================================================================================================================================================================================================================================|
| 2018/08/31| 111415c3-f26d-4639-86c8-f9956f245ac3| 20181002180759| hdfs://namenode:8020/user/hive/warehouse/stock_ticks_mor/2018/08/31/111415c3-f26d-4639-86c8-f9956f245ac3_0_20181002180759.parquet| 432.5 KB | 1 | 20.8 KB | [HoodieLogFile {hdfs://namenode:8020/user/hive/warehouse/stock_ticks_mor/2018/08/31/.111415c3-f26d-4639-86c8-f9956f245ac3_20181002180759.log.1}]|
hoodie:stock_ticks_mor->show fsview latest --partitionPath "2018/08/31"
......
__________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________
| Partition | FileId | Base-Instant | Data-File | Data-File Size| Num Delta Files| Total Delta Size| Delta Size - compaction scheduled| Delta Size - compaction unscheduled| Delta To Base Ratio - compaction scheduled| Delta To Base Ratio - compaction unscheduled| Delta Files - compaction scheduled | Delta Files - compaction unscheduled|
|=================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================================|
| 2018/08/31| 111415c3-f26d-4639-86c8-f9956f245ac3| 20181002180759| hdfs://namenode:8020/user/hive/warehouse/stock_ticks_mor/2018/08/31/111415c3-f26d-4639-86c8-f9956f245ac3_0_20181002180759.parquet| 432.5 KB | 1 | 20.8 KB | 20.8 KB | 0.0 B | 0.0 B | 0.0 B | [HoodieLogFile {hdfs://namenode:8020/user/hive/warehouse/stock_ticks_mor/2018/08/31/.111415c3-f26d-4639-86c8-f9956f245ac3_20181002180759.log.1}]| [] |
hoodie:stock_ticks_mor->
```
#### Statistics
Since Hoodie directly manages file sizes for HDFS dataset, it might be good to get an overall picture
@@ -280,31 +307,76 @@ Description: Run Compaction for given instant time
* compaction run - Run Compaction for given instant time
```
##### Up-Coming CLI for Compaction
In the next release, more useful CLI to revert/repair compaction schedules will be added. Here is a preview of them:
##### Validate Compaction
Validating a compaction plan : Check if all the files necessary for compactions are present and are valid
```
hoodie:trips->compaction validate --compactionInstant <instantId>
hoodie:stock_ticks_mor->compaction validate --instant 20181005222611
...
COMPACTION PLAN VALID
___________________________________________________________________________________________________________________________________________________________________________________________________________________________
| File Id | Base Instant Time| Base Data File | Num Delta Files| Valid| Error|
|==========================================================================================================================================================================================================================|
| 05320e98-9a57-4c38-b809-a6beaaeb36bd| 20181005222445 | hdfs://namenode:8020/user/hive/warehouse/stock_ticks_mor/2018/08/31/05320e98-9a57-4c38-b809-a6beaaeb36bd_0_20181005222445.parquet| 1 | true | |
hoodie:stock_ticks_mor->compaction validate --instant 20181005222601
COMPACTION PLAN INVALID
_______________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________
| File Id | Base Instant Time| Base Data File | Num Delta Files| Valid| Error |
|=====================================================================================================================================================================================================================================================================================================|
| 05320e98-9a57-4c38-b809-a6beaaeb36bd| 20181005222445 | hdfs://namenode:8020/user/hive/warehouse/stock_ticks_mor/2018/08/31/05320e98-9a57-4c38-b809-a6beaaeb36bd_0_20181005222445.parquet| 1 | false| All log files specified in compaction operation is not present. Missing .... |
```
##### NOTE
The following commands must be executed without any other writer/ingestion application running.
Sometimes, it becomes necessary to remove a fileId from a compaction-plan inorder to speed-up or unblock compaction
operation. Any new log-files that happened on this file after the compaction got scheduled will be safely renamed
so that are preserved. Hudi provides the following CLI to support it
##### UnScheduling Compaction
```
hoodie:trips->compaction unscheduleFileId --fileId <FileUUID>
....
No File renames needed to unschedule file from pending compaction. Operation successful.
```
In other cases, an entire compaction plan needs to be reverted. This is supported by the following CLI
```
hoodie:trips->compaction unschedule --compactionInstant <compactionInstant>
.....
No File renames needed to unschedule pending compaction. Operation successful.
```
##### Repair Compaction
The above compaction unscheduling operations could sometimes fail partially (e:g -> HDFS temporarily unavailable). With
partial failures, the compaction operation could become inconsistent with the state of file-slices. When you run
`compaction validate`, you can notice invalid compaction operations if there is one. In these cases, the repair
command comes to the rescue, it will rearrange the file-slices so that there is no loss and the file-slices are
consistent with the compaction plan
```
hoodie:stock_ticks_mor->compaction repair --instant 20181005222611
......
Compaction successfully repaired
.....
```
## Metrics
Once the Hoodie Client is configured with the right datasetname and environment for metrics, it produces the following graphite metrics, that aid in debugging hoodie datasets

View File

@@ -16,6 +16,10 @@
package com.uber.hoodie.cli.commands;
import static com.uber.hoodie.common.table.HoodieTimeline.COMPACTION_ACTION;
import com.uber.hoodie.CompactionAdminClient.RenameOpResult;
import com.uber.hoodie.CompactionAdminClient.ValidationOpResult;
import com.uber.hoodie.avro.model.HoodieCompactionOperation;
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.cli.HoodieCLI;
@@ -32,14 +36,20 @@ import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
import com.uber.hoodie.common.util.AvroUtils;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.launcher.SparkLauncher;
@@ -53,7 +63,9 @@ import org.springframework.stereotype.Component;
@Component
public class CompactionCommand implements CommandMarker {
private static Logger log = LogManager.getLogger(HDFSParquetImportCommand.class);
private static Logger log = LogManager.getLogger(CompactionCommand.class);
private static final String TMP_DIR = "/tmp/";
@CliAvailabilityIndicator({"compactions show all", "compaction show", "compaction run", "compaction schedule"})
public boolean isAvailable() {
@@ -84,7 +96,7 @@ public class CompactionCommand implements CommandMarker {
for (int i = 0; i < instants.size(); i++) {
HoodieInstant instant = instants.get(i);
HoodieCompactionPlan workload = null;
if (!instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)) {
if (!instant.getAction().equals(COMPACTION_ACTION)) {
try {
// This could be a completed compaction. Assume a compaction request file is present but skip if fails
workload = AvroUtils.deserializeCompactionPlan(
@@ -203,7 +215,8 @@ public class CompactionCommand implements CommandMarker {
final String schemaFilePath,
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G", help = "Spark executor memory")
final String sparkMemory,
@CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number of retries") final String retry,
@CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number of retries")
final String retry,
@CliOption(key = "compactionInstant", mandatory = true, help = "Base path for the target hoodie dataset")
final String compactionInstantTime) throws Exception {
boolean initialized = HoodieCLI.initConf();
@@ -227,4 +240,272 @@ public class CompactionCommand implements CommandMarker {
throw new Exception("Compactions can only be run for table type : MERGE_ON_READ");
}
}
private static String getTmpSerializerFile() {
return TMP_DIR + UUID.randomUUID().toString() + ".ser";
}
private <T> T deSerializeOperationResult(String inputP, FileSystem fs) throws Exception {
Path inputPath = new Path(inputP);
FSDataInputStream fsDataInputStream = fs.open(inputPath);
ObjectInputStream in = new ObjectInputStream(fsDataInputStream);
try {
T result = (T) in.readObject();
log.info("Result : " + result);
return result;
} finally {
in.close();
fsDataInputStream.close();
}
}
@CliCommand(value = "compaction validate", help = "Validate Compaction")
public String validateCompaction(
@CliOption(key = "instant", mandatory = true, help = "Compaction Instant") String compactionInstant,
@CliOption(key = {"parallelism"}, unspecifiedDefaultValue = "3", help = "Parallelism") String parallelism,
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master ") String master,
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory,
@CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") boolean descending,
@CliOption(key = {
"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") boolean headerOnly)
throws Exception {
boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized);
String outputPathStr = getTmpSerializerFile();
Path outputPath = new Path(outputPathStr);
String output = null;
if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) {
try {
String sparkPropertiesPath = Utils.getDefaultPropertiesFile(
scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_VALIDATE.toString(),
HoodieCLI.tableMetadata.getBasePath(), compactionInstant, outputPathStr, parallelism, master,
sparkMemory);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
if (exitCode != 0) {
return "Failed to validate compaction for " + compactionInstant;
}
List<ValidationOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
boolean valid = res.stream().map(r -> r.isSuccess()).reduce(Boolean::logicalAnd).orElse(true);
String message = "\n\n\t COMPACTION PLAN " + (valid ? "VALID" : "INVALID") + "\n\n";
List<Comparable[]> rows = new ArrayList<>();
res.stream().forEach(r -> {
Comparable[] row = new Comparable[]{r.getOperation().getFileId(),
r.getOperation().getBaseInstantTime(),
r.getOperation().getDataFilePath().isPresent() ? r.getOperation().getDataFilePath().get() : "",
r.getOperation().getDeltaFilePaths().size(), r.isSuccess(),
r.getException().isPresent() ? r.getException().get().getMessage() : ""};
rows.add(row);
});
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
TableHeader header = new TableHeader()
.addTableHeaderField("File Id")
.addTableHeaderField("Base Instant Time")
.addTableHeaderField("Base Data File")
.addTableHeaderField("Num Delta Files")
.addTableHeaderField("Valid")
.addTableHeaderField("Error");
output = message + HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit,
headerOnly, rows);
} finally {
// Delete tmp file used to serialize result
if (HoodieCLI.fs.exists(outputPath)) {
HoodieCLI.fs.delete(outputPath, false);
}
}
return output;
} else {
throw new Exception("Compactions can only be run for table type : MERGE_ON_READ");
}
}
@CliCommand(value = "compaction unschedule", help = "Unschedule Compaction")
public String unscheduleCompaction(
@CliOption(key = "instant", mandatory = true, help = "Compaction Instant") String compactionInstant,
@CliOption(key = {"parallelism"}, unspecifiedDefaultValue = "3", help = "Parallelism") String parallelism,
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master ") String master,
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory,
@CliOption(key = {"skipValidation"}, help = "skip validation", unspecifiedDefaultValue = "false") boolean skipV,
@CliOption(key = {"dryRun"}, help = "Dry Run Mode", unspecifiedDefaultValue = "false") boolean dryRun,
@CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") boolean descending,
@CliOption(key = {
"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") boolean headerOnly)
throws Exception {
boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized);
String outputPathStr = getTmpSerializerFile();
Path outputPath = new Path(outputPathStr);
String output = "";
if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) {
try {
String sparkPropertiesPath = Utils.getDefaultPropertiesFile(
scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_PLAN.toString(),
HoodieCLI.tableMetadata.getBasePath(), compactionInstant, outputPathStr, parallelism, master,
sparkMemory, Boolean.valueOf(skipV).toString(), Boolean.valueOf(dryRun).toString());
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
if (exitCode != 0) {
return "Failed to unschedule compaction for " + compactionInstant;
}
List<RenameOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
output = getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly,
"unschedule pending compaction");
} finally {
// Delete tmp file used to serialize result
if (HoodieCLI.fs.exists(outputPath)) {
HoodieCLI.fs.delete(outputPath, false);
}
}
return output;
} else {
throw new Exception("Compactions can only be run for table type : MERGE_ON_READ");
}
}
@CliCommand(value = "compaction unscheduleFileId", help = "UnSchedule Compaction for a fileId")
public String unscheduleCompactFile(
@CliOption(key = "fileId", mandatory = true, help = "File Id") final String fileId,
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master ") String master,
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory,
@CliOption(key = {"skipValidation"}, help = "skip validation", unspecifiedDefaultValue = "false") boolean skipV,
@CliOption(key = {"dryRun"}, help = "Dry Run Mode", unspecifiedDefaultValue = "false") boolean dryRun,
@CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") boolean descending,
@CliOption(key = {"headeronly"}, help = "Header Only", unspecifiedDefaultValue = "false") boolean headerOnly)
throws Exception {
boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized);
String outputPathStr = getTmpSerializerFile();
Path outputPath = new Path(outputPathStr);
String output = "";
if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) {
try {
String sparkPropertiesPath = Utils.getDefaultPropertiesFile(
scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_FILE.toString(),
HoodieCLI.tableMetadata.getBasePath(), fileId, outputPathStr, "1", master,
sparkMemory, Boolean.valueOf(skipV).toString(), Boolean.valueOf(dryRun).toString());
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
if (exitCode != 0) {
return "Failed to unschedule compaction for file " + fileId;
}
List<RenameOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
output = getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly,
"unschedule file from pending compaction");
} finally {
// Delete tmp file used to serialize result
if (HoodieCLI.fs.exists(outputPath)) {
HoodieCLI.fs.delete(outputPath, false);
}
}
return output;
} else {
throw new Exception("Compactions can only be run for table type : MERGE_ON_READ");
}
}
@CliCommand(value = "compaction repair", help = "Renames the files to make them consistent with the timeline as "
+ "dictated by Hoodie metadata. Use when compaction unschedule fails partially.")
public String repairCompaction(
@CliOption(key = "instant", mandatory = true, help = "Compaction Instant") String compactionInstant,
@CliOption(key = {"parallelism"}, unspecifiedDefaultValue = "3", help = "Parallelism") String parallelism,
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master ") String master,
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory,
@CliOption(key = {"dryRun"}, help = "Dry Run Mode", unspecifiedDefaultValue = "false") boolean dryRun,
@CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") boolean descending,
@CliOption(key = {
"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") boolean headerOnly)
throws Exception {
boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized);
String outputPathStr = getTmpSerializerFile();
Path outputPath = new Path(outputPathStr);
String output = "";
if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) {
try {
String sparkPropertiesPath = Utils.getDefaultPropertiesFile(
scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_REPAIR.toString(),
HoodieCLI.tableMetadata.getBasePath(), compactionInstant, outputPathStr, parallelism, master,
sparkMemory, Boolean.valueOf(dryRun).toString());
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
if (exitCode != 0) {
return "Failed to unschedule compaction for " + compactionInstant;
}
List<RenameOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
output = getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, "repair compaction");
} finally {
// Delete tmp file used to serialize result
if (HoodieCLI.fs.exists(outputPath)) {
HoodieCLI.fs.delete(outputPath, false);
}
}
return output;
} else {
throw new Exception("Compactions can only be run for table type : MERGE_ON_READ");
}
}
private String getRenamesToBePrinted(List<RenameOpResult> res, Integer limit,
String sortByField, boolean descending, boolean headerOnly, String operation) {
Optional<Boolean> result = res.stream().map(r -> r.isExecuted() && r.isSuccess()).reduce(Boolean::logicalAnd);
if (result.isPresent()) {
System.out.println("There were some file renames that needed to be done to " + operation);
if (result.get()) {
System.out.println("All renames successfully completed to " + operation + " done !!");
} else {
System.out.println("Some renames failed. DataSet could be in inconsistent-state. "
+ "Try running compaction repair");
}
List<Comparable[]> rows = new ArrayList<>();
res.stream().forEach(r -> {
Comparable[] row = new Comparable[] {
r.getOperation().fileId, r.getOperation().srcPath, r.getOperation().destPath,
r.isExecuted(), r.isSuccess(), r.getException().isPresent() ? r.getException().get().getMessage() : ""
};
rows.add(row);
});
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
TableHeader header = new TableHeader()
.addTableHeaderField("File Id")
.addTableHeaderField("Source File Path")
.addTableHeaderField("Destination File Path")
.addTableHeaderField("Rename Executed?")
.addTableHeaderField("Rename Succeeded?")
.addTableHeaderField("Error");
return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending,
limit, headerOnly, rows);
} else {
return "No File renames needed to " + operation + ". Operation successful.";
}
}
}

View File

@@ -25,6 +25,8 @@ import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.io.compact.strategy.UnBoundedCompactionStrategy;
import com.uber.hoodie.utilities.HDFSParquetImporter;
import com.uber.hoodie.utilities.HoodieCompactionAdminTool;
import com.uber.hoodie.utilities.HoodieCompactionAdminTool.Operation;
import com.uber.hoodie.utilities.HoodieCompactor;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
@@ -39,6 +41,7 @@ public class SparkMain {
*/
enum SparkCommand {
ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN,
COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR
}
public static void main(String[] args) throws Exception {
@@ -78,10 +81,32 @@ public class SparkMain {
returnCode = compact(jsc, args[1], args[2], args[3], 1,
"", args[4], 0, true);
break;
case COMPACT_VALIDATE:
assert (args.length == 7);
doCompactValidate(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6]);
returnCode = 0;
break;
case COMPACT_REPAIR:
assert (args.length == 8);
doCompactRepair(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
Boolean.valueOf(args[7]));
returnCode = 0;
break;
case COMPACT_UNSCHEDULE_FILE:
assert (args.length == 9);
doCompactUnscheduleFile(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
Boolean.valueOf(args[7]), Boolean.valueOf(args[8]));
returnCode = 0;
break;
case COMPACT_UNSCHEDULE_PLAN:
assert (args.length == 9);
doCompactUnschedule(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
Boolean.valueOf(args[7]), Boolean.valueOf(args[8]));
returnCode = 0;
break;
default:
break;
}
System.exit(returnCode);
}
@@ -103,6 +128,73 @@ public class SparkMain {
return new HDFSParquetImporter(cfg).dataImport(jsc, retry);
}
private static void doCompactValidate(JavaSparkContext jsc, String basePath, String compactionInstant,
String outputPath, int parallelism, String sparkMaster, String sparkMemory) throws Exception {
HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config();
cfg.basePath = basePath;
cfg.operation = Operation.VALIDATE;
cfg.outputPath = outputPath;
cfg.compactionInstantTime = compactionInstant;
cfg.parallelism = parallelism;
if ((null != sparkMaster) && (!sparkMaster.isEmpty())) {
jsc.getConf().setMaster(sparkMaster);
}
jsc.getConf().set("spark.executor.memory", sparkMemory);
new HoodieCompactionAdminTool(cfg).run(jsc);
}
private static void doCompactRepair(JavaSparkContext jsc, String basePath, String compactionInstant,
String outputPath, int parallelism, String sparkMaster, String sparkMemory, boolean dryRun) throws Exception {
HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config();
cfg.basePath = basePath;
cfg.operation = Operation.REPAIR;
cfg.outputPath = outputPath;
cfg.compactionInstantTime = compactionInstant;
cfg.parallelism = parallelism;
cfg.dryRun = dryRun;
if ((null != sparkMaster) && (!sparkMaster.isEmpty())) {
jsc.getConf().setMaster(sparkMaster);
}
jsc.getConf().set("spark.executor.memory", sparkMemory);
new HoodieCompactionAdminTool(cfg).run(jsc);
}
private static void doCompactUnschedule(JavaSparkContext jsc, String basePath, String compactionInstant,
String outputPath, int parallelism, String sparkMaster, String sparkMemory, boolean skipValidation,
boolean dryRun) throws Exception {
HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config();
cfg.basePath = basePath;
cfg.operation = Operation.UNSCHEDULE_PLAN;
cfg.outputPath = outputPath;
cfg.compactionInstantTime = compactionInstant;
cfg.parallelism = parallelism;
cfg.dryRun = dryRun;
cfg.skipValidation = skipValidation;
if ((null != sparkMaster) && (!sparkMaster.isEmpty())) {
jsc.getConf().setMaster(sparkMaster);
}
jsc.getConf().set("spark.executor.memory", sparkMemory);
new HoodieCompactionAdminTool(cfg).run(jsc);
}
private static void doCompactUnscheduleFile(JavaSparkContext jsc, String basePath, String fileId,
String outputPath, int parallelism, String sparkMaster, String sparkMemory, boolean skipValidation,
boolean dryRun) throws Exception {
HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config();
cfg.basePath = basePath;
cfg.operation = Operation.UNSCHEDULE_FILE;
cfg.outputPath = outputPath;
cfg.fileId = fileId;
cfg.parallelism = parallelism;
cfg.dryRun = dryRun;
cfg.skipValidation = skipValidation;
if ((null != sparkMaster) && (!sparkMaster.isEmpty())) {
jsc.getConf().setMaster(sparkMaster);
}
jsc.getConf().set("spark.executor.memory", sparkMemory);
new HoodieCompactionAdminTool(cfg).run(jsc);
}
private static int compact(JavaSparkContext jsc, String basePath, String tableName, String compactionInstant,
int parallelism, String schemaFile, String sparkMemory, int retry, boolean schedule) throws Exception {
HoodieCompactor.Config cfg = new HoodieCompactor.Config();

View File

@@ -0,0 +1,546 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie;
import static com.uber.hoodie.common.table.HoodieTimeline.COMPACTION_ACTION;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.uber.hoodie.avro.model.HoodieCompactionOperation;
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.common.model.CompactionOperation;
import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.AvroUtils;
import com.uber.hoodie.common.util.CompactionUtils;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.func.OperationResult;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
/**
* Client to perform admin operations related to compaction
*/
public class CompactionAdminClient implements Serializable {
private static Logger log = LogManager.getLogger(CompactionAdminClient.class);
private final transient JavaSparkContext jsc;
private final String basePath;
public CompactionAdminClient(JavaSparkContext jsc, String basePath) {
this.jsc = jsc;
this.basePath = basePath;
}
/**
* Validate all compaction operations in a compaction plan. Verifies the file-slices are consistent with corresponding
* compaction operations.
*
* @param metaClient Hoodie Table Meta Client
* @param compactionInstant Compaction Instant
*/
public List<ValidationOpResult> validateCompactionPlan(HoodieTableMetaClient metaClient,
String compactionInstant, int parallelism) throws IOException {
HoodieCompactionPlan plan = getCompactionPlan(metaClient, compactionInstant);
HoodieTableFileSystemView fsView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
if (plan.getOperations() != null) {
List<CompactionOperation> ops = plan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
return jsc.parallelize(ops, parallelism).map(op -> {
try {
return validateCompactionOperation(metaClient, compactionInstant, op, Optional.of(fsView));
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
}).collect();
}
return new ArrayList<>();
}
/**
* Un-schedules compaction plan. Remove All compaction operation scheduled and re-arrange delta-files that were
* created after the compaction was scheduled.
*
* This operation MUST be executed with compactions and writer turned OFF.
*
* @param compactionInstant Compaction Instant
* @param skipValidation Skip validation step
* @param parallelism Parallelism
* @param dryRun Dry Run
*/
public List<RenameOpResult> unscheduleCompactionPlan(
String compactionInstant, boolean skipValidation, int parallelism, boolean dryRun) throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
List<Pair<HoodieLogFile, HoodieLogFile>> renameActions =
getRenamingActionsForUnschedulingCompactionPlan(metaClient, compactionInstant, parallelism,
Optional.absent(), skipValidation);
List<RenameOpResult> res =
runRenamingOps(metaClient, renameActions, parallelism, dryRun);
java.util.Optional<Boolean> success =
res.stream().map(r -> (r.isExecuted() && r.isSuccess())).reduce(Boolean::logicalAnd);
Optional<Boolean> allSuccess = success.isPresent() ? Optional.of(success.get()) : Optional.absent();
// Only if all operations are successfully executed
if (!dryRun && allSuccess.isPresent() && allSuccess.get()) {
// Overwrite compaction request with empty compaction operations
HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, compactionInstant);
HoodieCompactionPlan newPlan =
HoodieCompactionPlan.newBuilder().setOperations(new ArrayList<>()).setExtraMetadata(plan.getExtraMetadata())
.build();
HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, compactionInstant);
Path inflightPath = new Path(metaClient.getMetaPath(), inflight.getFileName());
if (metaClient.getFs().exists(inflightPath)) {
// We need to rollback data-files because of this inflight compaction before unscheduling
throw new IllegalStateException("Please rollback the inflight compaction before unscheduling");
}
metaClient.getActiveTimeline().saveToCompactionRequested(
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, compactionInstant),
AvroUtils.serializeCompactionPlan(newPlan));
}
return res;
}
/**
* Remove a fileId from pending compaction. Removes the associated compaction operation and rename delta-files
* that were generated for that file-id after the compaction operation was scheduled.
*
* This operation MUST be executed with compactions and writer turned OFF.
*
* @param fileId FileId to be unscheduled
* @param skipValidation Skip validation
* @param dryRun Dry Run Mode
*/
public List<RenameOpResult> unscheduleCompactionFileId(String fileId,
boolean skipValidation, boolean dryRun) throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
List<Pair<HoodieLogFile, HoodieLogFile>> renameActions =
getRenamingActionsForUnschedulingCompactionForFileId(metaClient, fileId, Optional.absent(), skipValidation);
List<RenameOpResult> res = runRenamingOps(metaClient, renameActions, 1, dryRun);
if (!dryRun && !res.isEmpty() && res.get(0).isExecuted() && res.get(0).isSuccess()) {
// Ready to remove this file-Id from compaction request
Pair<String, HoodieCompactionOperation> compactionOperationWithInstant =
CompactionUtils.getAllPendingCompactionOperations(metaClient).get(fileId);
HoodieCompactionPlan plan = CompactionUtils
.getCompactionPlan(metaClient, compactionOperationWithInstant.getKey());
List<HoodieCompactionOperation> newOps = plan.getOperations().stream()
.filter(op -> !op.getFileId().equals(fileId)).collect(Collectors.toList());
HoodieCompactionPlan newPlan =
HoodieCompactionPlan.newBuilder().setOperations(newOps).setExtraMetadata(plan.getExtraMetadata()).build();
HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION,
compactionOperationWithInstant.getLeft());
Path inflightPath = new Path(metaClient.getMetaPath(), inflight.getFileName());
if (metaClient.getFs().exists(inflightPath)) {
// revert if in inflight state
metaClient.getActiveTimeline().revertCompactionInflightToRequested(inflight);
}
metaClient.getActiveTimeline().saveToCompactionRequested(
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, compactionOperationWithInstant.getLeft()),
AvroUtils.serializeCompactionPlan(newPlan));
}
return res;
}
/**
* Renames delta files to make file-slices consistent with the timeline as dictated by Hoodie metadata.
* Use when compaction unschedule fails partially.
*
* This operation MUST be executed with compactions and writer turned OFF.
* @param compactionInstant Compaction Instant to be repaired
* @param dryRun Dry Run Mode
*/
public List<RenameOpResult> repairCompaction(String compactionInstant,
int parallelism, boolean dryRun) throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
List<ValidationOpResult> validationResults =
validateCompactionPlan(metaClient, compactionInstant, parallelism);
List<ValidationOpResult> failed = validationResults.stream()
.filter(v -> !v.isSuccess()).collect(Collectors.toList());
if (failed.isEmpty()) {
return new ArrayList<>();
}
final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
metaClient.getCommitsAndCompactionTimeline());
List<Pair<HoodieLogFile, HoodieLogFile>> renameActions = failed.stream().flatMap(v ->
getRenamingActionsToAlignWithCompactionOperation(metaClient, compactionInstant,
v.getOperation(), Optional.of(fsView)).stream()).collect(Collectors.toList());
return runRenamingOps(metaClient, renameActions, parallelism, dryRun);
}
/**
* Construction Compaction Plan from compaction instant
*/
private static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient,
String compactionInstant) throws IOException {
HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(
metaClient.getActiveTimeline().getInstantAuxiliaryDetails(
HoodieTimeline.getCompactionRequestedInstant(compactionInstant)).get());
return compactionPlan;
}
/**
* Get Renaming actions to ensure the log-files of merged file-slices is aligned with compaction operation. This
* method is used to recover from failures during unschedule compaction operations.
*
* @param metaClient Hoodie Table Meta Client
* @param compactionInstant Compaction Instant
* @param op Compaction Operation
* @param fsViewOpt File System View
*/
protected static List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsToAlignWithCompactionOperation(
HoodieTableMetaClient metaClient, String compactionInstant, CompactionOperation op,
Optional<HoodieTableFileSystemView> fsViewOpt) {
HoodieTableFileSystemView fileSystemView = fsViewOpt.isPresent() ? fsViewOpt.get() :
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
HoodieInstant lastInstant = metaClient.getCommitsAndCompactionTimeline().lastInstant().get();
FileSlice merged =
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(op.getPartitionPath(), lastInstant.getTimestamp())
.filter(fs -> fs.getFileId().equals(op.getFileId())).findFirst().get();
final int maxVersion =
op.getDeltaFilePaths().stream().map(lf -> FSUtils.getFileVersionFromLog(new Path(lf)))
.reduce((x, y) -> x > y ? x : y).map(x -> x).orElse(0);
List<HoodieLogFile> logFilesToBeMoved =
merged.getLogFiles().filter(lf -> lf.getLogVersion() > maxVersion).collect(Collectors.toList());
return logFilesToBeMoved.stream().map(lf -> {
Preconditions.checkArgument(lf.getLogVersion() - maxVersion > 0,
"Expect new log version to be sane");
HoodieLogFile newLogFile = new HoodieLogFile(new Path(lf.getPath().getParent(),
FSUtils.makeLogFileName(lf.getFileId(), "." + FSUtils.getFileExtensionFromLog(lf.getPath()),
compactionInstant, lf.getLogVersion() - maxVersion)));
return Pair.of(lf, newLogFile);
}).collect(Collectors.toList());
}
/**
* Rename log files. This is done for un-scheduling a pending compaction operation NOTE: Can only be used safely when
* no writer (ingestion/compaction) is running.
*
* @param metaClient Hoodie Table Meta-Client
* @param oldLogFile Old Log File
* @param newLogFile New Log File
*/
protected static void renameLogFile(HoodieTableMetaClient metaClient, HoodieLogFile oldLogFile,
HoodieLogFile newLogFile) throws IOException {
FileStatus[] statuses = metaClient.getFs().listStatus(oldLogFile.getPath());
Preconditions.checkArgument(statuses.length == 1, "Only one status must be present");
Preconditions.checkArgument(statuses[0].isFile(), "Source File must exist");
Preconditions.checkArgument(oldLogFile.getPath().getParent().equals(newLogFile.getPath().getParent()),
"Log file must only be moved within the parent directory");
metaClient.getFs().rename(oldLogFile.getPath(), newLogFile.getPath());
}
/**
* Check if a compaction operation is valid
*
* @param metaClient Hoodie Table Meta client
* @param compactionInstant Compaction Instant
* @param operation Compaction Operation
* @param fsViewOpt File System View
*/
private ValidationOpResult validateCompactionOperation(HoodieTableMetaClient metaClient,
String compactionInstant, CompactionOperation operation, Optional<HoodieTableFileSystemView> fsViewOpt)
throws IOException {
HoodieTableFileSystemView fileSystemView = fsViewOpt.isPresent() ? fsViewOpt.get() :
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
java.util.Optional<HoodieInstant> lastInstant = metaClient.getCommitsAndCompactionTimeline().lastInstant();
try {
if (lastInstant.isPresent()) {
java.util.Optional<FileSlice> fileSliceOptional =
fileSystemView.getLatestUnCompactedFileSlices(operation.getPartitionPath())
.filter(fs -> fs.getFileId().equals(operation.getFileId())).findFirst();
if (fileSliceOptional.isPresent()) {
FileSlice fs = fileSliceOptional.get();
java.util.Optional<HoodieDataFile> df = fs.getDataFile();
if (operation.getDataFilePath().isPresent()) {
String expPath = metaClient.getFs().getFileStatus(new Path(operation.getDataFilePath().get())).getPath()
.toString();
Preconditions.checkArgument(df.isPresent(), "Data File must be present. File Slice was : "
+ fs + ", operation :" + operation);
Preconditions.checkArgument(df.get().getPath().equals(expPath),
"Base Path in operation is specified as " + expPath + " but got path " + df.get().getPath());
}
Set<HoodieLogFile> logFilesInFileSlice = fs.getLogFiles().collect(Collectors.toSet());
Set<HoodieLogFile> logFilesInCompactionOp = operation.getDeltaFilePaths().stream()
.map(dp -> {
try {
FileStatus[] fileStatuses = metaClient.getFs().listStatus(new Path(dp));
Preconditions.checkArgument(fileStatuses.length == 1, "Expect only 1 file-status");
return new HoodieLogFile(fileStatuses[0]);
} catch (FileNotFoundException fe) {
throw new CompactionValidationException(fe.getMessage());
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
}).collect(Collectors.toSet());
Set<HoodieLogFile> missing =
logFilesInCompactionOp.stream().filter(lf -> !logFilesInFileSlice.contains(lf))
.collect(Collectors.toSet());
Preconditions.checkArgument(missing.isEmpty(),
"All log files specified in compaction operation is not present. Missing :" + missing
+ ", Exp :" + logFilesInCompactionOp + ", Got :" + logFilesInFileSlice);
Set<HoodieLogFile> diff =
logFilesInFileSlice.stream().filter(lf -> !logFilesInCompactionOp.contains(lf))
.collect(Collectors.toSet());
Preconditions.checkArgument(diff.stream()
.filter(lf -> !lf.getBaseCommitTime().equals(compactionInstant)).count() == 0,
"There are some log-files which are neither specified in compaction plan "
+ "nor present after compaction request instant. Some of these :" + diff);
} else {
throw new CompactionValidationException("Unable to find file-slice for file-id (" + operation.getFileId()
+ " Compaction operation is invalid.");
}
} else {
throw new CompactionValidationException("Unable to find any committed instant. Compaction Operation may "
+ "be pointing to stale file-slices");
}
} catch (CompactionValidationException | IllegalArgumentException e) {
return new ValidationOpResult(operation, false, Optional.of(e));
}
return new ValidationOpResult(operation, true, Optional.absent());
}
/**
* Execute Renaming operation
*
* @param metaClient HoodieTable MetaClient
* @param renameActions List of rename operations
*/
private List<RenameOpResult> runRenamingOps(HoodieTableMetaClient metaClient,
List<Pair<HoodieLogFile, HoodieLogFile>> renameActions, int parallelism, boolean dryRun) {
if (renameActions.isEmpty()) {
log.info("No renaming of log-files needed. Proceeding to removing file-id from compaction-plan");
return new ArrayList<>();
} else {
log.info("The following compaction renaming operations needs to be performed to un-schedule");
if (!dryRun) {
return jsc.parallelize(renameActions, parallelism).map(lfPair -> {
try {
log.info("RENAME " + lfPair.getLeft().getPath() + " => " + lfPair.getRight().getPath());
renameLogFile(metaClient, lfPair.getLeft(), lfPair.getRight());
return new RenameOpResult(lfPair, true, Optional.absent());
} catch (IOException e) {
log.error("Error renaming log file", e);
log.error("\n\n\n***NOTE Compaction is in inconsistent state. Try running \"compaction repair "
+ lfPair.getLeft().getBaseCommitTime() + "\" to recover from failure ***\n\n\n");
return new RenameOpResult(lfPair, false, Optional.of(e));
}
}).collect();
} else {
log.info("Dry-Run Mode activated for rename operations");
return renameActions.parallelStream()
.map(lfPair -> new RenameOpResult(lfPair, false, false, Optional.absent()))
.collect(Collectors.toList());
}
}
}
/**
* Generate renaming actions for unscheduling a pending compaction plan. NOTE: Can only be used safely when no writer
* (ingestion/compaction) is running.
*
* @param metaClient Hoodie Table MetaClient
* @param compactionInstant Compaction Instant to be unscheduled
* @param fsViewOpt Cached File System View
* @param skipValidation Skip Validation
* @return list of pairs of log-files (old, new) and for each pair, rename must be done to successfully unschedule
* compaction.
*/
protected List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedulingCompactionPlan(
HoodieTableMetaClient metaClient, String compactionInstant, int parallelism,
Optional<HoodieTableFileSystemView> fsViewOpt, boolean skipValidation) throws IOException {
HoodieTableFileSystemView fsView = fsViewOpt.isPresent() ? fsViewOpt.get() :
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
HoodieCompactionPlan plan = getCompactionPlan(metaClient, compactionInstant);
if (plan.getOperations() != null) {
log.info("Number of Compaction Operations :" + plan.getOperations().size()
+ " for instant :" + compactionInstant);
List<CompactionOperation> ops = plan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
return jsc.parallelize(ops, parallelism).flatMap(op -> {
try {
return getRenamingActionsForUnschedulingCompactionOperation(metaClient, compactionInstant,
op, Optional.of(fsView), skipValidation).iterator();
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
} catch (CompactionValidationException ve) {
throw new HoodieException(ve);
}
}).collect();
}
log.warn("No operations for compaction instant : " + compactionInstant);
return new ArrayList<>();
}
/**
* Generate renaming actions for unscheduling a compaction operation NOTE: Can only be used safely when no writer
* (ingestion/compaction) is running.
*
* @param metaClient Hoodie Table MetaClient
* @param compactionInstant Compaction Instant
* @param operation Compaction Operation
* @param fsViewOpt Cached File System View
* @param skipValidation Skip Validation
* @return list of pairs of log-files (old, new) and for each pair, rename must be done to successfully unschedule
* compaction.
*/
public List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedulingCompactionOperation(
HoodieTableMetaClient metaClient, String compactionInstant, CompactionOperation operation,
Optional<HoodieTableFileSystemView> fsViewOpt, boolean skipValidation) throws IOException {
List<Pair<HoodieLogFile, HoodieLogFile>> result = new ArrayList<>();
HoodieTableFileSystemView fileSystemView = fsViewOpt.isPresent() ? fsViewOpt.get() :
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
if (!skipValidation) {
validateCompactionOperation(metaClient, compactionInstant, operation, Optional.of(fileSystemView));
}
HoodieInstant lastInstant = metaClient.getCommitsAndCompactionTimeline().lastInstant().get();
FileSlice merged =
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(operation.getPartitionPath(), lastInstant.getTimestamp())
.filter(fs -> fs.getFileId().equals(operation.getFileId())).findFirst().get();
List<HoodieLogFile> logFilesToRepair =
merged.getLogFiles().filter(lf -> lf.getBaseCommitTime().equals(compactionInstant))
.collect(Collectors.toList());
logFilesToRepair.sort(HoodieLogFile.getBaseInstantAndLogVersionComparator().reversed());
FileSlice fileSliceForCompaction =
fileSystemView.getLatestFileSlicesBeforeOrOn(operation.getPartitionPath(), operation.getBaseInstantTime())
.filter(fs -> fs.getFileId().equals(operation.getFileId())).findFirst().get();
int maxUsedVersion =
fileSliceForCompaction.getLogFiles().findFirst().map(lf -> lf.getLogVersion())
.orElse(HoodieLogFile.LOGFILE_BASE_VERSION - 1);
String logExtn = fileSliceForCompaction.getLogFiles().findFirst().map(lf -> "." + lf.getFileExtension())
.orElse(HoodieLogFile.DELTA_EXTENSION);
String parentPath = fileSliceForCompaction.getDataFile().map(df -> new Path(df.getPath()).getParent().toString())
.orElse(fileSliceForCompaction.getLogFiles().findFirst().map(lf -> lf.getPath().getParent().toString()).get());
for (HoodieLogFile toRepair : logFilesToRepair) {
int version = maxUsedVersion + 1;
HoodieLogFile newLf = new HoodieLogFile(new Path(parentPath, FSUtils.makeLogFileName(operation.getFileId(),
logExtn, operation.getBaseInstantTime(), version)));
result.add(Pair.of(toRepair, newLf));
maxUsedVersion = version;
}
return result;
}
/**
* Generate renaming actions for unscheduling a fileId from pending compaction. NOTE: Can only be used safely when no
* writer (ingestion/compaction) is running.
*
* @param metaClient Hoodie Table MetaClient
* @param fileId FileId to remove compaction
* @param fsViewOpt Cached File System View
* @param skipValidation Skip Validation
* @return list of pairs of log-files (old, new) and for each pair, rename must be done to successfully unschedule
* compaction.
*/
public List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedulingCompactionForFileId(
HoodieTableMetaClient metaClient, String fileId, Optional<HoodieTableFileSystemView> fsViewOpt,
boolean skipValidation) throws IOException {
Map<String, Pair<String, HoodieCompactionOperation>> allPendingCompactions =
CompactionUtils.getAllPendingCompactionOperations(metaClient);
if (allPendingCompactions.containsKey(fileId)) {
Pair<String, HoodieCompactionOperation> opWithInstant = allPendingCompactions.get(fileId);
return getRenamingActionsForUnschedulingCompactionOperation(metaClient, opWithInstant.getKey(),
CompactionOperation.convertFromAvroRecordInstance(opWithInstant.getValue()), fsViewOpt, skipValidation);
}
throw new HoodieException("FileId " + fileId + " not in pending compaction");
}
/**
* Holds Operation result for Renaming
*/
public static class RenameOpResult extends OperationResult<RenameInfo> {
public RenameOpResult() {
}
public RenameOpResult(Pair<HoodieLogFile, HoodieLogFile> op, boolean success,
Optional<Exception> exception) {
super(new RenameInfo(op.getKey().getFileId(), op.getKey().getPath().toString(),
op.getRight().getPath().toString()), success, exception);
}
public RenameOpResult(
Pair<HoodieLogFile, HoodieLogFile> op, boolean executed, boolean success,
Optional<Exception> exception) {
super(new RenameInfo(op.getKey().getFileId(), op.getKey().getPath().toString(),
op.getRight().getPath().toString()), executed, success, exception);
}
}
/**
* Holds Operation result for Renaming
*/
public static class ValidationOpResult extends OperationResult<CompactionOperation> {
public ValidationOpResult() {
}
public ValidationOpResult(
CompactionOperation operation, boolean success, Optional<Exception> exception) {
super(operation, success, exception);
}
}
public static class RenameInfo implements Serializable {
public String fileId;
public String srcPath;
public String destPath;
public RenameInfo() {
}
public RenameInfo(String fileId, String srcPath, String destPath) {
this.fileId = fileId;
this.srcPath = srcPath;
this.destPath = destPath;
}
}
public static class CompactionValidationException extends RuntimeException {
public CompactionValidationException(String msg) {
super(msg);
}
}
}

View File

@@ -0,0 +1,77 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.func;
import com.google.common.base.Optional;
import java.io.Serializable;
/**
* Holds Operation result. Used as a result container for Compaction Admin Client (running as part of Spark-launcher
* process) to communicate results back to Hoodie CLI process.
*/
public class OperationResult<T> implements Serializable {
private T operation;
private boolean executed;
private boolean success;
private Optional<Exception> exception;
public OperationResult() {
}
public OperationResult(T operation, boolean success, Optional<Exception> exception) {
this.operation = operation;
this.success = success;
this.exception = exception;
this.executed = true;
}
public OperationResult(T operation, boolean executed, boolean success, Optional<Exception> exception) {
this.operation = operation;
this.success = success;
this.exception = exception;
this.executed = executed;
}
public T getOperation() {
return operation;
}
public boolean isSuccess() {
return success;
}
public boolean isExecuted() {
return executed;
}
public Optional<Exception> getException() {
return exception;
}
@Override
public String toString() {
return "OperationResult{"
+ "operation=" + operation
+ ", executed=" + executed
+ ", success=" + success
+ ", exception=" + exception
+ '}';
}
}

View File

@@ -0,0 +1,363 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie;
import static com.uber.hoodie.common.model.HoodieTestUtils.getDefaultHadoopConf;
import com.google.common.base.Optional;
import com.uber.hoodie.CompactionAdminClient.ValidationOpResult;
import com.uber.hoodie.common.model.CompactionOperation;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.CompactionTestUtils;
import com.uber.hoodie.common.util.CompactionUtils;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestCompactionAdminClient extends TestHoodieClientBase {
private HoodieTableMetaClient metaClient;
private CompactionAdminClient client;
@Before
public void init() throws IOException {
super.init();
metaClient = HoodieTestUtils.initTableType(getDefaultHadoopConf(), basePath, HoodieTableType.MERGE_ON_READ);
client = new CompactionAdminClient(jsc, basePath);
}
@Override
public void tearDown() throws IOException {
super.tearDown();
}
@Test
public void testUnscheduleCompactionPlan() throws Exception {
int numEntriesPerInstant = 10;
CompactionTestUtils
.setupAndValidateCompactionOperations(metaClient, false, numEntriesPerInstant, numEntriesPerInstant,
numEntriesPerInstant, numEntriesPerInstant);
// THere are delta-commits after compaction instant
validateUnSchedulePlan(client,
"000", "001", numEntriesPerInstant, 2 * numEntriesPerInstant);
// THere are delta-commits after compaction instant
validateUnSchedulePlan(client,
"002", "003", numEntriesPerInstant, 2 * numEntriesPerInstant);
// THere are no delta-commits after compaction instant
validateUnSchedulePlan(client,
"004", "005", numEntriesPerInstant, 0);
// THere are no delta-commits after compaction instant
validateUnSchedulePlan(client,
"006", "007", numEntriesPerInstant, 0);
}
@Test
public void testUnscheduleCompactionFileId() throws Exception {
int numEntriesPerInstant = 10;
CompactionTestUtils
.setupAndValidateCompactionOperations(metaClient, false, numEntriesPerInstant, numEntriesPerInstant,
numEntriesPerInstant, numEntriesPerInstant);
Map<String, CompactionOperation> instantsWithOp =
Arrays.asList("001", "003", "005", "007").stream().map(instant -> {
try {
return Pair.of(instant, CompactionUtils.getCompactionPlan(metaClient, instant));
} catch (IOException ioe) {
throw new HoodieException(ioe);
}
}).map(instantWithPlan -> instantWithPlan.getRight().getOperations().stream().map(op -> Pair.of(
instantWithPlan.getLeft(), CompactionOperation.convertFromAvroRecordInstance(op))).findFirst().get())
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
// THere are delta-commits after compaction instant
validateUnScheduleFileId(client,
"000", "001", instantsWithOp.get("001"), 2);
// THere are delta-commits after compaction instant
validateUnScheduleFileId(client,
"002", "003", instantsWithOp.get("003"), 2);
// THere are no delta-commits after compaction instant
validateUnScheduleFileId(client,
"004", "005", instantsWithOp.get("005"), 0);
// THere are no delta-commits after compaction instant
validateUnScheduleFileId(client,
"006", "007", instantsWithOp.get("007"), 0);
}
@Test
public void testRepairCompactionPlan() throws Exception {
int numEntriesPerInstant = 10;
CompactionTestUtils
.setupAndValidateCompactionOperations(metaClient,false, numEntriesPerInstant, numEntriesPerInstant,
numEntriesPerInstant, numEntriesPerInstant);
// THere are delta-commits after compaction instant
validateRepair("000", "001", numEntriesPerInstant, 2 * numEntriesPerInstant);
// THere are delta-commits after compaction instant
validateRepair("002", "003", numEntriesPerInstant, 2 * numEntriesPerInstant);
// THere are no delta-commits after compaction instant
validateRepair("004", "005", numEntriesPerInstant, 0);
// THere are no delta-commits after compaction instant
validateRepair("006", "007", numEntriesPerInstant, 0);
}
private void validateRepair(String ingestionInstant, String compactionInstant, int numEntriesPerInstant,
int expNumRepairs) throws Exception {
List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles =
validateUnSchedulePlan(client, ingestionInstant, compactionInstant, numEntriesPerInstant, expNumRepairs, true);
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
List<ValidationOpResult> result = client.validateCompactionPlan(metaClient, compactionInstant, 1);
if (expNumRepairs > 0) {
Assert.assertTrue("Expect some failures in validation", result.stream().filter(r -> !r.isSuccess()).count() > 0);
}
// Now repair
List<Pair<HoodieLogFile, HoodieLogFile>> undoFiles = result.stream().flatMap(r ->
client.getRenamingActionsToAlignWithCompactionOperation(metaClient,
compactionInstant, r.getOperation(), Optional.absent()).stream())
.map(rn -> {
try {
client.renameLogFile(metaClient, rn.getKey(), rn.getValue());
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
return rn;
}).collect(Collectors.toList());
Map<String, String> renameFilesFromUndo =
undoFiles.stream().collect(Collectors.toMap(p -> p.getRight().getPath().toString(),
x -> x.getLeft().getPath().toString()));
Map<String, String> expRenameFiles =
renameFiles.stream().collect(Collectors.toMap(p -> p.getLeft().getPath().toString(),
x -> x.getRight().getPath().toString()));
if (expNumRepairs > 0) {
Assert.assertFalse("Rename Files must be non-empty", renameFiles.isEmpty());
} else {
Assert.assertTrue("Rename Files must be empty", renameFiles.isEmpty());
}
expRenameFiles.entrySet().stream().forEach(r -> {
System.out.println("Key :" + r.getKey() + " renamed to " + r.getValue() + " rolled back to "
+ renameFilesFromUndo.get(r.getKey()));
});
Assert.assertEquals("Undo must completely rollback renames", expRenameFiles, renameFilesFromUndo);
// Now expect validation to succeed
result = client.validateCompactionPlan(metaClient, compactionInstant, 1);
Assert.assertTrue("Expect no failures in validation", result.stream().filter(r -> !r.isSuccess()).count() == 0);
Assert.assertEquals("Expected Num Repairs", expNumRepairs, undoFiles.size());
}
/**
* Enssure compaction plan is valid
* @param compactionInstant Compaction Instant
* @throws Exception
*/
private void ensureValidCompactionPlan(String compactionInstant) throws Exception {
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
// Ensure compaction-plan is good to begin with
List<ValidationOpResult> validationResults = client.validateCompactionPlan(metaClient,
compactionInstant, 1);
Assert.assertFalse("Some validations failed",
validationResults.stream().filter(v -> !v.isSuccess()).findAny().isPresent());
}
private void validateRenameFiles(List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles,
String ingestionInstant, String compactionInstant, HoodieTableFileSystemView fsView) {
// Ensure new names of log-files are on expected lines
Set<HoodieLogFile> uniqNewLogFiles = new HashSet<>();
Set<HoodieLogFile> uniqOldLogFiles = new HashSet<>();
renameFiles.stream().forEach(lfPair -> {
Assert.assertFalse("Old Log File Names do not collide", uniqOldLogFiles.contains(lfPair.getKey()));
Assert.assertFalse("New Log File Names do not collide", uniqNewLogFiles.contains(lfPair.getValue()));
uniqOldLogFiles.add(lfPair.getKey());
uniqNewLogFiles.add(lfPair.getValue());
});
renameFiles.stream().forEach(lfPair -> {
HoodieLogFile oldLogFile = lfPair.getLeft();
HoodieLogFile newLogFile = lfPair.getValue();
Assert.assertEquals("Base Commit time is expected", ingestionInstant, newLogFile.getBaseCommitTime());
Assert.assertEquals("Base Commit time is expected", compactionInstant, oldLogFile.getBaseCommitTime());
Assert.assertEquals("File Id is expected", oldLogFile.getFileId(), newLogFile.getFileId());
HoodieLogFile lastLogFileBeforeCompaction =
fsView.getLatestMergedFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], ingestionInstant)
.filter(fs -> fs.getFileId().equals(oldLogFile.getFileId()))
.map(fs -> fs.getLogFiles().findFirst().get()).findFirst().get();
Assert.assertEquals("Log Version expected",
lastLogFileBeforeCompaction.getLogVersion() + oldLogFile.getLogVersion(),
newLogFile.getLogVersion());
Assert.assertTrue("Log version does not collide",
newLogFile.getLogVersion() > lastLogFileBeforeCompaction.getLogVersion());
});
}
/**
* Validate Unschedule operations
*/
private List<Pair<HoodieLogFile, HoodieLogFile>> validateUnSchedulePlan(CompactionAdminClient client,
String ingestionInstant, String compactionInstant, int numEntriesPerInstant, int expNumRenames)
throws Exception {
return validateUnSchedulePlan(client, ingestionInstant, compactionInstant, numEntriesPerInstant,
expNumRenames, false);
}
/**
* Validate Unschedule operations
*/
private List<Pair<HoodieLogFile, HoodieLogFile>> validateUnSchedulePlan(CompactionAdminClient client,
String ingestionInstant, String compactionInstant, int numEntriesPerInstant, int expNumRenames,
boolean skipUnSchedule) throws Exception {
ensureValidCompactionPlan(compactionInstant);
// Check suggested rename operations
List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles =
client.getRenamingActionsForUnschedulingCompactionPlan(metaClient, compactionInstant, 1,
Optional.absent(), false);
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
// Log files belonging to file-slices created because of compaction request must be renamed
Set<HoodieLogFile> gotLogFilesToBeRenamed = renameFiles.stream().map(p -> p.getLeft()).collect(Collectors.toSet());
final HoodieTableFileSystemView fsView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
Set<HoodieLogFile> expLogFilesToBeRenamed = fsView.getLatestFileSlices(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0])
.filter(fs -> fs.getBaseInstantTime().equals(compactionInstant))
.flatMap(fs -> fs.getLogFiles())
.collect(Collectors.toSet());
Assert.assertEquals("Log files belonging to file-slices created because of compaction request must be renamed",
expLogFilesToBeRenamed, gotLogFilesToBeRenamed);
if (skipUnSchedule) {
// Do the renaming only but do not touch the compaction plan - Needed for repair tests
renameFiles.stream().forEach(lfPair -> {
try {
client.renameLogFile(metaClient, lfPair.getLeft(), lfPair.getRight());
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
});
} else {
validateRenameFiles(renameFiles, ingestionInstant, compactionInstant, fsView);
}
Map<String, Long> fileIdToCountsBeforeRenaming =
fsView.getLatestMergedFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant)
.filter(fs -> fs.getBaseInstantTime().equals(ingestionInstant))
.map(fs -> Pair.of(fs.getFileId(), fs.getLogFiles().count()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
// Call the main unschedule API
client.unscheduleCompactionPlan(compactionInstant, false, 1, false);
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
final HoodieTableFileSystemView newFsView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
// Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files
newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant)
.filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)).forEach(fs -> {
Assert.assertFalse("No Data file must be present", fs.getDataFile().isPresent());
Assert.assertTrue("No Log Files", fs.getLogFiles().count() == 0);
});
// Ensure same number of log-files before and after renaming per fileId
Map<String, Long> fileIdToCountsAfterRenaming =
newFsView.getAllFileGroups(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0]).flatMap(fg -> fg.getAllFileSlices())
.filter(fs -> fs.getBaseInstantTime().equals(ingestionInstant))
.map(fs -> Pair.of(fs.getFileId(), fs.getLogFiles().count()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
Assert.assertEquals("Each File Id has same number of log-files",
fileIdToCountsBeforeRenaming, fileIdToCountsAfterRenaming);
Assert.assertEquals("Not Empty", numEntriesPerInstant, fileIdToCountsAfterRenaming.size());
Assert.assertEquals("Expected number of renames", expNumRenames, renameFiles.size());
return renameFiles;
}
/**
* Validate Unschedule operations
*/
private void validateUnScheduleFileId(CompactionAdminClient client, String ingestionInstant,
String compactionInstant, CompactionOperation op, int expNumRenames) throws Exception {
ensureValidCompactionPlan(compactionInstant);
// Check suggested rename operations
List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles =
client.getRenamingActionsForUnschedulingCompactionOperation(metaClient, compactionInstant, op,
Optional.absent(), false);
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
// Log files belonging to file-slices created because of compaction request must be renamed
Set<HoodieLogFile> gotLogFilesToBeRenamed = renameFiles.stream().map(p -> p.getLeft()).collect(Collectors.toSet());
final HoodieTableFileSystemView fsView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
Set<HoodieLogFile> expLogFilesToBeRenamed = fsView.getLatestFileSlices(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0])
.filter(fs -> fs.getBaseInstantTime().equals(compactionInstant))
.filter(fs -> fs.getFileId().equals(op.getFileId()))
.flatMap(fs -> fs.getLogFiles())
.collect(Collectors.toSet());
Assert.assertEquals("Log files belonging to file-slices created because of compaction request must be renamed",
expLogFilesToBeRenamed, gotLogFilesToBeRenamed);
validateRenameFiles(renameFiles, ingestionInstant, compactionInstant, fsView);
Map<String, Long> fileIdToCountsBeforeRenaming =
fsView.getLatestMergedFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant)
.filter(fs -> fs.getBaseInstantTime().equals(ingestionInstant))
.filter(fs -> fs.getFileId().equals(op.getFileId()))
.map(fs -> Pair.of(fs.getFileId(), fs.getLogFiles().count()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
// Call the main unschedule API
client.unscheduleCompactionFileId(op.getFileId(), false, false);
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
final HoodieTableFileSystemView newFsView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
// Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files
newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant)
.filter(fs -> fs.getBaseInstantTime().equals(compactionInstant))
.filter(fs -> fs.getFileId().equals(op.getFileId())).forEach(fs -> {
Assert.assertFalse("No Data file must be present", fs.getDataFile().isPresent());
Assert.assertTrue("No Log Files", fs.getLogFiles().count() == 0);
});
// Ensure same number of log-files before and after renaming per fileId
Map<String, Long> fileIdToCountsAfterRenaming =
newFsView.getAllFileGroups(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0]).flatMap(fg -> fg.getAllFileSlices())
.filter(fs -> fs.getBaseInstantTime().equals(ingestionInstant))
.filter(fs -> fs.getFileId().equals(op.getFileId()))
.map(fs -> Pair.of(fs.getFileId(), fs.getLogFiles().count()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
Assert.assertEquals("Each File Id has same number of log-files",
fileIdToCountsBeforeRenaming, fileIdToCountsAfterRenaming);
Assert.assertEquals("Not Empty", 1, fileIdToCountsAfterRenaming.size());
Assert.assertEquals("Expected number of renames", expNumRenames, renameFiles.size());
}
}

View File

@@ -33,12 +33,16 @@ import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
/**
* Helper class to generate compaction plan from FileGroup/FileSlice abstraction
*/
public class CompactionUtils {
private static final Logger LOG = LogManager.getLogger(CompactionUtils.class);
/**
* Generate compaction operation from file-slice
*
@@ -47,7 +51,7 @@ public class CompactionUtils {
* @param metricsCaptureFunction Metrics Capture function
* @return Compaction Operation
*/
public static HoodieCompactionOperation buildFromFileSlice(String partitionPath, FileSlice fileSlice,
public static HoodieCompactionOperation buildFromFileSlice(String partitionPath, FileSlice fileSlice,
Optional<Function<Pair<String, FileSlice>, Map<String, Double>>> metricsCaptureFunction) {
HoodieCompactionOperation.Builder builder = HoodieCompactionOperation.newBuilder();
builder.setPartitionPath(partitionPath);
@@ -114,16 +118,21 @@ public class CompactionUtils {
metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants().collect(Collectors.toList());
return pendingCompactionInstants.stream().map(instant -> {
try {
HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(
metaClient.getActiveTimeline().getInstantAuxiliaryDetails(
HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
return Pair.of(instant, compactionPlan);
return Pair.of(instant, getCompactionPlan(metaClient, instant.getTimestamp()));
} catch (IOException e) {
throw new HoodieException(e);
}
}).collect(Collectors.toList());
}
public static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient,
String compactionInstant) throws IOException {
HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(
metaClient.getActiveTimeline().getInstantAuxiliaryDetails(
HoodieTimeline.getCompactionRequestedInstant(compactionInstant)).get());
return compactionPlan;
}
/**
* Get all file-ids with pending Compaction operations and their target compaction instant time
*

View File

@@ -0,0 +1,207 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.util;
import static com.uber.hoodie.common.model.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
import static com.uber.hoodie.common.table.HoodieTimeline.COMPACTION_ACTION;
import static com.uber.hoodie.common.table.HoodieTimeline.DELTA_COMMIT_ACTION;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.uber.hoodie.avro.model.HoodieCompactionOperation;
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
public class CompactionTestUtils {
public static Map<String, Pair<String, HoodieCompactionOperation>> setupAndValidateCompactionOperations(
HoodieTableMetaClient metaClient, boolean inflight,
int numEntriesInPlan1, int numEntriesInPlan2,
int numEntriesInPlan3, int numEntriesInPlan4) throws IOException {
HoodieCompactionPlan plan1 = createCompactionPlan(metaClient, "000", "001", numEntriesInPlan1, true, true);
HoodieCompactionPlan plan2 = createCompactionPlan(metaClient, "002", "003", numEntriesInPlan2, false, true);
HoodieCompactionPlan plan3 = createCompactionPlan(metaClient, "004", "005", numEntriesInPlan3, true, false);
HoodieCompactionPlan plan4 = createCompactionPlan(metaClient, "006", "007", numEntriesInPlan4, false, false);
if (inflight) {
scheduleInflightCompaction(metaClient, "001", plan1);
scheduleInflightCompaction(metaClient, "003", plan2);
scheduleInflightCompaction(metaClient, "005", plan3);
scheduleInflightCompaction(metaClient, "007", plan4);
} else {
scheduleCompaction(metaClient, "001", plan1);
scheduleCompaction(metaClient, "003", plan2);
scheduleCompaction(metaClient, "005", plan3);
scheduleCompaction(metaClient, "007", plan4);
}
createDeltaCommit(metaClient, "000");
createDeltaCommit(metaClient, "002");
createDeltaCommit(metaClient, "004");
createDeltaCommit(metaClient, "006");
Map<String, String> baseInstantsToCompaction =
new ImmutableMap.Builder<String, String>().put("000", "001").put("002", "003")
.put("004", "005").put("006", "007").build();
List<Integer> expectedNumEntries =
Arrays.asList(numEntriesInPlan1, numEntriesInPlan2, numEntriesInPlan3, numEntriesInPlan4);
List<HoodieCompactionPlan> plans = new ImmutableList.Builder<HoodieCompactionPlan>()
.add(plan1, plan2, plan3, plan4).build();
IntStream.range(0, 4).boxed().forEach(idx -> {
if (expectedNumEntries.get(idx) > 0) {
Assert.assertEquals("check if plan " + idx + " has exp entries",
expectedNumEntries.get(idx).longValue(), plans.get(idx).getOperations().size());
} else {
Assert.assertNull("Plan " + idx + " has null ops", plans.get(idx).getOperations());
}
});
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath(), true);
Map<String, Pair<String, HoodieCompactionOperation>> pendingCompactionMap =
CompactionUtils.getAllPendingCompactionOperations(metaClient);
Map<String, Pair<String, HoodieCompactionOperation>> expPendingCompactionMap =
generateExpectedCompactionOperations(Arrays.asList(plan1, plan2, plan3, plan4), baseInstantsToCompaction);
// Ensure Compaction operations are fine.
Assert.assertEquals(expPendingCompactionMap, pendingCompactionMap);
return expPendingCompactionMap;
}
public static Map<String, Pair<String, HoodieCompactionOperation>> generateExpectedCompactionOperations(
List<HoodieCompactionPlan> plans, Map<String, String> baseInstantsToCompaction) {
return plans.stream()
.flatMap(plan -> {
if (plan.getOperations() != null) {
return plan.getOperations().stream().map(op -> Pair.of(op.getFileId(),
Pair.of(baseInstantsToCompaction.get(op.getBaseInstantTime()), op)));
}
return Stream.empty();
}).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}
public static void scheduleCompaction(HoodieTableMetaClient metaClient,
String instantTime, HoodieCompactionPlan compactionPlan) throws IOException {
metaClient.getActiveTimeline().saveToCompactionRequested(
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, instantTime),
AvroUtils.serializeCompactionPlan(compactionPlan));
}
public static void createDeltaCommit(HoodieTableMetaClient metaClient, String instantTime) throws IOException {
metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(State.INFLIGHT, DELTA_COMMIT_ACTION, instantTime), Optional.empty());
}
public static void scheduleInflightCompaction(HoodieTableMetaClient metaClient, String instantTime,
HoodieCompactionPlan compactionPlan) throws IOException {
scheduleCompaction(metaClient, instantTime, compactionPlan);
metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, instantTime));
}
public static HoodieCompactionPlan createCompactionPlan(HoodieTableMetaClient metaClient, String instantId,
String compactionInstantId, int numFileIds, boolean createDataFile,
boolean deltaCommitsAfterCompactionRequests) {
List<HoodieCompactionOperation> ops = IntStream.range(0, numFileIds).boxed().map(idx -> {
try {
String fileId = UUID.randomUUID().toString();
if (createDataFile) {
HoodieTestUtils.createDataFile(metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0], instantId, fileId);
}
HoodieTestUtils.createNewLogFile(metaClient.getFs(), metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0],
instantId, fileId, Optional.of(1));
HoodieTestUtils.createNewLogFile(metaClient.getFs(), metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0],
instantId, fileId, Optional.of(2));
FileSlice slice = new FileSlice(instantId, fileId);
if (createDataFile) {
slice.setDataFile(new TestHoodieDataFile(metaClient.getBasePath() + "/" + DEFAULT_PARTITION_PATHS[0]
+ "/" + FSUtils.makeDataFileName(instantId, 1, fileId)));
}
String logFilePath1 = HoodieTestUtils
.getLogFilePath(metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0], instantId, fileId,
Optional.of(1));
String logFilePath2 = HoodieTestUtils
.getLogFilePath(metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0], instantId, fileId,
Optional.of(2));
slice.addLogFile(new HoodieLogFile(new Path(logFilePath1)));
slice.addLogFile(new HoodieLogFile(new Path(logFilePath2)));
HoodieCompactionOperation op =
CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], slice, Optional.empty());
if (deltaCommitsAfterCompactionRequests) {
HoodieTestUtils.createNewLogFile(metaClient.getFs(), metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0],
compactionInstantId, fileId, Optional.of(1));
HoodieTestUtils.createNewLogFile(metaClient.getFs(), metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0],
compactionInstantId, fileId, Optional.of(2));
}
return op;
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
}).collect(Collectors.toList());
return new HoodieCompactionPlan(ops.isEmpty() ? null : ops, new HashMap<>());
}
public static class TestHoodieDataFile extends HoodieDataFile {
private final String path;
public TestHoodieDataFile(String path) {
super(null);
this.path = path;
}
@Override
public String getPath() {
return path;
}
@Override
public String getFileId() {
return UUID.randomUUID().toString();
}
@Override
public String getCommitTime() {
return "100";
}
@Override
public long getFileSize() {
return 0;
}
}
}

View File

@@ -17,32 +17,29 @@
package com.uber.hoodie.common.util;
import static com.uber.hoodie.common.model.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
import static com.uber.hoodie.common.table.HoodieTimeline.COMPACTION_ACTION;
import static com.uber.hoodie.common.model.HoodieTestUtils.getDefaultHadoopConf;
import static com.uber.hoodie.common.util.CompactionTestUtils.createCompactionPlan;
import static com.uber.hoodie.common.util.CompactionTestUtils.scheduleCompaction;
import static com.uber.hoodie.common.util.CompactionTestUtils.setupAndValidateCompactionOperations;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.uber.hoodie.avro.model.HoodieCompactionOperation;
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
import com.uber.hoodie.common.util.CompactionTestUtils.TestHoodieDataFile;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Before;
@@ -64,7 +61,8 @@ public class TestCompactionUtils {
@Before
public void init() throws IOException {
metaClient = HoodieTestUtils.init(tmpFolder.getRoot().getAbsolutePath());
metaClient = HoodieTestUtils.initTableType(getDefaultHadoopConf(),
tmpFolder.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ);
basePath = metaClient.getBasePath();
}
@@ -156,12 +154,12 @@ public class TestCompactionUtils {
@Test(expected = IllegalStateException.class)
public void testGetAllPendingCompactionOperationsWithDupFileId() throws IOException {
// Case where there is duplicate fileIds in compaction requests
HoodieCompactionPlan plan1 = createCompactionPlan("000", 10);
HoodieCompactionPlan plan2 = createCompactionPlan("001", 10);
scheduleCompaction("000", plan1);
scheduleCompaction("001", plan2);
HoodieCompactionPlan plan1 = createCompactionPlan(metaClient, "000", "001", 10, true, true);
HoodieCompactionPlan plan2 = createCompactionPlan(metaClient, "002", "003", 0, false, false);
scheduleCompaction(metaClient, "001", plan1);
scheduleCompaction(metaClient, "003", plan2);
// schedule same plan again so that there will be duplicates
scheduleCompaction("003", plan1);
scheduleCompaction(metaClient, "005", plan1);
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
Map<String, Pair<String, HoodieCompactionOperation>> res =
CompactionUtils.getAllPendingCompactionOperations(metaClient);
@@ -170,114 +168,19 @@ public class TestCompactionUtils {
@Test
public void testGetAllPendingCompactionOperations() throws IOException {
// Case where there are 4 compaction requests where 1 is empty.
testGetAllPendingCompactionOperations(false, 10, 10, 10, 0);
setupAndValidateCompactionOperations(metaClient, false, 10, 10, 10, 0);
}
@Test
public void testGetAllPendingInflightCompactionOperations() throws IOException {
// Case where there are 4 compaction requests where 1 is empty. All of them are marked inflight
testGetAllPendingCompactionOperations(true, 10, 10, 10, 0);
setupAndValidateCompactionOperations(metaClient, true, 10, 10, 10, 0);
}
@Test
public void testGetAllPendingCompactionOperationsForEmptyCompactions() throws IOException {
// Case where there are 4 compaction requests and all are empty.
testGetAllPendingCompactionOperations(false, 0, 0, 0, 0);
}
private void testGetAllPendingCompactionOperations(boolean inflight, int numEntriesInPlan1, int numEntriesInPlan2,
int numEntriesInPlan3, int numEntriesInPlan4) throws IOException {
HoodieCompactionPlan plan1 = createCompactionPlan("000", numEntriesInPlan1);
HoodieCompactionPlan plan2 = createCompactionPlan("001", numEntriesInPlan2);
HoodieCompactionPlan plan3 = createCompactionPlan("002", numEntriesInPlan3);
HoodieCompactionPlan plan4 = createCompactionPlan("003", numEntriesInPlan4);
if (inflight) {
scheduleInflightCompaction("000", plan1);
scheduleInflightCompaction("001", plan2);
scheduleInflightCompaction("002", plan3);
scheduleInflightCompaction("003", plan4);
} else {
scheduleCompaction("000", plan1);
scheduleCompaction("001", plan2);
scheduleCompaction("002", plan3);
scheduleCompaction("003", plan4);
}
List<Integer> expectedNumEntries =
Arrays.asList(numEntriesInPlan1, numEntriesInPlan2, numEntriesInPlan3, numEntriesInPlan4);
List<HoodieCompactionPlan> plans = new ImmutableList.Builder<HoodieCompactionPlan>()
.add(plan1, plan2, plan3, plan4).build();
IntStream.range(0, 4).boxed().forEach(idx -> {
if (expectedNumEntries.get(idx) > 0) {
Assert.assertEquals("check if plan " + idx + " has exp entries",
expectedNumEntries.get(idx).longValue(), plans.get(idx).getOperations().size());
} else {
Assert.assertNull("Plan " + idx + " has null ops", plans.get(idx).getOperations());
}
});
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
Map<String, Pair<String, HoodieCompactionOperation>> pendingCompactionMap =
CompactionUtils.getAllPendingCompactionOperations(metaClient);
Map<String, Pair<String, HoodieCompactionOperation>> expPendingCompactionMap =
generateExpectedCompactionOperations(Arrays.asList(plan1, plan2, plan3, plan4));
// Ensure all the
Assert.assertEquals(expPendingCompactionMap, pendingCompactionMap);
}
private Map<String, Pair<String, HoodieCompactionOperation>> generateExpectedCompactionOperations(
List<HoodieCompactionPlan> plans) {
return plans.stream()
.flatMap(plan -> {
if (plan.getOperations() != null) {
return plan.getOperations().stream().map(op -> Pair.of(op.getFileId(),
Pair.of(op.getBaseInstantTime(), op)));
}
return Stream.empty();
}).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}
private void scheduleCompaction(String instantTime, HoodieCompactionPlan compactionPlan) throws IOException {
metaClient.getActiveTimeline().saveToCompactionRequested(
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, instantTime),
AvroUtils.serializeCompactionPlan(compactionPlan));
}
private void scheduleInflightCompaction(String instantTime, HoodieCompactionPlan compactionPlan) throws IOException {
metaClient.getActiveTimeline().saveToCompactionRequested(
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, instantTime),
AvroUtils.serializeCompactionPlan(compactionPlan));
metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, instantTime));
}
private HoodieCompactionPlan createCompactionPlan(String instantId, int numFileIds) {
List<HoodieCompactionOperation> ops = IntStream.range(0, numFileIds).boxed().map(idx -> {
try {
String fileId =
HoodieTestUtils.createNewDataFile(basePath, DEFAULT_PARTITION_PATHS[0], instantId);
HoodieTestUtils.createNewLogFile(metaClient.getFs(), basePath, DEFAULT_PARTITION_PATHS[0],
instantId, fileId, Optional.of(1));
HoodieTestUtils.createNewLogFile(metaClient.getFs(), basePath, DEFAULT_PARTITION_PATHS[0],
instantId, fileId, Optional.of(2));
FileSlice slice = new FileSlice(instantId, fileId);
slice.setDataFile(new TestHoodieDataFile(HoodieTestUtils.createDataFile(basePath, DEFAULT_PARTITION_PATHS[0],
instantId, fileId)));
String logFilePath1 = HoodieTestUtils.getLogFilePath(basePath, DEFAULT_PARTITION_PATHS[0], instantId, fileId,
Optional.of(1));
String logFilePath2 = HoodieTestUtils.getLogFilePath(basePath, DEFAULT_PARTITION_PATHS[0], instantId, fileId,
Optional.of(2));
slice.addLogFile(new HoodieLogFile(new Path(logFilePath1)));
slice.addLogFile(new HoodieLogFile(new Path(logFilePath2)));
return CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], slice, Optional.empty());
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
}).collect(Collectors.toList());
return new HoodieCompactionPlan(ops.isEmpty() ? null : ops, new HashMap<>());
setupAndValidateCompactionOperations(metaClient, false, 0, 0, 0, 0);
}
/**
@@ -315,35 +218,4 @@ public class TestCompactionUtils {
});
Assert.assertEquals("Metrics set", metrics, op.getMetrics());
}
private static class TestHoodieDataFile extends HoodieDataFile {
private final String path;
public TestHoodieDataFile(String path) {
super(null);
this.path = path;
}
@Override
public String getPath() {
return path;
}
@Override
public String getFileId() {
return UUID.randomUUID().toString();
}
@Override
public String getCommitTime() {
return "100";
}
@Override
public long getFileSize() {
return 0;
}
}
}

View File

@@ -0,0 +1,154 @@
package com.uber.hoodie.utilities;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.uber.hoodie.CompactionAdminClient;
import com.uber.hoodie.CompactionAdminClient.RenameOpResult;
import com.uber.hoodie.CompactionAdminClient.ValidationOpResult;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.util.FSUtils;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
public class HoodieCompactionAdminTool {
private final Config cfg;
public HoodieCompactionAdminTool(Config cfg) {
this.cfg = cfg;
}
/**
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
final Config cfg = new Config();
JCommander cmd = new JCommander(cfg, args);
if (cfg.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
HoodieCompactionAdminTool admin = new HoodieCompactionAdminTool(cfg);
admin.run(UtilHelpers.buildSparkContext("admin-compactor", cfg.sparkMaster, cfg.sparkMemory));
}
/**
* Executes one of compaction admin operations
*/
public void run(JavaSparkContext jsc) throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.basePath);
CompactionAdminClient admin = new CompactionAdminClient(jsc, cfg.basePath);
final FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
if (cfg.outputPath != null && fs.exists(new Path(cfg.outputPath))) {
throw new IllegalStateException("Output File Path already exists");
}
switch (cfg.operation) {
case VALIDATE:
List<ValidationOpResult> res =
admin.validateCompactionPlan(metaClient, cfg.compactionInstantTime, cfg.parallelism);
if (cfg.printOutput) {
printOperationResult("Result of Validation Operation :", res);
}
serializeOperationResult(fs, res);
break;
case UNSCHEDULE_FILE:
List<RenameOpResult> r =
admin.unscheduleCompactionFileId(cfg.fileId, cfg.skipValidation, cfg.dryRun);
if (cfg.printOutput) {
System.out.println(r);
}
serializeOperationResult(fs, r);
break;
case UNSCHEDULE_PLAN:
List<RenameOpResult> r2 =
admin.unscheduleCompactionPlan(cfg.compactionInstantTime, cfg.skipValidation, cfg.parallelism, cfg.dryRun);
if (cfg.printOutput) {
printOperationResult("Result of Unscheduling Compaction Plan :", r2);
}
serializeOperationResult(fs, r2);
break;
case REPAIR:
List<RenameOpResult> r3 =
admin.repairCompaction(cfg.compactionInstantTime, cfg.parallelism, cfg.dryRun);
if (cfg.printOutput) {
printOperationResult("Result of Repair Operation :", r3);
}
serializeOperationResult(fs, r3);
break;
default:
throw new IllegalStateException("Not yet implemented !!");
}
}
private <T> void serializeOperationResult(FileSystem fs, T result) throws Exception {
if ((cfg.outputPath != null) && (result != null)) {
Path outputPath = new Path(cfg.outputPath);
FSDataOutputStream fsout = fs.create(outputPath, true);
ObjectOutputStream out = new ObjectOutputStream(fsout);
out.writeObject(result);
out.close();
fsout.close();
}
}
/**
* Print Operation Result
*
* @param initialLine Initial Line
* @param result Result
*/
private <T> void printOperationResult(String initialLine, List<T> result) {
System.out.println(initialLine);
for (T r : result) {
System.out.print(r);
}
}
/**
* Operation Types
*/
public enum Operation {
VALIDATE,
UNSCHEDULE_PLAN,
UNSCHEDULE_FILE,
REPAIR
}
/**
* Admin Configuration Options
*/
public static class Config implements Serializable {
@Parameter(names = {"--operation", "-op"}, description = "Operation", required = true)
public Operation operation = Operation.VALIDATE;
@Parameter(names = {"--base-path", "-bp"}, description = "Base path for the dataset", required = true)
public String basePath = null;
@Parameter(names = {"--instant-time", "-in"}, description = "Compaction Instant time", required = false)
public String compactionInstantTime = null;
@Parameter(names = {"--file-id", "-id"}, description = "File Id", required = false)
public String fileId = null;
@Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = false)
public int parallelism = 3;
@Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = true)
public String sparkMaster = null;
@Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
public String sparkMemory = null;
@Parameter(names = {"--dry-run", "-dr"}, description = "Dry Run Mode", required = false)
public boolean dryRun = false;
@Parameter(names = {"--skip-validation", "-sv"}, description = "Skip Validation", required = false)
public boolean skipValidation = false;
@Parameter(names = {"--output-path", "-ot"}, description = "Output Path", required = false)
public String outputPath = null;
@Parameter(names = {"--print-output", "-pt"}, description = "Print Output", required = false)
public boolean printOutput = true;
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
}
}