From 920537cac83d59ac05676fb952d5479c41adf757 Mon Sep 17 00:00:00 2001 From: li36909 Date: Tue, 6 Apr 2021 14:23:15 +0800 Subject: [PATCH] [HUDI-1749] Clean/Compaction/Rollback command maybe never exit when operation fail (#2752) --- .../apache/hudi/cli/commands/SparkMain.java | 237 +++++++++--------- .../apache/hudi/utilities/HoodieCleaner.java | 8 +- .../hudi/utilities/HoodieCompactor.java | 10 +- 3 files changed, 137 insertions(+), 118 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java index 0f0a85146..7833ee750 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java @@ -83,121 +83,128 @@ public class SparkMain { ? SparkUtil.initJavaSparkConf("hoodie-cli-" + command, Option.of(args[1]), Option.of(args[2])) : SparkUtil.initJavaSparkConf("hoodie-cli-" + command); int returnCode = 0; - switch (cmd) { - case ROLLBACK: - assert (args.length == 5); - returnCode = rollback(jsc, args[3], args[4]); - break; - case DEDUPLICATE: - assert (args.length == 8); - returnCode = deduplicatePartitionPath(jsc, args[3], args[4], args[5], Boolean.parseBoolean(args[6]), args[7]); - break; - case ROLLBACK_TO_SAVEPOINT: - assert (args.length == 5); - returnCode = rollbackToSavepoint(jsc, args[3], args[4]); - break; - case IMPORT: - case UPSERT: - assert (args.length >= 13); - String propsFilePath = null; - if (!StringUtils.isNullOrEmpty(args[12])) { - propsFilePath = args[12]; - } - List configs = new ArrayList<>(); - if (args.length > 13) { - configs.addAll(Arrays.asList(args).subList(13, args.length)); - } - returnCode = dataLoad(jsc, command, args[3], args[4], args[5], args[6], args[7], args[8], - Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]), propsFilePath, configs); - break; - case COMPACT_RUN: - assert (args.length >= 9); - propsFilePath = null; - if (!StringUtils.isNullOrEmpty(args[8])) { - propsFilePath = args[8]; - } - configs = new ArrayList<>(); - if (args.length > 9) { - configs.addAll(Arrays.asList(args).subList(9, args.length)); - } - returnCode = compact(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6], - Integer.parseInt(args[7]), false, propsFilePath, configs); - break; - case COMPACT_SCHEDULE: - assert (args.length >= 6); - propsFilePath = null; - if (!StringUtils.isNullOrEmpty(args[5])) { - propsFilePath = args[5]; - } - configs = new ArrayList<>(); - if (args.length > 6) { - configs.addAll(Arrays.asList(args).subList(6, args.length)); - } - returnCode = compact(jsc, args[1], args[2], args[3], 1, "", args[4], 0, true, propsFilePath, configs); - break; - case COMPACT_VALIDATE: - assert (args.length == 7); - doCompactValidate(jsc, args[3], args[4], args[5], Integer.parseInt(args[6])); - returnCode = 0; - break; - case COMPACT_REPAIR: - assert (args.length == 8); - doCompactRepair(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), - Boolean.parseBoolean(args[7])); - returnCode = 0; - break; - case COMPACT_UNSCHEDULE_FILE: - assert (args.length == 9); - doCompactUnscheduleFile(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), - Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8])); - returnCode = 0; - break; - case COMPACT_UNSCHEDULE_PLAN: - assert (args.length == 9); - doCompactUnschedule(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), - Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8])); - returnCode = 0; - break; - case CLEAN: - assert (args.length >= 5); - propsFilePath = null; - if (!StringUtils.isNullOrEmpty(args[4])) { - propsFilePath = args[4]; - } - configs = new ArrayList<>(); - if (args.length > 5) { - configs.addAll(Arrays.asList(args).subList(5, args.length)); - } - clean(jsc, args[3], propsFilePath, configs); - break; - case SAVEPOINT: - assert (args.length == 7); - returnCode = createSavepoint(jsc, args[3], args[4], args[5], args[6]); - break; - case DELETE_SAVEPOINT: - assert (args.length == 5); - returnCode = deleteSavepoint(jsc, args[3], args[4]); - break; - case BOOTSTRAP: - assert (args.length >= 18); - propsFilePath = null; - if (!StringUtils.isNullOrEmpty(args[17])) { - propsFilePath = args[17]; - } - configs = new ArrayList<>(); - if (args.length > 18) { - configs.addAll(Arrays.asList(args).subList(18, args.length)); - } - returnCode = doBootstrap(jsc, args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10], - args[11], args[12], args[13], args[14], args[15], args[16], propsFilePath, configs); - break; - case UPGRADE: - case DOWNGRADE: - assert (args.length == 5); - returnCode = upgradeOrDowngradeTable(jsc, args[3], args[4]); - break; - default: - break; + try { + switch (cmd) { + case ROLLBACK: + assert (args.length == 5); + returnCode = rollback(jsc, args[3], args[4]); + break; + case DEDUPLICATE: + assert (args.length == 8); + returnCode = deduplicatePartitionPath(jsc, args[3], args[4], args[5], Boolean.parseBoolean(args[6]), args[7]); + break; + case ROLLBACK_TO_SAVEPOINT: + assert (args.length == 5); + returnCode = rollbackToSavepoint(jsc, args[3], args[4]); + break; + case IMPORT: + case UPSERT: + assert (args.length >= 13); + String propsFilePath = null; + if (!StringUtils.isNullOrEmpty(args[12])) { + propsFilePath = args[12]; + } + List configs = new ArrayList<>(); + if (args.length > 13) { + configs.addAll(Arrays.asList(args).subList(13, args.length)); + } + returnCode = dataLoad(jsc, command, args[3], args[4], args[5], args[6], args[7], args[8], + Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]), propsFilePath, configs); + break; + case COMPACT_RUN: + assert (args.length >= 9); + propsFilePath = null; + if (!StringUtils.isNullOrEmpty(args[8])) { + propsFilePath = args[8]; + } + configs = new ArrayList<>(); + if (args.length > 9) { + configs.addAll(Arrays.asList(args).subList(9, args.length)); + } + returnCode = compact(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6], + Integer.parseInt(args[7]), false, propsFilePath, configs); + break; + case COMPACT_SCHEDULE: + assert (args.length >= 6); + propsFilePath = null; + if (!StringUtils.isNullOrEmpty(args[5])) { + propsFilePath = args[5]; + } + configs = new ArrayList<>(); + if (args.length > 6) { + configs.addAll(Arrays.asList(args).subList(6, args.length)); + } + returnCode = compact(jsc, args[1], args[2], args[3], 1, "", args[4], 0, true, propsFilePath, configs); + break; + case COMPACT_VALIDATE: + assert (args.length == 7); + doCompactValidate(jsc, args[3], args[4], args[5], Integer.parseInt(args[6])); + returnCode = 0; + break; + case COMPACT_REPAIR: + assert (args.length == 8); + doCompactRepair(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), + Boolean.parseBoolean(args[7])); + returnCode = 0; + break; + case COMPACT_UNSCHEDULE_FILE: + assert (args.length == 9); + doCompactUnscheduleFile(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), + Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8])); + returnCode = 0; + break; + case COMPACT_UNSCHEDULE_PLAN: + assert (args.length == 9); + doCompactUnschedule(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), + Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8])); + returnCode = 0; + break; + case CLEAN: + assert (args.length >= 5); + propsFilePath = null; + if (!StringUtils.isNullOrEmpty(args[4])) { + propsFilePath = args[4]; + } + configs = new ArrayList<>(); + if (args.length > 5) { + configs.addAll(Arrays.asList(args).subList(5, args.length)); + } + clean(jsc, args[3], propsFilePath, configs); + break; + case SAVEPOINT: + assert (args.length == 7); + returnCode = createSavepoint(jsc, args[3], args[4], args[5], args[6]); + break; + case DELETE_SAVEPOINT: + assert (args.length == 5); + returnCode = deleteSavepoint(jsc, args[3], args[4]); + break; + case BOOTSTRAP: + assert (args.length >= 18); + propsFilePath = null; + if (!StringUtils.isNullOrEmpty(args[17])) { + propsFilePath = args[17]; + } + configs = new ArrayList<>(); + if (args.length > 18) { + configs.addAll(Arrays.asList(args).subList(18, args.length)); + } + returnCode = doBootstrap(jsc, args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10], + args[11], args[12], args[13], args[14], args[15], args[16], propsFilePath, configs); + break; + case UPGRADE: + case DOWNGRADE: + assert (args.length == 5); + returnCode = upgradeOrDowngradeTable(jsc, args[3], args[4]); + break; + default: + break; + } + } catch (Throwable throwable) { + LOG.error("Fail to execute command", throwable); + returnCode = -1; + } finally { + jsc.stop(); } System.exit(returnCode); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java index 43101d127..24e2828a5 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java @@ -110,6 +110,12 @@ public class HoodieCleaner { String dirName = new Path(cfg.basePath).getName(); JavaSparkContext jssc = UtilHelpers.buildSparkContext("hoodie-cleaner-" + dirName, cfg.sparkMaster); - new HoodieCleaner(cfg, jssc).run(); + try { + new HoodieCleaner(cfg, jssc).run(); + } catch (Throwable throwable) { + LOG.error("Fail to run cleaning for " + cfg.basePath, throwable); + } finally { + jssc.stop(); + } } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java index 6830f7f74..c1493e6a4 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java @@ -103,8 +103,14 @@ public class HoodieCompactor { System.exit(1); } final JavaSparkContext jsc = UtilHelpers.buildSparkContext("compactor-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory); - HoodieCompactor compactor = new HoodieCompactor(jsc, cfg); - compactor.compact(cfg.retry); + try { + HoodieCompactor compactor = new HoodieCompactor(jsc, cfg); + compactor.compact(cfg.retry); + } catch (Throwable throwable) { + LOG.error("Fail to run compaction for " + cfg.tableName, throwable); + } finally { + jsc.stop(); + } } public int compact(int retry) {