diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java index c446afb8c..661f1a114 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java @@ -61,6 +61,7 @@ import scala.Tuple2; * Loads data from Parquet Sources */ public class HDFSParquetImporter implements Serializable { + private static volatile Logger log = LogManager.getLogger(HDFSParquetImporter.class); public static final SimpleDateFormat PARTITION_FORMATTER = new SimpleDateFormat("yyyy/MM/dd"); @@ -87,9 +88,14 @@ public class HDFSParquetImporter implements Serializable { System.exit(1); } HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg); - dataImporter - .dataImport(UtilHelpers.buildSparkContext("data-importer-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory), - cfg.retry); + JavaSparkContext jssc = UtilHelpers + .buildSparkContext("data-importer-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory); + try { + dataImporter.dataImport(jssc, cfg.retry); + } finally { + jssc.stop(); + } + } public int dataImport(JavaSparkContext jsc, int retry) throws Exception { diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java index 195154680..1d0aca35f 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -280,7 +280,11 @@ public class HoodieDeltaStreamer implements Serializable { Map additionalSparkConfigs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg); JavaSparkContext jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster, additionalSparkConfigs); - new HoodieDeltaStreamer(cfg, jssc).sync(); + try { + new HoodieDeltaStreamer(cfg, jssc).sync(); + } finally { + jssc.stop(); + } } /**