1
0

[HUDI-2578] Support merging small files for flink insert operation (#3822)

This commit is contained in:
Danny Chan
2021-10-20 21:10:07 +08:00
committed by GitHub
parent 3686c25fae
commit e355ab52db
18 changed files with 416 additions and 58 deletions

View File

@@ -44,6 +44,8 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.FlinkHoodieIndex; import org.apache.hudi.index.FlinkHoodieIndex;
import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.FlinkAppendHandle; import org.apache.hudi.io.FlinkAppendHandle;
import org.apache.hudi.io.FlinkConcatAndReplaceHandle;
import org.apache.hudi.io.FlinkConcatHandle;
import org.apache.hudi.io.FlinkCreateHandle; import org.apache.hudi.io.FlinkCreateHandle;
import org.apache.hudi.io.FlinkMergeAndReplaceHandle; import org.apache.hudi.io.FlinkMergeAndReplaceHandle;
import org.apache.hudi.io.FlinkMergeHandle; import org.apache.hudi.io.FlinkMergeHandle;
@@ -465,13 +467,16 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
final HoodieRecordLocation loc = record.getCurrentLocation(); final HoodieRecordLocation loc = record.getCurrentLocation();
final String fileID = loc.getFileId(); final String fileID = loc.getFileId();
final String partitionPath = record.getPartitionPath(); final String partitionPath = record.getPartitionPath();
final boolean insertClustering = config.allowDuplicateInserts();
if (bucketToHandles.containsKey(fileID)) { if (bucketToHandles.containsKey(fileID)) {
MiniBatchHandle lastHandle = (MiniBatchHandle) bucketToHandles.get(fileID); MiniBatchHandle lastHandle = (MiniBatchHandle) bucketToHandles.get(fileID);
if (lastHandle.shouldReplace()) { if (lastHandle.shouldReplace()) {
HoodieWriteHandle<?, ?, ?, ?> writeHandle = new FlinkMergeAndReplaceHandle<>( HoodieWriteHandle<?, ?, ?, ?> writeHandle = insertClustering
config, instantTime, table, recordItr, partitionPath, fileID, table.getTaskContextSupplier(), ? new FlinkConcatAndReplaceHandle<>(config, instantTime, table, recordItr, partitionPath, fileID,
lastHandle.getWritePath()); table.getTaskContextSupplier(), lastHandle.getWritePath())
: new FlinkMergeAndReplaceHandle<>(config, instantTime, table, recordItr, partitionPath, fileID,
table.getTaskContextSupplier(), lastHandle.getWritePath());
this.bucketToHandles.put(fileID, writeHandle); // override with new replace handle this.bucketToHandles.put(fileID, writeHandle); // override with new replace handle
return writeHandle; return writeHandle;
} }
@@ -486,8 +491,11 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
writeHandle = new FlinkCreateHandle<>(config, instantTime, table, partitionPath, writeHandle = new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
fileID, table.getTaskContextSupplier()); fileID, table.getTaskContextSupplier());
} else { } else {
writeHandle = new FlinkMergeHandle<>(config, instantTime, table, recordItr, partitionPath, writeHandle = insertClustering
fileID, table.getTaskContextSupplier()); ? new FlinkConcatHandle<>(config, instantTime, table, recordItr, partitionPath,
fileID, table.getTaskContextSupplier())
: new FlinkMergeHandle<>(config, instantTime, table, recordItr, partitionPath,
fileID, table.getTaskContextSupplier());
} }
this.bucketToHandles.put(fileID, writeHandle); this.bucketToHandles.put(fileID, writeHandle);
return writeHandle; return writeHandle;

View File

