[HUDI-3461] The archived timeline for flink streaming reader should not be reused (#4861)
* Before the patch, the flink streaming reader cached the meta client and thus the archived timeline; when fetching the instant details from the reused timeline, an exception was thrown. * Add a method in HoodieTableMetaClient that returns a fresh new archived timeline each time.
This commit is contained in:
@@ -156,7 +156,7 @@ public class HoodieTableMetaClient implements Serializable {
|
|||||||
*/
|
*/
|
||||||
private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
|
private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
|
||||||
in.defaultReadObject();
|
in.defaultReadObject();
|
||||||
fs = null; // will be lazily inited
|
fs = null; // will be lazily initialized
|
||||||
}
|
}
|
||||||
|
|
||||||
private void writeObject(java.io.ObjectOutputStream out) throws IOException {
|
private void writeObject(java.io.ObjectOutputStream out) throws IOException {
|
||||||
@@ -330,7 +330,7 @@ public class HoodieTableMetaClient implements Serializable {
|
|||||||
* Get the archived commits as a timeline. This is costly operation, as all data from the archived files are read.
|
* Get the archived commits as a timeline. This is costly operation, as all data from the archived files are read.
|
||||||
* This should not be used, unless for historical debugging purposes.
|
* This should not be used, unless for historical debugging purposes.
|
||||||
*
|
*
|
||||||
* @return Active commit timeline
|
* @return Archived commit timeline
|
||||||
*/
|
*/
|
||||||
public synchronized HoodieArchivedTimeline getArchivedTimeline() {
|
public synchronized HoodieArchivedTimeline getArchivedTimeline() {
|
||||||
if (archivedTimeline == null) {
|
if (archivedTimeline == null) {
|
||||||
@@ -339,6 +339,20 @@ public class HoodieTableMetaClient implements Serializable {
|
|||||||
return archivedTimeline;
|
return archivedTimeline;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns fresh new archived commits as a timeline from startTs (inclusive).
|
||||||
|
*
|
||||||
|
* <p>This is a costly operation if a really early startTs is specified.
|
||||||
|
* Use this with caution, and only when the time range is short.
|
||||||
|
*
|
||||||
|
* <p>This method is not thread safe.
|
||||||
|
*
|
||||||
|
* @return Archived commit timeline
|
||||||
|
*/
|
||||||
|
public HoodieArchivedTimeline getArchivedTimeline(String startTs) {
|
||||||
|
return new HoodieArchivedTimeline(this, startTs);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Validate table properties.
|
* Validate table properties.
|
||||||
* @param properties Properties from writeConfig.
|
* @param properties Properties from writeConfig.
|
||||||
|
|||||||
@@ -79,13 +79,13 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
|
|||||||
private static final String ACTION_TYPE_KEY = "actionType";
|
private static final String ACTION_TYPE_KEY = "actionType";
|
||||||
private static final String ACTION_STATE = "actionState";
|
private static final String ACTION_STATE = "actionState";
|
||||||
private HoodieTableMetaClient metaClient;
|
private HoodieTableMetaClient metaClient;
|
||||||
private Map<String, byte[]> readCommits = new HashMap<>();
|
private final Map<String, byte[]> readCommits = new HashMap<>();
|
||||||
|
|
||||||
private static final Logger LOG = LogManager.getLogger(HoodieArchivedTimeline.class);
|
private static final Logger LOG = LogManager.getLogger(HoodieArchivedTimeline.class);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Loads instants between (startTs, endTs].
|
* Loads all the archived instants.
|
||||||
* Note that there is no lazy loading, so this may not work if really long time range (endTs-startTs) is specified.
|
* Note that there is no lazy loading, so this may not work if the archived timeline range is really long.
|
||||||
* TBD: Should we enforce maximum time range?
|
* TBD: Should we enforce maximum time range?
|
||||||
*/
|
*/
|
||||||
public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
|
public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
|
||||||
@@ -96,6 +96,19 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
|
|||||||
this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails;
|
this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Loads completed instants from startTs(inclusive).
|
||||||
|
* Note that there is no lazy loading, so this may not work if really early startTs is specified.
|
||||||
|
*/
|
||||||
|
public HoodieArchivedTimeline(HoodieTableMetaClient metaClient, String startTs) {
|
||||||
|
this.metaClient = metaClient;
|
||||||
|
setInstants(loadInstants(new StartTsFilter(startTs), true,
|
||||||
|
record -> HoodieInstant.State.COMPLETED.toString().equals(record.get(ACTION_STATE).toString())));
|
||||||
|
// multiple casts will make this lambda serializable -
|
||||||
|
// http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16
|
||||||
|
this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* For serialization and de-serialization only.
|
* For serialization and de-serialization only.
|
||||||
*
|
*
|
||||||
@@ -300,6 +313,19 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class StartTsFilter extends TimeRangeFilter {
|
||||||
|
private final String startTs;
|
||||||
|
|
||||||
|
public StartTsFilter(String startTs) {
|
||||||
|
super(startTs, null); // endTs is never used
|
||||||
|
this.startTs = startTs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isInRange(HoodieInstant instant) {
|
||||||
|
return HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN_OR_EQUALS, startTs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sort files by reverse order of version suffix in file name.
|
* Sort files by reverse order of version suffix in file name.
|
||||||
*/
|
*/
|
||||||
@@ -330,7 +356,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
|
|||||||
// filter in-memory instants
|
// filter in-memory instants
|
||||||
Set<String> validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
|
Set<String> validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
|
||||||
return new HoodieDefaultTimeline(getInstants().filter(i ->
|
return new HoodieDefaultTimeline(getInstants().filter(i ->
|
||||||
readCommits.keySet().contains(i.getTimestamp()))
|
readCommits.containsKey(i.getTimestamp()))
|
||||||
.filter(s -> validActions.contains(s.getAction())), details);
|
.filter(s -> validActions.contains(s.getAction())), details);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -250,22 +250,12 @@ public class IncrementalInputSplits implements Serializable {
|
|||||||
InstantRange instantRange,
|
InstantRange instantRange,
|
||||||
HoodieTimeline commitTimeline,
|
HoodieTimeline commitTimeline,
|
||||||
String tableName) {
|
String tableName) {
|
||||||
if (instantRange == null || commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) {
|
if (commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) {
|
||||||
// read the archived metadata if:
|
// read the archived metadata if the start instant is archived.
|
||||||
// 1. the start commit is 'earliest';
|
HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(instantRange.getStartInstant());
|
||||||
// 2. the start instant is archived.
|
|
||||||
HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
|
|
||||||
HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants();
|
HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants();
|
||||||
if (!archivedCompleteTimeline.empty()) {
|
if (!archivedCompleteTimeline.empty()) {
|
||||||
final String endTs = archivedCompleteTimeline.lastInstant().get().getTimestamp();
|
|
||||||
Stream<HoodieInstant> instantStream = archivedCompleteTimeline.getInstants();
|
Stream<HoodieInstant> instantStream = archivedCompleteTimeline.getInstants();
|
||||||
if (instantRange != null) {
|
|
||||||
archivedTimeline.loadInstantDetailsInMemory(instantRange.getStartInstant(), endTs);
|
|
||||||
instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, instantRange.getStartInstant()));
|
|
||||||
} else {
|
|
||||||
final String startTs = archivedCompleteTimeline.firstInstant().get().getTimestamp();
|
|
||||||
archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
|
|
||||||
}
|
|
||||||
return maySkipCompaction(instantStream)
|
return maySkipCompaction(instantStream)
|
||||||
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, archivedTimeline)).collect(Collectors.toList());
|
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, archivedTimeline)).collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user