diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
index 0d70f8aa1..340126662 100644
--- a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
+++ b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
@@ -183,14 +183,19 @@ public class DataSourceUtils {
   public static JavaRDD dropDuplicates(JavaSparkContext jssc, JavaRDD incomingHoodieRecords,
       HoodieWriteConfig writeConfig) throws Exception {
+    HoodieReadClient client = null;
     try {
-      HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig);
+      client = new HoodieReadClient<>(jssc, writeConfig);
       return client.tagLocation(incomingHoodieRecords)
           .filter(r -> !((HoodieRecord) r).isCurrentLocationKnown());
     } catch (DatasetNotFoundException e) {
       // this will be executed when there is no hoodie dataset yet
       // so no dups to drop
       return incomingHoodieRecords;
+    } finally {
+      if (null != client) {
+        client.close();
+      }
     }
   }
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactionAdminTool.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactionAdminTool.java
index 4f5c95d68..0c640f6d5 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactionAdminTool.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactionAdminTool.java
@@ -58,47 +58,52 @@ public class HoodieCompactionAdminTool {
    */
   public void run(JavaSparkContext jsc) throws Exception {
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.basePath);
-    CompactionAdminClient admin = new CompactionAdminClient(jsc, cfg.basePath);
-    final FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
-    if (cfg.outputPath != null && fs.exists(new Path(cfg.outputPath))) {
-      throw new IllegalStateException("Output File Path already exists");
-    }
-    switch (cfg.operation) {
-      case VALIDATE:
-        List res =
-            admin.validateCompactionPlan(metaClient, cfg.compactionInstantTime, cfg.parallelism);
-        if (cfg.printOutput) {
-          printOperationResult("Result of Validation Operation :", res);
-        }
-        serializeOperationResult(fs, res);
-        break;
-      case UNSCHEDULE_FILE:
-        List r =
-            admin.unscheduleCompactionFileId(new HoodieFileGroupId(cfg.partitionPath, cfg.fileId),
-                cfg.skipValidation, cfg.dryRun);
-        if (cfg.printOutput) {
-          System.out.println(r);
-        }
-        serializeOperationResult(fs, r);
-        break;
-      case UNSCHEDULE_PLAN:
-        List r2 =
-            admin.unscheduleCompactionPlan(cfg.compactionInstantTime, cfg.skipValidation, cfg.parallelism, cfg.dryRun);
-        if (cfg.printOutput) {
-          printOperationResult("Result of Unscheduling Compaction Plan :", r2);
-        }
-        serializeOperationResult(fs, r2);
-        break;
-      case REPAIR:
-        List r3 =
-            admin.repairCompaction(cfg.compactionInstantTime, cfg.parallelism, cfg.dryRun);
-        if (cfg.printOutput) {
-          printOperationResult("Result of Repair Operation :", r3);
-        }
-        serializeOperationResult(fs, r3);
-        break;
-      default:
-        throw new IllegalStateException("Not yet implemented !!");
+    final CompactionAdminClient admin = new CompactionAdminClient(jsc, cfg.basePath);
+    try {
+      final FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
+      if (cfg.outputPath != null && fs.exists(new Path(cfg.outputPath))) {
+        throw new IllegalStateException("Output File Path already exists");
+      }
+      switch (cfg.operation) {
+        case VALIDATE:
+          List res =
+              admin.validateCompactionPlan(metaClient, cfg.compactionInstantTime, cfg.parallelism);
+          if (cfg.printOutput) {
+            printOperationResult("Result of Validation Operation :", res);
+          }
+          serializeOperationResult(fs, res);
+          break;
+        case UNSCHEDULE_FILE:
+          List r =
+              admin.unscheduleCompactionFileId(new HoodieFileGroupId(cfg.partitionPath, cfg.fileId),
+                  cfg.skipValidation, cfg.dryRun);
+          if (cfg.printOutput) {
+            System.out.println(r);
+          }
+          serializeOperationResult(fs, r);
+          break;
+        case UNSCHEDULE_PLAN:
+          List r2 =
+              admin
+                  .unscheduleCompactionPlan(cfg.compactionInstantTime, cfg.skipValidation, cfg.parallelism, cfg.dryRun);
+          if (cfg.printOutput) {
+            printOperationResult("Result of Unscheduling Compaction Plan :", r2);
+          }
+          serializeOperationResult(fs, r2);
+          break;
+        case REPAIR:
+          List r3 =
+              admin.repairCompaction(cfg.compactionInstantTime, cfg.parallelism, cfg.dryRun);
+          if (cfg.printOutput) {
+            printOperationResult("Result of Repair Operation :", r3);
+          }
+          serializeOperationResult(fs, r3);
+          break;
+        default:
+          throw new IllegalStateException("Not yet implemented !!");
+      }
+    } finally {
+      admin.close();
     }
   }