1
0

[HUDI-1931] BucketAssignFunction use ValueState instead of MapState (#3026)

Co-authored-by: loukey_7821 <854194341@qq.com>
This commit is contained in:
Danny Chan
2021-06-06 10:40:15 +08:00
committed by GitHub
parent 2a7e1e091e
commit 08464a6a5b
8 changed files with 177 additions and 38 deletions

View File

@@ -32,7 +32,6 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndexUtils; import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.sink.utils.PayloadCreation; import org.apache.hudi.sink.utils.PayloadCreation;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;
@@ -43,6 +42,8 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
@@ -81,7 +82,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
private static final Logger LOG = LoggerFactory.getLogger(BucketAssignFunction.class); private static final Logger LOG = LoggerFactory.getLogger(BucketAssignFunction.class);
private HoodieFlinkEngineContext context; private BucketAssignOperator.Context context;
/** /**
* Index cache(speed-up) state for the underneath file based(BloomFilter) indices. * Index cache(speed-up) state for the underneath file based(BloomFilter) indices.
@@ -94,7 +95,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
* <li>If it does not, use the {@link BucketAssigner} to generate a new bucket ID</li> * <li>If it does not, use the {@link BucketAssigner} to generate a new bucket ID</li>
* </ul> * </ul>
*/ */
private MapState<String, HoodieRecordGlobalLocation> indexState; private ValueState<HoodieRecordGlobalLocation> indexState;
/** /**
* Bucket assigner to assign new bucket IDs or reuse existing ones. * Bucket assigner to assign new bucket IDs or reuse existing ones.
@@ -138,7 +139,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
super.open(parameters); super.open(parameters);
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf); HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
this.hadoopConf = StreamerUtil.getHadoopConf(); this.hadoopConf = StreamerUtil.getHadoopConf();
this.context = new HoodieFlinkEngineContext( HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
new SerializableConfiguration(this.hadoopConf), new SerializableConfiguration(this.hadoopConf),
new FlinkTaskContextSupplier(getRuntimeContext())); new FlinkTaskContextSupplier(getRuntimeContext()));
this.bucketAssigner = BucketAssigners.create( this.bucketAssigner = BucketAssigners.create(
@@ -158,16 +159,15 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
@Override @Override
public void initializeState(FunctionInitializationContext context) { public void initializeState(FunctionInitializationContext context) {
MapStateDescriptor<String, HoodieRecordGlobalLocation> indexStateDesc = ValueStateDescriptor<HoodieRecordGlobalLocation> indexStateDesc =
new MapStateDescriptor<>( new ValueStateDescriptor<>(
"indexState", "indexState",
Types.STRING,
TypeInformation.of(HoodieRecordGlobalLocation.class)); TypeInformation.of(HoodieRecordGlobalLocation.class));
double ttl = conf.getDouble(FlinkOptions.INDEX_STATE_TTL) * 24 * 60 * 60 * 1000; double ttl = conf.getDouble(FlinkOptions.INDEX_STATE_TTL) * 24 * 60 * 60 * 1000;
if (ttl > 0) { if (ttl > 0) {
indexStateDesc.enableTimeToLive(StateTtlConfigUtil.createTtlConfig((long) ttl)); indexStateDesc.enableTimeToLive(StateTtlConfigUtil.createTtlConfig((long) ttl));
} }
indexState = context.getKeyedStateStore().getMapState(indexStateDesc); indexState = context.getKeyedStateStore().getState(indexStateDesc);
if (bootstrapIndex) { if (bootstrapIndex) {
MapStateDescriptor<String, Integer> partitionLoadStateDesc = MapStateDescriptor<String, Integer> partitionLoadStateDesc =
new MapStateDescriptor<>("partitionLoadState", Types.STRING, Types.INT); new MapStateDescriptor<>("partitionLoadState", Types.STRING, Types.INT);
@@ -191,13 +191,13 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
// disabled by default. // disabled by default.
if (bootstrapIndex && !partitionLoadState.contains(partitionPath)) { if (bootstrapIndex && !partitionLoadState.contains(partitionPath)) {
// If the partition records are never loaded, load the records first. // If the partition records are never loaded, load the records first.
loadRecords(partitionPath); loadRecords(partitionPath, recordKey);
} }
// Only changing records need looking up the index for the location, // Only changing records need looking up the index for the location,
// append only records are always recognized as INSERT. // append only records are always recognized as INSERT.
if (isChangingRecords && indexState.contains(recordKey)) { HoodieRecordGlobalLocation oldLoc = indexState.value();
if (isChangingRecords && oldLoc != null) {
// Set up the instant time as "U" to mark the bucket as an update bucket. // Set up the instant time as "U" to mark the bucket as an update bucket.
HoodieRecordGlobalLocation oldLoc = this.indexState.get(recordKey);
if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) { if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) {
if (globalIndex) { if (globalIndex) {
// if partition path changes, emit a delete record for old partition path, // if partition path changes, emit a delete record for old partition path,
@@ -209,7 +209,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
out.collect((O) deleteRecord); out.collect((O) deleteRecord);
} }
location = getNewRecordLocation(partitionPath); location = getNewRecordLocation(partitionPath);
updateIndexState(recordKey, partitionPath, location); updateIndexState(partitionPath, location);
} else { } else {
location = oldLoc.toLocal("U"); location = oldLoc.toLocal("U");
this.bucketAssigner.addUpdate(partitionPath, location.getFileId()); this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
@@ -217,7 +217,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
} else { } else {
location = getNewRecordLocation(partitionPath); location = getNewRecordLocation(partitionPath);
if (isChangingRecords) { if (isChangingRecords) {
updateIndexState(recordKey, partitionPath, location); updateIndexState(partitionPath, location);
} }
} }
record.setCurrentLocation(location); record.setCurrentLocation(location);
@@ -244,10 +244,9 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
} }
private void updateIndexState( private void updateIndexState(
String recordKey,
String partitionPath, String partitionPath,
HoodieRecordLocation localLoc) throws Exception { HoodieRecordLocation localLoc) throws Exception {
this.indexState.put(recordKey, HoodieRecordGlobalLocation.fromLocal(partitionPath, localLoc)); this.indexState.update(HoodieRecordGlobalLocation.fromLocal(partitionPath, localLoc));
} }
@Override @Override
@@ -261,13 +260,18 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
this.bucketAssigner.close(); this.bucketAssigner.close();
} }
public void setContext(BucketAssignOperator.Context context) {
this.context = context;
}
/** /**
* Load all the indices of give partition path into the backup state. * Load all the indices of give partition path into the backup state.
* *
* @param partitionPath The partition path * @param partitionPath The partition path
* @param curRecordKey The current record key
* @throws Exception when error occurs for state update * @throws Exception when error occurs for state update
*/ */
private void loadRecords(String partitionPath) throws Exception { private void loadRecords(String partitionPath, String curRecordKey) throws Exception {
LOG.info("Start loading records under partition {} into the index state", partitionPath); LOG.info("Start loading records under partition {} into the index state", partitionPath);
HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable(); HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
BaseFileUtils fileUtils = BaseFileUtils.getInstance(hoodieTable.getBaseFileFormat()); BaseFileUtils fileUtils = BaseFileUtils.getInstance(hoodieTable.getBaseFileFormat());
@@ -294,14 +298,20 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
boolean shouldLoad = KeyGroupRangeAssignment.assignKeyToParallelOperator( boolean shouldLoad = KeyGroupRangeAssignment.assignKeyToParallelOperator(
hoodieKey.getRecordKey(), maxParallelism, parallelism) == taskID; hoodieKey.getRecordKey(), maxParallelism, parallelism) == taskID;
if (shouldLoad) { if (shouldLoad) {
this.indexState.put(hoodieKey.getRecordKey(), this.context.setCurrentKey(hoodieKey.getRecordKey());
new HoodieRecordGlobalLocation(hoodieKey.getPartitionPath(), baseFile.getCommitTime(), baseFile.getFileId())); this.indexState.update(
new HoodieRecordGlobalLocation(
hoodieKey.getPartitionPath(),
baseFile.getCommitTime(),
baseFile.getFileId()));
} }
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error when putting record keys into the state from file: {}", baseFile); LOG.error("Error when putting record keys into the state from file: {}", baseFile);
} }
}); });
} }
// recover the currentKey
this.context.setCurrentKey(curRecordKey);
// Mark the partition path as loaded. // Mark the partition path as loaded.
partitionLoadState.put(partitionPath, 0); partitionLoadState.put(partitionPath, 0);
LOG.info("Finish loading records under partition {} into the index state", partitionPath); LOG.info("Finish loading records under partition {} into the index state", partitionPath);
@@ -309,15 +319,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
@VisibleForTesting @VisibleForTesting
public void clearIndexState() { public void clearIndexState() {
this.partitionLoadState.clear();
this.indexState.clear(); this.indexState.clear();
} }
@VisibleForTesting
public boolean isKeyInState(HoodieKey hoodieKey) {
try {
return this.indexState.contains(hoodieKey.getRecordKey());
} catch (Exception e) {
throw new HoodieException(e);
}
}
} }

