1
0

[HUDI-1371] [HUDI-1893] Support metadata based listing for Spark DataSource and Spark SQL (#2893)

This commit is contained in:
Udit Mehrotra
2021-08-03 14:47:40 -07:00
committed by GitHub
parent 245e1fd17d
commit 1ff2d3459a
13 changed files with 383 additions and 125 deletions

View File

@@ -18,8 +18,6 @@
package org.apache.hudi.metadata;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -55,6 +53,9 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -71,6 +72,7 @@ import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
@@ -227,9 +229,14 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
"Must not contain the filtered directory " + filteredDirectoryThree);
FileStatus[] statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p1"));
assertTrue(statuses.length == 2);
assertEquals(2, statuses.length);
statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p2"));
assertTrue(statuses.length == 5);
assertEquals(5, statuses.length);
Map<String, FileStatus[]> partitionsToFilesMap = metadata(client).getAllFilesInPartitions(
Arrays.asList(basePath + "/p1", basePath + "/p2"));
assertEquals(2, partitionsToFilesMap.size());
assertEquals(2, partitionsToFilesMap.get(basePath + "/p1").length);
assertEquals(5, partitionsToFilesMap.get(basePath + "/p2").length);
}
}
@@ -881,6 +888,10 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(config, engineContext);
TableFileSystemView tableView = table.getHoodieView();
List<String> fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList());
Map<String, FileStatus[]> partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths);
assertEquals(fsPartitions.size(), partitionToFilesMap.size());
fsPartitions.forEach(partition -> {
try {
Path partitionPath;
@@ -899,6 +910,8 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
Collections.sort(fsFileNames);
Collections.sort(metadataFilenames);
assertEquals(fsStatuses.length, partitionToFilesMap.get(basePath + "/" + partition).length);
// File sizes should be valid
Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getLen() > 0));

View File

@@ -52,6 +52,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
@@ -277,13 +278,16 @@ public class FSUtils {
}
}
public static FileStatus[] getFilesInPartition(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig,
String basePathStr, Path partitionPath) {
try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(engineContext,
metadataConfig, basePathStr, FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue())) {
return tableMetadata.getAllFilesInPartition(partitionPath);
} catch (Exception e) {
throw new HoodieException("Error get files in partition: " + partitionPath, e);
/**
 * Lists the files of several partitions in one call by opening a {@link HoodieTableMetadata}
 * for the table and delegating to {@code getAllFilesInPartitions}.
 *
 * <p>The metadata instance is created inside a try-with-resources statement, so it is closed
 * before this method returns, whether it succeeds or fails.
 *
 * @param engineContext    engine context handed to {@code HoodieTableMetadata.create}
 * @param metadataConfig   metadata configuration handed to {@code HoodieTableMetadata.create}
 * @param basePathStr      base path of the table, as a string
 * @param partitionPaths   partition paths whose files should be listed
 * @param spillableMapPath spillable map path handed to {@code HoodieTableMetadata.create}
 * @return the map returned by {@code getAllFilesInPartitions} for the requested partition paths
 * @throws HoodieException wrapping any exception raised while creating, querying or closing the metadata
 */
public static Map<String, FileStatus[]> getFilesInPartitions(HoodieEngineContext engineContext,
                                                             HoodieMetadataConfig metadataConfig,
                                                             String basePathStr,
                                                             String[] partitionPaths,
                                                             String spillableMapPath) {
  // NOTE(review): the trailing `true` is an opaque flag of HoodieTableMetadata.create; its meaning is not visible here.
  try (HoodieTableMetadata metadata =
           HoodieTableMetadata.create(engineContext, metadataConfig, basePathStr, spillableMapPath, true)) {
    List<String> requestedPartitions = Arrays.asList(partitionPaths);
    return metadata.getAllFilesInPartitions(requestedPartitions);
  } catch (Exception ex) {
    throw new HoodieException("Error get files in partitions: " + String.join(",", partitionPaths), ex);
  }
}

View File

@@ -44,7 +44,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public abstract class BaseTableMetadata implements HoodieTableMetadata {
@@ -134,6 +136,26 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
.getAllFilesInPartition(partitionPath);
}
/**
 * Fetches the files of every given partition.
 *
 * <p>When the metadata table is enabled, each partition is looked up through
 * {@code fetchAllFilesInPartition} and the result is keyed by the partition path string exactly as it
 * was passed in. Otherwise the request is delegated to a {@link FileSystemBackedTableMetadata}.
 *
 * @param partitionPaths partition paths to list; {@code null} or empty yields an empty map
 * @return map from each requested partition path to the files found in it
 * @throws IOException if the file-system backed fallback fails
 * @throws HoodieMetadataException if any lookup against the metadata table fails
 */
@Override
public Map<String, FileStatus[]> getAllFilesInPartitions(List<String> partitionPaths)
    throws IOException {
  // Nothing to look up. The file-system backed implementation answers null/empty input with an empty
  // map as well, so both code paths now agree instead of the enabled path surfacing a wrapped NPE.
  if (partitionPaths == null || partitionPaths.isEmpty()) {
    return Collections.emptyMap();
  }

  if (enabled) {
    Map<String, FileStatus[]> partitionsFilesMap = new HashMap<>();
    try {
      for (String partitionPath : partitionPaths) {
        partitionsFilesMap.put(partitionPath, fetchAllFilesInPartition(new Path(partitionPath)));
      }
      return partitionsFilesMap;
    } catch (Exception e) {
      throw new HoodieMetadataException("Failed to retrieve files in partition from metadata", e);
    }
  }

  // Metadata table disabled: list straight from the file system.
  return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, datasetBasePath, metadataConfig.shouldAssumeDatePartitioning())
      .getAllFilesInPartitions(partitionPaths);
}
/**
* Returns a list of all partitions.
*/

