1
0

add jssc.stop() (#797)

This commit is contained in:
eisig
2019-07-19 20:01:45 +08:00
committed by vinoth chandar
parent 6efa16317c
commit 9857c4b21c
2 changed files with 14 additions and 4 deletions

View File

@@ -61,6 +61,7 @@ import scala.Tuple2;
 * Loads data from Parquet Sources
 */
public class HDFSParquetImporter implements Serializable {
  private static volatile Logger log = LogManager.getLogger(HDFSParquetImporter.class);
  public static final SimpleDateFormat PARTITION_FORMATTER = new SimpleDateFormat("yyyy/MM/dd");
@@ -87,9 +88,14 @@ public class HDFSParquetImporter implements Serializable {
       System.exit(1);
     }
     HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
-    dataImporter
-        .dataImport(UtilHelpers.buildSparkContext("data-importer-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory),
-            cfg.retry);
+    JavaSparkContext jssc = UtilHelpers
+        .buildSparkContext("data-importer-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
+    try {
+      dataImporter.dataImport(jssc, cfg.retry);
+    } finally {
+      jssc.stop();
+    }
   }

   public int dataImport(JavaSparkContext jsc, int retry) throws Exception {

View File

@@ -280,7 +280,11 @@ public class HoodieDeltaStreamer implements Serializable {
     Map<String, String> additionalSparkConfigs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
     JavaSparkContext jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName,
         cfg.sparkMaster, additionalSparkConfigs);
+    try {
+      new HoodieDeltaStreamer(cfg, jssc).sync();
+    } finally {
+      jssc.stop();
+    }
-    new HoodieDeltaStreamer(cfg, jssc).sync();
   }

   /**