1
0

[HUDI-2449] Incremental read for Flink (#3686)

This commit is contained in:
Danny Chan
2021-09-19 09:06:46 +08:00
committed by GitHub
parent c7a5c8273b
commit 3354fac42f
16 changed files with 689 additions and 365 deletions

View File

@@ -197,12 +197,18 @@ public class FlinkOptions extends HoodieConfig {
.withDescription("Check interval for streaming read of SECOND, default 1 minute");
public static final String START_COMMIT_EARLIEST = "earliest";
public static final ConfigOption<String> READ_STREAMING_START_COMMIT = ConfigOptions
.key("read.streaming.start-commit")
public static final ConfigOption<String> READ_START_COMMIT = ConfigOptions
.key("read.start-commit")
.stringType()
.noDefaultValue()
.withDescription("Start commit instant for streaming read, the commit time format should be 'yyyyMMddHHmmss', "
+ "by default reading from the latest instant");
.withDescription("Start commit instant for reading, the commit time format should be 'yyyyMMddHHmmss', "
+ "by default reading from the latest instant for streaming read");
public static final ConfigOption<String> READ_END_COMMIT = ConfigOptions
.key("read.end-commit")
.stringType()
.noDefaultValue()
.withDescription("End commit instant for reading, the commit time format should be 'yyyyMMddHHmmss'");
// ------------------------------------------------------------------------
// Write Options

View File

@@ -28,6 +28,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import javax.annotation.Nullable;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
@@ -36,6 +38,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
/**
* A file index which supports listing files efficiently through metadata table.
@@ -137,11 +140,29 @@ public class FileIndex {
this.partitionPaths = null;
}
// -------------------------------------------------------------------------
// Getter/Setter
// -------------------------------------------------------------------------
/**
 * Sets up explicit partition paths for pruning.
 *
 * <p>A {@code null} argument means no pruning: the cached paths are left
 * untouched and lazy partition discovery stays in effect.
 */
public void setPartitionPaths(@Nullable Set<String> partitionPaths) {
  if (partitionPaths == null) {
    return;
  }
  this.partitionPaths = new ArrayList<>(partitionPaths);
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
private List<String> getOrBuildPartitionPaths() {
/**
* Returns all the relative partition paths.
*
* <p>The partition paths are cached once invoked.
*/
public List<String> getOrBuildPartitionPaths() {
if (this.partitionPaths != null) {
return this.partitionPaths;
}

View File

@@ -0,0 +1,365 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.source;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.hadoop.fs.FileStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
/**
 * Utilities to generate incremental input splits {@link MergeOnReadInputSplit}.
 * The input splits are used for streaming and incremental read.
 *
 * <p>How to generate the input splits:
 * <ol>
 *   <li>first fetch all the commit metadata for the incremental instants;</li>
 *   <li>resolve the incremental commit file paths;</li>
 *   <li>filter the full file paths by required partitions;</li>
 *   <li>use the file paths from #step 3 as the back-up of the filesystem view.</li>
 * </ol>
 */
public class IncrementalInputSplits {
  private static final Logger LOG = LoggerFactory.getLogger(IncrementalInputSplits.class);

  // Flink job configuration; read for the table name, merge type and the
  // read.start-commit / read.end-commit options.
  private final Configuration conf;
  // Base path of the Hudi table.
  private final Path path;
  // Memory budget handed down to each MergeOnReadInputSplit for log merging.
  private final long maxCompactionMemoryInBytes;
  // for partition pruning; null means no pruning is applied
  private final Set<String> requiredPartitions;

  // Private on purpose: instances are created through the Builder (see #builder()).
  private IncrementalInputSplits(
      Configuration conf,
      Path path,
      long maxCompactionMemoryInBytes,
      @Nullable Set<String> requiredPartitions) {
    this.conf = conf;
    this.path = path;
    this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
    this.requiredPartitions = requiredPartitions;
  }

  /**
   * Returns the builder.
   */
  public static Builder builder() {
    return new Builder();
  }

  /**
   * Returns the incremental input splits.
   *
   * <p>Convenience overload for incremental (non-streaming) read: delegates
   * with a {@code null} issued instant.
   *
   * @param metaClient The meta client
   * @param hadoopConf The hadoop configuration
   * @return The list of incremental input splits or empty if there are no new instants
   */
  public Result inputSplits(
      HoodieTableMetaClient metaClient,
      org.apache.hadoop.conf.Configuration hadoopConf) {
    return inputSplits(metaClient, hadoopConf, null);
  }

  /**
   * Returns the incremental input splits.
   *
   * @param metaClient The meta client
   * @param hadoopConf The hadoop configuration
   * @param issuedInstant The last issued instant, only valid in streaming read
   * @return The list of incremental input splits or empty if there are no new instants
   */
  public Result inputSplits(
      HoodieTableMetaClient metaClient,
      org.apache.hadoop.conf.Configuration hadoopConf,
      String issuedInstant) {
    // refresh the timeline first so newly committed instants are visible
    metaClient.reloadActiveTimeline();
    HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
    if (commitTimeline.empty()) {
      LOG.warn("No splits found for the table under path " + path);
      return Result.EMPTY;
    }
    List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline, issuedInstant);
    // get the latest instant that satisfies condition
    final HoodieInstant instantToIssue = instants.size() == 0 ? null : instants.get(instants.size() - 1);
    final InstantRange instantRange;
    if (instantToIssue != null) {
      if (issuedInstant != null) {
        // the streaming reader may record the last issued instant, if the issued instant is present,
        // the instant range should be: (issued instant, the latest instant].
        instantRange = InstantRange.getInstance(issuedInstant, instantToIssue.getTimestamp(),
            InstantRange.RangeType.OPEN_CLOSE);
      } else if (this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()) {
        // first time consume and has a start commit
        // a null range means "consume from the earliest commit" (start commit is 'earliest'),
        // which also triggers reading the archived timeline in #getArchivedMetadata
        final String startCommit = this.conf.getString(FlinkOptions.READ_START_COMMIT);
        instantRange = startCommit.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)
            ? null
            : InstantRange.getInstance(startCommit, instantToIssue.getTimestamp(), InstantRange.RangeType.CLOSE_CLOSE);
      } else {
        // first time consume and no start commit, consumes the latest incremental data set.
        instantRange = InstantRange.getInstance(instantToIssue.getTimestamp(), instantToIssue.getTimestamp(),
            InstantRange.RangeType.CLOSE_CLOSE);
      }
    } else {
      LOG.info("No new instant found for the table under path " + path + ", skip reading");
      return Result.EMPTY;
    }
    String tableName = conf.getString(FlinkOptions.TABLE_NAME);
    // step 1: fetch the commit metadata for all the filtered (active) instants
    List<HoodieCommitMetadata> activeMetadataList = instants.stream()
        .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
    List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
    if (archivedMetadataList.size() > 0) {
      LOG.warn(""
          + "--------------------------------------------------------------------------------\n"
          + "---------- caution: the reader has fall behind too much from the writer,\n"
          + "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
          + "--------------------------------------------------------------------------------");
    }
    List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
        ? mergeList(activeMetadataList, archivedMetadataList)
        : activeMetadataList;
    // step 2: resolve the partitions touched by those commits
    Set<String> writePartitions = getWritePartitionPaths(metadataList);
    // apply partition push down
    if (this.requiredPartitions != null) {
      writePartitions = writePartitions.stream()
          .filter(this.requiredPartitions::contains).collect(Collectors.toSet());
    }
    // step 3: resolve the full file paths written by those commits
    FileStatus[] fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList);
    if (fileStatuses.length == 0) {
      LOG.warn("No files found for reading in user provided path.");
      return Result.EMPTY;
    }
    // step 4: back the filesystem view with the incremental file statuses only
    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
    final String endInstant = instantToIssue.getTimestamp();
    final AtomicInteger cnt = new AtomicInteger(0);
    final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
    // one input split per file slice; cnt numbers the splits sequentially
    List<MergeOnReadInputSplit> inputSplits = writePartitions.stream()
        .map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant)
            .map(fileSlice -> {
              Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
                  .sorted(HoodieLogFile.getLogFileComparator())
                  .map(logFile -> logFile.getPath().toString())
                  .collect(Collectors.toList()));
              String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
              return new MergeOnReadInputSplit(cnt.getAndAdd(1),
                  basePath, logPaths, endInstant,
                  metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange);
            }).collect(Collectors.toList()))
        .flatMap(Collection::stream)
        .collect(Collectors.toList());
    return Result.instance(inputSplits, endInstant);
  }

  /**
   * Returns the archived metadata in case the reader consumes untimely or it wants
   * to read from the earliest.
   *
   * <p>Note: should improve it with metadata table when the metadata table is stable enough.
   *
   * @param metaClient The meta client
   * @param instantRange The instant range to filter the timeline instants
   * @param commitTimeline The commit timeline
   * @param tableName The table name
   * @return the list of archived metadata, or empty if there is no need to read the archived timeline
   */
  private List<HoodieCommitMetadata> getArchivedMetadata(
      HoodieTableMetaClient metaClient,
      InstantRange instantRange,
      HoodieTimeline commitTimeline,
      String tableName) {
    if (instantRange == null || commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) {
      // read the archived metadata if:
      // 1. the start commit is 'earliest';
      // 2. the start instant is archived.
      HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
      HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants();
      if (!archivedCompleteTimeline.empty()) {
        final String endTs = archivedCompleteTimeline.lastInstant().get().getTimestamp();
        Stream<HoodieInstant> instantStream = archivedCompleteTimeline.getInstants();
        if (instantRange != null) {
          // the start instant is archived: only load/keep instants at or after it
          archivedTimeline.loadInstantDetailsInMemory(instantRange.getStartInstant(), endTs);
          instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, instantRange.getStartInstant()));
        } else {
          // 'earliest': load the whole archived timeline span
          final String startTs = archivedCompleteTimeline.firstInstant().get().getTimestamp();
          archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
        }
        return instantStream
            .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, archivedTimeline)).collect(Collectors.toList());
      }
    }
    return Collections.emptyList();
  }

  /**
   * Returns the instants with a given issuedInstant to start from.
   *
   * <p>When {@code issuedInstant} is present (streaming mode) only instants strictly
   * after it are kept; otherwise the read.start-commit / read.end-commit options
   * bound the result on either side (each bound inclusive when set).
   *
   * @param commitTimeline The completed commits timeline
   * @param issuedInstant The last issued instant that has already been delivered to downstream
   * @return the filtered hoodie instants
   */
  private List<HoodieInstant> filterInstantsWithRange(
      HoodieTimeline commitTimeline,
      final String issuedInstant) {
    HoodieTimeline completedTimeline = commitTimeline.filterCompletedInstants();
    if (issuedInstant != null) {
      // returns early for streaming mode
      return completedTimeline.getInstants()
          .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant))
          .collect(Collectors.toList());
    }
    Stream<HoodieInstant> instantStream = completedTimeline.getInstants();
    // a start commit of 'earliest' imposes no lower bound here
    if (this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()
        && !this.conf.get(FlinkOptions.READ_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)) {
      final String startCommit = this.conf.get(FlinkOptions.READ_START_COMMIT);
      instantStream = instantStream
          .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, startCommit));
    }
    if (this.conf.getOptional(FlinkOptions.READ_END_COMMIT).isPresent()) {
      final String endCommit = this.conf.get(FlinkOptions.READ_END_COMMIT);
      instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN_OR_EQUALS, endCommit));
    }
    return instantStream.collect(Collectors.toList());
  }

  /**
   * Returns all the incremental write partition paths as a set with the given commits metadata.
   *
   * @param metadataList The commits metadata
   * @return the partition path set
   */
  private Set<String> getWritePartitionPaths(List<HoodieCommitMetadata> metadataList) {
    return metadataList.stream()
        .map(HoodieCommitMetadata::getWritePartitionPaths)
        .flatMap(Collection::stream)
        .collect(Collectors.toSet());
  }

  // Concatenates two lists into a new mutable list; list1's elements come first.
  private static <T> List<T> mergeList(List<T> list1, List<T> list2) {
    List<T> merged = new ArrayList<>(list1);
    merged.addAll(list2);
    return merged;
  }

  // -------------------------------------------------------------------------
  //  Inner Class
  // -------------------------------------------------------------------------

  /**
   * Represents a result of calling {@link #inputSplits}.
   */
  public static class Result {
    private final List<MergeOnReadInputSplit> inputSplits; // input splits
    private final String endInstant; // end instant to consume to

    // Shared sentinel for "nothing to read"; carries an empty split list and "" as end instant.
    public static final Result EMPTY = instance(Collections.emptyList(), "");

    /** Returns true when there are no input splits to consume. */
    public boolean isEmpty() {
      return this.inputSplits.size() == 0;
    }

    public List<MergeOnReadInputSplit> getInputSplits() {
      return this.inputSplits;
    }

    public String getEndInstant() {
      return this.endInstant;
    }

    private Result(List<MergeOnReadInputSplit> inputSplits, String endInstant) {
      this.inputSplits = inputSplits;
      this.endInstant = endInstant;
    }

    public static Result instance(List<MergeOnReadInputSplit> inputSplits, String endInstant) {
      return new Result(inputSplits, endInstant);
    }
  }

  /**
   * Builder for {@link IncrementalInputSplits}.
   */
  public static class Builder {
    private Configuration conf;
    private Path path;
    private long maxCompactionMemoryInBytes;
    // for partition pruning
    private Set<String> requiredPartitions;

    public Builder() {
    }

    public Builder conf(Configuration conf) {
      this.conf = conf;
      return this;
    }

    public Builder path(Path path) {
      this.path = path;
      return this;
    }

    public Builder maxCompactionMemoryInBytes(long maxCompactionMemoryInBytes) {
      this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
      return this;
    }

    public Builder requiredPartitions(@Nullable Set<String> requiredPartitions) {
      this.requiredPartitions = requiredPartitions;
      return this;
    }

    /**
     * Builds the instance; conf and path are mandatory, requiredPartitions may stay null.
     */
    public IncrementalInputSplits build() {
      return new IncrementalInputSplits(Objects.requireNonNull(this.conf), Objects.requireNonNull(this.path),
          this.maxCompactionMemoryInBytes, this.requiredPartitions);
    }
  }
}

