diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 33878eb15..06cf412e9 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -44,6 +44,8 @@ import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.index.FlinkHoodieIndex; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.io.FlinkAppendHandle; +import org.apache.hudi.io.FlinkConcatAndReplaceHandle; +import org.apache.hudi.io.FlinkConcatHandle; import org.apache.hudi.io.FlinkCreateHandle; import org.apache.hudi.io.FlinkMergeAndReplaceHandle; import org.apache.hudi.io.FlinkMergeHandle; @@ -465,13 +467,16 @@ public class HoodieFlinkWriteClient extends final HoodieRecordLocation loc = record.getCurrentLocation(); final String fileID = loc.getFileId(); final String partitionPath = record.getPartitionPath(); + final boolean insertClustering = config.allowDuplicateInserts(); if (bucketToHandles.containsKey(fileID)) { MiniBatchHandle lastHandle = (MiniBatchHandle) bucketToHandles.get(fileID); if (lastHandle.shouldReplace()) { - HoodieWriteHandle writeHandle = new FlinkMergeAndReplaceHandle<>( - config, instantTime, table, recordItr, partitionPath, fileID, table.getTaskContextSupplier(), - lastHandle.getWritePath()); + HoodieWriteHandle writeHandle = insertClustering + ? 
new FlinkConcatAndReplaceHandle<>(config, instantTime, table, recordItr, partitionPath, fileID, + table.getTaskContextSupplier(), lastHandle.getWritePath()) + : new FlinkMergeAndReplaceHandle<>(config, instantTime, table, recordItr, partitionPath, fileID, + table.getTaskContextSupplier(), lastHandle.getWritePath()); this.bucketToHandles.put(fileID, writeHandle); // override with new replace handle return writeHandle; } @@ -486,8 +491,11 @@ public class HoodieFlinkWriteClient extends writeHandle = new FlinkCreateHandle<>(config, instantTime, table, partitionPath, fileID, table.getTaskContextSupplier()); } else { - writeHandle = new FlinkMergeHandle<>(config, instantTime, table, recordItr, partitionPath, - fileID, table.getTaskContextSupplier()); + writeHandle = insertClustering + ? new FlinkConcatHandle<>(config, instantTime, table, recordItr, partitionPath, + fileID, table.getTaskContextSupplier()) + : new FlinkMergeHandle<>(config, instantTime, table, recordItr, partitionPath, + fileID, table.getTaskContextSupplier()); } this.bucketToHandles.put(fileID, writeHandle); return writeHandle; diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatAndReplaceHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatAndReplaceHandle.java new file mode 100644 index 000000000..300e8c512 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatAndReplaceHandle.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieUpsertException; +import org.apache.hudi.keygen.KeyGenUtils; +import org.apache.hudi.table.HoodieTable; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; + +/** + * A {@link FlinkMergeAndReplaceHandle} that supports CONCAT write incrementally(small data buffers). + * + *

The records iterator for super constructor is reset as empty thus the initialization for new records + * does nothing. This handle keeps the iterator for itself to override the write behavior. + */ +public class FlinkConcatAndReplaceHandle + extends FlinkMergeAndReplaceHandle { + private static final Logger LOG = LoggerFactory.getLogger(FlinkConcatAndReplaceHandle.class); + + // a representation of incoming records that tolerates duplicate keys + private final Iterator> recordItr; + + public FlinkConcatAndReplaceHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, + Iterator> recordItr, String partitionPath, String fileId, + TaskContextSupplier taskContextSupplier, Path basePath) { + super(config, instantTime, hoodieTable, Collections.emptyIterator(), partitionPath, fileId, taskContextSupplier, basePath); + this.recordItr = recordItr; + } + + /** + * Write old record as is w/o merging with incoming record. + */ + @Override + public void write(GenericRecord oldRecord) { + String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt); + try { + fileWriter.writeAvro(key, oldRecord); + } catch (IOException | RuntimeException e) { + String errMsg = String.format("Failed to write old record into new file for key %s from old file %s to new file %s with writerSchema %s", + key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true)); + LOG.debug("Old record is " + oldRecord); + throw new HoodieUpsertException(errMsg, e); + } + recordsWritten++; + } + + @Override + protected void writeIncomingRecords() throws IOException { + while (recordItr.hasNext()) { + HoodieRecord record = recordItr.next(); + writeInsertRecord(record); + } + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatHandle.java new file mode 100644 index 000000000..812155c3d --- /dev/null +++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatHandle.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieUpsertException; +import org.apache.hudi.keygen.KeyGenUtils; +import org.apache.hudi.table.HoodieTable; + +import org.apache.avro.generic.GenericRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; + +/** + * Handle to concatenate new records to old records w/o any merging. + * + *

The records iterator for super constructor is reset as empty thus the initialization for new records + * does nothing. This handle keeps the iterator for itself to override the write behavior. + */ +public class FlinkConcatHandle + extends FlinkMergeHandle { + private static final Logger LOG = LoggerFactory.getLogger(FlinkConcatHandle.class); + + // a representation of incoming records that tolerates duplicate keys + private final Iterator> recordItr; + + public FlinkConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, + Iterator> recordItr, String partitionPath, String fileId, + TaskContextSupplier taskContextSupplier) { + super(config, instantTime, hoodieTable, Collections.emptyIterator(), partitionPath, fileId, taskContextSupplier); + this.recordItr = recordItr; + } + + /** + * Write old record as is w/o merging with incoming record. + */ + @Override + public void write(GenericRecord oldRecord) { + String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt); + try { + fileWriter.writeAvro(key, oldRecord); + } catch (IOException | RuntimeException e) { + String errMsg = String.format("Failed to write old record into new file for key %s from old file %s to new file %s with writerSchema %s", + key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true)); + LOG.debug("Old record is " + oldRecord); + throw new HoodieUpsertException(errMsg, e); + } + recordsWritten++; + } + + @Override + protected void writeIncomingRecords() throws IOException { + while (recordItr.hasNext()) { + HoodieRecord record = recordItr.next(); + writeInsertRecord(record); + } + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index fe0237065..a0e7c7ae9 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ 
b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -227,11 +227,13 @@ public class FlinkOptions extends HoodieConfig { .defaultValue(TABLE_TYPE_COPY_ON_WRITE) .withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ"); - public static final ConfigOption INSERT_DEDUP = ConfigOptions - .key("write.insert.deduplicate") + public static final ConfigOption INSERT_CLUSTER = ConfigOptions + .key("write.insert.cluster") .booleanType() - .defaultValue(true) - .withDescription("Whether to deduplicate for INSERT operation, if disabled, writes the base files directly, default true"); + .defaultValue(false) + .withDescription("Whether to merge small files for insert mode, " + + "if true, the write throughput will decrease because the read/write of existing small file, " + + "only valid for COW table, default false"); public static final ConfigOption OPERATION = ConfigOptions .key("write.operation") diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java new file mode 100644 index 000000000..fa8ee49f1 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.configuration; + +import org.apache.hudi.common.model.WriteOperationType; + +import org.apache.flink.configuration.Configuration; + +import java.util.Locale; + +/** + * Tool helping to resolve the flink options {@link FlinkOptions}. + */ +public class OptionsResolver { + /** + * Returns whether insert clustering is allowed with given configuration {@code conf}. + */ + public static boolean insertClustering(Configuration conf) { + return isCowTable(conf) && isInsertOperation(conf) && conf.getBoolean(FlinkOptions.INSERT_CLUSTER); + } + + /** + * Returns whether the insert is clustering disabled with given configuration {@code conf}. + */ + public static boolean isAppendMode(Configuration conf) { + return isCowTable(conf) && isInsertOperation(conf) && !conf.getBoolean(FlinkOptions.INSERT_CLUSTER); + } + + /** + * Returns whether the table operation is 'insert'. + */ + public static boolean isInsertOperation(Configuration conf) { + WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)); + return operationType == WriteOperationType.INSERT; + } + + /** + * Returns whether it is a MERGE_ON_READ table. + */ + public static boolean isMorTable(Configuration conf) { + return conf.getString(FlinkOptions.TABLE_TYPE) + .toUpperCase(Locale.ROOT) + .equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + } + + /** + * Returns whether it is a COPY_ON_WRITE table. 
+ */ + public static boolean isCowTable(Configuration conf) { + return conf.getString(FlinkOptions.TABLE_TYPE) + .toUpperCase(Locale.ROOT) + .equals(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 8c400b3bd..2a4a51cf9 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -219,7 +219,7 @@ public class StreamWriteOperatorCoordinator // for streaming mode, commits the ever received events anyway, // the stream write task snapshot and flush the data buffer synchronously in sequence, // so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract) - final boolean committed = commitInstant(this.instant); + final boolean committed = commitInstant(this.instant, checkpointId); if (committed) { if (tableState.scheduleCompaction) { // if async compaction is on, schedule the compaction @@ -238,7 +238,7 @@ public class StreamWriteOperatorCoordinator public void notifyCheckpointAborted(long checkpointId) { // once the checkpoint was aborted, unblock the writer tasks to // reuse the last instant. - executor.execute(this::sendCommitAckEvents, + executor.execute(() -> sendCommitAckEvents(checkpointId), "unblock data write with aborted checkpoint %s", checkpointId); } @@ -398,9 +398,9 @@ public class StreamWriteOperatorCoordinator * The coordinator reuses the instant if there is no data for this round of checkpoint, * sends the commit ack events to unblock the flushing. 
*/ - private void sendCommitAckEvents() { + private void sendCommitAckEvents(long checkpointId) { CompletableFuture[] futures = Arrays.stream(this.gateways).filter(Objects::nonNull) - .map(gw -> gw.sendEvent(CommitAckEvent.getInstance())) + .map(gw -> gw.sendEvent(CommitAckEvent.getInstance(checkpointId))) .toArray(CompletableFuture[]::new); try { CompletableFuture.allOf(futures).get(); @@ -421,12 +421,19 @@ public class StreamWriteOperatorCoordinator || throwable.getCause().getMessage().contains("running"); } + /** + * Commits the instant. + */ + private void commitInstant(String instant) { + commitInstant(instant, -1); + } + /** * Commits the instant. * * @return true if the write statuses are committed successfully. */ - private boolean commitInstant(String instant) { + private boolean commitInstant(String instant, long checkpointId) { if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) { // The last checkpoint finished successfully. return false; @@ -442,7 +449,7 @@ public class StreamWriteOperatorCoordinator // No data has written, reset the buffer and returns early reset(); // Send commit ack event to the write function to unblock the flushing - sendCommitAckEvents(); + sendCommitAckEvents(checkpointId); return false; } doCommit(instant, writeResults); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java index dd63d929a..5ad2935e2 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java @@ -114,6 +114,11 @@ public abstract class AbstractStreamWriteFunction */ protected List writeStatuses; + /** + * Current checkpoint id. + */ + private long checkpointId = -1; + /** * Constructs a StreamWriteFunctionBase. 
* @@ -147,6 +152,7 @@ public abstract class AbstractStreamWriteFunction @Override public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { + this.checkpointId = functionSnapshotContext.getCheckpointId(); snapshotState(); // Reload the snapshot state as the current state. reloadWriteMetaState(); @@ -210,7 +216,10 @@ public abstract class AbstractStreamWriteFunction public void handleOperatorEvent(OperatorEvent event) { ValidationUtils.checkArgument(event instanceof CommitAckEvent, "The write function can only handle CommitAckEvent"); - this.confirming = false; + long checkpointId = ((CommitAckEvent) event).getCheckpointId(); + if (checkpointId == -1 || checkpointId == this.checkpointId) { + this.confirming = false; + } } /** diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java b/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java index 541fd062f..84274f0e2 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java @@ -26,13 +26,25 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent; public class CommitAckEvent implements OperatorEvent { private static final long serialVersionUID = 1L; - private static final CommitAckEvent INSTANCE = new CommitAckEvent(); + private long checkpointId; + + public CommitAckEvent(long checkpointId) { + this.checkpointId = checkpointId; + } // default constructor for efficient serialization public CommitAckEvent() { } - public static CommitAckEvent getInstance() { - return INSTANCE; + public long getCheckpointId() { + return checkpointId; + } + + public void setCheckpointId(long checkpointId) { + this.checkpointId = checkpointId; + } + + public static CommitAckEvent getInstance(long checkpointId) { + return new CommitAckEvent(checkpointId); } } diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 7ca91f7e2..2cbe152cc 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -69,8 +69,9 @@ public class FlinkStreamerConfig extends Configuration { @Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ.", required = true) public String tableType; - @Parameter(names = {"--insert-dedup"}, description = "Whether to deduplicate for INSERT operation, if disabled, writes the base files directly.", required = true) - public Boolean insertDedup = true; + @Parameter(names = {"--insert-cluster"}, description = "Whether to merge small files for insert mode, " + + "if true, the write throughput will decrease because the read/write of existing small file, default false.") + public Boolean insertCluster = false; @Parameter(names = {"--props"}, description = "Path to properties file on localfs or dfs, with configurations for " + "hoodie client, schema provider, key generator and data source. 
For hoodie client props, sane defaults are " @@ -308,7 +309,7 @@ public class FlinkStreamerConfig extends Configuration { conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName); // copy_on_write works same as COPY_ON_WRITE conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase()); - conf.setBoolean(FlinkOptions.INSERT_DEDUP, config.insertDedup); + conf.setBoolean(FlinkOptions.INSERT_CLUSTER, config.insertCluster); conf.setString(FlinkOptions.OPERATION, config.operation.value()); conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField); conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, config.payloadClassName); diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index c19c83104..4fb37a36e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -18,8 +18,8 @@ package org.apache.hudi.table; -import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieValidationException; import org.apache.hudi.hive.MultiPartKeysValueExtractor; import org.apache.hudi.keygen.ComplexAvroKeyGenerator; @@ -27,7 +27,6 @@ import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator; import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.DataTypeUtils; -import org.apache.hudi.util.StreamerUtil; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; @@ -252,11 +251,6 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, commitsToRetain + 10); conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, 
commitsToRetain + 20); } - if (StreamerUtil.allowDuplicateInserts(conf)) { - // no need for compaction if insert duplicates is allowed - conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); - conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false); - } } /** @@ -284,7 +278,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab */ private static void setupWriteOptions(Configuration conf) { if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.OPERATION) - && HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)) == HoodieTableType.COPY_ON_WRITE) { + && OptionsResolver.isCowTable(conf)) { conf.setBoolean(FlinkOptions.PRE_COMBINE, true); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index d43dfd090..e5f097c01 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -21,6 +21,7 @@ package org.apache.hudi.table; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.sink.utils.Pipelines; import org.apache.hudi.util.ChangelogModes; import org.apache.hudi.util.StreamerUtil; @@ -63,9 +64,9 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, return (DataStreamSinkProvider) dataStream -> { // setup configuration - long ckpInterval = dataStream.getExecutionEnvironment() - .getCheckpointConfig().getCheckpointInterval(); - conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpInterval * 5); // five checkpoints interval + long ckpTimeout = dataStream.getExecutionEnvironment() + .getCheckpointConfig().getCheckpointTimeout(); + conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout); RowType 
rowType = (RowType) schema.toSourceRowDataType().notNull().getLogicalType(); @@ -76,7 +77,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, } // Append mode - if (StreamerUtil.allowDuplicateInserts(conf)) { + if (OptionsResolver.isAppendMode(conf)) { return Pipelines.append(conf, rowType, dataStream); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 835bb49b4..7aa023acd 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -28,7 +28,6 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCleaningPolicy; -import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; @@ -43,6 +42,7 @@ import org.apache.hudi.config.HoodiePayloadConfig; import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.schema.FilebasedSchemaProvider; @@ -171,7 +171,7 @@ public class StreamerUtil { .withEngineType(EngineType.FLINK) .withPath(conf.getString(FlinkOptions.PATH)) .combineInput(conf.getBoolean(FlinkOptions.PRE_COMBINE), true) - .withMergeAllowDuplicateOnInserts(allowDuplicateInserts(conf)) + .withMergeAllowDuplicateOnInserts(OptionsResolver.insertClustering(conf)) .withCompactionConfig( HoodieCompactionConfig.newBuilder() 
.withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME)) @@ -304,14 +304,12 @@ public class StreamerUtil { } /** - * Returns whether needs to schedule the async compaction. + * Returns whether there is need to schedule the async compaction. * * @param conf The flink configuration. */ public static boolean needsAsyncCompaction(Configuration conf) { - return conf.getString(FlinkOptions.TABLE_TYPE) - .toUpperCase(Locale.ROOT) - .equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ) + return OptionsResolver.isMorTable(conf) && conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED); } @@ -321,9 +319,7 @@ public class StreamerUtil { * @param conf The flink configuration. */ public static boolean needsScheduleCompaction(Configuration conf) { - return conf.getString(FlinkOptions.TABLE_TYPE) - .toUpperCase(Locale.ROOT) - .equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ) + return OptionsResolver.isMorTable(conf) && conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED); } @@ -464,14 +460,6 @@ public class StreamerUtil { return fileStatus.getLen() > 0; } - /** - * Returns whether insert deduplication is allowed with given configuration {@code conf}. 
- */ - public static boolean allowDuplicateInserts(Configuration conf) { - WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)); - return operationType == WriteOperationType.INSERT && !conf.getBoolean(FlinkOptions.INSERT_DEDUP); - } - public static String getLastPendingInstant(HoodieTableMetaClient metaClient) { return getLastPendingInstant(metaClient, true); } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index 29bb42487..5b25311ec 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -46,6 +46,7 @@ import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.io.IOException; +import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -528,7 +529,7 @@ public class TestWriteCopyOnWrite { } @Test - public void testInsertAllowsDuplication() throws Exception { + public void testInsertAppendMode() throws Exception { InsertFunctionWrapper funcWrapper = new InsertFunctionWrapper<>(tempFile.getAbsolutePath(), conf); // open the function and ingest data @@ -593,6 +594,95 @@ public class TestWriteCopyOnWrite { TestData.checkWrittenAllData(tempFile, expected, 1); } + /** + * The test is almost same with {@link #testInsertWithSmallBufferSize} except that + * it is with insert clustering mode. 
+ */ + @Test + public void testInsertClustering() throws Exception { + // reset the config option + conf.setString(FlinkOptions.OPERATION, "insert"); + conf.setBoolean(FlinkOptions.INSERT_CLUSTER, true); + conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0008); // 839 bytes buffer size + funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); + + // open the function and ingest data + funcWrapper.openFunction(); + // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes. + // so 3 records expect to trigger a mini-batch write + // flush the max size bucket once at a time. + for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) { + funcWrapper.invoke(rowData); + } + + Map> dataBuffer = funcWrapper.getDataBuffer(); + assertThat("Should have 1 data bucket", dataBuffer.size(), is(1)); + assertThat("2 records expect to flush out as a mini-batch", + dataBuffer.values().stream().findFirst().map(List::size).orElse(-1), + is(2)); + + // this triggers the data write and event send + funcWrapper.checkpointFunction(1); + dataBuffer = funcWrapper.getDataBuffer(); + assertThat("All data should be flushed out", dataBuffer.size(), is(0)); + + for (int i = 0; i < 2; i++) { + final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first + assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class)); + funcWrapper.getCoordinator().handleEventFromOperator(0, event); + } + assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); + + String instant = lastPendingInstant(); + + funcWrapper.checkpointComplete(1); + + Map expected = new HashMap<>(); + + expected.put("par1", "[" + + "id1,par1,id1,Danny,23,0,par1, " + + "id1,par1,id1,Danny,23,1,par1, " + + "id1,par1,id1,Danny,23,2,par1, " + + "id1,par1,id1,Danny,23,3,par1, " + + "id1,par1,id1,Danny,23,4,par1]"); + TestData.checkWrittenData(tempFile, expected, 1); + + // started a new instant already 
+ checkInflightInstant(); + checkInstantState(HoodieInstant.State.COMPLETED, instant); + + // insert duplicates again + for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) { + funcWrapper.invoke(rowData); + } + + funcWrapper.checkpointFunction(2); + + for (int i = 0; i < 2; i++) { + final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first + funcWrapper.getCoordinator().handleEventFromOperator(0, event); + } + + funcWrapper.checkpointComplete(2); + + // same with the original base file content. + Map> expected2 = new HashMap<>(); + expected2.put("par1", Arrays.asList( + "id1,par1,id1,Danny,23,0,par1", + "id1,par1,id1,Danny,23,0,par1", + "id1,par1,id1,Danny,23,1,par1", + "id1,par1,id1,Danny,23,1,par1", + "id1,par1,id1,Danny,23,2,par1", + "id1,par1,id1,Danny,23,2,par1", + "id1,par1,id1,Danny,23,3,par1", + "id1,par1,id1,Danny,23,3,par1", + "id1,par1,id1,Danny,23,4,par1", + "id1,par1,id1,Danny,23,4,par1")); + + // Same the original base file content. + TestData.checkWrittenFullData(tempFile, expected2); + } + @Test public void testInsertWithSmallBufferSize() throws Exception { // reset the config option diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java index fd65914b2..064857ae6 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java @@ -63,6 +63,11 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite { conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); } + @Override + public void testInsertClustering() { + // insert clustering is only valid for cow table. 
+ } + @Override protected void checkWrittenData(File baseFile, Map expected, int partitions) throws Exception { HoodieTableMetaClient metaClient = HoodieFlinkTable.create(writeConfig, context).getMetaClient(); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java index d3fac46eb..7530c8991 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java @@ -38,6 +38,11 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite { conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1); } + @Override + public void testInsertClustering() { + // insert clustering is only valid for cow table. + } + @Override protected Map getExpectedBeforeCheckpointComplete() { return EXPECTED1; diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java index 8e90438dc..4dc197c5c 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext; import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; @@ -101,7 +102,7 @@ public class InsertFunctionWrapper { // checkpoint the coordinator 
first this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>()); - writeFunction.snapshotState(null); + writeFunction.snapshotState(new MockFunctionSnapshotContext(checkpointId)); stateInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId); } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index 7ac81720a..c65224a6e 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -45,6 +45,7 @@ import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext; import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -218,7 +219,7 @@ public class StreamWriteFunctionWrapper { } bucketAssignerFunction.snapshotState(null); - writeFunction.snapshotState(null); + writeFunction.snapshotState(new MockFunctionSnapshotContext(checkpointId)); stateInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId); } diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index 1a8c3ffff..4e7b3bb3d 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -850,9 +850,10 @@ public class 
HoodieDataSourceITCase extends AbstractTestBase { + "+I[id1, Sophia, 18, 1970-01-01T00:00:05, par5]]", 3); } - @Test - void testAppendWrite() { - TableEnvironment tableEnv = batchTableEnv; + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testAppendWrite(boolean clustering) { + TableEnvironment tableEnv = streamTableEnv; // csv source String csvSourceDDL = TestConfigurations.getCsvSourceDDL("csv_source", "test_source_5.data"); tableEnv.executeSql(csvSourceDDL); @@ -860,7 +861,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { String hoodieTableDDL = sql("hoodie_sink") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) .option(FlinkOptions.OPERATION, "insert") - .option(FlinkOptions.INSERT_DEDUP, false) + .option(FlinkOptions.INSERT_CLUSTER, clustering) .end(); tableEnv.executeSql(hoodieTableDDL);