1
0

Close opened Hoodie clients so that the embedded timeline service is properly shut down

This commit is contained in:
Balaji Varadarajan
2019-06-11 12:58:58 -07:00
committed by vinoth chandar
parent 065173211e
commit 51d122b5c3
2 changed files with 52 additions and 42 deletions

View File

@@ -183,14 +183,19 @@ public class DataSourceUtils {
public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc,
JavaRDD<HoodieRecord> incomingHoodieRecords, JavaRDD<HoodieRecord> incomingHoodieRecords,
HoodieWriteConfig writeConfig) throws Exception { HoodieWriteConfig writeConfig) throws Exception {
HoodieReadClient client = null;
try { try {
HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig); client = new HoodieReadClient<>(jssc, writeConfig);
return client.tagLocation(incomingHoodieRecords) return client.tagLocation(incomingHoodieRecords)
.filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown()); .filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown());
} catch (DatasetNotFoundException e) { } catch (DatasetNotFoundException e) {
// this will be executed when there is no hoodie dataset yet // this will be executed when there is no hoodie dataset yet
// so no dups to drop // so no dups to drop
return incomingHoodieRecords; return incomingHoodieRecords;
} finally {
if (null != client) {
client.close();
}
} }
} }

View File

@@ -58,47 +58,52 @@ public class HoodieCompactionAdminTool {
*/ */
public void run(JavaSparkContext jsc) throws Exception { public void run(JavaSparkContext jsc) throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.basePath);
CompactionAdminClient admin = new CompactionAdminClient(jsc, cfg.basePath); final CompactionAdminClient admin = new CompactionAdminClient(jsc, cfg.basePath);
final FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration()); try {
if (cfg.outputPath != null && fs.exists(new Path(cfg.outputPath))) { final FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
throw new IllegalStateException("Output File Path already exists"); if (cfg.outputPath != null && fs.exists(new Path(cfg.outputPath))) {
} throw new IllegalStateException("Output File Path already exists");
switch (cfg.operation) { }
case VALIDATE: switch (cfg.operation) {
List<ValidationOpResult> res = case VALIDATE:
admin.validateCompactionPlan(metaClient, cfg.compactionInstantTime, cfg.parallelism); List<ValidationOpResult> res =
if (cfg.printOutput) { admin.validateCompactionPlan(metaClient, cfg.compactionInstantTime, cfg.parallelism);
printOperationResult("Result of Validation Operation :", res); if (cfg.printOutput) {
} printOperationResult("Result of Validation Operation :", res);
serializeOperationResult(fs, res); }
break; serializeOperationResult(fs, res);
case UNSCHEDULE_FILE: break;
List<RenameOpResult> r = case UNSCHEDULE_FILE:
admin.unscheduleCompactionFileId(new HoodieFileGroupId(cfg.partitionPath, cfg.fileId), List<RenameOpResult> r =
cfg.skipValidation, cfg.dryRun); admin.unscheduleCompactionFileId(new HoodieFileGroupId(cfg.partitionPath, cfg.fileId),
if (cfg.printOutput) { cfg.skipValidation, cfg.dryRun);
System.out.println(r); if (cfg.printOutput) {
} System.out.println(r);
serializeOperationResult(fs, r); }
break; serializeOperationResult(fs, r);
case UNSCHEDULE_PLAN: break;
List<RenameOpResult> r2 = case UNSCHEDULE_PLAN:
admin.unscheduleCompactionPlan(cfg.compactionInstantTime, cfg.skipValidation, cfg.parallelism, cfg.dryRun); List<RenameOpResult> r2 =
if (cfg.printOutput) { admin
printOperationResult("Result of Unscheduling Compaction Plan :", r2); .unscheduleCompactionPlan(cfg.compactionInstantTime, cfg.skipValidation, cfg.parallelism, cfg.dryRun);
} if (cfg.printOutput) {
serializeOperationResult(fs, r2); printOperationResult("Result of Unscheduling Compaction Plan :", r2);
break; }
case REPAIR: serializeOperationResult(fs, r2);
List<RenameOpResult> r3 = break;
admin.repairCompaction(cfg.compactionInstantTime, cfg.parallelism, cfg.dryRun); case REPAIR:
if (cfg.printOutput) { List<RenameOpResult> r3 =
printOperationResult("Result of Repair Operation :", r3); admin.repairCompaction(cfg.compactionInstantTime, cfg.parallelism, cfg.dryRun);
} if (cfg.printOutput) {
serializeOperationResult(fs, r3); printOperationResult("Result of Repair Operation :", r3);
break; }
default: serializeOperationResult(fs, r3);
throw new IllegalStateException("Not yet implemented !!"); break;
default:
throw new IllegalStateException("Not yet implemented !!");
}
} finally {
admin.close();
} }
} }