@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
/**
* A {@link FlinkMergeAndReplaceHandle} that supports CONCAT write incrementally(small data buffers).
*
* <P>The records iterator for super constructor is reset as empty thus the initialization for new records
* does nothing. This handle keep the iterator for itself to override the write behavior.
*/
public class FlinkConcatAndReplaceHandle<T extends HoodieRecordPayload, I, K, O>
extends FlinkMergeAndReplaceHandle<T, I, K, O> {
private static final Logger LOG = LoggerFactory.getLogger(FlinkConcatAndReplaceHandle.class);
// a representation of incoming records that tolerates duplicate keys
private final Iterator<HoodieRecord<T>> recordItr;
public FlinkConcatAndReplaceHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
TaskContextSupplier taskContextSupplier, Path basePath) {
super(config, instantTime, hoodieTable, Collections.emptyIterator(), partitionPath, fileId, taskContextSupplier, basePath);
this.recordItr = recordItr;
}
/**
* Write old record as is w/o merging with incoming record.
*/
@Override
public void write(GenericRecord oldRecord) {
String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt);
try {
fileWriter.writeAvro(key, oldRecord);
} catch (IOException | RuntimeException e) {
String errMsg = String.format("Failed to write old record into new file for key %s from old file %s to new file %s with writerSchema %s",
key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
LOG.debug("Old record is " + oldRecord);
throw new HoodieUpsertException(errMsg, e);
}
recordsWritten++;
}
@Override
protected void writeIncomingRecords() throws IOException {
while (recordItr.hasNext()) {
HoodieRecord<T> record = recordItr.next();
writeInsertRecord(record);
}
}
}

View File

@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
/**
* Handle to concatenate new records to old records w/o any merging.
*
* <P>The records iterator for super constructor is reset as empty thus the initialization for new records
* does nothing. This handle keep the iterator for itself to override the write behavior.
*/
public class FlinkConcatHandle<T extends HoodieRecordPayload, I, K, O>
extends FlinkMergeHandle<T, I, K, O> {
private static final Logger LOG = LoggerFactory.getLogger(FlinkConcatHandle.class);
// a representation of incoming records that tolerates duplicate keys
private final Iterator<HoodieRecord<T>> recordItr;
public FlinkConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
TaskContextSupplier taskContextSupplier) {
super(config, instantTime, hoodieTable, Collections.emptyIterator(), partitionPath, fileId, taskContextSupplier);
this.recordItr = recordItr;
}
/**
* Write old record as is w/o merging with incoming record.
*/
@Override
public void write(GenericRecord oldRecord) {
String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt);
try {
fileWriter.writeAvro(key, oldRecord);
} catch (IOException | RuntimeException e) {
String errMsg = String.format("Failed to write old record into new file for key %s from old file %s to new file %s with writerSchema %s",
key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
LOG.debug("Old record is " + oldRecord);
throw new HoodieUpsertException(errMsg, e);
}
recordsWritten++;
}
@Override
protected void writeIncomingRecords() throws IOException {
while (recordItr.hasNext()) {
HoodieRecord<T> record = recordItr.next();
writeInsertRecord(record);
}
}
}

View File

@@ -227,11 +227,13 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue(TABLE_TYPE_COPY_ON_WRITE) .defaultValue(TABLE_TYPE_COPY_ON_WRITE)
.withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ"); .withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");
public static final ConfigOption<Boolean> INSERT_DEDUP = ConfigOptions public static final ConfigOption<Boolean> INSERT_CLUSTER = ConfigOptions
.key("write.insert.deduplicate") .key("write.insert.cluster")
.booleanType() .booleanType()
.defaultValue(true) .defaultValue(false)
.withDescription("Whether to deduplicate for INSERT operation, if disabled, writes the base files directly, default true"); .withDescription("Whether to merge small files for insert mode, "
+ "if true, the write throughput will decrease because the read/write of existing small file, "
+ "only valid for COW table, default false");
public static final ConfigOption<String> OPERATION = ConfigOptions public static final ConfigOption<String> OPERATION = ConfigOptions
.key("write.operation") .key("write.operation")

View File

