From 2fdae6835ce3fcad3111205d2373a69b34788483 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Wed, 10 Mar 2021 22:44:06 +0800 Subject: [PATCH] [HUDI-1663] Streaming read for Flink MOR table (#2640) Supports two read modes: * Read the full data set starting from the latest commit instant and subsequent incremental data set * Read data set that starts from a specified commit instant --- .../common/model/HoodieCommitMetadata.java | 5 + .../apache/hudi/operator/FlinkOptions.java | 36 +- .../StreamReadMonitoringFunction.java | 372 ++++++++++++++++++ .../hudi/operator/StreamReadOperator.java | 237 +++++++++++ .../org/apache/hudi/sink/HoodieTableSink.java | 2 +- .../apache/hudi/source/HoodieTableSource.java | 114 +++--- .../hudi/source/format/FilePathUtils.java | 108 +++++ .../hudi/source/format/mor/InstantRange.java | 101 +++++ .../format/mor/MergeOnReadInputFormat.java | 11 + .../format/mor/MergeOnReadInputSplit.java | 9 +- .../hudi/streamer/HoodieFlinkStreamer.java | 2 +- .../hudi/streamer/HoodieFlinkStreamerV2.java | 2 +- .../org/apache/hudi/util/StreamerUtil.java | 17 +- .../operator/utils/TestConfigurations.java | 29 +- .../apache/hudi/operator/utils/TestData.java | 105 ++++- .../hudi/source/HoodieDataSourceITCase.java | 118 ++++-- ...ceTest.java => TestHoodieTableSource.java} | 4 +- .../TestStreamReadMonitoringFunction.java | 269 +++++++++++++ .../hudi/source/TestStreamReadOperator.java | 290 ++++++++++++++ ...utFormatTest.java => TestInputFormat.java} | 23 +- .../java/org/apache/hudi/utils/TestUtils.java | 64 +++ .../factory/CollectSinkTableFactory.java | 174 ++++++++ .../org.apache.flink.table.factories.Factory | 17 + .../src/test/resources/test_source2.data | 8 + 24 files changed, 1989 insertions(+), 128 deletions(-) create mode 100644 hudi-flink/src/main/java/org/apache/hudi/operator/StreamReadMonitoringFunction.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/operator/StreamReadOperator.java create mode 100644 
hudi-flink/src/main/java/org/apache/hudi/source/format/mor/InstantRange.java rename hudi-flink/src/test/java/org/apache/hudi/source/{HoodieTableSourceTest.java => TestHoodieTableSource.java} (97%) create mode 100644 hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java create mode 100644 hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java rename hudi-flink/src/test/java/org/apache/hudi/source/format/{InputFormatTest.java => TestInputFormat.java} (91%) create mode 100644 hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java create mode 100644 hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java create mode 100644 hudi-flink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory create mode 100644 hudi-flink/src/test/resources/test_source2.data diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java index bc9a4ba7e..da72b165f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java @@ -36,6 +36,7 @@ import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; @@ -339,6 +340,10 @@ public class HoodieCommitMetadata implements Serializable { maxEventTime == Long.MIN_VALUE ? 
Option.empty() : Option.of(maxEventTime)); } + public HashSet getWritePartitionPaths() { + return new HashSet<>(partitionToWriteStats.keySet()); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java index 10dc5be88..a724830d6 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java @@ -18,6 +18,7 @@ package org.apache.hudi.operator; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.streamer.FlinkStreamerConfig; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; @@ -68,6 +69,12 @@ public class FlinkOptions { // ------------------------------------------------------------------------ // Read Options // ------------------------------------------------------------------------ + public static final ConfigOption READ_TASKS = ConfigOptions + .key("read.tasks") + .intType() + .defaultValue(4) + .withDescription("Parallelism of tasks that do actual read, default is 4"); + public static final ConfigOption READ_SCHEMA_FILE_PATH = ConfigOptions .key("read.schema.file.path") .stringType() @@ -112,6 +119,25 @@ public class FlinkOptions { + " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. 
But Hive 3.x" + " use UTC timezone, by default true"); + public static final ConfigOption READ_AS_STREAMING = ConfigOptions + .key("read.streaming.enabled") + .booleanType() + .defaultValue(false)// default read as batch + .withDescription("Whether to read as streaming source, default false"); + + public static final ConfigOption READ_STREAMING_CHECK_INTERVAL = ConfigOptions + .key("read.streaming.check-interval") + .intType() + .defaultValue(60)// default 1 minute + .withDescription("Check interval for streaming read of SECOND, default 1 minute"); + + public static final ConfigOption READ_STREAMING_START_COMMIT = ConfigOptions + .key("read.streaming.start-commit") + .stringType() + .noDefaultValue() + .withDescription("Start commit instant for streaming read, the commit time format should be 'yyyyMMddHHmmss', " + + "by default reading from the latest instant"); + // ------------------------------------------------------------------------ // Write Options // ------------------------------------------------------------------------ @@ -121,8 +147,8 @@ public class FlinkOptions { .noDefaultValue() .withDescription("Table name to register to Hive metastore"); - public static final String TABLE_TYPE_COPY_ON_WRITE = "COPY_ON_WRITE"; - public static final String TABLE_TYPE_MERGE_ON_READ = "MERGE_ON_READ"; + public static final String TABLE_TYPE_COPY_ON_WRITE = HoodieTableType.COPY_ON_WRITE.name(); + public static final String TABLE_TYPE_MERGE_ON_READ = HoodieTableType.MERGE_ON_READ.name(); public static final ConfigOption TABLE_TYPE = ConfigOptions .key("write.table.type") .stringType() @@ -203,8 +229,8 @@ public class FlinkOptions { .defaultValue(SimpleAvroKeyGenerator.class.getName()) .withDescription("Key generator class, that implements will extract the key out of incoming record"); - public static final ConfigOption WRITE_TASK_PARALLELISM = ConfigOptions - .key("write.task.parallelism") + public static final ConfigOption WRITE_TASKS = ConfigOptions + 
.key("write.tasks") .intType() .defaultValue(4) .withDescription("Parallelism of tasks that do actual write, default is 4"); @@ -290,7 +316,7 @@ public class FlinkOptions { conf.setString(FlinkOptions.RECORD_KEY_FIELD, config.recordKeyField); conf.setString(FlinkOptions.PARTITION_PATH_FIELD, config.partitionPathField); conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass); - conf.setInteger(FlinkOptions.WRITE_TASK_PARALLELISM, config.writeTaskNum); + conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum); return conf; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamReadMonitoringFunction.java new file mode 100644 index 000000000..2e24acaa9 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamReadMonitoringFunction.java @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.operator; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.source.format.mor.InstantRange; +import org.apache.hudi.source.format.mor.MergeOnReadInputSplit; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN; +import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS; + +/** + * This is the single (non-parallel) monitoring task which takes a {@link MergeOnReadInputSplit} + * , it is responsible for: + * + *
+ * <ol>
+ *   <li>Monitoring a user-provided hoodie table path.</li>
+ *   <li>Deciding which files(or split) should be further read and processed.</li>
+ *   <li>Creating the {@link MergeOnReadInputSplit splits} corresponding to those files.</li>
+ *   <li>Assigning them to downstream tasks for further processing.</li>
+ * </ol>
+ *

The splits to be read are forwarded to the downstream {@link StreamReadOperator} + * which can have parallelism greater than one. + * + *

IMPORTANT NOTE: Splits are forwarded downstream for reading in ascending instant commits time order, + * in each downstream task, the splits are also read in receiving sequence. We do not ensure split consuming sequence + * among the downstream tasks. + */ +public class StreamReadMonitoringFunction + extends RichSourceFunction implements CheckpointedFunction { + private static final Logger LOG = LoggerFactory.getLogger(StreamReadMonitoringFunction.class); + + private static final long serialVersionUID = 1L; + + /** + * The path to monitor. + */ + private final Path path; + + /** + * The interval between consecutive path scans. + */ + private final long interval; + + private transient Object checkpointLock; + + private volatile boolean isRunning = true; + + private String issuedInstant; + + private transient ListState instantState; + + private final Configuration conf; + + private transient org.apache.hadoop.conf.Configuration hadoopConf; + + private final HoodieTableMetaClient metaClient; + + private final long maxCompactionMemoryInBytes; + + public StreamReadMonitoringFunction( + Configuration conf, + Path path, + HoodieTableMetaClient metaClient, + long maxCompactionMemoryInBytes) { + this.conf = conf; + this.path = path; + this.metaClient = metaClient; + this.interval = conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL); + this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes; + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + + ValidationUtils.checkState(this.instantState == null, + "The " + getClass().getSimpleName() + " has already been initialized."); + + this.instantState = context.getOperatorStateStore().getListState( + new ListStateDescriptor<>( + "file-monitoring-state", + StringSerializer.INSTANCE + ) + ); + + if (context.isRestored()) { + LOG.info("Restoring state for the class {} with table {} and base path {}.", + getClass().getSimpleName(), 
conf.getString(FlinkOptions.TABLE_NAME), path); + + List retrievedStates = new ArrayList<>(); + for (String entry : this.instantState.get()) { + retrievedStates.add(entry); + } + + ValidationUtils.checkArgument(retrievedStates.size() <= 1, + getClass().getSimpleName() + " retrieved invalid state."); + + if (retrievedStates.size() == 1 && issuedInstant != null) { + // this is the case where we have both legacy and new state. + // the two should be mutually exclusive for the operator, thus we throw the exception. + + throw new IllegalArgumentException( + "The " + getClass().getSimpleName() + " has already restored from a previous Flink version."); + + } else if (retrievedStates.size() == 1) { + this.issuedInstant = retrievedStates.get(0); + if (LOG.isDebugEnabled()) { + LOG.debug("{} retrieved a issued instant of time {} for table {} with path {}.", + getClass().getSimpleName(), issuedInstant, conf.get(FlinkOptions.TABLE_NAME), path); + } + } + } + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + this.hadoopConf = StreamerUtil.getHadoopConf(); + } + + @Override + public void run(SourceFunction.SourceContext context) throws Exception { + checkpointLock = context.getCheckpointLock(); + while (isRunning) { + synchronized (checkpointLock) { + monitorDirAndForwardSplits(context); + } + TimeUnit.SECONDS.sleep(interval); + } + } + + @VisibleForTesting + public void monitorDirAndForwardSplits(SourceContext context) { + metaClient.reloadActiveTimeline(); + HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(); + if (commitTimeline.empty()) { + LOG.warn("No splits found for the table under path " + path); + return; + } + List instants = getUncompactedInstants(commitTimeline, this.issuedInstant); + // get the latest instant that satisfies condition + final HoodieInstant instantToIssue = instants.size() == 0 ? 
null : instants.get(instants.size() - 1); + final InstantRange instantRange; + if (instantToIssue != null) { + if (this.issuedInstant != null) { + // had already consumed an instant + instantRange = InstantRange.getInstance(this.issuedInstant, instantToIssue.getTimestamp(), + InstantRange.RangeType.OPEN_CLOSE); + } else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()) { + // first time consume and has a start commit + final String specifiedStart = this.conf.getString(FlinkOptions.READ_STREAMING_START_COMMIT); + instantRange = InstantRange.getInstance(specifiedStart, instantToIssue.getTimestamp(), + InstantRange.RangeType.CLOSE_CLOSE); + } else { + // first time consume and no start commit, + // would consume all the snapshot data PLUS incremental data set + instantRange = null; + } + } else { + LOG.info("No new instant found for the table under path " + path + ", skip reading"); + return; + } + // generate input split: + // 1. first fetch all the commit metadata for the incremental instants; + // 2. filter the relative partition paths + // 3. filter the full file paths + // 4. 
use the file paths from #step 3 as the back-up of the filesystem view + + List metadataList = instants.stream() + .map(instant -> getCommitMetadata(instant, commitTimeline)).collect(Collectors.toList()); + Set writePartitions = getWritePartitionPaths(metadataList); + FileStatus[] fileStatuses = getWritePathsOfInstants(metadataList); + if (fileStatuses.length == 0) { + throw new HoodieException("No files found for reading in user provided path."); + } + + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses); + final String commitToIssue = instantToIssue.getTimestamp(); + final AtomicInteger cnt = new AtomicInteger(0); + final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE); + List inputSplits = writePartitions.stream() + .map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, commitToIssue) + .map(fileSlice -> { + Option> logPaths = Option.ofNullable(fileSlice.getLogFiles() + .sorted(HoodieLogFile.getLogFileComparator()) + .map(logFile -> logFile.getPath().toString()) + .collect(Collectors.toList())); + return new MergeOnReadInputSplit(cnt.getAndAdd(1), + null, logPaths, commitToIssue, + metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange); + }).collect(Collectors.toList())) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + + for (MergeOnReadInputSplit split : inputSplits) { + context.collect(split); + } + // update the issues instant time + this.issuedInstant = commitToIssue; + } + + @Override + public void close() throws Exception { + super.close(); + + if (checkpointLock != null) { + synchronized (checkpointLock) { + issuedInstant = null; + isRunning = false; + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Closed File Monitoring Source for path: " + path + "."); + } + } + + @Override + public void cancel() { + if (checkpointLock != null) { + // this is to cover the case where cancel() is called before the run() + 
synchronized (checkpointLock) { + issuedInstant = null; + isRunning = false; + } + } else { + issuedInstant = null; + isRunning = false; + } + } + + // ------------------------------------------------------------------------- + // Checkpointing + // ------------------------------------------------------------------------- + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + this.instantState.clear(); + if (this.issuedInstant != null) { + this.instantState.add(this.issuedInstant); + } + } + + /** + * Returns the uncompacted instants with a given issuedInstant to start from. + * + * @param commitTimeline The completed commits timeline + * @param issuedInstant The last issued instant that has already been delivered to downstream + * @return the filtered hoodie instants + */ + private List getUncompactedInstants( + HoodieTimeline commitTimeline, + final String issuedInstant) { + if (issuedInstant != null) { + return commitTimeline.getInstants() + .filter(s -> !s.getAction().equals(HoodieTimeline.COMPACTION_ACTION)) + .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant)) + .collect(Collectors.toList()); + } else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()) { + String definedStartCommit = this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT); + return commitTimeline.getInstants() + .filter(s -> !s.getAction().equals(HoodieTimeline.COMPACTION_ACTION)) + .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, definedStartCommit)) + .collect(Collectors.toList()); + } else { + return commitTimeline.getInstants() + .filter(s -> !s.getAction().equals(HoodieTimeline.COMPACTION_ACTION)) + .collect(Collectors.toList()); + } + } + + /** + * Returns all the incremental write partition paths as a set with the given commits metadata. 
+ * + * @param metadataList The commits metadata + * @return the partition path set + */ + private Set getWritePartitionPaths(List metadataList) { + return metadataList.stream() + .map(HoodieCommitMetadata::getWritePartitionPaths) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + } + + /** + * Returns all the incremental write file path statuses with the given commits metadata. + * + * @param metadataList The commits metadata + * @return the file statuses array + */ + private FileStatus[] getWritePathsOfInstants(List metadataList) { + FileSystem fs = FSUtils.getFs(path.getPath(), hadoopConf); + return metadataList.stream().map(metadata -> getWritePathsOfInstant(metadata, fs)) + .flatMap(Collection::stream).toArray(FileStatus[]::new); + } + + private List getWritePathsOfInstant(HoodieCommitMetadata metadata, FileSystem fs) { + return metadata.getFileIdAndFullPaths(path.getPath()).values().stream() + .map(path -> { + try { + return fs.getFileStatus(new org.apache.hadoop.fs.Path(path)); + } catch (IOException e) { + LOG.error("Get write status of path: {} error", path); + throw new HoodieException(e); + } + }) + .collect(Collectors.toList()); + } + + private HoodieCommitMetadata getCommitMetadata(HoodieInstant instant, HoodieTimeline timeline) { + byte[] data = timeline.getInstantDetails(instant).get(); + try { + return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class); + } catch (IOException e) { + LOG.error("Get write metadata for table {} with instant {} and path: {} error", + conf.getString(FlinkOptions.TABLE_NAME), instant.getTimestamp(), path); + throw new HoodieException(e); + } + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamReadOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamReadOperator.java new file mode 100644 index 000000000..d12147d28 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamReadOperator.java @@ -0,0 +1,237 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator; + +import org.apache.hudi.source.format.mor.MergeOnReadInputFormat; +import org.apache.hudi.source.format.mor.MergeOnReadInputSplit; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.runtime.state.JavaSerializer; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.MailboxExecutor; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.api.operators.StreamSourceContexts; +import org.apache.flink.streaming.api.operators.YieldingOperatorFactory; +import 
org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingDeque; + +/** + * The operator that reads the {@link MergeOnReadInputSplit splits} received from the preceding {@link + * StreamReadMonitoringFunction}. Contrary to the {@link StreamReadMonitoringFunction} which has a parallelism of 1, + * this operator can have multiple parallelism. + * + *

As soon as an input split {@link MergeOnReadInputSplit} is received, it is put in a queue, + * the {@link MailboxExecutor} read the actual data of the split. + * This architecture allows the separation of split reading from processing the checkpoint barriers, + * thus removing any potential back-pressure. + */ +public class StreamReadOperator extends AbstractStreamOperator + implements OneInputStreamOperator { + + private static final Logger LOG = LoggerFactory.getLogger(StreamReadOperator.class); + + // It's the same thread that runs this operator and checkpoint actions. Use this executor to schedule only + // splits for subsequent reading, so that a new checkpoint could be triggered without blocking a long time + // for exhausting all scheduled split reading tasks. + private final MailboxExecutor executor; + + private MergeOnReadInputFormat format; + + private transient SourceFunction.SourceContext sourceContext; + + private transient ListState inputSplitsState; + private transient Queue splits; + + // Splits are read by the same thread that calls #processElement. Each read task is submitted to that thread by adding + // them to the executor. This state is used to ensure that only one read task is in that splits queue at a time, so that + // read tasks do not accumulate ahead of checkpoint tasks. When there is a read task in the queue, this is set to RUNNING. + // When there are no more files to read, this will be set to IDLE. 
+ private transient SplitState currentSplitState; + + private StreamReadOperator(MergeOnReadInputFormat format, ProcessingTimeService timeService, + MailboxExecutor mailboxExecutor) { + this.format = Preconditions.checkNotNull(format, "The InputFormat should not be null."); + this.processingTimeService = timeService; + this.executor = Preconditions.checkNotNull(mailboxExecutor, "The mailboxExecutor should not be null."); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + + // TODO Replace Java serialization with Avro approach to keep state compatibility. + inputSplitsState = context.getOperatorStateStore().getListState( + new ListStateDescriptor<>("splits", new JavaSerializer<>())); + + // Initialize the current split state to IDLE. + currentSplitState = SplitState.IDLE; + + // Recover splits state from flink state backend if possible. + splits = new LinkedBlockingDeque<>(); + if (context.isRestored()) { + int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask(); + LOG.info("Restoring state for operator {} (task ID: {}).", getClass().getSimpleName(), subtaskIdx); + + for (MergeOnReadInputSplit split : inputSplitsState.get()) { + splits.add(split); + } + } + + this.sourceContext = StreamSourceContexts.getSourceContext( + getOperatorConfig().getTimeCharacteristic(), + getProcessingTimeService(), + new Object(), // no actual locking needed + getContainingTask().getStreamStatusMaintainer(), + output, + getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(), + -1); + + // Enqueue to process the recovered input splits. 
+ enqueueProcessSplits(); + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + + inputSplitsState.clear(); + inputSplitsState.addAll(new ArrayList<>(splits)); + } + + @Override + public void processElement(StreamRecord element) { + splits.add(element.getValue()); + enqueueProcessSplits(); + } + + private void enqueueProcessSplits() { + if (currentSplitState == SplitState.IDLE && !splits.isEmpty()) { + currentSplitState = SplitState.RUNNING; + executor.execute(this::processSplits, this.getClass().getSimpleName()); + } + } + + private void processSplits() throws IOException { + MergeOnReadInputSplit split = splits.poll(); + if (split == null) { + currentSplitState = SplitState.IDLE; + return; + } + + format.open(split); + try { + RowData nextElement = null; + while (!format.reachedEnd()) { + nextElement = format.nextRecord(nextElement); + sourceContext.collect(nextElement); + } + } finally { + currentSplitState = SplitState.IDLE; + format.close(); + } + + // Re-schedule to process the next split. + enqueueProcessSplits(); + } + + @Override + public void processWatermark(Watermark mark) { + // we do nothing because we emit our own watermarks if needed. 
+ } + + @Override + public void dispose() throws Exception { + super.dispose(); + + if (format != null) { + format.close(); + format.closeInputFormat(); + format = null; + } + + sourceContext = null; + } + + @Override + public void close() throws Exception { + super.close(); + output.close(); + if (sourceContext != null) { + sourceContext.emitWatermark(Watermark.MAX_WATERMARK); + sourceContext.close(); + sourceContext = null; + } + } + + public static OneInputStreamOperatorFactory factory(MergeOnReadInputFormat format) { + return new OperatorFactory(format); + } + + private enum SplitState { + IDLE, RUNNING + } + + private static class OperatorFactory extends AbstractStreamOperatorFactory + implements YieldingOperatorFactory, OneInputStreamOperatorFactory { + + private final MergeOnReadInputFormat format; + + private transient MailboxExecutor mailboxExecutor; + + private OperatorFactory(MergeOnReadInputFormat format) { + this.format = format; + } + + @Override + public void setMailboxExecutor(MailboxExecutor mailboxExecutor) { + this.mailboxExecutor = mailboxExecutor; + } + + @SuppressWarnings("unchecked") + @Override + public > O createStreamOperator(StreamOperatorParameters parameters) { + StreamReadOperator operator = new StreamReadOperator(format, processingTimeService, mailboxExecutor); + operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput()); + return (O) operator; + } + + @Override + public Class getStreamOperatorClass(ClassLoader classLoader) { + return StreamReadOperator.class; + } + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/sink/HoodieTableSink.java index 5f979c70b..3ba83811f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/HoodieTableSink.java @@ -65,7 +65,7 @@ public class HoodieTableSink implements AppendStreamTableSink, Partitio public 
DataStreamSink consumeDataStream(DataStream dataStream) { // Read from kafka source RowType rowType = (RowType) this.schema.toRowDataType().notNull().getLogicalType(); - int numWriteTasks = this.conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM); + int numWriteTasks = this.conf.getInteger(FlinkOptions.WRITE_TASKS); StreamWriteOperatorFactory operatorFactory = new StreamWriteOperatorFactory<>(conf, isBounded); DataStream pipeline = dataStream diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/source/HoodieTableSource.java index 0bc219b65..9ed753aa4 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/HoodieTableSource.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/HoodieTableSource.java @@ -21,14 +21,18 @@ package org.apache.hudi.source; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.HoodieROTablePathFilter; import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils; import org.apache.hudi.operator.FlinkOptions; +import org.apache.hudi.operator.StreamReadMonitoringFunction; +import org.apache.hudi.operator.StreamReadOperator; import org.apache.hudi.source.format.FilePathUtils; import org.apache.hudi.source.format.cow.CopyOnWriteInputFormat; import org.apache.hudi.source.format.mor.MergeOnReadInputFormat; @@ -48,10 +52,11 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import 
org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; import org.apache.flink.table.expressions.Expression; @@ -75,7 +80,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -148,6 +152,11 @@ public class HoodieTableSource implements this.hadoopConf = StreamerUtil.getHadoopConf(); this.metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build(); this.maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new JobConf(this.hadoopConf)); + if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) { + ValidationUtils.checkArgument( + conf.getString(FlinkOptions.TABLE_TYPE).equalsIgnoreCase(FlinkOptions.TABLE_TYPE_MERGE_ON_READ), + "Streaming read is only supported for table type: " + FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + } } @Override @@ -155,14 +164,29 @@ public class HoodieTableSource implements @SuppressWarnings("unchecked") TypeInformation typeInfo = (TypeInformation) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType()); - InputFormatSourceFunction func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo); - DataStreamSource source = execEnv.addSource(func, explainSource(), typeInfo); - return source.name(explainSource()); + if 
(conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) { + StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction( + conf, path, metaClient, maxCompactionMemoryInBytes); + OneInputStreamOperatorFactory factory = StreamReadOperator.factory((MergeOnReadInputFormat) getInputFormat(true)); + SingleOutputStreamOperator source = execEnv.addSource(monitoringFunction, "streaming_source") + .setParallelism(1) + .uid("uid_streaming_source") + .transform("split_reader", typeInfo, factory) + .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)) + .uid("uid_split_reader"); + return new DataStreamSource<>(source); + } else { + InputFormatSourceFunction func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo); + DataStreamSource source = execEnv.addSource(func, explainSource(), typeInfo); + return source.name("streaming_source") + .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)) + .uid("uid_streaming_source"); + } } @Override public boolean isBounded() { - return true; + return !conf.getBoolean(FlinkOptions.READ_AS_STREAMING); } @Override @@ -189,24 +213,7 @@ public class HoodieTableSource implements @Override public List> getPartitions() { - try { - return FilePathUtils - .searchPartKeyValueAndPaths( - path.getFileSystem(), - path, - conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION), - partitionKeys.toArray(new String[0])) - .stream() - .map(tuple2 -> tuple2.f0) - .map(spec -> { - LinkedHashMap ret = new LinkedHashMap<>(); - spec.forEach((k, v) -> ret.put(k, defaultPartName.equals(v) ? 
null : v)); - return ret; - }) - .collect(Collectors.toList()); - } catch (Exception e) { - throw new TableException("Fetch partitions fail.", e); - } + return FilePathUtils.getPartitions(path, conf, partitionKeys, defaultPartName); } @Override @@ -269,7 +276,7 @@ public class HoodieTableSource implements : Option.of(kv.getValue()); return new MergeOnReadInputSplit(cnt.getAndAdd(1), baseFile.getPath(), logPaths, latestCommit, - metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType); + metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null); }).collect(Collectors.toList()); } else { // all the files are logs @@ -285,15 +292,19 @@ public class HoodieTableSource implements .collect(Collectors.toList())); return new MergeOnReadInputSplit(cnt.getAndAdd(1), null, logPaths, latestCommit, - metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType); + metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null); }).collect(Collectors.toList()); }) .flatMap(Collection::stream) .collect(Collectors.toList()); } } - @VisibleForTesting public InputFormat getInputFormat() { + return getInputFormat(false); + } + + @VisibleForTesting + public InputFormat getInputFormat(boolean isStreaming) { // When this table has no partition, just return an empty source. if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) { return new CollectionInputFormat<>(Collections.emptyList(), null); @@ -317,13 +328,20 @@ public class HoodieTableSource implements final String queryType = this.conf.getString(FlinkOptions.QUERY_TYPE); if (queryType.equals(FlinkOptions.QUERY_TYPE_SNAPSHOT)) { - switch (this.conf.getString(FlinkOptions.TABLE_TYPE)) { - case FlinkOptions.TABLE_TYPE_MERGE_ON_READ: - final List inputSplits = buildFileIndex(paths); - if (inputSplits.size() == 0) { - // When there is no input splits, just return an empty source. 
- LOG.warn("No input inputs generate for MERGE_ON_READ input format, returns empty collection instead"); - return new CollectionInputFormat<>(Collections.emptyList(), null); + final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE)); + switch (tableType) { + case MERGE_ON_READ: + final List inputSplits; + if (!isStreaming) { + inputSplits = buildFileIndex(paths); + if (inputSplits.size() == 0) { + // When there is no input splits, just return an empty source. + LOG.warn("No input splits generate for MERGE_ON_READ input format, returns empty collection instead"); + return new CollectionInputFormat<>(Collections.emptyList(), null); + } + } else { + // streaming reader would build the splits automatically. + inputSplits = Collections.emptyList(); } final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState( rowType, @@ -335,10 +353,10 @@ public class HoodieTableSource implements this.conf, paths, hoodieTableState, - rowDataType.getChildren(), // use the explicit fields data type because the AvroSchemaConvertr is not very stable. + rowDataType.getChildren(), // use the explicit fields data type because the AvroSchemaConverter is not very stable. 
"default", this.limit); - case FlinkOptions.TABLE_TYPE_COPY_ON_WRITE: + case COPY_ON_WRITE: FileInputFormat format = new CopyOnWriteInputFormat( paths, this.schema.getFieldNames(), @@ -373,27 +391,9 @@ public class HoodieTableSource implements */ @VisibleForTesting public Path[] getReadPaths() { - if (partitionKeys.isEmpty()) { - return new Path[] {path}; - } else { - return getOrFetchPartitions().stream() - .map(HoodieTableSource.this::validateAndReorderPartitions) - .map(kvs -> FilePathUtils.generatePartitionPath(kvs, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION))) - .map(n -> new Path(path, n)) - .toArray(Path[]::new); - } - } - - private LinkedHashMap validateAndReorderPartitions(Map part) { - LinkedHashMap map = new LinkedHashMap<>(); - for (String k : partitionKeys) { - if (!part.containsKey(k)) { - throw new TableException("Partition keys are: " + partitionKeys - + ", incomplete partition spec: " + part); - } - map.put(k, part.get(k)); - } - return map; + return partitionKeys.isEmpty() + ? 
new Path[] {path} + : FilePathUtils.partitionPath2ReadPath(path, conf, partitionKeys, getOrFetchPartitions()); } private static class LatestFileFilter extends FilePathFilter { diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/format/FilePathUtils.java b/hudi-flink/src/main/java/org/apache/hudi/source/format/FilePathUtils.java index b025d9a77..03bf53df8 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/format/FilePathUtils.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/format/FilePathUtils.java @@ -19,6 +19,7 @@ package org.apache.hudi.source.format; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.operator.FlinkOptions; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.FileStatus; @@ -35,6 +36,7 @@ import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; /** * Reference the Flink {@link org.apache.flink.table.utils.PartitionPathUtils} @@ -317,4 +319,110 @@ public class FilePathUtils { // the log files is hidden file return name.startsWith("_") || name.startsWith(".") && !name.contains(".log."); } + + /** + * Returns the partition path key and values as a list of map, each map item in the list + * is a mapping of the partition key name to its actual partition value. For example, say + * there is a file path with partition keys [key1, key2, key3]: + * + *

+   *   -- file:/// ... key1=val1/key2=val2/key3=val3
+   *   -- file:/// ... key1=val4/key2=val5/key3=val6
+   * 
+ * + *

The return list should be [{key1:val1, key2:val2, key3:val3}, {key1:val4, key2:val5, key3:val6}]. + * + * @param path The base path + * @param conf The configuration + * @param partitionKeys The partition key list + * @param defaultParName The default partition name for nulls + */ + public static List> getPartitions( + Path path, + org.apache.flink.configuration.Configuration conf, + List partitionKeys, + String defaultParName) { + try { + return FilePathUtils + .searchPartKeyValueAndPaths( + path.getFileSystem(), + path, + conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION), + partitionKeys.toArray(new String[0])) + .stream() + .map(tuple2 -> tuple2.f0) + .map(spec -> { + LinkedHashMap ret = new LinkedHashMap<>(); + spec.forEach((k, v) -> ret.put(k, defaultParName.equals(v) ? null : v)); + return ret; + }) + .collect(Collectors.toList()); + } catch (Exception e) { + throw new TableException("Fetch partitions fail.", e); + } + } + + /** + * Reorder the partition key value mapping based on the given partition keys sequence. + * + * @param partitionKVs The partition key and value mapping + * @param partitionKeys The partition key list + */ + public static LinkedHashMap validateAndReorderPartitions( + Map partitionKVs, + List partitionKeys) { + LinkedHashMap map = new LinkedHashMap<>(); + for (String k : partitionKeys) { + if (!partitionKVs.containsKey(k)) { + throw new TableException("Partition keys are: " + partitionKeys + + ", incomplete partition spec: " + partitionKVs); + } + map.put(k, partitionKVs.get(k)); + } + return map; + } + + /** + * Returns all the file paths that are the parents of the data files. 
+ * + * @param path The base path + * @param conf The configuration + * @param partitionKeys The partition key list + * @param defaultParName The default partition name for nulls + */ + public static Path[] getReadPaths( + Path path, + org.apache.flink.configuration.Configuration conf, + List partitionKeys, + String defaultParName) { + if (partitionKeys.isEmpty()) { + return new Path[] {path}; + } else { + List> partitionPaths = + getPartitions(path, conf, partitionKeys, defaultParName); + return partitionPath2ReadPath(path, conf, partitionKeys, partitionPaths); + } + } + + /** + * Transforms the given partition key value mapping to read paths. + * + * @param path The base path + * @param conf The hadoop configuration + * @param partitionKeys The partition key list + * @param partitionPaths The partition key value mapping + * + * @see #getReadPaths + */ + public static Path[] partitionPath2ReadPath( + Path path, + org.apache.flink.configuration.Configuration conf, + List partitionKeys, + List> partitionPaths) { + return partitionPaths.stream() + .map(m -> validateAndReorderPartitions(m, partitionKeys)) + .map(kvs -> FilePathUtils.generatePartitionPath(kvs, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION))) + .map(n -> new Path(path, n)) + .toArray(Path[]::new); + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/InstantRange.java b/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/InstantRange.java new file mode 100644 index 000000000..62f34db9f --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/InstantRange.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.source.format.mor; + +import org.apache.hudi.common.table.timeline.HoodieTimeline; + +import java.io.Serializable; +import java.util.Objects; + +/** + * An instant commits range used for incremental reader filtering. + */ +public abstract class InstantRange implements Serializable { + private static final long serialVersionUID = 1L; + + protected final String startInstant; + protected final String endInstant; + + public InstantRange(String startInstant, String endInstant) { + this.startInstant = Objects.requireNonNull(startInstant); + this.endInstant = Objects.requireNonNull(endInstant); + } + + public static InstantRange getInstance(String startInstant, String endInstant, RangeType rangeType) { + switch (rangeType) { + case OPEN_CLOSE: + return new OpenCloseRange(startInstant, endInstant); + case CLOSE_CLOSE: + return new CloseCloseRange(startInstant, endInstant); + default: + throw new AssertionError(); + } + } + + public String getStartInstant() { + return startInstant; + } + + public String getEndInstant() { + return endInstant; + } + + public abstract boolean isInRange(String instant); + + // ------------------------------------------------------------------------- + // Inner Class + // ------------------------------------------------------------------------- + + /** + * Represents a range type. 
+ */ + public enum RangeType { + OPEN_CLOSE, CLOSE_CLOSE; + } + + private static class OpenCloseRange extends InstantRange { + + public OpenCloseRange(String startInstant, String endInstant) { + super(startInstant, endInstant); + } + + @Override + public boolean isInRange(String instant) { + // No need to do comparison: + // HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant) + // because the logic is ensured by the log scanner + return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN, startInstant); + } + } + + private static class CloseCloseRange extends InstantRange { + + public CloseCloseRange(String startInstant, String endInstant) { + super(startInstant, endInstant); + } + + @Override + public boolean isInRange(String instant) { + // No need to do comparison: + // HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant) + // because the logic is ensured by the log scanner + return HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, startInstant); + } + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputFormat.java index ebd91afed..510b5b549 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputFormat.java @@ -287,6 +287,17 @@ public class MergeOnReadInputFormat // delete record found, skipping return hasNext(); } else { + // should improve the code when log scanner supports + // seeking by log blocks with commit time which is more + // efficient. 
+ if (split.getInstantRange().isPresent()) { + // based on the fact that commit time is always the first field + String commitTime = curAvroRecord.get().get(0).toString(); + if (!split.getInstantRange().get().isInRange(commitTime)) { + // filter out the records that are not in range + return hasNext(); + } + } GenericRecord requiredAvroRecord = buildAvroRecordBySchema( curAvroRecord.get(), requiredSchema, diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputSplit.java b/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputSplit.java index 5cf0affaa..a73e93a48 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputSplit.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputSplit.java @@ -39,6 +39,7 @@ public class MergeOnReadInputSplit implements InputSplit { private final String tablePath; private final long maxCompactionMemoryInBytes; private final String mergeType; + private final Option instantRange; public MergeOnReadInputSplit( int splitNum, @@ -47,7 +48,8 @@ public class MergeOnReadInputSplit implements InputSplit { String latestCommit, String tablePath, long maxCompactionMemoryInBytes, - String mergeType) { + String mergeType, + @Nullable InstantRange instantRange) { this.splitNum = splitNum; this.basePath = Option.ofNullable(basePath); this.logPaths = logPaths; @@ -55,6 +57,7 @@ public class MergeOnReadInputSplit implements InputSplit { this.tablePath = tablePath; this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes; this.mergeType = mergeType; + this.instantRange = Option.ofNullable(instantRange); } public Option getBasePath() { @@ -81,6 +84,10 @@ public class MergeOnReadInputSplit implements InputSplit { return mergeType; } + public Option getInstantRange() { + return this.instantRange; + } + @Override public int getSplitNumber() { return this.splitNum; diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java index d110bffef..29f5de8c2 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -71,7 +71,7 @@ public class HoodieFlinkStreamer { } Configuration conf = FlinkOptions.fromStreamerConfig(cfg); - int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM); + int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASKS); TypedProperties props = StreamerUtil.appendKafkaProps(cfg); diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java index 27fd4f5d0..d217e08f9 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java @@ -72,7 +72,7 @@ public class HoodieFlinkStreamerV2 { (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg)) .getLogicalType(); Configuration conf = FlinkOptions.fromStreamerConfig(cfg); - int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM); + int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASKS); StreamWriteOperatorFactory operatorFactory = new StreamWriteOperatorFactory<>(conf); diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index fdab92b5e..830c23a4f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -18,27 +18,26 @@ package org.apache.hudi.util; -import org.apache.hudi.common.model.HoodieRecordLocation; -import org.apache.hudi.common.model.HoodieTableType; -import 
org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.util.TablePathUtils; -import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.TableNotFoundException; -import org.apache.hudi.keygen.SimpleAvroKeyGenerator; -import org.apache.hudi.streamer.FlinkStreamerConfig; import org.apache.hudi.common.config.DFSPropertiesConfiguration; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.TablePathUtils; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.keygen.KeyGenerator; +import org.apache.hudi.keygen.SimpleAvroKeyGenerator; import org.apache.hudi.operator.FlinkOptions; import org.apache.hudi.schema.FilebasedSchemaProvider; +import org.apache.hudi.streamer.FlinkStreamerConfig; import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; import org.apache.avro.Schema; @@ -332,7 +331,7 @@ public class StreamerUtil { public static boolean needsScheduleCompaction(Configuration conf) { return conf.getString(FlinkOptions.TABLE_TYPE) .toUpperCase(Locale.ROOT) - .equals(HoodieTableType.MERGE_ON_READ.name()) + .equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ) && conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED); } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java 
index c28383e3a..c38b715b9 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java @@ -20,15 +20,19 @@ package org.apache.hudi.operator.utils; import org.apache.hudi.operator.FlinkOptions; import org.apache.hudi.streamer.FlinkStreamerConfig; +import org.apache.hudi.utils.factory.CollectSinkTableFactory; import org.apache.hudi.utils.factory.ContinuousFileSourceFactory; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.Row; import java.util.Map; import java.util.Objects; @@ -56,6 +60,13 @@ public class TestConfigurations { ROW_DATA_TYPE.getChildren().toArray(new DataType[0])) .build(); + public static final TypeInformation ROW_TYPE_INFO = Types.ROW( + Types.STRING, + Types.STRING, + Types.INT, + Types.LOCAL_DATE_TIME, + Types.STRING); + public static String getCreateHoodieTableDDL(String tableName, Map options) { String createTable = "create table " + tableName + "(\n" + " uuid varchar(20),\n" @@ -77,8 +88,12 @@ public class TestConfigurations { } public static String getFileSourceDDL(String tableName) { + return getFileSourceDDL(tableName, "test_source.data"); + } + + public static String getFileSourceDDL(String tableName, String fileName) { String sourcePath = Objects.requireNonNull(Thread.currentThread() - .getContextClassLoader().getResource("test_source.data")).toString(); + .getContextClassLoader().getResource(fileName)).toString(); return "create table " + tableName + "(\n" + " uuid 
varchar(20),\n" + " name varchar(10),\n" @@ -91,6 +106,18 @@ public class TestConfigurations { + ")"; } + public static String getCollectSinkDDL(String tableName) { + return "create table " + tableName + "(\n" + + " uuid varchar(20),\n" + + " name varchar(10),\n" + + " age int,\n" + + " ts timestamp(3),\n" + + " `partition` varchar(20)\n" + + ") with (\n" + + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'" + + ")"; + } + public static final RowDataSerializer SERIALIZER = new RowDataSerializer(new ExecutionConfig(), ROW_TYPE); public static Configuration getDefaultConf(String tablePath) { diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java index 9e671e615..efeb0dd4d 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java @@ -43,6 +43,7 @@ import org.apache.flink.table.data.writer.BinaryWriter; import org.apache.flink.table.runtime.types.InternalSerializers; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.Row; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.parquet.Strings; @@ -117,6 +118,52 @@ public class TestData { TimestampData.fromEpochMillis(1), StringData.fromString("par1")))); } + // data set of test_source.data + public static List DATA_SET_FOUR = Arrays.asList( + binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, + TimestampData.fromEpochMillis(1000), StringData.fromString("par1")), + binaryRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33, + TimestampData.fromEpochMillis(2000), StringData.fromString("par1")), + binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53, + TimestampData.fromEpochMillis(3000), StringData.fromString("par2")), + 
binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31, + TimestampData.fromEpochMillis(4000), StringData.fromString("par2")), + binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18, + TimestampData.fromEpochMillis(5000), StringData.fromString("par3")), + binaryRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20, + TimestampData.fromEpochMillis(6000), StringData.fromString("par3")), + binaryRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44, + TimestampData.fromEpochMillis(7000), StringData.fromString("par4")), + binaryRow(StringData.fromString("id8"), StringData.fromString("Han"), 56, + TimestampData.fromEpochMillis(8000), StringData.fromString("par4")) + ); + + // merged data set of test_source.data and test_source2.data + public static List DATA_SET_FIVE = Arrays.asList( + binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24, + TimestampData.fromEpochMillis(1000), StringData.fromString("par1")), + binaryRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34, + TimestampData.fromEpochMillis(2000), StringData.fromString("par1")), + binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 54, + TimestampData.fromEpochMillis(3000), StringData.fromString("par2")), + binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 32, + TimestampData.fromEpochMillis(4000), StringData.fromString("par2")), + binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18, + TimestampData.fromEpochMillis(5000), StringData.fromString("par3")), + binaryRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20, + TimestampData.fromEpochMillis(6000), StringData.fromString("par3")), + binaryRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44, + TimestampData.fromEpochMillis(7000), StringData.fromString("par4")), + binaryRow(StringData.fromString("id8"), StringData.fromString("Han"), 56, + 
TimestampData.fromEpochMillis(8000), StringData.fromString("par4")), + binaryRow(StringData.fromString("id9"), StringData.fromString("Jane"), 19, + TimestampData.fromEpochMillis(6000), StringData.fromString("par3")), + binaryRow(StringData.fromString("id10"), StringData.fromString("Ella"), 38, + TimestampData.fromEpochMillis(7000), StringData.fromString("par4")), + binaryRow(StringData.fromString("id11"), StringData.fromString("Phoebe"), 52, + TimestampData.fromEpochMillis(8000), StringData.fromString("par4")) + ); + /** * Returns string format of a list of RowData. */ @@ -159,24 +206,78 @@ public class TestData { } /** - * Checks the source data TestConfigurations.DATA_SET_ONE are written as expected. + * Sort the {@code rows} using field at index 0 and asserts + * it equals with the expected string {@code expected}. + * + * @param rows Actual result rows + * @param expected Expected string of the sorted rows + */ + public static void assertRowsEquals(List rows, String expected) { + String rowsString = rows.stream() + .sorted(Comparator.comparing(o -> o.getField(0).toString())) + .collect(Collectors.toList()).toString(); + assertThat(rowsString, is(expected)); + } + + /** + * Sort the {@code rows} using field at index 0 and asserts + * it equals with the expected row data list {@code expected}. + * + * @param rows Actual result rows + * @param expected Expected row data list + */ + public static void assertRowsEquals(List rows, List expected) { + String rowsString = rows.stream() + .sorted(Comparator.comparing(o -> o.getField(0).toString())) + .collect(Collectors.toList()).toString(); + assertThat(rowsString, is(rowDataToString(expected))); + } + + /** + * Sort the {@code rows} using field at index 0 and asserts + * it equals with the expected string {@code expected}. 
+ * + * @param rows Actual result rows + * @param expected Expected string of the sorted rows + */ + public static void assertRowDataEquals(List rows, String expected) { + String rowsString = rowDataToString(rows); + assertThat(rowsString, is(expected)); + } + + /** + * Sort the {@code rows} using field at index 0 and asserts + * it equals with the expected row data list {@code expected}. + * + * @param rows Actual result rows + * @param expected Expected row data list + */ + public static void assertRowDataEquals(List rows, List expected) { + String rowsString = rowDataToString(rows); + assertThat(rowsString, is(rowDataToString(expected))); + } + + /** + * Checks the source data set are written as expected. * *

Note: Replace it with the Flink reader when it is supported. * * @param baseFile The file base to check, should be a directory * @param expected The expected results mapping, the key should be the partition path + * and value should be values list with the key partition */ public static void checkWrittenData(File baseFile, Map expected) throws IOException { checkWrittenData(baseFile, expected, 4); } /** - * Checks the source data TestConfigurations.DATA_SET_ONE are written as expected. + * Checks the source data set are written as expected. * *

Note: Replace it with the Flink reader when it is supported. * * @param baseFile The file base to check, should be a directory * @param expected The expected results mapping, the key should be the partition path + * and value should be values list with the key partition * @param partitions The expected partition number */ public static void checkWrittenData( diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/source/HoodieDataSourceITCase.java index 087954564..ca110db4f 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/source/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/source/HoodieDataSourceITCase.java @@ -20,7 +20,11 @@ package org.apache.hudi.source; import org.apache.hudi.operator.FlinkOptions; import org.apache.hudi.operator.utils.TestConfigurations; +import org.apache.hudi.operator.utils.TestData; +import org.apache.hudi.utils.TestUtils; +import org.apache.hudi.utils.factory.CollectSinkTableFactory; +import org.apache.flink.core.execution.JobClient; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; @@ -34,16 +38,16 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import java.io.File; -import java.util.Comparator; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; +import static org.apache.hudi.operator.utils.TestData.assertRowsEquals; /** * IT cases for Hoodie table source and sink. 
@@ -72,6 +76,68 @@ public class HoodieDataSourceITCase extends AbstractTestBase { @TempDir File tempFile; + @Test + void testStreamWriteAndRead() throws Exception { + // create filesystem table named source + String createSource = TestConfigurations.getFileSourceDDL("source"); + streamTableEnv.executeSql(createSource); + + Map options = new HashMap<>(); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + options.put(FlinkOptions.READ_SCHEMA_FILE_PATH.key(), + Objects.requireNonNull(Thread.currentThread() + .getContextClassLoader().getResource("test_read_schema.avsc")).toString()); + options.put(FlinkOptions.READ_AS_STREAMING.key(), "true"); + options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); + streamTableEnv.executeSql(hoodieTableDDL); + String insertInto = "insert into t1 select * from source"; + execInsertSql(streamTableEnv, insertInto); + + List rows = execSelectSql(streamTableEnv, "select * from t1", 10); + assertRowsEquals(rows, TestData.DATA_SET_FOUR); + + // insert another batch of data + execInsertSql(streamTableEnv, insertInto); + List rows2 = execSelectSql(streamTableEnv, "select * from t1", 10); + assertRowsEquals(rows2, TestData.DATA_SET_FOUR); + } + + @Test + void testStreamReadAppendData() throws Exception { + // create filesystem table named source + String createSource = TestConfigurations.getFileSourceDDL("source"); + String createSource2 = TestConfigurations.getFileSourceDDL("source2", "test_source2.data"); + streamTableEnv.executeSql(createSource); + streamTableEnv.executeSql(createSource2); + + Map options = new HashMap<>(); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + options.put(FlinkOptions.READ_SCHEMA_FILE_PATH.key(), + Objects.requireNonNull(Thread.currentThread() + .getContextClassLoader().getResource("test_read_schema.avsc")).toString()); + 
options.put(FlinkOptions.READ_AS_STREAMING.key(), "true"); + options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + String createHoodieTable = TestConfigurations.getCreateHoodieTableDDL("t1", options); + streamTableEnv.executeSql(createHoodieTable); + String insertInto = "insert into t1 select * from source"; + // execute 2 times + execInsertSql(streamTableEnv, insertInto); + // remember the commit + String specifiedCommit = TestUtils.getFirstCommit(tempFile.getAbsolutePath()); + // another update batch + String insertInto2 = "insert into t1 select * from source2"; + execInsertSql(streamTableEnv, insertInto2); + // now we consume starting from the oldest commit + options.put(FlinkOptions.READ_STREAMING_START_COMMIT.key(), specifiedCommit); + String createHoodieTable2 = TestConfigurations.getCreateHoodieTableDDL("t2", options); + streamTableEnv.executeSql(createHoodieTable2); + List rows = execSelectSql(streamTableEnv, "select * from t2", 10); + // all the data with same keys are appended within one data bucket and one log file, + // so when consume, the same keys are merged + assertRowsEquals(rows, TestData.DATA_SET_FIVE); + } + @Test void testStreamWriteBatchRead() { // create filesystem table named source @@ -90,15 +156,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { List rows = CollectionUtil.iterableToList( () -> streamTableEnv.sqlQuery("select * from t1").execute().collect()); - final String expected = "[id1,Danny,23,1970-01-01T00:00:01,par1, " - + "id2,Stephen,33,1970-01-01T00:00:02,par1, " - + "id3,Julian,53,1970-01-01T00:00:03,par2, " - + "id4,Fabian,31,1970-01-01T00:00:04,par2, " - + "id5,Sophia,18,1970-01-01T00:00:05,par3, " - + "id6,Emma,20,1970-01-01T00:00:06,par3, " - + "id7,Bob,44,1970-01-01T00:00:07,par4, " - + "id8,Han,56,1970-01-01T00:00:08,par4]"; - assertRowsEquals(rows, expected); + assertRowsEquals(rows, TestData.DATA_SET_FOUR); } @Test @@ -124,29 +182,7 @@ public class HoodieDataSourceITCase 
extends AbstractTestBase { List rows = CollectionUtil.iterableToList( () -> batchTableEnv.sqlQuery("select * from t1").execute().collect()); - final String expected = "[id1,Danny,23,1970-01-01T00:00:01,par1, " - + "id2,Stephen,33,1970-01-01T00:00:02,par1, " - + "id3,Julian,53,1970-01-01T00:00:03,par2, " - + "id4,Fabian,31,1970-01-01T00:00:04,par2, " - + "id5,Sophia,18,1970-01-01T00:00:05,par3, " - + "id6,Emma,20,1970-01-01T00:00:06,par3, " - + "id7,Bob,44,1970-01-01T00:00:07,par4, " - + "id8,Han,56,1970-01-01T00:00:08,par4]"; - assertRowsEquals(rows, expected); - } - - /** - * Sort the {@code rows} using field at index 0 and asserts - * it equals with the expected string {@code expected}. - * - * @param rows Actual result rows - * @param expected Expected string of the sorted rows - */ - private static void assertRowsEquals(List rows, String expected) { - String rowsString = rows.stream() - .sorted(Comparator.comparing(o -> o.getField(0).toString())) - .collect(Collectors.toList()).toString(); - assertThat(rowsString, is(expected)); + assertRowsEquals(rows, TestData.DATA_SET_FOUR); } private void execInsertSql(TableEnvironment tEnv, String insert) { @@ -159,4 +195,16 @@ public class HoodieDataSourceITCase extends AbstractTestBase { throw new RuntimeException(ex); } } + + private List execSelectSql(TableEnvironment tEnv, String select, long timeout) throws InterruptedException { + tEnv.executeSql(TestConfigurations.getCollectSinkDDL("sink")); + TableResult tableResult = tEnv.executeSql("insert into sink " + select); + // wait for the timeout then cancels the job + TimeUnit.SECONDS.sleep(timeout); + tableResult.getJobClient().ifPresent(JobClient::cancel); + tEnv.executeSql("DROP TABLE IF EXISTS sink"); + return CollectSinkTableFactory.RESULT.values().stream() + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/HoodieTableSourceTest.java 
b/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java similarity index 97% rename from hudi-flink/src/test/java/org/apache/hudi/source/HoodieTableSourceTest.java rename to hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java index f79945743..af50cf0dc 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/source/HoodieTableSourceTest.java +++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java @@ -54,8 +54,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows; /** * Test cases for HoodieTableSource. */ -public class HoodieTableSourceTest { - private static final Logger LOG = LoggerFactory.getLogger(HoodieTableSourceTest.class); +public class TestHoodieTableSource { + private static final Logger LOG = LoggerFactory.getLogger(TestHoodieTableSource.class); private Configuration conf; diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java new file mode 100644 index 000000000..f02a28c0f --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.source; + +import org.apache.hudi.operator.FlinkOptions; +import org.apache.hudi.operator.StreamReadMonitoringFunction; +import org.apache.hudi.operator.utils.TestConfigurations; +import org.apache.hudi.operator.utils.TestData; +import org.apache.hudi.source.format.mor.MergeOnReadInputSplit; +import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.utils.TestUtils; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test cases for {@link StreamReadMonitoringFunction}. 
+ */ +public class TestStreamReadMonitoringFunction { + private static final long WAIT_TIME_MILLIS = 5 * 1000L; + + private Configuration conf; + + @TempDir + File tempFile; + + @BeforeEach + public void before() throws Exception { + final String basePath = tempFile.getAbsolutePath(); + conf = TestConfigurations.getDefaultConf(basePath); + conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + conf.setInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2); // check every 2 seconds + + StreamerUtil.initTableIfNotExists(conf); + } + + @Test + public void testConsumeFromLatestCommit() throws Exception { + TestData.writeData(TestData.DATA_SET_ONE, conf); + StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf); + try (AbstractStreamOperatorTestHarness harness = createHarness(function)) { + harness.setup(); + harness.open(); + + CountDownLatch latch = new CountDownLatch(4); + CollectingSourceContext sourceContext = new CollectingSourceContext(latch); + + runAsync(sourceContext, function); + + assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation"); + assertThat("Should produce the expected splits", + sourceContext.getPartitionPaths(), is("par1,par2,par3,par4")); + assertTrue(sourceContext.splits.stream().noneMatch(split -> split.getInstantRange().isPresent()), + "No instants should have range limit"); + + Thread.sleep(1000L); + + // reset the source context + latch = new CountDownLatch(4); + sourceContext.reset(latch); + + // write another instant and validate + TestData.writeData(TestData.DATA_SET_TWO, conf); + + assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation"); + assertThat("Should produce the expected splits", + sourceContext.getPartitionPaths(), is("par1,par2,par3,par4")); + assertTrue(sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()), + "All the instants should have range limit"); + + // Stop 
the stream task. + function.close(); + } + } + + @Test + public void testConsumeFromSpecifiedCommit() throws Exception { + // write 2 commits first, use the second commit time as the specified start instant, + // all the splits should come from the second commit. + TestData.writeData(TestData.DATA_SET_ONE, conf); + TestData.writeData(TestData.DATA_SET_TWO, conf); + String specifiedCommit = TestUtils.getLatestCommit(tempFile.getAbsolutePath()); + conf.setString(FlinkOptions.READ_STREAMING_START_COMMIT, specifiedCommit); + StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf); + try (AbstractStreamOperatorTestHarness harness = createHarness(function)) { + harness.setup(); + harness.open(); + + CountDownLatch latch = new CountDownLatch(4); + CollectingSourceContext sourceContext = new CollectingSourceContext(latch); + + runAsync(sourceContext, function); + + assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation"); + assertThat("Should produce the expected splits", + sourceContext.getPartitionPaths(), is("par1,par2,par3,par4")); + assertTrue(sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()), + "All the instants should have range limit"); + assertTrue(sourceContext.splits.stream().allMatch(split -> split.getLatestCommit().equals(specifiedCommit)), + "All the splits should be with specified instant time"); + + // Stop the stream task. 
+ function.close(); + } + } + + @Test + public void testCheckpointRestore() throws Exception { + TestData.writeData(TestData.DATA_SET_ONE, conf); + + StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf); + OperatorSubtaskState state; + try (AbstractStreamOperatorTestHarness harness = createHarness(function)) { + harness.setup(); + harness.open(); + + CountDownLatch latch = new CountDownLatch(4); + CollectingSourceContext sourceContext = new CollectingSourceContext(latch); + runAsync(sourceContext, function); + + assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation"); + Thread.sleep(1000L); + + state = harness.snapshot(1, 1); + + // Stop the stream task. + function.close(); + + assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation"); + assertThat("Should produce the expected splits", + sourceContext.getPartitionPaths(), is("par1,par2,par3,par4")); + assertTrue(sourceContext.splits.stream().noneMatch(split -> split.getInstantRange().isPresent()), + "No instants should have range limit"); + + } + + TestData.writeData(TestData.DATA_SET_TWO, conf); + StreamReadMonitoringFunction function2 = TestUtils.getMonitorFunc(conf); + try (AbstractStreamOperatorTestHarness harness = createHarness(function2)) { + harness.setup(); + // Recover to process the remaining snapshots. + harness.initializeState(state); + harness.open(); + + CountDownLatch latch = new CountDownLatch(4); + CollectingSourceContext sourceContext = new CollectingSourceContext(latch); + runAsync(sourceContext, function2); + + // Stop the stream task. 
+ function.close(); + + assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation"); + assertThat("Should produce the expected splits", + sourceContext.getPartitionPaths(), is("par1,par2,par3,par4")); + assertTrue(sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()), + "All the instants should have range limit"); + } + } + + private AbstractStreamOperatorTestHarness createHarness( + StreamReadMonitoringFunction function) throws Exception { + StreamSource streamSource = new StreamSource<>(function); + return new AbstractStreamOperatorTestHarness<>(streamSource, 1, 1, 0); + } + + private void runAsync( + CollectingSourceContext sourceContext, + StreamReadMonitoringFunction function) { + Thread task = new Thread(() -> { + try { + function.run(sourceContext); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + task.start(); + } + + /** + * Source context that collects the outputs in to a list. + */ + private static class CollectingSourceContext implements SourceFunction.SourceContext { + private final List splits = new ArrayList<>(); + private final Object checkpointLock = new Object(); + private volatile CountDownLatch latch; + + CollectingSourceContext(CountDownLatch latch) { + this.latch = latch; + } + + @Override + public void collect(MergeOnReadInputSplit element) { + splits.add(element); + latch.countDown(); + } + + @Override + public void collectWithTimestamp(MergeOnReadInputSplit element, long timestamp) { + collect(element); + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public void markAsTemporarilyIdle() { + + } + + @Override + public Object getCheckpointLock() { + return checkpointLock; + } + + @Override + public void close() { + + } + + public void reset(CountDownLatch latch) { + this.latch = latch; + this.splits.clear(); + } + + public String getPartitionPaths() { + return this.splits.stream() + 
.map(TestUtils::getSplitPartitionPath) + .sorted(Comparator.naturalOrder()) + .collect(Collectors.joining(",")); + } + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java new file mode 100644 index 000000000..e13f95081 --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.source; + +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.operator.FlinkOptions; +import org.apache.hudi.operator.StreamReadMonitoringFunction; +import org.apache.hudi.operator.StreamReadOperator; +import org.apache.hudi.operator.utils.TestConfigurations; +import org.apache.hudi.operator.utils.TestData; +import org.apache.hudi.source.format.FilePathUtils; +import org.apache.hudi.source.format.mor.MergeOnReadInputFormat; +import org.apache.hudi.source.format.mor.MergeOnReadInputSplit; +import org.apache.hudi.source.format.mor.MergeOnReadTableState; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.utils.TestUtils; + +import org.apache.avro.Schema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor; +import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction; +import org.apache.flink.streaming.runtime.tasks.mailbox.SteppingMailboxProcessor; +import org.apache.flink.streaming.util.CollectingSourceContext; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import 
java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test cases for {@link StreamReadOperator}. + */ +public class TestStreamReadOperator { + private static final Map EXPECTED = new HashMap<>(); + static { + EXPECTED.put("par1", "id1,Danny,23,1970-01-01T00:00:00.001,par1, id2,Stephen,33,1970-01-01T00:00:00.002,par1"); + EXPECTED.put("par2", "id3,Julian,53,1970-01-01T00:00:00.003,par2, id4,Fabian,31,1970-01-01T00:00:00.004,par2"); + EXPECTED.put("par3", "id5,Sophia,18,1970-01-01T00:00:00.005,par3, id6,Emma,20,1970-01-01T00:00:00.006,par3"); + EXPECTED.put("par4", "id7,Bob,44,1970-01-01T00:00:00.007,par4, id8,Han,56,1970-01-01T00:00:00.008,par4"); + } + + private Configuration conf; + + @TempDir + File tempFile; + + @BeforeEach + public void before() throws Exception { + final String basePath = tempFile.getAbsolutePath(); + conf = TestConfigurations.getDefaultConf(basePath); + conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + + StreamerUtil.initTableIfNotExists(conf); + } + + @Test + void testWriteRecords() throws Exception { + TestData.writeData(TestData.DATA_SET_ONE, conf); + try (OneInputStreamOperatorTestHarness harness = createReader()) { + harness.setup(); + harness.open(); + + SteppingMailboxProcessor processor = createLocalMailbox(harness); + StreamReadMonitoringFunction func = TestUtils.getMonitorFunc(conf); + + List splits = generateSplits(func); + assertThat("Should have 4 splits", splits.size(), is(4)); + for (MergeOnReadInputSplit split : splits) { + // Process this element to enqueue to mail-box. + harness.processElement(split, -1); + + // Run the mail-box once to read all records from the given split. + assertThat("Should process 1 split", processor.runMailboxStep()); + } + // Assert the output has expected elements. 
+ TestData.assertRowDataEquals(harness.extractOutputValues(), TestData.DATA_SET_ONE); + + TestData.writeData(TestData.DATA_SET_TWO, conf); + final List splits2 = generateSplits(func); + assertThat("Should have 4 splits", splits2.size(), is(4)); + for (MergeOnReadInputSplit split : splits2) { + // Process this element to enqueue to mail-box. + harness.processElement(split, -1); + + // Run the mail-box once to read all records from the given split. + assertThat("Should processed 1 split", processor.runMailboxStep()); + } + // The result sets behaves like append only: DATA_SET_ONE + DATA_SET_TWO + List expected = new ArrayList<>(TestData.DATA_SET_ONE); + expected.addAll(TestData.DATA_SET_TWO); + TestData.assertRowDataEquals(harness.extractOutputValues(), expected); + } + } + + @Test + public void testCheckpoint() throws Exception { + // Received emitted splits: split1, split2, split3, split4, checkpoint request is triggered + // when reading records from split1. + TestData.writeData(TestData.DATA_SET_ONE, conf); + long timestamp = 0; + try (OneInputStreamOperatorTestHarness harness = createReader()) { + harness.setup(); + harness.open(); + + SteppingMailboxProcessor processor = createLocalMailbox(harness); + StreamReadMonitoringFunction func = TestUtils.getMonitorFunc(conf); + + List splits = generateSplits(func); + assertThat("Should have 4 splits", splits.size(), is(4)); + + for (MergeOnReadInputSplit split : splits) { + harness.processElement(split, ++timestamp); + } + + // Trigger snapshot state, it will start to work once all records from split0 are read. 
+ processor.getMainMailboxExecutor() + .execute(() -> harness.snapshot(1, 3), "Trigger snapshot"); + + assertTrue(processor.runMailboxStep(), "Should have processed the split0"); + assertTrue(processor.runMailboxStep(), "Should have processed the snapshot state action"); + + assertThat(TestData.rowDataToString(harness.extractOutputValues()), + is(getSplitExpected(Collections.singletonList(splits.get(0)), EXPECTED))); + + // Read records from split1. + assertTrue(processor.runMailboxStep(), "Should have processed the split1"); + + // Read records from split2. + assertTrue(processor.runMailboxStep(), "Should have processed the split2"); + + // Read records from split3. + assertTrue(processor.runMailboxStep(), "Should have processed the split3"); + + // Assert the output has expected elements. + TestData.assertRowDataEquals(harness.extractOutputValues(), TestData.DATA_SET_ONE); + } + } + + @Test + public void testCheckpointRestore() throws Exception { + TestData.writeData(TestData.DATA_SET_ONE, conf); + + OperatorSubtaskState state; + final List splits; + try (OneInputStreamOperatorTestHarness harness = createReader()) { + harness.setup(); + harness.open(); + + StreamReadMonitoringFunction func = TestUtils.getMonitorFunc(conf); + + splits = generateSplits(func); + assertThat("Should have 4 splits", splits.size(), is(4)); + + // Enqueue all the splits. + for (MergeOnReadInputSplit split : splits) { + harness.processElement(split, -1); + } + + // Read all records from the first 2 splits. + SteppingMailboxProcessor localMailbox = createLocalMailbox(harness); + for (int i = 0; i < 2; i++) { + assertTrue(localMailbox.runMailboxStep(), "Should have processed the split#" + i); + } + + assertThat(TestData.rowDataToString(harness.extractOutputValues()), + is(getSplitExpected(splits.subList(0, 2), EXPECTED))); + + // Snapshot state now, there are 2 splits left in the state. 
+ state = harness.snapshot(1, 1); + } + + try (OneInputStreamOperatorTestHarness harness = createReader()) { + harness.setup(); + // Recover to process the remaining splits. + harness.initializeState(state); + harness.open(); + + SteppingMailboxProcessor localMailbox = createLocalMailbox(harness); + + for (int i = 2; i < 4; i++) { + assertTrue(localMailbox.runMailboxStep(), "Should have processed one split#" + i); + } + + // expect to output the left data + assertThat(TestData.rowDataToString(harness.extractOutputValues()), + is(getSplitExpected(splits.subList(2, 4), EXPECTED))); + } + } + + private static String getSplitExpected(List splits, Map expected) { + return splits.stream() + .map(TestUtils::getSplitPartitionPath) + .map(expected::get) + .sorted(Comparator.naturalOrder()) + .collect(Collectors.toList()).toString(); + } + + private List generateSplits(StreamReadMonitoringFunction func) throws Exception { + final List splits = new ArrayList<>(); + func.open(conf); + func.monitorDirAndForwardSplits(new CollectingSourceContext<>(new Object(), splits)); + return splits; + } + + private OneInputStreamOperatorTestHarness createReader() throws Exception { + final String basePath = tempFile.getAbsolutePath(); + final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() + .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build(); + final List partitionKeys = Collections.singletonList("partition"); + + // This input format is used to opening the emitted split. 
+ TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient); + final Schema tableAvroSchema; + try { + tableAvroSchema = schemaUtil.getTableAvroSchema(); + } catch (Exception e) { + throw new HoodieException("Get table avro schema error", e); + } + final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema); + final RowType rowType = (RowType) rowDataType.getLogicalType(); + final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState( + rowType, + TestConfigurations.ROW_TYPE, + tableAvroSchema.toString(), + AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(), + Collections.emptyList()); + Path[] paths = FilePathUtils.getReadPaths( + new Path(basePath), conf, partitionKeys, conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME)); + MergeOnReadInputFormat inputFormat = new MergeOnReadInputFormat( + conf, + paths, + hoodieTableState, + rowDataType.getChildren(), + "default", + 1000L); + + OneInputStreamOperatorFactory factory = StreamReadOperator.factory(inputFormat); + OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>( + factory, 1, 1, 0); + harness.getStreamConfig().setTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + return harness; + } + + private SteppingMailboxProcessor createLocalMailbox( + OneInputStreamOperatorTestHarness harness) { + return new SteppingMailboxProcessor( + MailboxDefaultAction.Controller::suspendDefaultAction, + harness.getTaskMailbox(), + StreamTaskActionExecutor.IMMEDIATE); + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/format/InputFormatTest.java b/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java similarity index 91% rename from hudi-flink/src/test/java/org/apache/hudi/source/format/InputFormatTest.java rename to hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java index 8bb529935..7774e5611 100644 --- 
a/hudi-flink/src/test/java/org/apache/hudi/source/format/InputFormatTest.java +++ b/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java @@ -18,6 +18,7 @@ package org.apache.hudi.source.format; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.operator.FlinkOptions; import org.apache.hudi.operator.utils.TestConfigurations; import org.apache.hudi.operator.utils.TestData; @@ -32,7 +33,7 @@ import org.apache.flink.table.data.RowData; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.EnumSource; import java.io.File; import java.io.IOException; @@ -48,7 +49,7 @@ import static org.hamcrest.MatcherAssert.assertThat; /** * Test cases for MergeOnReadInputFormat and ParquetInputFormat. */ -public class InputFormatTest { +public class TestInputFormat { private HoodieTableSource tableSource; private Configuration conf; @@ -56,9 +57,9 @@ public class InputFormatTest { @TempDir File tempFile; - void beforeEach(String tableType) throws IOException { + void beforeEach(HoodieTableType tableType) throws IOException { conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); - conf.setString(FlinkOptions.TABLE_TYPE, tableType); + conf.setString(FlinkOptions.TABLE_TYPE, tableType.name()); conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); // close the async compaction StreamerUtil.initTableIfNotExists(conf); @@ -71,10 +72,8 @@ public class InputFormatTest { } @ParameterizedTest - @ValueSource(strings = { - FlinkOptions.TABLE_TYPE_COPY_ON_WRITE, - FlinkOptions.TABLE_TYPE_MERGE_ON_READ}) - void testRead(String tableType) throws Exception { + @EnumSource(value = HoodieTableType.class) + void testRead(HoodieTableType tableType) throws Exception { beforeEach(tableType); TestData.writeData(TestData.DATA_SET_ONE, conf); @@ -113,7 +112,7 @@ public 
class InputFormatTest { @Test void testReadBaseAndLogFiles() throws Exception { - beforeEach(FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + beforeEach(HoodieTableType.MERGE_ON_READ); // write parquet first with compaction conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true); @@ -153,10 +152,8 @@ public class InputFormatTest { } @ParameterizedTest - @ValueSource(strings = { - FlinkOptions.TABLE_TYPE_COPY_ON_WRITE, - FlinkOptions.TABLE_TYPE_MERGE_ON_READ}) - void testReadWithPartitionPrune(String tableType) throws Exception { + @EnumSource(value = HoodieTableType.class) + void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception { beforeEach(tableType); TestData.writeData(TestData.DATA_SET_ONE, conf); diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java new file mode 100644 index 000000000..fa021f3da --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utils; + +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.operator.FlinkOptions; +import org.apache.hudi.operator.StreamReadMonitoringFunction; +import org.apache.hudi.source.format.mor.MergeOnReadInputSplit; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; + +import java.io.File; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Common test utils. + */ +public class TestUtils { + + public static String getLatestCommit(String basePath) { + final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() + .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build(); + return metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants().lastInstant().get().getTimestamp(); + } + + public static String getFirstCommit(String basePath) { + final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() + .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build(); + return metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants().firstInstant().get().getTimestamp(); + } + + public static String getSplitPartitionPath(MergeOnReadInputSplit split) { + assertTrue(split.getLogPaths().isPresent()); + final String logPath = split.getLogPaths().get().get(0); + String[] paths = logPath.split(File.separator); + return paths[paths.length - 2]; + } + + public static StreamReadMonitoringFunction getMonitorFunc(Configuration conf) { + final String basePath = conf.getString(FlinkOptions.PATH); + final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() + .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build(); + return new StreamReadMonitoringFunction(conf, new Path(basePath), metaClient, 1024 * 1024L); + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java 
b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java new file mode 100644 index 000000000..902987ef7 --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utils.factory; + +import org.apache.hudi.operator.utils.TestConfigurations; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Factory for CollectTableSink. + * + *

Note: The CollectTableSink collects all the data of a table into a global collection {@code RESULT}, + * so the tests should executed in single thread and the table name should be the same. + */ +public class CollectSinkTableFactory implements DynamicTableSinkFactory { + public static final String FACTORY_ID = "collect"; + + // global results to collect and query + public static final Map> RESULT = new HashMap<>(); + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + helper.validate(); + + TableSchema schema = context.getCatalogTable().getSchema(); + RESULT.clear(); + return new CollectTableSink(schema, context.getObjectIdentifier().getObjectName()); + } + + @Override + public String factoryIdentifier() { + return FACTORY_ID; + } + + @Override + public Set> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set> optionalOptions() { + return Collections.emptySet(); + } + + // -------------------------------------------------------------------------------------------- + // Table sinks + // -------------------------------------------------------------------------------------------- + + /** + * Values {@link DynamicTableSink} for testing. 
+ */ + private static class CollectTableSink implements DynamicTableSink { + + private final TableSchema schema; + private final String tableName; + + private CollectTableSink( + TableSchema schema, + String tableName) { + this.schema = schema; + this.tableName = tableName; + } + + @Override + public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + return ChangelogMode.insertOnly(); + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + DataStructureConverter converter = context.createDataStructureConverter(schema.toPhysicalRowDataType()); + return SinkFunctionProvider.of(new CollectSinkFunction(converter)); + } + + @Override + public DynamicTableSink copy() { + return new CollectTableSink(schema, tableName); + } + + @Override + public String asSummaryString() { + return "CollectSink"; + } + } + + static class CollectSinkFunction extends RichSinkFunction implements CheckpointedFunction { + + private static final long serialVersionUID = 1L; + private final DynamicTableSink.DataStructureConverter converter; + + protected transient ListState resultState; + protected transient List localResult; + + private int taskID; + + protected CollectSinkFunction(DynamicTableSink.DataStructureConverter converter) { + this.converter = converter; + } + + @Override + public void invoke(RowData value, SinkFunction.Context context) { + if (value.getRowKind() == RowKind.INSERT) { + Row row = (Row) converter.toExternal(value); + assert row != null; + RESULT.get(taskID).add(row); + } else { + throw new RuntimeException( + "CollectSinkFunction received " + value.getRowKind() + " messages."); + } + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + this.resultState = context.getOperatorStateStore().getListState( + new ListStateDescriptor<>("sink-results", TestConfigurations.ROW_TYPE_INFO)); + this.localResult = new ArrayList<>(); + if (context.isRestored()) { + for (Row value : 
resultState.get()) { + localResult.add(value); + } + } + this.taskID = getRuntimeContext().getIndexOfThisSubtask(); + synchronized (CollectSinkTableFactory.class) { + RESULT.put(taskID, localResult); + } + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + resultState.clear(); + resultState.addAll(RESULT.get(taskID)); + } + } +} diff --git a/hudi-flink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/hudi-flink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 000000000..5fec9b622 --- /dev/null +++ b/hudi-flink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +org.apache.hudi.utils.factory.CollectSinkTableFactory diff --git a/hudi-flink/src/test/resources/test_source2.data b/hudi-flink/src/test/resources/test_source2.data new file mode 100644 index 000000000..ff8265d4b --- /dev/null +++ b/hudi-flink/src/test/resources/test_source2.data @@ -0,0 +1,8 @@ +{"uuid": "id1", "name": "Danny", "age": 24, "ts": "1970-01-01T00:00:01", "partition": "par1"} +{"uuid": "id2", "name": "Stephen", "age": 34, "ts": "1970-01-01T00:00:02", "partition": "par1"} +{"uuid": "id3", "name": "Julian", "age": 54, "ts": "1970-01-01T00:00:03", "partition": "par2"} +{"uuid": "id4", "name": "Fabian", "age": 32, "ts": "1970-01-01T00:00:04", "partition": "par2"} +{"uuid": "id5", "name": "Sophia", "age": 18, "ts": "1970-01-01T00:00:05", "partition": "par3"} +{"uuid": "id9", "name": "Jane", "age": 19, "ts": "1970-01-01T00:00:06", "partition": "par3"} +{"uuid": "id10", "name": "Ella", "age": 38, "ts": "1970-01-01T00:00:07", "partition": "par4"} +{"uuid": "id11", "name": "Phoebe", "age": 52, "ts": "1970-01-01T00:00:08", "partition": "par4"}