View File

@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.partitioner;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
/**
 * Operator for {@link BucketAssignFunction}.
 *
 * @param <K> The key type
 * @param <I> The input type
 * @param <O> The output type
 */
public class BucketAssignOperator<K, I extends HoodieRecord<?>, O extends HoodieRecord<?>>
    extends KeyedProcessOperator<K, I, O> {

  /** The wrapped function; kept so the operator context can be injected after open. */
  private final BucketAssignFunction<K, I, O> function;

  public BucketAssignOperator(BucketAssignFunction<K, I, O> function) {
    super(function);
    this.function = function;
  }

  @Override
  public void open() throws Exception {
    super.open();
    // Hand the function a handle that can switch the operator's current key.
    // NOTE(review): presumably needed because the function updates keyed state
    // for record keys other than the one currently being processed when it
    // bootstraps the index — confirm against BucketAssignFunction#loadRecords.
    this.function.setContext(new ContextImpl());
  }

  /**
   * Context to give the function a chance to operate the state handle.
   */
  @FunctionalInterface
  public interface Context {
    void setCurrentKey(Object key);
  }

  /**
   * Default {@link Context} implementation delegating to the enclosing operator.
   */
  public class ContextImpl implements Context {
    @Override
    public void setCurrentKey(Object key) {
      BucketAssignOperator.this.setCurrentKey(key);
    }
  }
}

