diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngest.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngest.scala
index 550ff9776..9ead7f290 100644
--- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngest.scala
+++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngest.scala
@@ -34,8 +34,10 @@ class SparkDataSourceContinuousIngest(val spark: SparkSession, val conf: Configu
 
   def startIngestion(): Unit = {
     val fs = sourcePath.getFileSystem(conf)
+    // FileSystem resolved from checkpointFile (not sourcePath); used to check for and write the checkpoint.
+    val checkPointFs = checkpointFile.getFileSystem(conf)
     var orderedBatch : Array[FileStatus] = null
-    if (fs.exists(checkpointFile)) {
+    if (checkPointFs.exists(checkpointFile)) {
       log.info("Checkpoint file exists. ")
       val checkpoint = spark.sparkContext.textFile(checkpointFile.toString).collect()(0)
       log.warn("Checkpoint to resume from " + checkpoint)
@@ -69,7 +71,7 @@ class SparkDataSourceContinuousIngest(val spark: SparkSession, val conf: Configu
         val df = spark.read.format(sourceFormat).load(pathToConsume.toString)
 
         df.write.format("hudi").options(hudiOptions).mode(SaveMode.Append).save(hudiBasePath.toString)
-        writeToFile(checkpointFile, entry.getPath.getName, fs)
+        writeToFile(checkpointFile, entry.getPath.getName, checkPointFs)
         log.info("Completed batch " + entry + ". Moving to next batch. Sleeping for " + minSyncIntervalSeconds + " secs before next batch")
         Thread.sleep(minSyncIntervalSeconds * 1000)
       })