
[HUDI-3559] Flink bucket index with COW table throws NoSuchElementException

The method FlinkWriteHelper#deduplicateRecords does not guarantee the record sequence, but there is an
implicit constraint: all the records in one bucket should have the same bucket type (the instant time here);
the BucketStreamWriteFunction breaks this constraint.

close apache/hudi#5018
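
To make the constraint concrete, here is a minimal toy model (plain Java, not Hudi code): once records sharing a key are reduced to one, the surviving record carries one of the original bucket types, so mixing 'I' and 'U' within a bucket makes the outcome depend on encounter order.

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

// Toy model of the implicit constraint: every record routed to one bucket must
// carry the same bucket type ('I' = new file group, 'U' = update of a committed one).
public class BucketTypeConstraintSketch {
  record Rec(String recordKey, String bucketType, String payload) {}

  public static void main(String[] args) {
    // Two versions of the same key tagged with different bucket types -- the
    // situation the unfixed BucketStreamWriteFunction could produce.
    List<Rec> records = List.of(
        new Rec("id1", "I", "v1"),
        new Rec("id1", "U", "v2"));

    Map<String, Optional<Rec>> deduped = records.stream()
        .collect(Collectors.groupingBy(Rec::recordKey,
            Collectors.reducing((r1, r2) -> r2))); // keep the later record, preCombine-style

    // Whether the survivor says 'I' or 'U' depends purely on arrival order,
    // which the downstream write handles cannot tolerate.
    System.out.println(deduped.get("id1").orElseThrow());
  }
}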
wxp4532
2022-03-11 14:07:52 +08:00
committed by yuzhao.cyz
parent a118d56b07
commit 26e5d2e6fc
3 changed files with 114 additions and 89 deletions

FlinkWriteHelper.java

@@ -27,7 +27,6 @@ import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
@@ -91,13 +90,11 @@ public class FlinkWriteHelper<T extends HoodieRecordPayload, R> extends BaseWrit
@Override
public List<HoodieRecord<T>> deduplicateRecords(
List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords = records.stream().map(record -> {
// If index used is global, then records are expected to differ in their partitionPath
final Object key = record.getKey().getRecordKey();
return Pair.of(key, record);
}).collect(Collectors.groupingBy(Pair::getLeft));
// If index used is global, then records are expected to differ in their partitionPath
Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream()
.collect(Collectors.groupingBy(record -> record.getKey().getRecordKey()));
return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1, rec2) -> {
final T data1 = rec1.getData();
final T data2 = rec2.getData();
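
For readers who want the reshaped method in isolation, a small sketch with simplified stand-in types (not the actual HoodieRecord or payload signatures): records are grouped directly by their record key and each group is folded into one record through the payload's preCombine; the removed Pair wrapper only mediated the grouping step.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Simplified stand-ins for HoodieRecord/HoodieRecordPayload; only the shape of
// the grouping and reduce mirrors the patched deduplicateRecords.
public class DeduplicateSketch {
  record Payload(long orderingVal, String value) {
    // preCombine keeps the payload with the higher ordering value (latest wins)
    Payload preCombine(Payload other) {
      return orderingVal >= other.orderingVal ? this : other;
    }
  }

  record Rec(String recordKey, Payload data) {}

  static List<Rec> deduplicate(List<Rec> records) {
    // group directly by record key, as the new code does
    Map<String, List<Rec>> keyed = records.stream()
        .collect(Collectors.groupingBy(Rec::recordKey));
    // fold each group into a single record via preCombine (data2.preCombine(data1))
    return keyed.values().stream()
        .map(group -> group.stream()
            .reduce((r1, r2) -> new Rec(r1.recordKey(), r2.data().preCombine(r1.data())))
            .orElseThrow())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Rec> in = List.of(
        new Rec("id1", new Payload(1, "old")),
        new Rec("id1", new Payload(2, "new")),
        new Rec("id2", new Payload(1, "only")));
    deduplicate(in).forEach(System.out::println); // one record per key
  }
}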

BucketStreamWriteFunction.java

@@ -65,7 +65,17 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
private String indexKeyFields;
private final HashMap<String, String> bucketToFileIDMap;
/**
* BucketID to file group mapping.
*/
private HashMap<String, String> bucketIndex;
/**
* Incremental bucket index of the current checkpoint interval.
* It is needed because the bucket type ('I' or 'U') should be decided based on the committed files view,
* and all the records in one bucket should have the same bucket type.
*/
private HashMap<String, String> incBucketIndex;
/**
* Constructs a BucketStreamWriteFunction.
@@ -74,7 +84,6 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
*/
public BucketStreamWriteFunction(Configuration config) {
super(config);
this.bucketToFileIDMap = new HashMap<>();
}
@Override
@@ -85,6 +94,8 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
this.maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
this.bucketIndex = new HashMap<>();
this.incBucketIndex = new HashMap<>();
bootstrapIndex();
}
@@ -94,6 +105,13 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
this.table = this.writeClient.getHoodieTable();
}
@Override
public void snapshotState() {
super.snapshotState();
this.bucketIndex.putAll(this.incBucketIndex);
this.incBucketIndex.clear();
}
@Override
public void processElement(I i, ProcessFunction<I, Object>.Context context, Collector<Object> collector) throws Exception {
HoodieRecord<?> record = (HoodieRecord<?>) i;
@@ -103,12 +121,12 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
final int bucketNum = BucketIdentifier.getBucketId(hoodieKey, indexKeyFields, this.bucketNum);
final String partitionBucketId = BucketIdentifier.partitionBucketIdStr(hoodieKey.getPartitionPath(), bucketNum);
if (bucketToFileIDMap.containsKey(partitionBucketId)) {
location = new HoodieRecordLocation("U", bucketToFileIDMap.get(partitionBucketId));
if (bucketIndex.containsKey(partitionBucketId)) {
location = new HoodieRecordLocation("U", bucketIndex.get(partitionBucketId));
} else {
String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
location = new HoodieRecordLocation("I", newFileId);
bucketToFileIDMap.put(partitionBucketId, newFileId);
incBucketIndex.put(partitionBucketId, newFileId);
}
record.unseal();
record.setCurrentLocation(location);
@@ -154,12 +172,12 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
if (bucketToLoad.contains(bucketNumber)) {
String partitionBucketId = BucketIdentifier.partitionBucketIdStr(partitionPath, bucketNumber);
LOG.info(String.format("Should load this partition bucket %s with fileID %s", partitionBucketId, fileID));
if (bucketToFileIDMap.containsKey(partitionBucketId)) {
if (bucketIndex.containsKey(partitionBucketId)) {
throw new RuntimeException(String.format("Duplicate fileID %s from partitionBucket %s found "
+ "during the BucketStreamWriteFunction index bootstrap.", fileID, partitionBucketId));
} else {
LOG.info(String.format("Adding fileID %s to the partition bucket %s.", fileID, partitionBucketId));
bucketToFileIDMap.put(partitionBucketId, fileID);
bucketIndex.put(partitionBucketId, fileID);
}
}
}
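
Pulling the pieces of the diff above together, here is a condensed, stand-alone sketch (simplified types, hypothetical fileId generation) of the lifecycle the two maps implement: committed buckets answer with 'U', buckets first seen in the current checkpoint get a fresh file group and stay 'I' until snapshotState folds them into the committed view.

import java.util.HashMap;
import java.util.Map;

// Condensed lifecycle of bucketIndex/incBucketIndex; this mirrors the diff above,
// not the full BucketStreamWriteFunction.
public class BucketIndexSketch {
  // committed buckets -> fileId, filled from the existing file groups during index bootstrap
  private final Map<String, String> bucketIndex = new HashMap<>();
  // buckets first written in the current checkpoint interval -> fileId
  private final Map<String, String> incBucketIndex = new HashMap<>();

  // Returns "U:fileId" for a committed bucket, otherwise "I:fileId" for a new file group.
  String locate(String partitionBucketId) {
    if (bucketIndex.containsKey(partitionBucketId)) {
      // the bucket already has a committed file group -> every record is an update
      return "U:" + bucketIndex.get(partitionBucketId);
    }
    // no committed file group yet -> start a new one; remember it only in the
    // incremental index so the bucket type stays 'I' until the checkpoint commits
    String newFileId = "fg-" + partitionBucketId; // stand-in for newBucketFileIdPrefix
    incBucketIndex.put(partitionBucketId, newFileId);
    return "I:" + newFileId;
  }

  // Called on checkpoint: newly created buckets join the committed view.
  void snapshotState() {
    bucketIndex.putAll(incBucketIndex);
    incBucketIndex.clear();
  }

  public static void main(String[] args) {
    BucketIndexSketch sketch = new BucketIndexSketch();
    System.out.println(sketch.locate("par1-00000001")); // I:... first write of the bucket
    sketch.snapshotState();                             // checkpoint promotes the new bucket
    System.out.println(sketch.locate("par1-00000001")); // U:... later writes are updates
  }
}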

ITTestDataStreamWrite.java

@@ -20,6 +20,7 @@ package org.apache.hudi.sink;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.sink.transform.Transformer;
@@ -92,8 +93,20 @@ public class ITTestDataStreamWrite extends TestLogger {
@TempDir
File tempFile;
@ParameterizedTest
@ValueSource(strings = {"BUCKET", "FLINK_STATE"})
public void testWriteCopyOnWrite(String indexType) throws Exception {
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setString(FlinkOptions.INDEX_TYPE, indexType);
conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 1);
conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
testWriteToHoodie(conf, "cow_write", 1, EXPECTED);
}
@Test
public void testTransformerBeforeWriting() throws Exception {
public void testWriteCopyOnWriteWithTransformer() throws Exception {
Transformer transformer = (ds) -> ds.map((rowdata) -> {
if (rowdata instanceof GenericRowData) {
GenericRowData genericRD = (GenericRowData) rowdata;
@@ -105,97 +118,63 @@ public class ITTestDataStreamWrite extends TestLogger {
}
});
testWriteToHoodie(transformer, EXPECTED_TRANSFORMER);
testWriteToHoodie(transformer, "cow_write_with_transformer", EXPECTED_TRANSFORMER);
}
@Test
public void testChainedTransformersBeforeWriting() throws Exception {
Transformer t1 = (ds) -> ds.map((rowdata) -> {
if (rowdata instanceof GenericRowData) {
GenericRowData genericRD = (GenericRowData) rowdata;
public void testWriteCopyOnWriteWithChainedTransformer() throws Exception {
Transformer t1 = (ds) -> ds.map(rowData -> {
if (rowData instanceof GenericRowData) {
GenericRowData genericRD = (GenericRowData) rowData;
//update age field to age + 1
genericRD.setField(2, genericRD.getInt(2) + 1);
return genericRD;
} else {
throw new RuntimeException("Unrecognized row type : " + rowdata.getClass().getSimpleName());
throw new RuntimeException("Unrecognized row type : " + rowData.getClass().getSimpleName());
}
});
ChainedTransformer chainedTransformer = new ChainedTransformer(Arrays.asList(t1, t1));
testWriteToHoodie(chainedTransformer, EXPECTED_CHAINED_TRANSFORMER);
}
@Test
public void testWriteToHoodieWithoutTransformer() throws Exception {
testWriteToHoodie(null, EXPECTED);
testWriteToHoodie(chainedTransformer, "cow_write_with_chained_transformer", EXPECTED_CHAINED_TRANSFORMER);
}
@ParameterizedTest
@ValueSource(strings = {"BUCKET", "FLINK_STATE"})
public void testMergeOnReadWriteWithCompaction(String indexType) throws Exception {
int parallelism = 4;
public void testWriteMergeOnReadWithCompaction(String indexType) throws Exception {
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setString(FlinkOptions.INDEX_TYPE, indexType);
conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.getConfig().disableObjectReuse();
execEnv.setParallelism(parallelism);
// set up checkpoint interval
execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Read from file source
RowType rowType =
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
.getLogicalType();
JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
rowType,
InternalTypeInfo.of(rowType),
false,
true,
TimestampFormat.ISO_8601
);
String sourcePath = Objects.requireNonNull(Thread.currentThread()
.getContextClassLoader().getResource("test_source.data")).toString();
TextInputFormat format = new TextInputFormat(new Path(sourcePath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
format.setCharsetName("UTF-8");
DataStream<RowData> dataStream = execEnv
// use PROCESS_CONTINUOUSLY mode to trigger checkpoint
.readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
.setParallelism(parallelism);
DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
Pipelines.clean(conf, pipeline);
Pipelines.compact(conf, pipeline);
JobClient client = execEnv.executeAsync("mor-write-with-compact");
if (client.getJobStatus().get() != JobStatus.FAILED) {
try {
TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish
client.cancel();
} catch (Throwable var1) {
// ignored
}
}
TestData.checkWrittenFullData(tempFile, EXPECTED);
testWriteToHoodie(conf, "mor_write_with_compact", 1, EXPECTED);
}
private void testWriteToHoodie(
Transformer transformer,
String jobName,
Map<String, List<String>> expected) throws Exception {
testWriteToHoodie(TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()),
Option.of(transformer), jobName, 2, expected);
}
private void testWriteToHoodie(
Configuration conf,
String jobName,
int checkpoints,
Map<String, List<String>> expected) throws Exception {
testWriteToHoodie(conf, Option.empty(), jobName, checkpoints, expected);
}
private void testWriteToHoodie(
Configuration conf,
Option<Transformer> transformer,
String jobName,
int checkpoints,
Map<String, List<String>> expected) throws Exception {
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.getConfig().disableObjectReuse();
execEnv.setParallelism(4);
@@ -218,16 +197,32 @@ public class ITTestDataStreamWrite extends TestLogger {
String sourcePath = Objects.requireNonNull(Thread.currentThread()
.getContextClassLoader().getResource("test_source.data")).toString();
DataStream<RowData> dataStream = execEnv
// use continuous file source to trigger checkpoint
.addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2))
.name("continuous_file_source")
.setParallelism(1)
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
.setParallelism(4);
boolean isMor = conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.MERGE_ON_READ.name());
if (transformer != null) {
dataStream = transformer.apply(dataStream);
DataStream<RowData> dataStream;
if (isMor) {
TextInputFormat format = new TextInputFormat(new Path(sourcePath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
format.setCharsetName("UTF-8");
dataStream = execEnv
// use PROCESS_CONTINUOUSLY mode to trigger checkpoint
.readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
.setParallelism(1);
} else {
dataStream = execEnv
// use continuous file source to trigger checkpoint
.addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), checkpoints))
.name("continuous_file_source")
.setParallelism(1)
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
.setParallelism(4);
}
if (transformer.isPresent()) {
dataStream = transformer.get().apply(dataStream);
}
int parallelism = execEnv.getParallelism();
@@ -235,9 +230,24 @@ public class ITTestDataStreamWrite extends TestLogger {
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
execEnv.addOperator(pipeline.getTransformation());
JobClient client = execEnv.executeAsync(conf.getString(FlinkOptions.TABLE_NAME));
// wait for the streaming job to finish
client.getJobExecutionResult().get();
if (isMor) {
Pipelines.clean(conf, pipeline);
Pipelines.compact(conf, pipeline);
}
JobClient client = execEnv.executeAsync(jobName);
if (isMor) {
if (client.getJobStatus().get() != JobStatus.FAILED) {
try {
TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish
client.cancel();
} catch (Throwable var1) {
// ignored
}
}
} else {
// wait for the streaming job to finish
client.getJobExecutionResult().get();
}
TestData.checkWrittenFullData(tempFile, expected);