@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.configuration;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.flink.configuration.Configuration;
import java.util.Locale;
/**
* Tool helping to resolve the flink options {@link FlinkOptions}.
*/
public class OptionsResolver {
/**
* Returns whether insert clustering is allowed with given configuration {@code conf}.
*/
public static boolean insertClustering(Configuration conf) {
return isCowTable(conf) && isInsertOperation(conf) && conf.getBoolean(FlinkOptions.INSERT_CLUSTER);
}
/**
* Returns whether the insert is clustering disabled with given configuration {@code conf}.
*/
public static boolean isAppendMode(Configuration conf) {
return isCowTable(conf) && isInsertOperation(conf) && !conf.getBoolean(FlinkOptions.INSERT_CLUSTER);
}
/**
* Returns whether the table operation is 'insert'.
*/
public static boolean isInsertOperation(Configuration conf) {
WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
return operationType == WriteOperationType.INSERT;
}
/**
* Returns whether it is a MERGE_ON_READ table.
*/
public static boolean isMorTable(Configuration conf) {
return conf.getString(FlinkOptions.TABLE_TYPE)
.toUpperCase(Locale.ROOT)
.equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
}
/**
* Returns whether it is a COPY_ON_WRITE table.
*/
public static boolean isCowTable(Configuration conf) {
return conf.getString(FlinkOptions.TABLE_TYPE)
.toUpperCase(Locale.ROOT)
.equals(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
}
}

View File

@@ -219,7 +219,7 @@ public class StreamWriteOperatorCoordinator
// for streaming mode, commits the ever received events anyway, // for streaming mode, commits the ever received events anyway,
// the stream write task snapshot and flush the data buffer synchronously in sequence, // the stream write task snapshot and flush the data buffer synchronously in sequence,
// so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract) // so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)
final boolean committed = commitInstant(this.instant); final boolean committed = commitInstant(this.instant, checkpointId);
if (committed) { if (committed) {
if (tableState.scheduleCompaction) { if (tableState.scheduleCompaction) {
// if async compaction is on, schedule the compaction // if async compaction is on, schedule the compaction
@@ -238,7 +238,7 @@ public class StreamWriteOperatorCoordinator
public void notifyCheckpointAborted(long checkpointId) { public void notifyCheckpointAborted(long checkpointId) {
// once the checkpoint was aborted, unblock the writer tasks to // once the checkpoint was aborted, unblock the writer tasks to
// reuse the last instant. // reuse the last instant.
executor.execute(this::sendCommitAckEvents, executor.execute(() -> sendCommitAckEvents(checkpointId),
"unblock data write with aborted checkpoint %s", checkpointId); "unblock data write with aborted checkpoint %s", checkpointId);
} }
@@ -398,9 +398,9 @@ public class StreamWriteOperatorCoordinator
* The coordinator reuses the instant if there is no data for this round of checkpoint, * The coordinator reuses the instant if there is no data for this round of checkpoint,
* sends the commit ack events to unblock the flushing. * sends the commit ack events to unblock the flushing.
*/ */
private void sendCommitAckEvents() { private void sendCommitAckEvents(long checkpointId) {
CompletableFuture<?>[] futures = Arrays.stream(this.gateways).filter(Objects::nonNull) CompletableFuture<?>[] futures = Arrays.stream(this.gateways).filter(Objects::nonNull)
.map(gw -> gw.sendEvent(CommitAckEvent.getInstance())) .map(gw -> gw.sendEvent(CommitAckEvent.getInstance(checkpointId)))
.toArray(CompletableFuture<?>[]::new); .toArray(CompletableFuture<?>[]::new);
try { try {
CompletableFuture.allOf(futures).get(); CompletableFuture.allOf(futures).get();
@@ -421,12 +421,19 @@ public class StreamWriteOperatorCoordinator
|| throwable.getCause().getMessage().contains("running"); || throwable.getCause().getMessage().contains("running");
} }
/**
* Commits the instant.
*/
private void commitInstant(String instant) {
commitInstant(instant, -1);
}
/** /**
* Commits the instant. * Commits the instant.
* *
* @return true if the write statuses are committed successfully. * @return true if the write statuses are committed successfully.
*/ */
private boolean commitInstant(String instant) { private boolean commitInstant(String instant, long checkpointId) {
if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) { if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) {
// The last checkpoint finished successfully. // The last checkpoint finished successfully.
return false; return false;
@@ -442,7 +449,7 @@ public class StreamWriteOperatorCoordinator
// No data has written, reset the buffer and returns early // No data has written, reset the buffer and returns early
reset(); reset();
// Send commit ack event to the write function to unblock the flushing // Send commit ack event to the write function to unblock the flushing
sendCommitAckEvents(); sendCommitAckEvents(checkpointId);
return false; return false;
} }
doCommit(instant, writeResults); doCommit(instant, writeResults);

View File

@@ -114,6 +114,11 @@ public abstract class AbstractStreamWriteFunction<I>
*/ */
protected List<WriteStatus> writeStatuses; protected List<WriteStatus> writeStatuses;
/**
* Current checkpoint id.
*/
private long checkpointId = -1;
/** /**
* Constructs a StreamWriteFunctionBase. * Constructs a StreamWriteFunctionBase.
* *
@@ -147,6 +152,7 @@ public abstract class AbstractStreamWriteFunction<I>
@Override @Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
this.checkpointId = functionSnapshotContext.getCheckpointId();
snapshotState(); snapshotState();
// Reload the snapshot state as the current state. // Reload the snapshot state as the current state.
reloadWriteMetaState(); reloadWriteMetaState();
@@ -210,7 +216,10 @@ public abstract class AbstractStreamWriteFunction<I>
public void handleOperatorEvent(OperatorEvent event) { public void handleOperatorEvent(OperatorEvent event) {
ValidationUtils.checkArgument(event instanceof CommitAckEvent, ValidationUtils.checkArgument(event instanceof CommitAckEvent,
"The write function can only handle CommitAckEvent"); "The write function can only handle CommitAckEvent");
this.confirming = false; long checkpointId = ((CommitAckEvent) event).getCheckpointId();
if (checkpointId == -1 || checkpointId == this.checkpointId) {
this.confirming = false;
}
} }
/** /**

View File

@@ -26,13 +26,25 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent;
public class CommitAckEvent implements OperatorEvent { public class CommitAckEvent implements OperatorEvent {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private static final CommitAckEvent INSTANCE = new CommitAckEvent(); private long checkpointId;
public CommitAckEvent(long checkpointId) {
this.checkpointId = checkpointId;
}
// default constructor for efficient serialization // default constructor for efficient serialization
public CommitAckEvent() { public CommitAckEvent() {
} }
public static CommitAckEvent getInstance() { public long getCheckpointId() {
return INSTANCE; return checkpointId;
}
public void setCheckpointId(long checkpointId) {
this.checkpointId = checkpointId;
}
public static CommitAckEvent getInstance(long checkpointId) {
return new CommitAckEvent(checkpointId);
} }
} }

View File

@@ -69,8 +69,9 @@ public class FlinkStreamerConfig extends Configuration {
@Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ.", required = true) @Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ.", required = true)
public String tableType; public String tableType;
@Parameter(names = {"--insert-dedup"}, description = "Whether to deduplicate for INSERT operation, if disabled, writes the base files directly.", required = true) @Parameter(names = {"--insert-cluster"}, description = "Whether to merge small files for insert mode, "
public Boolean insertDedup = true; + "if true, the write throughput will decrease because the read/write of existing small file, default false.")
public Boolean insertCluster = false;
@Parameter(names = {"--props"}, description = "Path to properties file on localfs or dfs, with configurations for " @Parameter(names = {"--props"}, description = "Path to properties file on localfs or dfs, with configurations for "
+ "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are " + "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
@@ -308,7 +309,7 @@ public class FlinkStreamerConfig extends Configuration {
conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName); conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
// copy_on_write works same as COPY_ON_WRITE // copy_on_write works same as COPY_ON_WRITE
conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase()); conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
conf.setBoolean(FlinkOptions.INSERT_DEDUP, config.insertDedup); conf.setBoolean(FlinkOptions.INSERT_CLUSTER, config.insertCluster);
conf.setString(FlinkOptions.OPERATION, config.operation.value()); conf.setString(FlinkOptions.OPERATION, config.operation.value());
conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField); conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, config.payloadClassName); conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, config.payloadClassName);

View File

@@ -18,8 +18,8 @@
package org.apache.hudi.table; package org.apache.hudi.table;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieValidationException; import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hive.MultiPartKeysValueExtractor; import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator; import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
@@ -27,7 +27,6 @@ import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator; import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.DataTypeUtils; import org.apache.hudi.util.DataTypeUtils;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
@@ -252,11 +251,6 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, commitsToRetain + 10); conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, commitsToRetain + 10);
conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, commitsToRetain + 20); conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, commitsToRetain + 20);
} }
if (StreamerUtil.allowDuplicateInserts(conf)) {
// no need for compaction if insert duplicates is allowed
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
}
} }
/** /**
@@ -284,7 +278,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
*/ */
private static void setupWriteOptions(Configuration conf) { private static void setupWriteOptions(Configuration conf) {
if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.OPERATION) if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.OPERATION)
&& HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)) == HoodieTableType.COPY_ON_WRITE) { && OptionsResolver.isCowTable(conf)) {
conf.setBoolean(FlinkOptions.PRE_COMBINE, true); conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
} }
} }

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.table;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.utils.Pipelines; import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.util.ChangelogModes; import org.apache.hudi.util.ChangelogModes;
import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.util.StreamerUtil;
@@ -63,9 +64,9 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
return (DataStreamSinkProvider) dataStream -> { return (DataStreamSinkProvider) dataStream -> {
// setup configuration // setup configuration
long ckpInterval = dataStream.getExecutionEnvironment() long ckpTimeout = dataStream.getExecutionEnvironment()
.getCheckpointConfig().getCheckpointInterval(); .getCheckpointConfig().getCheckpointTimeout();
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpInterval * 5); // five checkpoints interval conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
RowType rowType = (RowType) schema.toSourceRowDataType().notNull().getLogicalType(); RowType rowType = (RowType) schema.toSourceRowDataType().notNull().getLogicalType();
@@ -76,7 +77,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
} }
// Append mode // Append mode
if (StreamerUtil.allowDuplicateInserts(conf)) { if (OptionsResolver.isAppendMode(conf)) {
return Pipelines.append(conf, rowType, dataStream); return Pipelines.append(conf, rowType, dataStream);
} }

