[HUDI-1788] Insert overwrite (table) for Flink writer (#2808)
Supports `INSERT OVERWRITE` and `INSERT OVERWRITE TABLE` for Flink writer.
This commit is contained in:
@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.util.CommitUtils;
|
||||
import org.apache.hudi.common.util.StringUtils;
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
import org.apache.hudi.streamer.FlinkStreamerConfig;
|
||||
@@ -188,7 +189,8 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
|
||||
*/
|
||||
private String startNewInstant(long checkpointId) {
|
||||
String newTime = writeClient.startCommit();
|
||||
this.writeClient.transitionRequestedToInflight(this.cfg.tableType, newTime);
|
||||
final String actionType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(this.cfg.tableType));
|
||||
this.writeClient.transitionRequestedToInflight(actionType, newTime);
|
||||
LOG.info("create instant [{}], at checkpoint [{}]", newTime, checkpointId);
|
||||
return newTime;
|
||||
}
|
||||
|
||||
@@ -21,7 +21,9 @@ package org.apache.hudi.sink;
|
||||
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.common.util.CommitUtils;
|
||||
import org.apache.hudi.common.util.ObjectSizeCalculator;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
@@ -128,6 +130,11 @@ public class StreamWriteFunction<K, I, O>
|
||||
*/
|
||||
private transient OperatorEventGateway eventGateway;
|
||||
|
||||
/**
|
||||
* Commit action type.
|
||||
*/
|
||||
private transient String actionType;
|
||||
|
||||
/**
|
||||
* Constructs a StreamingSinkFunction.
|
||||
*
|
||||
@@ -141,6 +148,9 @@ public class StreamWriteFunction<K, I, O>
|
||||
public void open(Configuration parameters) throws IOException {
|
||||
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
|
||||
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
|
||||
this.actionType = CommitUtils.getCommitActionType(
|
||||
WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
|
||||
HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
|
||||
initBuffer();
|
||||
initWriteFunction();
|
||||
}
|
||||
@@ -166,6 +176,7 @@ public class StreamWriteFunction<K, I, O>
|
||||
@Override
|
||||
public void close() {
|
||||
if (this.writeClient != null) {
|
||||
this.writeClient.cleanHandles();
|
||||
this.writeClient.close();
|
||||
}
|
||||
}
|
||||
@@ -224,6 +235,12 @@ public class StreamWriteFunction<K, I, O>
|
||||
case UPSERT:
|
||||
this.writeFunction = (records, instantTime) -> this.writeClient.upsert(records, instantTime);
|
||||
break;
|
||||
case INSERT_OVERWRITE:
|
||||
this.writeFunction = (records, instantTime) -> this.writeClient.insertOverwrite(records, instantTime);
|
||||
break;
|
||||
case INSERT_OVERWRITE_TABLE:
|
||||
this.writeFunction = (records, instantTime) -> this.writeClient.insertOverwriteTable(records, instantTime);
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Unsupported write operation : " + writeOperation);
|
||||
}
|
||||
@@ -315,7 +332,7 @@ public class StreamWriteFunction<K, I, O>
|
||||
|
||||
@SuppressWarnings("unchecked, rawtypes")
|
||||
private void flushBucket(DataBucket bucket) {
|
||||
this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
|
||||
this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType);
|
||||
if (this.currentInstant == null) {
|
||||
// in case there are empty checkpoints that has no input data
|
||||
LOG.info("No inflight instant when flushing data, cancel.");
|
||||
@@ -339,7 +356,7 @@ public class StreamWriteFunction<K, I, O>
|
||||
|
||||
@SuppressWarnings("unchecked, rawtypes")
|
||||
private void flushRemaining(boolean isEndInput) {
|
||||
this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
|
||||
this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType);
|
||||
if (this.currentInstant == null) {
|
||||
// in case there are empty checkpoints that has no input data
|
||||
LOG.info("No inflight instant when flushing data, cancel.");
|
||||
|
||||
@@ -20,7 +20,11 @@ package org.apache.hudi.sink;
|
||||
|
||||
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.util.CommitUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
@@ -40,10 +44,14 @@ import org.jetbrains.annotations.Nullable;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionException;
|
||||
@@ -116,6 +124,11 @@ public class StreamWriteOperatorCoordinator
|
||||
*/
|
||||
private HiveSyncContext hiveSyncContext;
|
||||
|
||||
/**
|
||||
* The table state.
|
||||
*/
|
||||
private transient TableState tableState;
|
||||
|
||||
/**
|
||||
* Constructs a StreamingSinkOperatorCoordinator.
|
||||
*
|
||||
@@ -135,8 +148,8 @@ public class StreamWriteOperatorCoordinator
|
||||
public void start() throws Exception {
|
||||
// initialize event buffer
|
||||
reset();
|
||||
// writeClient
|
||||
this.writeClient = StreamerUtil.createWriteClient(conf, null);
|
||||
this.tableState = TableState.create(conf);
|
||||
// init table, create it if not exists.
|
||||
initTableIfNotExists(this.conf);
|
||||
// start a new instant
|
||||
@@ -214,14 +227,16 @@ public class StreamWriteOperatorCoordinator
|
||||
}
|
||||
|
||||
private void startInstant() {
|
||||
this.instant = this.writeClient.startCommit();
|
||||
this.writeClient.transitionRequestedToInflight(conf.getString(FlinkOptions.TABLE_TYPE), this.instant);
|
||||
final String instant = HoodieActiveTimeline.createNewInstantTime();
|
||||
this.writeClient.startCommitWithTime(instant, tableState.commitAction);
|
||||
this.instant = instant;
|
||||
this.writeClient.transitionRequestedToInflight(tableState.commitAction, this.instant);
|
||||
LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
|
||||
this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resetToCheckpoint(long checkpointID, @Nullable byte[] checkpointData) throws Exception {
|
||||
public void resetToCheckpoint(long checkpointID, @Nullable byte[] checkpointData) {
|
||||
// no operation
|
||||
}
|
||||
|
||||
@@ -310,6 +325,7 @@ public class StreamWriteOperatorCoordinator
|
||||
}
|
||||
|
||||
/** Performs the actual commit action. */
|
||||
@SuppressWarnings("unchecked")
|
||||
private void doCommit(List<WriteStatus> writeResults) {
|
||||
// commit or rollback
|
||||
long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
|
||||
@@ -323,7 +339,11 @@ public class StreamWriteOperatorCoordinator
|
||||
+ totalErrorRecords + "/" + totalRecords);
|
||||
}
|
||||
|
||||
boolean success = writeClient.commit(this.instant, writeResults, Option.of(checkpointCommitMetadata));
|
||||
final Map<String, List<String>> partitionToReplacedFileIds = tableState.isOverwrite
|
||||
? writeClient.getPartitionToReplacedFileIds(tableState.operationType, writeResults)
|
||||
: Collections.emptyMap();
|
||||
boolean success = writeClient.commit(this.instant, writeResults, Option.of(checkpointCommitMetadata),
|
||||
tableState.commitAction, partitionToReplacedFileIds);
|
||||
if (success) {
|
||||
reset();
|
||||
LOG.info("Commit instant [{}] success!", this.instant);
|
||||
@@ -401,4 +421,26 @@ public class StreamWriteOperatorCoordinator
|
||||
return new StreamWriteOperatorCoordinator(this.conf, context);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remember some table state variables.
|
||||
*/
|
||||
private static class TableState implements Serializable {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private final WriteOperationType operationType;
|
||||
private final String commitAction;
|
||||
private final boolean isOverwrite;
|
||||
|
||||
private TableState(Configuration conf) {
|
||||
this.operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
|
||||
this.commitAction = CommitUtils.getCommitActionType(this.operationType,
|
||||
HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase(Locale.ROOT)));
|
||||
this.isOverwrite = WriteOperationType.isOverwrite(this.operationType);
|
||||
}
|
||||
|
||||
public static TableState create(Configuration conf) {
|
||||
return new TableState(conf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -127,6 +127,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
||||
this.bucketAssigner = BucketAssigners.create(
|
||||
getRuntimeContext().getIndexOfThisSubtask(),
|
||||
getRuntimeContext().getNumberOfParallelSubtasks(),
|
||||
WriteOperationType.isOverwrite(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))),
|
||||
HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
|
||||
context,
|
||||
writeConfig);
|
||||
@@ -190,7 +191,9 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
||||
default:
|
||||
throw new AssertionError();
|
||||
}
|
||||
this.indexState.put(hoodieKey, location);
|
||||
if (isChangingRecords) {
|
||||
this.indexState.put(hoodieKey, location);
|
||||
}
|
||||
}
|
||||
record.unseal();
|
||||
record.setCurrentLocation(location);
|
||||
|
||||
@@ -33,19 +33,24 @@ public abstract class BucketAssigners {
|
||||
/**
|
||||
* Creates a {@code BucketAssigner}.
|
||||
*
|
||||
* @param taskID The task ID
|
||||
* @param numTasks The number of tasks
|
||||
* @param taskID The task ID
|
||||
* @param numTasks The number of tasks
|
||||
* @param isOverwrite Whether the write operation is OVERWRITE
|
||||
* @param tableType The table type
|
||||
* @param context The engine context
|
||||
* @param config The configuration
|
||||
* @param context The engine context
|
||||
* @param config The configuration
|
||||
* @return the bucket assigner instance
|
||||
*/
|
||||
public static BucketAssigner create(
|
||||
int taskID,
|
||||
int numTasks,
|
||||
boolean isOverwrite,
|
||||
HoodieTableType tableType,
|
||||
HoodieFlinkEngineContext context,
|
||||
HoodieWriteConfig config) {
|
||||
if (isOverwrite) {
|
||||
return new OverwriteBucketAssigner(taskID, numTasks, context, config);
|
||||
}
|
||||
switch (tableType) {
|
||||
case COPY_ON_WRITE:
|
||||
return new BucketAssigner(taskID, numTasks, context, config);
|
||||
|
||||
@@ -0,0 +1,47 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.sink.partitioner;
|
||||
|
||||
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.action.commit.SmallFile;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* BucketAssigner for INSERT OVERWRITE and INSERT OVERWRITE TABLE operations,
|
||||
* this assigner always skip the existing small files because of the 'OVERWRITE' semantics.
|
||||
*
|
||||
* <p>Note: assumes the index can always index log files for Flink write.
|
||||
*/
|
||||
public class OverwriteBucketAssigner extends BucketAssigner {
|
||||
public OverwriteBucketAssigner(
|
||||
int taskID,
|
||||
int numTasks,
|
||||
HoodieFlinkEngineContext context,
|
||||
HoodieWriteConfig config) {
|
||||
super(taskID, numTasks, context, config);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<SmallFile> getSmallFiles(String partitionPath) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
@@ -19,6 +19,7 @@
|
||||
package org.apache.hudi.table;
|
||||
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
import org.apache.hudi.sink.CleanFunction;
|
||||
import org.apache.hudi.sink.StreamWriteOperatorFactory;
|
||||
@@ -40,6 +41,7 @@ import org.apache.flink.table.api.TableSchema;
|
||||
import org.apache.flink.table.connector.ChangelogMode;
|
||||
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
|
||||
import org.apache.flink.table.connector.sink.DynamicTableSink;
|
||||
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
|
||||
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
|
||||
import org.apache.flink.table.types.logical.RowType;
|
||||
import org.apache.flink.types.RowKind;
|
||||
@@ -49,10 +51,11 @@ import java.util.Map;
|
||||
/**
|
||||
* Hoodie table sink.
|
||||
*/
|
||||
public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning {
|
||||
public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
|
||||
|
||||
private final Configuration conf;
|
||||
private final TableSchema schema;
|
||||
private boolean overwrite = false;
|
||||
|
||||
public HoodieTableSink(Configuration conf, TableSchema schema) {
|
||||
this.conf = conf;
|
||||
@@ -129,7 +132,21 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void applyStaticPartition(Map<String, String> map) {
|
||||
// no operation
|
||||
public void applyStaticPartition(Map<String, String> partition) {
|
||||
// #applyOverwrite should have been invoked.
|
||||
if (this.overwrite) {
|
||||
final String operationType;
|
||||
if (partition.size() > 0) {
|
||||
operationType = WriteOperationType.INSERT_OVERWRITE.value();
|
||||
} else {
|
||||
operationType = WriteOperationType.INSERT_OVERWRITE_TABLE.value();
|
||||
}
|
||||
this.conf.setString(FlinkOptions.OPERATION, operationType);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void applyOverwrite(boolean b) {
|
||||
this.overwrite = b;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user