[HUDI-438] Merge duplicated code fragment in HoodieSparkSqlWriter (#1114)
This commit is contained in:
@@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericRecord
|
||||
import org.apache.hadoop.fs.{FileSystem, Path}
|
||||
import org.apache.hadoop.hive.conf.HiveConf
|
||||
import org.apache.hudi.DataSourceWriteOptions._
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
|
||||
import org.apache.hudi.common.util.{FSUtils, TypedProperties}
|
||||
@@ -74,19 +75,14 @@ private[hudi] object HoodieSparkSqlWriter {
|
||||
parameters(OPERATION_OPT_KEY)
|
||||
}
|
||||
|
||||
var writeSuccessful: Boolean = false
|
||||
var writeStatuses: JavaRDD[WriteStatus] = null
|
||||
|
||||
val jsc = new JavaSparkContext(sparkContext)
|
||||
val basePath = new Path(parameters("path"))
|
||||
val commitTime = HoodieActiveTimeline.createNewInstantTime();
|
||||
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
|
||||
var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
|
||||
|
||||
// Running into issues wrt generic type conversion from Java to Scala. Couldn't make common code paths for
|
||||
// write and deletes. Specifically, instantiating client of type HoodieWriteClient<T extends HoodieRecordPayload>
|
||||
// is having issues. Hence some codes blocks are same in both if and else blocks.
|
||||
if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
|
||||
val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
|
||||
if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
|
||||
// register classes & schemas
|
||||
val structName = s"${tblName.get}_record"
|
||||
val nameSpace = s"hoodie.${tblName.get}"
|
||||
@@ -147,54 +143,8 @@ private[hudi] object HoodieSparkSqlWriter {
|
||||
(true, common.util.Option.empty())
|
||||
}
|
||||
client.startCommitWithTime(commitTime)
|
||||
writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)
|
||||
// Check for errors and commit the write.
|
||||
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
|
||||
writeSuccessful =
|
||||
if (errorCount == 0) {
|
||||
log.info("No errors. Proceeding to commit the write.")
|
||||
val metaMap = parameters.filter(kv =>
|
||||
kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
|
||||
val commitSuccess = if (metaMap.isEmpty) {
|
||||
client.commit(commitTime, writeStatuses)
|
||||
} else {
|
||||
client.commit(commitTime, writeStatuses,
|
||||
common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
|
||||
}
|
||||
|
||||
if (commitSuccess) {
|
||||
log.info("Commit " + commitTime + " successful!")
|
||||
}
|
||||
else {
|
||||
log.info("Commit " + commitTime + " failed!")
|
||||
}
|
||||
|
||||
val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
|
||||
val syncHiveSucess = if (hiveSyncEnabled) {
|
||||
log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
|
||||
val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
|
||||
syncHive(basePath, fs, parameters)
|
||||
} else {
|
||||
true
|
||||
}
|
||||
client.close()
|
||||
commitSuccess && syncHiveSucess
|
||||
} else {
|
||||
log.error(s"$operation failed with ${errorCount} errors :");
|
||||
if (log.isTraceEnabled) {
|
||||
log.trace("Printing out the top 100 errors")
|
||||
writeStatuses.rdd.filter(ws => ws.hasErrors)
|
||||
.take(100)
|
||||
.foreach(ws => {
|
||||
log.trace("Global error :", ws.getGlobalError)
|
||||
if (ws.getErrors.size() > 0) {
|
||||
ws.getErrors.foreach(kt =>
|
||||
log.trace(s"Error for key: ${kt._1}", kt._2))
|
||||
}
|
||||
})
|
||||
}
|
||||
false
|
||||
}
|
||||
val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)
|
||||
(writeStatuses, client)
|
||||
} else {
|
||||
|
||||
// Handle save modes
|
||||
@@ -225,55 +175,12 @@ private[hudi] object HoodieSparkSqlWriter {
|
||||
|
||||
// Issue deletes
|
||||
client.startCommitWithTime(commitTime)
|
||||
writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, commitTime)
|
||||
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
|
||||
writeSuccessful =
|
||||
if (errorCount == 0) {
|
||||
log.info("No errors. Proceeding to commit the write.")
|
||||
val metaMap = parameters.filter(kv =>
|
||||
kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
|
||||
val commitSuccess = if (metaMap.isEmpty) {
|
||||
client.commit(commitTime, writeStatuses)
|
||||
} else {
|
||||
client.commit(commitTime, writeStatuses,
|
||||
common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
|
||||
}
|
||||
|
||||
if (commitSuccess) {
|
||||
log.info("Commit " + commitTime + " successful!")
|
||||
}
|
||||
else {
|
||||
log.info("Commit " + commitTime + " failed!")
|
||||
}
|
||||
|
||||
val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
|
||||
val syncHiveSucess = if (hiveSyncEnabled) {
|
||||
log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
|
||||
val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
|
||||
syncHive(basePath, fs, parameters)
|
||||
} else {
|
||||
true
|
||||
}
|
||||
client.close()
|
||||
commitSuccess && syncHiveSucess
|
||||
} else {
|
||||
log.error(s"$operation failed with ${errorCount} errors :");
|
||||
if (log.isTraceEnabled) {
|
||||
log.trace("Printing out the top 100 errors")
|
||||
writeStatuses.rdd.filter(ws => ws.hasErrors)
|
||||
.take(100)
|
||||
.foreach(ws => {
|
||||
log.trace("Global error :", ws.getGlobalError)
|
||||
if (ws.getErrors.size() > 0) {
|
||||
ws.getErrors.foreach(kt =>
|
||||
log.trace(s"Error for key: ${kt._1}", kt._2))
|
||||
}
|
||||
})
|
||||
}
|
||||
false
|
||||
}
|
||||
val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, commitTime)
|
||||
(writeStatuses, client)
|
||||
}
|
||||
|
||||
// Check for errors and commit the write.
|
||||
val writeSuccessful = checkWriteStatus(writeStatuses, parameters, writeClient, commitTime, basePath, operation, jsc)
|
||||
(writeSuccessful, common.util.Option.ofNullable(commitTime))
|
||||
}
|
||||
|
||||
@@ -340,4 +247,58 @@ private[hudi] object HoodieSparkSqlWriter {
|
||||
hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
|
||||
hiveSyncConfig
|
||||
}
|
||||
|
||||
private def checkWriteStatus(writeStatuses: JavaRDD[WriteStatus],
|
||||
parameters: Map[String, String],
|
||||
client: HoodieWriteClient[_],
|
||||
commitTime: String,
|
||||
basePath: Path,
|
||||
operation: String,
|
||||
jsc: JavaSparkContext): Boolean = {
|
||||
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
|
||||
if (errorCount == 0) {
|
||||
log.info("No errors. Proceeding to commit the write.")
|
||||
val metaMap = parameters.filter(kv =>
|
||||
kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
|
||||
val commitSuccess = if (metaMap.isEmpty) {
|
||||
client.commit(commitTime, writeStatuses)
|
||||
} else {
|
||||
client.commit(commitTime, writeStatuses,
|
||||
common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
|
||||
}
|
||||
|
||||
if (commitSuccess) {
|
||||
log.info("Commit " + commitTime + " successful!")
|
||||
}
|
||||
else {
|
||||
log.info("Commit " + commitTime + " failed!")
|
||||
}
|
||||
|
||||
val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
|
||||
val syncHiveSucess = if (hiveSyncEnabled) {
|
||||
log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
|
||||
val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
|
||||
syncHive(basePath, fs, parameters)
|
||||
} else {
|
||||
true
|
||||
}
|
||||
client.close()
|
||||
commitSuccess && syncHiveSucess
|
||||
} else {
|
||||
log.error(s"$operation failed with ${errorCount} errors :");
|
||||
if (log.isTraceEnabled) {
|
||||
log.trace("Printing out the top 100 errors")
|
||||
writeStatuses.rdd.filter(ws => ws.hasErrors)
|
||||
.take(100)
|
||||
.foreach(ws => {
|
||||
log.trace("Global error :", ws.getGlobalError)
|
||||
if (ws.getErrors.size() > 0) {
|
||||
ws.getErrors.foreach(kt =>
|
||||
log.trace(s"Error for key: ${kt._1}", kt._2))
|
||||
}
|
||||
})
|
||||
}
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user