View File

@@ -28,7 +28,6 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -43,6 +42,7 @@ import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.schema.FilebasedSchemaProvider; import org.apache.hudi.schema.FilebasedSchemaProvider;
@@ -171,7 +171,7 @@ public class StreamerUtil {
.withEngineType(EngineType.FLINK) .withEngineType(EngineType.FLINK)
.withPath(conf.getString(FlinkOptions.PATH)) .withPath(conf.getString(FlinkOptions.PATH))
.combineInput(conf.getBoolean(FlinkOptions.PRE_COMBINE), true) .combineInput(conf.getBoolean(FlinkOptions.PRE_COMBINE), true)
.withMergeAllowDuplicateOnInserts(allowDuplicateInserts(conf)) .withMergeAllowDuplicateOnInserts(OptionsResolver.insertClustering(conf))
.withCompactionConfig( .withCompactionConfig(
HoodieCompactionConfig.newBuilder() HoodieCompactionConfig.newBuilder()
.withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME)) .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
@@ -304,14 +304,12 @@ public class StreamerUtil {
} }
/** /**
* Returns whether needs to schedule the async compaction. * Returns whether there is need to schedule the async compaction.
* *
* @param conf The flink configuration. * @param conf The flink configuration.
*/ */
public static boolean needsAsyncCompaction(Configuration conf) { public static boolean needsAsyncCompaction(Configuration conf) {
return conf.getString(FlinkOptions.TABLE_TYPE) return OptionsResolver.isMorTable(conf)
.toUpperCase(Locale.ROOT)
.equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
&& conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED); && conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED);
} }
@@ -321,9 +319,7 @@ public class StreamerUtil {
* @param conf The flink configuration. * @param conf The flink configuration.
*/ */
public static boolean needsScheduleCompaction(Configuration conf) { public static boolean needsScheduleCompaction(Configuration conf) {
return conf.getString(FlinkOptions.TABLE_TYPE) return OptionsResolver.isMorTable(conf)
.toUpperCase(Locale.ROOT)
.equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
&& conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED); && conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED);
} }
@@ -464,14 +460,6 @@ public class StreamerUtil {
return fileStatus.getLen() > 0; return fileStatus.getLen() > 0;
} }
/**
* Returns whether insert deduplication is allowed with given configuration {@code conf}.
*/
public static boolean allowDuplicateInserts(Configuration conf) {
WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
return operationType == WriteOperationType.INSERT && !conf.getBoolean(FlinkOptions.INSERT_DEDUP);
}
public static String getLastPendingInstant(HoodieTableMetaClient metaClient) { public static String getLastPendingInstant(HoodieTableMetaClient metaClient) {
return getLastPendingInstant(metaClient, true); return getLastPendingInstant(metaClient, true);
} }

