From 9b01d2f864e5cc4a559cfd4199136bca0979b095 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Thu, 20 May 2021 10:20:08 +0800 Subject: [PATCH] [HUDI-1915] Fix the file id for write data buffer before flushing (#2966) --- .../table/HoodieFlinkMergeOnReadTable.java | 8 ++ ...linkMergeOnReadRollbackActionExecutor.java | 77 +++++++++++++++++++ .../hudi/configuration/FlinkOptions.java | 2 +- .../apache/hudi/sink/StreamWriteFunction.java | 24 +++--- .../hudi/sink/TestWriteCopyOnWrite.java | 64 +++++++++++++++ .../java/org/apache/hudi/utils/TestData.java | 7 ++ 6 files changed, 171 insertions(+), 11 deletions(-) create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java index b7f177b72..8bcb979aa 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java @@ -19,11 +19,13 @@ package org.apache.hudi.table; import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; @@ -34,6 +36,7 @@ import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.commit.delta.FlinkUpsertDeltaCommitActionExecutor; import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor; import org.apache.hudi.table.action.compact.FlinkScheduleCompactionActionExecutor; +import org.apache.hudi.table.action.rollback.FlinkMergeOnReadRollbackActionExecutor; import java.util.List; import java.util.Map; @@ -87,5 +90,10 @@ public class HoodieFlinkMergeOnReadTable throw new HoodieNotSupportedException("Compaction is supported as a separate pipeline, " + "should not invoke directly through HoodieFlinkMergeOnReadTable"); } + + @Override + public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) { + return new FlinkMergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute(); + } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java new file mode 100644 index 000000000..25b20a507 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.rollback; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.HoodieRollbackStat; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.HoodieTable; + +import java.io.IOException; +import java.util.List; + +@SuppressWarnings("checkstyle:LineLength") +public class FlinkMergeOnReadRollbackActionExecutor extends + BaseMergeOnReadRollbackActionExecutor>, List, List> { + public FlinkMergeOnReadRollbackActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, + HoodieTable>, List, List> table, + String instantTime, + HoodieInstant commitInstant, + boolean deleteInstants) { + super(context, config, table, instantTime, commitInstant, deleteInstants); + } + + public FlinkMergeOnReadRollbackActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, + HoodieTable>, List, List> table, + String instantTime, + HoodieInstant commitInstant, + boolean deleteInstants, + boolean skipTimelinePublish, + boolean useMarkerBasedStrategy) { + super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy); + } + + @Override + protected RollbackStrategy getRollbackStrategy() { + if (useMarkerBasedStrategy) { + return new FlinkMarkerBasedRollbackStrategy(table, context, config, instantTime); + } else { + return this::executeRollbackUsingFileListing; + } + } + + @Override + protected List executeRollbackUsingFileListing(HoodieInstant resolvedInstant) { + List rollbackRequests; + try { + rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(resolvedInstant, table, context); + } catch (IOException e) { + throw new HoodieIOException("Error generating rollback requests by file listing.", e); + } + return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, resolvedInstant, rollbackRequests); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 3f1b17bc0..4f1be0bdb 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -91,7 +91,7 @@ public class FlinkOptions { .booleanType() .defaultValue(false) .withDescription("Whether to update index for the old partition path\n" - + "if same key record with different partition path came in, default true"); + + "if same key record with different partition path came in, default false"); // ------------------------------------------------------------------------ // Read Options diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 2a38f310b..a1b3346c0 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -308,26 +308,28 @@ public class StreamWriteFunction } /** - * Prepare the write data buffer: - * - *
    - *
  • Patch up all the records with correct partition path;
  • - *
  • Patch up the first record with correct partition path and fileID.
  • - *
+ * Prepare the write data buffer: patch up all the records with correct partition path. */ public List writeBuffer() { // rewrite all the records with new record key - List recordList = records.stream() + return records.stream() .map(record -> record.toHoodieRecord(partitionPath)) .collect(Collectors.toList()); + } + + /** + * Sets up before flush: patch up the first record with correct partition path and fileID. + * + *

Note: the method may modify the given records {@code records}. + */ + public void preWrite(List records) { // rewrite the first record with expected fileID - HoodieRecord first = recordList.get(0); + HoodieRecord first = records.get(0); HoodieRecord record = new HoodieRecord<>(first.getKey(), first.getData()); HoodieRecordLocation newLoc = new HoodieRecordLocation(first.getCurrentLocation().getInstantTime(), fileID); record.setCurrentLocation(newLoc); - recordList.set(0, record); - return recordList; + records.set(0, record); } public void reset() { @@ -469,6 +471,7 @@ public class StreamWriteFunction if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) { records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1); } + bucket.preWrite(records); final List writeStatus = new ArrayList<>(writeFunction.apply(records, instant)); final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder() .taskID(taskID) @@ -500,6 +503,7 @@ public class StreamWriteFunction if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) { records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1); } + bucket.preWrite(records); writeStatus.addAll(writeFunction.apply(records, currentInstant)); bucket.reset(); } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index b962afb57..d2d04ee27 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -436,6 +436,70 @@ public class TestWriteCopyOnWrite { checkWrittenData(tempFile, expected, 1); } + @Test + public void testInsertWithDeduplication() throws Exception { + // reset the config option + conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size + conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true); + funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); + + // open the function and ingest data + funcWrapper.openFunction(); + // Each record is 208 bytes. so 4 records expect to trigger a mini-batch write + for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) { + funcWrapper.invoke(rowData); + } + + Map> dataBuffer = funcWrapper.getDataBuffer(); + assertThat("Should have 1 data bucket", dataBuffer.size(), is(1)); + assertThat("3 records expect to flush out as a mini-batch", + dataBuffer.values().stream().findFirst().map(List::size).orElse(-1), + is(2)); + + // this triggers the data write and event send + funcWrapper.checkpointFunction(1); + dataBuffer = funcWrapper.getDataBuffer(); + assertThat("All data should be flushed out", dataBuffer.size(), is(0)); + + final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first + final OperatorEvent event2 = funcWrapper.getNextEvent(); + assertThat("The operator expect to send an event", event2, instanceOf(BatchWriteSuccessEvent.class)); + + funcWrapper.getCoordinator().handleEventFromOperator(0, event1); + funcWrapper.getCoordinator().handleEventFromOperator(0, event2); + assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); + + String instant = funcWrapper.getWriteClient() + .getLastPendingInstant(getTableType()); + + funcWrapper.checkpointComplete(1); + + Map expected = new HashMap<>(); + expected.put("par1", "[id1,par1,id1,Danny,23,4,par1]"); + + checkWrittenData(tempFile, expected, 1); + + // started a new instant already + checkInflightInstant(funcWrapper.getWriteClient()); + checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); + + // insert duplicates again + for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) { + funcWrapper.invoke(rowData); + } + + funcWrapper.checkpointFunction(2); + + final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first + final OperatorEvent event4 = funcWrapper.getNextEvent(); + funcWrapper.getCoordinator().handleEventFromOperator(0, event3); + funcWrapper.getCoordinator().handleEventFromOperator(0, event4); + funcWrapper.checkpointComplete(2); + + // Same the original base file content. + checkWrittenData(tempFile, expected, 1); + } + @Test public void testInsertWithSmallBufferSize() throws Exception { // reset the config option diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index bb6766180..fae076501 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -121,6 +121,13 @@ public class TestData { TimestampData.fromEpochMillis(1), StringData.fromString("par1")))); } + public static List DATA_SET_INSERT_SAME_KEY = new ArrayList<>(); + static { + IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_SAME_KEY.add( + insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, + TimestampData.fromEpochMillis(i), StringData.fromString("par1")))); + } + // data set of test_source.data public static List DATA_SET_SOURCE_INSERT = Arrays.asList( insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,