View File

@@ -29,6 +29,7 @@ import org.apache.hudi.sink.compact.CompactionCommitSink;
import org.apache.hudi.sink.compact.CompactionPlanEvent; import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.compact.CompactionPlanOperator; import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.partitioner.BucketAssignFunction; import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction; import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.util.StreamerUtil;
@@ -80,7 +81,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
.transform( .transform(
"bucket_assigner", "bucket_assigner",
TypeInformation.of(HoodieRecord.class), TypeInformation.of(HoodieRecord.class),
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))) new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
.uid("uid_bucket_assigner") .uid("uid_bucket_assigner")
// shuffle by fileId(bucket id) // shuffle by fileId(bucket id)
.keyBy(record -> record.getCurrentLocation().getFileId()) .keyBy(record -> record.getCurrentLocation().getFileId())

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.sink.compact.CompactionCommitSink;
import org.apache.hudi.sink.compact.CompactionPlanEvent; import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.compact.CompactionPlanOperator; import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.partitioner.BucketAssignFunction; import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction; import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.util.StreamerUtil;
@@ -121,7 +122,7 @@ public class StreamWriteITCase extends TestLogger {
.transform( .transform(
"bucket_assigner", "bucket_assigner",
TypeInformation.of(HoodieRecord.class), TypeInformation.of(HoodieRecord.class),
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))) new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
.uid("uid_bucket_assigner") .uid("uid_bucket_assigner")
// shuffle by fileId(bucket id) // shuffle by fileId(bucket id)
.keyBy(record -> record.getCurrentLocation().getFileId()) .keyBy(record -> record.getCurrentLocation().getFileId())
@@ -181,7 +182,7 @@ public class StreamWriteITCase extends TestLogger {
.transform( .transform(
"bucket_assigner", "bucket_assigner",
TypeInformation.of(HoodieRecord.class), TypeInformation.of(HoodieRecord.class),
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))) new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
.uid("uid_bucket_assigner") .uid("uid_bucket_assigner")
// shuffle by fileId(bucket id) // shuffle by fileId(bucket id)
.keyBy(record -> record.getCurrentLocation().getFileId()) .keyBy(record -> record.getCurrentLocation().getFileId())

