[HUDI-2089] Fix the bug that the metadata table cannot support non-partitioned tables (#3182)
This commit is contained in:
@@ -318,7 +318,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
||||
createInstantTime);
|
||||
}).forEach(status -> {
|
||||
HoodieWriteStat writeStat = new HoodieWriteStat();
|
||||
writeStat.setPath(partition + Path.SEPARATOR + status.getPath().getName());
|
||||
writeStat.setPath((partition.isEmpty() ? "" : partition + Path.SEPARATOR) + status.getPath().getName());
|
||||
writeStat.setPartitionPath(partition);
|
||||
writeStat.setTotalWriteBytes(status.getLen());
|
||||
commitMetadata.addWriteStat(partition, writeStat);
|
||||
@@ -374,9 +374,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
||||
.collect(Collectors.toList());
|
||||
|
||||
if (p.getRight().length > filesInDir.size()) {
|
||||
// Is a partition. Add all data files to result.
|
||||
String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetMetaClient.getBasePath()), p.getLeft());
|
||||
partitionToFileStatus.put(partitionName, filesInDir);
|
||||
// For a non-partitioned table, exclude the .hoodie metadata folder from the file listing
|
||||
partitionToFileStatus.put(partitionName, filesInDir.stream()
|
||||
.filter(f -> !f.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList()));
|
||||
} else {
|
||||
// Add sub-dirs to the queue
|
||||
pathsToList.addAll(Arrays.stream(p.getRight())
|
||||
|
||||
@@ -700,4 +700,50 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("test Non partition table with metatable support") {
|
||||
List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).foreach { tableType =>
|
||||
initSparkContext("testNonPartitionTableWithMetaTable")
|
||||
initSparkContext("test_schema_evolution")
|
||||
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
|
||||
val basePath = path.toAbsolutePath.toString
|
||||
try {
|
||||
val df = spark.range(0, 10).toDF("keyid")
|
||||
.withColumn("col3", expr("keyid"))
|
||||
.withColumn("age", expr("keyid + 1000"))
|
||||
|
||||
df.write.format("hudi")
|
||||
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key, tableType)
|
||||
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY.key, "col3")
|
||||
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY.key, "keyid")
|
||||
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY.key, "")
|
||||
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY.key, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
|
||||
.option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, "insert")
|
||||
.option("hoodie.insert.shuffle.parallelism", "1")
|
||||
.option("hoodie.metadata.enable", "true")
|
||||
.option(HoodieWriteConfig.TABLE_NAME.key, "hoodie_test")
|
||||
.mode(SaveMode.Overwrite).save(basePath)
|
||||
// upsert same record again
|
||||
val df_update = spark.range(0, 10).toDF("keyid")
|
||||
.withColumn("col3", expr("keyid"))
|
||||
.withColumn("age", expr("keyid + 2000"))
|
||||
df_update.write.format("hudi")
|
||||
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key, tableType)
|
||||
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY.key, "col3")
|
||||
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY.key, "keyid")
|
||||
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY.key, "")
|
||||
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY.key, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
|
||||
.option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, "upsert")
|
||||
.option("hoodie.upsert.shuffle.parallelism", "1")
|
||||
.option("hoodie.metadata.enable", "true")
|
||||
.option(HoodieWriteConfig.TABLE_NAME.key, "hoodie_test")
|
||||
.mode(SaveMode.Append).save(basePath)
|
||||
assert(spark.read.format("hudi").load(basePath).count() == 10)
|
||||
assert(spark.read.format("hudi").load(basePath).where("age >= 2000").count() == 10)
|
||||
} finally {
|
||||
spark.stop()
|
||||
FileUtils.deleteDirectory(path.toFile)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user