[HUDI-415] Get commit time when Spark start (#1113)
This commit is contained in:
committed by
vinoth chandar
parent
14881e99e0
commit
36b3b6f5dd
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
|
|||||||
import org.apache.hadoop.hive.conf.HiveConf
|
import org.apache.hadoop.hive.conf.HiveConf
|
||||||
import org.apache.hudi.DataSourceWriteOptions._
|
import org.apache.hudi.DataSourceWriteOptions._
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient
|
import org.apache.hudi.common.table.HoodieTableMetaClient
|
||||||
|
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
|
||||||
import org.apache.hudi.common.util.{FSUtils, TypedProperties}
|
import org.apache.hudi.common.util.{FSUtils, TypedProperties}
|
||||||
import org.apache.hudi.config.HoodieWriteConfig
|
import org.apache.hudi.config.HoodieWriteConfig
|
||||||
import org.apache.hudi.exception.HoodieException
|
import org.apache.hudi.exception.HoodieException
|
||||||
@@ -74,11 +75,11 @@ private[hudi] object HoodieSparkSqlWriter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var writeSuccessful: Boolean = false
|
var writeSuccessful: Boolean = false
|
||||||
var commitTime: String = null
|
|
||||||
var writeStatuses: JavaRDD[WriteStatus] = null
|
var writeStatuses: JavaRDD[WriteStatus] = null
|
||||||
|
|
||||||
val jsc = new JavaSparkContext(sparkContext)
|
val jsc = new JavaSparkContext(sparkContext)
|
||||||
val basePath = new Path(parameters("path"))
|
val basePath = new Path(parameters("path"))
|
||||||
|
val commitTime = HoodieActiveTimeline.createNewInstantTime();
|
||||||
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
|
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
|
||||||
var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
|
var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
|
||||||
|
|
||||||
@@ -145,7 +146,7 @@ private[hudi] object HoodieSparkSqlWriter {
|
|||||||
log.info("new batch has no new records, skipping...")
|
log.info("new batch has no new records, skipping...")
|
||||||
return (true, common.util.Option.empty())
|
return (true, common.util.Option.empty())
|
||||||
}
|
}
|
||||||
commitTime = client.startCommit()
|
client.startCommitWithTime(commitTime)
|
||||||
writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)
|
writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)
|
||||||
// Check for errors and commit the write.
|
// Check for errors and commit the write.
|
||||||
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
|
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
|
||||||
@@ -223,7 +224,7 @@ private[hudi] object HoodieSparkSqlWriter {
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Issue deletes
|
// Issue deletes
|
||||||
commitTime = client.startCommit()
|
client.startCommitWithTime(commitTime)
|
||||||
writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, commitTime)
|
writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, commitTime)
|
||||||
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
|
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
|
||||||
writeSuccessful =
|
writeSuccessful =
|
||||||
|
|||||||
Reference in New Issue
Block a user