View File

@@ -33,8 +33,10 @@ import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
@@ -105,6 +107,24 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
return partitionPaths;
}
/**
 * Lists the data files of each given partition directly from the file system.
 *
 * <p>The listing is distributed through {@code engineContext.map} with a parallelism of at most
 * {@code DEFAULT_LISTING_PARALLELISM}. The returned map is keyed by the partition path string
 * exactly as it was passed in.
 *
 * @param partitionPaths partition paths to list; {@code null} or empty yields an empty map
 * @return map from each distinct requested partition path to the data files found in it
 * @throws IOException declared by the interface
 */
@Override
public Map<String, FileStatus[]> getAllFilesInPartitions(List<String> partitionPaths)
    throws IOException {
  if (partitionPaths == null || partitionPaths.isEmpty()) {
    return Collections.emptyMap();
  }

  // Collectors.toMap without a merge function throws IllegalStateException on a repeated key, so a
  // partition path passed twice used to fail the whole call. Dropping repeats up front fixes that and
  // also avoids listing the same directory more than once.
  List<String> distinctPartitionPaths = partitionPaths.stream().distinct().collect(Collectors.toList());
  int parallelism = Math.min(DEFAULT_LISTING_PARALLELISM, distinctPartitionPaths.size());

  List<Pair<String, FileStatus[]>> partitionToFiles = engineContext.map(distinctPartitionPaths, partitionPathStr -> {
    Path partitionPath = new Path(partitionPathStr);
    // Resolve the file system from the path itself rather than assuming a single shared one.
    FileSystem fs = partitionPath.getFileSystem(hadoopConf.get());
    return Pair.of(partitionPathStr, FSUtils.getAllDataFilesInPartition(fs, partitionPath));
  }, parallelism);

  return partitionToFiles.stream().collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
}
@Override
public Option<String> getSyncedInstantTime() {
throw new UnsupportedOperationException();

View File

@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
/**
* Interface that supports querying various pieces of metadata about a hudi table.
@@ -95,6 +96,11 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable {
*/
List<String> getAllPartitionPaths() throws IOException;
/**
 * Fetch all files for given partition paths.
 *
 * @param partitionPaths partition paths to list, given as path strings
 * @return map from partition path string to the files found in that partition
 * @throws IOException if the files cannot be listed
 */
Map<String, FileStatus[]> getAllFilesInPartitions(List<String> partitionPaths) throws IOException;
/**
* Get the instant time to which the metadata is synced w.r.t data timeline.
*/

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.metadata;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
@@ -30,7 +31,10 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class TestFileSystemBackedTableMetadata extends HoodieCommonTestHarness {
@@ -63,8 +67,10 @@ public class TestFileSystemBackedTableMetadata extends HoodieCommonTestHarness {
HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
Assertions.assertTrue(fileSystemBackedTableMetadata.getAllPartitionPaths().size() == 0);
Assertions.assertTrue(fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath)).length == 10);
Assertions.assertEquals(0, fileSystemBackedTableMetadata.getAllPartitionPaths().size());
Assertions.assertEquals(10, fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath)).length);
Assertions.assertEquals(10, fileSystemBackedTableMetadata.getAllFilesInPartitions(
Collections.singletonList(basePath)).get(basePath).length);
}
/**
@@ -86,8 +92,14 @@ public class TestFileSystemBackedTableMetadata extends HoodieCommonTestHarness {
HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, true);
Assertions.assertTrue(fileSystemBackedTableMetadata.getAllPartitionPaths().size() == 3);
Assertions.assertTrue(fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + DATE_PARTITIONS.get(0))).length == 10);
Assertions.assertEquals(3, fileSystemBackedTableMetadata.getAllPartitionPaths().size());
Assertions.assertEquals(10, fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + DATE_PARTITIONS.get(0))).length);
List<String> fullPartitionPaths = DATE_PARTITIONS.stream().map(p -> basePath + "/" + p).collect(Collectors.toList());
Map<String, FileStatus[]> partitionToFilesMap = fileSystemBackedTableMetadata.getAllFilesInPartitions(fullPartitionPaths);
for (String p : fullPartitionPaths) {
Assertions.assertEquals(10, partitionToFilesMap.get(p).length);
}
}
/**
@@ -101,7 +113,9 @@ public class TestFileSystemBackedTableMetadata extends HoodieCommonTestHarness {
// Generate 10 files under each partition
DATE_PARTITIONS.stream().forEach(p -> {
try {
hoodieTestTable = hoodieTestTable.withBaseFilesInPartition(p, IntStream.range(0, 10).toArray());
hoodieTestTable = hoodieTestTable
.withPartitionMetaFiles(p)
.withBaseFilesInPartition(p, IntStream.range(0, 10).toArray());
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -109,7 +123,13 @@ public class TestFileSystemBackedTableMetadata extends HoodieCommonTestHarness {
HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
Assertions.assertTrue(fileSystemBackedTableMetadata.getAllPartitionPaths().size() == 0);
Assertions.assertEquals(3, fileSystemBackedTableMetadata.getAllPartitionPaths().size());
List<String> fullPartitionPaths = DATE_PARTITIONS.stream().map(p -> basePath + "/" + p).collect(Collectors.toList());
Map<String, FileStatus[]> partitionToFilesMap = fileSystemBackedTableMetadata.getAllFilesInPartitions(fullPartitionPaths);
for (String p : fullPartitionPaths) {
Assertions.assertEquals(10, partitionToFilesMap.get(p).length);
}
}
@Test
@@ -128,8 +148,14 @@ public class TestFileSystemBackedTableMetadata extends HoodieCommonTestHarness {
HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
Assertions.assertTrue(fileSystemBackedTableMetadata.getAllPartitionPaths().size() == 3);
Assertions.assertTrue(fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + ONE_LEVEL_PARTITIONS.get(0))).length == 10);
Assertions.assertEquals(3, fileSystemBackedTableMetadata.getAllPartitionPaths().size());
Assertions.assertEquals(10, fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + ONE_LEVEL_PARTITIONS.get(0))).length);
List<String> fullPartitionPaths = ONE_LEVEL_PARTITIONS.stream().map(p -> basePath + "/" + p).collect(Collectors.toList());
Map<String, FileStatus[]> partitionToFilesMap = fileSystemBackedTableMetadata.getAllFilesInPartitions(fullPartitionPaths);
for (String p : fullPartitionPaths) {
Assertions.assertEquals(10, partitionToFilesMap.get(p).length);
}
}
@Test
@@ -148,8 +174,14 @@ public class TestFileSystemBackedTableMetadata extends HoodieCommonTestHarness {
HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
Assertions.assertTrue(fileSystemBackedTableMetadata.getAllPartitionPaths().size() == 3);
Assertions.assertTrue(fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + MULTI_LEVEL_PARTITIONS.get(0))).length == 10);
Assertions.assertEquals(3, fileSystemBackedTableMetadata.getAllPartitionPaths().size());
Assertions.assertEquals(10, fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + MULTI_LEVEL_PARTITIONS.get(0))).length);
List<String> fullPartitionPaths = MULTI_LEVEL_PARTITIONS.stream().map(p -> basePath + "/" + p).collect(Collectors.toList());
Map<String, FileStatus[]> partitionToFilesMap = fileSystemBackedTableMetadata.getAllFilesInPartitions(fullPartitionPaths);
for (String p : fullPartitionPaths) {
Assertions.assertEquals(10, partitionToFilesMap.get(p).length);
}
}
@Test
@@ -167,8 +199,14 @@ public class TestFileSystemBackedTableMetadata extends HoodieCommonTestHarness {
HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
Assertions.assertTrue(fileSystemBackedTableMetadata.getAllPartitionPaths().size() == 3);
Assertions.assertTrue(fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + MULTI_LEVEL_PARTITIONS.get(0))).length == 0);
Assertions.assertEquals(3, fileSystemBackedTableMetadata.getAllPartitionPaths().size());
Assertions.assertEquals(0, fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + MULTI_LEVEL_PARTITIONS.get(0))).length);
List<String> fullPartitionPaths = MULTI_LEVEL_PARTITIONS.stream().map(p -> basePath + "/" + p).collect(Collectors.toList());
Map<String, FileStatus[]> partitionToFilesMap = fileSystemBackedTableMetadata.getAllFilesInPartitions(fullPartitionPaths);
for (String p : fullPartitionPaths) {
Assertions.assertEquals(0, partitionToFilesMap.get(p).length);
}
}
}

View File

@@ -17,11 +17,13 @@
package org.apache.hudi
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.common.config.ConfigProperty
import org.apache.hudi.common.fs.ConsistencyGuardConfig
import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.util.ConfigUtils
import org.apache.hudi.hive.{HiveSyncTool, SlashEncodedDayPartitionValueExtractor}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{CustomKeyGenerator, SimpleKeyGenerator}
@@ -471,4 +473,16 @@ object DataSourceOptionsHelper {
})
translatedOpt.toMap
}
/**
 * Builds the read parameters: a resolved query type plus the translated user options.
 *
 * The query type is resolved in this order:
 *  1. if ConfigUtils.IS_QUERY_AS_RO_TABLE is present, its boolean value selects read-optimized (true)
 *     or snapshot (false);
 *  2. otherwise the value of QUERY_TYPE_OPT_KEY in `parameters`;
 *  3. otherwise the default value of QUERY_TYPE_OPT_KEY.
 *
 * The translated parameters are appended on the right-hand side of `++`, so on a key collision the
 * translated entry replaces the resolved query type.
 *
 * @param parameters raw options supplied by the caller
 * @return the translated options, with QUERY_TYPE_OPT_KEY always present
 */
