[HUDI-3571] Spark datasource continuous checkpoint should have own fs variable (#5265)
This commit is contained in:
@@ -34,8 +34,9 @@ class SparkDataSourceContinuousIngest(val spark: SparkSession, val conf: Configu
|
|||||||
|
|
||||||
def startIngestion(): Unit = {
|
def startIngestion(): Unit = {
|
||||||
val fs = sourcePath.getFileSystem(conf)
|
val fs = sourcePath.getFileSystem(conf)
|
||||||
|
var checkPointFs = checkpointFile.getFileSystem(conf)
|
||||||
var orderedBatch : Array[FileStatus] = null
|
var orderedBatch : Array[FileStatus] = null
|
||||||
if (fs.exists(checkpointFile)) {
|
if (checkPointFs.exists(checkpointFile)) {
|
||||||
log.info("Checkpoint file exists. ")
|
log.info("Checkpoint file exists. ")
|
||||||
val checkpoint = spark.sparkContext.textFile(checkpointFile.toString).collect()(0)
|
val checkpoint = spark.sparkContext.textFile(checkpointFile.toString).collect()(0)
|
||||||
log.warn("Checkpoint to resume from " + checkpoint)
|
log.warn("Checkpoint to resume from " + checkpoint)
|
||||||
@@ -69,7 +70,7 @@ class SparkDataSourceContinuousIngest(val spark: SparkSession, val conf: Configu
|
|||||||
val df = spark.read.format(sourceFormat).load(pathToConsume.toString)
|
val df = spark.read.format(sourceFormat).load(pathToConsume.toString)
|
||||||
|
|
||||||
df.write.format("hudi").options(hudiOptions).mode(SaveMode.Append).save(hudiBasePath.toString)
|
df.write.format("hudi").options(hudiOptions).mode(SaveMode.Append).save(hudiBasePath.toString)
|
||||||
writeToFile(checkpointFile, entry.getPath.getName, fs)
|
writeToFile(checkpointFile, entry.getPath.getName, checkPointFs)
|
||||||
log.info("Completed batch " + entry + ". Moving to next batch. Sleeping for " + minSyncIntervalSeconds + " secs before next batch")
|
log.info("Completed batch " + entry + ". Moving to next batch. Sleeping for " + minSyncIntervalSeconds + " secs before next batch")
|
||||||
Thread.sleep(minSyncIntervalSeconds * 1000)
|
Thread.sleep(minSyncIntervalSeconds * 1000)
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user