diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java index 0954414eb..36014a8ec 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java @@ -32,7 +32,6 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.index.HoodieIndexUtils; import org.apache.hudi.sink.utils.PayloadCreation; import org.apache.hudi.table.HoodieTable; @@ -43,6 +42,8 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.Configuration; @@ -81,7 +82,7 @@ public class BucketAssignFunction> private static final Logger LOG = LoggerFactory.getLogger(BucketAssignFunction.class); - private HoodieFlinkEngineContext context; + private BucketAssignOperator.Context context; /** * Index cache(speed-up) state for the underneath file based(BloomFilter) indices. @@ -94,7 +95,7 @@ public class BucketAssignFunction> *
  • If it does not, use the {@link BucketAssigner} to generate a new bucket ID
  • * */ - private MapState indexState; + private ValueState indexState; /** * Bucket assigner to assign new bucket IDs or reuse existing ones. @@ -138,7 +139,7 @@ public class BucketAssignFunction> super.open(parameters); HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf); this.hadoopConf = StreamerUtil.getHadoopConf(); - this.context = new HoodieFlinkEngineContext( + HoodieFlinkEngineContext context = new HoodieFlinkEngineContext( new SerializableConfiguration(this.hadoopConf), new FlinkTaskContextSupplier(getRuntimeContext())); this.bucketAssigner = BucketAssigners.create( @@ -158,16 +159,15 @@ public class BucketAssignFunction> @Override public void initializeState(FunctionInitializationContext context) { - MapStateDescriptor indexStateDesc = - new MapStateDescriptor<>( + ValueStateDescriptor indexStateDesc = + new ValueStateDescriptor<>( "indexState", - Types.STRING, TypeInformation.of(HoodieRecordGlobalLocation.class)); double ttl = conf.getDouble(FlinkOptions.INDEX_STATE_TTL) * 24 * 60 * 60 * 1000; if (ttl > 0) { indexStateDesc.enableTimeToLive(StateTtlConfigUtil.createTtlConfig((long) ttl)); } - indexState = context.getKeyedStateStore().getMapState(indexStateDesc); + indexState = context.getKeyedStateStore().getState(indexStateDesc); if (bootstrapIndex) { MapStateDescriptor partitionLoadStateDesc = new MapStateDescriptor<>("partitionLoadState", Types.STRING, Types.INT); @@ -191,13 +191,13 @@ public class BucketAssignFunction> // disabled by default. if (bootstrapIndex && !partitionLoadState.contains(partitionPath)) { // If the partition records are never loaded, load the records first. - loadRecords(partitionPath); + loadRecords(partitionPath, recordKey); } // Only changing records need looking up the index for the location, // append only records are always recognized as INSERT. - if (isChangingRecords && indexState.contains(recordKey)) { + HoodieRecordGlobalLocation oldLoc = indexState.value(); + if (isChangingRecords && oldLoc != null) { // Set up the instant time as "U" to mark the bucket as an update bucket. - HoodieRecordGlobalLocation oldLoc = this.indexState.get(recordKey); if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) { if (globalIndex) { // if partition path changes, emit a delete record for old partition path, @@ -209,7 +209,7 @@ public class BucketAssignFunction> out.collect((O) deleteRecord); } location = getNewRecordLocation(partitionPath); - updateIndexState(recordKey, partitionPath, location); + updateIndexState(partitionPath, location); } else { location = oldLoc.toLocal("U"); this.bucketAssigner.addUpdate(partitionPath, location.getFileId()); @@ -217,7 +217,7 @@ public class BucketAssignFunction> } else { location = getNewRecordLocation(partitionPath); if (isChangingRecords) { - updateIndexState(recordKey, partitionPath, location); + updateIndexState(partitionPath, location); } } record.setCurrentLocation(location); @@ -244,10 +244,9 @@ public class BucketAssignFunction> } private void updateIndexState( - String recordKey, String partitionPath, HoodieRecordLocation localLoc) throws Exception { - this.indexState.put(recordKey, HoodieRecordGlobalLocation.fromLocal(partitionPath, localLoc)); + this.indexState.update(HoodieRecordGlobalLocation.fromLocal(partitionPath, localLoc)); } @Override @@ -261,13 +260,18 @@ public class BucketAssignFunction> this.bucketAssigner.close(); } + public void setContext(BucketAssignOperator.Context context) { + this.context = context; + } + /** * Load all the indices of give partition path into the backup state. * * @param partitionPath The partition path + * @param curRecordKey The current record key * @throws Exception when error occurs for state update */ - private void loadRecords(String partitionPath) throws Exception { + private void loadRecords(String partitionPath, String curRecordKey) throws Exception { LOG.info("Start loading records under partition {} into the index state", partitionPath); HoodieTable hoodieTable = bucketAssigner.getTable(); BaseFileUtils fileUtils = BaseFileUtils.getInstance(hoodieTable.getBaseFileFormat()); @@ -294,14 +298,20 @@ public class BucketAssignFunction> boolean shouldLoad = KeyGroupRangeAssignment.assignKeyToParallelOperator( hoodieKey.getRecordKey(), maxParallelism, parallelism) == taskID; if (shouldLoad) { - this.indexState.put(hoodieKey.getRecordKey(), - new HoodieRecordGlobalLocation(hoodieKey.getPartitionPath(), baseFile.getCommitTime(), baseFile.getFileId())); + this.context.setCurrentKey(hoodieKey.getRecordKey()); + this.indexState.update( + new HoodieRecordGlobalLocation( + hoodieKey.getPartitionPath(), + baseFile.getCommitTime(), + baseFile.getFileId())); } } catch (Exception e) { LOG.error("Error when putting record keys into the state from file: {}", baseFile); } }); } + // recover the currentKey + this.context.setCurrentKey(curRecordKey); // Mark the partition path as loaded. partitionLoadState.put(partitionPath, 0); LOG.info("Finish loading records under partition {} into the index state", partitionPath); @@ -309,15 +319,7 @@ public class BucketAssignFunction> @VisibleForTesting public void clearIndexState() { + this.partitionLoadState.clear(); this.indexState.clear(); } - - @VisibleForTesting - public boolean isKeyInState(HoodieKey hoodieKey) { - try { - return this.indexState.contains(hoodieKey.getRecordKey()); - } catch (Exception e) { - throw new HoodieException(e); - } - } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignOperator.java new file mode 100644 index 000000000..3b3047b5d --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignOperator.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.partitioner; + +import org.apache.hudi.common.model.HoodieRecord; + +import org.apache.flink.streaming.api.operators.KeyedProcessOperator; + +/** + * Operator for {@link BucketAssignFunction}. + * + * @param The input type + */ +public class BucketAssignOperator, O extends HoodieRecord> + extends KeyedProcessOperator { + private final BucketAssignFunction function; + + public BucketAssignOperator(BucketAssignFunction function) { + super(function); + this.function = function; + } + + @Override + public void open() throws Exception { + super.open(); + this.function.setContext(new ContextImpl()); + } + + /** + * Context to give the function chance to operate the state handle. + */ + public interface Context { + void setCurrentKey(Object key); + } + + public class ContextImpl implements Context { + public void setCurrentKey(Object key) { + BucketAssignOperator.this.setCurrentKey(key); + } + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index b4109b3d5..02e76aae3 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -29,6 +29,7 @@ import org.apache.hudi.sink.compact.CompactionCommitSink; import org.apache.hudi.sink.compact.CompactionPlanEvent; import org.apache.hudi.sink.compact.CompactionPlanOperator; import org.apache.hudi.sink.partitioner.BucketAssignFunction; +import org.apache.hudi.sink.partitioner.BucketAssignOperator; import org.apache.hudi.sink.transform.RowDataToHoodieFunction; import org.apache.hudi.util.StreamerUtil; @@ -80,7 +81,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, .transform( "bucket_assigner", TypeInformation.of(HoodieRecord.class), - new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))) + new BucketAssignOperator<>(new BucketAssignFunction<>(conf))) .uid("uid_bucket_assigner") // shuffle by fileId(bucket id) .keyBy(record -> record.getCurrentLocation().getFileId()) diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java index 5256cd3d4..0590a2373 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java @@ -27,6 +27,7 @@ import org.apache.hudi.sink.compact.CompactionCommitSink; import org.apache.hudi.sink.compact.CompactionPlanEvent; import org.apache.hudi.sink.compact.CompactionPlanOperator; import org.apache.hudi.sink.partitioner.BucketAssignFunction; +import org.apache.hudi.sink.partitioner.BucketAssignOperator; import org.apache.hudi.sink.transform.RowDataToHoodieFunction; import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.StreamerUtil; @@ -121,7 +122,7 @@ public class StreamWriteITCase extends TestLogger { .transform( "bucket_assigner", TypeInformation.of(HoodieRecord.class), - new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))) + new BucketAssignOperator<>(new BucketAssignFunction<>(conf))) .uid("uid_bucket_assigner") // shuffle by fileId(bucket id) .keyBy(record -> record.getCurrentLocation().getFileId()) @@ -181,7 +182,7 @@ public class StreamWriteITCase extends TestLogger { .transform( "bucket_assigner", TypeInformation.of(HoodieRecord.class), - new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))) + new BucketAssignOperator<>(new BucketAssignFunction<>(conf))) .uid("uid_bucket_assigner") // shuffle by fileId(bucket id) .keyBy(record -> record.getCurrentLocation().getFileId()) diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java index fe3dd818e..4edfe9fb1 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java @@ -55,7 +55,7 @@ public class CompactFunctionWrapper { private final MockFunctionInitializationContext functionInitializationContext; /** Function that generates the {@link HoodieCompactionPlan}. */ - private CompactionPlanOperator compactionPlanFunction; + private CompactionPlanOperator compactionPlanOperator; /** Function that executes the compaction task. */ private CompactFunction compactFunction; /** Stream sink to handle compaction commits. */ @@ -74,8 +74,8 @@ public class CompactFunctionWrapper { } public void openFunction() throws Exception { - compactionPlanFunction = new CompactionPlanOperator(conf); - compactionPlanFunction.open(); + compactionPlanOperator = new CompactionPlanOperator(conf); + compactionPlanOperator.open(); compactFunction = new CompactFunction(conf); compactFunction.setRuntimeContext(runtimeContext); @@ -118,8 +118,8 @@ public class CompactFunctionWrapper { } }; - compactionPlanFunction.setOutput(output); - compactionPlanFunction.notifyCheckpointComplete(checkpointID); + compactionPlanOperator.setOutput(output); + compactionPlanOperator.notifyCheckpointComplete(checkpointID); // collect the CompactCommitEvents List compactCommitEvents = new ArrayList<>(); for (CompactionPlanEvent event: events) { diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockOperatorStateStore.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockOperatorStateStore.java index ff71089f7..cc16be4f0 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockOperatorStateStore.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockOperatorStateStore.java @@ -49,6 +49,7 @@ public class MockOperatorStateStore implements KeyedStateStore, OperatorStateSto private Map lastSuccessStateMap; private MapState mapState; + private Map valueStateMap; public MockOperatorStateStore() { this.historyStateMap = new HashMap<>(); @@ -57,6 +58,7 @@ public class MockOperatorStateStore implements KeyedStateStore, OperatorStateSto this.lastSuccessStateMap = new HashMap<>(); this.mapState = new MockMapState<>(); + this.valueStateMap = new HashMap<>(); } @Override @@ -66,7 +68,9 @@ public class MockOperatorStateStore implements KeyedStateStore, OperatorStateSto @Override public ValueState getState(ValueStateDescriptor valueStateDescriptor) { - return null; + String name = valueStateDescriptor.getName(); + valueStateMap.putIfAbsent(name, new MockValueState()); + return valueStateMap.get(name); } @Override diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockValueState.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockValueState.java new file mode 100644 index 000000000..eac3e7a3d --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockValueState.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.utils; + +import org.apache.flink.api.common.state.ValueState; + +/** + * Mock value state for testing. + * + * @param Type of state value + */ +public class MockValueState implements ValueState { + private V v = null; + + @Override + public V value() { + return v; + } + + @Override + public void update(V value) { + this.v = value; + } + + @Override + public void clear() { + v = null; + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index 9f8852f28..74e43bad0 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -27,6 +27,7 @@ import org.apache.hudi.sink.StreamWriteFunction; import org.apache.hudi.sink.StreamWriteOperatorCoordinator; import org.apache.hudi.sink.event.BatchWriteSuccessEvent; import org.apache.hudi.sink.partitioner.BucketAssignFunction; +import org.apache.hudi.sink.partitioner.BucketAssignOperator; import org.apache.hudi.sink.transform.RowDataToHoodieFunction; import org.apache.hudi.utils.TestConfigurations; @@ -44,8 +45,10 @@ import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventG import org.apache.flink.table.data.RowData; import org.apache.flink.util.Collector; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; /** @@ -67,6 +70,8 @@ public class StreamWriteFunctionWrapper { private RowDataToHoodieFunction> toHoodieFunction; /** Function that assigns bucket ID. */ private BucketAssignFunction, HoodieRecord> bucketAssignerFunction; + /** BucketAssignOperator context. **/ + private MockBucketAssignOperatorContext bucketAssignOperatorContext; /** Stream write function. */ private StreamWriteFunction, Object> writeFunction; @@ -91,6 +96,7 @@ public class StreamWriteFunctionWrapper { this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext); this.functionInitializationContext = new MockFunctionInitializationContext(); this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf); + this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext(); } public void openFunction() throws Exception { @@ -103,6 +109,7 @@ public class StreamWriteFunctionWrapper { bucketAssignerFunction = new BucketAssignFunction<>(conf); bucketAssignerFunction.setRuntimeContext(runtimeContext); bucketAssignerFunction.open(conf); + bucketAssignerFunction.setContext(bucketAssignOperatorContext); bucketAssignerFunction.initializeState(this.functionInitializationContext); writeFunction = new StreamWriteFunction<>(conf); @@ -150,7 +157,7 @@ public class StreamWriteFunctionWrapper { return this.writeFunction.getWriteClient(); } - public void checkpointFunction(long checkpointId) throws Exception { + public void checkpointFunction(long checkpointId) { // checkpoint the coordinator first this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>()); bucketAssignerFunction.snapshotState(null); @@ -192,13 +199,35 @@ public class StreamWriteFunctionWrapper { public void clearIndexState() { this.bucketAssignerFunction.clearIndexState(); + this.bucketAssignOperatorContext.clearIndexState(); } public boolean isKeyInState(HoodieKey hoodieKey) { - return this.bucketAssignerFunction.isKeyInState(hoodieKey); + return this.bucketAssignOperatorContext.isKeyInState(hoodieKey.getRecordKey()); } public boolean isConforming() { return this.writeFunction.isConfirming(); } + + // ------------------------------------------------------------------------- + // Inner Class + // ------------------------------------------------------------------------- + + private static class MockBucketAssignOperatorContext implements BucketAssignOperator.Context { + private final Set updateKeys = new HashSet<>(); + + @Override + public void setCurrentKey(Object key) { + this.updateKeys.add(key); + } + + public void clearIndexState() { + this.updateKeys.clear(); + } + + public boolean isKeyInState(String key) { + return this.updateKeys.contains(key); + } + } }