View File

@@ -18,19 +18,9 @@
package org.apache.hudi.source;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;
@@ -45,24 +35,15 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.hadoop.fs.FileStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
/**
* This is the single (non-parallel) monitoring task which takes a {@link MergeOnReadInputSplit}
@@ -112,21 +93,21 @@ public class StreamReadMonitoringFunction
private HoodieTableMetaClient metaClient;
private final long maxCompactionMemoryInBytes;
// for partition pruning
private final Set<String> requiredPartitionPaths;
private final IncrementalInputSplits incrementalInputSplits;
public StreamReadMonitoringFunction(
Configuration conf,
Path path,
long maxCompactionMemoryInBytes,
Set<String> requiredPartitionPaths) {
@Nullable Set<String> requiredPartitionPaths) {
this.conf = conf;
this.path = path;
this.interval = conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL);
this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
this.requiredPartitionPaths = requiredPartitionPaths;
this.incrementalInputSplits = IncrementalInputSplits.builder()
.conf(conf)
.path(path)
.maxCompactionMemoryInBytes(maxCompactionMemoryInBytes)
.requiredPartitions(requiredPartitionPaths).build();
}
@Override
@@ -208,98 +189,23 @@ public class StreamReadMonitoringFunction
// table does not exist
return;
}
metaClient.reloadActiveTimeline();
HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
if (commitTimeline.empty()) {
LOG.warn("No splits found for the table under path " + path);
return;
}
List<HoodieInstant> instants = filterInstantsWithStart(commitTimeline, this.issuedInstant);
// get the latest instant that satisfies condition
final HoodieInstant instantToIssue = instants.size() == 0 ? null : instants.get(instants.size() - 1);
final InstantRange instantRange;
if (instantToIssue != null) {
if (this.issuedInstant != null) {
// had already consumed an instant
instantRange = InstantRange.getInstance(this.issuedInstant, instantToIssue.getTimestamp(),
InstantRange.RangeType.OPEN_CLOSE);
} else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()) {
// first time consume and has a start commit
final String specifiedStart = this.conf.getString(FlinkOptions.READ_STREAMING_START_COMMIT);
instantRange = specifiedStart.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)
? null
: InstantRange.getInstance(specifiedStart, instantToIssue.getTimestamp(), InstantRange.RangeType.CLOSE_CLOSE);
} else {
// first time consume and no start commit, consumes the latest incremental data set.
instantRange = InstantRange.getInstance(instantToIssue.getTimestamp(), instantToIssue.getTimestamp(),
InstantRange.RangeType.CLOSE_CLOSE);
}
} else {
LOG.info("No new instant found for the table under path " + path + ", skip reading");
return;
}
// generate input split:
// 1. first fetch all the commit metadata for the incremental instants;
// 2. filter the relative partition paths
// 3. filter the full file paths
// 4. use the file paths from #step 3 as the back-up of the filesystem view
String tableName = conf.getString(FlinkOptions.TABLE_NAME);
List<HoodieCommitMetadata> activeMetadataList = instants.stream()
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(instantRange, commitTimeline, tableName);
if (archivedMetadataList.size() > 0) {
LOG.warn(""
+ "--------------------------------------------------------------------------------\n"
+ "---------- caution: the reader has fall behind too much from the writer,\n"
+ "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
+ "--------------------------------------------------------------------------------");
}
List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
? mergeList(activeMetadataList, archivedMetadataList)
: activeMetadataList;
Set<String> writePartitions = getWritePartitionPaths(metadataList);
// apply partition push down
if (this.requiredPartitionPaths.size() > 0) {
writePartitions = writePartitions.stream()
.filter(this.requiredPartitionPaths::contains).collect(Collectors.toSet());
}
FileStatus[] fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList);
if (fileStatuses.length == 0) {
LOG.warn("No files found for reading in user provided path.");
IncrementalInputSplits.Result result =
incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant);
if (result.isEmpty()) {
// no new instants, returns early
return;
}
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
final String commitToIssue = instantToIssue.getTimestamp();
final AtomicInteger cnt = new AtomicInteger(0);
final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
List<MergeOnReadInputSplit> inputSplits = writePartitions.stream()
.map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, commitToIssue)
.map(fileSlice -> {
Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
.map(logFile -> logFile.getPath().toString())
.collect(Collectors.toList()));
String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
return new MergeOnReadInputSplit(cnt.getAndAdd(1),
basePath, logPaths, commitToIssue,
metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange);
}).collect(Collectors.toList()))
.flatMap(Collection::stream)
.collect(Collectors.toList());
for (MergeOnReadInputSplit split : inputSplits) {
for (MergeOnReadInputSplit split : result.getInputSplits()) {
context.collect(split);
}
// update the issues instant time
this.issuedInstant = commitToIssue;
this.issuedInstant = result.getEndInstant();
LOG.info(""
+ "------------------------------------------------------------\n"
+ "---------- consumed to instant: {}\n"
+ "------------------------------------------------------------",
commitToIssue);
+ "------------------------------------------------------------\n"
+ "---------- consumed to instant: {}\n"
+ "------------------------------------------------------------",
this.issuedInstant);
}
@Override
@@ -343,87 +249,4 @@ public class StreamReadMonitoringFunction
this.instantState.add(this.issuedInstant);
}
}
/**
 * Returns the archived metadata in case the reader consumes untimely or it wants
 * to read from the earliest.
 *
 * <p>Note: should improve it with metadata table when the metadata table is stable enough.
 *
 * @param instantRange The instant range to filter the timeline instants
 * @param commitTimeline The commit timeline
 * @param tableName The table name
 * @return the list of archived metadata, or empty if there is no need to read the archived timeline
 */