View File

@@ -55,7 +55,7 @@ public class CompactFunctionWrapper {
private final MockFunctionInitializationContext functionInitializationContext; private final MockFunctionInitializationContext functionInitializationContext;
/** Function that generates the {@link HoodieCompactionPlan}. */ /** Function that generates the {@link HoodieCompactionPlan}. */
private CompactionPlanOperator compactionPlanFunction; private CompactionPlanOperator compactionPlanOperator;
/** Function that executes the compaction task. */ /** Function that executes the compaction task. */
private CompactFunction compactFunction; private CompactFunction compactFunction;
/** Stream sink to handle compaction commits. */ /** Stream sink to handle compaction commits. */
@@ -74,8 +74,8 @@ public class CompactFunctionWrapper {
} }
public void openFunction() throws Exception { public void openFunction() throws Exception {
compactionPlanFunction = new CompactionPlanOperator(conf); compactionPlanOperator = new CompactionPlanOperator(conf);
compactionPlanFunction.open(); compactionPlanOperator.open();
compactFunction = new CompactFunction(conf); compactFunction = new CompactFunction(conf);
compactFunction.setRuntimeContext(runtimeContext); compactFunction.setRuntimeContext(runtimeContext);
@@ -118,8 +118,8 @@ public class CompactFunctionWrapper {
} }
}; };
compactionPlanFunction.setOutput(output); compactionPlanOperator.setOutput(output);
compactionPlanFunction.notifyCheckpointComplete(checkpointID); compactionPlanOperator.notifyCheckpointComplete(checkpointID);
// collect the CompactCommitEvents // collect the CompactCommitEvents
List<CompactionCommitEvent> compactCommitEvents = new ArrayList<>(); List<CompactionCommitEvent> compactCommitEvents = new ArrayList<>();
for (CompactionPlanEvent event: events) { for (CompactionPlanEvent event: events) {

View File

@@ -49,6 +49,7 @@ public class MockOperatorStateStore implements KeyedStateStore, OperatorStateSto
private Map<String, TestUtils.MockListState> lastSuccessStateMap; private Map<String, TestUtils.MockListState> lastSuccessStateMap;
private MapState mapState; private MapState mapState;
private Map<String, ValueState> valueStateMap;
public MockOperatorStateStore() { public MockOperatorStateStore() {
this.historyStateMap = new HashMap<>(); this.historyStateMap = new HashMap<>();
@@ -57,6 +58,7 @@ public class MockOperatorStateStore implements KeyedStateStore, OperatorStateSto
this.lastSuccessStateMap = new HashMap<>(); this.lastSuccessStateMap = new HashMap<>();
this.mapState = new MockMapState<>(); this.mapState = new MockMapState<>();
this.valueStateMap = new HashMap<>();
} }
@Override @Override
@@ -66,7 +68,9 @@ public class MockOperatorStateStore implements KeyedStateStore, OperatorStateSto
@Override @Override
public <T> ValueState<T> getState(ValueStateDescriptor<T> valueStateDescriptor) { public <T> ValueState<T> getState(ValueStateDescriptor<T> valueStateDescriptor) {
return null; String name = valueStateDescriptor.getName();
valueStateMap.putIfAbsent(name, new MockValueState());
return valueStateMap.get(name);
} }
@Override @Override

View File

@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.utils;
import org.apache.flink.api.common.state.ValueState;
/**
 * Mock value state for testing.
 *
 * @param <V> Type of state value
 */
public class MockValueState<V> implements ValueState<V> {

  // Backing slot for the single keyed value; null means "no value set".
  private V stored;

  @Override
  public V value() {
    return stored;
  }

  @Override
  public void update(V value) {
    stored = value;
  }

  @Override
  public void clear() {
    stored = null;
  }
}

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.sink.StreamWriteFunction;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator; import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.event.BatchWriteSuccessEvent; import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
import org.apache.hudi.sink.partitioner.BucketAssignFunction; import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction; import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestConfigurations;
@@ -44,8 +45,10 @@ import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventG
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
/** /**
@@ -67,6 +70,8 @@ public class StreamWriteFunctionWrapper<I> {
private RowDataToHoodieFunction<RowData, HoodieRecord<?>> toHoodieFunction; private RowDataToHoodieFunction<RowData, HoodieRecord<?>> toHoodieFunction;
/** Function that assigns bucket ID. */ /** Function that assigns bucket ID. */
private BucketAssignFunction<String, HoodieRecord<?>, HoodieRecord<?>> bucketAssignerFunction; private BucketAssignFunction<String, HoodieRecord<?>, HoodieRecord<?>> bucketAssignerFunction;
/** BucketAssignOperator context. **/
private MockBucketAssignOperatorContext bucketAssignOperatorContext;
/** Stream write function. */ /** Stream write function. */
private StreamWriteFunction<Object, HoodieRecord<?>, Object> writeFunction; private StreamWriteFunction<Object, HoodieRecord<?>, Object> writeFunction;
@@ -91,6 +96,7 @@ public class StreamWriteFunctionWrapper<I> {
this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext); this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
this.functionInitializationContext = new MockFunctionInitializationContext(); this.functionInitializationContext = new MockFunctionInitializationContext();
this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf); this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext();
} }
public void openFunction() throws Exception { public void openFunction() throws Exception {
@@ -103,6 +109,7 @@ public class StreamWriteFunctionWrapper<I> {
bucketAssignerFunction = new BucketAssignFunction<>(conf); bucketAssignerFunction = new BucketAssignFunction<>(conf);
bucketAssignerFunction.setRuntimeContext(runtimeContext); bucketAssignerFunction.setRuntimeContext(runtimeContext);
bucketAssignerFunction.open(conf); bucketAssignerFunction.open(conf);
bucketAssignerFunction.setContext(bucketAssignOperatorContext);
bucketAssignerFunction.initializeState(this.functionInitializationContext); bucketAssignerFunction.initializeState(this.functionInitializationContext);
writeFunction = new StreamWriteFunction<>(conf); writeFunction = new StreamWriteFunction<>(conf);
@@ -150,7 +157,7 @@ public class StreamWriteFunctionWrapper<I> {
return this.writeFunction.getWriteClient(); return this.writeFunction.getWriteClient();
} }
public void checkpointFunction(long checkpointId) throws Exception { public void checkpointFunction(long checkpointId) {
// checkpoint the coordinator first // checkpoint the coordinator first
this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>()); this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>());
bucketAssignerFunction.snapshotState(null); bucketAssignerFunction.snapshotState(null);
@@ -192,13 +199,35 @@ public class StreamWriteFunctionWrapper<I> {
public void clearIndexState() { public void clearIndexState() {
this.bucketAssignerFunction.clearIndexState(); this.bucketAssignerFunction.clearIndexState();
this.bucketAssignOperatorContext.clearIndexState();
} }
public boolean isKeyInState(HoodieKey hoodieKey) { public boolean isKeyInState(HoodieKey hoodieKey) {
return this.bucketAssignerFunction.isKeyInState(hoodieKey); return this.bucketAssignOperatorContext.isKeyInState(hoodieKey.getRecordKey());
} }
public boolean isConforming() { public boolean isConforming() {
return this.writeFunction.isConfirming(); return this.writeFunction.isConfirming();
} }
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
private static class MockBucketAssignOperatorContext implements BucketAssignOperator.Context {
private final Set<Object> updateKeys = new HashSet<>();
@Override
public void setCurrentKey(Object key) {
this.updateKeys.add(key);
}
public void clearIndexState() {
this.updateKeys.clear();
}
public boolean isKeyInState(String key) {
return this.updateKeys.contains(key);
}
}
} }