diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 7eeec4c70..a84e116a6 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -22,6 +22,8 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.client.utils.TransactionUtils; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; @@ -139,10 +141,8 @@ public class HoodieFlinkWriteClient extends getTableAndInitCtx(WriteOperationType.UPSERT, instantTime); table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient()); - final HoodieRecord record = records.get(0); - final boolean isDelta = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ); - final HoodieWriteHandle writeHandle = getOrCreateWriteHandle(record, isDelta, getConfig(), - instantTime, table, record.getPartitionPath(), records.listIterator()); + final HoodieWriteHandle writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(), + instantTime, table, records.listIterator()); HoodieWriteMetadata> result = ((HoodieFlinkTable) table).upsert(context, writeHandle, instantTime, records); if (result.getIndexLookupDuration().isPresent()) { metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis()); @@ -162,10 +162,8 @@ public class HoodieFlinkWriteClient extends table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient()); // create the write handle if not exists - final HoodieRecord record = records.get(0); - final boolean isDelta = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ); - final HoodieWriteHandle writeHandle = getOrCreateWriteHandle(record, isDelta, getConfig(), - instantTime, table, record.getPartitionPath(), records.listIterator()); + final HoodieWriteHandle writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(), + instantTime, table, records.listIterator()); HoodieWriteMetadata> result = ((HoodieFlinkTable) table).insert(context, writeHandle, instantTime, records); if (result.getIndexLookupDuration().isPresent()) { metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis()); @@ -173,6 +171,45 @@ public class HoodieFlinkWriteClient extends return postWrite(result, instantTime, table); } + /** + * Removes all existing records from the partitions affected and inserts the given HoodieRecords, into the table. + * + * @param records HoodieRecords to insert + * @param instantTime Instant time of the commit + * @return list of WriteStatus to inspect errors and counts + */ + public List insertOverwrite( + List> records, final String instantTime) { + HoodieTable>, List, List> table = + getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE, instantTime); + table.validateInsertSchema(); + preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE, table.getMetaClient()); + // create the write handle if not exists + final HoodieWriteHandle writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(), + instantTime, table, records.listIterator()); + HoodieWriteMetadata result = ((HoodieFlinkTable) table).insertOverwrite(context, writeHandle, instantTime, records); + return postWrite(result, instantTime, table); + } + + /** + * Removes all existing records of the Hoodie table and inserts the given HoodieRecords, into the table. + * + * @param records HoodieRecords to insert + * @param instantTime Instant time of the commit + * @return list of WriteStatus to inspect errors and counts + */ + public List insertOverwriteTable( + List> records, final String instantTime) { + HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE_TABLE, instantTime); + table.validateInsertSchema(); + preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE, table.getMetaClient()); + // create the write handle if not exists + final HoodieWriteHandle writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(), + instantTime, table, records.listIterator()); + HoodieWriteMetadata result = ((HoodieFlinkTable) table).insertOverwriteTable(context, writeHandle, instantTime, records); + return postWrite(result, instantTime, table); + } + @Override public List insertPreppedRecords(List> preppedRecords, String instantTime) { throw new HoodieNotSupportedException("InsertPrepped operation is not supported yet"); @@ -353,27 +390,25 @@ public class HoodieFlinkWriteClient extends * Get or create a new write handle in order to reuse the file handles. * * @param record The first record in the bucket - * @param isDelta Whether the table is in MOR mode * @param config Write config * @param instantTime The instant time * @param table The table - * @param partitionPath Partition path * @param recordItr Record iterator * @return Existing write handle or create a new one */ private HoodieWriteHandle getOrCreateWriteHandle( HoodieRecord record, - boolean isDelta, HoodieWriteConfig config, String instantTime, HoodieTable>, List, List> table, - String partitionPath, Iterator> recordItr) { final HoodieRecordLocation loc = record.getCurrentLocation(); final String fileID = loc.getFileId(); if (bucketToHandles.containsKey(fileID)) { return bucketToHandles.get(fileID); } + final String partitionPath = record.getPartitionPath(); + final boolean isDelta = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ); final HoodieWriteHandle writeHandle; if (isDelta) { writeHandle = new FlinkAppendHandle<>(config, instantTime, table, partitionPath, fileID, recordItr, @@ -409,19 +444,23 @@ public class HoodieFlinkWriteClient extends .collect(Collectors.toList()); } - public String getInflightAndRequestedInstant(String tableType) { - final String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType)); + public String getLastPendingInstant(HoodieTableType tableType) { + final String actionType = CommitUtils.getCommitActionType(tableType); + return getLastPendingInstant(actionType); + } + + public String getLastPendingInstant(String actionType) { HoodieTimeline unCompletedTimeline = getHoodieTable().getMetaClient().getCommitsTimeline().filterInflightsAndRequested(); return unCompletedTimeline.getInstants() - .filter(x -> x.getAction().equals(commitType)) + .filter(x -> x.getAction().equals(actionType)) .map(HoodieInstant::getTimestamp) .collect(Collectors.toList()).stream() .max(Comparator.naturalOrder()) .orElse(null); } - public String getLastCompletedInstant(String tableType) { - final String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType)); + public String getLastCompletedInstant(HoodieTableType tableType) { + final String commitType = CommitUtils.getCommitActionType(tableType); HoodieTimeline completedTimeline = getHoodieTable().getMetaClient().getCommitsTimeline().filterCompletedInstants(); return completedTimeline.getInstants() .filter(x -> x.getAction().equals(commitType)) @@ -431,32 +470,49 @@ public class HoodieFlinkWriteClient extends .orElse(null); } - public void deletePendingInstant(String tableType, String instant) { - HoodieFlinkTable table = getHoodieTable(); - String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType)); - HoodieActiveTimeline activeTimeline = table.getMetaClient().getActiveTimeline(); - activeTimeline.deletePendingIfExists(HoodieInstant.State.INFLIGHT, commitType, instant); - activeTimeline.deletePendingIfExists(HoodieInstant.State.REQUESTED, commitType, instant); - } - - public void transitionRequestedToInflight(String tableType, String inFlightInstant) { + public void transitionRequestedToInflight(String commitType, String inFlightInstant) { HoodieFlinkTable table = getHoodieTable(); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); - String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType)); HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, inFlightInstant); activeTimeline.transitionRequestedToInflight(requested, Option.empty(), config.shouldAllowMultiWriteOnSameInstant()); } - public void rollbackInflightCompaction(HoodieInstant inflightInstant) { - HoodieFlinkTable table = getHoodieTable(); - HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); - if (pendingCompactionTimeline.containsInstant(inflightInstant)) { - rollbackInflightCompaction(inflightInstant, table); - } - } - public HoodieFlinkTable getHoodieTable() { return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); } + + public Map> getPartitionToReplacedFileIds( + WriteOperationType writeOperationType, + List writeStatuses) { + HoodieFlinkTable table = getHoodieTable(); + switch (writeOperationType) { + case INSERT_OVERWRITE: + return writeStatuses.stream().map(status -> status.getStat().getPartitionPath()).distinct() + .collect( + Collectors.toMap( + partition -> partition, + partitionPath -> getAllExistingFileIds(table, partitionPath))); + case INSERT_OVERWRITE_TABLE: + Map> partitionToExistingFileIds = new HashMap<>(); + List partitionPaths = + FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath()); + if (partitionPaths != null && partitionPaths.size() > 0) { + context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions"); + partitionToExistingFileIds = partitionPaths.stream().parallel() + .collect( + Collectors.toMap( + partition -> partition, + partition -> getAllExistingFileIds(table, partition))); + } + return partitionToExistingFileIds; + default: + throw new AssertionError(); + } + } + + private List getAllExistingFileIds(HoodieFlinkTable table, String partitionPath) { + // because new commit is not complete. it is safe to mark all existing file Ids as old files + return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList()); + } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java index c0699ff8e..f79991928 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java @@ -125,4 +125,36 @@ public interface ExplicitWriteHandleTable { HoodieWriteHandle writeHandle, String instantTime, List> preppedRecords); + + /** + * Replaces all the existing records and inserts the specified new records into Hoodie table at the supplied instantTime, + * for the partition paths contained in input records. + * + * @param context HoodieEngineContext + * @param writeHandle The write handle + * @param instantTime Instant time for the replace action + * @param records input records + * @return HoodieWriteMetadata + */ + HoodieWriteMetadata> insertOverwrite( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List> records); + + /** + * Deletes all the existing records of the Hoodie table and inserts the specified new records into Hoodie table at the supplied instantTime, + * for the partition paths contained in input records. + * + * @param context HoodieEngineContext + * @param writeHandle The write handle + * @param instantTime Instant time for the replace action + * @param records input records + * @return HoodieWriteMetadata + */ + HoodieWriteMetadata> insertOverwriteTable( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List> records); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index 167b3766e..d8bdb9fba 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -47,6 +47,8 @@ import org.apache.hudi.table.action.clean.FlinkCleanActionExecutor; import org.apache.hudi.table.action.clean.FlinkScheduleCleanActionExecutor; import org.apache.hudi.table.action.commit.FlinkDeleteCommitActionExecutor; import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor; +import org.apache.hudi.table.action.commit.FlinkInsertOverwriteCommitActionExecutor; +import org.apache.hudi.table.action.commit.FlinkInsertOverwriteTableCommitActionExecutor; import org.apache.hudi.table.action.commit.FlinkInsertPreppedCommitActionExecutor; import org.apache.hudi.table.action.commit.FlinkMergeHelper; import org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor; @@ -181,6 +183,24 @@ public class HoodieFlinkCopyOnWriteTable extends return new FlinkInsertPreppedCommitActionExecutor<>(context, writeHandle, config, this, instantTime, preppedRecords).execute(); } + @Override + public HoodieWriteMetadata> insertOverwrite( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List> records) { + return new FlinkInsertOverwriteCommitActionExecutor(context, writeHandle, config, this, instantTime, records).execute(); + } + + @Override + public HoodieWriteMetadata> insertOverwriteTable( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List> records) { + return new FlinkInsertOverwriteTableCommitActionExecutor(context, writeHandle, config, this, instantTime, records).execute(); + } + @Override public HoodieWriteMetadata> upsert(HoodieEngineContext context, String instantTime, List> records) { throw new HoodieNotSupportedException("This method should not be invoked"); @@ -229,12 +249,12 @@ public class HoodieFlinkCopyOnWriteTable extends @Override public HoodieWriteMetadata> insertOverwrite(HoodieEngineContext context, String instantTime, List> records) { - throw new HoodieNotSupportedException("InsertOverWrite is not supported yet"); + throw new HoodieNotSupportedException("This method should not be invoked"); } @Override public HoodieWriteMetadata> insertOverwriteTable(HoodieEngineContext context, String instantTime, List> records) { - throw new HoodieNotSupportedException("insertOverwriteTable is not supported yet"); + throw new HoodieNotSupportedException("This method should not be invoked"); } @Override diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertOverwriteCommitActionExecutor.java new file mode 100644 index 000000000..583e0b6a9 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertOverwriteCommitActionExecutor.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.HoodieWriteHandle; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import java.util.List; + +public class FlinkInsertOverwriteCommitActionExecutor> + extends BaseFlinkCommitActionExecutor { + + protected List> inputRecords; + + public FlinkInsertOverwriteCommitActionExecutor(HoodieEngineContext context, + HoodieWriteHandle writeHandle, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + List> inputRecords) { + this(context, writeHandle, config, table, instantTime, inputRecords, WriteOperationType.INSERT_OVERWRITE); + } + + public FlinkInsertOverwriteCommitActionExecutor(HoodieEngineContext context, + HoodieWriteHandle writeHandle, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + List> inputRecords, + WriteOperationType writeOperationType) { + super(context, writeHandle, config, table, instantTime, writeOperationType); + this.inputRecords = inputRecords; + } + + @Override + protected String getCommitActionType() { + return HoodieTimeline.REPLACE_COMMIT_ACTION; + } + + @Override + public HoodieWriteMetadata> execute() { + return FlinkWriteHelper.newInstance().write(instantTime, inputRecords, context, table, + config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false); + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertOverwriteTableCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertOverwriteTableCommitActionExecutor.java new file mode 100644 index 000000000..a31679b63 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertOverwriteTableCommitActionExecutor.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.HoodieWriteHandle; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import java.util.List; + +public class FlinkInsertOverwriteTableCommitActionExecutor> + extends FlinkInsertOverwriteCommitActionExecutor { + + public FlinkInsertOverwriteTableCommitActionExecutor(HoodieEngineContext context, + HoodieWriteHandle writeHandle, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + List> inputRecords) { + super(context, writeHandle, config, table, instantTime, inputRecords, WriteOperationType.INSERT_OVERWRITE_TABLE); + } + + @Override + public HoodieWriteMetadata> execute() { + return FlinkWriteHelper.newInstance().write(instantTime, inputRecords, context, table, + config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java index 41cfc8abc..b5a3cc002 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java @@ -104,4 +104,8 @@ public enum WriteOperationType { public static boolean isChangingRecords(WriteOperationType operationType) { return operationType == UPSERT || operationType == UPSERT_PREPPED || operationType == DELETE; } + + public static boolean isOverwrite(WriteOperationType operationType) { + return operationType == INSERT_OVERWRITE || operationType == INSERT_OVERWRITE_TABLE; + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java index ea36f6739..4956c1406 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java @@ -40,6 +40,17 @@ public class CommitUtils { private static final Logger LOG = LogManager.getLogger(CommitUtils.class); + /** + * Gets the commit action type for given write operation and table type. + */ + public static String getCommitActionType(WriteOperationType operation, HoodieTableType tableType) { + if (operation == WriteOperationType.INSERT_OVERWRITE || operation == WriteOperationType.INSERT_OVERWRITE_TABLE) { + return HoodieTimeline.REPLACE_COMMIT_ACTION; + } else { + return getCommitActionType(tableType); + } + } + /** * Gets the commit action type for given table type. */ diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/InstantGenerateOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/InstantGenerateOperator.java index 941379735..3c4fb1e84 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/InstantGenerateOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/InstantGenerateOperator.java @@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.streamer.FlinkStreamerConfig; @@ -188,7 +189,8 @@ public class InstantGenerateOperator extends AbstractStreamOperator */ private transient OperatorEventGateway eventGateway; + /** + * Commit action type. + */ + private transient String actionType; + /** * Constructs a StreamingSinkFunction. * @@ -141,6 +148,9 @@ public class StreamWriteFunction public void open(Configuration parameters) throws IOException { this.taskID = getRuntimeContext().getIndexOfThisSubtask(); this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext()); + this.actionType = CommitUtils.getCommitActionType( + WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)), + HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE))); initBuffer(); initWriteFunction(); } @@ -166,6 +176,7 @@ public class StreamWriteFunction @Override public void close() { if (this.writeClient != null) { + this.writeClient.cleanHandles(); this.writeClient.close(); } } @@ -224,6 +235,12 @@ public class StreamWriteFunction case UPSERT: this.writeFunction = (records, instantTime) -> this.writeClient.upsert(records, instantTime); break; + case INSERT_OVERWRITE: + this.writeFunction = (records, instantTime) -> this.writeClient.insertOverwrite(records, instantTime); + break; + case INSERT_OVERWRITE_TABLE: + this.writeFunction = (records, instantTime) -> this.writeClient.insertOverwriteTable(records, instantTime); + break; default: throw new RuntimeException("Unsupported write operation : " + writeOperation); } @@ -315,7 +332,7 @@ public class StreamWriteFunction @SuppressWarnings("unchecked, rawtypes") private void flushBucket(DataBucket bucket) { - this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE)); + this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType); if (this.currentInstant == null) { // in case there are empty checkpoints that has no input data LOG.info("No inflight instant when flushing data, cancel."); @@ -339,7 +356,7 @@ public class StreamWriteFunction @SuppressWarnings("unchecked, rawtypes") private void flushRemaining(boolean isEndInput) { - this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE)); + this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType); if (this.currentInstant == null) { // in case there are empty checkpoints that has no input data LOG.info("No inflight instant when flushing data, cancel."); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 54a7603a5..6244d6510 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -20,7 +20,11 @@ package org.apache.hudi.sink; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.configuration.FlinkOptions; @@ -40,10 +44,14 @@ import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Locale; +import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -116,6 +124,11 @@ public class StreamWriteOperatorCoordinator */ private HiveSyncContext hiveSyncContext; + /** + * The table state. + */ + private transient TableState tableState; + /** * Constructs a StreamingSinkOperatorCoordinator. * @@ -135,8 +148,8 @@ public class StreamWriteOperatorCoordinator public void start() throws Exception { // initialize event buffer reset(); - // writeClient this.writeClient = StreamerUtil.createWriteClient(conf, null); + this.tableState = TableState.create(conf); // init table, create it if not exists. initTableIfNotExists(this.conf); // start a new instant @@ -214,14 +227,16 @@ public class StreamWriteOperatorCoordinator } private void startInstant() { - this.instant = this.writeClient.startCommit(); - this.writeClient.transitionRequestedToInflight(conf.getString(FlinkOptions.TABLE_TYPE), this.instant); + final String instant = HoodieActiveTimeline.createNewInstantTime(); + this.writeClient.startCommitWithTime(instant, tableState.commitAction); + this.instant = instant; + this.writeClient.transitionRequestedToInflight(tableState.commitAction, this.instant); LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant, this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE)); } @Override - public void resetToCheckpoint(long checkpointID, @Nullable byte[] checkpointData) throws Exception { + public void resetToCheckpoint(long checkpointID, @Nullable byte[] checkpointData) { // no operation } @@ -310,6 +325,7 @@ public class StreamWriteOperatorCoordinator } /** Performs the actual commit action. */ + @SuppressWarnings("unchecked") private void doCommit(List writeResults) { // commit or rollback long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L); @@ -323,7 +339,11 @@ public class StreamWriteOperatorCoordinator + totalErrorRecords + "/" + totalRecords); } - boolean success = writeClient.commit(this.instant, writeResults, Option.of(checkpointCommitMetadata)); + final Map> partitionToReplacedFileIds = tableState.isOverwrite + ? writeClient.getPartitionToReplacedFileIds(tableState.operationType, writeResults) + : Collections.emptyMap(); + boolean success = writeClient.commit(this.instant, writeResults, Option.of(checkpointCommitMetadata), + tableState.commitAction, partitionToReplacedFileIds); if (success) { reset(); LOG.info("Commit instant [{}] success!", this.instant); @@ -401,4 +421,26 @@ public class StreamWriteOperatorCoordinator return new StreamWriteOperatorCoordinator(this.conf, context); } } + + /** + * Remember some table state variables. + */ + private static class TableState implements Serializable { + private static final long serialVersionUID = 1L; + + private final WriteOperationType operationType; + private final String commitAction; + private final boolean isOverwrite; + + private TableState(Configuration conf) { + this.operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)); + this.commitAction = CommitUtils.getCommitActionType(this.operationType, + HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase(Locale.ROOT))); + this.isOverwrite = WriteOperationType.isOverwrite(this.operationType); + } + + public static TableState create(Configuration conf) { + return new TableState(conf); + } + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java index 7e017ccc8..f765e9d5a 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java @@ -127,6 +127,7 @@ public class BucketAssignFunction> this.bucketAssigner = BucketAssigners.create( getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks(), + WriteOperationType.isOverwrite(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))), HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)), context, writeConfig); @@ -190,7 +191,9 @@ public class BucketAssignFunction> default: throw new AssertionError(); } - this.indexState.put(hoodieKey, location); + if (isChangingRecords) { + this.indexState.put(hoodieKey, location); + } } record.unseal(); record.setCurrentLocation(location); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java index 237ec27b2..1c28e6d18 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java @@ -33,19 +33,24 @@ public abstract class BucketAssigners { /** * Creates a {@code BucketAssigner}. * - * @param taskID The task ID - * @param numTasks The number of tasks + * @param taskID The task ID + * @param numTasks The number of tasks + * @param isOverwrite Whether the write operation is OVERWRITE * @param tableType The table type - * @param context The engine context - * @param config The configuration + * @param context The engine context + * @param config The configuration * @return the bucket assigner instance */ public static BucketAssigner create( int taskID, int numTasks, + boolean isOverwrite, HoodieTableType tableType, HoodieFlinkEngineContext context, HoodieWriteConfig config) { + if (isOverwrite) { + return new OverwriteBucketAssigner(taskID, numTasks, context, config); + } switch (tableType) { case COPY_ON_WRITE: return new BucketAssigner(taskID, numTasks, context, config); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/OverwriteBucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/OverwriteBucketAssigner.java new file mode 100644 index 000000000..7e2320e0b --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/OverwriteBucketAssigner.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.partitioner; + +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.action.commit.SmallFile; + +import java.util.Collections; +import java.util.List; + +/** + * BucketAssigner for INSERT OVERWRITE and INSERT OVERWRITE TABLE operations, + * this assigner always skip the existing small files because of the 'OVERWRITE' semantics. + * + *