private List<HoodieCommitMetadata> getArchivedMetadata(
    InstantRange instantRange,
    HoodieTimeline commitTimeline,
    String tableName) {
  if (instantRange == null || commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) {
    // read the archived metadata if:
    // 1. the start commit is 'earliest';
    // 2. the start instant is archived.
    HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
    HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants();
    if (!archivedCompleteTimeline.empty()) {
      final String endTs = archivedCompleteTimeline.lastInstant().get().getTimestamp();
      Stream<HoodieInstant> instantStream = archivedCompleteTimeline.getInstants();
      if (instantRange != null) {
        // the start instant is archived: only load/keep instants at or after it
        archivedTimeline.loadInstantDetailsInMemory(instantRange.getStartInstant(), endTs);
        instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, instantRange.getStartInstant()));
      } else {
        // 'earliest': load the whole archived timeline span
        final String startTs = archivedCompleteTimeline.firstInstant().get().getTimestamp();
        archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
      }
      return instantStream
          .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, archivedTimeline)).collect(Collectors.toList());
    }
  }
  return Collections.emptyList();
}
/**
 * Returns the instants with a given issuedInstant to start from.
 *
 * <p>Priority of the lower bound: the already-issued instant (exclusive) wins over
 * the configured start commit (inclusive); a start commit of 'earliest' or no
 * configuration at all keeps the whole completed timeline.
 *
 * @param commitTimeline The completed commits timeline
 * @param issuedInstant The last issued instant that has already been delivered to downstream
 * @return the filtered hoodie instants
 */
