Fixed TableNotFoundException when writing with structured streaming (#778)
- When writing to a new hoodie table, if the checkpoint dir is under the target path, Spark will create the base path and thus skip initializing .hoodie, which results in an error - apply the .hoodie existence check for all save modes
This commit is contained in:
committed by
vinoth chandar
parent
62ecb2da62
commit
11c4121f73
@@ -100,23 +100,23 @@ private[hoodie] object HoodieSparkSqlWriter {
|
|||||||
|
|
||||||
val basePath = new Path(parameters("path"))
|
val basePath = new Path(parameters("path"))
|
||||||
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
|
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
|
||||||
var exists = fs.exists(basePath)
|
var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
|
||||||
|
|
||||||
// Handle various save modes
|
// Handle various save modes
|
||||||
if (mode == SaveMode.ErrorIfExists && exists) {
|
if (mode == SaveMode.ErrorIfExists && exists) {
|
||||||
throw new HoodieException(s"basePath ${basePath} already exists.")
|
throw new HoodieException(s"hoodie dataset at $basePath already exists.")
|
||||||
}
|
}
|
||||||
if (mode == SaveMode.Ignore && exists) {
|
if (mode == SaveMode.Ignore && exists) {
|
||||||
log.warn(s" basePath ${basePath} already exists. Ignoring & not performing actual writes.")
|
log.warn(s"hoodie dataset at $basePath already exists. Ignoring & not performing actual writes.")
|
||||||
return (true, None)
|
return (true, None)
|
||||||
}
|
}
|
||||||
if (mode == SaveMode.Overwrite && exists) {
|
if (mode == SaveMode.Overwrite && exists) {
|
||||||
log.warn(s" basePath ${basePath} already exists. Deleting existing data & overwriting with new data.")
|
log.warn(s"hoodie dataset at $basePath already exists. Deleting existing data & overwriting with new data.")
|
||||||
fs.delete(basePath, true)
|
fs.delete(basePath, true)
|
||||||
exists = false
|
exists = false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the dataset if not present (APPEND mode)
|
// Create the dataset if not present
|
||||||
if (!exists) {
|
if (!exists) {
|
||||||
HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, storageType,
|
HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, storageType,
|
||||||
tblName.get, "archived")
|
tblName.get, "archived")
|
||||||
|
|||||||
Reference in New Issue
Block a user