From 7a11de12764d8f68f296c6e68a22822318bfbefa Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Mon, 1 Mar 2021 12:29:41 +0800 Subject: [PATCH] [HUDI-1632] Supports merge on read write mode for Flink writer (#2593) Also supports async compaction with pluggable strategies. --- .../apache/hudi/io/HoodieAppendHandle.java | 48 ++-- .../org/apache/hudi/io/HoodieMergeHandle.java | 15 +- .../hudi/client/HoodieFlinkWriteClient.java | 121 +++++++-- .../index/state/FlinkInMemoryStateIndex.java | 10 - ...y.java => ExplicitWriteHandleFactory.java} | 6 +- .../org/apache/hudi/io/FlinkAppendHandle.java | 125 ++++++++++ .../org/apache/hudi/io/FlinkCreateHandle.java | 14 +- .../org/apache/hudi/io/FlinkMergeHandle.java | 28 +-- .../table/HoodieFlinkCopyOnWriteTable.java | 64 ++++- .../table/HoodieFlinkMergeOnReadTable.java | 66 ++++- .../apache/hudi/table/HoodieFlinkTable.java | 3 +- .../commit/BaseFlinkCommitActionExecutor.java | 29 +-- .../table/action/commit/FlinkMergeHelper.java | 11 +- .../BaseFlinkDeltaCommitActionExecutor.java | 65 +++++ .../FlinkUpsertDeltaCommitActionExecutor.java | 52 ++++ .../action/compact/FlinkCompactHelpers.java | 75 ++++++ ...FlinkScheduleCompactionActionExecutor.java | 152 +++++++++++ .../HoodieFlinkMergeOnReadTableCompactor.java | 235 ++++++++++++++++++ .../apache/hudi/operator/FlinkOptions.java | 40 ++- .../hudi/operator/StreamWriteFunction.java | 8 +- .../StreamWriteOperatorCoordinator.java | 19 ++ .../operator/compact/CompactFunction.java | 94 +++++++ .../compact/CompactionCommitEvent.java | 62 +++++ .../compact/CompactionCommitSink.java | 150 +++++++++++ .../operator/compact/CompactionPlanEvent.java | 47 ++++ .../compact/CompactionPlanOperator.java | 146 +++++++++++ .../partitioner/BucketAssignFunction.java | 9 +- .../operator/partitioner/BucketAssigner.java | 4 +- .../operator/partitioner/BucketAssigners.java | 54 ++++ .../delta/DeltaBucketAssigner.java | 114 +++++++++ .../org/apache/hudi/util/StreamerUtil.java | 6 + .../hudi/operator/StreamWriteITCase.java | 83 +++++++ ...ionTest.java => TestWriteCopyOnWrite.java} | 85 ++++--- .../hudi/operator/TestWriteMergeOnRead.java | 96 +++++++ .../TestWriteMergeOnReadWithCompact.java | 58 +++++ .../utils/CompactFunctionWrapper.java | 142 +++++++++++ .../utils/StreamWriteFunctionWrapper.java | 16 ++ .../apache/hudi/operator/utils/TestData.java | 96 +++++++ 38 files changed, 2296 insertions(+), 152 deletions(-) rename hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/{ExplicitCreateHandleFactory.java => ExplicitWriteHandleFactory.java} (87%) create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/FlinkUpsertDeltaCommitActionExecutor.java create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkCompactHelpers.java create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkScheduleCompactionActionExecutor.java create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/operator/compact/CompactFunction.java create mode 100644 
hudi-flink/src/main/java/org/apache/hudi/operator/compact/CompactionCommitEvent.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/operator/compact/CompactionCommitSink.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/operator/compact/CompactionPlanEvent.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/operator/compact/CompactionPlanOperator.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigners.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/delta/DeltaBucketAssigner.java rename hudi-flink/src/test/java/org/apache/hudi/operator/{StreamWriteFunctionTest.java => TestWriteCopyOnWrite.java} (90%) create mode 100644 hudi-flink/src/test/java/org/apache/hudi/operator/TestWriteMergeOnRead.java create mode 100644 hudi-flink/src/test/java/org/apache/hudi/operator/TestWriteMergeOnReadWithCompact.java create mode 100644 hudi-flink/src/test/java/org/apache/hudi/operator/utils/CompactFunctionWrapper.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index c6ea7bab2..986afe64e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -74,38 +74,38 @@ public class HoodieAppendHandle extends // This acts as the sequenceID for records written private static final AtomicLong RECORD_COUNTER = new AtomicLong(1); - private final String fileId; + protected final String fileId; // Buffer for holding records in memory before they are flushed to disk private final List recordList = new ArrayList<>(); // Buffer for holding records (to be deleted) in memory before they are flushed to disk private final List keysToDelete = new ArrayList<>(); // Incoming records to be written to logs. - private final Iterator> recordItr; + protected Iterator> recordItr; // Writer to log into the file group's latest slice. - private Writer writer; + protected Writer writer; - private final List statuses; + protected final List statuses; // Total number of records written during an append - private long recordsWritten = 0; + protected long recordsWritten = 0; // Total number of records deleted during an append - private long recordsDeleted = 0; + protected long recordsDeleted = 0; // Total number of records updated during an append - private long updatedRecordsWritten = 0; + protected long updatedRecordsWritten = 0; // Total number of new records inserted into the delta file - private long insertRecordsWritten = 0; + protected long insertRecordsWritten = 0; // Average record size for a HoodieRecord. 
This size is updated at the end of every log block flushed to disk private long averageRecordSize = 0; // Flag used to initialize some metadata private boolean doInit = true; // Total number of bytes written during this append phase (an estimation) - private long estimatedNumberOfBytesWritten; + protected long estimatedNumberOfBytesWritten; // Number of records that must be written to meet the max block size for a log block private int numberOfRecords = 0; // Max block size to limit to for a log block private final int maxBlockSize = config.getLogFileDataBlockMaxSize(); // Header metadata for a log block - private final Map header = new HashMap<>(); + protected final Map header = new HashMap<>(); private SizeEstimator sizeEstimator; public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, @@ -178,6 +178,14 @@ public class HoodieAppendHandle extends } } + /** + * Returns whether the hoodie record is an UPDATE. + */ + protected boolean isUpdateRecord(HoodieRecord hoodieRecord) { + // If currentLocation is present, then this is an update + return hoodieRecord.getCurrentLocation() != null; + } + private Option getIndexedRecord(HoodieRecord hoodieRecord) { Option> recordMetadata = hoodieRecord.getData().getMetadata(); try { @@ -190,8 +198,7 @@ public class HoodieAppendHandle extends HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord.get(), hoodieRecord.getRecordKey(), hoodieRecord.getPartitionPath(), fileId); HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord.get(), instantTime, seqId); - // If currentLocation is present, then this is an update - if (hoodieRecord.getCurrentLocation() != null) { + if (isUpdateRecord(hoodieRecord)) { updatedRecordsWritten++; } else { insertRecordsWritten++; @@ -324,7 +331,7 @@ public class HoodieAppendHandle extends estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords; } - private void appendDataAndDeleteBlocks(Map header) { + protected void appendDataAndDeleteBlocks(Map header) { try { header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writerSchemaWithMetafields.toString()); @@ -412,6 +419,13 @@ public class HoodieAppendHandle extends .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); } + /** + * Whether there is need to update the record location. 
+ */ + protected boolean needsUpdateLocation() { + return true; + } + private void writeToBuffer(HoodieRecord record) { if (!partitionPath.equals(record.getPartitionPath())) { HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: " @@ -421,9 +435,11 @@ public class HoodieAppendHandle extends } // update the new location of the record, so we know where to find it next - record.unseal(); - record.setNewLocation(new HoodieRecordLocation(instantTime, fileId)); - record.seal(); + if (needsUpdateLocation()) { + record.unseal(); + record.setNewLocation(new HoodieRecordLocation(instantTime, fileId)); + record.seal(); + } Option indexedRecord = getIndexedRecord(record); if (indexedRecord.isPresent()) { recordList.add(indexedRecord.get()); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 0716050b7..8579f54b0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -198,6 +198,13 @@ public class HoodieMergeHandle extends H } } + /** + * Whether there is need to update the record location. + */ + boolean needsUpdateLocation() { + return true; + } + /** * Load the new incoming records in a map and return partitionPath. */ @@ -206,9 +213,11 @@ public class HoodieMergeHandle extends H while (newRecordsItr.hasNext()) { HoodieRecord record = newRecordsItr.next(); // update the new location of the record, so we know where to find it next - record.unseal(); - record.setNewLocation(new HoodieRecordLocation(instantTime, fileId)); - record.seal(); + if (needsUpdateLocation()) { + record.unseal(); + record.setNewLocation(new HoodieRecordLocation(instantTime, fileId)); + record.seal(); + } // NOTE: Once Records are added to map (spillable-map), DO NOT change it as they won't persist keyToNewRecords.put(record.getRecordKey(), record); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 0f1557f03..5d86d0a87 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -37,9 +37,11 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.index.FlinkHoodieIndex; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.io.FlinkAppendHandle; import org.apache.hudi.io.FlinkCreateHandle; import org.apache.hudi.io.FlinkMergeHandle; import org.apache.hudi.io.HoodieWriteHandle; @@ -48,14 +50,19 @@ import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.hudi.table.action.compact.FlinkCompactHelpers; import org.apache.hudi.table.upgrade.FlinkUpgradeDowngrade; import com.codahale.metrics.Timer; import org.apache.hadoop.conf.Configuration; 
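A note on the isUpdateRecord / needsUpdateLocation hooks introduced in the hunks above: they let the Flink handles consume records whose locations were already tagged upstream (presumably by the BucketAssignFunction listed in the file summary). The Flink write path appears to distinguish inserts from updates through placeholder instant times on the current location, "I" for a record going to a new file group and "U" for an update of an existing one, which is what FlinkAppendHandle#isUpdateRecord and HoodieFlinkWriteClient#getOrCreateWriteHandle later in this patch check. The helper below is only an illustrative sketch of that convention; the class and method names are invented for the example and are not part of this patch.

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;

// Illustrative sketch of the "I"/"U" placeholder-instant convention; not part of the patch.
final class FlinkRecordTaggingSketch {

  // Tag a record as an insert into a (possibly new) file group.
  static void tagAsInsert(HoodieRecord<?> record, String fileId) {
    record.unseal();
    record.setCurrentLocation(new HoodieRecordLocation("I", fileId));
    record.seal();
  }

  // Tag a record as an update of an existing file group.
  static void tagAsUpdate(HoodieRecord<?> record, String fileId) {
    record.unseal();
    record.setCurrentLocation(new HoodieRecordLocation("U", fileId));
    record.seal();
  }

  // Same check as FlinkAppendHandle#isUpdateRecord in this patch.
  static boolean isUpdate(HoodieRecord<?> record) {
    HoodieRecordLocation loc = record.getCurrentLocation();
    return loc != null && "U".equals(loc.getInstantTime());
  }
}

Because the records arrive pre-tagged, both FlinkAppendHandle and FlinkMergeHandle override needsUpdateLocation() to return false and skip re-tagging inside the handle.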
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; +import java.text.ParseException; import java.util.Comparator; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -64,6 +71,8 @@ import java.util.stream.Collectors; public class HoodieFlinkWriteClient extends AbstractHoodieWriteClient>, List, List> { + private static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkWriteClient.class); + /** * FileID to write handle mapping in order to record the write handles for each file group, * so that we can append the mini-batch data buffer incrementally. @@ -127,21 +136,9 @@ public class HoodieFlinkWriteClient extends table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.UPSERT); final HoodieRecord record = records.get(0); - final HoodieRecordLocation loc = record.getCurrentLocation(); - final String fileID = loc.getFileId(); - final boolean isInsert = loc.getInstantTime().equals("I"); - final HoodieWriteHandle writeHandle; - if (bucketToHandles.containsKey(fileID)) { - writeHandle = bucketToHandles.get(fileID); - } else { - // create the write handle if not exists - writeHandle = isInsert - ? new FlinkCreateHandle<>(getConfig(), instantTime, table, record.getPartitionPath(), - fileID, table.getTaskContextSupplier()) - : new FlinkMergeHandle<>(getConfig(), instantTime, table, records.listIterator(), record.getPartitionPath(), - fileID, table.getTaskContextSupplier()); - bucketToHandles.put(fileID, writeHandle); - } + final boolean isDelta = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ); + final HoodieWriteHandle writeHandle = getOrCreateWriteHandle(record, isDelta, getConfig(), + instantTime, table, record.getPartitionPath(), records.listIterator()); HoodieWriteMetadata> result = ((HoodieFlinkTable) table).upsert(context, writeHandle, instantTime, records); if (result.getIndexLookupDuration().isPresent()) { metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis()); @@ -160,7 +157,12 @@ public class HoodieFlinkWriteClient extends getTableAndInitCtx(WriteOperationType.INSERT, instantTime); table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.INSERT); - HoodieWriteMetadata> result = table.insert(context, instantTime, records); + // create the write handle if not exists + final HoodieRecord record = records.get(0); + final boolean isDelta = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ); + final HoodieWriteHandle writeHandle = getOrCreateWriteHandle(record, isDelta, getConfig(), + instantTime, table, record.getPartitionPath(), records.listIterator()); + HoodieWriteMetadata> result = ((HoodieFlinkTable) table).insert(context, writeHandle, instantTime, records); if (result.getIndexLookupDuration().isPresent()) { metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis()); } @@ -207,13 +209,40 @@ public class HoodieFlinkWriteClient extends } @Override - public void commitCompaction(String compactionInstantTime, List writeStatuses, Option> extraMetadata) throws IOException { - throw new HoodieNotSupportedException("Compaction is not supported yet"); + public void commitCompaction( + String compactionInstantTime, + List writeStatuses, + Option> extraMetadata) throws IOException { + HoodieFlinkTable table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); + HoodieCommitMetadata metadata = 
FlinkCompactHelpers.newInstance().createCompactionMetadata( + table, compactionInstantTime, writeStatuses, config.getSchema()); + extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata)); + completeCompaction(metadata, writeStatuses, table, compactionInstantTime); } @Override - protected void completeCompaction(HoodieCommitMetadata metadata, List writeStatuses, HoodieTable>, List, List> table, String compactionCommitTime) { - throw new HoodieNotSupportedException("Compaction is not supported yet"); + public void completeCompaction( + HoodieCommitMetadata metadata, + List writeStatuses, + HoodieTable>, List, List> table, + String compactionCommitTime) { + this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction"); + List writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()); + finalizeWrite(table, compactionCommitTime, writeStats); + LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata); + FlinkCompactHelpers.newInstance().completeInflightCompaction(table, compactionCommitTime, metadata); + + if (compactionTimer != null) { + long durationInMs = metrics.getDurationInMs(compactionTimer.stop()); + try { + metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(), + durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION); + } catch (ParseException e) { + throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction " + + config.getBasePath() + " at time " + compactionCommitTime, e); + } + } + LOG.info("Compacted successfully on commit " + compactionCommitTime); } @Override @@ -244,6 +273,46 @@ public class HoodieFlinkWriteClient extends this.bucketToHandles.clear(); } + /** + * Get or create a new write handle in order to reuse the file handles. 
+ * + * @param record The first record in the bucket + * @param isDelta Whether the table is in MOR mode + * @param config Write config + * @param instantTime The instant time + * @param table The table + * @param partitionPath Partition path + * @param recordItr Record iterator + * @return Existing write handle or create a new one + */ + private HoodieWriteHandle getOrCreateWriteHandle( + HoodieRecord record, + boolean isDelta, + HoodieWriteConfig config, + String instantTime, + HoodieTable>, List, List> table, + String partitionPath, + Iterator> recordItr) { + final HoodieRecordLocation loc = record.getCurrentLocation(); + final String fileID = loc.getFileId(); + if (bucketToHandles.containsKey(fileID)) { + return bucketToHandles.get(fileID); + } + final HoodieWriteHandle writeHandle; + if (isDelta) { + writeHandle = new FlinkAppendHandle<>(config, instantTime, table, partitionPath, fileID, recordItr, + table.getTaskContextSupplier()); + } else if (loc.getInstantTime().equals("I")) { + writeHandle = new FlinkCreateHandle<>(config, instantTime, table, partitionPath, + fileID, table.getTaskContextSupplier()); + } else { + writeHandle = new FlinkMergeHandle<>(config, instantTime, table, recordItr, partitionPath, + fileID, table.getTaskContextSupplier()); + } + this.bucketToHandles.put(fileID, writeHandle); + return writeHandle; + } + private HoodieTable>, List, List> getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOperationType operationType) { if (operationType == WriteOperationType.DELETE) { setWriteSchemaForDeletes(metaClient); @@ -305,4 +374,16 @@ public class HoodieFlinkWriteClient extends activeTimeline.transitionRequestedToInflight(requested, Option.empty(), config.shouldAllowMultiWriteOnSameInstant()); } + + public void rollbackInflightCompaction(HoodieInstant inflightInstant) { + HoodieFlinkTable table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); + HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); + if (pendingCompactionTimeline.containsInstant(inflightInstant)) { + rollbackInflightCompaction(inflightInstant, table); + } + } + + public HoodieFlinkTable getHoodieTable() { + return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); + } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java index bae8de239..118a0e729 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java @@ -23,16 +23,12 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.index.FlinkHoodieIndex; import org.apache.hudi.table.HoodieTable; -import org.apache.flink.api.common.state.MapState; -import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ 
-46,15 +42,9 @@ import java.util.List; public class FlinkInMemoryStateIndex extends FlinkHoodieIndex { private static final Logger LOG = LogManager.getLogger(FlinkInMemoryStateIndex.class); - private MapState mapState; public FlinkInMemoryStateIndex(HoodieFlinkEngineContext context, HoodieWriteConfig config) { super(config); - if (context.getRuntimeContext() != null) { - MapStateDescriptor indexStateDesc = - new MapStateDescriptor<>("indexState", TypeInformation.of(HoodieKey.class), TypeInformation.of(HoodieRecordLocation.class)); - mapState = context.getRuntimeContext().getMapState(indexStateDesc); - } } @Override diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitCreateHandleFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitWriteHandleFactory.java similarity index 87% rename from hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitCreateHandleFactory.java rename to hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitWriteHandleFactory.java index f2847e228..092e945f0 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitCreateHandleFactory.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitWriteHandleFactory.java @@ -26,11 +26,11 @@ import org.apache.hudi.table.HoodieTable; /** * Create handle factory for Flink writer, use the specified write handle directly. */ -public class ExplicitCreateHandleFactory - extends CreateHandleFactory { +public class ExplicitWriteHandleFactory + extends WriteHandleFactory { private HoodieWriteHandle writeHandle; - public ExplicitCreateHandleFactory(HoodieWriteHandle writeHandle) { + public ExplicitWriteHandleFactory(HoodieWriteHandle writeHandle) { this.writeHandle = writeHandle; } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java new file mode 100644 index 000000000..9d56d4770 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieUpsertException; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * A {@link HoodieAppendHandle} that supports appending writes incrementally (mini-batches). + * + *
For the first mini-batch, it initialize and set up the next file path to write, + * but does not close the file writer until all the mini-batches write finish. Each mini-batch + * data are appended to this handle, the back-up writer may rollover on condition. + * + * @param Payload type + * @param Input type + * @param Key type + * @param Output type + */ +public class FlinkAppendHandle extends HoodieAppendHandle implements MiniBatchHandle { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkAppendHandle.class); + private boolean needBootStrap = true; + // Total number of bytes written to file + private long sizeInBytes = 0; + + public FlinkAppendHandle( + HoodieWriteConfig config, + String instantTime, + HoodieTable hoodieTable, + String partitionPath, + String fileId, + Iterator> recordItr, + TaskContextSupplier taskContextSupplier) { + super(config, instantTime, hoodieTable, partitionPath, fileId, recordItr, taskContextSupplier); + } + + @Override + protected boolean needsUpdateLocation() { + return false; + } + + @Override + protected boolean isUpdateRecord(HoodieRecord hoodieRecord) { + return hoodieRecord.getCurrentLocation() != null + && hoodieRecord.getCurrentLocation().getInstantTime().equals("U"); + } + + /** + * Returns whether there is need to bootstrap this file handle. + * E.G. the first time that the handle is created. + */ + public boolean isNeedBootStrap() { + return this.needBootStrap; + } + + /** + * Appends new records into this append handle. + * @param recordItr The new records iterator + */ + public void appendNewRecords(Iterator> recordItr) { + this.recordItr = recordItr; + } + + @Override + public List close() { + needBootStrap = false; + // flush any remaining records to disk + appendDataAndDeleteBlocks(header); + try { + for (WriteStatus status: statuses) { + long logFileSize = FSUtils.getFileSize(fs, new Path(config.getBasePath(), status.getStat().getPath())); + status.getStat().setFileSizeInBytes(logFileSize); + } + } catch (IOException e) { + throw new HoodieUpsertException("Failed to get file size for append handle", e); + } + List ret = new ArrayList<>(statuses); + statuses.clear(); + return ret; + } + + @Override + public void finishWrite() { + LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten); + try { + if (writer != null) { + writer.close(); + } + } catch (IOException e) { + throw new HoodieUpsertException("Failed to close append handle", e); + } + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java index 07a71969f..ce3725b01 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java @@ -90,13 +90,12 @@ public class FlinkCreateHandle */ private WriteStatus getIncrementalWriteStatus() { try { - long fileSizeInBytes = FSUtils.getFileSize(fs, path); - setUpWriteStatus(fileSizeInBytes); + setUpWriteStatus(); // reset the write status recordsWritten = 0; recordsDeleted = 0; insertRecordsWritten = 0; - this.lastFileSize = fileSizeInBytes; + timer = new HoodieTimer().startTimer(); writeStatus.setTotalErrorRecords(0); return writeStatus; } catch (IOException e) { @@ -107,10 +106,12 @@ public class FlinkCreateHandle /** * Set up the write status. 
* - * @param fileSizeInBytes File size in bytes * @throws IOException if error occurs */ - private void setUpWriteStatus(long fileSizeInBytes) throws IOException { + private void setUpWriteStatus() throws IOException { + long fileSizeInBytes = FSUtils.getFileSize(fs, path); + long incFileSizeInBytes = fileSizeInBytes - lastFileSize; + this.lastFileSize = fileSizeInBytes; HoodieWriteStat stat = new HoodieWriteStat(); stat.setPartitionPath(writeStatus.getPartitionPath()); stat.setNumWrites(recordsWritten); @@ -119,13 +120,12 @@ public class FlinkCreateHandle stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT); stat.setFileId(writeStatus.getFileId()); stat.setPath(new Path(config.getBasePath()), path); - stat.setTotalWriteBytes(fileSizeInBytes - lastFileSize); + stat.setTotalWriteBytes(incFileSizeInBytes); stat.setFileSizeInBytes(fileSizeInBytes); stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords()); HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats(); runtimeStats.setTotalCreateTime(timer.endTimer()); stat.setRuntimeStats(runtimeStats); - timer = new HoodieTimer().startTimer(); writeStatus.setStat(stat); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java index 3d33b2e15..43930ad3e 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.HoodieTimer; -import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieTable; @@ -109,29 +108,10 @@ public class FlinkMergeHandle return writeStatus; } - /** - * The difference with the parent method is that there is no need to set up - * locations for the records. - * - * @param fileId The file ID - * @param newRecordsItr The incremental records iterator - */ - @Override - protected void init(String fileId, Iterator> newRecordsItr) { - initializeIncomingRecordsMap(); - while (newRecordsItr.hasNext()) { - HoodieRecord record = newRecordsItr.next(); - // NOTE: Once Records are added to map (spillable-map), DO NOT change it as they won't persist - keyToNewRecords.put(record.getRecordKey(), record); - } - LOG.info(String.format("Number of entries in MemoryBasedMap => %d\n" - + "Total size in bytes of MemoryBasedMap => %d\n" - + "Number of entries in DiskBasedMap => %d\n" - + "Size of file spilled to disk => %d", - ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries(), - ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize(), - ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries(), - ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes())); + boolean needsUpdateLocation() { + // No need to update location for Flink hoodie records because all the records are pre-tagged + // with the desired locations. 
+ return false; } /** diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index ddc3fbe27..e7f1d592e 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -26,6 +26,7 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -34,6 +35,10 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieNotSupportedException; +import org.apache.hudi.exception.HoodieUpsertException; +import org.apache.hudi.io.HoodieCreateHandle; +import org.apache.hudi.io.HoodieMergeHandle; +import org.apache.hudi.io.HoodieSortedMergeHandle; import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata; @@ -41,10 +46,17 @@ import org.apache.hudi.table.action.clean.FlinkCleanActionExecutor; import org.apache.hudi.table.action.commit.FlinkDeleteCommitActionExecutor; import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor; import org.apache.hudi.table.action.commit.FlinkInsertPreppedCommitActionExecutor; +import org.apache.hudi.table.action.commit.FlinkMergeHelper; import org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor; import org.apache.hudi.table.action.commit.FlinkUpsertPreppedCommitActionExecutor; import org.apache.hudi.table.action.rollback.FlinkCopyOnWriteRollbackActionExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -58,7 +70,9 @@ import java.util.Map; */ public class HoodieFlinkCopyOnWriteTable extends HoodieFlinkTable { - protected HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) { + private static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCopyOnWriteTable.class); + + public HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) { super(config, context, metaClient); } @@ -270,4 +284,52 @@ public class HoodieFlinkCopyOnWriteTable extends public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) { throw new HoodieNotSupportedException("Savepoint and restore is not supported yet"); } + + // ------------------------------------------------------------------------- + // Used for compaction + // ------------------------------------------------------------------------- + + public Iterator> handleUpdate(String instantTime, String partitionPath, String fileId, + Map> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException { + // these are updates + HoodieMergeHandle upsertHandle = 
getUpdateHandle(instantTime, partitionPath, fileId, keyToNewRecords, oldDataFile); + return handleUpdateInternal(upsertHandle, instantTime, fileId); + } + + protected Iterator> handleUpdateInternal(HoodieMergeHandle upsertHandle, String instantTime, + String fileId) throws IOException { + if (upsertHandle.getOldFilePath() == null) { + throw new HoodieUpsertException( + "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId); + } else { + FlinkMergeHelper.newInstance().runMerge(this, upsertHandle); + } + + // TODO(vc): This needs to be revisited + if (upsertHandle.getPartitionPath() == null) { + LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", " + + upsertHandle.writeStatuses()); + } + + return Collections.singletonList(upsertHandle.writeStatuses()).iterator(); + } + + protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId, + Map> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) { + if (requireSortedRecords()) { + return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId, + dataFileToBeMerged, taskContextSupplier); + } else { + return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId, + dataFileToBeMerged,taskContextSupplier); + } + } + + public Iterator> handleInsert(String instantTime, String partitionPath, String fileId, + Map> recordMap) { + HoodieCreateHandle createHandle = + new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordMap, taskContextSupplier); + createHandle.write(); + return Collections.singletonList(createHandle.close()).iterator(); + } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java index 994a49d22..b7f177b72 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java @@ -18,14 +18,74 @@ package org.apache.hudi.table; +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieNotSupportedException; +import org.apache.hudi.io.FlinkAppendHandle; +import org.apache.hudi.io.HoodieWriteHandle; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.hudi.table.action.commit.delta.FlinkUpsertDeltaCommitActionExecutor; +import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor; +import org.apache.hudi.table.action.compact.FlinkScheduleCompactionActionExecutor; -public class HoodieFlinkMergeOnReadTable extends HoodieFlinkCopyOnWriteTable { - protected HoodieFlinkMergeOnReadTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) { +import java.util.List; +import java.util.Map; + +public class HoodieFlinkMergeOnReadTable + extends HoodieFlinkCopyOnWriteTable { + + HoodieFlinkMergeOnReadTable( + 
HoodieWriteConfig config, + HoodieEngineContext context, + HoodieTableMetaClient metaClient) { super(config, context, metaClient); } - // TODO not support yet. + + @Override + public HoodieWriteMetadata> upsert( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List> hoodieRecords) { + ValidationUtils.checkArgument(writeHandle instanceof FlinkAppendHandle, + "MOR write handle should always be a FlinkAppendHandle"); + FlinkAppendHandle appendHandle = (FlinkAppendHandle) writeHandle; + return new FlinkUpsertDeltaCommitActionExecutor<>(context, appendHandle, config, this, instantTime, hoodieRecords).execute(); + } + + @Override + public HoodieWriteMetadata> insert( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List> hoodieRecords) { + ValidationUtils.checkArgument(writeHandle instanceof FlinkAppendHandle, + "MOR write handle should always be a FlinkAppendHandle"); + FlinkAppendHandle appendHandle = (FlinkAppendHandle) writeHandle; + return new FlinkUpsertDeltaCommitActionExecutor<>(context, appendHandle, config, this, instantTime, hoodieRecords).execute(); + } + + @Override + public Option scheduleCompaction( + HoodieEngineContext context, + String instantTime, + Option> extraMetadata) { + BaseScheduleCompactionActionExecutor scheduleCompactionExecutor = new FlinkScheduleCompactionActionExecutor( + context, config, this, instantTime, extraMetadata); + return scheduleCompactionExecutor.execute(); + } + + @Override + public HoodieWriteMetadata> compact(HoodieEngineContext context, String compactionInstantTime) { + throw new HoodieNotSupportedException("Compaction is supported as a separate pipeline, " + + "should not invoke directly through HoodieFlinkMergeOnReadTable"); + } } + diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java index 79a310639..3e26025c2 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java @@ -29,7 +29,6 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.index.FlinkHoodieIndex; import org.apache.hudi.index.HoodieIndex; @@ -57,7 +56,7 @@ public abstract class HoodieFlinkTable case COPY_ON_WRITE: return new HoodieFlinkCopyOnWriteTable<>(config, context, metaClient); case MERGE_ON_READ: - throw new HoodieNotSupportedException("MERGE_ON_READ is not supported yet"); + return new HoodieFlinkMergeOnReadTable<>(config, context, metaClient); default: throw new HoodieException("Unsupported table type :" + metaClient.getTableType()); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java index 9b6dcd633..e0bbc25ef 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java @@ -35,7 +35,7 @@ import 
org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.execution.FlinkLazyInsertIterable; -import org.apache.hudi.io.ExplicitCreateHandleFactory; +import org.apache.hudi.io.ExplicitWriteHandleFactory; import org.apache.hudi.io.FlinkMergeHandle; import org.apache.hudi.io.HoodieCreateHandle; import org.apache.hudi.io.HoodieMergeHandle; @@ -74,7 +74,7 @@ public abstract class BaseFlinkCommitActionExecutor writeHandle; + protected HoodieWriteHandle writeHandle; public BaseFlinkCommitActionExecutor(HoodieEngineContext context, HoodieWriteHandle writeHandle, @@ -107,22 +107,13 @@ public abstract class BaseFlinkCommitActionExecutor) Collections.EMPTY_LIST).iterator(); } return new FlinkLazyInsertIterable<>(recordItr, true, config, instantTime, table, idPfx, - taskContextSupplier, new ExplicitCreateHandleFactory<>(writeHandle)); + taskContextSupplier, new ExplicitWriteHandleFactory<>(writeHandle)); } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java index 539f551c9..9f9f865d5 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java @@ -65,14 +65,13 @@ public class FlinkMergeHelper extends AbstractMer HoodieMergeHandle>, List, List> upsertHandle) throws IOException { final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation(); Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf()); - FlinkMergeHandle>, List, List> mergeHandle = - (FlinkMergeHandle>, List, List>) upsertHandle; + HoodieMergeHandle>, List, List> mergeHandle = upsertHandle; HoodieBaseFile baseFile = mergeHandle.baseFileForMerge(); final GenericDatumWriter gWriter; final GenericDatumReader gReader; Schema readSchema; - if (mergeHandle.isNeedBootStrap() + if (isNeedBootStrap(mergeHandle) && (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent())) { readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema(); gWriter = new GenericDatumWriter<>(readSchema); @@ -87,7 +86,7 @@ public class FlinkMergeHelper extends AbstractMer HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath()); try { final Iterator readerIterator; - if (mergeHandle.isNeedBootStrap() && baseFile.getBootstrapBaseFile().isPresent()) { + if (isNeedBootStrap(mergeHandle) && baseFile.getBootstrapBaseFile().isPresent()) { readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation); } else { readerIterator = reader.getRecordIterator(readSchema); @@ -116,4 +115,8 @@ public class FlinkMergeHelper extends AbstractMer } } + private static boolean isNeedBootStrap(HoodieMergeHandle mergeHandle) { + return mergeHandle instanceof FlinkMergeHandle && ((FlinkMergeHandle) mergeHandle).isNeedBootStrap(); + } + } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java new 
file mode 100644 index 000000000..8d0857eb7 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.commit.delta; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.execution.FlinkLazyInsertIterable; +import org.apache.hudi.io.ExplicitWriteHandleFactory; +import org.apache.hudi.io.FlinkAppendHandle; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +public abstract class BaseFlinkDeltaCommitActionExecutor> + extends BaseFlinkCommitActionExecutor { + + public BaseFlinkDeltaCommitActionExecutor(HoodieEngineContext context, + FlinkAppendHandle writeHandle, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + WriteOperationType operationType) { + super(context, writeHandle, config, table, instantTime, operationType); + } + + @Override + public Iterator> handleUpdate(String partitionPath, String fileId, Iterator> recordItr) { + FlinkAppendHandle appendHandle = (FlinkAppendHandle) writeHandle; + if (!appendHandle.isNeedBootStrap()) { + appendHandle.appendNewRecords(recordItr); + } + appendHandle.doAppend(); + List writeStatuses = appendHandle.close(); + return Collections.singletonList(writeStatuses).iterator(); + } + + @Override + public Iterator> handleInsert(String idPfx, Iterator> recordItr) { + return new FlinkLazyInsertIterable<>(recordItr, true, config, instantTime, table, + idPfx, taskContextSupplier, new ExplicitWriteHandleFactory(writeHandle)); + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/FlinkUpsertDeltaCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/FlinkUpsertDeltaCommitActionExecutor.java new file mode 100644 index 000000000..eb850602a --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/FlinkUpsertDeltaCommitActionExecutor.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.commit.delta; + +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.FlinkAppendHandle; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.hudi.table.action.commit.FlinkWriteHelper; + +import java.util.List; + +public class FlinkUpsertDeltaCommitActionExecutor> + extends BaseFlinkDeltaCommitActionExecutor { + private List> inputRecords; + + public FlinkUpsertDeltaCommitActionExecutor(HoodieEngineContext context, + FlinkAppendHandle writeHandle, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + List> inputRecords) { + super(context, writeHandle, config, table, instantTime, WriteOperationType.UPSERT); + this.inputRecords = inputRecords; + } + + @Override + public HoodieWriteMetadata execute() { + return FlinkWriteHelper.newInstance().write(instantTime, inputRecords, context, table, + config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, true); + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkCompactHelpers.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkCompactHelpers.java new file mode 100644 index 000000000..274f691c4 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkCompactHelpers.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.compact; + +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.table.HoodieTable; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +/** + * A flink implementation of {@link AbstractCompactHelpers}. + * + * @param + */ +public class FlinkCompactHelpers extends + AbstractCompactHelpers>, List, List> { + + private FlinkCompactHelpers() { + } + + private static class CompactHelperHolder { + private static final FlinkCompactHelpers FLINK_COMPACT_HELPERS = new FlinkCompactHelpers(); + } + + public static FlinkCompactHelpers newInstance() { + return CompactHelperHolder.FLINK_COMPACT_HELPERS; + } + + @Override + public HoodieCommitMetadata createCompactionMetadata(HoodieTable>, List, List> table, + String compactionInstantTime, + List writeStatuses, + String schema) throws IOException { + byte[] planBytes = table.getActiveTimeline().readCompactionPlanAsBytes( + HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get(); + HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(planBytes); + List updateStatusMap = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()); + org.apache.hudi.common.model.HoodieCommitMetadata metadata = new org.apache.hudi.common.model.HoodieCommitMetadata(true); + for (HoodieWriteStat stat : updateStatusMap) { + metadata.addWriteStat(stat.getPartitionPath(), stat); + } + metadata.addMetadata(org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY, schema); + if (compactionPlan.getExtraMetadata() != null) { + compactionPlan.getExtraMetadata().forEach(metadata::addMetadata); + } + return metadata; + } +} + diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkScheduleCompactionActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkScheduleCompactionActionExecutor.java new file mode 100644 index 000000000..7db2dc827 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkScheduleCompactionActionExecutor.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.compact; + +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.SyncableFileSystemView; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieCompactionException; +import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.text.ParseException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +@SuppressWarnings("checkstyle:LineLength") +public class FlinkScheduleCompactionActionExecutor extends + BaseScheduleCompactionActionExecutor>, List, List> { + + private static final Logger LOG = LogManager.getLogger(FlinkScheduleCompactionActionExecutor.class); + + public FlinkScheduleCompactionActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, + HoodieTable>, List, List> table, + String instantTime, + Option> extraMetadata) { + super(context, config, table, instantTime, extraMetadata); + } + + @Override + protected HoodieCompactionPlan scheduleCompaction() { + LOG.info("Checking if compaction needs to be run on " + config.getBasePath()); + // judge if we need to compact according to num delta commits and time elapsed + boolean compactable = needCompact(config.getInlineCompactTriggerStrategy()); + if (compactable) { + LOG.info("Generating compaction plan for merge on read table " + config.getBasePath()); + HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor(); + try { + SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView(); + Set fgInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations() + .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId()) + .collect(Collectors.toSet()); + // exclude files in pending clustering from compaction. 
+ fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet())); + return compactor.generateCompactionPlan(context, table, config, instantTime, fgInPendingCompactionAndClustering); + } catch (IOException e) { + throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e); + } + } + + return new HoodieCompactionPlan(); + } + + public Pair getLatestDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) { + Option lastCompaction = table.getActiveTimeline().getCommitTimeline() + .filterCompletedInstants().lastInstant(); + HoodieTimeline deltaCommits = table.getActiveTimeline().getDeltaCommitTimeline(); + + String latestInstantTs; + int deltaCommitsSinceLastCompaction = 0; + if (lastCompaction.isPresent()) { + latestInstantTs = lastCompaction.get().getTimestamp(); + deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfter(latestInstantTs, Integer.MAX_VALUE).countInstants(); + } else { + latestInstantTs = deltaCommits.firstInstant().get().getTimestamp(); + deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfterOrEquals(latestInstantTs, Integer.MAX_VALUE).countInstants(); + } + return Pair.of(deltaCommitsSinceLastCompaction, latestInstantTs); + } + + public boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy) { + boolean compactable; + // get deltaCommitsSinceLastCompaction and lastCompactionTs + Pair latestDeltaCommitInfo = getLatestDeltaCommitInfo(compactionTriggerStrategy); + int inlineCompactDeltaCommitMax = config.getInlineCompactDeltaCommitMax(); + int inlineCompactDeltaSecondsMax = config.getInlineCompactDeltaSecondsMax(); + switch (compactionTriggerStrategy) { + case NUM_COMMITS: + compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft(); + if (compactable) { + LOG.info(String.format("The delta commits >= %s, trigger compaction scheduler.", inlineCompactDeltaCommitMax)); + } + break; + case TIME_ELAPSED: + compactable = inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - parsedToSeconds(latestDeltaCommitInfo.getRight()); + if (compactable) { + LOG.info(String.format("The elapsed time >=%ss, trigger compaction scheduler.", inlineCompactDeltaSecondsMax)); + } + break; + case NUM_OR_TIME: + compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft() + || inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - parsedToSeconds(latestDeltaCommitInfo.getRight()); + if (compactable) { + LOG.info(String.format("The delta commits >= %s or elapsed_time >=%ss, trigger compaction scheduler.", inlineCompactDeltaCommitMax, + inlineCompactDeltaSecondsMax)); + } + break; + case NUM_AND_TIME: + compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft() + && inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - parsedToSeconds(latestDeltaCommitInfo.getRight()); + if (compactable) { + LOG.info(String.format("The delta commits >= %s and elapsed_time >=%ss, trigger compaction scheduler.", inlineCompactDeltaCommitMax, + inlineCompactDeltaSecondsMax)); + } + break; + default: + throw new HoodieCompactionException("Unsupported compaction trigger strategy: " + config.getInlineCompactTriggerStrategy()); + } + return compactable; + } + + public Long parsedToSeconds(String time) { + long timestamp; + try { + timestamp = HoodieActiveTimeline.COMMIT_FORMATTER.parse(time).getTime() / 1000; + } catch (ParseException e) { + throw new HoodieCompactionException(e.getMessage(), e); + 
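To make the four trigger strategies above concrete, a small illustration with made-up numbers (3 delta commits since the last compaction, 4000 seconds elapsed, and the option defaults of 5 commits / 3600 seconds):

    // Hypothetical inputs, for illustration only.
    int deltaCommits = 3;      // latestDeltaCommitInfo.getLeft()
    long elapsedSecs = 4000L;  // parsedToSeconds(instantTime) - parsedToSeconds(latestDeltaCommitInfo.getRight())
    int maxCommits = 5;        // config.getInlineCompactDeltaCommitMax()
    int maxSeconds = 3600;     // config.getInlineCompactDeltaSecondsMax()
    boolean byCommits = maxCommits <= deltaCommits;  // false
    boolean byTime = maxSeconds <= elapsedSecs;      // true
    // NUM_COMMITS -> false, TIME_ELAPSED -> true, NUM_OR_TIME -> true, NUM_AND_TIME -> false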
} + return timestamp; + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java new file mode 100644 index 000000000..6a2d1bf1f --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.compact; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.avro.model.HoodieCompactionOperation; +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.client.FlinkTaskContextSupplier; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.CompactionOperation; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.TableFileSystemView.SliceView; +import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.CompactionUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.IOUtils; +import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; + +import org.apache.avro.Schema; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static java.util.stream.Collectors.toList; + +/** + * 
Compacts a hoodie table with merge on read storage. Computes all possible compactions, + * passes them through a CompactionFilter, executes all the compactions, writes a new version of the base files and makes + * a normal commit. + * + *

Note: the compaction logic is invoked through the flink pipeline. + */ +@SuppressWarnings("checkstyle:LineLength") +public class HoodieFlinkMergeOnReadTableCompactor implements HoodieCompactor>, List, List> { + + private static final Logger LOG = LogManager.getLogger(HoodieFlinkMergeOnReadTableCompactor.class); + + // Accumulator to keep track of total log files for a table + private AtomicLong totalLogFiles; + // Accumulator to keep track of total log file slices for a table + private AtomicLong totalFileSlices; + + @Override + public List compact(HoodieEngineContext context, HoodieCompactionPlan compactionPlan, + HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException { + throw new UnsupportedOperationException("HoodieFlinkMergeOnReadTableCompactor does not support compact directly, " + + "the function works as a separate pipeline"); + } + + public List compact(HoodieFlinkCopyOnWriteTable hoodieCopyOnWriteTable, + HoodieTableMetaClient metaClient, + HoodieWriteConfig config, + CompactionOperation operation, + String instantTime) throws IOException { + FileSystem fs = metaClient.getFs(); + + Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); + LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames() + + " for commit " + instantTime); + // TODO - FIX THIS + // Reads the entire avro file. Always only specific blocks should be read from the avro file + // (failure recover). + // Load all the delta commits since the last compaction commit and get all the blocks to be + // loaded and load it using CompositeAvroLogReader + // Since a DeltaCommit is not defined yet, reading all the records. revisit this soon. + String maxInstantTime = metaClient + .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, + HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION)) + .filterCompletedInstants().lastInstant().get().getTimestamp(); + // TODO(danny): make it configurable + long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new FlinkTaskContextSupplier(null), config.getProps()); + LOG.info("MaxMemoryPerCompaction => " + maxMemoryPerCompaction); + + List logFiles = operation.getDeltaFileNames().stream().map( + p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString()) + .collect(toList()); + HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder() + .withFileSystem(fs) + .withBasePath(metaClient.getBasePath()) + .withLogFilePaths(logFiles) + .withReaderSchema(readerSchema) + .withLatestInstantTime(maxInstantTime) + .withMaxMemorySizeInBytes(maxMemoryPerCompaction) + .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled()) + .withReverseReader(config.getCompactionReverseLogReadEnabled()) + .withBufferSize(config.getMaxDFSStreamBufferSize()) + .withSpillableMapBasePath(config.getSpillableMapBasePath()) + .build(); + if (!scanner.iterator().hasNext()) { + return new ArrayList<>(); + } + + Option oldDataFileOpt = + operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath()); + + // Compacting is very similar to applying updates to existing file + Iterator> result; + // If the dataFile is present, perform updates else perform inserts into a new base file. 
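// When the base file exists, handleUpdate merges the scanned log records into that file group's
// base file; otherwise handleInsert writes the merged log records out as a brand new base file.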
+ if (oldDataFileOpt.isPresent()) { + result = hoodieCopyOnWriteTable.handleUpdate(instantTime, operation.getPartitionPath(), + operation.getFileId(), scanner.getRecords(), + oldDataFileOpt.get()); + } else { + result = hoodieCopyOnWriteTable.handleInsert(instantTime, operation.getPartitionPath(), operation.getFileId(), + scanner.getRecords()); + } + Iterable> resultIterable = () -> result; + return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream).peek(s -> { + s.getStat().setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog()); + s.getStat().setTotalLogFilesCompacted(scanner.getTotalLogFiles()); + s.getStat().setTotalLogRecords(scanner.getTotalLogRecords()); + s.getStat().setPartitionPath(operation.getPartitionPath()); + s.getStat() + .setTotalLogSizeCompacted(operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue()); + s.getStat().setTotalLogBlocks(scanner.getTotalLogBlocks()); + s.getStat().setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks()); + s.getStat().setTotalRollbackBlocks(scanner.getTotalRollbacks()); + RuntimeStats runtimeStats = new RuntimeStats(); + runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks()); + s.getStat().setRuntimeStats(runtimeStats); + }).collect(toList()); + } + + @Override + public HoodieCompactionPlan generateCompactionPlan(HoodieEngineContext context, + HoodieTable>, List, List> hoodieTable, + HoodieWriteConfig config, String compactionCommitTime, + Set fgIdsInPendingCompactionAndClustering) + throws IOException { + totalLogFiles = new AtomicLong(0); + totalFileSlices = new AtomicLong(0); + + ValidationUtils.checkArgument(hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ, + "Can only compact table of type " + HoodieTableType.MERGE_ON_READ + " and not " + + hoodieTable.getMetaClient().getTableType().name()); + + // TODO : check if maxMemory is not greater than JVM or flink.executor memory + // TODO - rollback any compactions in flight + HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); + LOG.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime); + List partitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), metaClient.getBasePath()); + + // filter the partition paths if needed to reduce list status + partitionPaths = config.getCompactionStrategy().filterPartitionPaths(config, partitionPaths); + + if (partitionPaths.isEmpty()) { + // In case no partitions could be picked, return no compaction plan + return null; + } + + SliceView fileSystemView = hoodieTable.getSliceView(); + LOG.info("Compaction looking for files to compact in " + partitionPaths + " partitions"); + context.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact"); + + List operations = context.flatMap(partitionPaths, partitionPath -> fileSystemView + .getLatestFileSlices(partitionPath) + .filter(slice -> !fgIdsInPendingCompactionAndClustering.contains(slice.getFileGroupId())) + .map(s -> { + List logFiles = + s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); + totalLogFiles.addAndGet(logFiles.size()); + totalFileSlices.addAndGet(1L); + // Avro generated classes are not inheriting Serializable. Using CompactionOperation POJO + // for flink Map operations and collecting them finally in Avro generated classes for storing + // into meta files. 
+ Option dataFile = s.getBaseFile(); + return new CompactionOperation(dataFile, partitionPath, logFiles, + config.getCompactionStrategy().captureMetrics(config, s)); + }) + .filter(c -> !c.getDeltaFileNames().isEmpty()), partitionPaths.size()).stream().map(CompactionUtils::buildHoodieCompactionOperation).collect(toList()); + + LOG.info("Total of " + operations.size() + " compactions are retrieved"); + LOG.info("Total number of latest files slices " + totalFileSlices.get()); + LOG.info("Total number of log files " + totalLogFiles.get()); + LOG.info("Total number of file slices " + totalFileSlices.get()); + // Filter the compactions with the passed in filter. This lets us choose most effective + // compactions only + HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations, + CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList())); + ValidationUtils.checkArgument( + compactionPlan.getOperations().stream().noneMatch( + op -> fgIdsInPendingCompactionAndClustering.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))), + "Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. " + + "Please fix your strategy implementation. FileIdsWithPendingCompactions :" + fgIdsInPendingCompactionAndClustering + + ", Selected workload :" + compactionPlan); + if (compactionPlan.getOperations().isEmpty()) { + LOG.warn("After filtering, Nothing to compact for " + metaClient.getBasePath()); + } + return compactionPlan; + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java index f163b02f6..8ceea5164 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java @@ -51,8 +51,8 @@ public class FlinkOptions { .key("path") .stringType() .noDefaultValue() - .withDescription("Base path for the target hoodie table." 
- + "\nThe path would be created if it does not exist,\n" + .withDescription("Base path for the target hoodie table.\n" + + "The path would be created if it does not exist,\n" + "otherwise a Hoodie table expects to be initialized successfully"); // ------------------------------------------------------------------------ @@ -165,6 +165,42 @@ public class FlinkOptions { .defaultValue(128D) // 128MB .withDescription("Batch buffer size in MB to flush data into the underneath filesystem"); + // ------------------------------------------------------------------------ + // Compaction Options + // ------------------------------------------------------------------------ + + public static final ConfigOption COMPACTION_ASYNC_ENABLED = ConfigOptions + .key("compaction.async.enabled") + .booleanType() + .defaultValue(true) // default true for MOR write + .withDescription("Async Compaction, enabled by default for MOR"); + + public static final String NUM_COMMITS = "num_commits"; + public static final String TIME_ELAPSED = "time_elapsed"; + public static final String NUM_AND_TIME = "num_and_time"; + public static final String NUM_OR_TIME = "num_or_time"; + public static final ConfigOption COMPACTION_TRIGGER_STRATEGY = ConfigOptions + .key("compaction.trigger.strategy") + .stringType() + .defaultValue(NUM_COMMITS) // default true for MOR write + .withDescription("Strategy to trigger compaction, options are 'num_commits': trigger compaction when reach N delta commits;\n" + + "'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction;\n" + + "'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied;\n" + + "'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied.\n" + + "Default is 'num_commits'"); + + public static final ConfigOption COMPACTION_DELTA_COMMITS = ConfigOptions + .key("compaction.delta_commits") + .intType() + .defaultValue(5) + .withDescription("Max delta commits needed to trigger compaction, default 5 commits"); + + public static final ConfigOption COMPACTION_DELTA_SECONDS = ConfigOptions + .key("compaction.delta_seconds") + .intType() + .defaultValue(3600) // default 1 hour + .withDescription("Max delta seconds time needed to trigger compaction, default 1 hour"); + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java index ba6cea561..baf8cdf55 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java @@ -40,7 +40,6 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; -import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -333,8 +332,11 @@ public class StreamWriteFunction @SuppressWarnings("unchecked, rawtypes") private void flushBuffer(boolean isFinalBatch) { this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE)); - Preconditions.checkNotNull(this.currentInstant, - "No inflight instant when flushing data"); + if 
(this.currentInstant == null) { + // in case there are empty checkpoints that has no input data + LOG.info("No inflight instant when flushing data, cancel."); + return; + } final List writeStatus; if (buffer.size() > 0) { writeStatus = new ArrayList<>(); diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java index 787c4900b..d2ff45d4a 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java @@ -22,6 +22,7 @@ import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.operator.event.BatchWriteSuccessEvent; @@ -48,6 +49,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -101,6 +103,11 @@ public class StreamWriteOperatorCoordinator */ private final int parallelism; + /** + * Whether needs to schedule compaction task on finished checkpoints. + */ + private final boolean needsScheduleCompaction; + /** * Constructs a StreamingSinkOperatorCoordinator. * @@ -112,6 +119,7 @@ public class StreamWriteOperatorCoordinator int parallelism) { this.conf = conf; this.parallelism = parallelism; + this.needsScheduleCompaction = isNeedsScheduleCompaction(); } @Override @@ -152,6 +160,10 @@ public class StreamWriteOperatorCoordinator public void checkpointComplete(long checkpointId) { // start to commit the instant. checkAndCommitWithRetry(); + // if async compaction is on, schedule the compaction + if (needsScheduleCompaction) { + writeClient.scheduleCompaction(Option.empty()); + } // start new instant. startInstant(); } @@ -202,6 +214,13 @@ public class StreamWriteOperatorCoordinator // Utilities // ------------------------------------------------------------------------- + private boolean isNeedsScheduleCompaction() { + return this.conf.getString(FlinkOptions.TABLE_TYPE) + .toUpperCase(Locale.ROOT) + .equals(HoodieTableType.MERGE_ON_READ.name()) + && this.conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED); + } + @SuppressWarnings("rawtypes") private void initWriteClient() { writeClient = new HoodieFlinkWriteClient( diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/compact/CompactFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/compact/CompactFunction.java new file mode 100644 index 000000000..5b881fdd9 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/compact/CompactFunction.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator.compact; + +import org.apache.hudi.client.FlinkTaskContextSupplier; +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.model.CompactionOperation; +import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable; +import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.List; + +/** + * Function to execute the actual compaction task assigned by the compaction plan task. + * In order to execute scalable, the input should shuffle by the compact event {@link CompactionPlanEvent}. + */ +public class CompactFunction extends KeyedProcessFunction { + + /** + * Config options. + */ + private final Configuration conf; + + /** + * Write Client. + */ + private transient HoodieFlinkWriteClient writeClient; + + /** + * Id of current subtask. + */ + private int taskID; + + public CompactFunction(Configuration conf) { + this.conf = conf; + } + + @Override + public void open(Configuration parameters) throws Exception { + this.taskID = getRuntimeContext().getIndexOfThisSubtask(); + initWriteClient(); + } + + @Override + public void processElement(CompactionPlanEvent event, Context context, Collector collector) throws Exception { + final String instantTime = event.getCompactionInstantTime(); + final CompactionOperation compactionOperation = event.getOperation(); + + HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor(); + List writeStatuses = compactor.compact( + new HoodieFlinkCopyOnWriteTable<>( + this.writeClient.getConfig(), + this.writeClient.getEngineContext(), + this.writeClient.getHoodieTable().getMetaClient()), + this.writeClient.getHoodieTable().getMetaClient(), + this.writeClient.getConfig(), + compactionOperation, + instantTime); + collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID)); + } + + private void initWriteClient() { + HoodieFlinkEngineContext context = + new HoodieFlinkEngineContext( + new SerializableConfiguration(StreamerUtil.getHadoopConf()), + new FlinkTaskContextSupplier(getRuntimeContext())); + + writeClient = new HoodieFlinkWriteClient<>(context, StreamerUtil.getHoodieClientConfig(conf)); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/compact/CompactionCommitEvent.java b/hudi-flink/src/main/java/org/apache/hudi/operator/compact/CompactionCommitEvent.java new file mode 100644 index 000000000..3d77cf90f --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/compact/CompactionCommitEvent.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator.compact; + +import org.apache.hudi.client.WriteStatus; + +import java.io.Serializable; +import java.util.List; + +/** + * Represents a commit event from the compaction task {@link CompactFunction}. + */ +public class CompactionCommitEvent implements Serializable { + private static final long serialVersionUID = 1L; + + /** + * The compaction commit instant time. + */ + private final String instant; + /** + * The write statuses. + */ + private final List writeStatuses; + /** + * The compaction task identifier. + */ + private final int taskID; + + public CompactionCommitEvent(String instant, List writeStatuses, int taskID) { + this.instant = instant; + this.writeStatuses = writeStatuses; + this.taskID = taskID; + } + + public String getInstant() { + return instant; + } + + public List getWriteStatuses() { + return writeStatuses; + } + + public int getTaskID() { + return taskID; + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/compact/CompactionCommitSink.java b/hudi-flink/src/main/java/org/apache/hudi/operator/compact/CompactionCommitSink.java new file mode 100644 index 000000000..f1ad7d070 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/compact/CompactionCommitSink.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.operator.compact; + +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.client.FlinkTaskContextSupplier; +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.CompactionUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Function to check and commit the compaction action. + * + *

Each time after receiving a compaction commit event {@link CompactionCommitEvent}, + * it loads and checks the compaction plan {@link HoodieCompactionPlan}, + * if all the compaction operations {@link org.apache.hudi.common.model.CompactionOperation} + * of the plan are finished, tries to commit the compaction action. + */ +public class CompactionCommitSink extends RichSinkFunction { + private static final Logger LOG = LoggerFactory.getLogger(CompactionCommitSink.class); + + /** + * Config options. + */ + private final Configuration conf; + + /** + * Write Client. + */ + private transient HoodieFlinkWriteClient writeClient; + + /** + * Buffer to collect the event from each compact task {@code CompactFunction}. + */ + private transient List commitBuffer; + + /** + * Current on-going compaction instant time. + */ + private String compactionInstantTime; + + public CompactionCommitSink(Configuration conf) { + this.conf = conf; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + initWriteClient(); + this.commitBuffer = new ArrayList<>(); + } + + @Override + public void invoke(CompactionCommitEvent event, Context context) throws Exception { + if (compactionInstantTime == null) { + compactionInstantTime = event.getInstant(); + } else if (!event.getInstant().equals(compactionInstantTime)) { + // last compaction still not finish, rolls it back + HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(this.compactionInstantTime); + writeClient.rollbackInflightCompaction(inflightInstant); + this.compactionInstantTime = event.getInstant(); + } + this.commitBuffer.add(event); + commitIfNecessary(); + } + + /** + * Condition to commit: the commit buffer has equal size with the compaction plan operations + * and all the compact commit event {@link CompactionCommitEvent} has the same compaction instant time. + */ + private void commitIfNecessary() throws IOException { + HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan( + this.writeClient.getHoodieTable().getMetaClient(), compactionInstantTime); + boolean isReady = compactionPlan.getOperations().size() == commitBuffer.size() + && commitBuffer.stream().allMatch(event -> event != null && Objects.equals(event.getInstant(), compactionInstantTime)); + if (!isReady) { + return; + } + List statuses = this.commitBuffer.stream() + .map(CompactionCommitEvent::getWriteStatuses) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + + if (this.writeClient.getConfig().shouldAutoCommit()) { + // Prepare the commit metadata. 
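// The metadata assembled below follows the same shape as FlinkCompactHelpers#createCompactionMetadata:
// one write stat added per partition path, plus the write schema.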
+ List updateStatusMap = statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()); + HoodieCommitMetadata metadata = new HoodieCommitMetadata(true); + for (HoodieWriteStat stat : updateStatusMap) { + metadata.addWriteStat(stat.getPartitionPath(), stat); + } + metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, writeClient.getConfig().getSchema()); + this.writeClient.completeCompaction( + metadata, statuses, this.writeClient.getHoodieTable(), compactionInstantTime); + } + // commit the compaction + this.writeClient.commitCompaction(compactionInstantTime, statuses, Option.empty()); + // reset the status + reset(); + } + + private void reset() { + this.commitBuffer.clear(); + this.compactionInstantTime = null; + } + + private void initWriteClient() { + HoodieFlinkEngineContext context = + new HoodieFlinkEngineContext( + new SerializableConfiguration(StreamerUtil.getHadoopConf()), + new FlinkTaskContextSupplier(getRuntimeContext())); + + writeClient = new HoodieFlinkWriteClient<>(context, StreamerUtil.getHoodieClientConfig(this.conf)); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/compact/CompactionPlanEvent.java b/hudi-flink/src/main/java/org/apache/hudi/operator/compact/CompactionPlanEvent.java new file mode 100644 index 000000000..a19a2ee5d --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/compact/CompactionPlanEvent.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator.compact; + +import org.apache.hudi.common.model.CompactionOperation; + +import java.io.Serializable; + +/** + * Represents a compact command from the compaction plan task {@link CompactionPlanOperator}. + */ +public class CompactionPlanEvent implements Serializable { + private static final long serialVersionUID = 1L; + + private final String compactionInstantTime; + + private final CompactionOperation operation; + + public CompactionPlanEvent(String instantTime, CompactionOperation operation) { + this.compactionInstantTime = instantTime; + this.operation = operation; + } + + public String getCompactionInstantTime() { + return compactionInstantTime; + } + + public CompactionOperation getOperation() { + return operation; + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/compact/CompactionPlanOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/compact/CompactionPlanOperator.java new file mode 100644 index 000000000..b7b73731e --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/compact/CompactionPlanOperator.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator.compact; + +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.client.FlinkTaskContextSupplier; +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.model.CompactionOperation; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.CompactionUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +import static java.util.stream.Collectors.toList; + +/** + * Operator that generates the compaction plan with pluggable strategies on finished checkpoints. + * + *

It should be singleton to avoid conflicts. + */ +public class CompactionPlanOperator extends AbstractStreamOperator + implements OneInputStreamOperator { + + /** + * Config options. + */ + private final Configuration conf; + + /** + * Write Client. + */ + private transient HoodieFlinkWriteClient writeClient; + + /** + * Compaction instant time. + */ + private String compactionInstantTime; + + public CompactionPlanOperator(Configuration conf) { + this.conf = conf; + } + + @Override + public void open() throws Exception { + super.open(); + initWriteClient(); + } + + @Override + public void processElement(StreamRecord streamRecord) { + // no operation + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws IOException { + HoodieFlinkTable table = writeClient.getHoodieTable(); + // the last instant takes the highest priority. + Option compactionInstant = table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant(); + String compactionInstantTime = compactionInstant.isPresent() ? compactionInstant.get().getTimestamp() : null; + if (compactionInstantTime == null) { + // do nothing. + LOG.info("No compaction plan for checkpoint " + checkpointId); + return; + } + if (this.compactionInstantTime != null + && Objects.equals(this.compactionInstantTime, compactionInstantTime)) { + // do nothing + LOG.info("Duplicate scheduling for compaction instant: " + compactionInstantTime + ", ignore"); + return; + } + // generate compaction plan + // should support configurable commit metadata + HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan( + table.getMetaClient(), compactionInstantTime); + + if (compactionPlan == null || (compactionPlan.getOperations() == null) + || (compactionPlan.getOperations().isEmpty())) { + // do nothing. 
+ LOG.info("No compaction plan for checkpoint " + checkpointId + " and instant " + compactionInstantTime); + } else { + this.compactionInstantTime = compactionInstantTime; + HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime); + HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); + if (!pendingCompactionTimeline.containsInstant(instant)) { + throw new IllegalStateException( + "No Compaction request available at " + compactionInstantTime + " to run compaction"); + } + + // Mark instant as compaction inflight + table.getActiveTimeline().transitionCompactionRequestedToInflight(instant); + table.getMetaClient().reloadActiveTimeline(); + + List operations = compactionPlan.getOperations().stream() + .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList()); + LOG.info("CompactionPlanFunction compacting " + operations + " files"); + for (CompactionOperation operation : operations) { + output.collect(new StreamRecord<>(new CompactionPlanEvent(compactionInstantTime, operation))); + } + } + } + + @VisibleForTesting + public void setOutput(Output> output) { + this.output = output; + } + + private void initWriteClient() { + HoodieFlinkEngineContext context = + new HoodieFlinkEngineContext( + new SerializableConfiguration(StreamerUtil.getHadoopConf()), + new FlinkTaskContextSupplier(getRuntimeContext())); + + writeClient = new HoodieFlinkWriteClient<>(context, StreamerUtil.getHoodieClientConfig(this.conf)); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java index 8ce7f4043..789d88d69 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.config.HoodieWriteConfig; @@ -136,7 +137,10 @@ public class BucketAssignFunction> this.context = new HoodieFlinkEngineContext( new SerializableConfiguration(this.hadoopConf), new FlinkTaskContextSupplier(getRuntimeContext())); - this.bucketAssigner = new BucketAssigner(context, writeConfig); + this.bucketAssigner = BucketAssigners.create( + HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)), + context, + writeConfig); // initialize and check the partitions load state loadInitialPartitions(); @@ -145,7 +149,7 @@ public class BucketAssignFunction> @Override public void snapshotState(FunctionSnapshotContext context) { - // no operation + this.bucketAssigner.reset(); } @Override @@ -209,7 +213,6 @@ public class BucketAssignFunction> @Override public void notifyCheckpointComplete(long l) { // Refresh the table state when there are new commits. 
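// reset() is now called from snapshotState() (see the change above), so on checkpoint completion
// the assigner only refreshes its view of the table and re-checks the partition load state.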
- this.bucketAssigner.reset(); this.bucketAssigner.refreshTable(); checkPartitionsLoaded(); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigner.java index 793acdca6..54c9b4f47 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigner.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigner.java @@ -65,7 +65,7 @@ public class BucketAssigner { */ private final HashMap bucketInfoMap; - private HoodieTable table; + protected HoodieTable table; /** * Fink engine context. @@ -75,7 +75,7 @@ public class BucketAssigner { /** * The write config. */ - private final HoodieWriteConfig config; + protected final HoodieWriteConfig config; /** * The average record size. diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigners.java b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigners.java new file mode 100644 index 000000000..b052c77ed --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigners.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator.partitioner; + +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.operator.partitioner.delta.DeltaBucketAssigner; + +/** + * Utilities for {@code BucketAssigner}. + */ +public abstract class BucketAssigners { + + private BucketAssigners() {} + + /** + * Creates a {@code BucketAssigner}. + * + * @param tableType The table type + * @param context The engine context + * @param config The configuration + * @return the bucket assigner instance + */ + public static BucketAssigner create( + HoodieTableType tableType, + HoodieFlinkEngineContext context, + HoodieWriteConfig config) { + switch (tableType) { + case COPY_ON_WRITE: + return new BucketAssigner(context, config); + case MERGE_ON_READ: + return new DeltaBucketAssigner(context, config); + default: + throw new AssertionError(); + } + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/delta/DeltaBucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/delta/DeltaBucketAssigner.java new file mode 100644 index 000000000..5b99cf3be --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/delta/DeltaBucketAssigner.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator.partitioner.delta; + +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.operator.partitioner.BucketAssigner; +import org.apache.hudi.table.action.commit.SmallFile; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * BucketAssigner for MERGE_ON_READ table type, this allows auto correction of small parquet files to larger ones + * without the need for an index in the logFile. + * + *
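In rough, made-up numbers: a file slice with a 40 MB base file and 30 MB of log files, at a log-to-parquet compression ratio of 0.35, is estimated at about 50.5 MB; it stays a small-file target only while that estimate is below the configured max parquet file size.

    long baseBytes = 40L * 1024 * 1024;        // hypothetical base file size
    long logBytes = 30L * 1024 * 1024;         // hypothetical total log file size
    double ratio = 0.35;                       // config.getLogFileToParquetCompressionRatio()
    long estimated = baseBytes + (long) (logBytes * ratio);  // ~50.5 MB
    // isSmallFile() compares this estimate against config.getParquetMaxFileSize().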

Note: assumes the index can always index log files for Flink write. + */ +public class DeltaBucketAssigner extends BucketAssigner { + public DeltaBucketAssigner(HoodieFlinkEngineContext context, HoodieWriteConfig config) { + super(context, config); + } + + @Override + protected List getSmallFiles(String partitionPath) { + // smallFiles only for partitionPath + List smallFileLocations = new ArrayList<>(); + + // Init here since this class (and member variables) might not have been initialized + HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline(); + + // Find out all eligible small file slices + if (!commitTimeline.empty()) { + HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); + // find smallest file in partition and append to it + List allSmallFileSlices = new ArrayList<>(); + // If we can index log files, we can add more inserts to log files for fileIds including those under + // pending compaction. + List allFileSlices = + table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true) + .collect(Collectors.toList()); + for (FileSlice fileSlice : allFileSlices) { + if (isSmallFile(fileSlice)) { + allSmallFileSlices.add(fileSlice); + } + } + // Create SmallFiles from the eligible file slices + for (FileSlice smallFileSlice : allSmallFileSlices) { + SmallFile sf = new SmallFile(); + if (smallFileSlice.getBaseFile().isPresent()) { + // TODO : Move logic of file name, file id, base commit time handling inside file slice + String filename = smallFileSlice.getBaseFile().get().getFileName(); + sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)); + sf.sizeBytes = getTotalFileSize(smallFileSlice); + smallFileLocations.add(sf); + } else { + HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get(); + sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()), + FSUtils.getFileIdFromLogPath(logFile.getPath())); + sf.sizeBytes = getTotalFileSize(smallFileSlice); + smallFileLocations.add(sf); + } + } + } + return smallFileLocations; + } + + private long getTotalFileSize(FileSlice fileSlice) { + if (!fileSlice.getBaseFile().isPresent()) { + return convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList())); + } else { + return fileSlice.getBaseFile().get().getFileSize() + + convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList())); + } + } + + private boolean isSmallFile(FileSlice fileSlice) { + long totalSize = getTotalFileSize(fileSlice); + return totalSize < config.getParquetMaxFileSize(); + } + + // TODO (NA) : Make this static part of utility + public long convertLogFilesSizeToExpectedParquetSize(List hoodieLogFiles) { + long totalSizeOfLogFiles = hoodieLogFiles.stream().map(HoodieLogFile::getFileSize) + .filter(size -> size > 0).reduce(Long::sum).orElse(0L); + // Here we assume that if there is no base parquet file, all log files contain only inserts. 
+ // We can then just get the parquet equivalent size of these log files, compare that with + // {@link config.getParquetMaxFileSize()} and decide if there is scope to insert more rows + return (long) (totalSizeOfLogFiles * config.getLogFileToParquetCompressionRatio()); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 1c4028493..e56ab22bc 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -36,6 +36,7 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.operator.FlinkOptions; import org.apache.hudi.schema.FilebasedSchemaProvider; +import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -54,6 +55,7 @@ import java.io.File; import java.io.IOException; import java.io.StringReader; import java.util.List; +import java.util.Locale; import java.util.Objects; import java.util.Properties; @@ -229,6 +231,10 @@ public class StreamerUtil { .withCompactionConfig( HoodieCompactionConfig.newBuilder() .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS)) + .withInlineCompactionTriggerStrategy( + CompactionTriggerStrategy.valueOf(conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toUpperCase(Locale.ROOT))) + .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_COMMITS)) + .withMaxDeltaSecondsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS)) .build()) .forTable(conf.getString(FlinkOptions.TABLE_NAME)) .withAutoCommit(false) diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java index 42973862d..3c8b9f652 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java @@ -20,6 +20,12 @@ package org.apache.hudi.operator; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.operator.compact.CompactFunction; +import org.apache.hudi.operator.compact.CompactionCommitEvent; +import org.apache.hudi.operator.compact.CompactionCommitSink; +import org.apache.hudi.operator.compact.CompactionPlanEvent; +import org.apache.hudi.operator.compact.CompactionPlanOperator; import org.apache.hudi.operator.partitioner.BucketAssignFunction; import org.apache.hudi.operator.transform.RowDataToHoodieFunction; import org.apache.hudi.operator.utils.TestConfigurations; @@ -219,4 +225,81 @@ public class StreamWriteITCase extends TestLogger { TestData.checkWrittenFullData(tempFile, EXPECTED); } + + @Test + public void testMergeOnReadWriteWithCompaction() throws Exception { + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1); + conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name()); + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + execEnv.getConfig().disableObjectReuse(); + execEnv.setParallelism(4); + // set up checkpoint interval + execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE); + 
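// Continuous checkpointing drives the whole flow: the coordinator schedules compaction on
// completed checkpoints and CompactionPlanOperator emits compaction tasks from
// notifyCheckpointComplete; with compaction.delta_commits set to 1 above, every delta commit
// becomes eligible for a new plan.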
execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1); + + // Read from file source + RowType rowType = + (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)) + .getLogicalType(); + StreamWriteOperatorFactory operatorFactory = + new StreamWriteOperatorFactory<>(conf); + + JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( + rowType, + new RowDataTypeInfo(rowType), + false, + true, + TimestampFormat.ISO_8601 + ); + String sourcePath = Objects.requireNonNull(Thread.currentThread() + .getContextClassLoader().getResource("test_source.data")).toString(); + + TextInputFormat format = new TextInputFormat(new Path(sourcePath)); + format.setFilesFilter(FilePathFilter.createDefaultFilter()); + TypeInformation typeInfo = BasicTypeInfo.STRING_TYPE_INFO; + format.setCharsetName("UTF-8"); + + execEnv + // use PROCESS_CONTINUOUSLY mode to trigger checkpoint + .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo) + .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8))) + .setParallelism(4) + .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)) + // Key-by partition path, to avoid multiple subtasks write to a partition at the same time + .keyBy(HoodieRecord::getPartitionPath) + .transform( + "bucket_assigner", + TypeInformation.of(HoodieRecord.class), + new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))) + .uid("uid_bucket_assigner") + // shuffle by fileId(bucket id) + .keyBy(record -> record.getCurrentLocation().getFileId()) + .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory) + .uid("uid_hoodie_stream_write") + .transform("compact_plan_generate", + TypeInformation.of(CompactionPlanEvent.class), + new CompactionPlanOperator(conf)) + .uid("uid_compact_plan_generate") + .setParallelism(1) // plan generate must be singleton + .keyBy(event -> event.getOperation().hashCode()) + .transform("compact_task", + TypeInformation.of(CompactionCommitEvent.class), + new KeyedProcessOperator<>(new CompactFunction(conf))) + .addSink(new CompactionCommitSink(conf)) + .name("compact_commit") + .setParallelism(1); + + JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME))); + if (client.getJobStatus().get() != JobStatus.FAILED) { + try { + TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish + client.cancel(); + } catch (Throwable var1) { + // ignored + } + } + + TestData.checkWrittenFullData(tempFile, EXPECTED); + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java b/hudi-flink/src/test/java/org/apache/hudi/operator/TestWriteCopyOnWrite.java similarity index 90% rename from hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java rename to hudi-flink/src/test/java/org/apache/hudi/operator/TestWriteCopyOnWrite.java index 492a50585..e9c678b8f 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/TestWriteCopyOnWrite.java @@ -22,6 +22,7 @@ import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.timeline.HoodieInstant; import 
org.apache.hudi.exception.HoodieException; import org.apache.hudi.operator.event.BatchWriteSuccessEvent; @@ -45,7 +46,6 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import static org.apache.hudi.operator.utils.TestData.checkWrittenData; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -58,13 +58,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue; /** * Test cases for StreamingSinkFunction. */ -public class StreamWriteFunctionTest { +public class TestWriteCopyOnWrite { - private static final Map EXPECTED1 = new HashMap<>(); + protected static final Map EXPECTED1 = new HashMap<>(); - private static final Map EXPECTED2 = new HashMap<>(); + protected static final Map EXPECTED2 = new HashMap<>(); - private static final Map EXPECTED3 = new HashMap<>(); + protected static final Map EXPECTED3 = new HashMap<>(); static { EXPECTED1.put("par1", "[id1,par1,id1,Danny,23,1,par1, id2,par1,id2,Stephen,33,2,par1]"); @@ -82,14 +82,27 @@ public class StreamWriteFunctionTest { EXPECTED3.put("par1", "[id1,par1,id1,Danny,23,1,par1]"); } - private StreamWriteFunctionWrapper funcWrapper; + protected Configuration conf; + + protected StreamWriteFunctionWrapper funcWrapper; @TempDir File tempFile; @BeforeEach public void before() throws Exception { - this.funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath()); + final String basePath = tempFile.getAbsolutePath(); + conf = TestConfigurations.getDefaultConf(basePath); + conf.setString(FlinkOptions.TABLE_TYPE, getTableType()); + setUp(conf); + this.funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); + } + + /** + * Override to have custom configuration. + */ + protected void setUp(Configuration conf) { + // for sub-class extension } @AfterEach @@ -114,7 +127,7 @@ public class StreamWriteFunctionTest { funcWrapper.checkpointFunction(1); String instant = funcWrapper.getWriteClient() - .getInflightAndRequestedInstant("COPY_ON_WRITE"); + .getInflightAndRequestedInstant(getTableType()); final OperatorEvent nextEvent = funcWrapper.getNextEvent(); MatcherAssert.assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); @@ -140,7 +153,7 @@ public class StreamWriteFunctionTest { funcWrapper.checkpointFunction(2); String instant2 = funcWrapper.getWriteClient() - .getInflightAndRequestedInstant("COPY_ON_WRITE"); + .getInflightAndRequestedInstant(getTableType()); assertNotEquals(instant, instant2); final OperatorEvent nextEvent2 = funcWrapper.getNextEvent(); @@ -169,7 +182,7 @@ public class StreamWriteFunctionTest { funcWrapper.checkpointFunction(1); String instant = funcWrapper.getWriteClient() - .getInflightAndRequestedInstant("COPY_ON_WRITE"); + .getInflightAndRequestedInstant(getTableType()); assertNotNull(instant); final OperatorEvent nextEvent = funcWrapper.getNextEvent(); @@ -191,10 +204,8 @@ public class StreamWriteFunctionTest { funcWrapper.invoke(rowData); } - // this triggers NPE cause there is no inflight instant - assertThrows(NullPointerException.class, - () -> funcWrapper.checkpointFunction(2), - "No inflight instant when flushing data"); + // this returns early cause there is no inflight instant + funcWrapper.checkpointFunction(2); // do not sent the write event and fails the checkpoint, // behaves like the last checkpoint is successful. 
funcWrapper.checkpointFails(2); @@ -213,7 +224,7 @@ public class StreamWriteFunctionTest { funcWrapper.checkpointFunction(1); String instant = funcWrapper.getWriteClient() - .getInflightAndRequestedInstant("COPY_ON_WRITE"); + .getInflightAndRequestedInstant(getTableType()); final OperatorEvent nextEvent = funcWrapper.getNextEvent(); assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); @@ -232,7 +243,6 @@ public class StreamWriteFunctionTest { @Test public void testInsertDuplicates() throws Exception { // reset the config option - Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true); funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); @@ -300,11 +310,10 @@ public class StreamWriteFunctionTest { funcWrapper.checkpointFunction(2); String instant = funcWrapper.getWriteClient() - .getInflightAndRequestedInstant("COPY_ON_WRITE"); + .getInflightAndRequestedInstant(getTableType()); nextEvent = funcWrapper.getNextEvent(); assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); - checkWrittenData(tempFile, EXPECTED2); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); @@ -319,7 +328,6 @@ public class StreamWriteFunctionTest { @Test public void testInsertWithMiniBatches() throws Exception { // reset the config option - Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.001); // 1Kb batch size funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); @@ -349,16 +357,11 @@ public class StreamWriteFunctionTest { assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); String instant = funcWrapper.getWriteClient() - .getInflightAndRequestedInstant("COPY_ON_WRITE"); + .getInflightAndRequestedInstant(getTableType()); funcWrapper.checkpointComplete(1); - Map expected = new HashMap<>(); - expected.put("par1", "[id1,par1,id1,Danny,23,1,par1, " - + "id1,par1,id1,Danny,23,1,par1, " - + "id1,par1,id1,Danny,23,1,par1, " - + "id1,par1,id1,Danny,23,1,par1, " - + "id1,par1,id1,Danny,23,1,par1]"); + Map expected = getMiniBatchExpected(); checkWrittenData(tempFile, expected, 1); // started a new instant already @@ -384,6 +387,16 @@ public class StreamWriteFunctionTest { checkWrittenData(tempFile, expected, 1); } + Map getMiniBatchExpected() { + Map expected = new HashMap<>(); + expected.put("par1", "[id1,par1,id1,Danny,23,1,par1, " + + "id1,par1,id1,Danny,23,1,par1, " + + "id1,par1,id1,Danny,23,1,par1, " + + "id1,par1,id1,Danny,23,1,par1, " + + "id1,par1,id1,Danny,23,1,par1]"); + return expected; + } + @Test public void testIndexStateBootstrap() throws Exception { // open the function and ingest data @@ -452,7 +465,7 @@ public class StreamWriteFunctionTest { @SuppressWarnings("rawtypes") private void checkInflightInstant(HoodieFlinkWriteClient writeClient) { - final String instant = writeClient.getInflightAndRequestedInstant("COPY_ON_WRITE"); + final String instant = writeClient.getInflightAndRequestedInstant(getTableType()); assertNotNull(instant); } @@ -464,10 +477,10 @@ public class StreamWriteFunctionTest { final String instant; switch (state) { case REQUESTED: - instant = writeClient.getInflightAndRequestedInstant("COPY_ON_WRITE"); + instant = 
writeClient.getInflightAndRequestedInstant(getTableType()); break; case COMPLETED: - instant = writeClient.getLastCompletedInstant("COPY_ON_WRITE"); + instant = writeClient.getLastCompletedInstant(getTableType()); break; default: throw new AssertionError("Unexpected state"); @@ -475,10 +488,22 @@ public class StreamWriteFunctionTest { assertThat(instant, is(instantStr)); } + protected String getTableType() { + return HoodieTableType.COPY_ON_WRITE.name(); + } + + protected void checkWrittenData(File baseFile, Map expected) throws Exception { + checkWrittenData(baseFile, expected, 4); + } + + protected void checkWrittenData(File baseFile, Map expected, int partitions) throws Exception { + TestData.checkWrittenData(baseFile, expected, partitions); + } + /** * Asserts the data files are empty. */ - private void assertEmptyDataFiles() { + protected void assertEmptyDataFiles() { File[] dataFiles = tempFile.listFiles(file -> !file.getName().startsWith(".")); assertNotNull(dataFiles); assertThat(dataFiles.length, is(0)); diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/TestWriteMergeOnRead.java b/hudi-flink/src/test/java/org/apache/hudi/operator/TestWriteMergeOnRead.java new file mode 100644 index 000000000..0b305a7c1 --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/TestWriteMergeOnRead.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator; + +import org.apache.hudi.client.FlinkTaskContextSupplier; +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.operator.utils.TestData; +import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.avro.Schema; +import org.apache.hadoop.fs.FileSystem; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Test cases for delta stream write. 
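+ * + * <p>The written data is verified by reading the log files back with a merged log record scanner, since the Flink reader for MERGE_ON_READ tables is not available yet.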
+ */ +public class TestWriteMergeOnRead extends TestWriteCopyOnWrite { + private FileSystem fs; + private HoodieWriteConfig writeConfig; + private HoodieFlinkEngineContext context; + + @BeforeEach + public void before() throws Exception { + super.before(); + fs = FSUtils.getFs(tempFile.getAbsolutePath(), new org.apache.hadoop.conf.Configuration()); + writeConfig = StreamerUtil.getHoodieClientConfig(conf); + context = new HoodieFlinkEngineContext( + new SerializableConfiguration(StreamerUtil.getHadoopConf()), + new FlinkTaskContextSupplier(null)); + } + + @Override + protected void checkWrittenData(File baseFile, Map expected, int partitions) throws Exception { + HoodieTableMetaClient metaClient = HoodieFlinkTable.create(writeConfig, context).getMetaClient(); + Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema(); + String latestInstant = metaClient.getCommitsTimeline().filterCompletedInstants() + .getInstants() + .filter(x -> x.getAction().equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION)) + .map(HoodieInstant::getTimestamp) + .collect(Collectors.toList()).stream() + .max(Comparator.naturalOrder()) + .orElse(null); + TestData.checkWrittenDataMOR(fs, latestInstant, baseFile, expected, partitions, schema); + } + + @Disabled + @Test + public void testIndexStateBootstrap() { + // Ignore the index bootstrap because we only support parquet load now. + } + + Map getMiniBatchExpected() { + Map expected = new HashMap<>(); + // MOR mode merges the messages with the same key. + expected.put("par1", "[id1,par1,id1,Danny,23,1,par1]"); + return expected; + } + + @Override + protected String getTableType() { + return HoodieTableType.MERGE_ON_READ.name(); + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/TestWriteMergeOnReadWithCompact.java b/hudi-flink/src/test/java/org/apache/hudi/operator/TestWriteMergeOnReadWithCompact.java new file mode 100644 index 000000000..9a9312c83 --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/TestWriteMergeOnReadWithCompact.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator; + +import org.apache.hudi.common.model.HoodieTableType; + +import org.apache.flink.configuration.Configuration; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +/** + * Test cases for delta stream write with compaction. 
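+ * + * <p>{@code FlinkOptions.COMPACTION_DELTA_COMMITS} is set to 1 so that a compaction is scheduled after every delta commit.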
+ */ +public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite { + + @Override + protected void setUp(Configuration conf) { + // trigger the compaction for every finished checkpoint + conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1); + } + + @Disabled + @Test + public void testIndexStateBootstrap() { + // Ignore the index bootstrap because we only support parquet load now. + } + + Map getMiniBatchExpected() { + Map expected = new HashMap<>(); + // MOR mode merges the messages with the same key. + expected.put("par1", "[id1,par1,id1,Danny,23,1,par1]"); + return expected; + } + + @Override + protected String getTableType() { + return HoodieTableType.MERGE_ON_READ.name(); + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/CompactFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/CompactFunctionWrapper.java new file mode 100644 index 000000000..e2fcb04f1 --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/CompactFunctionWrapper.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.operator.utils; + +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.operator.compact.CompactFunction; +import org.apache.hudi.operator.compact.CompactionCommitEvent; +import org.apache.hudi.operator.compact.CompactionCommitSink; +import org.apache.hudi.operator.compact.CompactionPlanEvent; +import org.apache.hudi.operator.compact.CompactionPlanOperator; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; + +import java.util.ArrayList; +import java.util.List; + +/** + * A wrapper class to manipulate the {@link org.apache.hudi.operator.compact.CompactFunction} instance for testing. 
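+ * + * <p>The plan operator, the compact function and the commit sink are wired together by hand so that a whole compaction round can be executed within a single test method.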
+ */ +public class CompactFunctionWrapper { + private final Configuration conf; + + private final IOManager ioManager; + private final StreamingRuntimeContext runtimeContext; + private final MockFunctionInitializationContext functionInitializationContext; + + /** Function that generates the {@link HoodieCompactionPlan}. */ + private CompactionPlanOperator compactionPlanFunction; + /** Function that executes the compaction task. */ + private CompactFunction compactFunction; + /** Stream sink to handle compaction commits. */ + private CompactionCommitSink commitSink; + + public CompactFunctionWrapper(Configuration conf) throws Exception { + this.ioManager = new IOManagerAsync(); + MockEnvironment environment = new MockEnvironmentBuilder() + .setTaskName("mockTask") + .setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE) + .setIOManager(ioManager) + .build(); + this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment); + this.conf = conf; + this.functionInitializationContext = new MockFunctionInitializationContext(); + } + + public void openFunction() throws Exception { + compactionPlanFunction = new CompactionPlanOperator(conf); + compactionPlanFunction.open(); + + compactFunction = new CompactFunction(conf); + compactFunction.setRuntimeContext(runtimeContext); + compactFunction.open(conf); + + commitSink = new CompactionCommitSink(conf); + commitSink.setRuntimeContext(runtimeContext); + commitSink.open(conf); + } + + public void compact(long checkpointID) throws Exception { + List events = new ArrayList<>(); + // collect the CompactEvents. + Output> output = new Output>() { + @Override + public void emitWatermark(Watermark watermark) { + + } + + @Override + public void collect(OutputTag outputTag, StreamRecord streamRecord) { + + } + + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) { + + } + + @Override + public void collect(StreamRecord record) { + events.add(record.getValue()); + } + + @Override + public void close() { + + } + }; + compactionPlanFunction.setOutput(output); + compactionPlanFunction.notifyCheckpointComplete(checkpointID); + // collect the CompactCommitEvents + List compactCommitEvents = new ArrayList<>(); + for (CompactionPlanEvent event: events) { + compactFunction.processElement(event, null, new Collector() { + @Override + public void collect(CompactionCommitEvent event) { + compactCommitEvents.add(event); + } + + @Override + public void close() { + + } + }); + } + // handle and commit the compaction + for (CompactionCommitEvent event : compactCommitEvents) { + commitSink.invoke(event, null); + } + } + + public void close() throws Exception { + ioManager.close(); + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java index d16d4d735..8b0eea7d7 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java @@ -21,6 +21,8 @@ package org.apache.hudi.operator.utils; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.operator.FlinkOptions; import org.apache.hudi.operator.StreamWriteFunction; import org.apache.hudi.operator.StreamWriteOperatorCoordinator; import 
org.apache.hudi.operator.event.BatchWriteSuccessEvent; @@ -64,6 +66,8 @@ public class StreamWriteFunctionWrapper { /** Stream write function. */ private StreamWriteFunction, Object> writeFunction; + private CompactFunctionWrapper compactFunctionWrapper; + public StreamWriteFunctionWrapper(String tablePath) throws Exception { this(tablePath, TestConfigurations.getDefaultConf(tablePath)); } @@ -81,6 +85,7 @@ public class StreamWriteFunctionWrapper { // one function this.coordinator = new StreamWriteOperatorCoordinator(conf, 1); this.functionInitializationContext = new MockFunctionInitializationContext(); + this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf); } public void openFunction() throws Exception { @@ -98,6 +103,10 @@ public class StreamWriteFunctionWrapper { writeFunction.setRuntimeContext(runtimeContext); writeFunction.setOperatorEventGateway(gateway); writeFunction.open(conf); + + if (conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) { + compactFunctionWrapper.openFunction(); + } } public void invoke(I record) throws Exception { @@ -149,6 +158,13 @@ public class StreamWriteFunctionWrapper { coordinator.checkpointComplete(checkpointId); this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId); this.writeFunction.notifyCheckpointComplete(checkpointId); + if (conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) { + try { + compactFunctionWrapper.compact(checkpointId); + } catch (Exception e) { + throw new HoodieException(e); + } + } } public void checkpointFails(long checkpointId) { diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java index 5b4131c99..c8c83e909 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java @@ -22,10 +22,12 @@ import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; @@ -36,6 +38,7 @@ import org.apache.flink.table.data.writer.BinaryWriter; import org.apache.flink.table.runtime.types.InternalSerializers; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.parquet.Strings; import org.apache.parquet.avro.AvroParquetReader; @@ -49,6 +52,8 @@ import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; import java.util.stream.IntStream; import static junit.framework.TestCase.assertEquals; @@ -107,6 +112,28 @@ public class TestData { TimestampData.fromEpochMillis(1), StringData.fromString("par1")))); } + public static List DATA_SET_FOUR = Arrays.asList( + // update: advance the age by 1 + binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24, + TimestampData.fromEpochMillis(2), StringData.fromString("par1")), + 
binaryRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34, + TimestampData.fromEpochMillis(3), StringData.fromString("par1")), + binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 54, + TimestampData.fromEpochMillis(4), StringData.fromString("par2")), + binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 32, + TimestampData.fromEpochMillis(5), StringData.fromString("par2")), + // same with before + binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18, + TimestampData.fromEpochMillis(6), StringData.fromString("par3")), + // new data + binaryRow(StringData.fromString("id9"), StringData.fromString("Jane"), 19, + TimestampData.fromEpochMillis(6), StringData.fromString("par3")), + binaryRow(StringData.fromString("id10"), StringData.fromString("Ella"), 38, + TimestampData.fromEpochMillis(7), StringData.fromString("par4")), + binaryRow(StringData.fromString("id11"), StringData.fromString("Phoebe"), 52, + TimestampData.fromEpochMillis(8), StringData.fromString("par4")) + ); + /** * Checks the source data TestConfigurations.DATA_SET_ONE are written as expected. * @@ -201,6 +228,75 @@ public class TestData { } + /** + * Checks the MERGE_ON_READ source data are written as expected. + * + *
<p>
Note: Replace it with the Flink reader when it is supported. + * + * @param fs The file system + * @param latestInstant The latest committed instant of current table + * @param baseFile The file base to check, should be a directory + * @param expected The expected results mapping, the key should be the partition path + * @param partitions The expected partition number + * @param schema The read schema + */ + public static void checkWrittenDataMOR( + FileSystem fs, + String latestInstant, + File baseFile, + Map expected, + int partitions, + Schema schema) { + assert baseFile.isDirectory() : "Base path should be a directory"; + FileFilter partitionFilter = file -> !file.getName().startsWith("."); + File[] partitionDirs = baseFile.listFiles(partitionFilter); + assertNotNull(partitionDirs); + assertThat(partitionDirs.length, is(partitions)); + for (File partitionDir : partitionDirs) { + File[] dataFiles = partitionDir.listFiles(file -> + file.getName().contains(".log.") && !file.getName().startsWith("..")); + assertNotNull(dataFiles); + HoodieMergedLogRecordScanner scanner = getScanner( + fs, baseFile.getPath(), Arrays.stream(dataFiles).map(File::getAbsolutePath) + .sorted(Comparator.naturalOrder()).collect(Collectors.toList()), + schema, latestInstant); + List readBuffer = scanner.getRecords().values().stream() + .map(hoodieRecord -> { + try { + return filterOutVariables((GenericRecord) hoodieRecord.getData().getInsertValue(schema, new Properties()).get()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .sorted(Comparator.naturalOrder()) + .collect(Collectors.toList()); + assertThat(readBuffer.toString(), is(expected.get(partitionDir.getName()))); + } + } + + /** + * Returns the scanner to read avro log files. + */ + private static HoodieMergedLogRecordScanner getScanner( + FileSystem fs, + String basePath, + List logPaths, + Schema readSchema, + String instant) { + return HoodieMergedLogRecordScanner.newBuilder() + .withFileSystem(fs) + .withBasePath(basePath) + .withLogFilePaths(logPaths) + .withReaderSchema(readSchema) + .withLatestInstantTime(instant) + .withReadBlocksLazily(false) + .withReverseReader(false) + .withBufferSize(16 * 1024 * 1024) + .withMaxMemorySizeInBytes(1024 * 1024L) + .withSpillableMapBasePath("/tmp/") + .build(); + } + /** * Filter out the variables like file name. */