private List<HoodieInstant> filterInstantsWithStart(
    HoodieTimeline commitTimeline,
    final String issuedInstant) {
  HoodieTimeline completedTimeline = commitTimeline.filterCompletedInstants();
  if (issuedInstant != null) {
    // streaming mode: only instants strictly after the issued one are new
    return completedTimeline.getInstants()
        .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant))
        .collect(Collectors.toList());
  } else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()
      && !this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)) {
    String definedStartCommit = this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT);
    return completedTimeline.getInstants()
        .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, definedStartCommit))
        .collect(Collectors.toList());
  } else {
    return completedTimeline.getInstants().collect(Collectors.toList());
  }
}
/**
 * Returns all the incremental write partition paths as a set with the given commits metadata.
 *
 * @param metadataList The commits metadata
 * @return the partition path set
 */
private Set<String> getWritePartitionPaths(List<HoodieCommitMetadata> metadataList) {
  return metadataList.stream()
      .flatMap(metadata -> metadata.getWritePartitionPaths().stream())
      .collect(Collectors.toSet());
}
// Concatenates the two lists into a new list, keeping list1's elements first.
private static <T> List<T> mergeList(List<T> list1, List<T> list2) {
  return Stream.concat(list1.stream(), list2.stream()).collect(Collectors.toList());
}
}