View File

@@ -46,6 +46,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@@ -528,7 +529,7 @@ public class TestWriteCopyOnWrite {
} }
@Test @Test
public void testInsertAllowsDuplication() throws Exception { public void testInsertAppendMode() throws Exception {
InsertFunctionWrapper<RowData> funcWrapper = new InsertFunctionWrapper<>(tempFile.getAbsolutePath(), conf); InsertFunctionWrapper<RowData> funcWrapper = new InsertFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
// open the function and ingest data // open the function and ingest data
@@ -593,6 +594,95 @@ public class TestWriteCopyOnWrite {
TestData.checkWrittenAllData(tempFile, expected, 1); TestData.checkWrittenAllData(tempFile, expected, 1);
} }
/**
 * The test is almost the same as {@link #testInsertWithSmallBufferSize} except that
 * it runs in insert-clustering mode (OPERATION=insert + INSERT_CLUSTER=true), where
 * small base files are concatenated on write instead of being merged/deduplicated,
 * so duplicate keys are expected to survive in the base file.
 */
@Test
public void testInsertClustering() throws Exception {
// reset the config option
conf.setString(FlinkOptions.OPERATION, "insert");
conf.setBoolean(FlinkOptions.INSERT_CLUSTER, true);
conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0008); // 839 bytes buffer size
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
// open the function and ingest data
funcWrapper.openFunction();
// record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
// With an 839-byte buffer, 3 records are expected to trigger a mini-batch write:
// the fullest bucket is flushed one batch at a time.
for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
funcWrapper.invoke(rowData);
}
Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
assertThat("2 records expect to flush out as a mini-batch",
dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
is(2));
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
dataBuffer = funcWrapper.getDataBuffer();
assertThat("All data should be flushed out", dataBuffer.size(), is(0));
// drain the two write-metadata events (one per mini-batch) and forward them to the coordinator
for (int i = 0; i < 2; i++) {
final OperatorEvent event = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, event);
}
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
String instant = lastPendingInstant();
funcWrapper.checkpointComplete(1);
// first checkpoint: all 5 records of the same key land in the base file
Map<String, String> expected = new HashMap<>();
expected.put("par1", "["
+ "id1,par1,id1,Danny,23,0,par1, "
+ "id1,par1,id1,Danny,23,1,par1, "
+ "id1,par1,id1,Danny,23,2,par1, "
+ "id1,par1,id1,Danny,23,3,par1, "
+ "id1,par1,id1,Danny,23,4,par1]");
TestData.checkWrittenData(tempFile, expected, 1);
// started a new instant already
checkInflightInstant();
checkInstantState(HoodieInstant.State.COMPLETED, instant);
// insert duplicates again
for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
funcWrapper.invoke(rowData);
}
funcWrapper.checkpointFunction(2);
// again forward both mini-batch events to the coordinator
for (int i = 0; i < 2; i++) {
final OperatorEvent event = funcWrapper.getNextEvent();
funcWrapper.getCoordinator().handleEventFromOperator(0, event);
}
funcWrapper.checkpointComplete(2);
// insert clustering concatenates rather than merges: every record now appears twice
Map<String, List<String>> expected2 = new HashMap<>();
expected2.put("par1", Arrays.asList(
"id1,par1,id1,Danny,23,0,par1",
"id1,par1,id1,Danny,23,0,par1",
"id1,par1,id1,Danny,23,1,par1",
"id1,par1,id1,Danny,23,1,par1",
"id1,par1,id1,Danny,23,2,par1",
"id1,par1,id1,Danny,23,2,par1",
"id1,par1,id1,Danny,23,3,par1",
"id1,par1,id1,Danny,23,3,par1",
"id1,par1,id1,Danny,23,4,par1",
"id1,par1,id1,Danny,23,4,par1"));
// verify the full base file content, duplicates included
TestData.checkWrittenFullData(tempFile, expected2);
}
@Test @Test
public void testInsertWithSmallBufferSize() throws Exception { public void testInsertWithSmallBufferSize() throws Exception {
// reset the config option // reset the config option

View File

@@ -63,6 +63,11 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
} }
@Override
public void testInsertClustering() {
// insert clustering is only valid for cow table.
// NOTE: overriding the inherited @Test with an empty body intentionally
// disables this case for the MERGE_ON_READ table type.
}
@Override @Override
protected void checkWrittenData(File baseFile, Map<String, String> expected, int partitions) throws Exception { protected void checkWrittenData(File baseFile, Map<String, String> expected, int partitions) throws Exception {
HoodieTableMetaClient metaClient = HoodieFlinkTable.create(writeConfig, context).getMetaClient(); HoodieTableMetaClient metaClient = HoodieFlinkTable.create(writeConfig, context).getMetaClient();

View File

@@ -38,6 +38,11 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1); conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
} }
@Override
public void testInsertClustering() {
// insert clustering is only valid for cow table.
// NOTE: overriding the inherited @Test with an empty body intentionally
// disables this case for the MERGE_ON_READ-with-compaction variant.
}
@Override @Override
protected Map<String, String> getExpectedBeforeCheckpointComplete() { protected Map<String, String> getExpectedBeforeCheckpointComplete() {
return EXPECTED1; return EXPECTED1;

View File

@@ -35,6 +35,7 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway; import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.RowType;
@@ -101,7 +102,7 @@ public class InsertFunctionWrapper<I> {
// checkpoint the coordinator first // checkpoint the coordinator first
this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>()); this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>());
writeFunction.snapshotState(null); writeFunction.snapshotState(new MockFunctionSnapshotContext(checkpointId));
stateInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId); stateInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
} }

