From 26eb7b8183767c1275aaeaa4c7cd39aee0a10792 Mon Sep 17 00:00:00 2001 From: satishm <84978833+data-storyteller@users.noreply.github.com> Date: Fri, 8 Apr 2022 16:46:01 +0530 Subject: [PATCH] [HUDI-3571] Spark datasource continuous checkpoint should have own fs variable (#5265) --- .../integ/testsuite/SparkDataSourceContinuousIngest.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngest.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngest.scala index 550ff9776..9ead7f290 100644 --- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngest.scala +++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngest.scala @@ -34,8 +34,9 @@ class SparkDataSourceContinuousIngest(val spark: SparkSession, val conf: Configu def startIngestion(): Unit = { val fs = sourcePath.getFileSystem(conf) + var checkPointFs = checkpointFile.getFileSystem(conf) var orderedBatch : Array[FileStatus] = null - if (fs.exists(checkpointFile)) { + if (checkPointFs.exists(checkpointFile)) { log.info("Checkpoint file exists. ") val checkpoint = spark.sparkContext.textFile(checkpointFile.toString).collect()(0) log.warn("Checkpoint to resume from " + checkpoint) @@ -69,7 +70,7 @@ class SparkDataSourceContinuousIngest(val spark: SparkSession, val conf: Configu val df = spark.read.format(sourceFormat).load(pathToConsume.toString) df.write.format("hudi").options(hudiOptions).mode(SaveMode.Append).save(hudiBasePath.toString) - writeToFile(checkpointFile, entry.getPath.getName, fs) + writeToFile(checkpointFile, entry.getPath.getName, checkPointFs) log.info("Completed batch " + entry + ". Moving to next batch. Sleeping for " + minSyncIntervalSeconds + " secs before next batch") Thread.sleep(minSyncIntervalSeconds * 1000) })