View File

@@ -155,6 +155,8 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
setupCompactionOptions(conf);
// hive options
setupHiveOptions(conf);
// read options
setupReadOptions(conf);
// infer avro schema from physical DDL schema
inferAvroSchema(conf, schema.toPhysicalRowDataType().notNull().getLogicalType());
}
@@ -270,6 +272,16 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
}
}
/**
 * Sets up the read options from the table definition.
 *
 * <p>A bounded (non-streaming) query with an explicit commit range is
 * switched to the incremental query type.
 */
private static void setupReadOptions(Configuration conf) {
  final boolean streaming = conf.getBoolean(FlinkOptions.READ_AS_STREAMING);
  final boolean hasCommitBound = conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()
      || conf.getOptional(FlinkOptions.READ_END_COMMIT).isPresent();
  if (!streaming && hasCommitBound) {
    conf.setString(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_INCREMENTAL);
  }
}
/**
* Inferences the deserialization Avro schema from the table schema (e.g. the DDL)
* if both options {@link FlinkOptions#SOURCE_AVRO_SCHEMA_PATH} and

View File

@@ -19,7 +19,6 @@
package org.apache.hudi.table;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
@@ -31,6 +30,7 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.HoodieROTablePathFilter;
import org.apache.hudi.source.FileIndex;
import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.source.StreamReadMonitoringFunction;
import org.apache.hudi.source.StreamReadOperator;
import org.apache.hudi.table.format.FilePathUtils;
@@ -108,6 +108,9 @@ public class HoodieTableSource implements
private static final int NO_LIMIT_CONSTANT = -1;
private static final InputFormat<RowData, ?> EMPTY_INPUT_FORMAT =
new CollectionInputFormat<>(Collections.emptyList(), null);
private final transient org.apache.hadoop.conf.Configuration hadoopConf;
private final transient HoodieTableMetaClient metaClient;
private final long maxCompactionMemoryInBytes;
@@ -220,7 +223,7 @@ public class HoodieTableSource implements
public Result applyFilters(List<ResolvedExpression> filters) {
this.filters = new ArrayList<>(filters);
// refuse all the filters now
return Result.of(Collections.emptyList(), new ArrayList<>(filters));
return SupportsFilterPushDown.Result.of(Collections.emptyList(), new ArrayList<>(filters));
}
@Override
@@ -256,8 +259,8 @@ public class HoodieTableSource implements
DataType[] schemaTypes = this.schema.getColumnDataTypes().toArray(new DataType[0]);
return DataTypes.ROW(Arrays.stream(this.requiredPos)
.mapToObj(i -> DataTypes.FIELD(schemaFieldNames[i], schemaTypes[i]))
.toArray(DataTypes.Field[]::new))
.mapToObj(i -> DataTypes.FIELD(schemaFieldNames[i], schemaTypes[i]))
.toArray(DataTypes.Field[]::new))
.bridgedTo(RowData.class);
}
@@ -268,16 +271,21 @@ public class HoodieTableSource implements
return requiredPartitions;
}
@Nullable
private Set<String> getRequiredPartitionPaths() {
if (this.requiredPartitions == null) {
return Collections.emptySet();
// returns null for non partition pruning
return null;
}
return FilePathUtils.toRelativePartitionPaths(this.partitionKeys, this.requiredPartitions,
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING));
}
private List<MergeOnReadInputSplit> buildFileIndex(Path[] paths) {
if (paths.length == 0) {
private List<MergeOnReadInputSplit> buildFileIndex() {
Set<String> requiredPartitionPaths = getRequiredPartitionPaths();
fileIndex.setPartitionPaths(requiredPartitionPaths);
List<String> relPartitionPaths = fileIndex.getOrBuildPartitionPaths();
if (relPartitionPaths.size() == 0) {
return Collections.emptyList();
}
FileStatus[] fileStatuses = fileIndex.getFilesInPartitions();
@@ -292,19 +300,17 @@ public class HoodieTableSource implements
final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
final AtomicInteger cnt = new AtomicInteger(0);
// generates one input split for each file group
return Arrays.stream(paths).map(partitionPath -> {
String relPartitionPath = FSUtils.getRelativePartitionPath(path, partitionPath);
return fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, latestCommit)
.map(fileSlice -> {
String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
.map(logFile -> logFile.getPath().toString())
.collect(Collectors.toList()));
return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, latestCommit,
metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
}).collect(Collectors.toList());
})
return relPartitionPaths.stream()
.map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, latestCommit)
.map(fileSlice -> {
String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
.map(logFile -> logFile.getPath().toString())
.collect(Collectors.toList()));
return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, latestCommit,
metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
}).collect(Collectors.toList()))
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
@@ -319,16 +325,6 @@ public class HoodieTableSource implements
}
private InputFormat<RowData, ?> getBatchInputFormat() {
// When this table has no partition, just return an empty source.
if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
return new CollectionInputFormat<>(Collections.emptyList(), null);
}
final Path[] paths = getReadPaths();
if (paths.length == 0) {
return new CollectionInputFormat<>(Collections.emptyList(), null);
}
final Schema tableAvroSchema = getTableAvroSchema();
final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
final RowType rowType = (RowType) rowDataType.getLogicalType();
@@ -340,62 +336,37 @@ public class HoodieTableSource implements
final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
switch (tableType) {
case MERGE_ON_READ:
final List<MergeOnReadInputSplit> inputSplits = buildFileIndex(paths);
final List<MergeOnReadInputSplit> inputSplits = buildFileIndex();
if (inputSplits.size() == 0) {
// When there is no input splits, just return an empty source.
LOG.warn("No input splits generate for MERGE_ON_READ input format, returns empty collection instead");
return new CollectionInputFormat<>(Collections.emptyList(), null);
return EMPTY_INPUT_FORMAT;
}
final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
rowType,
requiredRowType,
tableAvroSchema.toString(),
AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
inputSplits,
conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
return MergeOnReadInputFormat.builder()
.config(this.conf)
.paths(FilePathUtils.toFlinkPaths(paths))
.tableState(hoodieTableState)
// use the explicit fields data type because the AvroSchemaConverter
// is not very stable.
.fieldTypes(rowDataType.getChildren())
.defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
.limit(this.limit)
.emitDelete(false)
.build();
return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
rowDataType, inputSplits, false);
case COPY_ON_WRITE:
FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
FilePathUtils.toFlinkPaths(paths),
this.schema.getColumnNames().toArray(new String[0]),
this.schema.getColumnDataTypes().toArray(new DataType[0]),
this.requiredPos,
this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value
getParquetConf(this.conf, this.hadoopConf),
this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)
);
format.setFilesFilter(new LatestFileFilter(this.hadoopConf));
return format;
return baseFileOnlyInputFormat();
default:
throw new HoodieException("Unexpected table type: " + this.conf.getString(FlinkOptions.TABLE_TYPE));
}
case FlinkOptions.QUERY_TYPE_READ_OPTIMIZED:
FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
FilePathUtils.toFlinkPaths(paths),
this.schema.getColumnNames().toArray(new String[0]),
this.schema.getColumnDataTypes().toArray(new DataType[0]),
this.requiredPos,
"default",
this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value
getParquetConf(this.conf, this.hadoopConf),
this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)
);
format.setFilesFilter(new LatestFileFilter(this.hadoopConf));
return format;
return baseFileOnlyInputFormat();
case FlinkOptions.QUERY_TYPE_INCREMENTAL:
IncrementalInputSplits incrementalInputSplits = IncrementalInputSplits.builder()
.conf(conf).path(FilePathUtils.toFlinkPath(path))
.maxCompactionMemoryInBytes(maxCompactionMemoryInBytes)
.requiredPartitions(getRequiredPartitionPaths()).build();
final IncrementalInputSplits.Result result = incrementalInputSplits.inputSplits(metaClient, hadoopConf);
if (result.isEmpty()) {
// When there is no input splits, just return an empty source.
LOG.warn("No input splits generate for incremental read, returns empty collection instead");
return new CollectionInputFormat<>(Collections.emptyList(), null);
}
return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
rowDataType, result.getInputSplits(), false);
default:
String errMsg = String.format("Invalid query type : '%s', options ['%s', '%s'] are supported now", queryType,
FlinkOptions.QUERY_TYPE_SNAPSHOT, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED);
String errMsg = String.format("Invalid query type : '%s', options ['%s', '%s', '%s'] are supported now", queryType,
FlinkOptions.QUERY_TYPE_SNAPSHOT, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED, FlinkOptions.QUERY_TYPE_INCREMENTAL);
throw new HoodieException(errMsg);
}
}
@@ -408,56 +379,62 @@ public class HoodieTableSource implements
final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType();
final String queryType = this.conf.getString(FlinkOptions.QUERY_TYPE);
org.apache.flink.core.fs.Path[] paths = new org.apache.flink.core.fs.Path[0];
if (FlinkOptions.QUERY_TYPE_SNAPSHOT.equals(queryType)) {
final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
switch (tableType) {
case MERGE_ON_READ:
final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
rowType,
requiredRowType,
tableAvroSchema.toString(),
AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
Collections.emptyList(),
conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
return MergeOnReadInputFormat.builder()
.config(this.conf)
.paths(paths)
.tableState(hoodieTableState)
// use the explicit fields data type because the AvroSchemaConverter
// is not very stable.
.fieldTypes(rowDataType.getChildren())
.defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
.limit(this.limit)
.emitDelete(true)
.build();
case COPY_ON_WRITE:
final MergeOnReadTableState hoodieTableState2 = new MergeOnReadTableState(
rowType,
requiredRowType,
tableAvroSchema.toString(),
AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
Collections.emptyList(),
conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
return MergeOnReadInputFormat.builder()
.config(this.conf)
.paths(paths)
.tableState(hoodieTableState2)
// use the explicit fields data type because the AvroSchemaConverter
// is not very stable.
.fieldTypes(rowDataType.getChildren())
.defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
.limit(this.limit)
.build();
default:
throw new HoodieException("Unexpected table type: " + this.conf.getString(FlinkOptions.TABLE_TYPE));
}
boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
rowDataType, Collections.emptyList(), emitDelete);
}
String errMsg = String.format("Invalid query type : '%s', options ['%s'] are supported now", queryType,
FlinkOptions.QUERY_TYPE_SNAPSHOT);
throw new HoodieException(errMsg);
}
/**
 * Creates a {@link MergeOnReadInputFormat} over the given input splits.
 *
 * @param rowType         the full physical row type of the table
 * @param requiredRowType the projected row type required by the query
 * @param tableAvroSchema the table's Avro schema
 * @param rowDataType     the data type corresponding to {@code rowType}
 * @param inputSplits     the splits the format should read
 * @param emitDelete      forwarded to the builder's emit-delete option
 * @return the configured input format
 */
