1
0

[HUDI-1102] Add common useful Spark related and Table path detection utilities (#1841)

Co-authored-by: Mehrotra <uditme@amazon.com>
This commit is contained in:
Udit Mehrotra
2020-07-18 16:16:32 -07:00
committed by GitHub
parent bf1d36fa63
commit 1aae437257
6 changed files with 415 additions and 0 deletions

View File

@@ -18,6 +18,8 @@
package org.apache.hudi;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -28,6 +30,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.TablePathUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -45,6 +48,8 @@ import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -61,6 +66,8 @@ import java.util.stream.Collectors;
*/
public class DataSourceUtils {
private static final Logger LOG = LogManager.getLogger(DataSourceUtils.class);
/**
* Obtain value of the provided field as string, denoted by dot notation. e.g: a.b.c
*/
@@ -105,6 +112,22 @@ public class DataSourceUtils {
}
}
public static String getTablePath(FileSystem fs, Path[] userProvidedPaths) throws IOException {
LOG.info("Getting table path..");
for (Path path: userProvidedPaths) {
try {
Option<Path> tablePath = TablePathUtils.getTablePath(fs, path);
if (tablePath.isPresent()) {
return tablePath.get().toString();
}
} catch (HoodieException he) {
LOG.warn("Error trying to get table path from " + path.toString(), he);
}
}
throw new TableNotFoundException("Unable to find a hudi table for the user provided paths.");
}
/**
* This method converts values for fields with certain Avro/Parquet data types that require special handling.
*

View File

@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import scala.collection.JavaConverters._
object HudiSparkUtils {
def getHudiMetadataSchema: StructType = {
StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
StructField(col, StringType, nullable = true)
}))
}
def checkAndGlobPathIfNecessary(paths: Seq[String], fs: FileSystem): Seq[Path] = {
paths.flatMap(path => {
val qualified = new Path(path).makeQualified(fs.getUri, fs.getWorkingDirectory)
val globPaths = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
globPaths
})
}
def createInMemoryFileIndex(sparkSession: SparkSession, globbedPaths: Seq[Path]): InMemoryFileIndex = {
val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache)
}
}

View File

@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import java.io.File
import java.nio.file.Paths
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.io.TempDir
class TestHudiSparkUtils {
@Test
def testGlobPaths(@TempDir tempDir: File): Unit = {
val folders: Seq[Path] = Seq(
new Path(Paths.get(tempDir.getAbsolutePath, "folder1").toUri),
new Path(Paths.get(tempDir.getAbsolutePath, "folder2").toUri)
)
val files: Seq[Path] = Seq(
new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file1").toUri),
new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file2").toUri),
new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file3").toUri),
new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file4").toUri)
)
folders.foreach(folder => new File(folder.toUri).mkdir())
files.foreach(file => new File(file.toUri).createNewFile())
var paths = Seq(tempDir.getAbsolutePath + "/*")
var globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths,
new Path(paths.head).getFileSystem(new Configuration()))
assertEquals(folders.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString))
paths = Seq(tempDir.getAbsolutePath + "/*/*")
globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths,
new Path(paths.head).getFileSystem(new Configuration()))
assertEquals(files.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString))
paths = Seq(tempDir.getAbsolutePath + "/folder1/*")
globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths,
new Path(paths.head).getFileSystem(new Configuration()))
assertEquals(Seq(files(0), files(1)).sortWith(_.toString < _.toString),
globbedPaths.sortWith(_.toString < _.toString))
paths = Seq(tempDir.getAbsolutePath + "/folder2/*")
globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths,
new Path(paths.head).getFileSystem(new Configuration()))
assertEquals(Seq(files(2), files(3)).sortWith(_.toString < _.toString),
globbedPaths.sortWith(_.toString < _.toString))
paths = Seq(tempDir.getAbsolutePath + "/folder1/*", tempDir.getAbsolutePath + "/folder2/*")
globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths,
new Path(paths.head).getFileSystem(new Configuration()))
assertEquals(files.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString))
}
@Test
def testCreateInMemoryIndex(@TempDir tempDir: File): Unit = {
val spark = SparkSession.builder
.appName("Hoodie Datasource test")
.master("local[2]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate
val folders: Seq[Path] = Seq(
new Path(Paths.get(tempDir.getAbsolutePath, "folder1").toUri),
new Path(Paths.get(tempDir.getAbsolutePath, "folder2").toUri)
)
val files: Seq[Path] = Seq(
new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file1").toUri),
new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file2").toUri),
new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file3").toUri),
new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file4").toUri)
)
folders.foreach(folder => new File(folder.toUri).mkdir())
files.foreach(file => new File(file.toUri).createNewFile())
val index = HudiSparkUtils.createInMemoryFileIndex(spark, Seq(folders(0), folders(1)))
val indexedFilePaths = index.allFiles().map(fs => fs.getPath)
assertEquals(files.sortWith(_.toString < _.toString), indexedFilePaths.sortWith(_.toString < _.toString))
}
}