1
0

[HUDI-1749] Clean/Compaction/Rollback command maybe never exit when operation fail (#2752)

This commit is contained in:
li36909
2021-04-06 14:23:15 +08:00
committed by GitHub
parent e970e1f483
commit 920537cac8
3 changed files with 137 additions and 118 deletions

View File

@@ -83,121 +83,128 @@ public class SparkMain {
? SparkUtil.initJavaSparkConf("hoodie-cli-" + command, Option.of(args[1]), Option.of(args[2])) ? SparkUtil.initJavaSparkConf("hoodie-cli-" + command, Option.of(args[1]), Option.of(args[2]))
: SparkUtil.initJavaSparkConf("hoodie-cli-" + command); : SparkUtil.initJavaSparkConf("hoodie-cli-" + command);
int returnCode = 0; int returnCode = 0;
switch (cmd) { try {
case ROLLBACK: switch (cmd) {
assert (args.length == 5); case ROLLBACK:
returnCode = rollback(jsc, args[3], args[4]); assert (args.length == 5);
break; returnCode = rollback(jsc, args[3], args[4]);
case DEDUPLICATE: break;
assert (args.length == 8); case DEDUPLICATE:
returnCode = deduplicatePartitionPath(jsc, args[3], args[4], args[5], Boolean.parseBoolean(args[6]), args[7]); assert (args.length == 8);
break; returnCode = deduplicatePartitionPath(jsc, args[3], args[4], args[5], Boolean.parseBoolean(args[6]), args[7]);
case ROLLBACK_TO_SAVEPOINT: break;
assert (args.length == 5); case ROLLBACK_TO_SAVEPOINT:
returnCode = rollbackToSavepoint(jsc, args[3], args[4]); assert (args.length == 5);
break; returnCode = rollbackToSavepoint(jsc, args[3], args[4]);
case IMPORT: break;
case UPSERT: case IMPORT:
assert (args.length >= 13); case UPSERT:
String propsFilePath = null; assert (args.length >= 13);
if (!StringUtils.isNullOrEmpty(args[12])) { String propsFilePath = null;
propsFilePath = args[12]; if (!StringUtils.isNullOrEmpty(args[12])) {
} propsFilePath = args[12];
List<String> configs = new ArrayList<>(); }
if (args.length > 13) { List<String> configs = new ArrayList<>();
configs.addAll(Arrays.asList(args).subList(13, args.length)); if (args.length > 13) {
} configs.addAll(Arrays.asList(args).subList(13, args.length));
returnCode = dataLoad(jsc, command, args[3], args[4], args[5], args[6], args[7], args[8], }
Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]), propsFilePath, configs); returnCode = dataLoad(jsc, command, args[3], args[4], args[5], args[6], args[7], args[8],
break; Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]), propsFilePath, configs);
case COMPACT_RUN: break;
assert (args.length >= 9); case COMPACT_RUN:
propsFilePath = null; assert (args.length >= 9);
if (!StringUtils.isNullOrEmpty(args[8])) { propsFilePath = null;
propsFilePath = args[8]; if (!StringUtils.isNullOrEmpty(args[8])) {
} propsFilePath = args[8];
configs = new ArrayList<>(); }
if (args.length > 9) { configs = new ArrayList<>();
configs.addAll(Arrays.asList(args).subList(9, args.length)); if (args.length > 9) {
} configs.addAll(Arrays.asList(args).subList(9, args.length));
returnCode = compact(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6], }
Integer.parseInt(args[7]), false, propsFilePath, configs); returnCode = compact(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
break; Integer.parseInt(args[7]), false, propsFilePath, configs);
case COMPACT_SCHEDULE: break;
assert (args.length >= 6); case COMPACT_SCHEDULE:
propsFilePath = null; assert (args.length >= 6);
if (!StringUtils.isNullOrEmpty(args[5])) { propsFilePath = null;
propsFilePath = args[5]; if (!StringUtils.isNullOrEmpty(args[5])) {
} propsFilePath = args[5];
configs = new ArrayList<>(); }
if (args.length > 6) { configs = new ArrayList<>();
configs.addAll(Arrays.asList(args).subList(6, args.length)); if (args.length > 6) {
} configs.addAll(Arrays.asList(args).subList(6, args.length));
returnCode = compact(jsc, args[1], args[2], args[3], 1, "", args[4], 0, true, propsFilePath, configs); }
break; returnCode = compact(jsc, args[1], args[2], args[3], 1, "", args[4], 0, true, propsFilePath, configs);
case COMPACT_VALIDATE: break;
assert (args.length == 7); case COMPACT_VALIDATE:
doCompactValidate(jsc, args[3], args[4], args[5], Integer.parseInt(args[6])); assert (args.length == 7);
returnCode = 0; doCompactValidate(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]));
break; returnCode = 0;
case COMPACT_REPAIR: break;
assert (args.length == 8); case COMPACT_REPAIR:
doCompactRepair(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), assert (args.length == 8);
Boolean.parseBoolean(args[7])); doCompactRepair(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]),
returnCode = 0; Boolean.parseBoolean(args[7]));
break; returnCode = 0;
case COMPACT_UNSCHEDULE_FILE: break;
assert (args.length == 9); case COMPACT_UNSCHEDULE_FILE:
doCompactUnscheduleFile(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), assert (args.length == 9);
Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8])); doCompactUnscheduleFile(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]),
returnCode = 0; Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
break; returnCode = 0;
case COMPACT_UNSCHEDULE_PLAN: break;
assert (args.length == 9); case COMPACT_UNSCHEDULE_PLAN:
doCompactUnschedule(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), assert (args.length == 9);
Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8])); doCompactUnschedule(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]),
returnCode = 0; Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
break; returnCode = 0;
case CLEAN: break;
assert (args.length >= 5); case CLEAN:
propsFilePath = null; assert (args.length >= 5);
if (!StringUtils.isNullOrEmpty(args[4])) { propsFilePath = null;
propsFilePath = args[4]; if (!StringUtils.isNullOrEmpty(args[4])) {
} propsFilePath = args[4];
configs = new ArrayList<>(); }
if (args.length > 5) { configs = new ArrayList<>();
configs.addAll(Arrays.asList(args).subList(5, args.length)); if (args.length > 5) {
} configs.addAll(Arrays.asList(args).subList(5, args.length));
clean(jsc, args[3], propsFilePath, configs); }
break; clean(jsc, args[3], propsFilePath, configs);
case SAVEPOINT: break;
assert (args.length == 7); case SAVEPOINT:
returnCode = createSavepoint(jsc, args[3], args[4], args[5], args[6]); assert (args.length == 7);
break; returnCode = createSavepoint(jsc, args[3], args[4], args[5], args[6]);
case DELETE_SAVEPOINT: break;
assert (args.length == 5); case DELETE_SAVEPOINT:
returnCode = deleteSavepoint(jsc, args[3], args[4]); assert (args.length == 5);
break; returnCode = deleteSavepoint(jsc, args[3], args[4]);
case BOOTSTRAP: break;
assert (args.length >= 18); case BOOTSTRAP:
propsFilePath = null; assert (args.length >= 18);
if (!StringUtils.isNullOrEmpty(args[17])) { propsFilePath = null;
propsFilePath = args[17]; if (!StringUtils.isNullOrEmpty(args[17])) {
} propsFilePath = args[17];
configs = new ArrayList<>(); }
if (args.length > 18) { configs = new ArrayList<>();
configs.addAll(Arrays.asList(args).subList(18, args.length)); if (args.length > 18) {
} configs.addAll(Arrays.asList(args).subList(18, args.length));
returnCode = doBootstrap(jsc, args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10], }
args[11], args[12], args[13], args[14], args[15], args[16], propsFilePath, configs); returnCode = doBootstrap(jsc, args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10],
break; args[11], args[12], args[13], args[14], args[15], args[16], propsFilePath, configs);
case UPGRADE: break;
case DOWNGRADE: case UPGRADE:
assert (args.length == 5); case DOWNGRADE:
returnCode = upgradeOrDowngradeTable(jsc, args[3], args[4]); assert (args.length == 5);
break; returnCode = upgradeOrDowngradeTable(jsc, args[3], args[4]);
default: break;
break; default:
break;
}
} catch (Throwable throwable) {
LOG.error("Fail to execute command", throwable);
returnCode = -1;
} finally {
jsc.stop();
} }
System.exit(returnCode); System.exit(returnCode);
} }

View File

@@ -110,6 +110,12 @@ public class HoodieCleaner {
String dirName = new Path(cfg.basePath).getName(); String dirName = new Path(cfg.basePath).getName();
JavaSparkContext jssc = UtilHelpers.buildSparkContext("hoodie-cleaner-" + dirName, cfg.sparkMaster); JavaSparkContext jssc = UtilHelpers.buildSparkContext("hoodie-cleaner-" + dirName, cfg.sparkMaster);
new HoodieCleaner(cfg, jssc).run(); try {
new HoodieCleaner(cfg, jssc).run();
} catch (Throwable throwable) {
LOG.error("Fail to run cleaning for " + cfg.basePath, throwable);
} finally {
jssc.stop();
}
} }
} }

View File

@@ -103,8 +103,14 @@ public class HoodieCompactor {
System.exit(1); System.exit(1);
} }
final JavaSparkContext jsc = UtilHelpers.buildSparkContext("compactor-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory); final JavaSparkContext jsc = UtilHelpers.buildSparkContext("compactor-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
HoodieCompactor compactor = new HoodieCompactor(jsc, cfg); try {
compactor.compact(cfg.retry); HoodieCompactor compactor = new HoodieCompactor(jsc, cfg);
compactor.compact(cfg.retry);
} catch (Throwable throwable) {
LOG.error("Fail to run compaction for " + cfg.tableName, throwable);
} finally {
jsc.stop();
}
} }
public int compact(int retry) { public int compact(int retry) {