From 4c5b6923ccfaaa6616a934a3f690b1a795a42d41 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Sat, 6 Feb 2021 22:03:52 +0800 Subject: [PATCH] [HUDI-1557] Make Flink write pipeline write task scalable (#2506) This is the #step 2 of RFC-24: https://cwiki.apache.org/confluence/display/HUDI/RFC+-+24%3A+Hoodie+Flink+Writer+Proposal This PR introduce a BucketAssigner that assigns bucket ID (partition path & fileID) for each stream record. There is no need to look up index and partition the records anymore in the following pipeline for these records, we actually decide the write target location before the write and each record computes its location when the BucketAssigner receives it, thus, the indexing is with streaming style. Computing locations for a batch of records all at a time is resource consuming so a pressure to the engine, we should avoid that in streaming system. --- .../org/apache/hudi/table/WorkloadStat.java | 8 +- .../hudi/table/action/commit/BucketInfo.java | 38 ++ .../hudi/client/HoodieFlinkWriteClient.java | 15 +- .../index/state/FlinkInMemoryStateIndex.java | 40 +-- .../hudi/io/FlinkCreateHandleFactory.java | 41 +++ .../commit/BaseFlinkCommitActionExecutor.java | 199 +++-------- .../table/action/commit/FlinkWriteHelper.java | 46 ++- .../action/commit/UpsertPartitioner.java | 10 +- .../action/commit/UpsertPartitioner.java | 10 +- .../action/commit/UpsertPartitioner.java | 10 +- .../table/timeline/HoodieActiveTimeline.java | 6 + .../operator/InstantGenerateOperator.java | 18 +- .../operator/KeyedWriteProcessFunction.java | 50 ++- .../hudi/operator/StreamWriteFunction.java | 86 ++--- .../hudi/operator/StreamWriteOperator.java | 5 +- .../StreamWriteOperatorCoordinator.java | 32 +- .../operator/StreamWriteOperatorFactory.java | 4 +- .../partitioner/BucketAssignFunction.java | 149 ++++++++ .../operator/partitioner/BucketAssigner.java | 326 ++++++++++++++++++ .../transform/RowDataToHoodieFunction.java | 108 ++++++ .../hudi/streamer/FlinkStreamerConfig.java | 3 
+- .../hudi/streamer/HoodieFlinkStreamer.java | 19 +- .../hudi/streamer/HoodieFlinkStreamerV2.java | 28 +- .../org/apache/hudi/util/StreamerUtil.java | 43 +++ .../operator/StreamWriteFunctionTest.java | 65 +++- .../hudi/operator/StreamWriteITCase.java | 119 ++++++- .../partitioner/TestBucketAssigner.java | 235 +++++++++++++ .../utils/StreamWriteFunctionWrapper.java | 74 +++- .../operator/utils/TestConfigurations.java | 13 + .../apache/hudi/operator/utils/TestData.java | 28 +- 30 files changed, 1435 insertions(+), 393 deletions(-) create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigner.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java create mode 100644 hudi-flink/src/test/java/org/apache/hudi/operator/partitioner/TestBucketAssigner.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadStat.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadStat.java index 6fdb217a0..c3371bab0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadStat.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadStat.java @@ -44,7 +44,13 @@ public class WorkloadStat implements Serializable { } public long addUpdates(HoodieRecordLocation location, long numUpdates) { - updateLocationToCount.put(location.getFileId(), Pair.of(location.getInstantTime(), numUpdates)); + long accNumUpdates = 0; + if (updateLocationToCount.containsKey(location.getFileId())) { + accNumUpdates = updateLocationToCount.get(location.getFileId()).getRight(); + } + updateLocationToCount.put( + location.getFileId(), + Pair.of(location.getInstantTime(), numUpdates + 
accNumUpdates)); return this.numUpdates += numUpdates; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java index 1d98ad49e..6547da642 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java @@ -19,6 +19,7 @@ package org.apache.hudi.table.action.commit; import java.io.Serializable; +import java.util.Objects; /** * Helper class for a bucket's type (INSERT and UPDATE) and its file location. @@ -29,6 +30,24 @@ public class BucketInfo implements Serializable { String fileIdPrefix; String partitionPath; + public BucketInfo(BucketType bucketType, String fileIdPrefix, String partitionPath) { + this.bucketType = bucketType; + this.fileIdPrefix = fileIdPrefix; + this.partitionPath = partitionPath; + } + + public BucketType getBucketType() { + return bucketType; + } + + public String getFileIdPrefix() { + return fileIdPrefix; + } + + public String getPartitionPath() { + return partitionPath; + } + @Override public String toString() { final StringBuilder sb = new StringBuilder("BucketInfo {"); @@ -38,4 +57,23 @@ public class BucketInfo implements Serializable { sb.append('}'); return sb.toString(); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + BucketInfo that = (BucketInfo) o; + return bucketType == that.bucketType + && fileIdPrefix.equals(that.fileIdPrefix) + && partitionPath.equals(that.partitionPath); + } + + @Override + public int hashCode() { + return Objects.hash(bucketType, fileIdPrefix, partitionPath); + } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index e3e0eb421..0c87f7df9 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CommitUtils; @@ -249,7 +250,17 @@ public class HoodieFlinkWriteClient extends public void deletePendingInstant(String tableType, String instant) { HoodieFlinkTable table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType)); - table.getMetaClient().getActiveTimeline() - .deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant)); + HoodieActiveTimeline activeTimeline = table.getMetaClient().getActiveTimeline(); + activeTimeline.deletePending(HoodieInstant.State.INFLIGHT, commitType, instant); + activeTimeline.deletePending(HoodieInstant.State.REQUESTED, commitType, instant); + } + + public void transitionRequestedToInflight(String tableType, String inFlightInstant) { + HoodieFlinkTable table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); + HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); + String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType)); + HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, inFlightInstant); + 
activeTimeline.transitionRequestedToInflight(requested, Option.empty(), + config.shouldAllowMultiWriteOnSameInstant()); } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java index 44eafd57f..bae8de239 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.index.FlinkHoodieIndex; @@ -62,47 +61,14 @@ public class FlinkInMemoryStateIndex extends Flin public List> tagLocation(List> records, HoodieEngineContext context, HoodieTable>, List, List> hoodieTable) throws HoodieIndexException { - return context.map(records, record -> { - try { - if (mapState.contains(record.getKey())) { - record.unseal(); - record.setCurrentLocation(mapState.get(record.getKey())); - record.seal(); - } - } catch (Exception e) { - LOG.error(String.format("Tag record location failed, key = %s, %s", record.getRecordKey(), e.getMessage())); - } - return record; - }, 0); + throw new UnsupportedOperationException("No need to tag location for FlinkInMemoryStateIndex"); } @Override public List updateLocation(List writeStatuses, HoodieEngineContext context, HoodieTable>, List, List> hoodieTable) throws HoodieIndexException { - return context.map(writeStatuses, writeStatus -> { - for (HoodieRecord record : writeStatus.getWrittenRecords()) { - if 
(!writeStatus.isErrored(record.getKey())) { - HoodieKey key = record.getKey(); - Option newLocation = record.getNewLocation(); - if (newLocation.isPresent()) { - try { - mapState.put(key, newLocation.get()); - } catch (Exception e) { - LOG.error(String.format("Update record location failed, key = %s, %s", record.getRecordKey(), e.getMessage())); - } - } else { - // Delete existing index for a deleted record - try { - mapState.remove(key); - } catch (Exception e) { - LOG.error(String.format("Remove record location failed, key = %s, %s", record.getRecordKey(), e.getMessage())); - } - } - } - } - return writeStatus; - }, 0); + throw new UnsupportedOperationException("No need to update location for FlinkInMemoryStateIndex"); } @Override @@ -128,6 +94,6 @@ public class FlinkInMemoryStateIndex extends Flin */ @Override public boolean isImplicitWithStorage() { - return false; + return true; } } \ No newline at end of file diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java new file mode 100644 index 000000000..d65663e63 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; + +/** + * Create handle factory for Flink writer, use the specified fileID directly + * because it is unique anyway. + */ +public class FlinkCreateHandleFactory + extends CreateHandleFactory { + + @Override + public HoodieWriteHandle create( + HoodieWriteConfig hoodieConfig, String commitTime, + HoodieTable hoodieTable, String partitionPath, + String fileIdPrefix, TaskContextSupplier taskContextSupplier) { + return new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, partitionPath, + fileIdPrefix, taskContextSupplier); + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java index 337e7cb26..044f841d2 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java @@ -20,51 +20,52 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.HoodieBaseFile; import 
org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.execution.FlinkLazyInsertIterable; -import org.apache.hudi.io.CreateHandleFactory; +import org.apache.hudi.io.FlinkCreateHandleFactory; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.io.HoodieSortedMergeHandle; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.WorkloadProfile; -import org.apache.hudi.table.WorkloadStat; import org.apache.hudi.table.action.HoodieWriteMetadata; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.time.Instant; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import scala.Tuple2; - +/** + * With {@code org.apache.hudi.operator.partitioner.BucketAssigner}, each hoodie record + * is tagged with a bucket ID (partition path + fileID) in streaming way. 
All the records consumed by this + * executor should be tagged with bucket IDs and belong to one data bucket. + * + *

These bucket IDs make it possible to shuffle the records first by the bucket ID + * (see org.apache.hudi.operator.partitioner.BucketAssignerFunction), and this executor + * only needs to handle the data buffer that belongs to one data bucket once at a time. So there is no need to + * partition the buffer. + * + *

Computing the records batch locations all at a time is a pressure to the engine, + * we should avoid that in streaming system. + */ public abstract class BaseFlinkCommitActionExecutor extends BaseCommitActionExecutor>, List, List, HoodieWriteMetadata> { @@ -91,47 +92,39 @@ public abstract class BaseFlinkCommitActionExecutor> execute(List> inputRecords) { HoodieWriteMetadata> result = new HoodieWriteMetadata<>(); - WorkloadProfile profile = null; - if (isWorkloadProfileNeeded()) { - profile = new WorkloadProfile(buildProfile(inputRecords)); - LOG.info("Workload profile :" + profile); - try { - saveWorkloadProfileMetadataToInflight(profile, instantTime); - } catch (Exception e) { - HoodieTableMetaClient metaClient = table.getMetaClient(); - HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime); - try { - if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) { - throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e); - } - } catch (IOException ex) { - LOG.error("Check file exists failed"); - throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex); - } - } - } - - final Partitioner partitioner = getPartitioner(profile); - Map>> partitionedRecords = partition(inputRecords, partitioner); - List writeStatuses = new LinkedList<>(); - partitionedRecords.forEach((partition, records) -> { - if (WriteOperationType.isChangingRecords(operationType)) { - handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll); - } else { - handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll); - } - }); - updateIndex(writeStatuses, result); + final HoodieRecord record = inputRecords.get(0); + final String partitionPath = record.getPartitionPath(); + 
final String fileId = record.getCurrentLocation().getFileId(); + final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I") + ? BucketType.INSERT + : BucketType.UPDATE; + if (WriteOperationType.isChangingRecords(operationType)) { + handleUpsertPartition( + instantTime, + partitionPath, + fileId, bucketType, + inputRecords.iterator()) + .forEachRemaining(writeStatuses::addAll); + } else { + handleUpsertPartition( + instantTime, + partitionPath, + fileId, + bucketType, + inputRecords.iterator()) + .forEachRemaining(writeStatuses::addAll); + } + setUpWriteMetadata(writeStatuses, result); return result; } - protected void updateIndex(List writeStatuses, HoodieWriteMetadata> result) { - Instant indexStartTime = Instant.now(); - // Update the index back - List statuses = table.getIndex().updateLocation(writeStatuses, context, table); - result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now())); + protected void setUpWriteMetadata( + List statuses, + HoodieWriteMetadata> result) { + // No need to update the index because the update happens before the write. 
result.setWriteStatuses(statuses); + result.setIndexUpdateDuration(Duration.ZERO); } @Override @@ -139,56 +132,6 @@ public abstract class BaseFlinkCommitActionExecutor>> partition(List> dedupedRecords, Partitioner partitioner) { - Map>, HoodieRecord>>> partitionedMidRecords = dedupedRecords - .stream() - .map(record -> new Tuple2<>(new Tuple2<>(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record)) - .collect(Collectors.groupingBy(x -> partitioner.getPartition(x._1))); - Map>> results = new LinkedHashMap<>(); - partitionedMidRecords.forEach((key, value) -> results.put(key, value.stream().map(x -> x._2).collect(Collectors.toList()))); - return results; - } - - protected Pair, WorkloadStat> buildProfile(List> inputRecords) { - HashMap partitionPathStatMap = new HashMap<>(); - WorkloadStat globalStat = new WorkloadStat(); - - Map>, Long> partitionLocationCounts = inputRecords - .stream() - .map(record -> Pair.of( - Pair.of(record.getPartitionPath(), Option.ofNullable(record.getCurrentLocation())), record)) - .collect(Collectors.groupingBy(Pair::getLeft, Collectors.counting())); - - for (Map.Entry>, Long> e : partitionLocationCounts.entrySet()) { - String partitionPath = e.getKey().getLeft(); - Long count = e.getValue(); - Option locOption = e.getKey().getRight(); - - if (!partitionPathStatMap.containsKey(partitionPath)) { - partitionPathStatMap.put(partitionPath, new WorkloadStat()); - } - - if (locOption.isPresent()) { - // update - partitionPathStatMap.get(partitionPath).addUpdates(locOption.get(), count); - globalStat.addUpdates(locOption.get(), count); - } else { - // insert - partitionPathStatMap.get(partitionPath).addInserts(count); - globalStat.addInserts(count); - } - } - return Pair.of(partitionPathStatMap, globalStat); - } - @Override protected void commit(Option> extraMetadata, HoodieWriteMetadata> result) { commit(extraMetadata, result, result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList())); @@ 
-228,31 +171,28 @@ public abstract class BaseFlinkCommitActionExecutor> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr, - Partitioner partitioner) { - UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner; - BucketInfo binfo = upsertPartitioner.getBucketInfo(partition); - BucketType btype = binfo.bucketType; + protected Iterator> handleUpsertPartition( + String instantTime, + String partitionPath, + String fileIdHint, + BucketType bucketType, + Iterator recordItr) { try { - if (btype.equals(BucketType.INSERT)) { - return handleInsert(binfo.fileIdPrefix, recordItr); - } else if (btype.equals(BucketType.UPDATE)) { - return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr); - } else { - throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition); + switch (bucketType) { + case INSERT: + return handleInsert(fileIdHint, recordItr); + case UPDATE: + return handleUpdate(partitionPath, fileIdHint, recordItr); + default: + throw new HoodieUpsertException("Unknown bucketType " + bucketType + " for partition :" + partitionPath); } } catch (Throwable t) { - String msg = "Error upserting bucketType " + btype + " for partition :" + partition; + String msg = "Error upserting bucketType " + bucketType + " for partition :" + partitionPath; LOG.error(msg, t); throw new HoodieUpsertException(msg, t); } } - protected Iterator> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr, - Partitioner partitioner) { - return handleUpsertPartition(instantTime, partition, recordItr, partitioner); - } - @Override public Iterator> handleUpdate(String partitionPath, String fileId, Iterator> recordItr) @@ -293,13 +233,6 @@ public abstract class BaseFlinkCommitActionExecutor> keyToNewRecords, - HoodieBaseFile dataFileToBeMerged) { - return new HoodieMergeHandle<>(config, instantTime, table, keyToNewRecords, - partitionPath, fileId, dataFileToBeMerged, taskContextSupplier); - 
} - @Override public Iterator> handleInsert(String idPfx, Iterator> recordItr) throws Exception { @@ -309,24 +242,6 @@ public abstract class BaseFlinkCommitActionExecutor) Collections.EMPTY_LIST).iterator(); } return new FlinkLazyInsertIterable<>(recordItr, true, config, instantTime, table, idPfx, - taskContextSupplier, new CreateHandleFactory<>()); + taskContextSupplier, new FlinkCreateHandleFactory<>()); } - - /** - * Provides a partitioner to perform the upsert operation, based on the workload profile. - */ - public Partitioner getUpsertPartitioner(WorkloadProfile profile) { - if (profile == null) { - throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner."); - } - return new UpsertPartitioner(profile, context, table, config); - } - - /** - * Provides a partitioner to perform the insert operation, based on the workload profile. - */ - public Partitioner getInsertPartitioner(WorkloadProfile profile) { - return getUpsertPartitioner(profile); - } - } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java index 191071e01..52381230d 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java @@ -19,17 +19,32 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.table.HoodieTable; +import 
org.apache.hudi.table.action.HoodieWriteMetadata; +import java.time.Duration; +import java.time.Instant; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; +/** + * Overrides the {@link #write} method to not look up index and partition the records, because + * with {@code org.apache.hudi.operator.partitioner.BucketAssigner}, each hoodie record + * is tagged with a bucket ID (partition path + fileID) in streaming way. The FlinkWriteHelper only hands over + * the records to the action executor {@link BaseCommitActionExecutor} to execute. + * + *

Computing the records batch locations all at a time is a pressure to the engine, + * we should avoid that in streaming system. + */ public class FlinkWriteHelper extends AbstractWriteHelper>, List, List, R> { @@ -44,23 +59,46 @@ public class FlinkWriteHelper extends AbstractW return WriteHelperHolder.FLINK_WRITE_HELPER; } + @Override + public HoodieWriteMetadata> write(String instantTime, List> inputRecords, HoodieEngineContext context, + HoodieTable>, List, List> table, boolean shouldCombine, int shuffleParallelism, + BaseCommitActionExecutor>, List, List, R> executor, boolean performTagging) { + try { + Instant lookupBegin = Instant.now(); + Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now()); + + HoodieWriteMetadata> result = executor.execute(inputRecords); + result.setIndexLookupDuration(indexLookupDuration); + return result; + } catch (Throwable e) { + if (e instanceof HoodieUpsertException) { + throw (HoodieUpsertException) e; + } + throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e); + } + } + @Override public List> deduplicateRecords(List> records, HoodieIndex>, List, List> index, int parallelism) { - boolean isIndexingGlobal = index.isGlobal(); Map>>> keyedRecords = records.stream().map(record -> { - HoodieKey hoodieKey = record.getKey(); // If index used is global, then records are expected to differ in their partitionPath - Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey; + final Object key = record.getKey().getRecordKey(); return Pair.of(key, record); }).collect(Collectors.groupingBy(Pair::getLeft)); return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> { @SuppressWarnings("unchecked") T reducedData = (T) rec1.getData().preCombine(rec2.getData()); + // we cannot allow the user to change the key or partitionPath, since that will affect + // everything + // so pick it from one of the records. 
HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey(); - return new HoodieRecord(reducedKey, reducedData); + HoodieRecord hoodieRecord = new HoodieRecord<>(reducedKey, reducedData); + // reuse the location from the first record. + hoodieRecord.setCurrentLocation(rec1.getCurrentLocation()); + return hoodieRecord; }).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList()); } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index 8cc9b0df8..f44e83da9 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -116,10 +116,7 @@ public class UpsertPartitioner> implements Part private int addUpdateBucket(String partitionPath, String fileIdHint) { int bucket = totalBuckets; updateLocationToBucket.put(fileIdHint, bucket); - BucketInfo bucketInfo = new BucketInfo(); - bucketInfo.bucketType = BucketType.UPDATE; - bucketInfo.fileIdPrefix = fileIdHint; - bucketInfo.partitionPath = partitionPath; + BucketInfo bucketInfo = new BucketInfo(BucketType.UPDATE, fileIdHint, partitionPath); bucketInfoMap.put(totalBuckets, bucketInfo); totalBuckets++; return bucket; @@ -186,10 +183,7 @@ public class UpsertPartitioner> implements Part } else { recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket); } - BucketInfo bucketInfo = new BucketInfo(); - bucketInfo.bucketType = BucketType.INSERT; - bucketInfo.partitionPath = partitionPath; - bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx(); + BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath); bucketInfoMap.put(totalBuckets, bucketInfo); totalBuckets++; } diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index 4f192033d..eeeeacf92 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -114,10 +114,7 @@ public class UpsertPartitioner> implements Part private int addUpdateBucket(String partitionPath, String fileIdHint) { int bucket = totalBuckets; updateLocationToBucket.put(fileIdHint, bucket); - BucketInfo bucketInfo = new BucketInfo(); - bucketInfo.bucketType = BucketType.UPDATE; - bucketInfo.fileIdPrefix = fileIdHint; - bucketInfo.partitionPath = partitionPath; + BucketInfo bucketInfo = new BucketInfo(BucketType.UPDATE, fileIdHint, partitionPath); bucketInfoMap.put(totalBuckets, bucketInfo); totalBuckets++; return bucket; @@ -184,10 +181,7 @@ public class UpsertPartitioner> implements Part } else { recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket); } - BucketInfo bucketInfo = new BucketInfo(); - bucketInfo.bucketType = BucketType.INSERT; - bucketInfo.partitionPath = partitionPath; - bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx(); + BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath); bucketInfoMap.put(totalBuckets, bucketInfo); totalBuckets++; } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index ee153c846..9d60cde69 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -120,10 +120,7 @@ public class UpsertPartitioner> extends Partiti private int addUpdateBucket(String partitionPath, String fileIdHint) { int bucket = totalBuckets; updateLocationToBucket.put(fileIdHint, bucket); - BucketInfo bucketInfo = new BucketInfo(); - bucketInfo.bucketType = BucketType.UPDATE; - bucketInfo.fileIdPrefix = fileIdHint; - bucketInfo.partitionPath = partitionPath; + BucketInfo bucketInfo = new BucketInfo(BucketType.UPDATE, fileIdHint, partitionPath); bucketInfoMap.put(totalBuckets, bucketInfo); totalBuckets++; return bucket; @@ -223,10 +220,7 @@ public class UpsertPartitioner> extends Partiti } else { recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket); } - BucketInfo bucketInfo = new BucketInfo(); - bucketInfo.bucketType = BucketType.INSERT; - bucketInfo.partitionPath = partitionPath; - bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx(); + BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath); bucketInfoMap.put(totalBuckets, bucketInfo); totalBuckets++; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index fcb4fd917..865f0dc1e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -167,6 +167,12 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { deleteInstantFile(instant); } + public void deletePending(HoodieInstant.State state, String action, String instantStr) { + HoodieInstant instant = new HoodieInstant(state, action, instantStr); + ValidationUtils.checkArgument(!instant.isCompleted()); + deleteInstantFile(instant); + } + public void 
deleteCompactionRequested(HoodieInstant instant) { ValidationUtils.checkArgument(instant.isRequested()); ValidationUtils.checkArgument(Objects.equals(instant.getAction(), HoodieTimeline.COMPACTION_ACTION)); diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java index 5c9930d60..dea8a056b 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java @@ -40,7 +40,6 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileStatus; @@ -115,7 +114,7 @@ public class InstantGenerateOperator extends AbstractStreamOperator bufferedRecords = new LinkedList<>(); + private Map> bufferedRecords; /** * Flink collector help s to send data downstream. 
@@ -88,6 +92,8 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction(); + indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); @@ -112,17 +118,24 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction writeStatus; - switch (cfg.operation) { - case INSERT: - writeStatus = writeClient.insert(bufferedRecords, instantTimestamp); - break; - case UPSERT: - writeStatus = writeClient.upsert(bufferedRecords, instantTimestamp); - break; - default: - throw new HoodieFlinkStreamerException("Unknown operation : " + cfg.operation); - } + final List writeStatus = new ArrayList<>(); + this.bufferedRecords.values().forEach(records -> { + if (records.size() > 0) { + if (cfg.filterDupes) { + records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1); + } + switch (cfg.operation) { + case INSERT: + writeStatus.addAll(writeClient.insert(records, instantTimestamp)); + break; + case UPSERT: + writeStatus.addAll(writeClient.upsert(records, instantTimestamp)); + break; + default: + throw new HoodieFlinkStreamerException("Unknown operation : " + cfg.operation); + } + } + }); output.collect(new Tuple3<>(instantTimestamp, writeStatus, indexOfThisSubtask)); bufferedRecords.clear(); } @@ -144,7 +157,7 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction record) { + final String fileId = record.getCurrentLocation().getFileId(); + final String key = StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId); + if (!this.bufferedRecords.containsKey(key)) { + this.bufferedRecords.put(key, new ArrayList<>()); + } + this.bufferedRecords.get(key).add(record); + } + @Override public void close() { if (writeClient != null) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java index 
34a61d409..587709824 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java @@ -18,22 +18,18 @@ package org.apache.hudi.operator; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.keygen.KeyGenerator; +import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.operator.event.BatchWriteSuccessEvent; -import org.apache.hudi.util.RowDataToAvroConverters; +import org.apache.hudi.table.action.commit.FlinkWriteHelper; import org.apache.hudi.util.StreamerUtil; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; @@ -41,7 +37,6 @@ import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; -import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -50,7 +45,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import 
java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; @@ -96,7 +93,7 @@ public class StreamWriteFunction extends KeyedProcessFunction /** * Write buffer for a checkpoint. */ - private transient List buffer; + private transient Map> buffer; /** * The buffer lock to control data buffering/flushing. @@ -130,23 +127,6 @@ public class StreamWriteFunction extends KeyedProcessFunction private transient BiFunction, String, List> writeFunction; - /** - * HoodieKey generator. - */ - private transient KeyGenerator keyGenerator; - - /** - * Row type of the input. - */ - private final RowType rowType; - - /** - * Avro schema of the input. - */ - private final Schema avroSchema; - - private transient RowDataToAvroConverters.RowDataToAvroConverter converter; - /** * The REQUESTED instant we write the data. */ @@ -160,20 +140,15 @@ public class StreamWriteFunction extends KeyedProcessFunction /** * Constructs a StreamingSinkFunction. 
* - * @param rowType The input row type * @param config The config options */ - public StreamWriteFunction(RowType rowType, Configuration config) { - this.rowType = rowType; - this.avroSchema = StreamerUtil.getSourceSchema(config); + public StreamWriteFunction(Configuration config) { this.config = config; } @Override public void open(Configuration parameters) throws IOException { this.taskID = getRuntimeContext().getIndexOfThisSubtask(); - this.keyGenerator = StreamerUtil.createKeyGenerator(FlinkOptions.flatOptions(this.config)); - this.converter = RowDataToAvroConverters.createConverter(this.rowType); initBuffer(); initWriteClient(); initWriteFunction(); @@ -211,7 +186,7 @@ public class StreamWriteFunction extends KeyedProcessFunction if (onCheckpointing) { addToBufferCondition.await(); } - this.buffer.add(toHoodieRecord(value)); + putDataIntoBuffer(value); } finally { bufferLock.unlock(); } @@ -230,7 +205,7 @@ public class StreamWriteFunction extends KeyedProcessFunction @VisibleForTesting @SuppressWarnings("rawtypes") - public List getBuffer() { + public Map> getBuffer() { return buffer; } @@ -249,7 +224,7 @@ public class StreamWriteFunction extends KeyedProcessFunction // ------------------------------------------------------------------------- private void initBuffer() { - this.buffer = new ArrayList<>(); + this.buffer = new LinkedHashMap<>(); this.bufferLock = new ReentrantLock(); this.addToBufferCondition = this.bufferLock.newCondition(); } @@ -277,32 +252,33 @@ public class StreamWriteFunction extends KeyedProcessFunction } } - /** - * Converts the give record to a {@link HoodieRecord}. 
- * - * @param record The input record - * @return HoodieRecord based on the configuration - * @throws IOException if error occurs - */ - @SuppressWarnings("rawtypes") - private HoodieRecord toHoodieRecord(I record) throws IOException { - boolean shouldCombine = this.config.getBoolean(FlinkOptions.INSERT_DROP_DUPS) - || WriteOperationType.fromValue(this.config.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT; - GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record); - final String payloadClazz = this.config.getString(FlinkOptions.PAYLOAD_CLASS); - Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, - this.config.getString(FlinkOptions.PRECOMBINE_FIELD), false); - HoodieRecordPayload payload = shouldCombine - ? StreamerUtil.createPayload(payloadClazz, gr, orderingVal) - : StreamerUtil.createPayload(payloadClazz, gr); - return new HoodieRecord<>(keyGenerator.getKey(gr), payload); + private void putDataIntoBuffer(I value) { + HoodieRecord record = (HoodieRecord) value; + final String fileId = record.getCurrentLocation().getFileId(); + final String key = StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId); + if (!this.buffer.containsKey(key)) { + this.buffer.put(key, new ArrayList<>()); + } + this.buffer.get(key).add(record); } + @SuppressWarnings("unchecked, rawtypes") private void flushBuffer() { final List writeStatus; if (buffer.size() > 0) { - writeStatus = writeFunction.apply(buffer, currentInstant); - buffer.clear(); + writeStatus = new ArrayList<>(); + this.buffer.values() + // The records are partitioned by the bucket ID and each batch sent to + // the writer belongs to one bucket. 
+ .forEach(records -> { + if (records.size() > 0) { + if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) { + records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1); + } + writeStatus.addAll(writeFunction.apply(records, currentInstant)); + } + }); + this.buffer.clear(); } else { LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant); writeStatus = Collections.emptyList(); diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java index 3f4d940ee..247269c22 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.api.operators.StreamSink; -import org.apache.flink.table.types.logical.RowType; /** * Operator for {@link StreamSink}. 
@@ -36,8 +35,8 @@ public class StreamWriteOperator implements OperatorEventHandler { private final StreamWriteFunction sinkFunction; - public StreamWriteOperator(RowType rowType, Configuration conf) { - super(new StreamWriteFunction<>(rowType, conf)); + public StreamWriteOperator(Configuration conf) { + super(new StreamWriteFunction<>(conf)); this.sinkFunction = (StreamWriteFunction) getUserFunction(); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java index 524c6015e..bf0cfc27e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java @@ -22,9 +22,6 @@ import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieFlinkEngineContext; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.operator.event.BatchWriteSuccessEvent; @@ -38,8 +35,6 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.util.Preconditions; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,6 +54,8 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static 
org.apache.hudi.util.StreamerUtil.initTableIfNotExists; + /** * {@link OperatorCoordinator} for {@link StreamWriteFunction}. * @@ -121,7 +118,7 @@ public class StreamWriteOperatorCoordinator // writeClient initWriteClient(); // init table, create it if not exists. - initTable(); + initTableIfNotExists(this.conf); } @Override @@ -139,6 +136,7 @@ public class StreamWriteOperatorCoordinator + " data has not finish writing, roll back the last write and throw"; checkAndForceCommit(errMsg); this.inFlightInstant = this.writeClient.startCommit(); + this.writeClient.transitionRequestedToInflight(conf.getString(FlinkOptions.TABLE_TYPE), this.inFlightInstant); this.inFlightCheckpoint = checkpointId; LOG.info("Create instant [{}], at checkpoint [{}]", this.inFlightInstant, checkpointId); result.complete(writeCheckpointBytes()); @@ -200,28 +198,6 @@ public class StreamWriteOperatorCoordinator true); } - private void initTable() throws IOException { - final String basePath = this.conf.getString(FlinkOptions.PATH); - final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf(); - // Hadoop FileSystem - try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) { - if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))) { - HoodieTableMetaClient.initTableType( - hadoopConf, - basePath, - HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE)), - this.conf.getString(FlinkOptions.TABLE_NAME), - "archived", - this.conf.getString(FlinkOptions.PAYLOAD_CLASS), - 1); - LOG.info("Table initialized"); - } else { - LOG.info("Table [{}/{}] already exists, no need to initialize the table", - basePath, this.conf.getString(FlinkOptions.TABLE_NAME)); - } - } - } - static byte[] readBytes(DataInputStream in, int size) throws IOException { byte[] bytes = new byte[size]; in.readFully(bytes); diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java 
b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java index f5faa54ea..56267451f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java @@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; -import org.apache.flink.table.types.logical.RowType; /** * Factory class for {@link StreamWriteOperator}. @@ -43,10 +42,9 @@ public class StreamWriteOperatorFactory private final int numTasks; public StreamWriteOperatorFactory( - RowType rowType, Configuration conf, int numTasks) { - super(new StreamWriteOperator<>(rowType, conf)); + super(new StreamWriteOperator<>(conf)); this.operator = (StreamWriteOperator) getOperator(); this.conf = conf; this.numTasks = numTasks; diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java new file mode 100644 index 000000000..269ccc801 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator.partitioner; + +import org.apache.hudi.client.FlinkTaskContextSupplier; +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.operator.FlinkOptions; +import org.apache.hudi.table.action.commit.BucketInfo; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; + +/** + * The function to build the write profile incrementally for records within a checkpoint, + * it then assigns the bucket with ID using the {@link BucketAssigner}. + * + *

All the records are tagged with HoodieRecordLocation, instead of real instant time, + * INSERT record uses "I" and UPSERT record uses "U" as instant time. There is no need to keep + * the "real" instant time for each record, the bucket ID (partition path & fileID) actually decides + * where the record should write to. The "I" and "U" tag is only used for downstream to decide whether + * the data bucket is an INSERT or an UPSERT, we should factor it out when the underlying writer + * supports specifying the bucket type explicitly. + * + *

The output records should then shuffle by the bucket ID and thus do scalable write. + * + * @see BucketAssigner + */ +public class BucketAssignFunction> + extends KeyedProcessFunction + implements CheckpointedFunction, CheckpointListener { + + private MapState indexState; + + private BucketAssigner bucketAssigner; + + private final Configuration conf; + + private final boolean isChangingRecords; + + public BucketAssignFunction(Configuration conf) { + this.conf = conf; + this.isChangingRecords = WriteOperationType.isChangingRecords( + WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf); + HoodieFlinkEngineContext context = + new HoodieFlinkEngineContext( + new SerializableConfiguration(StreamerUtil.getHadoopConf()), + new FlinkTaskContextSupplier(getRuntimeContext())); + this.bucketAssigner = new BucketAssigner( + context, + writeConfig); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) { + this.bucketAssigner.reset(); + } + + @Override + public void initializeState(FunctionInitializationContext context) { + MapStateDescriptor indexStateDesc = + new MapStateDescriptor<>( + "indexState", + TypeInformation.of(HoodieKey.class), + TypeInformation.of(HoodieRecordLocation.class)); + indexState = context.getKeyedStateStore().getMapState(indexStateDesc); + } + + @SuppressWarnings("unchecked") + @Override + public void processElement(I value, Context ctx, Collector out) throws Exception { + // 1. put the record into the BucketAssigner; + // 2. look up the state for location, if the record has a location, just send it out; + // 3. if it is an INSERT, decide the location using the BucketAssigner then send it out. 
+ HoodieRecord record = (HoodieRecord) value; + final HoodieKey hoodieKey = record.getKey(); + final BucketInfo bucketInfo; + final HoodieRecordLocation location; + // Only changing records need looking up the index for the location, + // append only records are always recognized as INSERT. + if (isChangingRecords && this.indexState.contains(hoodieKey)) { + // Set up the instant time as "U" to mark the bucket as an update bucket. + location = new HoodieRecordLocation("U", this.indexState.get(hoodieKey).getFileId()); + this.bucketAssigner.addUpdate(record.getPartitionPath(), location.getFileId()); + } else { + bucketInfo = this.bucketAssigner.addInsert(hoodieKey.getPartitionPath()); + switch (bucketInfo.getBucketType()) { + case INSERT: + // This is an insert bucket, use HoodieRecordLocation instant time as "I". + // Downstream operators can then check the instant time to know whether + // a record belongs to an insert bucket. + location = new HoodieRecordLocation("I", bucketInfo.getFileIdPrefix()); + break; + case UPDATE: + location = new HoodieRecordLocation("U", bucketInfo.getFileIdPrefix()); + break; + default: + throw new AssertionError(); + } + this.indexState.put(hoodieKey, location); + } + record.unseal(); + record.setCurrentLocation(location); + record.seal(); + out.collect((O) record); + } + + @Override + public void notifyCheckpointComplete(long l) { + // Refresh the table state when there are new commits. + this.bucketAssigner.refreshTable(); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigner.java new file mode 100644 index 000000000..f87a802cf --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigner.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator.partitioner; + +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.commit.BucketInfo; +import org.apache.hudi.table.action.commit.BucketType; +import org.apache.hudi.table.action.commit.SmallFile; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.util.Preconditions; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Bucket assigner that assigns the data buffer of one checkpoint into buckets. + * + *

This assigner assigns the record one by one. + * If the record is an update, checks and reuses an existing UPDATE bucket or generates a new one; + * If the record is an insert, checks the record partition for small files first, tries to find a small file + * that has space to append new records and reuses the small file's data bucket; if + * there is no small file (or no space left for new records), generates an INSERT bucket. + * + *

Use {partition}_{fileId} as the bucket identifier, so that the bucket is unique + * within and among partitions. + */ +public class BucketAssigner { + private static final Logger LOG = LogManager.getLogger(BucketAssigner.class); + + /** + * Remembers what type each bucket is for later. + */ + private final HashMap bucketInfoMap; + + private HoodieTable table; + + /** + * Flink engine context. + */ + private final HoodieFlinkEngineContext context; + + /** + * The write config. + */ + private final HoodieWriteConfig config; + + /** + * The average record size. + */ + private final long averageRecordSize; + + /** + * Total records to write for each bucket based on + * the config option {@link org.apache.hudi.config.HoodieStorageConfig#PARQUET_FILE_MAX_BYTES}. + */ + private final long insertRecordsPerBucket; + + /** + * Partition path to small files mapping. + */ + private final Map> partitionSmallFilesMap; + + /** + * Bucket ID(partition + fileId) -> small file assign state. + */ + private final Map smallFileAssignStates; + + /** + * Bucket ID(partition + fileId) -> new file assign state. + */ + private final Map newFileAssignStates; + + public BucketAssigner( + HoodieFlinkEngineContext context, + HoodieWriteConfig config) { + bucketInfoMap = new HashMap<>(); + partitionSmallFilesMap = new HashMap<>(); + smallFileAssignStates = new HashMap<>(); + newFileAssignStates = new HashMap<>(); + this.context = context; + this.config = config; + this.table = HoodieFlinkTable.create(this.config, this.context); + averageRecordSize = averageBytesPerRecord( + table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(), + config); + LOG.info("AvgRecordSize => " + averageRecordSize); + insertRecordsPerBucket = config.shouldAutoTuneInsertSplits() + ?
config.getParquetMaxFileSize() / averageRecordSize + : config.getCopyOnWriteInsertSplitSize(); + LOG.info("InsertRecordsPerBucket => " + insertRecordsPerBucket); + } + + /** + * Reset the states of this assigner, should do once for each checkpoint, + * all the states are accumulated within one checkpoint interval. + */ + public void reset() { + bucketInfoMap.clear(); + partitionSmallFilesMap.clear(); + smallFileAssignStates.clear(); + newFileAssignStates.clear(); + } + + public BucketInfo addUpdate(String partitionPath, String fileIdHint) { + final String key = StreamerUtil.generateBucketKey(partitionPath, fileIdHint); + if (!bucketInfoMap.containsKey(key)) { + BucketInfo bucketInfo = new BucketInfo(BucketType.UPDATE, fileIdHint, partitionPath); + bucketInfoMap.put(key, bucketInfo); + } + // else do nothing because the bucket already exists. + return bucketInfoMap.get(key); + } + + public BucketInfo addInsert(String partitionPath) { + // for new inserts, compute buckets depending on how many records we have for each partition + List smallFiles = getSmallFilesForPartition(partitionPath); + + // first try packing this into one of the smallFiles + for (SmallFile smallFile : smallFiles) { + final String key = StreamerUtil.generateBucketKey(partitionPath, smallFile.location.getFileId()); + SmallFileAssignState assignState = smallFileAssignStates.get(key); + assert assignState != null; + if (assignState.canAssign()) { + assignState.assign(); + // create a new bucket or re-use an existing bucket + BucketInfo bucketInfo; + if (bucketInfoMap.containsKey(key)) { + // Assigns an inserts to existing update bucket + bucketInfo = bucketInfoMap.get(key); + } else { + bucketInfo = addUpdate(partitionPath, smallFile.location.getFileId()); + } + return bucketInfo; + } + } + + // if we have anything more, create new insert buckets, like normal + if (newFileAssignStates.containsKey(partitionPath)) { + NewFileAssignState newFileAssignState = newFileAssignStates.get(partitionPath); + if 
(newFileAssignState.canAssign()) { + newFileAssignState.assign(); + } + final String key = StreamerUtil.generateBucketKey(partitionPath, newFileAssignState.fileId); + return bucketInfoMap.get(key); + } + BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath); + final String key = StreamerUtil.generateBucketKey(partitionPath, bucketInfo.getFileIdPrefix()); + bucketInfoMap.put(key, bucketInfo); + newFileAssignStates.put(partitionPath, new NewFileAssignState(bucketInfo.getFileIdPrefix(), insertRecordsPerBucket)); + return bucketInfo; + } + + private List getSmallFilesForPartition(String partitionPath) { + if (partitionSmallFilesMap.containsKey(partitionPath)) { + return partitionSmallFilesMap.get(partitionPath); + } + List smallFiles = getSmallFiles(partitionPath); + if (smallFiles.size() > 0) { + LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles); + partitionSmallFilesMap.put(partitionPath, smallFiles); + smallFiles.forEach(smallFile -> + smallFileAssignStates.put( + StreamerUtil.generateBucketKey(partitionPath, smallFile.location.getFileId()), + new SmallFileAssignState(config.getParquetMaxFileSize(), smallFile, averageRecordSize))); + return smallFiles; + } + return Collections.emptyList(); + } + + /** + * Refresh the table state like TableFileSystemView and HoodieTimeline. + */ + public void refreshTable() { + this.table = HoodieFlinkTable.create(this.config, this.context); + } + + /** + * Returns a list of small files in the given partition path. 
+ */ + protected List getSmallFiles(String partitionPath) { + + // smallFiles only for partitionPath + List smallFileLocations = new ArrayList<>(); + + HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants(); + + if (!commitTimeline.empty()) { // if we have some commits + HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); + List allFiles = table.getBaseFileOnlyView() + .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList()); + + for (HoodieBaseFile file : allFiles) { + if (file.getFileSize() < config.getParquetSmallFileLimit()) { + String filename = file.getFileName(); + SmallFile sf = new SmallFile(); + sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)); + sf.sizeBytes = file.getFileSize(); + smallFileLocations.add(sf); + } + } + } + + return smallFileLocations; + } + + /** + * Obtains the average record size based on records written during previous commits. Used for estimating how many + * records pack into one file. + */ + protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, HoodieWriteConfig hoodieWriteConfig) { + long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate(); + long fileSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit()); + try { + if (!commitTimeline.empty()) { + // Go over the reverse ordered commits to get a more recent estimate of average record size. 
+ Iterator instants = commitTimeline.getReverseOrderedInstants().iterator(); + while (instants.hasNext()) { + HoodieInstant instant = instants.next(); + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata + .fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class); + long totalBytesWritten = commitMetadata.fetchTotalBytesWritten(); + long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten(); + if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) { + avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten); + break; + } + } + } + } catch (Throwable t) { + // make this fail safe. + LOG.error("Error trying to compute average bytes/record ", t); + } + return avgSize; + } + + /** + * Candidate bucket state for small file. It records the total number of records + * that the bucket can append and the current number of assigned records. + */ + private static class SmallFileAssignState { + long assigned; + long totalUnassigned; + + SmallFileAssignState(long parquetMaxFileSize, SmallFile smallFile, long averageRecordSize) { + this.assigned = 0; + this.totalUnassigned = (parquetMaxFileSize - smallFile.sizeBytes) / averageRecordSize; + } + + public boolean canAssign() { + return this.totalUnassigned > 0 && this.totalUnassigned > this.assigned; + } + + /** + * Remembers to invoke {@link #canAssign()} first. + */ + public void assign() { + Preconditions.checkState(canAssign(), + "Can not assign insert to small file: assigned => " + + this.assigned + " totalUnassigned => " + this.totalUnassigned); + this.assigned++; + } + } + + /** + * Candidate bucket state for a new file. It records the total number of records + * that the bucket can append and the current number of assigned records. 
+ */ + private static class NewFileAssignState { + long assigned; + long totalUnassigned; + final String fileId; + + NewFileAssignState(String fileId, long insertRecordsPerBucket) { + this.fileId = fileId; + this.assigned = 0; + this.totalUnassigned = insertRecordsPerBucket; + } + + public boolean canAssign() { + return this.totalUnassigned > 0 && this.totalUnassigned > this.assigned; + } + + /** + * Remembers to invoke {@link #canAssign()} first. + */ + public void assign() { + Preconditions.checkState(canAssign(), + "Can not assign insert to new file: assigned => " + + this.assigned + " totalUnassigned => " + this.totalUnassigned); + this.assigned++; + } + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java new file mode 100644 index 000000000..2d47c7961 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.operator.transform; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.keygen.KeyGenerator; +import org.apache.hudi.operator.FlinkOptions; +import org.apache.hudi.util.RowDataToAvroConverters; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; + +import java.io.IOException; + +/** + * Function that transforms RowData to HoodieRecord. + */ +public class RowDataToHoodieFunction> + extends RichMapFunction { + /** + * Row type of the input. + */ + private final RowType rowType; + + /** + * Avro schema of the input. + */ + private transient Schema avroSchema; + + /** + * RowData to Avro record converter. + */ + private transient RowDataToAvroConverters.RowDataToAvroConverter converter; + + /** + * HoodieKey generator. + */ + private transient KeyGenerator keyGenerator; + + /** + * Config options. 
+ */ + private final Configuration config; + + public RowDataToHoodieFunction(RowType rowType, Configuration config) { + this.rowType = rowType; + this.config = config; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + this.avroSchema = StreamerUtil.getSourceSchema(this.config); + this.converter = RowDataToAvroConverters.createConverter(this.rowType); + this.keyGenerator = StreamerUtil.createKeyGenerator(FlinkOptions.flatOptions(this.config)); + } + + @SuppressWarnings("unchecked") + @Override + public O map(I i) throws Exception { + return (O) toHoodieRecord(i); + } + + /** + * Converts the give record to a {@link HoodieRecord}. + * + * @param record The input record + * @return HoodieRecord based on the configuration + * @throws IOException if error occurs + */ + @SuppressWarnings("rawtypes") + private HoodieRecord toHoodieRecord(I record) throws IOException { + boolean shouldCombine = this.config.getBoolean(FlinkOptions.INSERT_DROP_DUPS) + || WriteOperationType.fromValue(this.config.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT; + GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record); + final String payloadClazz = this.config.getString(FlinkOptions.PAYLOAD_CLASS); + Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, + this.config.getString(FlinkOptions.PRECOMBINE_FIELD), false); + HoodieRecordPayload payload = shouldCombine + ? 
StreamerUtil.createPayload(payloadClazz, gr, orderingVal) + : StreamerUtil.createPayload(payloadClazz, gr); + return new HoodieRecord<>(keyGenerator.getKey(gr), payload); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 7df63fa4a..418e2ea25 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -71,8 +71,7 @@ public class FlinkStreamerConfig extends Configuration { + "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are " + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer" + "to individual classes, for supported properties.") - public String propsFilePath = - "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties"; + public String propsFilePath = ""; @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file " + "(using the CLI parameter \"--props\") can also be passed command line using this parameter.") diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java index f6d75d3ea..d110bffef 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -22,9 +22,11 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.operator.FlinkOptions; import org.apache.hudi.operator.InstantGenerateOperator; import 
org.apache.hudi.operator.KeyedWriteProcessFunction; import org.apache.hudi.operator.KeyedWriteProcessOperator; +import org.apache.hudi.operator.partitioner.BucketAssignFunction; import org.apache.hudi.sink.CommitSink; import org.apache.hudi.source.JsonStringToHoodieRecordMapFunction; import org.apache.hudi.util.StreamerUtil; @@ -34,9 +36,11 @@ import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import java.util.List; @@ -66,12 +70,16 @@ public class HoodieFlinkStreamer { env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath)); } + Configuration conf = FlinkOptions.fromStreamerConfig(cfg); + int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM); + TypedProperties props = StreamerUtil.appendKafkaProps(cfg); // add data source config props.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS, cfg.payloadClassName); props.put(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, cfg.sourceOrderingField); + StreamerUtil.initTableIfNotExists(conf); // Read from kafka source DataStream inputRecords = env.addSource(new FlinkKafkaConsumer<>(cfg.kafkaTopic, new SimpleStringSchema(), props)) @@ -86,13 +94,20 @@ public class HoodieFlinkStreamer { // Keyby partition path, to avoid multiple subtasks writing to a partition at the same time .keyBy(HoodieRecord::getPartitionPath) - + // use the bucket assigner to generate bucket IDs + .transform( + "bucket_assigner", + TypeInformation.of(HoodieRecord.class), + 
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))) + .uid("uid_bucket_assigner") + // shuffle by fileId(bucket id) + .keyBy(record -> record.getCurrentLocation().getFileId()) // write operator, where the write operation really happens .transform(KeyedWriteProcessOperator.NAME, TypeInformation.of(new TypeHint, Integer>>() { }), new KeyedWriteProcessOperator(new KeyedWriteProcessFunction())) .name("write_process") .uid("write_process_uid") - .setParallelism(env.getParallelism()) + .setParallelism(numWriteTask) // Commit can only be executed once, so make it one parallelism .addSink(new CommitSink()) diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java index a8f92459a..24b899496 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java @@ -18,22 +18,25 @@ package org.apache.hudi.streamer; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.operator.FlinkOptions; import org.apache.hudi.operator.StreamWriteOperatorFactory; +import org.apache.hudi.operator.partitioner.BucketAssignFunction; +import org.apache.hudi.operator.transform.RowDataToHoodieFunction; import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.StreamerUtil; import com.beust.jcommander.JCommander; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo; -import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import java.util.Properties; @@ -70,13 +73,8 @@ public class HoodieFlinkStreamerV2 { .getLogicalType(); Configuration conf = FlinkOptions.fromStreamerConfig(cfg); int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM); - StreamWriteOperatorFactory operatorFactory = - new StreamWriteOperatorFactory<>(rowType, conf, numWriteTask); - - int partitionFieldIndex = rowType.getFieldIndex(conf.getString(FlinkOptions.PARTITION_PATH_FIELD)); - LogicalType partitionFieldType = rowType.getTypeAt(partitionFieldIndex); - final RowData.FieldGetter partitionFieldGetter = - RowData.createFieldGetter(partitionFieldType, partitionFieldIndex); + StreamWriteOperatorFactory operatorFactory = + new StreamWriteOperatorFactory<>(conf, numWriteTask); DataStream dataStream = env.addSource(new FlinkKafkaConsumer<>( cfg.kafkaTopic, @@ -89,11 +87,19 @@ public class HoodieFlinkStreamerV2 { ), kafkaProps)) .name("kafka_source") .uid("uid_kafka_source") + .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)) // Key-by partition path, to avoid multiple subtasks write to a partition at the same time - .keyBy(partitionFieldGetter::getFieldOrNull) + .keyBy(HoodieRecord::getPartitionPath) + .transform( + "bucket_assigner", + TypeInformation.of(HoodieRecord.class), + new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))) + .uid("uid_bucket_assigner") + // shuffle by fileId(bucket id) + .keyBy(record -> record.getCurrentLocation().getFileId()) .transform("hoodie_stream_write", null, operatorFactory) .uid("uid_hoodie_stream_write") - .setParallelism(numWriteTask); // should make it configurable + .setParallelism(numWriteTask); 
env.addOperator(dataStream.getTransformation()); diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 2fa87575d..4447705a9 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -18,6 +18,8 @@ package org.apache.hudi.util; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.keygen.SimpleAvroKeyGenerator; import org.apache.hudi.streamer.FlinkStreamerConfig; import org.apache.hudi.common.config.DFSPropertiesConfiguration; @@ -57,6 +59,7 @@ import java.util.Properties; * Utilities for Flink stream read and write. */ public class StreamerUtil { + private static final String DEFAULT_ARCHIVE_LOG_FOLDER = "archived"; private static final Logger LOG = LoggerFactory.getLogger(StreamerUtil.class); @@ -68,6 +71,9 @@ public class StreamerUtil { } public static TypedProperties getProps(FlinkStreamerConfig cfg) { + if (cfg.propsFilePath.isEmpty()) { + return new TypedProperties(); + } return readConfig( FSUtils.getFs(cfg.propsFilePath, getHadoopConf()), new Path(cfg.propsFilePath), cfg.configs).getConfig(); @@ -208,6 +214,10 @@ public class StreamerUtil { } } + public static HoodieWriteConfig getHoodieClientConfig(FlinkStreamerConfig conf) { + return getHoodieClientConfig(FlinkOptions.fromStreamerConfig(conf)); + } + public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) { HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder() @@ -250,4 +260,37 @@ public class StreamerUtil { checkPropNames.forEach(prop -> Preconditions.checkState(props.containsKey(prop), "Required property " + prop + " is missing")); } + + /** + * Initialize the table if it does not exist. 
   *
   * @param conf the configuration
   * @throws IOException if errors happens when writing metadata
   */
  public static void initTableIfNotExists(Configuration conf) throws IOException {
    final String basePath = conf.getString(FlinkOptions.PATH);
    final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
    // Hadoop FileSystem.
    // NOTE(review): FSUtils.getFs may hand back a cached FileSystem instance; closing it
    // via try-with-resources could affect other users of the same cached instance —
    // confirm this is intended.
    try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) {
      // The presence of the .hoodie meta folder is the marker for an initialized table.
      if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))) {
        // First writer: bootstrap the table metadata under the base path.
        HoodieTableMetaClient.initTableType(
            hadoopConf,
            basePath,
            HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
            conf.getString(FlinkOptions.TABLE_NAME),
            DEFAULT_ARCHIVE_LOG_FOLDER,
            conf.getString(FlinkOptions.PAYLOAD_CLASS),
            1);
        LOG.info("Table initialized under base path {}", basePath);
      } else {
        LOG.info("Table [{}/{}] already exists, no need to initialize the table",
            basePath, conf.getString(FlinkOptions.TABLE_NAME));
      }
    }
  }

  /** Generates the bucket ID using format {partition path}_{fileID}.
*/ + public static String generateBucketKey(String partitionPath, String fileId) { + return String.format("%s_%s", partitionPath, fileId); + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java index c2d7a65f2..fea9b8f92 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java @@ -27,6 +27,7 @@ import org.apache.hudi.operator.utils.StreamWriteFunctionWrapper; import org.apache.hudi.operator.utils.TestConfigurations; import org.apache.hudi.operator.utils.TestData; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.table.data.RowData; import org.hamcrest.MatcherAssert; @@ -56,24 +57,26 @@ import static org.junit.jupiter.api.Assertions.assertTrue; */ public class StreamWriteFunctionTest { - private static final Map EXPECTED = new HashMap<>(); - - static { - EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1,par1, id2,par1,id2,Stephen,33,2,par1]"); - EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3,par2, id4,par2,id4,Fabian,31,4,par2]"); - EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3]"); - EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]"); - } + private static final Map EXPECTED1 = new HashMap<>(); private static final Map EXPECTED2 = new HashMap<>(); + private static final Map EXPECTED3 = new HashMap<>(); + static { + EXPECTED1.put("par1", "[id1,par1,id1,Danny,23,1,par1, id2,par1,id2,Stephen,33,2,par1]"); + EXPECTED1.put("par2", "[id3,par2,id3,Julian,53,3,par2, id4,par2,id4,Fabian,31,4,par2]"); + EXPECTED1.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3]"); + EXPECTED1.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]"); + 
EXPECTED2.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]"); EXPECTED2.put("par2", "[id3,par2,id3,Julian,54,3,par2, id4,par2,id4,Fabian,32,4,par2]"); EXPECTED2.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3, " + "id9,par3,id9,Jane,19,6,par3]"); EXPECTED2.put("par4", "[id10,par4,id10,Ella,38,7,par4, id11,par4,id11,Phoebe,52,8,par4, " + "id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]"); + + EXPECTED3.put("par1", "[id1,par1,id1,Danny,23,1,par1]"); } private StreamWriteFunctionWrapper funcWrapper; @@ -83,9 +86,7 @@ public class StreamWriteFunctionTest { @BeforeEach public void before() throws Exception { - this.funcWrapper = new StreamWriteFunctionWrapper<>( - tempFile.getAbsolutePath(), - TestConfigurations.SERIALIZER); + this.funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath()); } @AfterEach @@ -211,7 +212,7 @@ public class StreamWriteFunctionTest { final OperatorEvent nextEvent = funcWrapper.getNextEvent(); assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); - checkWrittenData(tempFile, EXPECTED); + checkWrittenData(tempFile, EXPECTED1); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); @@ -220,7 +221,43 @@ public class StreamWriteFunctionTest { funcWrapper.checkpointComplete(1); // the coordinator checkpoint commits the inflight instant. 
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); - checkWrittenData(tempFile, EXPECTED); + checkWrittenData(tempFile, EXPECTED1); + } + + @Test + public void testInsertDuplicates() throws Exception { + // reset the config option + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true); + funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); + + // open the function and ingest data + funcWrapper.openFunction(); + for (RowData rowData : TestData.DATA_SET_THREE) { + funcWrapper.invoke(rowData); + } + + assertEmptyDataFiles(); + // this triggers the data write and event send + funcWrapper.checkpointFunction(1); + + final OperatorEvent nextEvent = funcWrapper.getNextEvent(); + assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); + checkWrittenData(tempFile, EXPECTED3, 1); + + funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); + assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); + + funcWrapper.checkpointComplete(1); + + // insert duplicates again + for (RowData rowData : TestData.DATA_SET_THREE) { + funcWrapper.invoke(rowData); + } + + funcWrapper.checkpointFunction(2); + + checkWrittenData(tempFile, EXPECTED3, 1); } @Test @@ -248,7 +285,7 @@ public class StreamWriteFunctionTest { funcWrapper.invoke(rowData); } // the data is not flushed yet - checkWrittenData(tempFile, EXPECTED); + checkWrittenData(tempFile, EXPECTED1); // this triggers the data write and event send funcWrapper.checkpointFunction(2); diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java index 56f946b96..f745a3c89 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java +++ 
b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java @@ -18,16 +18,24 @@ package org.apache.hudi.operator; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.operator.partitioner.BucketAssignFunction; +import org.apache.hudi.operator.transform.RowDataToHoodieFunction; import org.apache.hudi.operator.utils.TestConfigurations; import org.apache.hudi.operator.utils.TestData; +import org.apache.hudi.sink.CommitSink; +import org.apache.hudi.streamer.FlinkStreamerConfig; import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.io.FilePathFilter; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.io.TextInputFormat; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.fs.Path; @@ -37,7 +45,7 @@ import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.FileProcessingMode; -import org.apache.flink.table.data.RowData; +import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.TestLogger; @@ -47,6 +55,7 @@ import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.nio.charset.StandardCharsets; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -74,19 
+83,15 @@ public class StreamWriteITCase extends TestLogger { StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); execEnv.getConfig().disableObjectReuse(); execEnv.setParallelism(4); - // 1 second a time - execEnv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); + // set up checkpoint interval + execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE); - // Read from kafka source + // Read from file source RowType rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)) .getLogicalType(); - StreamWriteOperatorFactory operatorFactory = - new StreamWriteOperatorFactory<>(rowType, conf, 4); - - int partitionFieldIndex = rowType.getFieldIndex(conf.getString(FlinkOptions.PARTITION_PATH_FIELD)); - final RowData.FieldGetter partitionFieldGetter = - RowData.createFieldGetter(rowType.getTypeAt(partitionFieldIndex), partitionFieldIndex); + StreamWriteOperatorFactory operatorFactory = + new StreamWriteOperatorFactory<>(conf, 4); JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( rowType, @@ -107,17 +112,103 @@ public class StreamWriteITCase extends TestLogger { // use PROCESS_CONTINUOUSLY mode to trigger checkpoint .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo) .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8))) + .setParallelism(4) + .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)) // Key-by partition path, to avoid multiple subtasks write to a partition at the same time - .keyBy(partitionFieldGetter::getFieldOrNull) + .keyBy(HoodieRecord::getPartitionPath) + .transform( + "bucket_assigner", + TypeInformation.of(HoodieRecord.class), + new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))) + .uid("uid_bucket_assigner") + // shuffle by fileId(bucket id) + .keyBy(record -> record.getCurrentLocation().getFileId()) 
.transform("hoodie_stream_write", null, operatorFactory) - .uid("uid_hoodie_stream_write") - .setParallelism(4); + .uid("uid_hoodie_stream_write"); execEnv.addOperator(dataStream.getTransformation()); JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME))); if (client.getJobStatus().get() != JobStatus.FAILED) { try { - TimeUnit.SECONDS.sleep(10); + TimeUnit.SECONDS.sleep(8); + client.cancel(); + } catch (Throwable var1) { + // ignored + } + } + + TestData.checkWrittenData(tempFile, EXPECTED); + } + + @Test + public void testWriteToHoodieLegacy() throws Exception { + FlinkStreamerConfig streamerConf = TestConfigurations.getDefaultStreamerConf(tempFile.getAbsolutePath()); + Configuration conf = FlinkOptions.fromStreamerConfig(streamerConf); + StreamerUtil.initTableIfNotExists(conf); + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + execEnv.getConfig().disableObjectReuse(); + execEnv.setParallelism(4); + // set up checkpoint interval + execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE); + execEnv.getConfig().setGlobalJobParameters(streamerConf); + + // Read from file source + RowType rowType = + (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)) + .getLogicalType(); + + JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( + rowType, + new RowDataTypeInfo(rowType), + false, + true, + TimestampFormat.ISO_8601 + ); + String sourcePath = Objects.requireNonNull(Thread.currentThread() + .getContextClassLoader().getResource("test_source.data")).toString(); + + TextInputFormat format = new TextInputFormat(new Path(sourcePath)); + format.setFilesFilter(FilePathFilter.createDefaultFilter()); + TypeInformation typeInfo = BasicTypeInfo.STRING_TYPE_INFO; + format.setCharsetName("UTF-8"); + + execEnv + // use PROCESS_CONTINUOUSLY mode to trigger checkpoint + .readFile(format, sourcePath, 
FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo) + .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8))) + .setParallelism(4) + .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)) + .transform(InstantGenerateOperator.NAME, TypeInformation.of(HoodieRecord.class), new InstantGenerateOperator()) + .name("instant_generator") + .uid("instant_generator_id") + + // Keyby partition path, to avoid multiple subtasks writing to a partition at the same time + .keyBy(HoodieRecord::getPartitionPath) + // use the bucket assigner to generate bucket IDs + .transform( + "bucket_assigner", + TypeInformation.of(HoodieRecord.class), + new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))) + .uid("uid_bucket_assigner") + // shuffle by fileId(bucket id) + .keyBy(record -> record.getCurrentLocation().getFileId()) + // write operator, where the write operation really happens + .transform(KeyedWriteProcessOperator.NAME, TypeInformation.of(new TypeHint, Integer>>() { + }), new KeyedWriteProcessOperator(new KeyedWriteProcessFunction())) + .name("write_process") + .uid("write_process_uid") + .setParallelism(4) + + // Commit can only be executed once, so make it one parallelism + .addSink(new CommitSink()) + .name("commit_sink") + .uid("commit_sink_uid") + .setParallelism(1); + + JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME))); + if (client.getJobStatus().get() != JobStatus.FAILED) { + try { + TimeUnit.SECONDS.sleep(8); client.cancel(); } catch (Throwable var1) { // ignored diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/partitioner/TestBucketAssigner.java b/hudi-flink/src/test/java/org/apache/hudi/operator/partitioner/TestBucketAssigner.java new file mode 100644 index 000000000..e27ea0757 --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/partitioner/TestBucketAssigner.java @@ -0,0 +1,235 @@ +/* + * Licensed to the 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.operator.partitioner;