def parametersWithReadDefaults(parameters: Map[String, String]): Map[String, String] = {
  // The read-optimized flag, when present, wins over an explicitly requested query type.
  val resolvedQueryType = parameters.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) match {
    case Some(flag) if flag.toBoolean => QUERY_TYPE_READ_OPTIMIZED_OPT_VAL
    case Some(_) => QUERY_TYPE_SNAPSHOT_OPT_VAL
    case None => parameters.getOrElse(QUERY_TYPE_OPT_KEY.key, QUERY_TYPE_OPT_KEY.defaultValue())
  }

  val queryTypeDefault = Map(QUERY_TYPE_OPT_KEY.key -> resolvedQueryType)
  queryTypeDefault ++ translateConfigurations(parameters)
}
}

View File

@@ -72,7 +72,7 @@ class DefaultSource extends RelationProvider
optParams: Map[String, String],
schema: StructType): BaseRelation = {
// Add default options for unspecified read options keys.
val parameters = DataSourceOptionsHelper.translateConfigurations(optParams)
val parameters = DataSourceOptionsHelper.parametersWithReadDefaults(optParams)
val path = parameters.get("path")
val readPathsStr = parameters.get(DataSourceReadOptions.READ_PATHS_OPT_KEY.key)
@@ -106,12 +106,7 @@ class DefaultSource extends RelationProvider
val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
val tableType = metaClient.getTableType
// First check if the ConfigUtils.IS_QUERY_AS_RO_TABLE has set by HiveSyncTool,
// or else use query type from QUERY_TYPE_OPT_KEY.
val queryType = parameters.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
.map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else QUERY_TYPE_SNAPSHOT_OPT_VAL)
.getOrElse(parameters.getOrElse(QUERY_TYPE_OPT_KEY.key, QUERY_TYPE_OPT_KEY.defaultValue()))
val queryType = parameters(QUERY_TYPE_OPT_KEY.key)
log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType")

View File

@@ -21,14 +21,14 @@ import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
import org.apache.hudi.common.engine.HoodieLocalEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieBaseFile
import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{InternalRow, expressions}
@@ -76,6 +76,11 @@ case class HoodieFileIndex(
private val basePath = metaClient.getBasePath
@transient private val queryPath = new Path(options.getOrElse("path", "'path' option required"))
private val queryType = options(QUERY_TYPE_OPT_KEY.key())
private val tableType = metaClient.getTableType
/**
* Get the schema of the table.
*/
@@ -106,16 +111,35 @@ case class HoodieFileIndex(
}
}
private lazy val metadataConfig = {
private lazy val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
private lazy val configProperties = {
val sqlConf: SQLConf = spark.sessionState.conf
val properties = new Properties()
// To support metadata listing via Spark SQL we allow users to pass the config via SQL Conf in spark session. Users
// would be able to run SET hoodie.metadata.enable=true in the spark sql session to enable metadata listing.
properties.put(HoodieMetadataConfig.METADATA_ENABLE_PROP,
sqlConf.getConfString(HoodieMetadataConfig.METADATA_ENABLE_PROP.key(),
HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString))
properties.put(HoodieMetadataConfig.METADATA_VALIDATE_PROP,
sqlConf.getConfString(HoodieMetadataConfig.METADATA_VALIDATE_PROP.key(),
HoodieMetadataConfig.METADATA_VALIDATE_PROP.defaultValue().toString))
properties.putAll(options.asJava)
HoodieMetadataConfig.newBuilder.fromProperties(properties).build()
properties
}
private lazy val fileSystemStorageConfig = FileSystemViewStorageConfig.newBuilder()
.fromProperties(configProperties)
.build()
private lazy val metadataConfig = HoodieMetadataConfig.newBuilder
.fromProperties(configProperties)
.build()
@transient @volatile private var fileSystemView: HoodieTableFileSystemView = _
@transient @volatile private var cachedAllInputFiles: Array[HoodieBaseFile] = _
@transient @volatile private var cachedAllInputFileSlices: Map[PartitionRowPath, Seq[FileSlice]] = _
@transient @volatile private var cachedFileSize: Long = 0L
@transient @volatile private var cachedAllPartitionPaths: Seq[PartitionRowPath] = _
@volatile private var queryAsNonePartitionedTable: Boolean = _
@@ -123,24 +147,58 @@ case class HoodieFileIndex(
override def rootPaths: Seq[Path] = queryPath :: Nil
/**
* Invoked by Spark to fetch list of latest base files per partition.
*
* @param partitionFilters partition column filters
* @param dataFilters data columns filters
* @return list of PartitionDirectory containing partition to base files mapping
*/
override def listFiles(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
if (queryAsNonePartitionedTable) { // Read as Non-Partitioned table.
Seq(PartitionDirectory(InternalRow.empty, allFiles))
} else {
// Prune the partition path by the partition filters
val prunedPartitions = prunePartition(cachedAllPartitionPaths, partitionFilters)
val prunedPartitions = prunePartition(cachedAllInputFileSlices.keys.toSeq, partitionFilters)
prunedPartitions.map { partition =>
val fileStatues = fileSystemView.getLatestBaseFiles(partition.partitionPath).iterator()
.asScala.toSeq
.map(_.getFileStatus)
PartitionDirectory(partition.values, fileStatues)
val baseFileStatuses = cachedAllInputFileSlices(partition).map(fileSlice => {
if (fileSlice.getBaseFile.isPresent) {
fileSlice.getBaseFile.get().getFileStatus
} else {
null
}
}).filterNot(_ == null)
PartitionDirectory(partition.values, baseFileStatuses)
}
}
}
/**
 * Returns the cached file slices grouped by partition path string.
 *
 * When the table is queried as non-partitioned, every cached entry is returned and
 * `partitionFilters` is ignored. Otherwise the cached partitions are pruned with
 * `partitionFilters` first and only the surviving partitions are returned.
 *
 * @param partitionFilters partition column filters used for pruning
 * @param dataFilters data column filters (not used by this method)
 * @return mapping from partition path string to the cached file slices of that partition
 */
def listFileSlices(partitionFilters: Seq[Expression],
                   dataFilters: Seq[Expression]): Map[String, Seq[FileSlice]] = {
  if (queryAsNonePartitionedTable) {
    // Non-partitioned read: nothing to prune, just re-key the cache by path string.
    cachedAllInputFileSlices.map { case (partition, slices) => partition.partitionPath -> slices }
  } else {
    // Partitioned read: keep only the partitions that pass the partition filters.
    val candidates = cachedAllInputFileSlices.keys.toSeq
    val retained = prunePartition(candidates, partitionFilters)
    retained.map(p => p.partitionPath -> cachedAllInputFileSlices(p)).toMap
  }
}
override def inputFiles: Array[String] = {
cachedAllInputFiles.map(_.getFileStatus.getPath.toString)
val fileStatusList = allFiles
fileStatusList.map(_.getPath.toString).toArray
}
override def refresh(): Unit = {
@@ -157,13 +215,36 @@ case class HoodieFileIndex(
metaClient.reloadActiveTimeline()
val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants, allFiles)
cachedAllInputFiles = fileSystemView.getLatestBaseFiles.iterator().asScala.toArray
cachedAllPartitionPaths = partitionFiles.keys.toSeq
cachedFileSize = cachedAllInputFiles.map(_.getFileLen).sum
(tableType, queryType) match {
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) =>
// Fetch and store latest base and log files, and their sizes
cachedAllInputFileSlices = partitionFiles.map(p => {
val latestSlices = if (activeInstants.lastInstant().isPresent) {
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath,
activeInstants.lastInstant().get().getTimestamp).iterator().asScala.toSeq
} else {
Seq()
}
(p._1, latestSlices)
})
cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSlice => {
if (fileSlice.getBaseFile.isPresent) {
fileSlice.getBaseFile.get().getFileLen + fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum
} else {
fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum
}
}).sum
case (_, _) =>
// Fetch and store latest base files and its sizes
cachedAllInputFileSlices = partitionFiles.map(p => {
(p._1, fileSystemView.getLatestFileSlices(p._1.partitionPath).iterator().asScala.toSeq)
})
cachedFileSize = cachedAllInputFileSlices.values.flatten.map(_.getBaseFile.get().getFileLen).sum
}
// If the partition value contains InternalRow.empty, we query it as a non-partitioned table.
queryAsNonePartitionedTable = cachedAllPartitionPaths
.exists(p => p.values == InternalRow.empty)
queryAsNonePartitionedTable = partitionFiles.keys.exists(p => p.values == InternalRow.empty)
val flushSpend = System.currentTimeMillis() - startTime
logInfo(s"Refresh for table ${metaClient.getTableConfig.getTableName}," +
s" spend: $flushSpend ms")
@@ -192,7 +273,20 @@ case class HoodieFileIndex(
StructType(schema.fields.filterNot(f => partitionColumns.contains(f.name)))
}
def allFiles: Seq[FileStatus] = cachedAllInputFiles.map(_.getFileStatus)
/**
 * Collects the FileStatus of the base file of every cached file slice.
 *
 * File slices that have no base file are skipped, so no log files appear in the result.
 * Intended for callers that only want base files, such as Spark fetching the file list
 * directly through HoodieFileIndex or read optimized query logic, so that they do not need
 * a path filter to drop log files.
 *
 * @return FileStatus of each base file found in the cached file slices
 */