private MergeOnReadInputFormat mergeOnReadInputFormat(
    RowType rowType,
    RowType requiredRowType,
    Schema tableAvroSchema,
    DataType rowDataType,
    List<MergeOnReadInputSplit> inputSplits,
    boolean emitDelete) {
  final String requiredAvroSchema = AvroSchemaConverter.convertToSchema(requiredRowType).toString();
  final String[] recordKeys = conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
  final MergeOnReadTableState tableState = new MergeOnReadTableState(
      rowType, requiredRowType, tableAvroSchema.toString(), requiredAvroSchema,
      inputSplits, recordKeys);
  return MergeOnReadInputFormat.builder()
      .config(this.conf)
      .tableState(tableState)
      // pass the fields' data types explicitly because the AvroSchemaConverter
      // is not very stable.
      .fieldTypes(rowDataType.getChildren())
      .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
      .limit(this.limit)
      .emitDelete(emitDelete)
      .build();
}
/**
 * Creates an input format that reads only the base (parquet) files,
 * used for COPY_ON_WRITE snapshot reads and read-optimized queries.
 *
 * @return the configured input format, or the shared empty format when
 *         there is nothing to read
 */
private InputFormat<RowData, ?> baseFileOnlyInputFormat() {
  final Path[] readPaths = getReadPaths();
  if (readPaths.length == 0) {
    return EMPTY_INPUT_FORMAT;
  }
  final String[] fullFieldNames = this.schema.getColumnNames().toArray(new String[0]);
  final DataType[] fullFieldTypes = this.schema.getColumnDataTypes().toArray(new DataType[0]);
  // ParquetInputFormat always uses the limit value
  final long readLimit = this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit;
  FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
      FilePathUtils.toFlinkPaths(readPaths),
      fullFieldNames,
      fullFieldTypes,
      this.requiredPos,
      this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
      readLimit,
      getParquetConf(this.conf, this.hadoopConf),
      this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)
  );
  // only expose the latest version of each file group
  format.setFilesFilter(new LatestFileFilter(this.hadoopConf));
  return format;
}
private Schema inferSchemaFromDdl() {
Schema schema = AvroSchemaConverter.convertToSchema(this.schema.toPhysicalRowDataType().getLogicalType());
return HoodieAvroUtils.addMetadataFields(schema, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));

