[HUDI-2379] Include the pending compaction file groups for flink (#3567)
streaming reader
This commit is contained in:
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieLogFile;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.log.InstantRange;
|
||||
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
|
||||
@@ -52,11 +53,13 @@ import javax.annotation.Nullable;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
|
||||
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
|
||||
@@ -206,7 +209,7 @@ public class StreamReadMonitoringFunction
|
||||
return;
|
||||
}
|
||||
metaClient.reloadActiveTimeline();
|
||||
HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
|
||||
HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
|
||||
if (commitTimeline.empty()) {
|
||||
LOG.warn("No splits found for the table under path " + path);
|
||||
return;
|
||||
@@ -228,8 +231,7 @@ public class StreamReadMonitoringFunction
|
||||
: InstantRange.getInstance(specifiedStart, instantToIssue.getTimestamp(), InstantRange.RangeType.CLOSE_CLOSE);
|
||||
} else {
|
||||
// first time consume and no start commit, consumes the latest incremental data set.
|
||||
HoodieInstant latestCommitInstant = metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
|
||||
instantRange = InstantRange.getInstance(latestCommitInstant.getTimestamp(), instantToIssue.getTimestamp(),
|
||||
instantRange = InstantRange.getInstance(instantToIssue.getTimestamp(), instantToIssue.getTimestamp(),
|
||||
InstantRange.RangeType.CLOSE_CLOSE);
|
||||
}
|
||||
} else {
|
||||
@@ -243,8 +245,13 @@ public class StreamReadMonitoringFunction
|
||||
// 4. use the file paths from #step 3 as the back-up of the filesystem view
|
||||
|
||||
String tableName = conf.getString(FlinkOptions.TABLE_NAME);
|
||||
List<HoodieCommitMetadata> metadataList = instants.stream()
|
||||
List<HoodieCommitMetadata> activeMetadataList = instants.stream()
|
||||
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
|
||||
List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(instantRange, commitTimeline, tableName);
|
||||
List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
|
||||
? mergeList(activeMetadataList, archivedMetadataList)
|
||||
: activeMetadataList;
|
||||
|
||||
Set<String> writePartitions = getWritePartitionPaths(metadataList);
|
||||
// apply partition push down
|
||||
if (this.requiredPartitionPaths.size() > 0) {
|
||||
@@ -325,6 +332,38 @@ public class StreamReadMonitoringFunction
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the archived metadata in case the reader consumes untimely or it wants
|
||||
* to read from the earliest.
|
||||
*
|
||||
* <p>Note: should improve it with metadata table when the metadata table is stable enough.
|
||||
*
|
||||
* @param instantRange The instant range to filter the timeline instants
|
||||
* @param commitTimeline The commit timeline
|
||||
* @param tableName The table name
|
||||
* @return the list of archived metadata, or empty if there is no need to read the archived timeline
|
||||
*/
|
||||
private List<HoodieCommitMetadata> getArchivedMetadata(
|
||||
InstantRange instantRange,
|
||||
HoodieTimeline commitTimeline,
|
||||
String tableName) {
|
||||
if (instantRange == null || commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) {
|
||||
// read the archived metadata if:
|
||||
// 1. the start commit is 'earliest';
|
||||
// 2. the start instant is archived.
|
||||
HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
|
||||
if (!metaClient.getArchivedTimeline().empty()) {
|
||||
Stream<HoodieInstant> instantStream = archivedTimeline.getCommitsTimeline().filterCompletedInstants().getInstants();
|
||||
if (instantRange != null) {
|
||||
instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, instantRange.getStartInstant()));
|
||||
}
|
||||
return instantStream
|
||||
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, archivedTimeline)).collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the instants with a given issuedInstant to start from.
|
||||
*
|
||||
@@ -335,19 +374,19 @@ public class StreamReadMonitoringFunction
|
||||
private List<HoodieInstant> filterInstantsWithStart(
|
||||
HoodieTimeline commitTimeline,
|
||||
final String issuedInstant) {
|
||||
HoodieTimeline completedTimeline = commitTimeline.filterCompletedInstants();
|
||||
if (issuedInstant != null) {
|
||||
return commitTimeline.getInstants()
|
||||
return completedTimeline.getInstants()
|
||||
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant))
|
||||
.collect(Collectors.toList());
|
||||
} else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()
|
||||
&& !this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)) {
|
||||
String definedStartCommit = this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT);
|
||||
return commitTimeline.getInstants()
|
||||
return completedTimeline.getInstants()
|
||||
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, definedStartCommit))
|
||||
.collect(Collectors.toList());
|
||||
} else {
|
||||
return commitTimeline.getInstants()
|
||||
.collect(Collectors.toList());
|
||||
return completedTimeline.getInstants().collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -363,4 +402,10 @@ public class StreamReadMonitoringFunction
|
||||
.flatMap(Collection::stream)
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
private static <T> List<T> mergeList(List<T> list1, List<T> list2) {
|
||||
List<T> merged = new ArrayList<>(list1);
|
||||
merged.addAll(list2);
|
||||
return merged;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -458,7 +458,7 @@ public class HoodieTableSource implements
|
||||
}
|
||||
|
||||
private Schema inferSchemaFromDdl() {
|
||||
Schema schema = AvroSchemaConverter.convertToSchema(this.schema.toSourceRowDataType().getLogicalType());
|
||||
Schema schema = AvroSchemaConverter.convertToSchema(this.schema.toPhysicalRowDataType().getLogicalType());
|
||||
return HoodieAvroUtils.addMetadataFields(schema, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
|
||||
}
|
||||
|
||||
|
||||
@@ -319,7 +319,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
|
||||
|
||||
String hoodieTableDDL = sql("t1")
|
||||
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
|
||||
.option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
|
||||
.option(FlinkOptions.TABLE_TYPE, tableType.name())
|
||||
.option(FlinkOptions.READ_AS_STREAMING, "true")
|
||||
.option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, "2")
|
||||
.option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
|
||||
@@ -334,6 +334,40 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
|
||||
assertRowsEquals(result, expected, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testStreamReadMorTableWithCompactionPlan() throws Exception {
|
||||
String createSource = TestConfigurations.getFileSourceDDL("source");
|
||||
streamTableEnv.executeSql(createSource);
|
||||
|
||||
String hoodieTableDDL = sql("t1")
|
||||
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
|
||||
.option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
|
||||
.option(FlinkOptions.READ_AS_STREAMING, "true")
|
||||
.option(FlinkOptions.READ_STREAMING_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST)
|
||||
.option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, "2")
|
||||
// close the async compaction
|
||||
.option(FlinkOptions.COMPACTION_ASYNC_ENABLED, false)
|
||||
// generate compaction plan for each commit
|
||||
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, "1")
|
||||
.withPartition(false)
|
||||
.end();
|
||||
streamTableEnv.executeSql(hoodieTableDDL);
|
||||
|
||||
streamTableEnv.executeSql("insert into t1 select * from source");
|
||||
|
||||
List<Row> result = execSelectSql(streamTableEnv, "select * from t1", 10);
|
||||
final String expected = "["
|
||||
+ "+I[id1, Danny, 23, 1970-01-01T00:00:01, par1], "
|
||||
+ "+I[id2, Stephen, 33, 1970-01-01T00:00:02, par1], "
|
||||
+ "+I[id3, Julian, 53, 1970-01-01T00:00:03, par2], "
|
||||
+ "+I[id4, Fabian, 31, 1970-01-01T00:00:04, par2], "
|
||||
+ "+I[id5, Sophia, 18, 1970-01-01T00:00:05, par3], "
|
||||
+ "+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], "
|
||||
+ "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], "
|
||||
+ "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]";
|
||||
assertRowsEquals(result, expected);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("executionModeAndPartitioningParams")
|
||||
void testWriteAndRead(ExecMode execMode, boolean hiveStylePartitioning) {
|
||||
|
||||
@@ -400,7 +400,7 @@ public class TestData {
|
||||
String rowsString = rows.stream()
|
||||
.sorted(Comparator.comparing(o -> toIdSafely(o.getField(0))))
|
||||
.collect(Collectors.toList()).toString();
|
||||
assertThat(rowDataToString(expected), is(rowsString));
|
||||
assertThat(rowsString, is(rowDataToString(expected)));
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user