View File

@@ -45,6 +45,7 @@ import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway; import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -218,7 +219,7 @@ public class StreamWriteFunctionWrapper<I> {
} }
bucketAssignerFunction.snapshotState(null); bucketAssignerFunction.snapshotState(null);
writeFunction.snapshotState(null); writeFunction.snapshotState(new MockFunctionSnapshotContext(checkpointId));
stateInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId); stateInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
} }

View File

@@ -850,9 +850,10 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
+ "+I[id1, Sophia, 18, 1970-01-01T00:00:05, par5]]", 3); + "+I[id1, Sophia, 18, 1970-01-01T00:00:05, par5]]", 3);
} }
@Test @ParameterizedTest
void testAppendWrite() { @ValueSource(booleans = {true, false})
TableEnvironment tableEnv = batchTableEnv; void testAppendWrite(boolean clustering) {
TableEnvironment tableEnv = streamTableEnv;
// csv source // csv source
String csvSourceDDL = TestConfigurations.getCsvSourceDDL("csv_source", "test_source_5.data"); String csvSourceDDL = TestConfigurations.getCsvSourceDDL("csv_source", "test_source_5.data");
tableEnv.executeSql(csvSourceDDL); tableEnv.executeSql(csvSourceDDL);
@@ -860,7 +861,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
String hoodieTableDDL = sql("hoodie_sink") String hoodieTableDDL = sql("hoodie_sink")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath()) .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.OPERATION, "insert") .option(FlinkOptions.OPERATION, "insert")
.option(FlinkOptions.INSERT_DEDUP, false) .option(FlinkOptions.INSERT_CLUSTER, clustering)
.end(); .end();
tableEnv.executeSql(hoodieTableDDL); tableEnv.executeSql(hoodieTableDDL);