Note: assumes the index can always index log files for Flink write. + */ +public class OverwriteBucketAssigner extends BucketAssigner { + public OverwriteBucketAssigner( + int taskID, + int numTasks, + HoodieFlinkEngineContext context, + HoodieWriteConfig config) { + super(taskID, numTasks, context, config); + } + + @Override + protected List getSmallFiles(String partitionPath) { + return Collections.emptyList(); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index b52e0caed..c3427f364 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -19,6 +19,7 @@ package org.apache.hudi.table; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.CleanFunction; import org.apache.hudi.sink.StreamWriteOperatorFactory; @@ -40,6 +41,7 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DataStreamSinkProvider; import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite; import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; @@ -49,10 +51,11 @@ import java.util.Map; /** * Hoodie table sink. */ -public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning { +public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite { private final Configuration conf; private final TableSchema schema; + private boolean overwrite = false; public HoodieTableSink(Configuration conf, TableSchema schema) { this.conf = conf; @@ -129,7 +132,21 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning { } @Override - public void applyStaticPartition(Map map) { - // no operation + public void applyStaticPartition(Map partition) { + // #applyOverwrite should have been invoked. + if (this.overwrite) { + final String operationType; + if (partition.size() > 0) { + operationType = WriteOperationType.INSERT_OVERWRITE.value(); + } else { + operationType = WriteOperationType.INSERT_OVERWRITE_TABLE.value(); + } + this.conf.setString(FlinkOptions.OPERATION, operationType); + } + } + + @Override + public void applyOverwrite(boolean b) { + this.overwrite = b; } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index 0afd41418..a2fdf227f 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -20,6 +20,7 @@ package org.apache.hudi.sink; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.configuration.FlinkOptions; @@ -104,9 +105,8 @@ public class TestStreamWriteOperatorCoordinator { coordinator.handleEventFromOperator(1, event1); coordinator.notifyCheckpointComplete(1); - String inflight = coordinator.getWriteClient() - .getInflightAndRequestedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE); - String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE); + String inflight = coordinator.getWriteClient().getLastPendingInstant(HoodieTableType.COPY_ON_WRITE); + String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(HoodieTableType.COPY_ON_WRITE); assertThat("Instant should be complete", lastCompleted, is(instant)); assertNotEquals("", inflight, "Should start a new instant"); assertNotEquals(instant, inflight, "Should start a new instant"); @@ -156,7 +156,7 @@ public class TestStreamWriteOperatorCoordinator { assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1), "Returns early for empty write results"); - String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE); + String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(HoodieTableType.COPY_ON_WRITE); assertNull(lastCompleted, "Returns early for empty write results"); assertNull(coordinator.getEventBuffer()[0]); @@ -172,7 +172,7 @@ public class TestStreamWriteOperatorCoordinator { coordinator.handleEventFromOperator(1, event1); assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(2), "Commits the instant with partial events anyway"); - lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE); + lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(HoodieTableType.COPY_ON_WRITE); assertThat("Commits the instant with partial events anyway", lastCompleted, is(instant)); } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index f373ab86d..9e417e33f 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -92,7 +92,7 @@ public class TestWriteCopyOnWrite { public void before() throws Exception { final String basePath = tempFile.getAbsolutePath(); conf = TestConfigurations.getDefaultConf(basePath); - conf.setString(FlinkOptions.TABLE_TYPE, getTableType()); + conf.setString(FlinkOptions.TABLE_TYPE, getTableType().name()); setUp(conf); this.funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); } @@ -125,8 +125,7 @@ public class TestWriteCopyOnWrite { // this triggers the data write and event send funcWrapper.checkpointFunction(1); - String instant = funcWrapper.getWriteClient() - .getInflightAndRequestedInstant(getTableType()); + String instant = funcWrapper.getWriteClient().getLastPendingInstant(getTableType()); final OperatorEvent nextEvent = funcWrapper.getNextEvent(); MatcherAssert.assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); @@ -152,7 +151,7 @@ public class TestWriteCopyOnWrite { funcWrapper.checkpointFunction(2); String instant2 = funcWrapper.getWriteClient() - .getInflightAndRequestedInstant(getTableType()); + .getLastPendingInstant(getTableType()); assertNotEquals(instant, instant2); final OperatorEvent nextEvent2 = funcWrapper.getNextEvent(); @@ -181,7 +180,7 @@ public class TestWriteCopyOnWrite { funcWrapper.checkpointFunction(1); String instant = funcWrapper.getWriteClient() - .getInflightAndRequestedInstant(getTableType()); + .getLastPendingInstant(getTableType()); assertNotNull(instant); final OperatorEvent nextEvent = funcWrapper.getNextEvent(); @@ -223,7 +222,7 @@ public class TestWriteCopyOnWrite { funcWrapper.checkpointFunction(1); String instant = funcWrapper.getWriteClient() - .getInflightAndRequestedInstant(getTableType()); + .getLastPendingInstant(getTableType()); final OperatorEvent nextEvent = funcWrapper.getNextEvent(); assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); @@ -309,7 +308,7 @@ public class TestWriteCopyOnWrite { funcWrapper.checkpointFunction(2); String instant = funcWrapper.getWriteClient() - .getInflightAndRequestedInstant(getTableType()); + .getLastPendingInstant(getTableType()); nextEvent = funcWrapper.getNextEvent(); assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); @@ -354,7 +353,7 @@ public class TestWriteCopyOnWrite { funcWrapper.checkpointFunction(2); String instant = funcWrapper.getWriteClient() - .getInflightAndRequestedInstant(getTableType()); + .getLastPendingInstant(getTableType()); nextEvent = funcWrapper.getNextEvent(); assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); @@ -409,7 +408,7 @@ public class TestWriteCopyOnWrite { assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); String instant = funcWrapper.getWriteClient() - .getInflightAndRequestedInstant(getTableType()); + .getLastPendingInstant(getTableType()); funcWrapper.checkpointComplete(1); @@ -493,7 +492,7 @@ public class TestWriteCopyOnWrite { funcWrapper.checkpointFunction(2); String instant = funcWrapper.getWriteClient() - .getInflightAndRequestedInstant(getTableType()); + .getLastPendingInstant(getTableType()); nextEvent = funcWrapper.getNextEvent(); assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); @@ -516,7 +515,7 @@ public class TestWriteCopyOnWrite { @SuppressWarnings("rawtypes") private void checkInflightInstant(HoodieFlinkWriteClient writeClient) { - final String instant = writeClient.getInflightAndRequestedInstant(getTableType()); + final String instant = writeClient.getLastPendingInstant(getTableType()); assertNotNull(instant); } @@ -528,7 +527,7 @@ public class TestWriteCopyOnWrite { final String instant; switch (state) { case REQUESTED: - instant = writeClient.getInflightAndRequestedInstant(getTableType()); + instant = writeClient.getLastPendingInstant(getTableType()); break; case COMPLETED: instant = writeClient.getLastCompletedInstant(getTableType()); @@ -539,8 +538,8 @@ public class TestWriteCopyOnWrite { assertThat(instant, is(instantStr)); } - protected String getTableType() { - return HoodieTableType.COPY_ON_WRITE.name(); + protected HoodieTableType getTableType() { + return HoodieTableType.COPY_ON_WRITE; } protected void checkWrittenData(File baseFile, Map expected) throws Exception { diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java index 5ce8ae2ed..b24881a53 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java @@ -90,7 +90,7 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite { } @Override - protected String getTableType() { - return HoodieTableType.MERGE_ON_READ.name(); + protected HoodieTableType getTableType() { + return HoodieTableType.MERGE_ON_READ; } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java index 2821caeef..98d121180 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java @@ -53,7 +53,7 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite { } @Override - protected String getTableType() { - return HoodieTableType.MERGE_ON_READ.name(); + protected HoodieTableType getTableType() { + return HoodieTableType.MERGE_ON_READ; } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index 56cbb5543..d2ba3e693 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -18,6 +18,7 @@ package org.apache.hudi.table; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.util.StreamerUtil; @@ -223,7 +224,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf); String latestCommit = StreamerUtil.createWriteClient(conf, null) - .getLastCompletedInstant(FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + .getLastCompletedInstant(HoodieTableType.MERGE_ON_READ); Map options = new HashMap<>(); options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); @@ -276,6 +277,53 @@ public class HoodieDataSourceITCase extends AbstractTestBase { + "id8,Han,56,1970-01-01T00:00:08,par4]"); } + @ParameterizedTest + @EnumSource(value = ExecMode.class) + void testInsertOverwrite(ExecMode execMode) { + TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv; + Map options = new HashMap<>(); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); + tableEnv.executeSql(hoodieTableDDL); + + final String insertInto1 = "insert into t1 values\n" + + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n" + + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n" + + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n" + + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n" + + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n" + + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n" + + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n" + + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')"; + + execInsertSql(tableEnv, insertInto1); + + // overwrite partition 'par1' and increase in age by 1 + final String insertInto2 = "insert overwrite t1 partition(`partition`='par1') values\n" + + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01'),\n" + + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02')\n"; + + execInsertSql(tableEnv, insertInto2); + + List result1 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT_OVERWRITE); + + // overwrite the whole table + final String insertInto3 = "insert overwrite t1 values\n" + + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01', 'par1'),\n" + + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02', 'par2')\n"; + + execInsertSql(tableEnv, insertInto3); + + List result2 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + final String expected = "[" + + "id1,Danny,24,1970-01-01T00:00:01,par1, " + + "id2,Stephen,34,1970-01-01T00:00:02,par2]"; + assertRowsEquals(result2, expected); + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index 2201aeb37..4a2466c94 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -167,6 +167,26 @@ public class TestData { TimestampData.fromEpochMillis(8000), StringData.fromString("par4")) ); + // data set of test_source.data with partition 'par1' overwrite + public static List DATA_SET_SOURCE_INSERT_OVERWRITE = Arrays.asList( + insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24, + TimestampData.fromEpochMillis(1000), StringData.fromString("par1")), + insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34, + TimestampData.fromEpochMillis(2000), StringData.fromString("par1")), + insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53, + TimestampData.fromEpochMillis(3000), StringData.fromString("par2")), + insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31, + TimestampData.fromEpochMillis(4000), StringData.fromString("par2")), + insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18, + TimestampData.fromEpochMillis(5000), StringData.fromString("par3")), + insertRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20, + TimestampData.fromEpochMillis(6000), StringData.fromString("par3")), + insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44, + TimestampData.fromEpochMillis(7000), StringData.fromString("par4")), + insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56, + TimestampData.fromEpochMillis(8000), StringData.fromString("par4")) + ); + public static List DATA_SET_UPDATE_DELETE = Arrays.asList( // this is update insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24, diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java index 632a155fa..00b7116d5 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -29,10 +29,7 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; @@ -193,14 +190,6 @@ public class DataSourceUtils { return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), createHoodieConfig(schemaStr, basePath, tblName, parameters)); } - public static String getCommitActionType(WriteOperationType operation, HoodieTableType tableType) { - if (operation == WriteOperationType.INSERT_OVERWRITE || operation == WriteOperationType.INSERT_OVERWRITE_TABLE) { - return HoodieTimeline.REPLACE_COMMIT_ACTION; - } else { - return CommitUtils.getCommitActionType(tableType); - } - } - public static HoodieWriteResult doWriteOperation(SparkRDDWriteClient client, JavaRDD hoodieRecords, String instantTime, WriteOperationType operation) throws HoodieException { switch (operation) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java index e8041636b..b2e7134bd 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java @@ -18,7 +18,6 @@ package org.apache.hudi.internal; -import org.apache.hudi.DataSourceUtils; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.model.HoodieWriteStat; @@ -27,6 +26,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -79,7 +79,7 @@ public class DataSourceInternalWriterHelper { public void commit(List writeStatList) { try { writeClient.commitStats(instantTime, writeStatList, Option.of(new HashMap<>()), - DataSourceUtils.getCommitActionType(operationType, metaClient.getTableType())); + CommitUtils.getCommitActionType(operationType, metaClient.getTableType())); } catch (Exception ioe) { throw new HoodieException(ioe.getMessage(), ioe); } finally { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 5b8727871..829690f2b 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -19,7 +19,6 @@ package org.apache.hudi import java.util import java.util.Properties - import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.apache.hadoop.conf.Configuration @@ -33,7 +32,7 @@ import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType} import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.table.timeline.HoodieActiveTimeline -import org.apache.hudi.common.util.ReflectionUtils +import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils} import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP, DEFAULT_BOOTSTRAP_INDEX_CLASS} import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieException @@ -126,7 +125,7 @@ private[hudi] object HoodieSparkSqlWriter { tableConfig = tableMetaClient.getTableConfig } - val commitActionType = DataSourceUtils.getCommitActionType(operation, tableConfig.getTableType) + val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType) // short-circuit if bulk_insert via row is enabled. // scalastyle:off