[HUDI-2738] Remove the useless context from BucketAssignFunction (#3972)
Co-authored-by: yuzhaojing <yuzhaojing@bytedance.com>
This commit is contained in:
@@ -35,7 +35,6 @@ import org.apache.hudi.sink.utils.PayloadCreation;
|
|||||||
import org.apache.hudi.table.action.commit.BucketInfo;
|
import org.apache.hudi.table.action.commit.BucketInfo;
|
||||||
import org.apache.hudi.util.StreamerUtil;
|
import org.apache.hudi.util.StreamerUtil;
|
||||||
|
|
||||||
import org.apache.flink.annotation.VisibleForTesting;
|
|
||||||
import org.apache.flink.api.common.state.CheckpointListener;
|
import org.apache.flink.api.common.state.CheckpointListener;
|
||||||
import org.apache.flink.api.common.state.StateTtlConfig;
|
import org.apache.flink.api.common.state.StateTtlConfig;
|
||||||
import org.apache.flink.api.common.state.ValueState;
|
import org.apache.flink.api.common.state.ValueState;
|
||||||
@@ -70,8 +69,6 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
|||||||
extends KeyedProcessFunction<K, I, O>
|
extends KeyedProcessFunction<K, I, O>
|
||||||
implements CheckpointedFunction, CheckpointListener {
|
implements CheckpointedFunction, CheckpointListener {
|
||||||
|
|
||||||
private BucketAssignOperator.Context context;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Index cache (speed-up) state for the underlying file-based (BloomFilter) indices.
|
* Index cache (speed-up) state for the underlying file-based (BloomFilter) indices.
|
||||||
* When a record comes in, we do these checks:
|
* When a record comes in, we do these checks:
|
||||||
@@ -158,7 +155,6 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
|||||||
public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
|
public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
|
||||||
if (value instanceof IndexRecord) {
|
if (value instanceof IndexRecord) {
|
||||||
IndexRecord<?> indexRecord = (IndexRecord<?>) value;
|
IndexRecord<?> indexRecord = (IndexRecord<?>) value;
|
||||||
this.context.setCurrentKey(indexRecord.getRecordKey());
|
|
||||||
this.indexState.update((HoodieRecordGlobalLocation) indexRecord.getCurrentLocation());
|
this.indexState.update((HoodieRecordGlobalLocation) indexRecord.getCurrentLocation());
|
||||||
} else {
|
} else {
|
||||||
processRecord((HoodieRecord<?>) value, out);
|
processRecord((HoodieRecord<?>) value, out);
|
||||||
@@ -198,7 +194,6 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
location = getNewRecordLocation(partitionPath);
|
location = getNewRecordLocation(partitionPath);
|
||||||
this.context.setCurrentKey(recordKey);
|
|
||||||
}
|
}
|
||||||
// always refresh the index
|
// always refresh the index
|
||||||
if (isChangingRecords) {
|
if (isChangingRecords) {
|
||||||
@@ -243,13 +238,4 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
|||||||
public void close() throws Exception {
|
public void close() throws Exception {
|
||||||
this.bucketAssigner.close();
|
this.bucketAssigner.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setContext(BucketAssignOperator.Context context) {
|
|
||||||
this.context = context;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public void clearIndexState() {
|
|
||||||
this.indexState.clear();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,57 +0,0 @@
|
|||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hudi.sink.partitioner;
|
|
||||||
|
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
|
||||||
|
|
||||||
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Operator for {@link BucketAssignFunction}.
|
|
||||||
*
|
|
||||||
* @param <I> The input type
|
|
||||||
*/
|
|
||||||
public class BucketAssignOperator<K, I extends HoodieRecord<?>, O extends HoodieRecord<?>>
|
|
||||||
extends KeyedProcessOperator<K, I, O> {
|
|
||||||
private final BucketAssignFunction<K, I, O> function;
|
|
||||||
|
|
||||||
public BucketAssignOperator(BucketAssignFunction<K, I, O> function) {
|
|
||||||
super(function);
|
|
||||||
this.function = function;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void open() throws Exception {
|
|
||||||
super.open();
|
|
||||||
this.function.setContext(new ContextImpl());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Context to give the function chance to operate the state handle.
|
|
||||||
*/
|
|
||||||
public interface Context {
|
|
||||||
void setCurrentKey(Object key);
|
|
||||||
}
|
|
||||||
|
|
||||||
public class ContextImpl implements Context {
|
|
||||||
public void setCurrentKey(Object key) {
|
|
||||||
BucketAssignOperator.this.setCurrentKey(key);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -35,7 +35,6 @@ import org.apache.hudi.sink.compact.CompactionCommitSink;
|
|||||||
import org.apache.hudi.sink.compact.CompactionPlanEvent;
|
import org.apache.hudi.sink.compact.CompactionPlanEvent;
|
||||||
import org.apache.hudi.sink.compact.CompactionPlanOperator;
|
import org.apache.hudi.sink.compact.CompactionPlanOperator;
|
||||||
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
|
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
|
||||||
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
|
|
||||||
import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
|
import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
|
||||||
import org.apache.hudi.table.format.FilePathUtils;
|
import org.apache.hudi.table.format.FilePathUtils;
|
||||||
|
|
||||||
@@ -44,6 +43,7 @@ import org.apache.flink.configuration.Configuration;
|
|||||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||||
import org.apache.flink.streaming.api.datastream.DataStreamSink;
|
import org.apache.flink.streaming.api.datastream.DataStreamSink;
|
||||||
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
|
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
|
||||||
|
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
|
||||||
import org.apache.flink.streaming.api.operators.ProcessOperator;
|
import org.apache.flink.streaming.api.operators.ProcessOperator;
|
||||||
import org.apache.flink.table.data.RowData;
|
import org.apache.flink.table.data.RowData;
|
||||||
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
|
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
|
||||||
@@ -163,7 +163,7 @@ public class Pipelines {
|
|||||||
.transform(
|
.transform(
|
||||||
"bucket_assigner",
|
"bucket_assigner",
|
||||||
TypeInformation.of(HoodieRecord.class),
|
TypeInformation.of(HoodieRecord.class),
|
||||||
new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
|
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
|
||||||
.uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
|
.uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
|
||||||
.setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
|
.setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
|
||||||
// shuffle by fileId(bucket id)
|
// shuffle by fileId(bucket id)
|
||||||
|
|||||||
@@ -27,7 +27,6 @@ import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
|
|||||||
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
|
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
|
||||||
import org.apache.hudi.sink.event.WriteMetadataEvent;
|
import org.apache.hudi.sink.event.WriteMetadataEvent;
|
||||||
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
|
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
|
||||||
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
|
|
||||||
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
|
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
|
||||||
import org.apache.hudi.util.StreamerUtil;
|
import org.apache.hudi.util.StreamerUtil;
|
||||||
import org.apache.hudi.utils.TestConfigurations;
|
import org.apache.hudi.utils.TestConfigurations;
|
||||||
@@ -91,7 +90,7 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
|
|||||||
/**
|
/**
|
||||||
* BucketAssignOperator context.
|
* BucketAssignOperator context.
|
||||||
**/
|
**/
|
||||||
private MockBucketAssignOperatorContext bucketAssignOperatorContext;
|
private final MockBucketAssignFunctionContext bucketAssignFunctionContext;
|
||||||
/**
|
/**
|
||||||
* Stream write function.
|
* Stream write function.
|
||||||
*/
|
*/
|
||||||
@@ -125,11 +124,10 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
|
|||||||
this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
|
this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
|
||||||
this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
|
this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
|
||||||
this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
|
this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
|
||||||
this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext();
|
this.bucketAssignFunctionContext = new MockBucketAssignFunctionContext();
|
||||||
this.stateInitializationContext = new MockStateInitializationContext();
|
this.stateInitializationContext = new MockStateInitializationContext();
|
||||||
this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
|
this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
|
||||||
this.asyncCompaction = StreamerUtil.needsAsyncCompaction(conf);
|
this.asyncCompaction = StreamerUtil.needsAsyncCompaction(conf);
|
||||||
this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext();
|
|
||||||
this.output = new CollectorOutput<>(new ArrayList<>());
|
this.output = new CollectorOutput<>(new ArrayList<>());
|
||||||
this.streamConfig = new StreamConfig(conf);
|
this.streamConfig = new StreamConfig(conf);
|
||||||
streamConfig.setOperatorID(new OperatorID());
|
streamConfig.setOperatorID(new OperatorID());
|
||||||
@@ -155,7 +153,6 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
|
|||||||
bucketAssignerFunction = new BucketAssignFunction<>(conf);
|
bucketAssignerFunction = new BucketAssignFunction<>(conf);
|
||||||
bucketAssignerFunction.setRuntimeContext(runtimeContext);
|
bucketAssignerFunction.setRuntimeContext(runtimeContext);
|
||||||
bucketAssignerFunction.open(conf);
|
bucketAssignerFunction.open(conf);
|
||||||
bucketAssignerFunction.setContext(bucketAssignOperatorContext);
|
|
||||||
bucketAssignerFunction.initializeState(this.stateInitializationContext);
|
bucketAssignerFunction.initializeState(this.stateInitializationContext);
|
||||||
|
|
||||||
setupWriteFunction();
|
setupWriteFunction();
|
||||||
@@ -187,15 +184,16 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
|
|||||||
if (streamElement.isRecord()) {
|
if (streamElement.isRecord()) {
|
||||||
HoodieRecord<?> bootstrapRecord = (HoodieRecord<?>) streamElement.asRecord().getValue();
|
HoodieRecord<?> bootstrapRecord = (HoodieRecord<?>) streamElement.asRecord().getValue();
|
||||||
bucketAssignerFunction.processElement(bootstrapRecord, null, collector);
|
bucketAssignerFunction.processElement(bootstrapRecord, null, collector);
|
||||||
|
bucketAssignFunctionContext.setCurrentKey(bootstrapRecord.getRecordKey());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bootstrapOperator.processElement(new StreamRecord<>(hoodieRecord));
|
bootstrapOperator.processElement(new StreamRecord<>(hoodieRecord));
|
||||||
list.clear();
|
list.clear();
|
||||||
this.bucketAssignOperatorContext.setCurrentKey(hoodieRecord.getRecordKey());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bucketAssignerFunction.processElement(hoodieRecord, null, collector);
|
bucketAssignerFunction.processElement(hoodieRecord, null, collector);
|
||||||
|
bucketAssignFunctionContext.setCurrentKey(hoodieRecord.getRecordKey());
|
||||||
writeFunction.processElement(hoodieRecords[0], null, null);
|
writeFunction.processElement(hoodieRecords[0], null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -267,13 +265,8 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
|
|||||||
return coordinatorContext;
|
return coordinatorContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clearIndexState() {
|
|
||||||
this.bucketAssignerFunction.clearIndexState();
|
|
||||||
this.bucketAssignOperatorContext.clearIndexState();
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isKeyInState(HoodieKey hoodieKey) {
|
public boolean isKeyInState(HoodieKey hoodieKey) {
|
||||||
return this.bucketAssignOperatorContext.isKeyInState(hoodieKey.getRecordKey());
|
return this.bucketAssignFunctionContext.isKeyInState(hoodieKey.getRecordKey());
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isConforming() {
|
public boolean isConforming() {
|
||||||
@@ -303,18 +296,13 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
|
|||||||
// Inner Class
|
// Inner Class
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
private static class MockBucketAssignOperatorContext implements BucketAssignOperator.Context {
|
private static class MockBucketAssignFunctionContext {
|
||||||
private final Set<Object> updateKeys = new HashSet<>();
|
private final Set<Object> updateKeys = new HashSet<>();
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setCurrentKey(Object key) {
|
public void setCurrentKey(Object key) {
|
||||||
this.updateKeys.add(key);
|
this.updateKeys.add(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clearIndexState() {
|
|
||||||
this.updateKeys.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isKeyInState(String key) {
|
public boolean isKeyInState(String key) {
|
||||||
return this.updateKeys.contains(key);
|
return this.updateKeys.contains(key);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user