From 783c9cb3693ff7d82cc3d43a4964c64a4da19436 Mon Sep 17 00:00:00 2001 From: yuzhaojing <32435329+yuzhaojing@users.noreply.github.com> Date: Sat, 10 Jul 2021 14:49:35 +0800 Subject: [PATCH] [HUDI-2087] Support Append only in Flink stream (#3252) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 喻兆靖 --- .../hudi/table/action/commit/BucketType.java | 2 +- .../hudi/client/HoodieFlinkWriteClient.java | 10 +- .../org/apache/hudi/io/FlinkAppendHandle.java | 3 +- .../org/apache/hudi/io/FlinkCreateHandle.java | 12 +++ .../commit/BaseFlinkCommitActionExecutor.java | 5 +- .../hudi/configuration/FlinkOptions.java | 6 ++ .../partitioner/BucketAssignFunction.java | 44 +++++++-- .../hudi/sink/partitioner/BucketAssigner.java | 10 +- .../hudi/streamer/FlinkStreamerConfig.java | 11 ++- .../apache/hudi/table/HoodieTableFactory.java | 6 ++ .../hudi/sink/TestWriteCopyOnWrite.java | 95 +++++++++++++++++-- .../hudi/sink/TestWriteMergeOnRead.java | 15 +++ .../sink/TestWriteMergeOnReadWithCompact.java | 20 +++- .../java/org/apache/hudi/utils/TestData.java | 33 +++++++ 14 files changed, 243 insertions(+), 29 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java index 70ee473d2..e1fd1618b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java @@ -19,5 +19,5 @@ package org.apache.hudi.table.action.commit; public enum BucketType { - UPDATE, INSERT + UPDATE, INSERT, APPEND_ONLY } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 05e4481ec..71ca1b6e8 100644 --- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -56,6 +56,7 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTimelineArchiveLog; import org.apache.hudi.table.MarkerFiles; import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.hudi.table.action.commit.BucketType; import org.apache.hudi.table.action.compact.FlinkCompactHelpers; import org.apache.hudi.table.upgrade.FlinkUpgradeDowngrade; import org.apache.hudi.util.FlinkClientUtil; @@ -408,6 +409,12 @@ public class HoodieFlinkWriteClient extends final HoodieRecordLocation loc = record.getCurrentLocation(); final String fileID = loc.getFileId(); final String partitionPath = record.getPartitionPath(); + // append only mode always use FlinkCreateHandle + if (loc.getInstantTime().equals(BucketType.APPEND_ONLY.name())) { + return new FlinkCreateHandle<>(config, instantTime, table, partitionPath, + fileID, table.getTaskContextSupplier()); + } + if (bucketToHandles.containsKey(fileID)) { MiniBatchHandle lastHandle = (MiniBatchHandle) bucketToHandles.get(fileID); if (lastHandle.shouldReplace()) { @@ -424,7 +431,8 @@ public class HoodieFlinkWriteClient extends if (isDelta) { writeHandle = new FlinkAppendHandle<>(config, instantTime, table, partitionPath, fileID, recordItr, table.getTaskContextSupplier()); - } else if (loc.getInstantTime().equals("I")) { + } else if (loc.getInstantTime().equals(BucketType.INSERT.name()) || loc.getInstantTime().equals(BucketType.APPEND_ONLY.name())) { + // use the same handle for insert bucket writeHandle = new FlinkCreateHandle<>(config, instantTime, table, partitionPath, fileID, table.getTaskContextSupplier()); } else { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java index 987f3350d..41d06667f 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.MarkerFiles; +import org.apache.hudi.table.action.commit.BucketType; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; @@ -80,7 +81,7 @@ public class FlinkAppendHandle @Override protected boolean isUpdateRecord(HoodieRecord hoodieRecord) { return hoodieRecord.getCurrentLocation() != null - && hoodieRecord.getCurrentLocation().getInstantTime().equals("U"); + && hoodieRecord.getCurrentLocation().getInstantTime().equals(BucketType.UPDATE.name()); } @Override diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java index 3ff579fed..49d8918b9 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java @@ -30,6 +30,7 @@ import org.apache.hudi.table.HoodieTable; import org.apache.avro.Schema; import org.apache.hadoop.fs.Path; +import org.apache.hudi.table.MarkerFiles; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -70,6 +71,17 @@ public class FlinkCreateHandle } } + @Override + protected void createMarkerFile(String partitionPath, String dataFileName) { + // In some rare cases, the task was pulled up again with same write file name, + // for e.g, reuse the small log files from last commit instant. 
+ + // Just skip the marker file creation if it already exists, the new data would append to + // the file directly. + MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime); + markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType()); + } + /** * The flink checkpoints start in sequence and asynchronously, when one write task finish the checkpoint(A) * (thus the fs view got the written data files some of which may be invalid), diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java index 5cfd28be2..7dcc2406b 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java @@ -102,9 +102,7 @@ public abstract class BaseFlinkCommitActionExecutor record = inputRecords.get(0); final String partitionPath = record.getPartitionPath(); final String fileId = record.getCurrentLocation().getFileId(); - final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I") - ? 
BucketType.INSERT - : BucketType.UPDATE; + final BucketType bucketType = BucketType.valueOf(record.getCurrentLocation().getInstantTime()); handleUpsertPartition( instantTime, partitionPath, @@ -185,6 +183,7 @@ public abstract class BaseFlinkCommitActionExecutor APPEND_ONLY_ENABLE = ConfigOptions + .key("append_only.enable") + .booleanType() + .defaultValue(false) + .withDescription("Whether to write data to new baseFile without index, only support in COW, default false"); + public static final ConfigOption OPERATION = ConfigOptions .key("write.operation") .stringType() diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java index 75a345430..5cc239bca 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java @@ -28,11 +28,13 @@ import org.apache.hudi.common.model.HoodieRecordGlobalLocation; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.bootstrap.IndexRecord; import org.apache.hudi.sink.utils.PayloadCreation; import org.apache.hudi.table.action.commit.BucketInfo; +import org.apache.hudi.table.action.commit.BucketType; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; @@ -55,9 +57,9 @@ import java.util.Objects; * it then assigns the bucket with ID using the {@link BucketAssigner}. * *

All the records are tagged with HoodieRecordLocation, instead of real instant time, - * INSERT record uses "I" and UPSERT record uses "U" as instant time. There is no need to keep + * INSERT record uses "INSERT" and UPSERT record uses "UPDATE" as instant time. There is no need to keep * the "real" instant time for each record, the bucket ID (partition path & fileID) actually decides - * where the record should write to. The "I" and "U" tags are only used for downstream to decide whether + * where the record should write to. The "INSERT" and "UPDATE" tags are only used for downstream to decide whether * the data bucket is an INSERT or an UPSERT, we should factor the tags out when the underneath writer * supports specifying the bucket type explicitly. * @@ -106,11 +108,18 @@ public class BucketAssignFunction> */ private final boolean globalIndex; + private final boolean appendOnly; + public BucketAssignFunction(Configuration conf) { this.conf = conf; this.isChangingRecords = WriteOperationType.isChangingRecords( WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))); this.globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED); + this.appendOnly = conf.getBoolean(FlinkOptions.APPEND_ONLY_ENABLE); + if (appendOnly) { + ValidationUtils.checkArgument(conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.COPY_ON_WRITE.name()), + "APPEND_ONLY mode only support in COPY_ON_WRITE table"); + } } @Override @@ -170,25 +179,33 @@ public class BucketAssignFunction> final String partitionPath = hoodieKey.getPartitionPath(); final HoodieRecordLocation location; + if (appendOnly) { + location = getNewRecordLocation(partitionPath); + this.context.setCurrentKey(recordKey); + record.setCurrentLocation(location); + out.collect((O) record); + return; + } + // Only changing records need looking up the index for the location, // append only records are always recognized as INSERT. 
HoodieRecordGlobalLocation oldLoc = indexState.value(); if (isChangingRecords && oldLoc != null) { - // Set up the instant time as "U" to mark the bucket as an update bucket. + // Set up the instant time as "UPDATE" to mark the bucket as an update bucket. if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) { if (globalIndex) { // if partition path changes, emit a delete record for old partition path, // then update the index state using location with new partition path. HoodieRecord deleteRecord = new HoodieRecord<>(new HoodieKey(recordKey, oldLoc.getPartitionPath()), payloadCreation.createDeletePayload((BaseAvroPayload) record.getData())); - deleteRecord.setCurrentLocation(oldLoc.toLocal("U")); + deleteRecord.setCurrentLocation(oldLoc.toLocal(BucketType.UPDATE.name())); deleteRecord.seal(); out.collect((O) deleteRecord); } location = getNewRecordLocation(partitionPath); updateIndexState(partitionPath, location); } else { - location = oldLoc.toLocal("U"); + location = oldLoc.toLocal(BucketType.UPDATE.name()); this.bucketAssigner.addUpdate(partitionPath, location.getFileId()); } } else { @@ -203,17 +220,26 @@ public class BucketAssignFunction> } private HoodieRecordLocation getNewRecordLocation(String partitionPath) { - final BucketInfo bucketInfo = this.bucketAssigner.addInsert(partitionPath); + BucketInfo bucketInfo; + if (appendOnly) { + bucketInfo = this.bucketAssigner.addAppendOnly(partitionPath); + } else { + bucketInfo = this.bucketAssigner.addInsert(partitionPath); + } + final HoodieRecordLocation location; switch (bucketInfo.getBucketType()) { case INSERT: - // This is an insert bucket, use HoodieRecordLocation instant time as "I". + // This is an insert bucket, use HoodieRecordLocation instant time as "INSERT". // Downstream operators can then check the instant time to know whether // a record belongs to an insert bucket. 
- location = new HoodieRecordLocation("I", bucketInfo.getFileIdPrefix()); + location = new HoodieRecordLocation(BucketType.INSERT.name(), bucketInfo.getFileIdPrefix()); break; case UPDATE: - location = new HoodieRecordLocation("U", bucketInfo.getFileIdPrefix()); + location = new HoodieRecordLocation(BucketType.UPDATE.name(), bucketInfo.getFileIdPrefix()); + break; + case APPEND_ONLY: + location = new HoodieRecordLocation(BucketType.APPEND_ONLY.name(), bucketInfo.getFileIdPrefix()); break; default: throw new AssertionError(); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java index 6d805ce8d..965b55751 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java @@ -140,6 +140,14 @@ public class BucketAssigner implements AutoCloseable { } // if we have anything more, create new insert buckets, like normal + return getOrCreateNewFileBucket(partitionPath, BucketType.INSERT); + } + + public BucketInfo addAppendOnly(String partitionPath) { + return getOrCreateNewFileBucket(partitionPath, BucketType.APPEND_ONLY); + } + + private BucketInfo getOrCreateNewFileBucket(String partitionPath, BucketType bucketType) { if (newFileAssignStates.containsKey(partitionPath)) { NewFileAssignState newFileAssignState = newFileAssignStates.get(partitionPath); if (newFileAssignState.canAssign()) { @@ -148,7 +156,7 @@ public class BucketAssigner implements AutoCloseable { final String key = StreamerUtil.generateBucketKey(partitionPath, newFileAssignState.fileId); return bucketInfoMap.get(key); } - BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath); + BucketInfo bucketInfo = new BucketInfo(bucketType, FSUtils.createNewFileIdPfx(), partitionPath); final String key = StreamerUtil.generateBucketKey(partitionPath, 
bucketInfo.getFileIdPrefix()); bucketInfoMap.put(key, bucketInfo); newFileAssignStates.put(partitionPath, new NewFileAssignState(bucketInfo.getFileIdPrefix(), writeProfile.getRecordsPerBucket())); diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 0b4533f7c..c024d7c5c 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -69,6 +69,9 @@ public class FlinkStreamerConfig extends Configuration { @Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ.", required = true) public String tableType; + @Parameter(names = {"--append-only"}, description = "Write data to new parquet in every checkpoint. Only support in COPY_ON_WRITE table.", required = false) + public Boolean appendOnly = false; + @Parameter(names = {"--props"}, description = "Path to properties file on localfs or dfs, with configurations for " + "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are " + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. 
For sources, refer" @@ -290,7 +293,13 @@ public class FlinkStreamerConfig extends Configuration { conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName); // copy_on_write works same as COPY_ON_WRITE conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase()); - conf.setString(FlinkOptions.OPERATION, config.operation.value()); + conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, config.appendOnly); + if (config.appendOnly) { + // append only should use insert operation + conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value()); + } else { + conf.setString(FlinkOptions.OPERATION, config.operation.value()); + } conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField); conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName); conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, config.filterDupes); diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 2eeb8f58b..2c2193c78 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -18,6 +18,7 @@ package org.apache.hudi.table; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.keygen.ComplexAvroKeyGenerator; import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; @@ -144,6 +145,11 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab TableSchema schema) { // table name conf.setString(FlinkOptions.TABLE_NAME.key(), tableName); + // append only + if (conf.getBoolean(FlinkOptions.APPEND_ONLY_ENABLE)) { + // append only should use insert operation + conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value()); + } // hoodie key about options setupHoodieKeyOptions(conf, table); // cleaning options diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index e145076bc..4344f453d 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -23,6 +23,7 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; @@ -44,6 +45,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import java.io.File; +import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -387,12 +389,12 @@ public class TestWriteCopyOnWrite { @Test public void testInsertWithMiniBatches() throws Exception { // reset the config option - conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size + conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0007); // 734 bytes batch size funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); // open the function and ingest data funcWrapper.openFunction(); - // Each record is 208 bytes. so 4 records expect to trigger a mini-batch write + // Each record is 216 bytes. 
so 4 records expect to trigger a mini-batch write for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { funcWrapper.invoke(rowData); } @@ -448,13 +450,13 @@ public class TestWriteCopyOnWrite { @Test public void testInsertWithDeduplication() throws Exception { // reset the config option - conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size + conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0007); // 734 bytes batch size conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true); funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); // open the function and ingest data funcWrapper.openFunction(); - // Each record is 208 bytes. so 4 records expect to trigger a mini-batch write + // Each record is 216 bytes. so 4 records expect to trigger a mini-batch write for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) { funcWrapper.invoke(rowData); } @@ -510,14 +512,91 @@ public class TestWriteCopyOnWrite { } @Test - public void testInsertWithSmallBufferSize() throws Exception { + public void testAppendOnly() throws Exception { // reset the config option - conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size + conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0007); // 734 bytes batch size + conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, false); + conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, true); + conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value()); funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); // open the function and ingest data funcWrapper.openFunction(); - // each record is 208 bytes. so 4 records expect to trigger buffer flush: + // Each record is 216 bytes. 
so 4 records expect to trigger a mini-batch write + for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) { funcWrapper.invoke(rowData); } + + // this triggers the data write and event send + funcWrapper.checkpointFunction(1); + Map> dataBuffer = funcWrapper.getDataBuffer(); + assertThat("All data should be flushed out", dataBuffer.size(), is(0)); + + final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first + final OperatorEvent event2 = funcWrapper.getNextEvent(); + assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class)); + + funcWrapper.getCoordinator().handleEventFromOperator(0, event1); + funcWrapper.getCoordinator().handleEventFromOperator(0, event2); + assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); + + String instant = funcWrapper.getWriteClient() + .getLastPendingInstant(getTableType()); + + funcWrapper.checkpointComplete(1); + + Map> expected = new HashMap<>(); + + expected.put("par1", Arrays.asList( + "id1,par1,id1,Danny,23,0,par1", + "id1,par1,id1,Danny,23,1,par1", + "id1,par1,id1,Danny,23,2,par1", + "id1,par1,id1,Danny,23,3,par1", + "id1,par1,id1,Danny,23,4,par1")); + + TestData.checkWrittenAllData(tempFile, expected, 1); + + // started a new instant already + checkInflightInstant(funcWrapper.getWriteClient()); + checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); + + // insert duplicates again + for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) { funcWrapper.invoke(rowData); } + + funcWrapper.checkpointFunction(2); + + final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first + final OperatorEvent event4 = funcWrapper.getNextEvent(); + funcWrapper.getCoordinator().handleEventFromOperator(0, event3); + funcWrapper.getCoordinator().handleEventFromOperator(0, event4); + funcWrapper.checkpointComplete(2); + + // Same as the original base file content. 
+ expected.put("par1", Arrays.asList( + "id1,par1,id1,Danny,23,0,par1", + "id1,par1,id1,Danny,23,0,par1", + "id1,par1,id1,Danny,23,1,par1", + "id1,par1,id1,Danny,23,1,par1", + "id1,par1,id1,Danny,23,2,par1", + "id1,par1,id1,Danny,23,2,par1", + "id1,par1,id1,Danny,23,3,par1", + "id1,par1,id1,Danny,23,3,par1", + "id1,par1,id1,Danny,23,4,par1", + "id1,par1,id1,Danny,23,4,par1")); + TestData.checkWrittenAllData(tempFile, expected, 1); + } + + @Test + public void testInsertWithSmallBufferSize() throws Exception { + // reset the config option + conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0007); // 734 bytes buffer size + funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); + + // open the function and ingest data + funcWrapper.openFunction(); + // each record is 216 bytes. so 4 records expect to trigger buffer flush: // flush the max size bucket once at a time. for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { funcWrapper.invoke(rowData); @@ -660,7 +739,7 @@ public class TestWriteCopyOnWrite { public void testWriteExactlyOnce() throws Exception { // reset the config option conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 3); - conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size + conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0007); // 734 bytes buffer size funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); // open the function and ingest data diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java index 07e23b56e..b983e8c0e 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java @@ -23,12 +23,14 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.config.SerializableConfiguration; import 
org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestData; @@ -37,6 +39,7 @@ import org.apache.avro.Schema; import org.apache.flink.configuration.Configuration; import org.apache.hadoop.fs.FileSystem; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.io.File; import java.util.Comparator; @@ -44,6 +47,8 @@ import java.util.HashMap; import java.util.Map; import java.util.stream.Collectors; +import static org.junit.jupiter.api.Assertions.assertThrows; + /** * Test cases for delta stream write. */ @@ -86,6 +91,16 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite { return EXPECTED1; } + @Test + public void testAppendOnly() throws Exception { + conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, true); + conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value()); + funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); + assertThrows(IllegalArgumentException.class, () -> { + funcWrapper.openFunction(); + }, "APPEND_ONLY mode only support in COPY_ON_WRITE table"); + } + protected Map getMiniBatchExpected() { Map expected = new HashMap<>(); // MOR mode merges the messages with the same key. 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java index 13a71ecb8..bd6d3e322 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java @@ -19,15 +19,18 @@ package org.apache.hudi.sink; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper; import org.apache.flink.configuration.Configuration; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.HashMap; import java.util.Map; +import static org.junit.jupiter.api.Assertions.assertThrows; + /** * Test cases for delta stream write with compaction. */ @@ -39,10 +42,19 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite { conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1); } - @Disabled + @Override + protected Map getExpectedBeforeCheckpointComplete() { + return EXPECTED1; + } + @Test - public void testIndexStateBootstrap() { - // Ignore the index bootstrap because we only support parquet load now. 
+ public void testAppendOnly() throws Exception { + conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, true); + conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value()); + funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); + assertThrows(IllegalArgumentException.class, () -> { + funcWrapper.openFunction(); + }, "APPEND_ONLY mode only support in COPY_ON_WRITE table"); } protected Map getMiniBatchExpected() { diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index 50ecf543e..2de485920 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -360,8 +360,10 @@ public class TestData { assert baseFile.isDirectory(); FileFilter filter = file -> !file.getName().startsWith("."); File[] partitionDirs = baseFile.listFiles(filter); + assertNotNull(partitionDirs); assertThat(partitionDirs.length, is(partitions)); + for (File partitionDir : partitionDirs) { File[] dataFiles = partitionDir.listFiles(filter); assertNotNull(dataFiles); @@ -381,6 +383,37 @@ public class TestData { } } + public static void checkWrittenAllData( + File baseFile, + Map> expected, + int partitions) throws IOException { + assert baseFile.isDirectory(); + FileFilter filter = file -> !file.getName().startsWith("."); + File[] partitionDirs = baseFile.listFiles(filter); + + assertNotNull(partitionDirs); + assertThat(partitionDirs.length, is(partitions)); + + for (File partitionDir : partitionDirs) { + File[] dataFiles = partitionDir.listFiles(filter); + assertNotNull(dataFiles); + + List readBuffer = new ArrayList<>(); + for (File dataFile : dataFiles) { + ParquetReader reader = AvroParquetReader + .builder(new Path(dataFile.getAbsolutePath())).build(); + GenericRecord nextRecord = reader.read(); + while (nextRecord != null) { + readBuffer.add(filterOutVariables(nextRecord)); + nextRecord = 
reader.read(); + } + readBuffer.sort(Comparator.naturalOrder()); + } + + assertThat(readBuffer, is(expected.get(partitionDir.getName()))); + } + } + /** * Checks the source data are written as expected. *