
[HUDI-1663] Streaming read for Flink MOR table (#2640)

Supports two read modes:
* Read the full data set starting from the latest commit instant, then the
  subsequent incremental data sets
* Read the data set starting from a specified commit instant
(See the configuration sketch below.)
Danny Chan
2021-03-10 22:44:06 +08:00
committed by GitHub
parent c4a66324cd
commit 2fdae6835c
24 changed files with 1989 additions and 128 deletions
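The new read options introduced in FlinkOptions below could be set roughly as follows. This is a hedged sketch rather than part of the patch: the option keys come from this diff, while the class name, values, and start-commit timestamp are illustrative.

import org.apache.flink.configuration.Configuration;

import org.apache.hudi.operator.FlinkOptions;

public class StreamingReadOptionsSketch {

  // Builds a Flink Configuration that enables the streaming read mode added by this commit.
  public static Configuration streamingReadConf() {
    Configuration conf = new Configuration();
    conf.setBoolean(FlinkOptions.READ_AS_STREAMING, true);                      // read.streaming.enabled
    conf.setInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 60);            // read.streaming.check-interval, in seconds
    conf.setString(FlinkOptions.READ_STREAMING_START_COMMIT, "20210310000000"); // optional, format 'yyyyMMddHHmmss'
    conf.setInteger(FlinkOptions.READ_TASKS, 4);                                // read.tasks: parallelism of the split readers
    return conf;
  }
}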


@@ -18,6 +18,7 @@
package org.apache.hudi.operator;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -68,6 +69,12 @@ public class FlinkOptions {
// ------------------------------------------------------------------------
// Read Options
// ------------------------------------------------------------------------
public static final ConfigOption<Integer> READ_TASKS = ConfigOptions
.key("read.tasks")
.intType()
.defaultValue(4)
.withDescription("Parallelism of tasks that do actual read, default is 4");
public static final ConfigOption<String> READ_SCHEMA_FILE_PATH = ConfigOptions
.key("read.schema.file.path")
.stringType()
@@ -112,6 +119,25 @@ public class FlinkOptions {
+ " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x"
+ " use UTC timezone, by default true");
public static final ConfigOption<Boolean> READ_AS_STREAMING = ConfigOptions
.key("read.streaming.enabled")
.booleanType()
.defaultValue(false) // default read as batch
.withDescription("Whether to read as streaming source, default false");
public static final ConfigOption<Integer> READ_STREAMING_CHECK_INTERVAL = ConfigOptions
.key("read.streaming.check-interval")
.intType()
.defaultValue(60) // default 1 minute
.withDescription("Check interval for streaming read in SECONDS, default 1 minute");
public static final ConfigOption<String> READ_STREAMING_START_COMMIT = ConfigOptions
.key("read.streaming.start-commit")
.stringType()
.noDefaultValue()
.withDescription("Start commit instant for streaming read, the commit time format should be 'yyyyMMddHHmmss', "
+ "by default reading from the latest instant");
// ------------------------------------------------------------------------
// Write Options
// ------------------------------------------------------------------------
@@ -121,8 +147,8 @@ public class FlinkOptions {
.noDefaultValue()
.withDescription("Table name to register to Hive metastore");
public static final String TABLE_TYPE_COPY_ON_WRITE = "COPY_ON_WRITE";
public static final String TABLE_TYPE_MERGE_ON_READ = "MERGE_ON_READ";
public static final String TABLE_TYPE_COPY_ON_WRITE = HoodieTableType.COPY_ON_WRITE.name();
public static final String TABLE_TYPE_MERGE_ON_READ = HoodieTableType.MERGE_ON_READ.name();
public static final ConfigOption<String> TABLE_TYPE = ConfigOptions
.key("write.table.type")
.stringType()
@@ -203,8 +229,8 @@ public class FlinkOptions {
.defaultValue(SimpleAvroKeyGenerator.class.getName())
.withDescription("Key generator class, that implements will extract the key out of incoming record");
public static final ConfigOption<Integer> WRITE_TASK_PARALLELISM = ConfigOptions
.key("write.task.parallelism")
public static final ConfigOption<Integer> WRITE_TASKS = ConfigOptions
.key("write.tasks")
.intType()
.defaultValue(4)
.withDescription("Parallelism of tasks that do actual write, default is 4");
@@ -290,7 +316,7 @@ public class FlinkOptions {
conf.setString(FlinkOptions.RECORD_KEY_FIELD, config.recordKeyField);
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, config.partitionPathField);
conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass);
conf.setInteger(FlinkOptions.WRITE_TASK_PARALLELISM, config.writeTaskNum);
conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum);
return conf;
}


@@ -0,0 +1,372 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.operator;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.source.format.mor.InstantRange;
import org.apache.hudi.source.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
/**
* This is the single (non-parallel) monitoring task which is responsible for:
*
* <ol>
* <li>Monitoring a user-provided hoodie table path.</li>
* <li>Deciding which files (or splits) should be further read and processed.</li>
* <li>Creating the {@link MergeOnReadInputSplit splits} corresponding to those files.</li>
* <li>Assigning them to downstream tasks for further processing.</li>
* </ol>
*
* <p>The splits to be read are forwarded to the downstream {@link StreamReadOperator},
* which can have a parallelism greater than one.
*
* <p><b>IMPORTANT NOTE: </b> Splits are forwarded downstream for reading in ascending instant commit time order;
* within each downstream task, the splits are read in the order they are received. We do not guarantee the split
* consuming sequence among the downstream tasks.
*/
public class StreamReadMonitoringFunction
extends RichSourceFunction<MergeOnReadInputSplit> implements CheckpointedFunction {
private static final Logger LOG = LoggerFactory.getLogger(StreamReadMonitoringFunction.class);
private static final long serialVersionUID = 1L;
/**
* The path to monitor.
*/
private final Path path;
/**
* The interval between consecutive path scans.
*/
private final long interval;
private transient Object checkpointLock;
private volatile boolean isRunning = true;
private String issuedInstant;
private transient ListState<String> instantState;
private final Configuration conf;
private transient org.apache.hadoop.conf.Configuration hadoopConf;
private final HoodieTableMetaClient metaClient;
private final long maxCompactionMemoryInBytes;
public StreamReadMonitoringFunction(
Configuration conf,
Path path,
HoodieTableMetaClient metaClient,
long maxCompactionMemoryInBytes) {
this.conf = conf;
this.path = path;
this.metaClient = metaClient;
this.interval = conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL);
this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ValidationUtils.checkState(this.instantState == null,
"The " + getClass().getSimpleName() + " has already been initialized.");
this.instantState = context.getOperatorStateStore().getListState(
new ListStateDescriptor<>(
"file-monitoring-state",
StringSerializer.INSTANCE
)
);
if (context.isRestored()) {
LOG.info("Restoring state for the class {} with table {} and base path {}.",
getClass().getSimpleName(), conf.getString(FlinkOptions.TABLE_NAME), path);
List<String> retrievedStates = new ArrayList<>();
for (String entry : this.instantState.get()) {
retrievedStates.add(entry);
}
ValidationUtils.checkArgument(retrievedStates.size() <= 1,
getClass().getSimpleName() + " retrieved invalid state.");
if (retrievedStates.size() == 1 && issuedInstant != null) {
// this is the case where we have both legacy and new state.
// the two should be mutually exclusive for the operator, thus we throw the exception.
throw new IllegalArgumentException(
"The " + getClass().getSimpleName() + " has already restored from a previous Flink version.");
} else if (retrievedStates.size() == 1) {
this.issuedInstant = retrievedStates.get(0);
if (LOG.isDebugEnabled()) {
LOG.debug("{} retrieved a issued instant of time {} for table {} with path {}.",
getClass().getSimpleName(), issuedInstant, conf.get(FlinkOptions.TABLE_NAME), path);
}
}
}
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.hadoopConf = StreamerUtil.getHadoopConf();
}
@Override
public void run(SourceFunction.SourceContext<MergeOnReadInputSplit> context) throws Exception {
checkpointLock = context.getCheckpointLock();
while (isRunning) {
synchronized (checkpointLock) {
monitorDirAndForwardSplits(context);
}
TimeUnit.SECONDS.sleep(interval);
}
}
@VisibleForTesting
public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> context) {
metaClient.reloadActiveTimeline();
HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
if (commitTimeline.empty()) {
LOG.warn("No splits found for the table under path " + path);
return;
}
List<HoodieInstant> instants = getUncompactedInstants(commitTimeline, this.issuedInstant);
// get the latest instant that satisfies condition
final HoodieInstant instantToIssue = instants.size() == 0 ? null : instants.get(instants.size() - 1);
final InstantRange instantRange;
if (instantToIssue != null) {
if (this.issuedInstant != null) {
// had already consumed an instant
instantRange = InstantRange.getInstance(this.issuedInstant, instantToIssue.getTimestamp(),
InstantRange.RangeType.OPEN_CLOSE);
} else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()) {
// first time consume and has a start commit
final String specifiedStart = this.conf.getString(FlinkOptions.READ_STREAMING_START_COMMIT);
instantRange = InstantRange.getInstance(specifiedStart, instantToIssue.getTimestamp(),
InstantRange.RangeType.CLOSE_CLOSE);
} else {
// first time consume and no start commit,
// would consume all the snapshot data PLUS incremental data set
instantRange = null;
}
} else {
LOG.info("No new instant found for the table under path " + path + ", skip reading");
return;
}
// generate the input splits:
// 1. first fetch all the commit metadata for the incremental instants;
// 2. filter the relative partition paths;
// 3. filter the full file paths;
// 4. use the file paths from step 3 as the backing files of the filesystem view
List<HoodieCommitMetadata> metadataList = instants.stream()
.map(instant -> getCommitMetadata(instant, commitTimeline)).collect(Collectors.toList());
Set<String> writePartitions = getWritePartitionPaths(metadataList);
FileStatus[] fileStatuses = getWritePathsOfInstants(metadataList);
if (fileStatuses.length == 0) {
throw new HoodieException("No files found for reading in user provided path.");
}
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
final String commitToIssue = instantToIssue.getTimestamp();
final AtomicInteger cnt = new AtomicInteger(0);
final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
List<MergeOnReadInputSplit> inputSplits = writePartitions.stream()
.map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, commitToIssue)
.map(fileSlice -> {
Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
.map(logFile -> logFile.getPath().toString())
.collect(Collectors.toList()));
return new MergeOnReadInputSplit(cnt.getAndAdd(1),
null, logPaths, commitToIssue,
metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange);
}).collect(Collectors.toList()))
.flatMap(Collection::stream)
.collect(Collectors.toList());
for (MergeOnReadInputSplit split : inputSplits) {
context.collect(split);
}
// update the issued instant time
this.issuedInstant = commitToIssue;
}
@Override
public void close() throws Exception {
super.close();
if (checkpointLock != null) {
synchronized (checkpointLock) {
issuedInstant = null;
isRunning = false;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Closed File Monitoring Source for path: " + path + ".");
}
}
@Override
public void cancel() {
if (checkpointLock != null) {
// this is to cover the case where cancel() is called before the run()
synchronized (checkpointLock) {
issuedInstant = null;
isRunning = false;
}
} else {
issuedInstant = null;
isRunning = false;
}
}
// -------------------------------------------------------------------------
// Checkpointing
// -------------------------------------------------------------------------
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
this.instantState.clear();
if (this.issuedInstant != null) {
this.instantState.add(this.issuedInstant);
}
}
/**
* Returns the uncompacted instants with a given issuedInstant to start from.
*
* @param commitTimeline The completed commits timeline
* @param issuedInstant The last issued instant that has already been delivered to downstream
* @return the filtered hoodie instants
*/
private List<HoodieInstant> getUncompactedInstants(
HoodieTimeline commitTimeline,
final String issuedInstant) {
if (issuedInstant != null) {
return commitTimeline.getInstants()
.filter(s -> !s.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant))
.collect(Collectors.toList());
} else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()) {
String definedStartCommit = this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT);
return commitTimeline.getInstants()
.filter(s -> !s.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, definedStartCommit))
.collect(Collectors.toList());
} else {
return commitTimeline.getInstants()
.filter(s -> !s.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
.collect(Collectors.toList());
}
}
/**
* Returns all the incremental write partition paths as a set with the given commits metadata.
*
* @param metadataList The commits metadata
* @return the partition path set
*/
private Set<String> getWritePartitionPaths(List<HoodieCommitMetadata> metadataList) {
return metadataList.stream()
.map(HoodieCommitMetadata::getWritePartitionPaths)
.flatMap(Collection::stream)
.collect(Collectors.toSet());
}
/**
* Returns all the incremental write file path statuses with the given commits metadata.
*
* @param metadataList The commits metadata
* @return the file statuses array
*/
private FileStatus[] getWritePathsOfInstants(List<HoodieCommitMetadata> metadataList) {
FileSystem fs = FSUtils.getFs(path.getPath(), hadoopConf);
return metadataList.stream().map(metadata -> getWritePathsOfInstant(metadata, fs))
.flatMap(Collection::stream).toArray(FileStatus[]::new);
}
private List<FileStatus> getWritePathsOfInstant(HoodieCommitMetadata metadata, FileSystem fs) {
return metadata.getFileIdAndFullPaths(path.getPath()).values().stream()
.map(path -> {
try {
return fs.getFileStatus(new org.apache.hadoop.fs.Path(path));
} catch (IOException e) {
LOG.error("Get write status of path: {} error", path);
throw new HoodieException(e);
}
})
.collect(Collectors.toList());
}
private HoodieCommitMetadata getCommitMetadata(HoodieInstant instant, HoodieTimeline timeline) {
byte[] data = timeline.getInstantDetails(instant).get();
try {
return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
} catch (IOException e) {
LOG.error("Get write metadata for table {} with instant {} and path: {} error",
conf.getString(FlinkOptions.TABLE_NAME), instant.getTimestamp(), path);
throw new HoodieException(e);
}
}
}


@@ -0,0 +1,237 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.operator;
import org.apache.hudi.source.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.source.format.mor.MergeOnReadInputSplit;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;
/**
* The operator that reads the {@link MergeOnReadInputSplit splits} received from the preceding {@link
* StreamReadMonitoringFunction}. Contrary to the {@link StreamReadMonitoringFunction} which has a parallelism of 1,
* this operator can have a parallelism greater than one.
*
* <p>As soon as an input split {@link MergeOnReadInputSplit} is received, it is put into a queue,
* and the {@link MailboxExecutor} reads the actual data of the split.
* This architecture allows the separation of split reading from the processing of checkpoint barriers,
* thus removing any potential back-pressure.
*/
public class StreamReadOperator extends AbstractStreamOperator<RowData>
implements OneInputStreamOperator<MergeOnReadInputSplit, RowData> {
private static final Logger LOG = LoggerFactory.getLogger(StreamReadOperator.class);
// It's the same thread that runs this operator and the checkpoint actions. Use this executor to schedule only
// splits for subsequent reading, so that a new checkpoint can be triggered without blocking for a long time
// to exhaust all the scheduled split reading tasks.
private final MailboxExecutor executor;
private MergeOnReadInputFormat format;
private transient SourceFunction.SourceContext<RowData> sourceContext;
private transient ListState<MergeOnReadInputSplit> inputSplitsState;
private transient Queue<MergeOnReadInputSplit> splits;
// Splits are read by the same thread that calls #processElement. Each read task is submitted to that thread by adding
// it to the executor. This state is used to ensure that only one read task is in that splits queue at a time, so that
// read tasks do not accumulate ahead of checkpoint tasks. When there is a read task in the queue, this is set to RUNNING.
// When there are no more files to read, this will be set to IDLE.
private transient SplitState currentSplitState;
private StreamReadOperator(MergeOnReadInputFormat format, ProcessingTimeService timeService,
MailboxExecutor mailboxExecutor) {
this.format = Preconditions.checkNotNull(format, "The InputFormat should not be null.");
this.processingTimeService = timeService;
this.executor = Preconditions.checkNotNull(mailboxExecutor, "The mailboxExecutor should not be null.");
}
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
// TODO Replace Java serialization with Avro approach to keep state compatibility.
inputSplitsState = context.getOperatorStateStore().getListState(
new ListStateDescriptor<>("splits", new JavaSerializer<>()));
// Initialize the current split state to IDLE.
currentSplitState = SplitState.IDLE;
// Recover splits state from flink state backend if possible.
splits = new LinkedBlockingDeque<>();
if (context.isRestored()) {
int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
LOG.info("Restoring state for operator {} (task ID: {}).", getClass().getSimpleName(), subtaskIdx);
for (MergeOnReadInputSplit split : inputSplitsState.get()) {
splits.add(split);
}
}
this.sourceContext = StreamSourceContexts.getSourceContext(
getOperatorConfig().getTimeCharacteristic(),
getProcessingTimeService(),
new Object(), // no actual locking needed
getContainingTask().getStreamStatusMaintainer(),
output,
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(),
-1);
// Enqueue to process the recovered input splits.
enqueueProcessSplits();
}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
inputSplitsState.clear();
inputSplitsState.addAll(new ArrayList<>(splits));
}
@Override
public void processElement(StreamRecord<MergeOnReadInputSplit> element) {
splits.add(element.getValue());
enqueueProcessSplits();
}
private void enqueueProcessSplits() {
if (currentSplitState == SplitState.IDLE && !splits.isEmpty()) {
currentSplitState = SplitState.RUNNING;
executor.execute(this::processSplits, this.getClass().getSimpleName());
}
}
private void processSplits() throws IOException {
MergeOnReadInputSplit split = splits.poll();
if (split == null) {
currentSplitState = SplitState.IDLE;
return;
}
format.open(split);
try {
RowData nextElement = null;
while (!format.reachedEnd()) {
nextElement = format.nextRecord(nextElement);
sourceContext.collect(nextElement);
}
} finally {
currentSplitState = SplitState.IDLE;
format.close();
}
// Re-schedule to process the next split.
enqueueProcessSplits();
}
@Override
public void processWatermark(Watermark mark) {
// we do nothing because we emit our own watermarks if needed.
}
@Override
public void dispose() throws Exception {
super.dispose();
if (format != null) {
format.close();
format.closeInputFormat();
format = null;
}
sourceContext = null;
}
@Override
public void close() throws Exception {
super.close();
output.close();
if (sourceContext != null) {
sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
sourceContext.close();
sourceContext = null;
}
}
public static OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory(MergeOnReadInputFormat format) {
return new OperatorFactory(format);
}
private enum SplitState {
IDLE, RUNNING
}
private static class OperatorFactory extends AbstractStreamOperatorFactory<RowData>
implements YieldingOperatorFactory<RowData>, OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> {
private final MergeOnReadInputFormat format;
private transient MailboxExecutor mailboxExecutor;
private OperatorFactory(MergeOnReadInputFormat format) {
this.format = format;
}
@Override
public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
this.mailboxExecutor = mailboxExecutor;
}
@SuppressWarnings("unchecked")
@Override
public <O extends StreamOperator<RowData>> O createStreamOperator(StreamOperatorParameters<RowData> parameters) {
StreamReadOperator operator = new StreamReadOperator(format, processingTimeService, mailboxExecutor);
operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
return (O) operator;
}
@Override
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return StreamReadOperator.class;
}
}
}
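Taken together, StreamReadMonitoringFunction and StreamReadOperator form the streaming source. The following is a rough sketch of how they are wired into a pipeline; it mirrors what HoodieTableSource#getDataStream does further down in this patch, while the sketch's class and method names are hypothetical and the arguments are assumed to be supplied by the table source.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.table.data.RowData;

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.operator.FlinkOptions;
import org.apache.hudi.operator.StreamReadMonitoringFunction;
import org.apache.hudi.operator.StreamReadOperator;
import org.apache.hudi.source.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.source.format.mor.MergeOnReadInputSplit;

public class StreamingSourceWiringSketch {

  public static DataStream<RowData> buildSource(
      StreamExecutionEnvironment execEnv,
      Configuration conf,
      Path path,
      HoodieTableMetaClient metaClient,
      long maxCompactionMemoryInBytes,
      MergeOnReadInputFormat inputFormat,
      TypeInformation<RowData> typeInfo) {
    // Single (non-parallel) task that monitors the timeline and emits input splits.
    StreamReadMonitoringFunction monitoringFunction =
        new StreamReadMonitoringFunction(conf, path, metaClient, maxCompactionMemoryInBytes);
    // Parallel operator that reads the actual data of each split.
    OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory =
        StreamReadOperator.factory(inputFormat);
    return execEnv
        .addSource(monitoringFunction, "streaming_source")
        .setParallelism(1)
        .uid("uid_streaming_source")
        .transform("split_reader", typeInfo, factory)
        .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS))
        .uid("uid_split_reader");
  }
}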


@@ -65,7 +65,7 @@ public class HoodieTableSink implements AppendStreamTableSink<RowData>, Partitio
public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
// Read from kafka source
RowType rowType = (RowType) this.schema.toRowDataType().notNull().getLogicalType();
int numWriteTasks = this.conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM);
int numWriteTasks = this.conf.getInteger(FlinkOptions.WRITE_TASKS);
StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf, isBounded);
DataStream<Object> pipeline = dataStream


@@ -21,14 +21,18 @@ package org.apache.hudi.source;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.HoodieROTablePathFilter;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
import org.apache.hudi.operator.FlinkOptions;
import org.apache.hudi.operator.StreamReadMonitoringFunction;
import org.apache.hudi.operator.StreamReadOperator;
import org.apache.hudi.source.format.FilePathUtils;
import org.apache.hudi.source.format.cow.CopyOnWriteInputFormat;
import org.apache.hudi.source.format.mor.MergeOnReadInputFormat;
@@ -48,10 +52,11 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.Expression;
@@ -75,7 +80,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -148,6 +152,11 @@ public class HoodieTableSource implements
this.hadoopConf = StreamerUtil.getHadoopConf();
this.metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
this.maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new JobConf(this.hadoopConf));
if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
ValidationUtils.checkArgument(
conf.getString(FlinkOptions.TABLE_TYPE).equalsIgnoreCase(FlinkOptions.TABLE_TYPE_MERGE_ON_READ),
"Streaming read is only supported for table type: " + FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
}
}
@Override
@@ -155,14 +164,29 @@ public class HoodieTableSource implements
@SuppressWarnings("unchecked")
TypeInformation<RowData> typeInfo =
(TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
InputFormatSourceFunction<RowData> func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo);
DataStreamSource<RowData> source = execEnv.addSource(func, explainSource(), typeInfo);
return source.name(explainSource());
if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
conf, path, metaClient, maxCompactionMemoryInBytes);
OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) getInputFormat(true));
SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, "streaming_source")
.setParallelism(1)
.uid("uid_streaming_source")
.transform("split_reader", typeInfo, factory)
.setParallelism(conf.getInteger(FlinkOptions.READ_TASKS))
.uid("uid_split_reader");
return new DataStreamSource<>(source);
} else {
InputFormatSourceFunction<RowData> func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo);
DataStreamSource<RowData> source = execEnv.addSource(func, explainSource(), typeInfo);
return source.name("streaming_source")
.setParallelism(conf.getInteger(FlinkOptions.READ_TASKS))
.uid("uid_streaming_source");
}
}
@Override
public boolean isBounded() {
return true;
return !conf.getBoolean(FlinkOptions.READ_AS_STREAMING);
}
@Override
@@ -189,24 +213,7 @@ public class HoodieTableSource implements
@Override
public List<Map<String, String>> getPartitions() {
try {
return FilePathUtils
.searchPartKeyValueAndPaths(
path.getFileSystem(),
path,
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION),
partitionKeys.toArray(new String[0]))
.stream()
.map(tuple2 -> tuple2.f0)
.map(spec -> {
LinkedHashMap<String, String> ret = new LinkedHashMap<>();
spec.forEach((k, v) -> ret.put(k, defaultPartName.equals(v) ? null : v));
return ret;
})
.collect(Collectors.toList());
} catch (Exception e) {
throw new TableException("Fetch partitions fail.", e);
}
return FilePathUtils.getPartitions(path, conf, partitionKeys, defaultPartName);
}
@Override
@@ -269,7 +276,7 @@ public class HoodieTableSource implements
: Option.of(kv.getValue());
return new MergeOnReadInputSplit(cnt.getAndAdd(1),
baseFile.getPath(), logPaths, latestCommit,
metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType);
metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
}).collect(Collectors.toList());
} else {
// all the files are logs
@@ -285,15 +292,19 @@ public class HoodieTableSource implements
.collect(Collectors.toList()));
return new MergeOnReadInputSplit(cnt.getAndAdd(1),
null, logPaths, latestCommit,
metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType);
metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
}).collect(Collectors.toList()); })
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
}
@VisibleForTesting
public InputFormat<RowData, ?> getInputFormat() {
return getInputFormat(false);
}
@VisibleForTesting
public InputFormat<RowData, ?> getInputFormat(boolean isStreaming) {
// When this table has no partition, just return an empty source.
if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
return new CollectionInputFormat<>(Collections.emptyList(), null);
@@ -317,13 +328,20 @@ public class HoodieTableSource implements
final String queryType = this.conf.getString(FlinkOptions.QUERY_TYPE);
if (queryType.equals(FlinkOptions.QUERY_TYPE_SNAPSHOT)) {
switch (this.conf.getString(FlinkOptions.TABLE_TYPE)) {
case FlinkOptions.TABLE_TYPE_MERGE_ON_READ:
final List<MergeOnReadInputSplit> inputSplits = buildFileIndex(paths);
if (inputSplits.size() == 0) {
// When there is no input splits, just return an empty source.
LOG.warn("No input inputs generate for MERGE_ON_READ input format, returns empty collection instead");
return new CollectionInputFormat<>(Collections.emptyList(), null);
final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
switch (tableType) {
case MERGE_ON_READ:
final List<MergeOnReadInputSplit> inputSplits;
if (!isStreaming) {
inputSplits = buildFileIndex(paths);
if (inputSplits.size() == 0) {
// When there is no input splits, just return an empty source.
LOG.warn("No input splits generate for MERGE_ON_READ input format, returns empty collection instead");
return new CollectionInputFormat<>(Collections.emptyList(), null);
}
} else {
// streaming reader would build the splits automatically.
inputSplits = Collections.emptyList();
}
final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
rowType,
@@ -335,10 +353,10 @@ public class HoodieTableSource implements
this.conf,
paths,
hoodieTableState,
rowDataType.getChildren(), // use the explicit fields data type because the AvroSchemaConvertr is not very stable.
rowDataType.getChildren(), // use the explicit fields data type because the AvroSchemaConverter is not very stable.
"default",
this.limit);
case FlinkOptions.TABLE_TYPE_COPY_ON_WRITE:
case COPY_ON_WRITE:
FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
paths,
this.schema.getFieldNames(),
@@ -373,27 +391,9 @@ public class HoodieTableSource implements
*/
@VisibleForTesting
public Path[] getReadPaths() {
if (partitionKeys.isEmpty()) {
return new Path[] {path};
} else {
return getOrFetchPartitions().stream()
.map(HoodieTableSource.this::validateAndReorderPartitions)
.map(kvs -> FilePathUtils.generatePartitionPath(kvs, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION)))
.map(n -> new Path(path, n))
.toArray(Path[]::new);
}
}
private LinkedHashMap<String, String> validateAndReorderPartitions(Map<String, String> part) {
LinkedHashMap<String, String> map = new LinkedHashMap<>();
for (String k : partitionKeys) {
if (!part.containsKey(k)) {
throw new TableException("Partition keys are: " + partitionKeys
+ ", incomplete partition spec: " + part);
}
map.put(k, part.get(k));
}
return map;
return partitionKeys.isEmpty()
? new Path[] {path}
: FilePathUtils.partitionPath2ReadPath(path, conf, partitionKeys, getOrFetchPartitions());
}
private static class LatestFileFilter extends FilePathFilter {


@@ -19,6 +19,7 @@
package org.apache.hudi.source.format;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.operator.FlinkOptions;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileStatus;
@@ -35,6 +36,7 @@ import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* Reference the Flink {@link org.apache.flink.table.utils.PartitionPathUtils}
@@ -317,4 +319,110 @@ public class FilePathUtils {
// the log files is hidden file
return name.startsWith("_") || name.startsWith(".") && !name.contains(".log.");
}
/**
* Returns the partition path keys and values as a list of maps; each map item in the list
* is a mapping of a partition key name to its actual partition value. For example, say
* there is a file path with partition keys [key1, key2, key3]:
*
* <p><pre>
* -- file:/// ... key1=val1/key2=val2/key3=val3
* -- file:/// ... key1=val4/key2=val5/key3=val6
* </pre>
*
* <p>The return list should be [{key1:val1, key2:val2, key3:val3}, {key1:val4, key2:val5, key3:val6}].
*
* @param path The base path
* @param conf The configuration
* @param partitionKeys The partition key list
* @param defaultParName The default partition name for nulls
*/
public static List<Map<String, String>> getPartitions(
Path path,
org.apache.flink.configuration.Configuration conf,
List<String> partitionKeys,
String defaultParName) {
try {
return FilePathUtils
.searchPartKeyValueAndPaths(
path.getFileSystem(),
path,
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION),
partitionKeys.toArray(new String[0]))
.stream()
.map(tuple2 -> tuple2.f0)
.map(spec -> {
LinkedHashMap<String, String> ret = new LinkedHashMap<>();
spec.forEach((k, v) -> ret.put(k, defaultParName.equals(v) ? null : v));
return ret;
})
.collect(Collectors.toList());
} catch (Exception e) {
throw new TableException("Fetch partitions fail.", e);
}
}
/**
* Reorder the partition key value mapping based on the given partition keys sequence.
*
* @param partitionKVs The partition key and value mapping
* @param partitionKeys The partition key list
*/
public static LinkedHashMap<String, String> validateAndReorderPartitions(
Map<String, String> partitionKVs,
List<String> partitionKeys) {
LinkedHashMap<String, String> map = new LinkedHashMap<>();
for (String k : partitionKeys) {
if (!partitionKVs.containsKey(k)) {
throw new TableException("Partition keys are: " + partitionKeys
+ ", incomplete partition spec: " + partitionKVs);
}
map.put(k, partitionKVs.get(k));
}
return map;
}
/**
* Returns all the file paths that are the parents of the data files.
*
* @param path The base path
* @param conf The configuration
* @param partitionKeys The partition key list
* @param defaultParName The default partition name for nulls
*/
public static Path[] getReadPaths(
Path path,
org.apache.flink.configuration.Configuration conf,
List<String> partitionKeys,
String defaultParName) {
if (partitionKeys.isEmpty()) {
return new Path[] {path};
} else {
List<Map<String, String>> partitionPaths =
getPartitions(path, conf, partitionKeys, defaultParName);
return partitionPath2ReadPath(path, conf, partitionKeys, partitionPaths);
}
}
/**
* Transforms the given partition key value mapping to read paths.
*
* @param path The base path
* @param conf The Flink configuration
* @param partitionKeys The partition key list
* @param partitionPaths The partition key value mapping
*
* @see #getReadPaths
*/
public static Path[] partitionPath2ReadPath(
Path path,
org.apache.flink.configuration.Configuration conf,
List<String> partitionKeys,
List<Map<String, String>> partitionPaths) {
return partitionPaths.stream()
.map(m -> validateAndReorderPartitions(m, partitionKeys))
.map(kvs -> FilePathUtils.generatePartitionPath(kvs, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION)))
.map(n -> new Path(path, n))
.toArray(Path[]::new);
}
}
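A hedged usage sketch of the helpers above: for a table laid out with hive-style partition directories such as <base>/key1=val1/key2=val2, getPartitions returns one key-to-value map per partition and partitionPath2ReadPath turns those maps back into concrete partition directories. The class name, base path, partition keys, and default partition name here are illustrative; HoodieTableSource in this patch calls the same methods.

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;

import org.apache.hudi.source.format.FilePathUtils;

public class FilePathUtilsSketch {

  // Lists the partition specs under the base path and resolves the directories to read.
  public static Path[] resolveReadPaths(Path basePath, Configuration conf) {
    List<String> partitionKeys = Arrays.asList("key1", "key2");
    List<Map<String, String>> partitions =
        FilePathUtils.getPartitions(basePath, conf, partitionKeys, "default");
    // e.g. [{key1=val1, key2=val2}, {key1=val4, key2=val5}]
    return FilePathUtils.partitionPath2ReadPath(basePath, conf, partitionKeys, partitions);
    // e.g. [<base>/key1=val1/key2=val2, <base>/key1=val4/key2=val5]
  }
}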


@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.source.format.mor;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import java.io.Serializable;
import java.util.Objects;
/**
* An instant commit range used for incremental reader filtering.
*/
public abstract class InstantRange implements Serializable {
private static final long serialVersionUID = 1L;
protected final String startInstant;
protected final String endInstant;
public InstantRange(String startInstant, String endInstant) {
this.startInstant = Objects.requireNonNull(startInstant);
this.endInstant = Objects.requireNonNull(endInstant);
}
public static InstantRange getInstance(String startInstant, String endInstant, RangeType rangeType) {
switch (rangeType) {
case OPEN_CLOSE:
return new OpenCloseRange(startInstant, endInstant);
case CLOSE_CLOSE:
return new CloseCloseRange(startInstant, endInstant);
default:
throw new AssertionError();
}
}
public String getStartInstant() {
return startInstant;
}
public String getEndInstant() {
return endInstant;
}
public abstract boolean isInRange(String instant);
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
/**
* Represents a range type.
*/
public enum RangeType {
OPEN_CLOSE, CLOSE_CLOSE;
}
private static class OpenCloseRange extends InstantRange {
public OpenCloseRange(String startInstant, String endInstant) {
super(startInstant, endInstant);
}
@Override
public boolean isInRange(String instant) {
// No need to do comparison:
// HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant)
// because the logic is ensured by the log scanner
return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN, startInstant);
}
}
private static class CloseCloseRange extends InstantRange {
public CloseCloseRange(String startInstant, String endInstant) {
super(startInstant, endInstant);
}
@Override
public boolean isInRange(String instant) {
// No need to do comparison:
// HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant)
// because the logic is ensured by the log scanner
return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, startInstant);
}
}
}
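A short hedged sketch of the range semantics above, with illustrative timestamps: OPEN_CLOSE excludes the start instant (the case where an instant has already been issued), while CLOSE_CLOSE includes it (the case of a user-specified start commit). The resulting InstantRange is what the monitoring function passes as the new last argument of MergeOnReadInputSplit.

import org.apache.hudi.source.format.mor.InstantRange;

public class InstantRangeSketch {

  public static void main(String[] args) {
    InstantRange openClose = InstantRange.getInstance(
        "20210310120000", "20210310130000", InstantRange.RangeType.OPEN_CLOSE);
    InstantRange closeClose = InstantRange.getInstance(
        "20210310120000", "20210310130000", InstantRange.RangeType.CLOSE_CLOSE);

    // Only the lower bound is checked; the upper bound is ensured by the log scanner.
    System.out.println(openClose.isInRange("20210310120000"));  // false: start instant excluded
    System.out.println(closeClose.isInRange("20210310120000")); // true: start instant included
  }
}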


@@ -287,6 +287,17 @@ public class MergeOnReadInputFormat
// delete record found, skipping
return hasNext();
} else {
// should improve the code when the log scanner supports
// seeking by log blocks with commit time, which is more
// efficient.
if (split.getInstantRange().isPresent()) {
// based on the fact that commit time is always the first field
String commitTime = curAvroRecord.get().get(0).toString();
if (!split.getInstantRange().get().isInRange(commitTime)) {
// filter out the records that are not in range
return hasNext();
}
}
GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
curAvroRecord.get(),
requiredSchema,


@@ -39,6 +39,7 @@ public class MergeOnReadInputSplit implements InputSplit {
private final String tablePath;
private final long maxCompactionMemoryInBytes;
private final String mergeType;
private final Option<InstantRange> instantRange;
public MergeOnReadInputSplit(
int splitNum,
@@ -47,7 +48,8 @@ public class MergeOnReadInputSplit implements InputSplit {
String latestCommit,
String tablePath,
long maxCompactionMemoryInBytes,
String mergeType) {
String mergeType,
@Nullable InstantRange instantRange) {
this.splitNum = splitNum;
this.basePath = Option.ofNullable(basePath);
this.logPaths = logPaths;
@@ -55,6 +57,7 @@ public class MergeOnReadInputSplit implements InputSplit {
this.tablePath = tablePath;
this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
this.mergeType = mergeType;
this.instantRange = Option.ofNullable(instantRange);
}
public Option<String> getBasePath() {
@@ -81,6 +84,10 @@ public class MergeOnReadInputSplit implements InputSplit {
return mergeType;
}
public Option<InstantRange> getInstantRange() {
return this.instantRange;
}
@Override
public int getSplitNumber() {
return this.splitNum;


@@ -71,7 +71,7 @@ public class HoodieFlinkStreamer {
}
Configuration conf = FlinkOptions.fromStreamerConfig(cfg);
int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM);
int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASKS);
TypedProperties props = StreamerUtil.appendKafkaProps(cfg);


@@ -72,7 +72,7 @@ public class HoodieFlinkStreamerV2 {
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg))
.getLogicalType();
Configuration conf = FlinkOptions.fromStreamerConfig(cfg);
int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM);
int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASKS);
StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
new StreamWriteOperatorFactory<>(conf);


@@ -18,27 +18,26 @@
package org.apache.hudi.util;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.TablePathUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.TablePathUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.operator.FlinkOptions;
import org.apache.hudi.schema.FilebasedSchemaProvider;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.avro.Schema;
@@ -332,7 +331,7 @@ public class StreamerUtil {
public static boolean needsScheduleCompaction(Configuration conf) {
return conf.getString(FlinkOptions.TABLE_TYPE)
.toUpperCase(Locale.ROOT)
.equals(HoodieTableType.MERGE_ON_READ.name())
.equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
&& conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED);
}
}