[HUDI-2073] Fix the bug of hoodieClusteringJob never quit (#3157)
Co-authored-by: yuezhang <yuezhang@freewheel.tv>
This commit is contained in:
@@ -149,12 +149,12 @@ public class HoodieClusteringJob {
|
|||||||
|
|
||||||
private int doCluster(JavaSparkContext jsc) throws Exception {
|
private int doCluster(JavaSparkContext jsc) throws Exception {
|
||||||
String schemaStr = getSchemaFromLatestInstant();
|
String schemaStr = getSchemaFromLatestInstant();
|
||||||
SparkRDDWriteClient<HoodieRecordPayload> client =
|
try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
|
||||||
UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
|
|
||||||
JavaRDD<WriteStatus> writeResponse =
|
JavaRDD<WriteStatus> writeResponse =
|
||||||
client.cluster(cfg.clusteringInstantTime, true).getWriteStatuses();
|
client.cluster(cfg.clusteringInstantTime, true).getWriteStatuses();
|
||||||
return UtilHelpers.handleErrors(jsc, cfg.clusteringInstantTime, writeResponse);
|
return UtilHelpers.handleErrors(jsc, cfg.clusteringInstantTime, writeResponse);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@TestOnly
|
@TestOnly
|
||||||
public Option<String> doSchedule() throws Exception {
|
public Option<String> doSchedule() throws Exception {
|
||||||
@@ -163,12 +163,12 @@ public class HoodieClusteringJob {
|
|||||||
|
|
||||||
private Option<String> doSchedule(JavaSparkContext jsc) throws Exception {
|
private Option<String> doSchedule(JavaSparkContext jsc) throws Exception {
|
||||||
String schemaStr = getSchemaFromLatestInstant();
|
String schemaStr = getSchemaFromLatestInstant();
|
||||||
SparkRDDWriteClient<HoodieRecordPayload> client =
|
try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
|
||||||
UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
|
|
||||||
if (cfg.clusteringInstantTime != null) {
|
if (cfg.clusteringInstantTime != null) {
|
||||||
client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, Option.empty());
|
client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, Option.empty());
|
||||||
return Option.of(cfg.clusteringInstantTime);
|
return Option.of(cfg.clusteringInstantTime);
|
||||||
}
|
}
|
||||||
return client.scheduleClustering(Option.empty());
|
return client.scheduleClustering(Option.empty());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user