From 26e5d2e6fcc399e7886594a5454a16d29aaa8702 Mon Sep 17 00:00:00 2001 From: wxp4532 Date: Fri, 11 Mar 2022 14:07:52 +0800 Subject: [PATCH] [HUDI-3559] Flink bucket index with COW table throws NoSuchElementException Actually method FlinkWriteHelper#deduplicateRecords does not guarantee the record sequence, but there is an implicit constraint: all the records in one bucket should have the same bucket type (instant time here); the BucketStreamWriteFunction breaks the rule and fails to comply with this constraint. close apache/hudi#5018 --- .../table/action/commit/FlinkWriteHelper.java | 11 +- .../hudi/sink/BucketStreamWriteFunction.java | 32 +++- .../hudi/sink/ITTestDataStreamWrite.java | 160 ++++++++++-------- 3 files changed, 114 insertions(+), 89 deletions(-) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java index d28aafcc4..66723a3fc 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java @@ -27,7 +27,6 @@ import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; @@ -91,13 +90,11 @@ public class FlinkWriteHelper extends BaseWrit @Override public List> deduplicateRecords( List> records, HoodieIndex index, int parallelism) { - Map>>> keyedRecords = records.stream().map(record -> { - // If index used is global, then records are expected to differ in their partitionPath - final Object key = 
record.getKey().getRecordKey(); - return Pair.of(key, record); - }).collect(Collectors.groupingBy(Pair::getLeft)); + // If index used is global, then records are expected to differ in their partitionPath + Map>> keyedRecords = records.stream() + .collect(Collectors.groupingBy(record -> record.getKey().getRecordKey())); - return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> { + return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1, rec2) -> { final T data1 = rec1.getData(); final T data2 = rec2.getData(); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java index 057c79433..4c9e4dc25 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java @@ -65,7 +65,17 @@ public class BucketStreamWriteFunction extends StreamWriteFunction { private String indexKeyFields; - private final HashMap bucketToFileIDMap; + /** + * BucketID to file group mapping. + */ + private HashMap bucketIndex; + + /** + * Incremental bucket index of the current checkpoint interval, + * it is needed because the bucket type('I' or 'U') should be decided based on the committed files view, + * all the records in one bucket should have the same bucket type. + */ + private HashMap incBucketIndex; /** * Constructs a BucketStreamWriteFunction. 
@@ -74,7 +84,6 @@ public class BucketStreamWriteFunction extends StreamWriteFunction { */ public BucketStreamWriteFunction(Configuration config) { super(config); - this.bucketToFileIDMap = new HashMap<>(); } @Override @@ -85,6 +94,8 @@ public class BucketStreamWriteFunction extends StreamWriteFunction { this.taskID = getRuntimeContext().getIndexOfThisSubtask(); this.parallelism = getRuntimeContext().getNumberOfParallelSubtasks(); this.maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks(); + this.bucketIndex = new HashMap<>(); + this.incBucketIndex = new HashMap<>(); bootstrapIndex(); } @@ -94,6 +105,13 @@ public class BucketStreamWriteFunction extends StreamWriteFunction { this.table = this.writeClient.getHoodieTable(); } + @Override + public void snapshotState() { + super.snapshotState(); + this.bucketIndex.putAll(this.incBucketIndex); + this.incBucketIndex.clear(); + } + @Override public void processElement(I i, ProcessFunction.Context context, Collector collector) throws Exception { HoodieRecord record = (HoodieRecord) i; @@ -103,12 +121,12 @@ public class BucketStreamWriteFunction extends StreamWriteFunction { final int bucketNum = BucketIdentifier.getBucketId(hoodieKey, indexKeyFields, this.bucketNum); final String partitionBucketId = BucketIdentifier.partitionBucketIdStr(hoodieKey.getPartitionPath(), bucketNum); - if (bucketToFileIDMap.containsKey(partitionBucketId)) { - location = new HoodieRecordLocation("U", bucketToFileIDMap.get(partitionBucketId)); + if (bucketIndex.containsKey(partitionBucketId)) { + location = new HoodieRecordLocation("U", bucketIndex.get(partitionBucketId)); } else { String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum); location = new HoodieRecordLocation("I", newFileId); - bucketToFileIDMap.put(partitionBucketId, newFileId); + incBucketIndex.put(partitionBucketId, newFileId); } record.unseal(); record.setCurrentLocation(location); @@ -154,12 +172,12 @@ public class BucketStreamWriteFunction extends 
StreamWriteFunction { if (bucketToLoad.contains(bucketNumber)) { String partitionBucketId = BucketIdentifier.partitionBucketIdStr(partitionPath, bucketNumber); LOG.info(String.format("Should load this partition bucket %s with fileID %s", partitionBucketId, fileID)); - if (bucketToFileIDMap.containsKey(partitionBucketId)) { + if (bucketIndex.containsKey(partitionBucketId)) { throw new RuntimeException(String.format("Duplicate fileID %s from partitionBucket %s found " + "during the BucketStreamWriteFunction index bootstrap.", fileID, partitionBucketId)); } else { LOG.info(String.format("Adding fileID %s to the partition bucket %s.", fileID, partitionBucketId)); - bucketToFileIDMap.put(partitionBucketId, fileID); + bucketIndex.put(partitionBucketId, fileID); } } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java index bbf9009fd..b9deb43a9 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java @@ -20,6 +20,7 @@ package org.apache.hudi.sink; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.transform.ChainedTransformer; import org.apache.hudi.sink.transform.Transformer; @@ -92,8 +93,20 @@ public class ITTestDataStreamWrite extends TestLogger { @TempDir File tempFile; + @ParameterizedTest + @ValueSource(strings = {"BUCKET", "FLINK_STATE"}) + public void testWriteCopyOnWrite(String indexType) throws Exception { + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + conf.setString(FlinkOptions.INDEX_TYPE, indexType); + 
conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 1); + conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id"); + conf.setBoolean(FlinkOptions.PRE_COMBINE,true); + + testWriteToHoodie(conf, "cow_write", 1, EXPECTED); + } + @Test - public void testTransformerBeforeWriting() throws Exception { + public void testWriteCopyOnWriteWithTransformer() throws Exception { Transformer transformer = (ds) -> ds.map((rowdata) -> { if (rowdata instanceof GenericRowData) { GenericRowData genericRD = (GenericRowData) rowdata; @@ -105,97 +118,63 @@ public class ITTestDataStreamWrite extends TestLogger { } }); - testWriteToHoodie(transformer, EXPECTED_TRANSFORMER); + testWriteToHoodie(transformer, "cow_write_with_transformer", EXPECTED_TRANSFORMER); } @Test - public void testChainedTransformersBeforeWriting() throws Exception { - Transformer t1 = (ds) -> ds.map((rowdata) -> { - if (rowdata instanceof GenericRowData) { - GenericRowData genericRD = (GenericRowData) rowdata; + public void testWriteCopyOnWriteWithChainedTransformer() throws Exception { + Transformer t1 = (ds) -> ds.map(rowData -> { + if (rowData instanceof GenericRowData) { + GenericRowData genericRD = (GenericRowData) rowData; //update age field to age + 1 genericRD.setField(2, genericRD.getInt(2) + 1); return genericRD; } else { - throw new RuntimeException("Unrecognized row type : " + rowdata.getClass().getSimpleName()); + throw new RuntimeException("Unrecognized row type : " + rowData.getClass().getSimpleName()); } }); ChainedTransformer chainedTransformer = new ChainedTransformer(Arrays.asList(t1, t1)); - testWriteToHoodie(chainedTransformer, EXPECTED_CHAINED_TRANSFORMER); - } - - @Test - public void testWriteToHoodieWithoutTransformer() throws Exception { - testWriteToHoodie(null, EXPECTED); + testWriteToHoodie(chainedTransformer, "cow_write_with_chained_transformer", EXPECTED_CHAINED_TRANSFORMER); } @ParameterizedTest @ValueSource(strings = {"BUCKET", "FLINK_STATE"}) - public void 
testMergeOnReadWriteWithCompaction(String indexType) throws Exception { - int parallelism = 4; + public void testWriteMergeOnReadWithCompaction(String indexType) throws Exception { Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); conf.setString(FlinkOptions.INDEX_TYPE, indexType); conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4); conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id"); conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1); conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name()); - StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - execEnv.getConfig().disableObjectReuse(); - execEnv.setParallelism(parallelism); - // set up checkpoint interval - execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE); - execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1); - // Read from file source - RowType rowType = - (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)) - .getLogicalType(); - - JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( - rowType, - InternalTypeInfo.of(rowType), - false, - true, - TimestampFormat.ISO_8601 - ); - String sourcePath = Objects.requireNonNull(Thread.currentThread() - .getContextClassLoader().getResource("test_source.data")).toString(); - - TextInputFormat format = new TextInputFormat(new Path(sourcePath)); - format.setFilesFilter(FilePathFilter.createDefaultFilter()); - TypeInformation typeInfo = BasicTypeInfo.STRING_TYPE_INFO; - format.setCharsetName("UTF-8"); - - DataStream dataStream = execEnv - // use PROCESS_CONTINUOUSLY mode to trigger checkpoint - .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo) - .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8))) - .setParallelism(parallelism); - - DataStream hoodieRecordDataStream = Pipelines.bootstrap(conf, 
rowType, parallelism, dataStream); - DataStream pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream); - Pipelines.clean(conf, pipeline); - Pipelines.compact(conf, pipeline); - JobClient client = execEnv.executeAsync("mor-write-with-compact"); - if (client.getJobStatus().get() != JobStatus.FAILED) { - try { - TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish - client.cancel(); - } catch (Throwable var1) { - // ignored - } - } - - TestData.checkWrittenFullData(tempFile, EXPECTED); + testWriteToHoodie(conf, "mor_write_with_compact", 1, EXPECTED); } private void testWriteToHoodie( Transformer transformer, + String jobName, + Map> expected) throws Exception { + testWriteToHoodie(TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), + Option.of(transformer), jobName, 2, expected); + } + + private void testWriteToHoodie( + Configuration conf, + String jobName, + int checkpoints, + Map> expected) throws Exception { + testWriteToHoodie(conf, Option.empty(), jobName, checkpoints, expected); + } + + private void testWriteToHoodie( + Configuration conf, + Option transformer, + String jobName, + int checkpoints, Map> expected) throws Exception { - Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); execEnv.getConfig().disableObjectReuse(); execEnv.setParallelism(4); @@ -218,16 +197,32 @@ public class ITTestDataStreamWrite extends TestLogger { String sourcePath = Objects.requireNonNull(Thread.currentThread() .getContextClassLoader().getResource("test_source.data")).toString(); - DataStream dataStream = execEnv - // use continuous file source to trigger checkpoint - .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2)) - .name("continuous_file_source") - .setParallelism(1) - .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8))) - 
.setParallelism(4); + boolean isMor = conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.MERGE_ON_READ.name()); - if (transformer != null) { - dataStream = transformer.apply(dataStream); + DataStream dataStream; + if (isMor) { + TextInputFormat format = new TextInputFormat(new Path(sourcePath)); + format.setFilesFilter(FilePathFilter.createDefaultFilter()); + TypeInformation typeInfo = BasicTypeInfo.STRING_TYPE_INFO; + format.setCharsetName("UTF-8"); + + dataStream = execEnv + // use PROCESS_CONTINUOUSLY mode to trigger checkpoint + .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo) + .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8))) + .setParallelism(1); + } else { + dataStream = execEnv + // use continuous file source to trigger checkpoint + .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), checkpoints)) + .name("continuous_file_source") + .setParallelism(1) + .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8))) + .setParallelism(4); + } + + if (transformer.isPresent()) { + dataStream = transformer.get().apply(dataStream); } int parallelism = execEnv.getParallelism(); @@ -235,9 +230,24 @@ public class ITTestDataStreamWrite extends TestLogger { DataStream pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream); execEnv.addOperator(pipeline.getTransformation()); - JobClient client = execEnv.executeAsync(conf.getString(FlinkOptions.TABLE_NAME)); - // wait for the streaming job to finish - client.getJobExecutionResult().get(); + if (isMor) { + Pipelines.clean(conf, pipeline); + Pipelines.compact(conf, pipeline); + } + JobClient client = execEnv.executeAsync(jobName); + if (isMor) { + if (client.getJobStatus().get() != JobStatus.FAILED) { + try { + TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish + client.cancel(); + } catch (Throwable var1) { + // ignored + } 
+ } + } else { + // wait for the streaming job to finish + client.getJobExecutionResult().get(); + } TestData.checkWrittenFullData(tempFile, expected);