diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index ed0ab9742..172bbc491 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -19,13 +19,13 @@ package org.apache.hudi import java.util.Properties - import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hudi.client.utils.SparkRowSerDe import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.keygen.constant.KeyGeneratorOptions import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator} @@ -60,12 +60,28 @@ object HoodieSparkUtils extends SparkAdapterSupport { } /** - * This method copied from [[org.apache.spark.deploy.SparkHadoopUtil]]. - * [[org.apache.spark.deploy.SparkHadoopUtil]] becomes private since Spark 3.0.0 and hence we had to copy it locally. + * This method is inspired by [[org.apache.spark.deploy.SparkHadoopUtil]] with some modifications like + * skipping meta paths. 
*/ def globPath(fs: FileSystem, pattern: Path): Seq[Path] = { - Option(fs.globStatus(pattern)).map { statuses => - statuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq + // find base path to assist in skipping meta paths + var basePath = pattern.getParent + while (basePath.getName.equals("*")) { + basePath = basePath.getParent + } + + Option(fs.globStatus(pattern)).map { statuses => { + val nonMetaStatuses = statuses.filterNot(entry => { + // skip all entries in meta path + var leafPath = entry.getPath + // walk through every parent until we reach base path. if .hoodie is found anywhere, path needs to be skipped + while (!leafPath.equals(basePath) && !leafPath.getName.equals(HoodieTableMetaClient.METAFOLDER_NAME)) { + leafPath = leafPath.getParent + } + leafPath.getName.equals(HoodieTableMetaClient.METAFOLDER_NAME) + }) + nonMetaStatuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq + } }.getOrElse(Seq.empty[Path]) } @@ -88,8 +104,7 @@ object HoodieSparkUtils extends SparkAdapterSupport { def checkAndGlobPathIfNecessary(paths: Seq[String], fs: FileSystem): Seq[Path] = { paths.flatMap(path => { val qualified = new Path(path).makeQualified(fs.getUri, fs.getWorkingDirectory) - val globPaths = globPathIfNecessary(fs, qualified) - globPaths + globPathIfNecessary(fs, qualified) }) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala index b86eade9b..1b756b5e2 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala @@ -41,14 +41,18 @@ class TestHoodieSparkUtils { def testGlobPaths(@TempDir tempDir: File): Unit = { val folders: Seq[Path] = Seq( new Path(Paths.get(tempDir.getAbsolutePath, "folder1").toUri), - new 
Path(Paths.get(tempDir.getAbsolutePath, "folder2").toUri) + new Path(Paths.get(tempDir.getAbsolutePath, "folder2").toUri), + new Path(Paths.get(tempDir.getAbsolutePath, ".hoodie").toUri), + new Path(Paths.get(tempDir.getAbsolutePath, ".hoodie", "metadata").toUri) ) val files: Seq[Path] = Seq( new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file1").toUri), new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file2").toUri), new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file3").toUri), - new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file4").toUri) + new Path(Paths.get(tempDir.getAbsolutePath, "folder2","file4").toUri), + new Path(Paths.get(tempDir.getAbsolutePath, ".hoodie","metadata", "file5").toUri), + new Path(Paths.get(tempDir.getAbsolutePath, ".hoodie","metadata", "file6").toUri) ) folders.foreach(folder => new File(folder.toUri).mkdir()) @@ -57,12 +61,14 @@ class TestHoodieSparkUtils { var paths = Seq(tempDir.getAbsolutePath + "/*") var globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths, new Path(paths.head).getFileSystem(new Configuration())) - assertEquals(folders.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString)) + assertEquals(folders.filterNot(entry => entry.toString.contains(".hoodie")) + .sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString)) paths = Seq(tempDir.getAbsolutePath + "/*/*") globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths, new Path(paths.head).getFileSystem(new Configuration())) - assertEquals(files.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString)) + assertEquals(files.filterNot(entry => entry.toString.contains(".hoodie")) + .sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString)) paths = Seq(tempDir.getAbsolutePath + "/folder1/*") globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths, @@ -79,7 +85,8 @@ class TestHoodieSparkUtils { paths = 
Seq(tempDir.getAbsolutePath + "/folder1/*", tempDir.getAbsolutePath + "/folder2/*") globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths, new Path(paths.head).getFileSystem(new Configuration())) - assertEquals(files.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString)) + assertEquals(files.filterNot(entry => entry.toString.contains(".hoodie")) + .sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString)) } @Test diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index ffe2b4e05..663493438 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -419,7 +419,7 @@ class TestCOWDataSource extends HoodieClientTestBase { @Test def testSparkPartitonByWithCustomKeyGenerator(): Unit = { // Without fieldType, the default is SIMPLE - var writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, false) + var writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName) writer.partitionBy("current_ts") .mode(SaveMode.Overwrite) .save(basePath) @@ -428,7 +428,7 @@ class TestCOWDataSource extends HoodieClientTestBase { assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("current_ts").cast("string")).count() == 0) // Specify fieldType as TIMESTAMP - writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, false) + writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName) writer.partitionBy("current_ts:TIMESTAMP") .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS") .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd") @@ -504,7 +504,7 @@ class TestCOWDataSource extends HoodieClientTestBase { } @Test def 
testSparkPartitonByWithTimestampBasedKeyGenerator() { - val writer = getDataFrameWriter(classOf[TimestampBasedKeyGenerator].getName, false) + val writer = getDataFrameWriter(classOf[TimestampBasedKeyGenerator].getName) writer.partitionBy("current_ts") .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS") .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd")