1
0

[HUDI-2494] Fixing glob pattern to skip all hoodie meta paths (#3768)

This commit is contained in:
Sivabalan Narayanan
2021-10-12 14:06:40 -04:00
committed by GitHub
parent 252c4ed380
commit 8a487eafa7
3 changed files with 37 additions and 15 deletions

View File

@@ -19,13 +19,13 @@
package org.apache.hudi package org.apache.hudi
import java.util.Properties import java.util.Properties
import org.apache.avro.Schema import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.client.utils.SparkRowSerDe import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.keygen.constant.KeyGeneratorOptions import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator} import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator}
@@ -60,12 +60,28 @@ object HoodieSparkUtils extends SparkAdapterSupport {
} }
/** /**
* This method copied from [[org.apache.spark.deploy.SparkHadoopUtil]]. * This method is inspired from [[org.apache.spark.deploy.SparkHadoopUtil]] with some modifications like
* [[org.apache.spark.deploy.SparkHadoopUtil]] becomes private since Spark 3.0.0 and hence we had to copy it locally. * skipping meta paths.
*/ */
def globPath(fs: FileSystem, pattern: Path): Seq[Path] = { def globPath(fs: FileSystem, pattern: Path): Seq[Path] = {
Option(fs.globStatus(pattern)).map { statuses => // find base path to assist in skipping meta paths
statuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq var basePath = pattern.getParent
while (basePath.getName.equals("*")) {
basePath = basePath.getParent
}
Option(fs.globStatus(pattern)).map { statuses => {
val nonMetaStatuses = statuses.filterNot(entry => {
// skip all entries in meta path
var leafPath = entry.getPath
// walk through every parent until we reach base path. if .hoodie is found anywhere, path needs to be skipped
while (!leafPath.equals(basePath) && !leafPath.getName.equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
leafPath = leafPath.getParent
}
leafPath.getName.equals(HoodieTableMetaClient.METAFOLDER_NAME)
})
nonMetaStatuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq
}
}.getOrElse(Seq.empty[Path]) }.getOrElse(Seq.empty[Path])
} }
@@ -88,8 +104,7 @@ object HoodieSparkUtils extends SparkAdapterSupport {
def checkAndGlobPathIfNecessary(paths: Seq[String], fs: FileSystem): Seq[Path] = { def checkAndGlobPathIfNecessary(paths: Seq[String], fs: FileSystem): Seq[Path] = {
paths.flatMap(path => { paths.flatMap(path => {
val qualified = new Path(path).makeQualified(fs.getUri, fs.getWorkingDirectory) val qualified = new Path(path).makeQualified(fs.getUri, fs.getWorkingDirectory)
val globPaths = globPathIfNecessary(fs, qualified) globPathIfNecessary(fs, qualified)
globPaths
}) })
} }

View File

@@ -41,14 +41,18 @@ class TestHoodieSparkUtils {
def testGlobPaths(@TempDir tempDir: File): Unit = { def testGlobPaths(@TempDir tempDir: File): Unit = {
val folders: Seq[Path] = Seq( val folders: Seq[Path] = Seq(
new Path(Paths.get(tempDir.getAbsolutePath, "folder1").toUri), new Path(Paths.get(tempDir.getAbsolutePath, "folder1").toUri),
new Path(Paths.get(tempDir.getAbsolutePath, "folder2").toUri) new Path(Paths.get(tempDir.getAbsolutePath, "folder2").toUri),
new Path(Paths.get(tempDir.getAbsolutePath, ".hoodie").toUri),
new Path(Paths.get(tempDir.getAbsolutePath, ".hoodie", "metadata").toUri)
) )
val files: Seq[Path] = Seq( val files: Seq[Path] = Seq(
new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file1").toUri), new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file1").toUri),
new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file2").toUri), new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file2").toUri),
new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file3").toUri), new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file3").toUri),
new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file4").toUri) new Path(Paths.get(tempDir.getAbsolutePath, "folder2","file4").toUri),
new Path(Paths.get(tempDir.getAbsolutePath, ".hoodie","metadata", "file5").toUri),
new Path(Paths.get(tempDir.getAbsolutePath, ".hoodie","metadata", "file6").toUri)
) )
folders.foreach(folder => new File(folder.toUri).mkdir()) folders.foreach(folder => new File(folder.toUri).mkdir())
@@ -57,12 +61,14 @@ class TestHoodieSparkUtils {
var paths = Seq(tempDir.getAbsolutePath + "/*") var paths = Seq(tempDir.getAbsolutePath + "/*")
var globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths, var globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths,
new Path(paths.head).getFileSystem(new Configuration())) new Path(paths.head).getFileSystem(new Configuration()))
assertEquals(folders.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString)) assertEquals(folders.filterNot(entry => entry.toString.contains(".hoodie"))
.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString))
paths = Seq(tempDir.getAbsolutePath + "/*/*") paths = Seq(tempDir.getAbsolutePath + "/*/*")
globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths, globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths,
new Path(paths.head).getFileSystem(new Configuration())) new Path(paths.head).getFileSystem(new Configuration()))
assertEquals(files.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString)) assertEquals(files.filterNot(entry => entry.toString.contains(".hoodie"))
.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString))
paths = Seq(tempDir.getAbsolutePath + "/folder1/*") paths = Seq(tempDir.getAbsolutePath + "/folder1/*")
globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths, globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths,
@@ -79,7 +85,8 @@ class TestHoodieSparkUtils {
paths = Seq(tempDir.getAbsolutePath + "/folder1/*", tempDir.getAbsolutePath + "/folder2/*") paths = Seq(tempDir.getAbsolutePath + "/folder1/*", tempDir.getAbsolutePath + "/folder2/*")
globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths, globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths,
new Path(paths.head).getFileSystem(new Configuration())) new Path(paths.head).getFileSystem(new Configuration()))
assertEquals(files.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString)) assertEquals(files.filterNot(entry => entry.toString.contains(".hoodie"))
.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString))
} }
@Test @Test

View File

@@ -419,7 +419,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
@Test def testSparkPartitonByWithCustomKeyGenerator(): Unit = { @Test def testSparkPartitonByWithCustomKeyGenerator(): Unit = {
// Without fieldType, the default is SIMPLE // Without fieldType, the default is SIMPLE
var writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, false) var writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName)
writer.partitionBy("current_ts") writer.partitionBy("current_ts")
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.save(basePath) .save(basePath)
@@ -428,7 +428,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("current_ts").cast("string")).count() == 0) assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("current_ts").cast("string")).count() == 0)
// Specify fieldType as TIMESTAMP // Specify fieldType as TIMESTAMP
writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, false) writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName)
writer.partitionBy("current_ts:TIMESTAMP") writer.partitionBy("current_ts:TIMESTAMP")
.option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS") .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS")
.option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd") .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd")
@@ -504,7 +504,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
} }
@Test def testSparkPartitonByWithTimestampBasedKeyGenerator() { @Test def testSparkPartitonByWithTimestampBasedKeyGenerator() {
val writer = getDataFrameWriter(classOf[TimestampBasedKeyGenerator].getName, false) val writer = getDataFrameWriter(classOf[TimestampBasedKeyGenerator].getName)
writer.partitionBy("current_ts") writer.partitionBy("current_ts")
.option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS") .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS")
.option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd") .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd")