diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml
index e19070a6f..1a558aeae 100644
--- a/hudi-common/pom.xml
+++ b/hudi-common/pom.xml
@@ -36,61 +36,7 @@
-
-
-
- net.alchim31.maven
- scala-maven-plugin
- ${scala-maven-plugin.version}
-
-
- -nobootcp
- -target:jvm-1.8
-
- false
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
-
-
-
-
-
- net.alchim31.maven
- scala-maven-plugin
-
-
- scala-compile-first
- process-resources
-
- add-source
- compile
-
-
-
- scala-test-compile
- process-test-resources
-
- testCompile
-
-
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
-
-
- compile
-
- compile
-
-
-
-
org.apache.maven.plugins
maven-jar-plugin
@@ -111,10 +57,6 @@
org.apache.rat
apache-rat-plugin
-
- org.scalastyle
- scalastyle-maven-plugin
-
org.jacoco
jacoco-maven-plugin
@@ -156,13 +98,6 @@
-
-
- org.scala-lang
- scala-library
- ${scala.version}
-
-
com.fasterxml.jackson.core
diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
new file mode 100644
index 000000000..d4bf2a0fd
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieTableQueryType;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Common (engine-agnostic) File Index implementation enabling individual query engines to
+ * list Hudi Table contents based on the
+ *
+ *
+ * - Table type (MOR, COW)
+ * - Query type (snapshot, read_optimized, incremental)
+ * - Query instant/range
+ *
+ */
+public abstract class BaseHoodieTableFileIndex {
+
+ private static final Logger LOG = LogManager.getLogger(BaseHoodieTableFileIndex.class);
+
+ private final String[] partitionColumns;
+
+ private final FileSystemViewStorageConfig fileSystemStorageConfig;
+ private final HoodieMetadataConfig metadataConfig;
+
+ private final HoodieTableQueryType queryType;
+ private final Option specifiedQueryInstant;
+ protected final List queryPaths;
+
+ private final boolean shouldIncludePendingCommits;
+
+ private final HoodieTableType tableType;
+ protected final String basePath;
+
+ private final HoodieTableMetaClient metaClient;
+ private final HoodieEngineContext engineContext;
+
+ private final transient FileStatusCache fileStatusCache;
+
+ protected transient volatile long cachedFileSize = 0L;
+ protected transient volatile Map> cachedAllInputFileSlices;
+
+ protected volatile boolean queryAsNonePartitionedTable = false;
+
+ private transient volatile HoodieTableFileSystemView fileSystemView = null;
+
+ /**
+ * @param engineContext Hudi engine-specific context
+ * @param metaClient Hudi table's meta-client
+ * @param configProperties unifying configuration (in the form of generic properties)
+ * @param queryType target query type
+ * @param queryPaths target DFS paths being queried
+ * @param specifiedQueryInstant instant as of which table is being queried
+ * @param shouldIncludePendingCommits flags whether file-index should include pending commits (not just completed ones)
+ * @param fileStatusCache transient cache of fetched [[FileStatus]]es
+ */
+ public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
+ HoodieTableMetaClient metaClient,
+ TypedProperties configProperties,
+ HoodieTableQueryType queryType,
+ List queryPaths,
+ Option specifiedQueryInstant,
+ boolean shouldIncludePendingCommits,
+ FileStatusCache fileStatusCache) {
+ this.partitionColumns = metaClient.getTableConfig().getPartitionFields()
+ .orElse(new String[0]);
+
+ this.fileSystemStorageConfig = FileSystemViewStorageConfig.newBuilder()
+ .fromProperties(configProperties)
+ .build();
+ this.metadataConfig = HoodieMetadataConfig.newBuilder()
+ .fromProperties(configProperties)
+ .build();
+
+ this.queryType = queryType;
+ this.queryPaths = queryPaths;
+ this.specifiedQueryInstant = specifiedQueryInstant;
+ this.shouldIncludePendingCommits = shouldIncludePendingCommits;
+
+ this.tableType = metaClient.getTableType();
+ this.basePath = metaClient.getBasePath();
+
+ this.metaClient = metaClient;
+ this.engineContext = engineContext;
+ this.fileStatusCache = fileStatusCache;
+
+ doRefresh();
+ }
+
+ protected abstract Object[] parsePartitionColumnValues(String[] partitionColumns, String partitionPath);
+
+ /**
+ * Returns latest completed instant as seen by this instance of the file-index
+ */
+ public Option getLatestCompletedInstant() {
+ return getActiveTimeline().filterCompletedInstants().lastInstant();
+ }
+
+ /**
+ * Fetch list of latest base files and log files per partition.
+ *
+ * @return mapping from string partition paths to its base/log files
+ */
+ public Map> listFileSlices() {
+ return cachedAllInputFileSlices.entrySet()
+ .stream()
+ .collect(Collectors.toMap(e -> e.getKey().path, Map.Entry::getValue));
+ }
+
+ protected List getAllQueryPartitionPaths() {
+ List queryRelativePartitionPaths = queryPaths.stream()
+ .map(path -> FSUtils.getRelativePartitionPath(new Path(basePath), path))
+ .collect(Collectors.toList());
+
+ // Load all the partition path from the basePath, and filter by the query partition path.
+ // TODO load files from the queryRelativePartitionPaths directly.
+ List matchedPartitionPaths = FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath)
+ .stream()
+ .filter(path -> queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+ .collect(Collectors.toList());
+
+ // Convert partition's path into partition descriptor
+ return matchedPartitionPaths.stream()
+ .map(partitionPath -> {
+ Object[] partitionColumnValues = parsePartitionColumnValues(partitionColumns, partitionPath);
+ return new PartitionPath(partitionPath, partitionColumnValues);
+ })
+ .collect(Collectors.toList());
+ }
+
+ protected void refresh() {
+ fileStatusCache.invalidate();
+ doRefresh();
+ }
+
+ protected HoodieTimeline getActiveTimeline() {
+ // NOTE: We have to use commits and compactions timeline, to make sure that we're properly
+ // handling the following case: when records are inserted into the new log-file w/in the file-group
+ // that is under the pending compaction process, new log-file will bear the compaction's instant (on the
+ // timeline) in its name, as opposed to the base-file's commit instant. To make sure we're not filtering
+ // such log-file we have to _always_ include pending compaction instants into consideration
+ // TODO(HUDI-3302) re-evaluate whether we should not filter any commits in here
+ HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline();
+ if (shouldIncludePendingCommits) {
+ return timeline;
+ } else {
+ return timeline.filterCompletedAndCompactionInstants();
+ }
+ }
+
+ /**
+ * Load all partition paths and their files under the query table path.
+ */
+ private Map loadPartitionPathFiles() {
+ // List files in all partition paths
+ List pathToFetch = new ArrayList<>();
+ Map cachedPartitionToFiles = new HashMap<>();
+
+ // Fetch from the FileStatusCache
+ List partitionPaths = getAllQueryPartitionPaths();
+ partitionPaths.forEach(partitionPath -> {
+ Option filesInPartition = fileStatusCache.get(partitionPath.fullPartitionPath(basePath));
+ if (filesInPartition.isPresent()) {
+ cachedPartitionToFiles.put(partitionPath, filesInPartition.get());
+ } else {
+ pathToFetch.add(partitionPath);
+ }
+ });
+
+ Map fetchedPartitionToFiles;
+
+ if (pathToFetch.isEmpty()) {
+ fetchedPartitionToFiles = Collections.emptyMap();
+ } else {
+ Map fullPartitionPathsMapToFetch = pathToFetch.stream()
+ .collect(Collectors.toMap(
+ partitionPath -> partitionPath.fullPartitionPath(basePath).toString(),
+ Function.identity())
+ );
+
+ fetchedPartitionToFiles =
+ FSUtils.getFilesInPartitions(
+ engineContext,
+ metadataConfig,
+ basePath,
+ fullPartitionPathsMapToFetch.keySet().toArray(new String[0]),
+ fileSystemStorageConfig.getSpillableDir())
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(e -> fullPartitionPathsMapToFetch.get(e.getKey()), e -> e.getValue()));
+
+ }
+
+ // Update the fileStatusCache
+ fetchedPartitionToFiles.forEach((partitionPath, filesInPartition) -> {
+ fileStatusCache.put(partitionPath.fullPartitionPath(basePath), filesInPartition);
+ });
+
+ return CollectionUtils.combine(cachedPartitionToFiles, fetchedPartitionToFiles);
+ }
+
+ private void doRefresh() {
+ long startTime = System.currentTimeMillis();
+
+ Map partitionFiles = loadPartitionPathFiles();
+ FileStatus[] allFiles = partitionFiles.values().stream().flatMap(Arrays::stream).toArray(FileStatus[]::new);
+
+ metaClient.reloadActiveTimeline();
+
+ HoodieTimeline activeTimeline = getActiveTimeline();
+ Option latestInstant = activeTimeline.lastInstant();
+
+ // TODO we can optimize the flow by:
+ // - First fetch list of files from instants of interest
+ // - Load FileStatus's
+ fileSystemView = new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles);
+
+ Option queryInstant =
+ specifiedQueryInstant.or(() -> latestInstant.map(HoodieInstant::getTimestamp));
+
+ if (tableType.equals(HoodieTableType.MERGE_ON_READ) && queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
+ cachedAllInputFileSlices = partitionFiles.keySet().stream()
+ .collect(Collectors.toMap(
+ Function.identity(),
+ partitionPath ->
+ queryInstant.map(instant ->
+ fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionPath.path, queryInstant.get())
+ .collect(Collectors.toList())
+ )
+ .orElse(Collections.emptyList())
+ )
+ );
+ } else {
+ // TODO re-align this with the (MOR, snapshot) branch above
+ cachedAllInputFileSlices = partitionFiles.keySet().stream()
+ .collect(Collectors.toMap(
+ Function.identity(),
+ partitionPath ->
+ specifiedQueryInstant.map(instant ->
+ fileSystemView.getLatestFileSlicesBeforeOrOn(partitionPath.path, instant, true))
+ .orElse(fileSystemView.getLatestFileSlices(partitionPath.path))
+ .collect(Collectors.toList())
+ )
+ );
+ }
+
+ cachedFileSize = cachedAllInputFileSlices.values().stream()
+ .flatMap(Collection::stream)
+ .mapToLong(BaseHoodieTableFileIndex::fileSliceSize)
+ .sum();
+
+ // If any partition has no partition-column values, we query the table as a non-partitioned table.
+ queryAsNonePartitionedTable = partitionFiles.keySet().stream().anyMatch(p -> p.values.length == 0);
+
+ long duration = System.currentTimeMillis() - startTime;
+
+ LOG.info(String.format("Refresh table %s, spent: %d ms", metaClient.getTableConfig().getTableName(), duration));
+ }
+
+ private static long fileSliceSize(FileSlice fileSlice) {
+ long logFileSize = fileSlice.getLogFiles().map(HoodieLogFile::getFileSize)
+ .filter(s -> s > 0)
+ .reduce(0L, Long::sum);
+
+ return fileSlice.getBaseFile().map(BaseFile::getFileLen).orElse(0L) + logFileSize;
+ }
+
+ protected static final class PartitionPath {
+ final String path;
+ final Object[] values;
+
+ public PartitionPath(String path, Object[] values) {
+ this.path = path;
+ this.values = values;
+ }
+
+ Path fullPartitionPath(String basePath) {
+ if (!path.isEmpty()) {
+ return new Path(basePath, path);
+ }
+
+ return new Path(basePath);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return other instanceof PartitionPath
+ && Objects.equals(path, ((PartitionPath) other).path)
+ && Arrays.equals(values, ((PartitionPath) other).values);
+ }
+
+ @Override
+ public int hashCode() {
+ return path.hashCode() * 1103 + Arrays.hashCode(values);
+ }
+ }
+
+ protected interface FileStatusCache {
+ Option get(Path path);
+
+ void put(Path path, FileStatus[] leafFiles);
+
+ void invalidate();
+ }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieTableQueryType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieTableQueryType.java
index 15449b329..f1d7557ae 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieTableQueryType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieTableQueryType.java
@@ -30,7 +30,7 @@ package org.apache.hudi.common.model;
*
*/
public enum HoodieTableQueryType {
- QUERY_TYPE_SNAPSHOT,
- QUERY_TYPE_INCREMENTAL,
- QUERY_TYPE_READ_OPTIMIZED
+ SNAPSHOT,
+ INCREMENTAL,
+ READ_OPTIMIZED
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
index 6a4efca29..1a3d053e2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
@@ -62,7 +62,7 @@ public class CollectionUtils {
/**
- * Combines provided {@link List}s into one
+ * Combines provided {@link List}s into one, returning new instance of {@link ArrayList}
*/
public static List combine(List one, List another) {
ArrayList combined = new ArrayList<>(one.size() + another.size());
@@ -71,6 +71,19 @@ public class CollectionUtils {
return combined;
}
+ /**
+ * Combines provided {@link Map}s into one, returning new instance of {@link HashMap}.
+ *
+ * NOTE: Values associated with overlapping keys from the second map will override
+ * values from the first one
+ */
+ public static Map combine(Map one, Map another) {
+ Map combined = new HashMap<>(one.size() + another.size());
+ combined.putAll(one);
+ combined.putAll(another);
+ return combined;
+ }
+
/**
* Returns difference b/w {@code one} {@link Set} of elements and {@code another}
*/
diff --git a/hudi-common/src/main/scala/org/apache/hudi/HoodieTableFileIndexBase.scala b/hudi-common/src/main/scala/org/apache/hudi/HoodieTableFileIndexBase.scala
deleted file mode 100644
index f46b79f38..000000000
--- a/hudi-common/src/main/scala/org/apache/hudi/HoodieTableFileIndexBase.scala
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
-import org.apache.hudi.common.engine.HoodieEngineContext
-import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ
-import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
-import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.table.timeline.HoodieInstant
-import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView}
-
-import scala.collection.JavaConverters._
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-import scala.language.implicitConversions
-
-/**
- * Common (engine-agnostic) File Index implementation enabling individual query engines to
- * list Hudi Table contents based on the
- *
- *
- * - Table type (MOR, COW)
- * - Query type (snapshot, read_optimized, incremental)
- * - Query instant/range
- *
- *
- * @param engineContext Hudi engine-specific context
- * @param metaClient Hudi table's meta-client
- * @param configProperties unifying configuration (in the form of generic properties)
- * @param queryType target query type
- * @param queryPaths target DFS paths being queried
- * @param specifiedQueryInstant instant as of which table is being queried
- * @param shouldIncludePendingCommits flags whether file-index should exclude any pending operations
- * @param fileStatusCache transient cache of fetched [[FileStatus]]es
- */
-abstract class HoodieTableFileIndexBase(engineContext: HoodieEngineContext,
- metaClient: HoodieTableMetaClient,
- configProperties: TypedProperties,
- queryType: HoodieTableQueryType,
- protected val queryPaths: Seq[Path],
- specifiedQueryInstant: Option[String] = None,
- shouldIncludePendingCommits: Boolean = false,
- @transient fileStatusCache: FileStatusCacheTrait) {
- /**
- * Get all completeCommits.
- */
- lazy val completedCommits = metaClient.getCommitsTimeline
- .filterCompletedInstants().getInstants.iterator().toList.map(_.getTimestamp)
-
- private lazy val _partitionColumns: Array[String] =
- metaClient.getTableConfig.getPartitionFields.orElse(Array[String]())
-
- private lazy val fileSystemStorageConfig = FileSystemViewStorageConfig.newBuilder()
- .fromProperties(configProperties)
- .build()
- private lazy val metadataConfig = HoodieMetadataConfig.newBuilder
- .fromProperties(configProperties)
- .build()
-
- private val tableType = metaClient.getTableType
-
- protected val basePath: String = metaClient.getBasePath
-
- @transient
- @volatile protected var cachedFileSize: Long = 0L
- @transient
- @volatile protected var cachedAllInputFileSlices: Map[PartitionPath, Seq[FileSlice]] = _
- @volatile protected var queryAsNonePartitionedTable: Boolean = _
- @transient
- @volatile private var fileSystemView: HoodieTableFileSystemView = _
-
- refresh0()
-
- /**
- * Returns latest completed instant as seen by this instance of the file-index
- */
- def latestCompletedInstant(): Option[HoodieInstant] =
- getActiveTimeline.filterCompletedInstants().lastInstant()
-
- /**
- * Fetch list of latest base files and log files per partition.
- *
- * @return mapping from string partition paths to its base/log files
- */
- def listFileSlices(): Map[String, Seq[FileSlice]] = {
- if (queryAsNonePartitionedTable) {
- // Read as Non-Partitioned table.
- cachedAllInputFileSlices.map(entry => (entry._1.path, entry._2))
- } else {
- cachedAllInputFileSlices.keys.toSeq.map(partition => {
- (partition.path, cachedAllInputFileSlices(partition))
- }).toMap
- }
- }
-
- private def refresh0(): Unit = {
- val startTime = System.currentTimeMillis()
- val partitionFiles = loadPartitionPathFiles()
- val allFiles = partitionFiles.values.reduceOption(_ ++ _)
- .getOrElse(Array.empty[FileStatus])
-
- metaClient.reloadActiveTimeline()
-
- val activeTimeline = getActiveTimeline
- val latestInstant = activeTimeline.lastInstant()
- // TODO we can optimize the flow by:
- // - First fetch list of files from instants of interest
- // - Load FileStatus's
- fileSystemView = new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles)
- val queryInstant = if (specifiedQueryInstant.isDefined) {
- specifiedQueryInstant
- } else if (latestInstant.isPresent) {
- Some(latestInstant.get.getTimestamp)
- } else {
- None
- }
-
- (tableType, queryType) match {
- case (MERGE_ON_READ, HoodieTableQueryType.QUERY_TYPE_SNAPSHOT) =>
- // Fetch and store latest base and log files, and their sizes
- cachedAllInputFileSlices = partitionFiles.map(p => {
- val latestSlices = if (queryInstant.isDefined) {
- fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.path, queryInstant.get)
- .iterator().asScala.toSeq
- } else {
- Seq()
- }
- (p._1, latestSlices)
- })
- cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSlice => {
- if (fileSlice.getBaseFile.isPresent) {
- fileSlice.getBaseFile.get().getFileLen + fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum
- } else {
- fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum
- }
- }).sum
- case (_, _) =>
- // Fetch and store latest base files and its sizes
- cachedAllInputFileSlices = partitionFiles.map(p => {
- val fileSlices = specifiedQueryInstant
- .map(instant =>
- fileSystemView.getLatestFileSlicesBeforeOrOn(p._1.path, instant, true))
- .getOrElse(fileSystemView.getLatestFileSlices(p._1.path))
- .iterator().asScala.toSeq
- (p._1, fileSlices)
- })
- cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSliceSize).sum
- }
-
- // If the partition value contains InternalRow.empty, we query it as a non-partitioned table.
- queryAsNonePartitionedTable = partitionFiles.keys.exists(p => p.values.isEmpty)
- val flushSpend = System.currentTimeMillis() - startTime
-
- logInfo(s"Refresh table ${metaClient.getTableConfig.getTableName}," +
- s" spend: $flushSpend ms")
- }
-
- protected def refresh(): Unit = {
- fileStatusCache.invalidate()
- refresh0()
- }
-
- private def getActiveTimeline = {
- // NOTE: We have to use commits and compactions timeline, to make sure that we're properly
- // handling the following case: when records are inserted into the new log-file w/in the file-group
- // that is under the pending compaction process, new log-file will bear the compaction's instant (on the
- // timeline) in its name, as opposed to the base-file's commit instant. To make sure we're not filtering
- // such log-file we have to _always_ include pending compaction instants into consideration
- // TODO(HUDI-3302) re-evaluate whether we should not filter any commits in here
- val timeline = metaClient.getCommitsAndCompactionTimeline
- if (shouldIncludePendingCommits) {
- timeline
- } else {
- timeline.filterCompletedAndCompactionInstants()
- }
- }
-
- private def fileSliceSize(fileSlice: FileSlice): Long = {
- val logFileSize = fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).filter(_ > 0).sum
- if (fileSlice.getBaseFile.isPresent) {
- fileSlice.getBaseFile.get().getFileLen + logFileSize
- } else {
- logFileSize
- }
- }
-
- /**
- * Load all partition paths and it's files under the query table path.
- */
- private def loadPartitionPathFiles(): Map[PartitionPath, Array[FileStatus]] = {
- val partitionPaths = getAllQueryPartitionPaths
- // List files in all of the partition path.
- val pathToFetch = mutable.ArrayBuffer[PartitionPath]()
- val cachePartitionToFiles = mutable.Map[PartitionPath, Array[FileStatus]]()
- // Fetch from the FileStatusCache
- partitionPaths.foreach { partitionPath =>
- fileStatusCache.get(partitionPath.fullPartitionPath(basePath)) match {
- case Some(filesInPartition) =>
- cachePartitionToFiles.put(partitionPath, filesInPartition)
-
- case None => pathToFetch.append(partitionPath)
- }
- }
-
- val fetchedPartitionToFiles =
- if (pathToFetch.nonEmpty) {
- val fullPartitionPathsToFetch = pathToFetch.map(p => (p, p.fullPartitionPath(basePath).toString)).toMap
- val partitionToFilesMap = FSUtils.getFilesInPartitions(engineContext, metadataConfig, basePath,
- fullPartitionPathsToFetch.values.toArray, fileSystemStorageConfig.getSpillableDir)
- fullPartitionPathsToFetch.map(p => {
- (p._1, partitionToFilesMap.get(p._2))
- })
- } else {
- Map.empty[PartitionPath, Array[FileStatus]]
- }
-
- // Update the fileStatusCache
- fetchedPartitionToFiles.foreach {
- case (partitionRowPath, filesInPartition) =>
- fileStatusCache.put(partitionRowPath.fullPartitionPath(basePath), filesInPartition)
- }
- cachePartitionToFiles.toMap ++ fetchedPartitionToFiles
- }
-
- def getAllQueryPartitionPaths: Seq[PartitionPath] = {
- val queryRelativePartitionPaths = queryPaths.map(FSUtils.getRelativePartitionPath(new Path(basePath), _))
- // Load all the partition path from the basePath, and filter by the query partition path.
- // TODO load files from the queryRelativePartitionPaths directly.
- val partitionPaths = FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath).asScala
- .filter(path => queryRelativePartitionPaths.exists(path.startsWith))
-
- val partitionSchema = _partitionColumns
-
- // Convert partition's path into partition descriptor
- partitionPaths.map { partitionPath =>
- val partitionColumnValues = parsePartitionColumnValues(partitionSchema, partitionPath)
- PartitionPath(partitionPath, partitionColumnValues)
- }
- }
-
- /**
- * Parses partition columns' values from the provided partition's path, returning list of
- * values (that might have engine-specific representation)
- *
- * @param partitionColumns partitioning columns identifying the partition
- * @param partitionPath partition's path to parse partitioning columns' values from
- */
- protected def parsePartitionColumnValues(partitionColumns: Array[String], partitionPath: String): Array[Any]
-
- // TODO eval whether we should just use logger directly
- protected def logWarning(str: => String): Unit
- protected def logInfo(str: => String): Unit
-
- /**
- * Represents a partition as a tuple of
- *
- * - Actual partition path (relative to the table's base path)
- * - Values of the corresponding columns table is being partitioned by (partitioning columns)
- *
- *
- * E.g. PartitionPath("2021/02/01", Array("2021","02","01"))
- *
- * NOTE: Partitioning column values might have engine specific representation (for ex,
- * {@code UTF8String} for Spark, etc) and are solely used in partition pruning in an very
- * engine-specific ways
- *
- * @param values values of the corresponding partitioning columns
- * @param path partition's path
- *
- * TODO expose as a trait and make impls engine-specific (current impl is tailored for Spark)
- */
- case class PartitionPath(path: String, values: Array[Any]) {
- override def equals(other: Any): Boolean = other match {
- case PartitionPath(otherPath, _) => path == otherPath
- case _ => false
- }
-
- override def hashCode(): Int = {
- path.hashCode
- }
-
- def fullPartitionPath(basePath: String): Path = {
- if (path.isEmpty) {
- new Path(basePath) // This is a non-partition path
- } else {
- new Path(basePath, path)
- }
- }
- }
-
- /**
- * Converts Hudi's internal representation of the {@code Option} into Scala's default one
- */
- implicit def asScalaOption[T](opt: org.apache.hudi.common.util.Option[T]): Option[T] =
- if (opt.isPresent) {
- Some(opt.get)
- } else {
- None
- }
-}
-
-trait FileStatusCacheTrait {
- def get(path: Path): Option[Array[FileStatus]]
- def put(path: Path, leafFiles: Array[FileStatus]): Unit
- def invalidate(): Unit
-}
diff --git a/hudi-hadoop-mr/pom.xml b/hudi-hadoop-mr/pom.xml
index 57fbdb7b8..bf87bfaa3 100644
--- a/hudi-hadoop-mr/pom.xml
+++ b/hudi-hadoop-mr/pom.xml
@@ -30,13 +30,6 @@
-
-
- org.scala-lang
- scala-library
- ${scala.version}
-
-
org.apache.hudi
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
index 585728d1e..176c1c5e5 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
@@ -20,8 +20,7 @@ package org.apache.hudi.hadoop;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hudi.HoodieTableFileIndexBase;
-import org.apache.hudi.FileStatusCacheTrait;
+import org.apache.hudi.BaseHoodieTableFileIndex;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieTableQueryType;
@@ -29,15 +28,13 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Function0;
-import scala.collection.JavaConverters;
import java.util.List;
/**
- * Implementation of {@link HoodieTableFileIndexBase} for Hive-based query engines
+ * Implementation of {@link BaseHoodieTableFileIndex} for Hive-based query engines
*/
-public class HiveHoodieTableFileIndex extends HoodieTableFileIndexBase {
+public class HiveHoodieTableFileIndex extends BaseHoodieTableFileIndex {
public static final Logger LOG = LoggerFactory.getLogger(HiveHoodieTableFileIndex.class);
@@ -53,16 +50,12 @@ public class HiveHoodieTableFileIndex extends HoodieTableFileIndexBase {
metaClient,
configProperties,
queryType,
- JavaConverters.asScalaBufferConverter(queryPaths).asScala(),
- toScalaOption(specifiedQueryInstant),
+ queryPaths,
+ specifiedQueryInstant,
shouldIncludePendingCommits,
new NoopCache());
}
- private static scala.Option toScalaOption(Option opt) {
- return scala.Option.apply(opt.orElse(null));
- }
-
@Override
public Object[] parsePartitionColumnValues(String[] partitionColumns, String partitionPath) {
// NOTE: Parsing partition path into partition column values isn't required on Hive,
@@ -71,20 +64,10 @@ public class HiveHoodieTableFileIndex extends HoodieTableFileIndexBase {
return new Object[0];
}
- @Override
- public void logInfo(Function0 lazyStr) {
- LOG.info(lazyStr.apply());
- }
-
- @Override
- public void logWarning(Function0 lazyStr) {
- LOG.info(lazyStr.apply());
- }
-
- static class NoopCache implements FileStatusCacheTrait {
+ static class NoopCache implements FileStatusCache {
@Override
- public scala.Option get(Path path) {
- return scala.Option.empty();
+ public Option get(Path path) {
+ return Option.empty();
}
@Override
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
index efb198ad6..fd5ef8da7 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
@@ -46,14 +46,13 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -79,24 +78,6 @@ public class HoodieCopyOnWriteTableInputFormat extends FileInputFormat<NullWritable, ArrayWritable> {
- private static FileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile, Stream<HoodieLogFile> logFiles) {
- List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
- FileStatus baseFileStatus = getFileStatusUnchecked(baseFile);
- try {
- RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus);
- rtFileStatus.setDeltaLogFiles(sortedLogFiles);
- rtFileStatus.setBaseFilePath(baseFile.getPath());
- if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
- rtFileStatus.setBootStrapFileStatus(baseFileStatus);
- }
-
- return rtFileStatus;
- } catch (IOException e) {
- throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
- }
- }
-
@Override
public final Configuration getConf() {
return conf;
@@ -265,25 +246,23 @@ public class HoodieCopyOnWriteTableInputFormat extends FileInputFormat<NullWritable, ArrayWritable> {
- Map<String, Seq<FileSlice>> partitionedFileSlices =
- JavaConverters.mapAsJavaMapConverter(fileIndex.listFileSlices()).asJava();
+ Map<String, List<FileSlice>> partitionedFileSlices = fileIndex.listFileSlices();
targetFiles.addAll(
partitionedFileSlices.values()
.stream()
- .flatMap(seq -> JavaConverters.seqAsJavaListConverter(seq).asJava().stream())
+ .flatMap(Collection::stream)
.map(fileSlice -> {
Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
Stream<HoodieLogFile> logFiles = fileSlice.getLogFiles();
- Option<HoodieInstant> latestCompletedInstantOpt =
- fromScala(fileIndex.latestCompletedInstant());
+ Option<HoodieInstant> latestCompletedInstantOpt = fileIndex.getLatestCompletedInstant();
// Check if we're reading a MOR table
if (includeLogFilesForSnapshotView()) {
@@ -307,7 +286,7 @@ public class HoodieCopyOnWriteTableInputFormat extends FileInputFormat<NullWritable, ArrayWritable> {
- private static <T> Option<T> fromScala(scala.Option<T> opt) {
- if (opt.isDefined()) {
- return Option.of(opt.get());
- }
-
- return Option.empty();
- }
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index f9a7620b9..67f110801 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -72,7 +72,7 @@ case class HoodieFileIndex(spark: SparkSession,
)
with FileIndex {
- override def rootPaths: Seq[Path] = queryPaths
+ override def rootPaths: Seq[Path] = queryPaths.asScala
def enableDataSkipping(): Boolean = {
options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
@@ -88,7 +88,7 @@ case class HoodieFileIndex(spark: SparkSession,
* @return List of FileStatus for base files
*/
def allFiles: Seq[FileStatus] = {
- cachedAllInputFileSlices.values.flatten
+ cachedAllInputFileSlices.values.asScala.flatMap(_.asScala)
.filter(_.getBaseFile.isPresent)
.map(_.getBaseFile.get().getFileStatus)
.toSeq
@@ -101,31 +101,29 @@ case class HoodieFileIndex(spark: SparkSession,
* @param dataFilters data columns filters
* @return list of PartitionDirectory containing partition to base files mapping
*/
- override def listFiles(partitionFilters: Seq[Expression],
- dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+ override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
// Look up candidate files names in the col-stats index, if all of the following conditions are true
// - Data-skipping is enabled
// - Col-Stats Index is present
// - List of predicates (filters) is present
val candidateFilesNamesOpt: Option[Set[String]] =
- lookupCandidateFilesInColStatsIndex(dataFilters) match {
- case Success(opt) => opt
- case Failure(e) =>
- if (e.isInstanceOf[AnalysisException]) {
- logDebug("Failed to relay provided data filters to Z-index lookup", e)
- } else {
- logError("Failed to lookup candidate files in Z-index", e)
- }
- Option.empty
- }
+ lookupCandidateFilesInColStatsIndex(dataFilters) match {
+ case Success(opt) => opt
+ case Failure(e) =>
+ if (e.isInstanceOf[AnalysisException]) {
+ logDebug("Failed to relay provided data filters to Z-index lookup", e)
+ } else {
+ logError("Failed to lookup candidate files in Z-index", e)
+ }
+ Option.empty
+ }
logDebug(s"Overlapping candidate files (from Z-index): ${candidateFilesNamesOpt.getOrElse(Set.empty)}")
if (queryAsNonePartitionedTable) {
// Read as Non-Partitioned table
// Filter in candidate files based on the col-stats index lookup
- val candidateFiles =
- allFiles.filter(fileStatus =>
+ val candidateFiles = allFiles.filter(fileStatus =>
// NOTE: This predicate is true when {@code Option} is empty
candidateFilesNamesOpt.forall(_.contains(fileStatus.getPath.getName))
)
@@ -137,22 +135,21 @@ case class HoodieFileIndex(spark: SparkSession,
Seq(PartitionDirectory(InternalRow.empty, candidateFiles))
} else {
// Prune the partition path by the partition filters
- val prunedPartitions = prunePartition(cachedAllInputFileSlices.keys.toSeq, partitionFilters)
+ val prunedPartitions = prunePartition(cachedAllInputFileSlices.keySet.asScala.toSeq, partitionFilters)
var totalFileSize = 0
var candidateFileSize = 0
val result = prunedPartitions.map { partition =>
val baseFileStatuses: Seq[FileStatus] =
- cachedAllInputFileSlices(partition)
+ cachedAllInputFileSlices.get(partition).asScala
.map(fs => fs.getBaseFile.orElse(null))
.filter(_ != null)
.map(_.getFileStatus)
// Filter in candidate files based on the col-stats index lookup
- val candidateFiles =
- baseFileStatuses.filter(fs =>
- // NOTE: This predicate is true when {@code Option} is empty
- candidateFilesNamesOpt.forall(_.contains(fs.getPath.getName)))
+ val candidateFiles = baseFileStatuses.filter(fs =>
+ // NOTE: This predicate is true when {@code Option} is empty
+ candidateFilesNamesOpt.forall(_.contains(fs.getPath.getName)))
totalFileSize += baseFileStatuses.size
candidateFileSize += candidateFiles.size
@@ -194,12 +191,14 @@ case class HoodieFileIndex(spark: SparkSession,
// scalastyle:on return
}
+ val completedCommits = getActiveTimeline.filterCompletedInstants().getInstants.iterator.asScala.toList.map(_.getTimestamp)
+
// Collect all index tables present in `.zindex` folder
val candidateIndexTables =
fs.listStatus(new Path(indexPath))
.filter(_.isDirectory)
.map(_.getPath.getName)
- .filter(f => completedCommits.contains(f))
+ .filter(completedCommits.contains(_))
.sortBy(x => x)
if (candidateIndexTables.isEmpty) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index a79ac6f1d..be3247104 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -18,8 +18,9 @@
package org.apache.hudi
import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_INCREMENTAL_OPT_VAL, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
-import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, generateFieldMap}
+import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, generateFieldMap, toJavaOption}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
@@ -36,10 +37,11 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
/**
- * Implementation of the [[HoodieTableFileIndexBase]] for Spark
+ * Implementation of the [[BaseHoodieTableFileIndex]] for Spark
*
* @param spark spark session
* @param metaClient Hudi table's meta-client
@@ -55,14 +57,15 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
queryPaths: Seq[Path],
specifiedQueryInstant: Option[String] = None,
@transient fileStatusCache: FileStatusCache = NoopCache)
- extends HoodieTableFileIndexBase(
- engineContext = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)),
+ extends BaseHoodieTableFileIndex(
+ new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)),
metaClient,
configProperties,
- queryType = deduceQueryType(configProperties),
- queryPaths,
- specifiedQueryInstant,
- fileStatusCache = SparkHoodieTableFileIndex.adapt(fileStatusCache)
+ deduceQueryType(configProperties),
+ queryPaths.asJava,
+ toJavaOption(specifiedQueryInstant),
+ false,
+ SparkHoodieTableFileIndex.adapt(fileStatusCache)
)
with SparkAdapterSupport
with Logging {
@@ -136,9 +139,9 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
*/
def listFileSlices(partitionFilters: Seq[Expression]): Map[String, Seq[FileSlice]] = {
// Prune the partition path by the partition filters
- val prunedPartitions = prunePartition(cachedAllInputFileSlices.keys.toSeq, partitionFilters)
+ val prunedPartitions = prunePartition(cachedAllInputFileSlices.asScala.keys.toSeq, partitionFilters)
prunedPartitions.map(partition => {
- (partition.path, cachedAllInputFileSlices(partition))
+ (partition.path, cachedAllInputFileSlices.get(partition).asScala)
}).toMap
}
@@ -150,9 +153,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
* @param predicates The filter condition.
* @return The Pruned partition paths.
*/
- def prunePartition(partitionPaths: Seq[PartitionPath],
- predicates: Seq[Expression]): Seq[PartitionPath] = {
-
+ def prunePartition(partitionPaths: Seq[PartitionPath], predicates: Seq[Expression]): Seq[PartitionPath] = {
val partitionColumnNames = partitionSchema.fields.map(_.name).toSet
val partitionPruningPredicates = predicates.filter {
_.references.map(_.name).toSet.subsetOf(partitionColumnNames)
@@ -167,8 +168,9 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
})
val prunedPartitionPaths = partitionPaths.filter {
- case PartitionPath(_, values) => boundPredicate.eval(InternalRow.fromSeq(values))
+ partitionPath => boundPredicate.eval(InternalRow.fromSeq(partitionPath.values))
}
+
logInfo(s"Total partition size is: ${partitionPaths.size}," +
s" after partition prune size is: ${prunedPartitionPaths.size}")
prunedPartitionPaths
@@ -177,7 +179,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
}
}
- protected def parsePartitionColumnValues(partitionColumns: Array[String], partitionPath: String): Array[Any] = {
+ protected def parsePartitionColumnValues(partitionColumns: Array[String], partitionPath: String): Array[Object] = {
if (partitionColumns.length == 0) {
// This is a non-partitioned table
Array.empty
@@ -225,7 +227,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
val pathWithPartitionName = new Path(basePath, partitionWithName)
val partitionValues = parsePartitionPath(pathWithPartitionName, partitionSchema)
- partitionValues.toArray
+ partitionValues.map(_.asInstanceOf[Object]).toArray
}
}
}
@@ -247,6 +249,13 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
object SparkHoodieTableFileIndex {
+ implicit def toJavaOption[T](opt: Option[T]): org.apache.hudi.common.util.Option[T] =
+ if (opt.isDefined) {
+ org.apache.hudi.common.util.Option.of(opt.get)
+ } else {
+ org.apache.hudi.common.util.Option.empty()
+ }
+
/**
* This method unravels [[StructType]] into a [[Map]] of pairs of dot-path notation with corresponding
* [[StructField]] object for every field of the provided [[StructType]], recursively.
@@ -287,17 +296,17 @@ object SparkHoodieTableFileIndex {
}
private def deduceQueryType(configProperties: TypedProperties): HoodieTableQueryType = {
- configProperties(QUERY_TYPE.key()) match {
- case QUERY_TYPE_SNAPSHOT_OPT_VAL => HoodieTableQueryType.QUERY_TYPE_SNAPSHOT
- case QUERY_TYPE_INCREMENTAL_OPT_VAL => HoodieTableQueryType.QUERY_TYPE_INCREMENTAL
- case QUERY_TYPE_READ_OPTIMIZED_OPT_VAL => HoodieTableQueryType.QUERY_TYPE_READ_OPTIMIZED
+ configProperties.asScala(QUERY_TYPE.key()) match {
+ case QUERY_TYPE_SNAPSHOT_OPT_VAL => HoodieTableQueryType.SNAPSHOT
+ case QUERY_TYPE_INCREMENTAL_OPT_VAL => HoodieTableQueryType.INCREMENTAL
+ case QUERY_TYPE_READ_OPTIMIZED_OPT_VAL => HoodieTableQueryType.READ_OPTIMIZED
case _ @ qt => throw new IllegalArgumentException(s"query-type ($qt) not supported")
}
}
- private def adapt(cache: FileStatusCache): FileStatusCacheTrait = {
- new FileStatusCacheTrait {
- override def get(path: Path): Option[Array[FileStatus]] = cache.getLeafFiles(path)
+ private def adapt(cache: FileStatusCache): BaseHoodieTableFileIndex.FileStatusCache = {
+ new BaseHoodieTableFileIndex.FileStatusCache {
+ override def get(path: Path): org.apache.hudi.common.util.Option[Array[FileStatus]] = toJavaOption(cache.getLeafFiles(path))
override def put(path: Path, leafFiles: Array[FileStatus]): Unit = cache.putLeafFiles(path, leafFiles)
override def invalidate(): Unit = cache.invalidateAll()
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index 4896ddf07..3c474f76e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -26,9 +26,9 @@ import org.apache.hudi.common.engine.EngineType
import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
-import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtils}
import org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtils}
import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.config.HoodieWriteConfig
@@ -213,8 +213,11 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
GreaterThanOrEqual(attribute("partition"), literal("2021/03/08")),
LessThan(attribute("partition"), literal("2021/03/10"))
)
- val prunedPartitions = fileIndex.listFiles(Seq(partitionFilter2),
- Seq.empty).map(_.values.toSeq(Seq(StringType)).mkString(",")).toList
+ val prunedPartitions = fileIndex.listFiles(Seq(partitionFilter2), Seq.empty)
+ .map(_.values.toSeq(Seq(StringType))
+ .mkString(","))
+ .toList
+ .sorted
assertEquals(List("2021/03/08", "2021/03/09"), prunedPartitions)
}
diff --git a/packaging/hudi-hadoop-mr-bundle/pom.xml b/packaging/hudi-hadoop-mr-bundle/pom.xml
index 23399233e..f6215b1e0 100644
--- a/packaging/hudi-hadoop-mr-bundle/pom.xml
+++ b/packaging/hudi-hadoop-mr-bundle/pom.xml
@@ -67,9 +67,6 @@
org.apache.hudi:hudi-common
org.apache.hudi:hudi-hadoop-mr
-
- org.scala-lang:scala-library
-
org.apache.parquet:parquet-avro
org.apache.avro:avro
com.esotericsoftware:kryo-shaded
@@ -155,14 +152,6 @@
${project.version}
-
-
-
- org.scala-lang
- scala-library
- ${scala.version}
-
-
org.apache.parquet
diff --git a/packaging/hudi-hive-sync-bundle/pom.xml b/packaging/hudi-hive-sync-bundle/pom.xml
index 9b775e76c..75fce574e 100644
--- a/packaging/hudi-hive-sync-bundle/pom.xml
+++ b/packaging/hudi-hive-sync-bundle/pom.xml
@@ -69,9 +69,6 @@
org.apache.hudi:hudi-sync-common
org.apache.hudi:hudi-hive-sync
-
- org.scala-lang:scala-library
-
com.beust:jcommander
org.apache.avro:avro
org.apache.parquet:parquet-avro
@@ -134,14 +131,6 @@
${project.version}
-
-
-
- org.scala-lang
- scala-library
- ${scala.version}
-
-
org.apache.parquet
diff --git a/packaging/hudi-presto-bundle/pom.xml b/packaging/hudi-presto-bundle/pom.xml
index f085c30b4..90c1087dc 100644
--- a/packaging/hudi-presto-bundle/pom.xml
+++ b/packaging/hudi-presto-bundle/pom.xml
@@ -67,9 +67,6 @@
org.apache.hudi:hudi-common
org.apache.hudi:hudi-hadoop-mr
-
- org.scala-lang:scala-library
-
org.apache.parquet:parquet-avro
org.apache.avro:avro
org.codehaus.jackson:*
@@ -190,14 +187,6 @@
-
-
-
- org.scala-lang
- scala-library
- ${scala.version}
-
-
org.apache.parquet
diff --git a/packaging/hudi-trino-bundle/pom.xml b/packaging/hudi-trino-bundle/pom.xml
index a7f41ecaf..adf73f1bb 100644
--- a/packaging/hudi-trino-bundle/pom.xml
+++ b/packaging/hudi-trino-bundle/pom.xml
@@ -68,9 +68,6 @@
org.apache.hudi:hudi-common
org.apache.hudi:hudi-hadoop-mr
-
- org.scala-lang:scala-library
-
org.apache.parquet:parquet-avro
org.apache.avro:avro
org.codehaus.jackson:*
@@ -189,14 +186,6 @@
-
-
-
- org.scala-lang
- scala-library
- ${scala.version}
-
-
org.apache.hbase