[HUDI-4391] Incremental read from archived commits for flink (#6096)
This commit is contained in:
@@ -19,6 +19,7 @@
|
|||||||
package org.apache.hudi.common.table.log;
|
package org.apache.hudi.common.table.log;
|
||||||
|
|
||||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||||
|
import org.apache.hudi.common.util.ValidationUtils;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
@@ -33,19 +34,15 @@ public abstract class InstantRange implements Serializable {
|
|||||||
protected final String endInstant;
|
protected final String endInstant;
|
||||||
|
|
||||||
public InstantRange(String startInstant, String endInstant) {
|
public InstantRange(String startInstant, String endInstant) {
|
||||||
this.startInstant = Objects.requireNonNull(startInstant);
|
this.startInstant = startInstant;
|
||||||
this.endInstant = Objects.requireNonNull(endInstant);
|
this.endInstant = endInstant;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static InstantRange getInstance(String startInstant, String endInstant, RangeType rangeType) {
|
/**
|
||||||
switch (rangeType) {
|
* Returns the builder.
|
||||||
case OPEN_CLOSE:
|
*/
|
||||||
return new OpenCloseRange(startInstant, endInstant);
|
public static Builder builder() {
|
||||||
case CLOSE_CLOSE:
|
return new Builder();
|
||||||
return new CloseCloseRange(startInstant, endInstant);
|
|
||||||
default:
|
|
||||||
throw new AssertionError();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getStartInstant() {
|
public String getStartInstant() {
|
||||||
@@ -65,14 +62,14 @@ public abstract class InstantRange implements Serializable {
|
|||||||
/**
|
/**
|
||||||
* Represents a range type.
|
* Represents a range type.
|
||||||
*/
|
*/
|
||||||
public enum RangeType {
|
public static enum RangeType {
|
||||||
OPEN_CLOSE, CLOSE_CLOSE
|
OPEN_CLOSE, CLOSE_CLOSE
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class OpenCloseRange extends InstantRange {
|
private static class OpenCloseRange extends InstantRange {
|
||||||
|
|
||||||
public OpenCloseRange(String startInstant, String endInstant) {
|
public OpenCloseRange(String startInstant, String endInstant) {
|
||||||
super(startInstant, endInstant);
|
super(Objects.requireNonNull(startInstant), endInstant);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -84,10 +81,31 @@ public abstract class InstantRange implements Serializable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class OpenCloseRangeNullableBoundary extends InstantRange {
|
||||||
|
|
||||||
|
public OpenCloseRangeNullableBoundary(String startInstant, String endInstant) {
|
||||||
|
super(startInstant, endInstant);
|
||||||
|
ValidationUtils.checkArgument(startInstant != null || endInstant != null,
|
||||||
|
"Start and end instants can not both be null");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isInRange(String instant) {
|
||||||
|
if (startInstant == null) {
|
||||||
|
return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant);
|
||||||
|
} else if (endInstant == null) {
|
||||||
|
return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN, startInstant);
|
||||||
|
} else {
|
||||||
|
return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN, startInstant)
|
||||||
|
&& HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static class CloseCloseRange extends InstantRange {
|
private static class CloseCloseRange extends InstantRange {
|
||||||
|
|
||||||
public CloseCloseRange(String startInstant, String endInstant) {
|
public CloseCloseRange(String startInstant, String endInstant) {
|
||||||
super(startInstant, endInstant);
|
super(Objects.requireNonNull(startInstant), endInstant);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -98,4 +116,78 @@ public abstract class InstantRange implements Serializable {
|
|||||||
return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, startInstant);
|
return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, startInstant);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class CloseCloseRangeNullableBoundary extends InstantRange {
|
||||||
|
|
||||||
|
public CloseCloseRangeNullableBoundary(String startInstant, String endInstant) {
|
||||||
|
super(startInstant, endInstant);
|
||||||
|
ValidationUtils.checkArgument(startInstant != null || endInstant != null,
|
||||||
|
"Start and end instants can not both be null");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isInRange(String instant) {
|
||||||
|
if (startInstant == null) {
|
||||||
|
return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant);
|
||||||
|
} else if (endInstant == null) {
|
||||||
|
return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, startInstant);
|
||||||
|
} else {
|
||||||
|
return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, startInstant)
|
||||||
|
&& HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Inner Class
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Builder for {@link InstantRange}.
|
||||||
|
*/
|
||||||
|
public static class Builder {
|
||||||
|
private String startInstant;
|
||||||
|
private String endInstant;
|
||||||
|
private RangeType rangeType;
|
||||||
|
private boolean nullableBoundary = false;
|
||||||
|
|
||||||
|
private Builder() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder startInstant(String startInstant) {
|
||||||
|
this.startInstant = startInstant;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder endInstant(String endInstant) {
|
||||||
|
this.endInstant = endInstant;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder rangeType(RangeType rangeType) {
|
||||||
|
this.rangeType = rangeType;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder nullableBoundary(boolean nullable) {
|
||||||
|
this.nullableBoundary = nullable;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public InstantRange build() {
|
||||||
|
ValidationUtils.checkState(this.rangeType != null, "Range type is required");
|
||||||
|
switch (rangeType) {
|
||||||
|
case OPEN_CLOSE:
|
||||||
|
return nullableBoundary
|
||||||
|
? new OpenCloseRangeNullableBoundary(startInstant, endInstant)
|
||||||
|
: new OpenCloseRange(startInstant, endInstant);
|
||||||
|
case CLOSE_CLOSE:
|
||||||
|
return nullableBoundary
|
||||||
|
? new CloseCloseRangeNullableBoundary(startInstant, endInstant)
|
||||||
|
: new CloseCloseRange(startInstant, endInstant);
|
||||||
|
default:
|
||||||
|
throw new AssertionError();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -172,4 +172,12 @@ public class OptionsResolver {
|
|||||||
return conf.getString(FlinkOptions.OPERATION).equals(WriteOperationType.INSERT_OVERWRITE_TABLE.value())
|
return conf.getString(FlinkOptions.OPERATION).equals(WriteOperationType.INSERT_OVERWRITE_TABLE.value())
|
||||||
|| conf.getString(FlinkOptions.OPERATION).equals(WriteOperationType.INSERT_OVERWRITE.value());
|
|| conf.getString(FlinkOptions.OPERATION).equals(WriteOperationType.INSERT_OVERWRITE.value());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns whether the read start commit is specific commit timestamp.
|
||||||
|
*/
|
||||||
|
public static boolean isSpecificStartCommit(Configuration conf) {
|
||||||
|
return conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()
|
||||||
|
&& !conf.get(FlinkOptions.READ_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -82,6 +82,29 @@ public class WriteProfiles {
|
|||||||
PROFILES.remove(path);
|
PROFILES.remove(path);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns all the incremental write file statuses with the given commits metadata.
|
||||||
|
*
|
||||||
|
* <p> Different with {@link #getWritePathsOfInstants}, the files are not filtered by
|
||||||
|
* existence.
|
||||||
|
*
|
||||||
|
* @param basePath Table base path
|
||||||
|
* @param hadoopConf The hadoop conf
|
||||||
|
* @param metadataList The commits metadata
|
||||||
|
* @param tableType The table type
|
||||||
|
* @return the file status array
|
||||||
|
*/
|
||||||
|
public static FileStatus[] getRawWritePathsOfInstants(
|
||||||
|
Path basePath,
|
||||||
|
Configuration hadoopConf,
|
||||||
|
List<HoodieCommitMetadata> metadataList,
|
||||||
|
HoodieTableType tableType) {
|
||||||
|
Map<String, FileStatus> uniqueIdToFileStatus = new HashMap<>();
|
||||||
|
metadataList.forEach(metadata ->
|
||||||
|
uniqueIdToFileStatus.putAll(getFilesToReadOfInstant(basePath, metadata, hadoopConf, tableType)));
|
||||||
|
return uniqueIdToFileStatus.values().toArray(new FileStatus[0]);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns all the incremental write file statuses with the given commits metadata.
|
* Returns all the incremental write file statuses with the given commits metadata.
|
||||||
*
|
*
|
||||||
@@ -103,6 +126,25 @@ public class WriteProfiles {
|
|||||||
return uniqueIdToFileStatus.values().toArray(new FileStatus[0]);
|
return uniqueIdToFileStatus.values().toArray(new FileStatus[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the commit file status info with given metadata.
|
||||||
|
*
|
||||||
|
* @param basePath Table base path
|
||||||
|
* @param metadata The metadata
|
||||||
|
* @param hadoopConf The filesystem
|
||||||
|
* @param tableType The table type
|
||||||
|
* @return the commit file status info grouping by specific ID
|
||||||
|
*/
|
||||||
|
private static Map<String, FileStatus> getFilesToReadOfInstant(
|
||||||
|
Path basePath,
|
||||||
|
HoodieCommitMetadata metadata,
|
||||||
|
Configuration hadoopConf,
|
||||||
|
HoodieTableType tableType) {
|
||||||
|
return getFilesToRead(hadoopConf, metadata, basePath.toString(), tableType).entrySet().stream()
|
||||||
|
.filter(entry -> StreamerUtil.isValidFile(entry.getValue()))
|
||||||
|
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the commit file status info with given metadata.
|
* Returns the commit file status info with given metadata.
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.source;
|
package org.apache.hudi.source;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
import org.apache.hudi.common.model.BaseFile;
|
import org.apache.hudi.common.model.BaseFile;
|
||||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||||
import org.apache.hudi.common.model.HoodieLogFile;
|
import org.apache.hudi.common.model.HoodieLogFile;
|
||||||
@@ -29,20 +30,24 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
|||||||
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
|
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.configuration.FlinkOptions;
|
import org.apache.hudi.configuration.FlinkOptions;
|
||||||
|
import org.apache.hudi.configuration.OptionsResolver;
|
||||||
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
|
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
|
||||||
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
|
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
|
||||||
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
|
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
|
||||||
|
import org.apache.hudi.util.StreamerUtil;
|
||||||
|
|
||||||
import org.apache.flink.configuration.Configuration;
|
import org.apache.flink.configuration.Configuration;
|
||||||
import org.apache.flink.core.fs.Path;
|
import org.apache.flink.core.fs.Path;
|
||||||
import org.apache.flink.table.types.logical.RowType;
|
import org.apache.flink.table.types.logical.RowType;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
@@ -117,7 +122,102 @@ public class IncrementalInputSplits implements Serializable {
|
|||||||
public Result inputSplits(
|
public Result inputSplits(
|
||||||
HoodieTableMetaClient metaClient,
|
HoodieTableMetaClient metaClient,
|
||||||
org.apache.hadoop.conf.Configuration hadoopConf) {
|
org.apache.hadoop.conf.Configuration hadoopConf) {
|
||||||
return inputSplits(metaClient, hadoopConf, null);
|
HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
|
||||||
|
if (commitTimeline.empty()) {
|
||||||
|
LOG.warn("No splits found for the table under path " + path);
|
||||||
|
return Result.EMPTY;
|
||||||
|
}
|
||||||
|
|
||||||
|
final String startCommit = this.conf.getString(FlinkOptions.READ_START_COMMIT);
|
||||||
|
final String endCommit = this.conf.getString(FlinkOptions.READ_END_COMMIT);
|
||||||
|
final boolean startFromEarliest = FlinkOptions.START_COMMIT_EARLIEST.equalsIgnoreCase(startCommit);
|
||||||
|
final boolean startOutOfRange = startCommit != null && commitTimeline.isBeforeTimelineStarts(startCommit);
|
||||||
|
final boolean endOutOfRange = endCommit != null && commitTimeline.isBeforeTimelineStarts(endCommit);
|
||||||
|
boolean fullTableScan = startFromEarliest || startOutOfRange || endOutOfRange;
|
||||||
|
|
||||||
|
// Step1: find out the files to read, tries to read the files from the commit metadata first,
|
||||||
|
// fallback to full table scan if any of the following conditions matches:
|
||||||
|
// 1. there are files in metadata be deleted;
|
||||||
|
// 2. read from earliest
|
||||||
|
// 3. the start commit is archived
|
||||||
|
// 4. the end commit is archived
|
||||||
|
Set<String> readPartitions;
|
||||||
|
final FileStatus[] fileStatuses;
|
||||||
|
List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline, null);
|
||||||
|
if (fullTableScan) {
|
||||||
|
// scans the partitions and files directly.
|
||||||
|
FileIndex fileIndex = getFileIndex();
|
||||||
|
readPartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
|
||||||
|
if (readPartitions.size() == 0) {
|
||||||
|
LOG.warn("No partitions found for reading in user provided path.");
|
||||||
|
return Result.EMPTY;
|
||||||
|
}
|
||||||
|
fileStatuses = fileIndex.getFilesInPartitions();
|
||||||
|
} else {
|
||||||
|
if (instants.size() == 0) {
|
||||||
|
LOG.info("No new instant found for the table under path " + path + ", skip reading");
|
||||||
|
return Result.EMPTY;
|
||||||
|
}
|
||||||
|
String tableName = conf.getString(FlinkOptions.TABLE_NAME);
|
||||||
|
List<HoodieCommitMetadata> metadataList = instants.stream()
|
||||||
|
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
|
||||||
|
readPartitions = getReadPartitions(metadataList);
|
||||||
|
if (readPartitions.size() == 0) {
|
||||||
|
LOG.warn("No partitions found for reading in user provided path.");
|
||||||
|
return Result.EMPTY;
|
||||||
|
}
|
||||||
|
FileStatus[] files = WriteProfiles.getRawWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());
|
||||||
|
FileSystem fs = FSUtils.getFs(path.toString(), hadoopConf);
|
||||||
|
if (Arrays.stream(files).anyMatch(fileStatus -> !StreamerUtil.fileExists(fs, fileStatus.getPath()))) {
|
||||||
|
LOG.warn("Found deleted files in metadata, fall back to full table scan.");
|
||||||
|
// fallback to full table scan
|
||||||
|
fullTableScan = true;
|
||||||
|
// reading from the earliest, scans the partitions and files directly.
|
||||||
|
FileIndex fileIndex = getFileIndex();
|
||||||
|
readPartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
|
||||||
|
if (readPartitions.size() == 0) {
|
||||||
|
LOG.warn("No partitions found for reading in user provided path.");
|
||||||
|
return Result.EMPTY;
|
||||||
|
}
|
||||||
|
fileStatuses = fileIndex.getFilesInPartitions();
|
||||||
|
} else {
|
||||||
|
fileStatuses = files;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fileStatuses.length == 0) {
|
||||||
|
LOG.warn("No files found for reading in user provided path.");
|
||||||
|
return Result.EMPTY;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step2: generates the instant range
|
||||||
|
// if the specified end commit is archived, still uses the specified timestamp,
|
||||||
|
// else uses the latest filtered instant time
|
||||||
|
// (would be the latest instant time if the specified end commit is greater than the latest instant time)
|
||||||
|
final String rangeEnd = endOutOfRange ? endCommit : instants.get(instants.size() - 1).getTimestamp();
|
||||||
|
// keep the same semantics with streaming read, default start from the latest commit
|
||||||
|
final String rangeStart = startFromEarliest ? null : (startCommit == null ? rangeEnd : startCommit);
|
||||||
|
final InstantRange instantRange;
|
||||||
|
if (!fullTableScan) {
|
||||||
|
instantRange = InstantRange.builder().startInstant(rangeStart).endInstant(rangeEnd)
|
||||||
|
.rangeType(InstantRange.RangeType.CLOSE_CLOSE).build();
|
||||||
|
} else if (startFromEarliest && endCommit == null) {
|
||||||
|
// short-cut for snapshot read
|
||||||
|
instantRange = null;
|
||||||
|
} else {
|
||||||
|
instantRange = InstantRange.builder().startInstant(rangeStart).endInstant(rangeEnd)
|
||||||
|
.rangeType(InstantRange.RangeType.CLOSE_CLOSE).nullableBoundary(true).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step3: decides the read end commit
|
||||||
|
final String endInstant = fullTableScan
|
||||||
|
? commitTimeline.lastInstant().get().getTimestamp()
|
||||||
|
: instants.get(instants.size() - 1).getTimestamp();
|
||||||
|
|
||||||
|
List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
|
||||||
|
fileStatuses, readPartitions, endInstant, instantRange);
|
||||||
|
|
||||||
|
return Result.instance(inputSplits, endInstant);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -146,18 +246,19 @@ public class IncrementalInputSplits implements Serializable {
|
|||||||
if (issuedInstant != null) {
|
if (issuedInstant != null) {
|
||||||
// the streaming reader may record the last issued instant, if the issued instant is present,
|
// the streaming reader may record the last issued instant, if the issued instant is present,
|
||||||
// the instant range should be: (issued instant, the latest instant].
|
// the instant range should be: (issued instant, the latest instant].
|
||||||
instantRange = InstantRange.getInstance(issuedInstant, instantToIssue.getTimestamp(),
|
instantRange = InstantRange.builder().startInstant(issuedInstant).endInstant(instantToIssue.getTimestamp())
|
||||||
InstantRange.RangeType.OPEN_CLOSE);
|
.rangeType(InstantRange.RangeType.OPEN_CLOSE).build();
|
||||||
} else if (this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()) {
|
} else if (this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()) {
|
||||||
// first time consume and has a start commit
|
// first time consume and has a start commit
|
||||||
final String startCommit = this.conf.getString(FlinkOptions.READ_START_COMMIT);
|
final String startCommit = this.conf.getString(FlinkOptions.READ_START_COMMIT);
|
||||||
instantRange = startCommit.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)
|
instantRange = startCommit.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)
|
||||||
? null
|
? null
|
||||||
: InstantRange.getInstance(startCommit, instantToIssue.getTimestamp(), InstantRange.RangeType.CLOSE_CLOSE);
|
: InstantRange.builder().startInstant(startCommit).endInstant(instantToIssue.getTimestamp())
|
||||||
|
.rangeType(InstantRange.RangeType.CLOSE_CLOSE).build();
|
||||||
} else {
|
} else {
|
||||||
// first time consume and no start commit, consumes the latest incremental data set.
|
// first time consume and no start commit, consumes the latest incremental data set.
|
||||||
instantRange = InstantRange.getInstance(instantToIssue.getTimestamp(), instantToIssue.getTimestamp(),
|
instantRange = InstantRange.builder().startInstant(instantToIssue.getTimestamp()).endInstant(instantToIssue.getTimestamp())
|
||||||
InstantRange.RangeType.CLOSE_CLOSE);
|
.rangeType(InstantRange.RangeType.CLOSE_CLOSE).build();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
LOG.info("No new instant found for the table under path " + path + ", skip reading");
|
LOG.info("No new instant found for the table under path " + path + ", skip reading");
|
||||||
@@ -166,18 +267,14 @@ public class IncrementalInputSplits implements Serializable {
|
|||||||
|
|
||||||
String tableName = conf.getString(FlinkOptions.TABLE_NAME);
|
String tableName = conf.getString(FlinkOptions.TABLE_NAME);
|
||||||
|
|
||||||
Set<String> writePartitions;
|
Set<String> readPartitions;
|
||||||
final FileStatus[] fileStatuses;
|
final FileStatus[] fileStatuses;
|
||||||
|
|
||||||
if (instantRange == null) {
|
if (instantRange == null) {
|
||||||
// reading from the earliest, scans the partitions and files directly.
|
// reading from the earliest, scans the partitions and files directly.
|
||||||
FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(path.toUri()), conf, rowType);
|
FileIndex fileIndex = getFileIndex();
|
||||||
if (this.requiredPartitions != null) {
|
readPartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
|
||||||
// apply partition push down
|
if (readPartitions.size() == 0) {
|
||||||
fileIndex.setPartitionPaths(this.requiredPartitions);
|
|
||||||
}
|
|
||||||
writePartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
|
|
||||||
if (writePartitions.size() == 0) {
|
|
||||||
LOG.warn("No partitions found for reading in user provided path.");
|
LOG.warn("No partitions found for reading in user provided path.");
|
||||||
return Result.EMPTY;
|
return Result.EMPTY;
|
||||||
}
|
}
|
||||||
@@ -198,13 +295,8 @@ public class IncrementalInputSplits implements Serializable {
|
|||||||
? mergeList(archivedMetadataList, activeMetadataList)
|
? mergeList(archivedMetadataList, activeMetadataList)
|
||||||
: activeMetadataList;
|
: activeMetadataList;
|
||||||
|
|
||||||
writePartitions = HoodieInputFormatUtils.getWritePartitionPaths(metadataList);
|
readPartitions = getReadPartitions(metadataList);
|
||||||
// apply partition push down
|
if (readPartitions.size() == 0) {
|
||||||
if (this.requiredPartitions != null) {
|
|
||||||
writePartitions = writePartitions.stream()
|
|
||||||
.filter(this.requiredPartitions::contains).collect(Collectors.toSet());
|
|
||||||
}
|
|
||||||
if (writePartitions.size() == 0) {
|
|
||||||
LOG.warn("No partitions found for reading in user provided path.");
|
LOG.warn("No partitions found for reading in user provided path.");
|
||||||
return Result.EMPTY;
|
return Result.EMPTY;
|
||||||
}
|
}
|
||||||
@@ -216,11 +308,24 @@ public class IncrementalInputSplits implements Serializable {
|
|||||||
return Result.EMPTY;
|
return Result.EMPTY;
|
||||||
}
|
}
|
||||||
|
|
||||||
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
|
|
||||||
final String endInstant = instantToIssue.getTimestamp();
|
final String endInstant = instantToIssue.getTimestamp();
|
||||||
|
List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
|
||||||
|
fileStatuses, readPartitions, endInstant, instantRange);
|
||||||
|
|
||||||
|
return Result.instance(inputSplits, endInstant);
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<MergeOnReadInputSplit> getInputSplits(
|
||||||
|
HoodieTableMetaClient metaClient,
|
||||||
|
HoodieTimeline commitTimeline,
|
||||||
|
FileStatus[] fileStatuses,
|
||||||
|
Set<String> readPartitions,
|
||||||
|
String endInstant,
|
||||||
|
InstantRange instantRange) {
|
||||||
|
final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
|
||||||
final AtomicInteger cnt = new AtomicInteger(0);
|
final AtomicInteger cnt = new AtomicInteger(0);
|
||||||
final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
|
final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
|
||||||
List<MergeOnReadInputSplit> inputSplits = writePartitions.stream()
|
return readPartitions.stream()
|
||||||
.map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant)
|
.map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant)
|
||||||
.map(fileSlice -> {
|
.map(fileSlice -> {
|
||||||
Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
|
Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
|
||||||
@@ -234,7 +339,32 @@ public class IncrementalInputSplits implements Serializable {
|
|||||||
}).collect(Collectors.toList()))
|
}).collect(Collectors.toList()))
|
||||||
.flatMap(Collection::stream)
|
.flatMap(Collection::stream)
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
return Result.instance(inputSplits, endInstant);
|
}
|
||||||
|
|
||||||
|
private FileIndex getFileIndex() {
|
||||||
|
FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(path.toUri()), conf, rowType);
|
||||||
|
if (this.requiredPartitions != null) {
|
||||||
|
// apply partition push down
|
||||||
|
fileIndex.setPartitionPaths(this.requiredPartitions);
|
||||||
|
}
|
||||||
|
return fileIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the partitions to read with given metadata list.
|
||||||
|
* The partitions would be filtered by the pushed down required partitions.
|
||||||
|
*
|
||||||
|
* @param metadataList The metadata list
|
||||||
|
* @return the set of read partitions
|
||||||
|
*/
|
||||||
|
private Set<String> getReadPartitions(List<HoodieCommitMetadata> metadataList) {
|
||||||
|
Set<String> partitions = HoodieInputFormatUtils.getWritePartitionPaths(metadataList);
|
||||||
|
// apply partition push down
|
||||||
|
if (this.requiredPartitions != null) {
|
||||||
|
return partitions.stream()
|
||||||
|
.filter(this.requiredPartitions::contains).collect(Collectors.toSet());
|
||||||
|
}
|
||||||
|
return partitions;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -287,8 +417,7 @@ public class IncrementalInputSplits implements Serializable {
|
|||||||
|
|
||||||
Stream<HoodieInstant> instantStream = completedTimeline.getInstants();
|
Stream<HoodieInstant> instantStream = completedTimeline.getInstants();
|
||||||
|
|
||||||
if (this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()
|
if (OptionsResolver.isSpecificStartCommit(this.conf)) {
|
||||||
&& !this.conf.get(FlinkOptions.READ_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)) {
|
|
||||||
final String startCommit = this.conf.get(FlinkOptions.READ_START_COMMIT);
|
final String startCommit = this.conf.get(FlinkOptions.READ_START_COMMIT);
|
||||||
instantStream = instantStream
|
instantStream = instantStream
|
||||||
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, startCommit));
|
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, startCommit));
|
||||||
|
|||||||
@@ -514,4 +514,12 @@ public class StreamerUtil {
|
|||||||
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
|
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
|
||||||
return schemaUtil.getTableAvroSchema(includeMetadataFields);
|
return schemaUtil.getTableAvroSchema(includeMetadataFields);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static boolean fileExists(FileSystem fs, Path path) {
|
||||||
|
try {
|
||||||
|
return fs.exists(path);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new HoodieException("Exception while checking file " + path + " existence", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -40,6 +40,7 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions;
|
|||||||
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
|
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
|
||||||
import org.apache.flink.table.catalog.ObjectPath;
|
import org.apache.flink.table.catalog.ObjectPath;
|
||||||
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
|
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
|
||||||
|
import org.apache.flink.table.data.RowData;
|
||||||
import org.apache.flink.test.util.AbstractTestBase;
|
import org.apache.flink.test.util.AbstractTestBase;
|
||||||
import org.apache.flink.types.Row;
|
import org.apache.flink.types.Row;
|
||||||
import org.apache.flink.util.CollectionUtil;
|
import org.apache.flink.util.CollectionUtil;
|
||||||
@@ -1120,6 +1121,39 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
|
|||||||
assertRowsEquals(result, TestData.dataSetInsert(5, 6));
|
assertRowsEquals(result, TestData.dataSetInsert(5, 6));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@EnumSource(value = HoodieTableType.class)
|
||||||
|
void testIncrementalReadArchivedCommits(HoodieTableType tableType) throws Exception {
|
||||||
|
TableEnvironment tableEnv = batchTableEnv;
|
||||||
|
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
|
||||||
|
conf.setString(FlinkOptions.TABLE_NAME, "t1");
|
||||||
|
conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
|
||||||
|
conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, 3);
|
||||||
|
conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, 4);
|
||||||
|
conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, 2);
|
||||||
|
conf.setString("hoodie.commits.archival.batch", "1");
|
||||||
|
|
||||||
|
// write 10 batches of data set
|
||||||
|
for (int i = 0; i < 20; i += 2) {
|
||||||
|
List<RowData> dataset = TestData.dataSetInsert(i + 1, i + 2);
|
||||||
|
TestData.writeData(dataset, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
String secondArchived = TestUtils.getNthArchivedInstant(tempFile.getAbsolutePath(), 1);
|
||||||
|
|
||||||
|
String hoodieTableDDL = sql("t1")
|
||||||
|
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
|
||||||
|
.option(FlinkOptions.TABLE_TYPE, tableType)
|
||||||
|
.option(FlinkOptions.READ_START_COMMIT, secondArchived)
|
||||||
|
.end();
|
||||||
|
tableEnv.executeSql(hoodieTableDDL);
|
||||||
|
|
||||||
|
List<Row> result = CollectionUtil.iterableToList(
|
||||||
|
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
|
||||||
|
assertRowsEquals(result, TestData.dataSetInsert(3, 4, 5, 6, 7, 8, 9, 10,
|
||||||
|
11, 12, 13, 14, 15, 16, 17, 18, 19, 20));
|
||||||
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@EnumSource(value = HoodieTableType.class)
|
@EnumSource(value = HoodieTableType.class)
|
||||||
void testReadWithWiderSchema(HoodieTableType tableType) throws Exception {
|
void testReadWithWiderSchema(HoodieTableType tableType) throws Exception {
|
||||||
|
|||||||
@@ -18,6 +18,8 @@
|
|||||||
|
|
||||||
package org.apache.hudi.table.format;
|
package org.apache.hudi.table.format;
|
||||||
|
|
||||||
|
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
||||||
|
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
|
||||||
import org.apache.hudi.common.model.HoodieTableType;
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
@@ -448,6 +450,104 @@ public class TestInputFormat {
|
|||||||
List<RowData> actual4 = readData(inputFormat4);
|
List<RowData> actual4 = readData(inputFormat4);
|
||||||
final List<RowData> expected4 = TestData.dataSetInsert(3, 4);
|
final List<RowData> expected4 = TestData.dataSetInsert(3, 4);
|
||||||
TestData.assertRowDataEquals(actual4, expected4);
|
TestData.assertRowDataEquals(actual4, expected4);
|
||||||
|
|
||||||
|
// start and end commit: start commit out of range
|
||||||
|
conf.setString(FlinkOptions.READ_START_COMMIT, "000");
|
||||||
|
conf.setString(FlinkOptions.READ_END_COMMIT, commits.get(1));
|
||||||
|
this.tableSource = getTableSource(conf);
|
||||||
|
InputFormat<RowData, ?> inputFormat5 = this.tableSource.getInputFormat();
|
||||||
|
assertThat(inputFormat4, instanceOf(MergeOnReadInputFormat.class));
|
||||||
|
|
||||||
|
List<RowData> actual5 = readData(inputFormat5);
|
||||||
|
final List<RowData> expected5 = TestData.dataSetInsert(1, 2, 3, 4);
|
||||||
|
TestData.assertRowDataEquals(actual5, expected5);
|
||||||
|
|
||||||
|
// start and end commit: both are out of range
|
||||||
|
conf.setString(FlinkOptions.READ_START_COMMIT, "001");
|
||||||
|
conf.setString(FlinkOptions.READ_END_COMMIT, "002");
|
||||||
|
this.tableSource = getTableSource(conf);
|
||||||
|
InputFormat<RowData, ?> inputFormat6 = this.tableSource.getInputFormat();
|
||||||
|
assertThat(inputFormat6, instanceOf(MergeOnReadInputFormat.class));
|
||||||
|
|
||||||
|
List<RowData> actual6 = readData(inputFormat6);
|
||||||
|
TestData.assertRowDataEquals(actual6, Collections.emptyList());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testReadArchivedCommitsIncrementally() throws Exception {
|
||||||
|
Map<String, String> options = new HashMap<>();
|
||||||
|
options.put(FlinkOptions.QUERY_TYPE.key(), FlinkOptions.QUERY_TYPE_INCREMENTAL);
|
||||||
|
options.put(FlinkOptions.ARCHIVE_MIN_COMMITS.key(), "3");
|
||||||
|
options.put(FlinkOptions.ARCHIVE_MAX_COMMITS.key(), "4");
|
||||||
|
options.put(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "2");
|
||||||
|
options.put("hoodie.commits.archival.batch", "1");
|
||||||
|
beforeEach(HoodieTableType.COPY_ON_WRITE, options);
|
||||||
|
|
||||||
|
// write 10 batches of data set
|
||||||
|
for (int i = 0; i < 20; i += 2) {
|
||||||
|
List<RowData> dataset = TestData.dataSetInsert(i + 1, i + 2);
|
||||||
|
TestData.writeData(dataset, conf);
|
||||||
|
}
|
||||||
|
// cleaning
|
||||||
|
HoodieFlinkWriteClient<?> writeClient = new HoodieFlinkWriteClient<>(
|
||||||
|
HoodieFlinkEngineContext.DEFAULT, StreamerUtil.getHoodieClientConfig(conf));
|
||||||
|
writeClient.clean();
|
||||||
|
|
||||||
|
HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(tempFile.getAbsolutePath(), HadoopConfigurations.getHadoopConf(conf));
|
||||||
|
List<String> commits = metaClient.getCommitsTimeline().filterCompletedInstants().getInstants()
|
||||||
|
.map(HoodieInstant::getTimestamp).collect(Collectors.toList());
|
||||||
|
|
||||||
|
assertThat(commits.size(), is(4));
|
||||||
|
|
||||||
|
List<String> archivedCommits = metaClient.getArchivedTimeline().getCommitsTimeline().filterCompletedInstants()
|
||||||
|
.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
|
||||||
|
|
||||||
|
assertThat(archivedCommits.size(), is(6));
|
||||||
|
|
||||||
|
// start and end commit: both are archived and cleaned
|
||||||
|
conf.setString(FlinkOptions.READ_START_COMMIT, archivedCommits.get(0));
|
||||||
|
conf.setString(FlinkOptions.READ_END_COMMIT, archivedCommits.get(1));
|
||||||
|
this.tableSource = getTableSource(conf);
|
||||||
|
InputFormat<RowData, ?> inputFormat1 = this.tableSource.getInputFormat();
|
||||||
|
assertThat(inputFormat1, instanceOf(MergeOnReadInputFormat.class));
|
||||||
|
|
||||||
|
List<RowData> actual1 = readData(inputFormat1);
|
||||||
|
final List<RowData> expected1 = TestData.dataSetInsert(1, 2, 3, 4);
|
||||||
|
TestData.assertRowDataEquals(actual1, expected1);
|
||||||
|
|
||||||
|
// only the start commit: is archived and cleaned
|
||||||
|
conf.setString(FlinkOptions.READ_START_COMMIT, archivedCommits.get(1));
|
||||||
|
conf.removeConfig(FlinkOptions.READ_END_COMMIT);
|
||||||
|
this.tableSource = getTableSource(conf);
|
||||||
|
InputFormat<RowData, ?> inputFormat2 = this.tableSource.getInputFormat();
|
||||||
|
assertThat(inputFormat2, instanceOf(MergeOnReadInputFormat.class));
|
||||||
|
|
||||||
|
List<RowData> actual2 = readData(inputFormat2);
|
||||||
|
final List<RowData> expected2 = TestData.dataSetInsert(3, 4, 5, 6, 7, 8, 9, 10,
|
||||||
|
11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
|
||||||
|
TestData.assertRowDataEquals(actual2, expected2);
|
||||||
|
|
||||||
|
// only the end commit: is archived and cleaned
|
||||||
|
conf.removeConfig(FlinkOptions.READ_START_COMMIT);
|
||||||
|
conf.setString(FlinkOptions.READ_END_COMMIT, archivedCommits.get(1));
|
||||||
|
this.tableSource = getTableSource(conf);
|
||||||
|
InputFormat<RowData, ?> inputFormat3 = this.tableSource.getInputFormat();
|
||||||
|
assertThat(inputFormat3, instanceOf(MergeOnReadInputFormat.class));
|
||||||
|
|
||||||
|
List<RowData> actual3 = readData(inputFormat3);
|
||||||
|
final List<RowData> expected3 = TestData.dataSetInsert(3, 4);
|
||||||
|
TestData.assertRowDataEquals(actual3, expected3);
|
||||||
|
|
||||||
|
// start and end commit: start is archived and cleaned, end is active
|
||||||
|
conf.setString(FlinkOptions.READ_START_COMMIT, archivedCommits.get(1));
|
||||||
|
conf.setString(FlinkOptions.READ_END_COMMIT, commits.get(0));
|
||||||
|
this.tableSource = getTableSource(conf);
|
||||||
|
InputFormat<RowData, ?> inputFormat4 = this.tableSource.getInputFormat();
|
||||||
|
assertThat(inputFormat4, instanceOf(MergeOnReadInputFormat.class));
|
||||||
|
|
||||||
|
List<RowData> actual4 = readData(inputFormat4);
|
||||||
|
final List<RowData> expected4 = TestData.dataSetInsert(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
|
||||||
|
TestData.assertRowDataEquals(actual4, expected4);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
|
|||||||
@@ -88,6 +88,14 @@ public class TestUtils {
|
|||||||
.orElse(null);
|
.orElse(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
public static String getNthArchivedInstant(String basePath, int n) {
|
||||||
|
final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
|
||||||
|
.setConf(HadoopConfigurations.getHadoopConf(new Configuration())).setBasePath(basePath).build();
|
||||||
|
return metaClient.getArchivedTimeline().getCommitsTimeline().filterCompletedInstants()
|
||||||
|
.nthInstant(n).map(HoodieInstant::getTimestamp).orElse(null);
|
||||||
|
}
|
||||||
|
|
||||||
public static String getSplitPartitionPath(MergeOnReadInputSplit split) {
|
public static String getSplitPartitionPath(MergeOnReadInputSplit split) {
|
||||||
assertTrue(split.getLogPaths().isPresent());
|
assertTrue(split.getLogPaths().isPresent());
|
||||||
final String logPath = split.getLogPaths().get().get(0);
|
final String logPath = split.getLogPaths().get().get(0);
|
||||||
|
|||||||
Reference in New Issue
Block a user