import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.operator.utils.TestConfigurations;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.table.action.commit.SmallFile;
import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

/**
 * Test cases for {@link BucketAssigner}: verifies that the assigner hands out
 * stable bucket IDs (partition path + file ID + bucket type) for UPDATE and
 * INSERT records, and that INSERTs are routed into existing small files when
 * those files still have space left.
 */
public class TestBucketAssigner {
  private HoodieWriteConfig writeConfig;
  private HoodieFlinkEngineContext context;

  @TempDir
  File tempFile;

  /**
   * Initializes a fresh Hoodie table under the temp dir and builds the write
   * config / Flink engine context the assigner needs.
   */
  @BeforeEach
  public void before() throws IOException {
    final String basePath = tempFile.getAbsolutePath();
    final Configuration conf = TestConfigurations.getDefaultConf(basePath);

    writeConfig = StreamerUtil.getHoodieClientConfig(conf);
    context = new HoodieFlinkEngineContext(
        new SerializableConfiguration(StreamerUtil.getHadoopConf()),
        new FlinkTaskContextSupplier(null));
    StreamerUtil.initTableIfNotExists(conf);
  }

  /**
   * Repeated updates to the same (partition, fileId) must keep resolving to the
   * same UPDATE bucket; distinct partitions or file IDs get their own buckets.
   */
  @Test
  public void testAddUpdate() {
    MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context, writeConfig);
    BucketInfo bucketInfo = mockBucketAssigner.addUpdate("par1", "file_id_0");
    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "file_id_0");

    mockBucketAssigner.addUpdate("par1", "file_id_0");
    bucketInfo = mockBucketAssigner.addUpdate("par1", "file_id_0");
    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "file_id_0");

    mockBucketAssigner.addUpdate("par1", "file_id_1");
    bucketInfo = mockBucketAssigner.addUpdate("par1", "file_id_1");
    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "file_id_1");

    bucketInfo = mockBucketAssigner.addUpdate("par2", "file_id_0");
    assertBucketEquals(bucketInfo, "par2", BucketType.UPDATE, "file_id_0");

    bucketInfo = mockBucketAssigner.addUpdate("par3", "file_id_2");
    assertBucketEquals(bucketInfo, "par3", BucketType.UPDATE, "file_id_2");
  }

  /**
   * With no small files configured, every insert resolves to an INSERT bucket
   * for its partition, and repeated inserts to the same partition reuse it.
   */
  @Test
  public void testAddInsert() {
    MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context, writeConfig);
    BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1");
    assertBucketEquals(bucketInfo, "par1", BucketType.INSERT);

    mockBucketAssigner.addInsert("par1");
    bucketInfo = mockBucketAssigner.addInsert("par1");
    assertBucketEquals(bucketInfo, "par1", BucketType.INSERT);

    mockBucketAssigner.addInsert("par2");
    bucketInfo = mockBucketAssigner.addInsert("par2");
    assertBucketEquals(bucketInfo, "par2", BucketType.INSERT);

    bucketInfo = mockBucketAssigner.addInsert("par3");
    assertBucketEquals(bucketInfo, "par3", BucketType.INSERT);

    bucketInfo = mockBucketAssigner.addInsert("par3");
    assertBucketEquals(bucketInfo, "par3", BucketType.INSERT);
  }

  /**
   * Inserts should be appended (as UPDATE buckets) into small files that still
   * have capacity; a full small file (f1) is skipped, and partitions without
   * small files fall back to plain INSERT buckets.
   */
  @Test
  public void testInsertWithSmallFiles() {
    SmallFile f0 = new SmallFile();
    f0.location = new HoodieRecordLocation("t0", "f0");
    f0.sizeBytes = 12;

    SmallFile f1 = new SmallFile();
    f1.location = new HoodieRecordLocation("t0", "f1");
    f1.sizeBytes = 122879; // no left space to append new records to this bucket

    SmallFile f2 = new SmallFile();
    f2.location = new HoodieRecordLocation("t0", "f2");
    f2.sizeBytes = 56;

    Map<String, List<SmallFile>> smallFilesMap = new HashMap<>();
    smallFilesMap.put("par1", Arrays.asList(f0, f1));
    smallFilesMap.put("par2", Collections.singletonList(f2));

    MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context, writeConfig, smallFilesMap);
    BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1");
    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");

    mockBucketAssigner.addInsert("par1");
    bucketInfo = mockBucketAssigner.addInsert("par1");
    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");

    mockBucketAssigner.addInsert("par2");
    bucketInfo = mockBucketAssigner.addInsert("par2");
    assertBucketEquals(bucketInfo, "par2", BucketType.UPDATE, "f2");

    bucketInfo = mockBucketAssigner.addInsert("par3");
    assertBucketEquals(bucketInfo, "par3", BucketType.INSERT);

    bucketInfo = mockBucketAssigner.addInsert("par3");
    assertBucketEquals(bucketInfo, "par3", BucketType.INSERT);
  }

  /**
   * Mixing updates with inserts must not disturb small-file routing: inserts
   * into a partition keep landing in that partition's writable small file
   * regardless of interleaved updates to other file IDs / partitions.
   */
  @Test
  public void testUpdateAndInsertWithSmallFiles() {
    SmallFile f0 = new SmallFile();
    f0.location = new HoodieRecordLocation("t0", "f0");
    f0.sizeBytes = 12;

    SmallFile f1 = new SmallFile();
    f1.location = new HoodieRecordLocation("t0", "f1");
    f1.sizeBytes = 122879; // no left space to append new records to this bucket

    SmallFile f2 = new SmallFile();
    f2.location = new HoodieRecordLocation("t0", "f2");
    f2.sizeBytes = 56;

    Map<String, List<SmallFile>> smallFilesMap = new HashMap<>();
    smallFilesMap.put("par1", Arrays.asList(f0, f1));
    smallFilesMap.put("par2", Collections.singletonList(f2));

    MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context, writeConfig, smallFilesMap);
    mockBucketAssigner.addUpdate("par1", "f0");

    BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1");
    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");

    mockBucketAssigner.addInsert("par1");
    bucketInfo = mockBucketAssigner.addInsert("par1");
    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");

    mockBucketAssigner.addUpdate("par1", "f2");

    mockBucketAssigner.addInsert("par1");
    bucketInfo = mockBucketAssigner.addInsert("par1");
    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");

    mockBucketAssigner.addUpdate("par2", "f0");

    mockBucketAssigner.addInsert("par2");
    bucketInfo = mockBucketAssigner.addInsert("par2");
    assertBucketEquals(bucketInfo, "par2", BucketType.UPDATE, "f2");
  }

  /**
   * Asserts full bucket equality: partition path, bucket type and file ID.
   */
  private void assertBucketEquals(
      BucketInfo bucketInfo,
      String partition,
      BucketType bucketType,
      String fileId) {
    BucketInfo actual = new BucketInfo(bucketType, fileId, partition);
    assertThat(bucketInfo, is(actual));
  }

  /**
   * Asserts partition path and bucket type only (file ID is assigner-generated
   * for INSERT buckets, so it is not pinned).
   */
  private void assertBucketEquals(
      BucketInfo bucketInfo,
      String partition,
      BucketType bucketType) {
    assertThat(bucketInfo.getPartitionPath(), is(partition));
    assertThat(bucketInfo.getBucketType(), is(bucketType));
  }

  /**
   * Mock BucketAssigner that can specify small files explicitly, so tests do
   * not depend on actual files on the file system.
   */
  static class MockBucketAssigner extends BucketAssigner {
    private final Map<String, List<SmallFile>> smallFilesMap;

    MockBucketAssigner(
        HoodieFlinkEngineContext context,
        HoodieWriteConfig config) {
      this(context, config, Collections.emptyMap());
    }

    MockBucketAssigner(
        HoodieFlinkEngineContext context,
        HoodieWriteConfig config,
        Map<String, List<SmallFile>> smallFilesMap) {
      super(context, config);
      this.smallFilesMap = smallFilesMap;
    }

    @Override
    protected List<SmallFile> getSmallFiles(String partitionPath) {
      // Serve the injected fixture instead of scanning the table view.
      if (this.smallFilesMap.containsKey(partitionPath)) {
        return this.smallFilesMap.get(partitionPath);
      }
      return Collections.emptyList();
    }
  }
}
org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext; import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway; - -import org.apache.hudi.client.HoodieFlinkWriteClient; -import org.apache.hudi.operator.StreamWriteFunction; -import org.apache.hudi.operator.StreamWriteOperatorCoordinator; -import org.apache.hudi.operator.event.BatchWriteSuccessEvent; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Collector; import java.util.concurrent.CompletableFuture; @@ -43,7 +46,6 @@ import java.util.concurrent.CompletableFuture; * @param Input type */ public class StreamWriteFunctionWrapper { - private final TypeSerializer serializer; private final Configuration conf; private final IOManager ioManager; @@ -52,10 +54,18 @@ public class StreamWriteFunctionWrapper { private final StreamWriteOperatorCoordinator coordinator; private final MockFunctionInitializationContext functionInitializationContext; - private StreamWriteFunction function; + /** Function that converts row data to HoodieRecord. */ + private RowDataToHoodieFunction> toHoodieFunction; + /** Function that assigns bucket ID. */ + private BucketAssignFunction, HoodieRecord> bucketAssignerFunction; + /** Stream write function. 
*/ + private StreamWriteFunction, Object> writeFunction; - public StreamWriteFunctionWrapper(String tablePath, TypeSerializer serializer) throws Exception { - this.serializer = serializer; + public StreamWriteFunctionWrapper(String tablePath) throws Exception { + this(tablePath, TestConfigurations.getDefaultConf(tablePath)); + } + + public StreamWriteFunctionWrapper(String tablePath, Configuration conf) throws Exception { this.ioManager = new IOManagerAsync(); MockEnvironment environment = new MockEnvironmentBuilder() .setTaskName("mockTask") @@ -64,7 +74,7 @@ public class StreamWriteFunctionWrapper { .build(); this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment); this.gateway = new MockOperatorEventGateway(); - this.conf = TestConfigurations.getDefaultConf(tablePath); + this.conf = conf; // one function this.coordinator = new StreamWriteOperatorCoordinator(conf, 1); this.coordinator.start(); @@ -72,14 +82,37 @@ public class StreamWriteFunctionWrapper { } public void openFunction() throws Exception { - function = new StreamWriteFunction<>(TestConfigurations.ROW_TYPE, this.conf); - function.setRuntimeContext(runtimeContext); - function.setOperatorEventGateway(gateway); - function.open(this.conf); + toHoodieFunction = new RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, conf); + toHoodieFunction.setRuntimeContext(runtimeContext); + toHoodieFunction.open(conf); + + bucketAssignerFunction = new BucketAssignFunction<>(conf); + bucketAssignerFunction.setRuntimeContext(runtimeContext); + bucketAssignerFunction.open(conf); + bucketAssignerFunction.initializeState(this.functionInitializationContext); + + writeFunction = new StreamWriteFunction<>(conf); + writeFunction.setRuntimeContext(runtimeContext); + writeFunction.setOperatorEventGateway(gateway); + writeFunction.open(conf); } public void invoke(I record) throws Exception { - function.processElement(record, null, null); + HoodieRecord hoodieRecord = toHoodieFunction.map((RowData) 
record); + HoodieRecord[] hoodieRecords = new HoodieRecord[1]; + Collector> collector = new Collector>() { + @Override + public void collect(HoodieRecord record) { + hoodieRecords[0] = record; + } + + @Override + public void close() { + + } + }; + bucketAssignerFunction.processElement(hoodieRecord, null, collector); + writeFunction.processElement(hoodieRecords[0], null, null); } public BatchWriteSuccessEvent[] getEventBuffer() { @@ -92,19 +125,22 @@ public class StreamWriteFunctionWrapper { @SuppressWarnings("rawtypes") public HoodieFlinkWriteClient getWriteClient() { - return this.function.getWriteClient(); + return this.writeFunction.getWriteClient(); } public void checkpointFunction(long checkpointId) throws Exception { // checkpoint the coordinator first this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>()); - function.snapshotState(new MockFunctionSnapshotContext(checkpointId)); + bucketAssignerFunction.snapshotState(null); + + writeFunction.snapshotState(null); functionInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId); } public void checkpointComplete(long checkpointId) { functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId); coordinator.checkpointComplete(checkpointId); + this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId); } public void checkpointFails(long checkpointId) { diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java index 7513feddc..d9e603a14 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java @@ -19,6 +19,7 @@ package org.apache.hudi.operator.utils; import org.apache.hudi.operator.FlinkOptions; +import org.apache.hudi.streamer.FlinkStreamerConfig; import org.apache.flink.api.common.ExecutionConfig; import 
org.apache.flink.configuration.Configuration; @@ -56,4 +57,16 @@ public class TestConfigurations { conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition"); return conf; } + + public static FlinkStreamerConfig getDefaultStreamerConf(String tablePath) { + FlinkStreamerConfig streamerConf = new FlinkStreamerConfig(); + streamerConf.targetBasePath = tablePath; + streamerConf.readSchemaFilePath = Objects.requireNonNull(Thread.currentThread() + .getContextClassLoader().getResource("test_read_schema.avsc")).toString(); + streamerConf.targetTableName = "TestHoodieTable"; + streamerConf.partitionPathField = "partition"; + streamerConf.tableType = "COPY_ON_WRITE"; + streamerConf.checkpointInterval = 4000L; + return streamerConf; + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java index 7c2c31451..b4c24ef69 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java @@ -43,6 +43,7 @@ import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.stream.IntStream; import static junit.framework.TestCase.assertEquals; import static org.hamcrest.CoreMatchers.is; @@ -92,6 +93,13 @@ public class TestData { TimestampData.fromEpochMillis(8), StringData.fromString("par4")) ); + public static List DATA_SET_THREE = new ArrayList<>(); + static { + IntStream.range(0, 5).forEach(i -> DATA_SET_THREE.add( + binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, + TimestampData.fromEpochMillis(1), StringData.fromString("par1")))); + } + /** * Checks the source data TestConfigurations.DATA_SET_ONE are written as expected. 
* @@ -101,13 +109,29 @@ public class TestData { * @param expected The expected results mapping, the key should be the partition path */ public static void checkWrittenData(File baseFile, Map expected) throws IOException { + checkWrittenData(baseFile, expected, 4); + } + + /** + * Checks the source data TestConfigurations.DATA_SET_ONE are written as expected. + * + *

Note: Replace it with the Flink reader when it is supported. + * + * @param baseFile The file base to check, should be a directly + * @param expected The expected results mapping, the key should be the partition path + * @param partitions The expected partition number + */ + public static void checkWrittenData( + File baseFile, + Map expected, + int partitions) throws IOException { assert baseFile.isDirectory(); FileFilter filter = file -> !file.getName().startsWith("."); File[] partitionDirs = baseFile.listFiles(filter); assertNotNull(partitionDirs); - assertThat(partitionDirs.length, is(4)); + assertThat(partitionDirs.length, is(partitions)); for (File partitionDir : partitionDirs) { - File[] dataFiles = partitionDir.listFiles(file -> file.getName().endsWith(".parquet")); + File[] dataFiles = partitionDir.listFiles(filter); assertNotNull(dataFiles); File latestDataFile = Arrays.stream(dataFiles) .max(Comparator.comparing(f -> FSUtils.getCommitTime(f.getName())))