[HUDI-1867] Streaming read for Flink COW table (#2895)
Supports streaming read for Copy On Write table.
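For context, a minimal sketch of how the feature is driven from the Flink SQL layer once the ValidationUtils guard below is removed. This is illustrative only, not part of the commit: the table name, schema, and path are placeholders, and the option keys are assumed to be the string keys behind FlinkOptions.READ_AS_STREAMING, READ_STREAMING_CHECK_INTERVAL, and TABLE_TYPE as used in the tests at the end of this diff.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CowStreamingReadExample {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // Hypothetical COW table: name, schema, and path are placeholders.
    // 'read.streaming.enabled' maps to FlinkOptions.READ_AS_STREAMING,
    // 'read.streaming.check-interval' to FlinkOptions.READ_STREAMING_CHECK_INTERVAL (seconds).
    tEnv.executeSql(
        "CREATE TABLE t1 ("
            + " uuid VARCHAR(20),"
            + " name VARCHAR(10),"
            + " age INT,"
            + " ts TIMESTAMP(3)"
            + ") WITH ("
            + " 'connector' = 'hudi',"
            + " 'path' = '/tmp/hudi/t1',"
            + " 'table.type' = 'COPY_ON_WRITE',"
            + " 'read.streaming.enabled' = 'true',"
            + " 'read.streaming.check-interval' = '4'"
            + ")");

    // Continuous query: new commits on t1 keep producing rows instead of
    // the job terminating after the first snapshot.
    tEnv.executeSql("SELECT * FROM t1").print();
  }
}

With this change the COPY_ON_WRITE branch of the source returns a MergeOnReadInputFormat whose splits carry a base file and no log paths, instead of rejecting the streaming flag outright.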
StreamReadMonitoringFunction.java (org.apache.hudi.source):

@@ -19,6 +19,7 @@
 package org.apache.hudi.source;
 
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -111,6 +112,8 @@ public class StreamReadMonitoringFunction
 
   private final long maxCompactionMemoryInBytes;
 
+  private final boolean isDelta;
+
   public StreamReadMonitoringFunction(
       Configuration conf,
       Path path,
@@ -121,6 +124,7 @@ public class StreamReadMonitoringFunction
     this.metaClient = metaClient;
     this.interval = conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL);
     this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+    this.isDelta = conf.getString(FlinkOptions.TABLE_TYPE).equalsIgnoreCase(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
   }
 
   @Override
@@ -185,7 +189,10 @@ public class StreamReadMonitoringFunction
   @VisibleForTesting
   public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> context) {
     metaClient.reloadActiveTimeline();
-    HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
+    HoodieTimeline commitTimeline = isDelta
+        // if is delta, exclude the parquet files from compaction
+        ? metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
+        : metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     if (commitTimeline.empty()) {
       LOG.warn("No splits found for the table under path " + path);
       return;
@@ -238,8 +245,9 @@ public class StreamReadMonitoringFunction
             .sorted(HoodieLogFile.getLogFileComparator())
             .map(logFile -> logFile.getPath().toString())
             .collect(Collectors.toList()));
+        String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
         return new MergeOnReadInputSplit(cnt.getAndAdd(1),
-            null, logPaths, commitToIssue,
+            basePath, logPaths, commitToIssue,
             metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange);
       }).collect(Collectors.toList()))
           .flatMap(Collection::stream)
HoodieTableSource.java:

@@ -26,7 +26,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.HoodieROTablePathFilter;
@@ -156,11 +155,6 @@ public class HoodieTableSource implements
     this.hadoopConf = StreamerUtil.getHadoopConf();
     this.metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
     this.maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new JobConf(this.hadoopConf));
-    if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
-      ValidationUtils.checkArgument(
-          conf.getString(FlinkOptions.TABLE_TYPE).equalsIgnoreCase(FlinkOptions.TABLE_TYPE_MERGE_ON_READ),
-          "Streaming read is only supported for table type: " + FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
-    }
   }
 
   @Override
@@ -377,6 +371,25 @@ public class HoodieTableSource implements
             .emitDelete(isStreaming)
             .build();
       case COPY_ON_WRITE:
+        if (isStreaming) {
+          final MergeOnReadTableState hoodieTableState2 = new MergeOnReadTableState(
+              rowType,
+              requiredRowType,
+              tableAvroSchema.toString(),
+              AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
+              Collections.emptyList(),
+              conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
+          return MergeOnReadInputFormat.builder()
+              .config(this.conf)
+              .paths(FilePathUtils.toFlinkPaths(paths))
+              .tableState(hoodieTableState2)
+              // use the explicit fields data type because the AvroSchemaConverter
+              // is not very stable.
+              .fieldTypes(rowDataType.getChildren())
+              .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
+              .limit(this.limit)
+              .build();
+        }
         FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
             FilePathUtils.toFlinkPaths(paths),
             this.schema.getFieldNames(),
MergeOnReadInputFormat.java (org.apache.hudi.table.format.mor):

@@ -20,6 +20,7 @@ package org.apache.hudi.table.format.mor;
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
@@ -64,6 +65,7 @@ import java.util.stream.IntStream;
 
 import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE;
 import static org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS;
 import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
 import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
 
@@ -162,9 +164,17 @@ public class MergeOnReadInputFormat
   public void open(MergeOnReadInputSplit split) throws IOException {
     this.currentReadCount = 0L;
     this.hadoopConf = StreamerUtil.getHadoopConf();
-    if (!split.getLogPaths().isPresent()) {
-      // base file only
-      this.iterator = new BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get()));
+    if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() > 0)) {
+      if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
+        // base file only with commit time filtering
+        this.iterator = new BaseFileOnlyFilteringIterator(
+            split.getInstantRange(),
+            this.tableState.getRequiredRowType(),
+            getReader(split.getBasePath().get(), getRequiredPosWithCommitTime(this.requiredPos)));
+      } else {
+        // base file only
+        this.iterator = new BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get()));
+      }
     } else if (!split.getBasePath().isPresent()) {
       // log files only
       this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
@@ -390,6 +400,57 @@ public class MergeOnReadInputFormat
     }
   }
 
+  /**
+   * Similar with {@link BaseFileOnlyIterator} but with instant time filtering.
+   */
+  static class BaseFileOnlyFilteringIterator implements RecordIterator {
+    // base file reader
+    private final ParquetColumnarRowSplitReader reader;
+    private final InstantRange instantRange;
+    private final RowDataProjection projection;
+
+    private RowData currentRecord;
+
+    BaseFileOnlyFilteringIterator(
+        Option<InstantRange> instantRange,
+        RowType requiredRowType,
+        ParquetColumnarRowSplitReader reader) {
+      this.reader = reader;
+      this.instantRange = instantRange.orElse(null);
+      int[] positions = IntStream.range(1, 1 + requiredRowType.getFieldCount()).toArray();
+      projection = RowDataProjection.instance(requiredRowType, positions);
+    }
+
+    @Override
+    public boolean reachedEnd() throws IOException {
+      while (!this.reader.reachedEnd()) {
+        currentRecord = this.reader.nextRecord();
+        if (instantRange != null) {
+          boolean isInRange = instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString());
+          if (isInRange) {
+            return false;
+          }
+        } else {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public RowData nextRecord() {
+      // can promote: no need to project with null instant range
+      return projection.project(currentRecord);
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (this.reader != null) {
+        this.reader.close();
+      }
+    }
+  }
+
   static class LogFileOnlyIterator implements RecordIterator {
     // iterator for log files
     private final Iterator<RowData> iterator;
@@ -625,6 +686,17 @@ public class MergeOnReadInputFormat
     }
   }
 
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  private static int[] getRequiredPosWithCommitTime(int[] requiredPos) {
+    int[] requiredPos2 = new int[requiredPos.length + 1];
+    requiredPos2[0] = HOODIE_COMMIT_TIME_COL_POS;
+    System.arraycopy(requiredPos, 0, requiredPos2, 1, requiredPos.length);
+    return requiredPos2;
+  }
+
   @VisibleForTesting
   public void isEmitDelete(boolean emitDelete) {
     this.emitDelete = emitDelete;
HoodieDataSourceITCase.java:

@@ -82,8 +82,9 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
   @TempDir
   File tempFile;
 
-  @Test
-  void testStreamWriteAndRead() throws Exception {
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testStreamWriteAndRead(HoodieTableType tableType) throws Exception {
     // create filesystem table named source
     String createSource = TestConfigurations.getFileSourceDDL("source");
     streamTableEnv.executeSql(createSource);
@@ -91,7 +92,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
     options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
-    options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name());
     String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
     streamTableEnv.executeSql(hoodieTableDDL);
     String insertInto = "insert into t1 select * from source";
@@ -106,8 +107,9 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT);
   }
 
-  @Test
-  void testStreamReadAppendData() throws Exception {
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testStreamReadAppendData(HoodieTableType tableType) throws Exception {
     // create filesystem table named source
     String createSource = TestConfigurations.getFileSourceDDL("source");
     String createSource2 = TestConfigurations.getFileSourceDDL("source2", "test_source_2.data");
@@ -117,7 +119,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
     options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
-    options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name());
     String createHoodieTable = TestConfigurations.getCreateHoodieTableDDL("t1", options);
     streamTableEnv.executeSql(createHoodieTable);
     String insertInto = "insert into t1 select * from source";