View File

@@ -45,7 +45,6 @@ import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
@@ -85,8 +84,6 @@ public class MergeOnReadInputFormat
private transient org.apache.hadoop.conf.Configuration hadoopConf;
private Path[] paths;
private final MergeOnReadTableState tableState;
/**
@@ -134,14 +131,12 @@ public class MergeOnReadInputFormat
private MergeOnReadInputFormat(
Configuration conf,
Path[] paths,
MergeOnReadTableState tableState,
List<DataType> fieldTypes,
String defaultPartName,
long limit,
boolean emitDelete) {
this.conf = conf;
this.paths = paths;
this.tableState = tableState;
this.fieldNames = tableState.getRowType().getFieldNames();
this.fieldTypes = fieldTypes;
@@ -165,7 +160,7 @@ public class MergeOnReadInputFormat
this.currentReadCount = 0L;
this.hadoopConf = StreamerUtil.getHadoopConf();
if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() > 0)) {
if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
if (split.getInstantRange() != null) {
// base file only with commit time filtering
this.iterator = new BaseFileOnlyFilteringIterator(
split.getInstantRange(),
@@ -212,16 +207,8 @@ public class MergeOnReadInputFormat
/**
 * Configures this input format from the Flink runtime {@link Configuration}.
 *
 * <p>If no read paths were supplied when the format was built, falls back to
 * the path option from the configuration.
 *
 * @param configuration the Flink configuration for this task
 * @throws IllegalArgumentException if no file path is available from either source
 */
