diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java index cff24d97f..33f1dd620 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java @@ -35,7 +35,6 @@ import org.apache.hudi.sink.utils.PayloadCreation; import org.apache.hudi.table.action.commit.BucketInfo; import org.apache.hudi.util.StreamerUtil; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueState; @@ -70,8 +69,6 @@ public class BucketAssignFunction> extends KeyedProcessFunction implements CheckpointedFunction, CheckpointListener { - private BucketAssignOperator.Context context; - /** * Index cache(speed-up) state for the underneath file based(BloomFilter) indices. * When a record came in, we do these check: @@ -158,7 +155,6 @@ public class BucketAssignFunction> public void processElement(I value, Context ctx, Collector out) throws Exception { if (value instanceof IndexRecord) { IndexRecord indexRecord = (IndexRecord) value; - this.context.setCurrentKey(indexRecord.getRecordKey()); this.indexState.update((HoodieRecordGlobalLocation) indexRecord.getCurrentLocation()); } else { processRecord((HoodieRecord) value, out); @@ -198,7 +194,6 @@ public class BucketAssignFunction> } } else { location = getNewRecordLocation(partitionPath); - this.context.setCurrentKey(recordKey); } // always refresh the index if (isChangingRecords) { @@ -243,13 +238,4 @@ public class BucketAssignFunction> public void close() throws Exception { this.bucketAssigner.close(); } - - public void setContext(BucketAssignOperator.Context context) { - this.context = context; - } - - @VisibleForTesting - public void clearIndexState() { - this.indexState.clear(); - } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignOperator.java deleted file mode 100644 index 3b3047b5d..000000000 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignOperator.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.sink.partitioner; - -import org.apache.hudi.common.model.HoodieRecord; - -import org.apache.flink.streaming.api.operators.KeyedProcessOperator; - -/** - * Operator for {@link BucketAssignFunction}. - * - * @param The input type - */ -public class BucketAssignOperator, O extends HoodieRecord> - extends KeyedProcessOperator { - private final BucketAssignFunction function; - - public BucketAssignOperator(BucketAssignFunction function) { - super(function); - this.function = function; - } - - @Override - public void open() throws Exception { - super.open(); - this.function.setContext(new ContextImpl()); - } - - /** - * Context to give the function chance to operate the state handle. - */ - public interface Context { - void setCurrentKey(Object key); - } - - public class ContextImpl implements Context { - public void setCurrentKey(Object key) { - BucketAssignOperator.this.setCurrentKey(key); - } - } -} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java index 4f8036620..41ea4ebe4 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java @@ -35,7 +35,6 @@ import org.apache.hudi.sink.compact.CompactionCommitSink; import org.apache.hudi.sink.compact.CompactionPlanEvent; import org.apache.hudi.sink.compact.CompactionPlanOperator; import org.apache.hudi.sink.partitioner.BucketAssignFunction; -import org.apache.hudi.sink.partitioner.BucketAssignOperator; import org.apache.hudi.sink.transform.RowDataToHoodieFunctions; import org.apache.hudi.table.format.FilePathUtils; @@ -44,6 +43,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.api.operators.ProcessOperator; import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; @@ -163,7 +163,7 @@ public class Pipelines { .transform( "bucket_assigner", TypeInformation.of(HoodieRecord.class), - new BucketAssignOperator<>(new BucketAssignFunction<>(conf))) + new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))) .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME)) .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism)) // shuffle by fileId(bucket id) diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index 7da4c9e11..2c8bac057 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -27,7 +27,6 @@ import org.apache.hudi.sink.StreamWriteOperatorCoordinator; import org.apache.hudi.sink.bootstrap.BootstrapOperator; import org.apache.hudi.sink.event.WriteMetadataEvent; import org.apache.hudi.sink.partitioner.BucketAssignFunction; -import org.apache.hudi.sink.partitioner.BucketAssignOperator; import org.apache.hudi.sink.transform.RowDataToHoodieFunction; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; @@ -91,7 +90,7 @@ public class StreamWriteFunctionWrapper implements TestFunctionWrapper { /** * BucketAssignOperator context. **/ - private MockBucketAssignOperatorContext bucketAssignOperatorContext; + private final MockBucketAssignFunctionContext bucketAssignFunctionContext; /** * Stream write function. */ @@ -125,11 +124,10 @@ public class StreamWriteFunctionWrapper implements TestFunctionWrapper { this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1); this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext); this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf); - this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext(); + this.bucketAssignFunctionContext = new MockBucketAssignFunctionContext(); this.stateInitializationContext = new MockStateInitializationContext(); this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf); this.asyncCompaction = StreamerUtil.needsAsyncCompaction(conf); - this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext(); this.output = new CollectorOutput<>(new ArrayList<>()); this.streamConfig = new StreamConfig(conf); streamConfig.setOperatorID(new OperatorID()); @@ -155,7 +153,6 @@ public class StreamWriteFunctionWrapper implements TestFunctionWrapper { bucketAssignerFunction = new BucketAssignFunction<>(conf); bucketAssignerFunction.setRuntimeContext(runtimeContext); bucketAssignerFunction.open(conf); - bucketAssignerFunction.setContext(bucketAssignOperatorContext); bucketAssignerFunction.initializeState(this.stateInitializationContext); setupWriteFunction(); @@ -187,15 +184,16 @@ public class StreamWriteFunctionWrapper implements TestFunctionWrapper { if (streamElement.isRecord()) { HoodieRecord bootstrapRecord = (HoodieRecord) streamElement.asRecord().getValue(); bucketAssignerFunction.processElement(bootstrapRecord, null, collector); + bucketAssignFunctionContext.setCurrentKey(bootstrapRecord.getRecordKey()); } } bootstrapOperator.processElement(new StreamRecord<>(hoodieRecord)); list.clear(); - this.bucketAssignOperatorContext.setCurrentKey(hoodieRecord.getRecordKey()); } bucketAssignerFunction.processElement(hoodieRecord, null, collector); + bucketAssignFunctionContext.setCurrentKey(hoodieRecord.getRecordKey()); writeFunction.processElement(hoodieRecords[0], null, null); } @@ -267,13 +265,8 @@ public class StreamWriteFunctionWrapper implements TestFunctionWrapper { return coordinatorContext; } - public void clearIndexState() { - this.bucketAssignerFunction.clearIndexState(); - this.bucketAssignOperatorContext.clearIndexState(); - } - public boolean isKeyInState(HoodieKey hoodieKey) { - return this.bucketAssignOperatorContext.isKeyInState(hoodieKey.getRecordKey()); + return this.bucketAssignFunctionContext.isKeyInState(hoodieKey.getRecordKey()); } public boolean isConforming() { @@ -303,18 +296,13 @@ public class StreamWriteFunctionWrapper implements TestFunctionWrapper { // Inner Class // ------------------------------------------------------------------------- - private static class MockBucketAssignOperatorContext implements BucketAssignOperator.Context { + private static class MockBucketAssignFunctionContext { private final Set updateKeys = new HashSet<>(); - @Override public void setCurrentKey(Object key) { this.updateKeys.add(key); } - public void clearIndexState() { - this.updateKeys.clear(); - } - public boolean isKeyInState(String key) { return this.updateKeys.contains(key); }