def allFiles: Seq[FileStatus] = {
  val baseFileStatuses = for {
    slicesOfPartition <- cachedAllInputFileSlices.values
    slice <- slicesOfPartition
    if slice.getBaseFile.isPresent
  } yield slice.getBaseFile.get().getFileStatus

  baseFileStatuses.toSeq
}
/**
* Prune the partition by the filter.This implementation is fork from
@@ -229,12 +323,12 @@ case class HoodieFileIndex(
}
def getAllQueryPartitionPaths: Seq[PartitionRowPath] = {
val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
val queryPartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), queryPath)
// Load all the partition path from the basePath, and filter by the query partition path.
// TODO load files from the queryPartitionPath directly.
val partitionPaths = FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, basePath).asScala
val partitionPaths = FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath).asScala
.filter(_.startsWith(queryPartitionPath))
val partitionSchema = _partitionSchemaFromProperties
val timeZoneId = CaseInsensitiveMap(options)
.get(DateTimeUtils.TIMEZONE_OPTION)
@@ -307,13 +401,6 @@ case class HoodieFileIndex(
* Load all partition paths and their files under the query table path.
*/
private def loadPartitionPathFiles(): Map[PartitionRowPath, Array[FileStatus]] = {
val properties = new Properties()
properties.putAll(options.asJava)
val writeConfig = HoodieWriteConfig.newBuilder()
.withPath(basePath).withProperties(properties).build()
val maxListParallelism = writeConfig.getFileListingParallelism
val serializableConf = new SerializableConfiguration(spark.sessionState.newHadoopConf())
val partitionRowPaths = getAllQueryPartitionPaths
// List files in all of the partition path.
val pathToFetch = mutable.ArrayBuffer[PartitionRowPath]()
@@ -327,28 +414,25 @@ case class HoodieFileIndex(
case None => pathToFetch.append(partitionRowPath)
}
}
// Fetch the rest from the file system.
val fetchedPartition2Files =
val fetchedPartitionToFiles =
if (pathToFetch.nonEmpty) {
spark.sparkContext.parallelize(pathToFetch, Math.min(pathToFetch.size, maxListParallelism))
.map { partitionRowPath =>
// Here we use a LocalEngineContext to get the files in the partition.
// We can do this because the TableMetadata.getAllFilesInPartition only rely on the
// hadoopConf of the EngineContext.
val engineContext = new HoodieLocalEngineContext(serializableConf.get())
val filesInPartition = FSUtils.getFilesInPartition(engineContext, metadataConfig,
basePath, partitionRowPath.fullPartitionPath(basePath))
(partitionRowPath, filesInPartition)
}.collect().map(f => f._1 -> f._2).toMap
val fullPartitionPathsToFetch = pathToFetch.map(p => (p, p.fullPartitionPath(basePath).toString)).toMap
val partitionToFilesMap = FSUtils.getFilesInPartitions(engineContext, metadataConfig, basePath,
fullPartitionPathsToFetch.values.toArray, fileSystemStorageConfig.getSpillableDir)
fullPartitionPathsToFetch.map(p => {
(p._1, partitionToFilesMap.get(p._2))
})
} else {
Map.empty[PartitionRowPath, Array[FileStatus]]
}
// Update the fileStatusCache
fetchedPartition2Files.foreach {
fetchedPartitionToFiles.foreach {
case (partitionRowPath, filesInPartition) =>
fileStatusCache.putLeafFiles(partitionRowPath.fullPartitionPath(basePath), filesInPartition)
}
cachePartitionToFiles.toMap ++ fetchedPartition2Files
cachePartitionToFiles.toMap ++ fetchedPartitionToFiles
}
/**

View File

@@ -19,6 +19,7 @@
package org.apache.hudi
import org.apache.avro.Schema
import org.apache.hudi.common.model.HoodieLogFile
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
@@ -142,15 +143,45 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
}
def buildFileIndex(filters: Array[Filter]): List[HoodieMergeOnReadFileSplit] = {
// Get all partition paths
val partitionPaths = if (globPaths.isDefined) {
if (globPaths.isDefined) {
// Load files from the global paths if it has defined to be compatible with the original mode
val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths.get)
val fsView = new HoodieTableFileSystemView(metaClient,
metaClient.getActiveTimeline.getCommitsTimeline
.filterCompletedInstants, inMemoryFileIndex.allFiles().toArray)
fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus.getPath.getParent)
} else { // Load partition path by the HoodieFileIndex.
val partitionPaths = fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus.getPath.getParent)
if (partitionPaths.isEmpty) { // If this an empty table, return an empty split list.
List.empty[HoodieMergeOnReadFileSplit]
} else {
val lastInstant = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants.lastInstant()
if (!lastInstant.isPresent) { // Return empty list if the table has no commit
List.empty
} else {
val latestCommit = lastInstant.get().getTimestamp
val baseAndLogsList = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, partitionPaths.asJava).asScala
val fileSplits = baseAndLogsList.map(kv => {
val baseFile = kv.getLeft
val logPaths = if (kv.getRight.isEmpty) Option.empty else Option(kv.getRight.asScala.toList)
val baseDataPath = if (baseFile.isPresent) {
Some(PartitionedFile(
InternalRow.empty,
MergeOnReadSnapshotRelation.getFilePath(baseFile.get.getFileStatus.getPath),
0, baseFile.get.getFileLen)
)
} else {
None
}
HoodieMergeOnReadFileSplit(baseDataPath, logPaths, latestCommit,
metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
}).toList
fileSplits
}
}
} else {
// Load files by the HoodieFileIndex.
val hoodieFileIndex = HoodieFileIndex(sqlContext.sparkSession, metaClient,
Some(tableStructSchema), optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession))
@@ -160,36 +191,34 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
val partitionFilterExpression =
HoodieSparkUtils.convertToCatalystExpressions(partitionFilters, tableStructSchema)
val allPartitionPaths = hoodieFileIndex.getAllQueryPartitionPaths
// If the filters were successfully converted to a Catalyst expression, use it to prune the partitions
hoodieFileIndex.prunePartition(allPartitionPaths, partitionFilterExpression.map(Seq(_)).getOrElse(Seq.empty))
.map(_.fullPartitionPath(metaClient.getBasePath))
}
if (partitionPaths.isEmpty) { // If this an empty table, return an empty split list.
List.empty[HoodieMergeOnReadFileSplit]
} else {
val lastInstant = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants.lastInstant()
if (!lastInstant.isPresent) { // Return empty list if the table has no commit
List.empty
val fileSlices = if (partitionFilterExpression.isDefined) {
hoodieFileIndex.listFileSlices(Seq(partitionFilterExpression.get), Seq.empty)
} else {
val latestCommit = lastInstant.get().getTimestamp
val baseAndLogsList = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, partitionPaths.asJava).asScala
val fileSplits = baseAndLogsList.map(kv => {
val baseFile = kv.getLeft
val logPaths = if (kv.getRight.isEmpty) Option.empty else Option(kv.getRight.asScala.toList)
hoodieFileIndex.listFileSlices(Seq.empty, Seq.empty)
}
val baseDataPath = if (baseFile.isPresent) {
Some(PartitionedFile(
InternalRow.empty,
MergeOnReadSnapshotRelation.getFilePath(baseFile.get.getFileStatus.getPath),
0, baseFile.get.getFileLen)
)
if (fileSlices.isEmpty) {
// If this is an empty table, return an empty split list.
List.empty[HoodieMergeOnReadFileSplit]
} else {
val fileSplits = fileSlices.values.flatten.map(fileSlice => {
val latestCommit = metaClient.getActiveTimeline.getCommitsTimeline
.filterCompletedInstants.lastInstant().get().getTimestamp
val partitionedFile = if (fileSlice.getBaseFile.isPresent) {
val baseFile = fileSlice.getBaseFile.get()
val baseFilePath = MergeOnReadSnapshotRelation.getFilePath(baseFile.getFileStatus.getPath)
Option(PartitionedFile(InternalRow.empty, baseFilePath, 0, baseFile.getFileLen))
} else {
None
Option.empty
}
HoodieMergeOnReadFileSplit(baseDataPath, logPaths, latestCommit,
metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
val logPaths = fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala
.map(logFile => MergeOnReadSnapshotRelation.getFilePath(logFile.getPath)).toList
val logPathsOptional = if (logPaths.isEmpty) Option.empty else Option(logPaths)
HoodieMergeOnReadFileSplit(partitionedFile, logPathsOptional, latestCommit, metaClient.getBasePath,
maxCompactionMemoryInBytes, mergeType)
}).toList
fileSplits
}

View File

@@ -53,6 +53,10 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
HoodieWriteConfig.TABLE_NAME.key -> "hoodie_test"
)
// Read options passed to HoodieFileIndex by the tests in this class. It starts with only the
// snapshot query type; it is declared as a var because the "path" entry is appended afterwards
// (see the `queryOpts ++ Map("path" -> basePath)` reassignment).
var queryOpts = Map(
DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key -> DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL
)
@BeforeEach override def setUp() {
initPath()
initSparkContexts()
@@ -60,6 +64,8 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
initTestDataGenerator()
initFileSystem()
initMetaClient()
queryOpts = queryOpts ++ Map("path" -> basePath)
}
@ParameterizedTest
@@ -74,7 +80,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
.mode(SaveMode.Overwrite)
.save(basePath)
metaClient = HoodieTableMetaClient.reload(metaClient)
val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath))
val fileIndex = HoodieFileIndex(spark, metaClient, None, queryOpts)
assertEquals("partition", fileIndex.partitionSchema.fields.map(_.name).mkString(","))
}
@@ -96,7 +102,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
.mode(SaveMode.Overwrite)
.save(basePath)
metaClient = HoodieTableMetaClient.reload(metaClient)
val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath))
val fileIndex = HoodieFileIndex(spark, metaClient, None, queryOpts)
assertEquals("partition", fileIndex.partitionSchema.fields.map(_.name).mkString(","))
}
@@ -115,7 +121,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
.mode(SaveMode.Overwrite)
.save(basePath)
metaClient = HoodieTableMetaClient.reload(metaClient)
val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath))
val fileIndex = HoodieFileIndex(spark, metaClient, None, queryOpts)
assertEquals("partition", fileIndex.partitionSchema.fields.map(_.name).mkString(","))
}
@@ -133,7 +139,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
.mode(SaveMode.Overwrite)
.save(basePath)
metaClient = HoodieTableMetaClient.reload(metaClient)
val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath))
val fileIndex = HoodieFileIndex(spark, metaClient, None, queryOpts)
val partitionFilter1 = EqualTo(attribute("partition"), literal("2021/03/08"))
val partitionName = if (partitionEncode) PartitionPathEncodeUtils.escapePathName("2021/03/08")
@@ -176,7 +182,8 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
.mode(SaveMode.Overwrite)
.save(basePath)
metaClient = HoodieTableMetaClient.reload(metaClient)
val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath))
val fileIndex = HoodieFileIndex(spark, metaClient, None,
queryOpts ++ Map(HoodieMetadataConfig.METADATA_ENABLE_PROP.key -> useMetaFileList.toString))
val partitionFilter1 = And(
EqualTo(attribute("dt"), literal("2021-03-01")),
@@ -190,7 +197,9 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
assertEquals(partitionValues.toSeq(Seq(StringType)).mkString(","), "2021-03-01,10")
assertEquals(getFileCountInPartitionPath("2021-03-01/10"), filesAfterPrune.size)
val readDF1 = spark.read.format("hudi").load(basePath)
val readDF1 = spark.read.format("hudi")
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key(), useMetaFileList)
.load(basePath)
assertEquals(10, readDF1.count())
assertEquals(5, readDF1.filter("dt = '2021-03-01' and hh = '10'").count())
@@ -206,6 +215,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
.option(PARTITIONPATH_FIELD_OPT_KEY.key, "dt,hh")
.option(KEYGENERATOR_CLASS_OPT_KEY.key, classOf[ComplexKeyGenerator].getName)
.option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY.key, "false")
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key(), useMetaFileList)
.mode(SaveMode.Overwrite)
.save(basePath)
@@ -224,7 +234,9 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
// The number of returned files should equal the total number of files in all the partition paths.
assertEquals(getFileCountInPartitionPaths("2021/03/01/10", "2021/03/02/10"),
filesAfterPrune2.length)
val readDF2 = spark.read.format("hudi").load(basePath)
val readDF2 = spark.read.format("hudi")
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, useMetaFileList)
.load(basePath)
assertEquals(10, readDF2.count())
// There are 5 rows in the partition with dt = 2021/03/01 and hh = 10

View File

@@ -41,7 +41,7 @@ import org.joda.time.format.DateTimeFormat
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
/**
@@ -160,9 +160,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
@ParameterizedTest
//TODO(metadata): Needs HUDI-1459 to be fixed
//@ValueSource(booleans = Array(true, false))
@ValueSource(booleans = Array(false))
@ValueSource(booleans = Array(true, false))
def testCopyOnWriteStorage(isMetadataEnabled: Boolean) {
// Insert Operation
val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
@@ -180,7 +178,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
// Snapshot query
val snapshotDF1 = spark.read.format("org.apache.hudi")
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled)
.load(basePath + "/*/*/*/*")
.load(basePath + "/*/*/*")
assertEquals(100, snapshotDF1.count())
// Upsert based on the written table with Hudi metadata columns
@@ -189,11 +187,14 @@ class TestCOWDataSource extends HoodieClientTestBase {
updateDf.write.format("org.apache.hudi")
.options(commonOpts)
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled)
.mode(SaveMode.Append)
.save(basePath)
val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
val snapshotDF2 = spark.read.format("hudi").load(basePath + "/*/*/*/*")
val snapshotDF2 = spark.read.format("hudi")
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled)
.load(basePath + "/*/*/*")
assertEquals(100, snapshotDF2.count())
assertEquals(updatedVerificationVal, snapshotDF2.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0))
@@ -214,7 +215,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
// Snapshot Query
val snapshotDF3 = spark.read.format("org.apache.hudi")
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled)
.load(basePath + "/*/*/*/*")
.load(basePath + "/*/*/*")
assertEquals(100, snapshotDF3.count()) // still 100, since we only updated
// Read Incremental Query
@@ -666,8 +667,8 @@ class TestCOWDataSource extends HoodieClientTestBase {
}
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testQueryCOWWithBasePathAndFileIndex(partitionEncode: Boolean): Unit = {
@CsvSource(Array("true,false", "true,true", "false,true", "false,false"))
def testQueryCOWWithBasePathAndFileIndex(partitionEncode: Boolean, isMetadataEnabled: Boolean): Unit = {
val N = 20
// Test query with partition pruning when URL_ENCODE_PARTITIONING_OPT_KEY is enabled
val records1 = dataGen.generateInsertsContainsAllPartitions("000", N)
@@ -676,6 +677,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY.key, partitionEncode)
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled)
.mode(SaveMode.Overwrite)
.save(basePath)
val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
@@ -683,6 +685,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
val countIn20160315 = records1.asScala.count(record => record.getPartitionPath == "2016/03/15")
// query the partition by filter
val count1 = spark.read.format("hudi")
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled)
.load(basePath)
.filter("partition = '2016/03/15'")
.count()
@@ -691,6 +694,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
// query the partition by path
val partitionPath = if (partitionEncode) "2016%2F03%2F15" else "2016/03/15"
val count2 = spark.read.format("hudi")
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled)
.load(basePath + s"/$partitionPath")
.count()
assertEquals(countIn20160315, count2)
@@ -702,6 +706,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY.key, partitionEncode)
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled)
.mode(SaveMode.Append)
.save(basePath)
// Incremental query without "*" in path

View File

@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path
import scala.collection.JavaConverters._
import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_OPT_KEY, PARTITIONPATH_FIELD_OPT_KEY, PAYLOAD_CLASS_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType}
import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -75,7 +76,9 @@ class TestMORDataSource extends HoodieClientTestBase {
cleanupFileSystem()
}
@Test def testMergeOnReadStorage() {
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testMergeOnReadStorage(isMetadataEnabled: Boolean) {
val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
// Bulk Insert Operation
@@ -86,6 +89,7 @@ class TestMORDataSource extends HoodieClientTestBase {
.option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
.option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled)
.mode(SaveMode.Overwrite)
.save(basePath)
@@ -94,7 +98,9 @@ class TestMORDataSource extends HoodieClientTestBase {
// Read RO View
val hudiRODF1 = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
.load(basePath + "/*/*/*/*")
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled)
.load(basePath + "/*/*/*")
assertEquals(100, hudiRODF1.count()) // still 100, since we only updated
val insertCommitTime = HoodieDataSourceHelpers.latestCommit(fs, basePath)
val insertCommitTimes = hudiRODF1.select("_hoodie_commit_time").distinct().collectAsList().map(r => r.getString(0)).toList
@@ -105,6 +111,7 @@ class TestMORDataSource extends HoodieClientTestBase {
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
inputDF2.write.format("org.apache.hudi")
.options(commonOpts)
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled)
.mode(SaveMode.Append)
.save(basePath)
@@ -112,7 +119,9 @@ class TestMORDataSource extends HoodieClientTestBase {
val updateCommitTime = HoodieDataSourceHelpers.latestCommit(fs, basePath)
val hudiSnapshotDF2 = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
.load(basePath + "/*/*/*/*")
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled)
.load(basePath + "/*/*/*")
val updateCommitTimes = hudiSnapshotDF2.select("_hoodie_commit_time").distinct().collectAsList().map(r => r.getString(0)).toList
assertEquals(List(updateCommitTime), updateCommitTimes)
@@ -122,10 +131,13 @@ class TestMORDataSource extends HoodieClientTestBase {
inputDF3.write.format("org.apache.hudi")
.options(commonOpts)
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled)
.mode(SaveMode.Append)
.save(basePath)
val hudiSnapshotDF3 = spark.read.format("hudi").load(basePath + "/*/*/*/*")
val hudiSnapshotDF3 = spark.read.format("hudi")
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled)
.load(basePath + "/*/*/*")
assertEquals(100, hudiSnapshotDF3.count())
assertEquals(updatedVerificationVal, hudiSnapshotDF3.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0))
}
@@ -580,8 +592,8 @@ class TestMORDataSource extends HoodieClientTestBase {
}
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testQueryMORWithBasePathAndFileIndex(partitionEncode: Boolean): Unit = {
@CsvSource(Array("true,false", "true,true", "false,true", "false,false"))
def testQueryMORWithBasePathAndFileIndex(partitionEncode: Boolean, isMetadataEnabled: Boolean): Unit = {
val N = 20
// Test query with partition pruning when URL_ENCODE_PARTITIONING_OPT_KEY is enabled
val records1 = dataGen.generateInsertsContainsAllPartitions("000", N)
@@ -591,6 +603,7 @@ class TestMORDataSource extends HoodieClientTestBase {
.option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
.option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY.key, partitionEncode)
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled)
.mode(SaveMode.Overwrite)
.save(basePath)
val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
@@ -598,6 +611,7 @@ class TestMORDataSource extends HoodieClientTestBase {
val countIn20160315 = records1.asScala.count(record => record.getPartitionPath == "2016/03/15")
// query the partition by filter
val count1 = spark.read.format("hudi")
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled)
.load(basePath)
.filter("partition = '2016/03/15'")
.count()
@@ -606,6 +620,7 @@ class TestMORDataSource extends HoodieClientTestBase {
// query the partition by path
val partitionPath = if (partitionEncode) "2016%2F03%2F15" else "2016/03/15"
val count2 = spark.read.format("hudi")
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled)
.load(basePath + s"/$partitionPath")
.count()
assertEquals(countIn20160315, count2)
@@ -618,6 +633,7 @@ class TestMORDataSource extends HoodieClientTestBase {
.option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
.option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY.key, partitionEncode)
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled)
.mode(SaveMode.Append)
.save(basePath)
// Incremental query without "*" in path