@Override
public void configure(Configuration configuration) {
  if (this.paths.length == 0) {
    // file path was not specified yet. Try to set it from the parameters.
    String filePath = configuration.getString(FlinkOptions.PATH, null);
    if (filePath == null) {
      throw new IllegalArgumentException("File path was not specified in input format or configuration.");
    } else {
      this.paths = new Path[] {new Path(filePath)};
    }
  }
  // may support nested files in the future.
}
@Override
@@ -750,7 +737,6 @@ public class MergeOnReadInputFormat
*/
public static class Builder {
private Configuration conf;
private Path[] paths;
private MergeOnReadTableState tableState;
private List<DataType> fieldTypes;
private String defaultPartName;
@@ -762,11 +748,6 @@ public class MergeOnReadInputFormat
return this;
}
/**
 * Sets the read paths for the input format.
 *
 * @param paths the base paths to read
 * @return this builder, for call chaining
 */
public Builder paths(Path[] paths) {
  this.paths = paths;
  return this;
}
public Builder tableState(MergeOnReadTableState tableState) {
this.tableState = tableState;
return this;
@@ -793,8 +774,8 @@ public class MergeOnReadInputFormat
}
/**
 * Creates the {@link MergeOnReadInputFormat} from the values set on this builder.
 *
 * <p>NOTE(review): the diff residue contained two return statements (the second
 * was unreachable and would not compile); the call matching the 7-argument
 * constructor and the {@code paths} field visible in this file is kept.
 *
 * @return a new input format configured with this builder's state
 */
public MergeOnReadInputFormat build() {
  return new MergeOnReadInputFormat(conf, paths, tableState,
      fieldTypes, defaultPartName, limit, emitDelete);
}
}