diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index ec586a180..a197febea 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -775,19 +775,6 @@ public abstract class AbstractHoodieWriteClient table, String compactionCommitTime); - /** - * Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file - * TODO : Deprecate this method and make it protected - * @param inflightInstant Inflight Compaction Instant - * @param table Hoodie Table - */ - public void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table) { - String commitTime = HoodieActiveTimeline.createNewInstantTime(); - table.scheduleRollback(context, commitTime, inflightInstant, false); - table.rollback(context, commitTime, inflightInstant, false); - table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant); - } - /** * Get inflight time line exclude compaction and clustering. * @param metaClient diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java new file mode 100644 index 000000000..eeb287abd --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.table; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Interface for insert and update operations in compaction. + * + * @param HoodieRecordPayload type. 
+ */ +public interface HoodieCompactionHandler { + Iterator> handleUpdate(String instantTime, String partitionPath, String fileId, + Map> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException; + + Iterator> handleInsert(String instantTime, String partitionPath, String fileId, + Map> recordMap); +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 994c74b02..135eb8be8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -365,12 +365,11 @@ public abstract class HoodieTable implem /** * Run Compaction on the table. Compaction arranges the data so that it is optimized for data access. * - * @param context HoodieEngineContext + * @param context HoodieEngineContext * @param compactionInstantTime Instant Time */ public abstract HoodieWriteMetadata compact(HoodieEngineContext context, - String compactionInstantTime); - + String compactionInstantTime); /** * Schedule clustering for the instant time. @@ -471,11 +470,24 @@ public abstract class HoodieTable implem String restoreInstantTime, String instantToRestore); + /** + * Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file + * to the .requested file. + * + * @param inflightInstant Inflight Compaction Instant + */ + public void rollbackInflightCompaction(HoodieInstant inflightInstant) { + String commitTime = HoodieActiveTimeline.createNewInstantTime(); + scheduleRollback(context, commitTime, inflightInstant, false); + rollback(context, commitTime, inflightInstant, false); + getActiveTimeline().revertCompactionInflightToRequested(inflightInstant); + } + /** * Finalize the written data onto storage. Perform any final cleanups. * * @param context HoodieEngineContext - * @param stats List of HoodieWriteStats + * @param stats List of HoodieWriteStats * @throws HoodieIOException if some paths can't be finalized on storage */ public void finalizeWrite(HoodieEngineContext context, String instantTs, List stats) throws HoodieIOException { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java index 5ef204f97..d771a574e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java @@ -46,6 +46,36 @@ public class HoodieWriteMetadata { public HoodieWriteMetadata() { } + /** + * Clones the write metadata with transformed write statuses. 
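For readability: the plain-text rendering of this diff drops angle brackets, so the generic signatures of the new HoodieCompactionHandler interface above do not survive. A sketch of the intended shape follows; the method names, arguments and imports are taken from the hunk, while the exact type-parameter bound is an assumption.

package org.apache.hudi.table;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Sketch only: generics restored by assumption; the rendering above has stripped them.
public interface HoodieCompactionHandler<T extends HoodieRecordPayload> {

  // Update path: merge the records scanned from the log files on top of an existing base file.
  Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath, String fileId,
                                           Map<String, HoodieRecord<T>> keyToNewRecords,
                                           HoodieBaseFile oldDataFile) throws IOException;

  // Insert path: no base file exists yet, so write the merged records into a new one.
  Iterator<List<WriteStatus>> handleInsert(String instantTime, String partitionPath, String fileId,
                                           Map<String, HoodieRecord<T>> recordMap);
}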
+ * + * @param transformedWriteStatuses transformed write statuses + * @param type of transformed write statuses + * @return Cloned {@link HoodieWriteMetadata} instance + */ + public HoodieWriteMetadata clone(T transformedWriteStatuses) { + HoodieWriteMetadata newMetadataInstance = new HoodieWriteMetadata<>(); + newMetadataInstance.setWriteStatuses(transformedWriteStatuses); + if (indexLookupDuration.isPresent()) { + newMetadataInstance.setIndexLookupDuration(indexLookupDuration.get()); + } + newMetadataInstance.setCommitted(isCommitted); + newMetadataInstance.setCommitMetadata(commitMetadata); + if (writeStats.isPresent()) { + newMetadataInstance.setWriteStats(writeStats.get()); + } + if (indexUpdateDuration.isPresent()) { + newMetadataInstance.setIndexUpdateDuration(indexUpdateDuration.get()); + } + if (finalizeDuration.isPresent()) { + newMetadataInstance.setFinalizeDuration(finalizeDuration.get()); + } + if (partitionToReplaceFileIds.isPresent()) { + newMetadataInstance.setPartitionToReplaceFileIds(partitionToReplaceFileIds.get()); + } + return newMetadataInstance; + } + public O getWriteStatuses() { return writeStatuses; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/BaseScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/BaseScheduleCompactionActionExecutor.java deleted file mode 100644 index 25c2fec86..000000000 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/BaseScheduleCompactionActionExecutor.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.table.action.compact; - -import org.apache.hudi.avro.model.HoodieCompactionPlan; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.action.BaseActionExecutor; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -public abstract class BaseScheduleCompactionActionExecutor extends BaseActionExecutor> { - - private final Option> extraMetadata; - - public BaseScheduleCompactionActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, - HoodieTable table, - String instantTime, - Option> extraMetadata) { - super(context, config, table, instantTime); - this.extraMetadata = extraMetadata; - } - - protected abstract HoodieCompactionPlan scheduleCompaction(); - - @Override - public Option execute() { - if (!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() - && !config.getFailedWritesCleanPolicy().isLazy()) { - // if there are inflight writes, their instantTime must not be less than that of compaction instant time - table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant() - .ifPresent(earliestInflight -> ValidationUtils.checkArgument( - HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), HoodieTimeline.GREATER_THAN, instantTime), - "Earliest write inflight instant time must be later than compaction time. 
Earliest :" + earliestInflight - + ", Compaction scheduled at " + instantTime)); - // Committed and pending compaction instants should have strictly lower timestamps - List conflictingInstants = table.getActiveTimeline() - .getWriteTimeline().filterCompletedAndCompactionInstants().getInstants() - .filter(instant -> HoodieTimeline.compareTimestamps( - instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime)) - .collect(Collectors.toList()); - ValidationUtils.checkArgument(conflictingInstants.isEmpty(), - "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :" - + conflictingInstants); - } - - HoodieCompactionPlan plan = scheduleCompaction(); - if (plan != null && (plan.getOperations() != null) && (!plan.getOperations().isEmpty())) { - extraMetadata.ifPresent(plan::setExtraMetadata); - HoodieInstant compactionInstant = - new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime); - try { - table.getActiveTimeline().saveToCompactionRequested(compactionInstant, - TimelineMetadataUtils.serializeCompactionPlan(plan)); - } catch (IOException ioe) { - throw new HoodieIOException("Exception scheduling compaction", ioe); - } - return Option.of(plan); - } - return Option.empty(); - } -} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/AbstractCompactHelpers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java similarity index 59% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/AbstractCompactHelpers.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java index 3ff9e625e..a348eb0ed 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/AbstractCompactHelpers.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java @@ -18,17 +18,23 @@ package org.apache.hudi.table.action.compact; +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieCompactionException; import org.apache.hudi.table.HoodieTable; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.List; /** * Base class helps to perform compact. 
@@ -38,11 +44,34 @@ import java.nio.charset.StandardCharsets; * @param Type of keys * @param Type of outputs */ -public abstract class AbstractCompactHelpers { - public abstract HoodieCommitMetadata createCompactionMetadata(HoodieTable table, - String compactionInstantTime, - O writeStatuses, - String schema) throws IOException; +public class CompactHelpers { + + private static final CompactHelpers SINGLETON_INSTANCE = new CompactHelpers(); + + private CompactHelpers() { + } + + public static CompactHelpers getInstance() { + return SINGLETON_INSTANCE; + } + + public HoodieCommitMetadata createCompactionMetadata( + HoodieTable table, String compactionInstantTime, HoodieData writeStatuses, + String schema) throws IOException { + byte[] planBytes = table.getActiveTimeline().readCompactionPlanAsBytes( + HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get(); + HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(planBytes); + List updateStatusMap = writeStatuses.map(WriteStatus::getStat).collectAsList(); + HoodieCommitMetadata metadata = new HoodieCommitMetadata(true); + for (HoodieWriteStat stat : updateStatusMap) { + metadata.addWriteStat(stat.getPartitionPath(), stat); + } + metadata.addMetadata(org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY, schema); + if (compactionPlan.getExtraMetadata() != null) { + compactionPlan.getExtraMetadata().forEach(metadata::addMetadata); + } + return metadata; + } public void completeInflightCompaction(HoodieTable table, String compactionCommitTime, HoodieCommitMetadata commitMetadata) { HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java index c92c0b3a0..ad05876d7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java @@ -18,39 +18,280 @@ package org.apache.hudi.table.action.compact; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.data.HoodieAccumulator; +import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.CompactionOperation; +import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.TableFileSystemView.SliceView; +import 
org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.CompactionUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.IOUtils; +import org.apache.hudi.table.HoodieCompactionHandler; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; + +import org.apache.avro.Schema; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; import java.util.Set; +import java.util.stream.StreamSupport; + +import static java.util.stream.Collectors.toList; /** * A HoodieCompactor runs compaction on a hoodie table. */ -public interface HoodieCompactor extends Serializable { +public abstract class HoodieCompactor implements Serializable { + + private static final Logger LOG = LogManager.getLogger(HoodieCompactor.class); /** - * Generate a new compaction plan for scheduling. + * Handles the compaction timeline based on the compaction instant before actual compaction. * - * @param context HoodieEngineContext - * @param hoodieTable Hoodie Table - * @param config Hoodie Write Configuration - * @param compactionCommitTime scheduled compaction commit time - * @param fgIdsInPendingCompactions partition-fileId pairs for which compaction is pending - * @return Compaction Plan - * @throws IOException when encountering errors + * @param table {@link HoodieTable} instance to use. + * @param pendingCompactionTimeline pending compaction timeline. + * @param compactionInstantTime compaction instant */ - HoodieCompactionPlan generateCompactionPlan(HoodieEngineContext context, HoodieTable hoodieTable, HoodieWriteConfig config, - String compactionCommitTime, Set fgIdsInPendingCompactions) throws IOException; + public abstract void preCompact( + HoodieTable table, HoodieTimeline pendingCompactionTimeline, String compactionInstantTime); + + /** + * Maybe persist write status. + * + * @param writeStatus {@link HoodieData} of {@link WriteStatus}. + */ + public abstract void maybePersist(HoodieData writeStatus, HoodieWriteConfig config); /** * Execute compaction operations and report back status. 
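The class now exposes two engine-specific hooks, preCompact and maybePersist, declared just above. A minimal sketch of a concrete subclass follows; the guard mirrors the check the removed Spark executor performed inline, the empty maybePersist reflects what a list-backed engine needs, and the type parameters (not visible in this rendering) are omitted, so treat both bodies as illustrative assumptions rather than code from this patch.

package org.apache.hudi.table.action.compact;

import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;

// Illustrative subclass of the new abstract HoodieCompactor; generic parameters omitted.
public class ExampleMergeOnReadTableCompactor extends HoodieCompactor {

  @Override
  public void preCompact(HoodieTable table, HoodieTimeline pendingCompactionTimeline,
                         String compactionInstantTime) {
    // Fail fast when no compaction was actually requested for this instant.
    HoodieInstant requested = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
    if (!pendingCompactionTimeline.containsInstant(requested)) {
      throw new IllegalStateException(
          "No Compaction request available at " + compactionInstantTime + " to run compaction");
    }
  }

  @Override
  public void maybePersist(HoodieData writeStatus, HoodieWriteConfig config) {
    // List-backed engines can leave this a no-op; Spark would cache the statuses here.
  }
}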
*/ - O compact(HoodieEngineContext context, HoodieCompactionPlan compactionPlan, HoodieTable hoodieTable, - HoodieWriteConfig config, String compactionInstantTime) throws IOException; + public HoodieData compact( + HoodieEngineContext context, HoodieCompactionPlan compactionPlan, + HoodieTable table, HoodieWriteConfig config, String compactionInstantTime, + HoodieCompactionHandler compactionHandler) { + if (compactionPlan == null || (compactionPlan.getOperations() == null) + || (compactionPlan.getOperations().isEmpty())) { + return context.emptyHoodieData(); + } + HoodieActiveTimeline timeline = table.getActiveTimeline(); + HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime); + // Mark instant as compaction inflight + timeline.transitionCompactionRequestedToInflight(instant); + table.getMetaClient().reloadActiveTimeline(); + + HoodieTableMetaClient metaClient = table.getMetaClient(); + TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient); + + // Here we firstly use the table schema as the reader schema to read + // log file.That is because in the case of MergeInto, the config.getSchema may not + // the same with the table schema. + try { + Schema readerSchema = schemaUtil.getTableAvroSchema(false); + config.setSchema(readerSchema.toString()); + } catch (Exception e) { + // If there is no commit in the table, just ignore the exception. + } + + // Compacting is very similar to applying updates to existing file + List operations = compactionPlan.getOperations().stream() + .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList()); + LOG.info("Compactor compacting " + operations + " files"); + + context.setJobStatus(this.getClass().getSimpleName(), "Compacting file slices"); + TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier(); + return context.parallelize(operations).map(operation -> compact( + compactionHandler, metaClient, config, operation, compactionInstantTime, taskContextSupplier)) + .flatMap(List::iterator); + } + + /** + * Execute a single compaction operation and report back status. + */ + public List compact(HoodieCompactionHandler compactionHandler, + HoodieTableMetaClient metaClient, + HoodieWriteConfig config, + CompactionOperation operation, + String instantTime, + TaskContextSupplier taskContextSupplier) throws IOException { + FileSystem fs = metaClient.getFs(); + + Schema readerSchema = HoodieAvroUtils.addMetadataFields( + new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField()); + LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames() + + " for commit " + instantTime); + // TODO - FIX THIS + // Reads the entire avro file. Always only specific blocks should be read from the avro file + // (failure recover). + // Load all the delta commits since the last compaction commit and get all the blocks to be + // loaded and load it using CompositeAvroLogReader + // Since a DeltaCommit is not defined yet, reading all the records. revisit this soon. 
+ String maxInstantTime = metaClient + .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, + HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION)) + .filterCompletedInstants().lastInstant().get().getTimestamp(); + long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(taskContextSupplier, config); + LOG.info("MaxMemoryPerCompaction => " + maxMemoryPerCompaction); + + List logFiles = operation.getDeltaFileNames().stream().map( + p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString()) + .collect(toList()); + HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder() + .withFileSystem(fs) + .withBasePath(metaClient.getBasePath()) + .withLogFilePaths(logFiles) + .withReaderSchema(readerSchema) + .withLatestInstantTime(maxInstantTime) + .withMaxMemorySizeInBytes(maxMemoryPerCompaction) + .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled()) + .withReverseReader(config.getCompactionReverseLogReadEnabled()) + .withBufferSize(config.getMaxDFSStreamBufferSize()) + .withSpillableMapBasePath(config.getSpillableMapBasePath()) + .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType()) + .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled()) + .build(); + if (!scanner.iterator().hasNext()) { + return new ArrayList<>(); + } + + Option oldDataFileOpt = + operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath()); + + // Compacting is very similar to applying updates to existing file + Iterator> result; + // If the dataFile is present, perform updates else perform inserts into a new base file. + if (oldDataFileOpt.isPresent()) { + result = compactionHandler.handleUpdate(instantTime, operation.getPartitionPath(), + operation.getFileId(), scanner.getRecords(), + oldDataFileOpt.get()); + } else { + result = compactionHandler.handleInsert(instantTime, operation.getPartitionPath(), operation.getFileId(), + scanner.getRecords()); + } + Iterable> resultIterable = () -> result; + return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream).peek(s -> { + s.getStat().setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog()); + s.getStat().setTotalLogFilesCompacted(scanner.getTotalLogFiles()); + s.getStat().setTotalLogRecords(scanner.getTotalLogRecords()); + s.getStat().setPartitionPath(operation.getPartitionPath()); + s.getStat() + .setTotalLogSizeCompacted(operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue()); + s.getStat().setTotalLogBlocks(scanner.getTotalLogBlocks()); + s.getStat().setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks()); + s.getStat().setTotalRollbackBlocks(scanner.getTotalRollbacks()); + RuntimeStats runtimeStats = new RuntimeStats(); + runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks()); + s.getStat().setRuntimeStats(runtimeStats); + scanner.close(); + }).collect(toList()); + } + + /** + * Generate a new compaction plan for scheduling. 
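Plan generation below delegates partition pruning, per-slice metrics, and the final selection of operations to the configured CompactionStrategy. As a small illustration of that extension point, a custom strategy could narrow the partitions considered for compaction as in the hypothetical sketch below, which assumes LogFileSizeBasedCompactionStrategy as the base class and a date-prefixed partition layout; it is not part of this patch.

package org.apache.hudi.table.action.compact.strategy;

import org.apache.hudi.config.HoodieWriteConfig;

import java.util.List;
import java.util.stream.Collectors;

// Hypothetical strategy: only partitions written in 2021 are considered,
// everything else is skipped before file slices are even listed.
public class RecentPartitionsCompactionStrategy extends LogFileSizeBasedCompactionStrategy {

  @Override
  public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) {
    return allPartitionPaths.stream()
        .filter(partitionPath -> partitionPath.startsWith("2021/"))
        .collect(Collectors.toList());
  }
}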
+ * + * @param context HoodieEngineContext + * @param hoodieTable Hoodie Table + * @param config Hoodie Write Configuration + * @param compactionCommitTime scheduled compaction commit time + * @param fgIdsInPendingCompactionAndClustering partition-fileId pairs for which compaction is pending + * @return Compaction Plan + * @throws IOException when encountering errors + */ + HoodieCompactionPlan generateCompactionPlan( + HoodieEngineContext context, HoodieTable hoodieTable, HoodieWriteConfig config, + String compactionCommitTime, Set fgIdsInPendingCompactionAndClustering) throws IOException { + // Accumulator to keep track of total log files for a table + HoodieAccumulator totalLogFiles = context.newAccumulator(); + // Accumulator to keep track of total log file slices for a table + HoodieAccumulator totalFileSlices = context.newAccumulator(); + + ValidationUtils.checkArgument(hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ, + "Can only compact table of type " + HoodieTableType.MERGE_ON_READ + " and not " + + hoodieTable.getMetaClient().getTableType().name()); + + // TODO : check if maxMemory is not greater than JVM or executor memory + // TODO - rollback any compactions in flight + HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); + LOG.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime); + List partitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), metaClient.getBasePath()); + + // filter the partition paths if needed to reduce list status + partitionPaths = config.getCompactionStrategy().filterPartitionPaths(config, partitionPaths); + + if (partitionPaths.isEmpty()) { + // In case no partitions could be picked, return no compaction plan + return null; + } + + SliceView fileSystemView = hoodieTable.getSliceView(); + LOG.info("Compaction looking for files to compact in " + partitionPaths + " partitions"); + context.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact"); + + List operations = context.flatMap(partitionPaths, partitionPath -> fileSystemView + .getLatestFileSlices(partitionPath) + .filter(slice -> !fgIdsInPendingCompactionAndClustering.contains(slice.getFileGroupId())) + .map(s -> { + List logFiles = + s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(toList()); + totalLogFiles.add(logFiles.size()); + totalFileSlices.add(1L); + // Avro generated classes are not inheriting Serializable. Using CompactionOperation POJO + // for Map operations and collecting them finally in Avro generated classes for storing + // into meta files. + Option dataFile = s.getBaseFile(); + return new CompactionOperation(dataFile, partitionPath, logFiles, + config.getCompactionStrategy().captureMetrics(config, s)); + }) + .filter(c -> !c.getDeltaFileNames().isEmpty()), partitionPaths.size()).stream() + .map(CompactionUtils::buildHoodieCompactionOperation).collect(toList()); + + LOG.info("Total of " + operations.size() + " compactions are retrieved"); + LOG.info("Total number of latest files slices " + totalFileSlices.value()); + LOG.info("Total number of log files " + totalLogFiles.value()); + LOG.info("Total number of file slices " + totalFileSlices.value()); + // Filter the compactions with the passed in filter. 
This lets us choose most effective + // compactions only + HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations, + CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList())); + ValidationUtils.checkArgument( + compactionPlan.getOperations().stream().noneMatch( + op -> fgIdsInPendingCompactionAndClustering.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))), + "Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. " + + "Please fix your strategy implementation. FileIdsWithPendingCompactions :" + fgIdsInPendingCompactionAndClustering + + ", Selected workload :" + compactionPlan); + if (compactionPlan.getOperations().isEmpty()) { + LOG.warn("After filtering, Nothing to compact for " + metaClient.getBasePath()); + } + return compactionPlan; + } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkRunCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java similarity index 60% rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkRunCompactionActionExecutor.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java index 5851b08c6..5e3005b22 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkRunCompactionActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java @@ -20,64 +20,62 @@ package org.apache.hudi.table.action.compact; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.client.utils.SparkMemoryUtils; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; -import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCompactionException; +import org.apache.hudi.table.HoodieCompactionHandler; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.BaseActionExecutor; import org.apache.hudi.table.action.HoodieWriteMetadata; -import org.apache.spark.api.java.JavaRDD; - import java.io.IOException; import java.util.List; @SuppressWarnings("checkstyle:LineLength") -public class SparkRunCompactionActionExecutor extends - BaseActionExecutor>, JavaRDD, JavaRDD, HoodieWriteMetadata>> { +public class RunCompactionActionExecutor extends + BaseActionExecutor>, HoodieData, HoodieData, HoodieWriteMetadata>> { - public SparkRunCompactionActionExecutor(HoodieSparkEngineContext context, - HoodieWriteConfig config, - HoodieTable>, JavaRDD, JavaRDD> table, - String instantTime) { + private final HoodieCompactor compactor; + 
private final HoodieCompactionHandler compactionHandler; + + public RunCompactionActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + HoodieCompactor compactor, + HoodieCompactionHandler compactionHandler) { super(context, config, table, instantTime); + this.compactor = compactor; + this.compactionHandler = compactionHandler; } @Override - public HoodieWriteMetadata> execute() { - HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(instantTime); + public HoodieWriteMetadata> execute() { HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); - if (!pendingCompactionTimeline.containsInstant(instant)) { - throw new IllegalStateException( - "No Compaction request available at " + instantTime + " to run compaction"); - } + compactor.preCompact(table, pendingCompactionTimeline, instantTime); - HoodieWriteMetadata> compactionMetadata = new HoodieWriteMetadata<>(); + HoodieWriteMetadata> compactionMetadata = new HoodieWriteMetadata<>(); try { - HoodieActiveTimeline timeline = table.getActiveTimeline(); + // generate compaction plan + // should support configurable commit metadata HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(table.getMetaClient(), instantTime); - // Mark instant as compaction inflight - timeline.transitionCompactionRequestedToInflight(instant); - table.getMetaClient().reloadActiveTimeline(); - HoodieSparkMergeOnReadTableCompactor compactor = new HoodieSparkMergeOnReadTableCompactor(); - JavaRDD statuses = compactor.compact(context, compactionPlan, table, config, instantTime); + HoodieData statuses = compactor.compact( + context, compactionPlan, table, config, instantTime, compactionHandler); - statuses.persist(SparkMemoryUtils.getWriteStatusStorageLevel(config.getProps())); + compactor.maybePersist(statuses, config); context.setJobStatus(this.getClass().getSimpleName(), "Preparing compaction metadata"); - List updateStatusMap = statuses.map(WriteStatus::getStat).collect(); + List updateStatusMap = statuses.map(WriteStatus::getStat).collectAsList(); HoodieCommitMetadata metadata = new HoodieCommitMetadata(true); for (HoodieWriteStat stat : updateStatusMap) { metadata.addWriteStat(stat.getPartitionPath(), stat); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java similarity index 64% rename from hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkScheduleCompactionActionExecutor.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java index 4143944bb..31ced7b72 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkScheduleCompactionActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java @@ -19,21 +19,22 @@ package org.apache.hudi.table.action.compact; import org.apache.hudi.avro.model.HoodieCompactionPlan; -import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieFileGroupId; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; import 
org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCompactionException; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.BaseActionExecutor; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -45,31 +46,67 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -@SuppressWarnings("checkstyle:LineLength") -public class FlinkScheduleCompactionActionExecutor extends - BaseScheduleCompactionActionExecutor>, List, List> { +public class ScheduleCompactionActionExecutor extends BaseActionExecutor> { - private static final Logger LOG = LogManager.getLogger(FlinkScheduleCompactionActionExecutor.class); + private static final Logger LOG = LogManager.getLogger(ScheduleCompactionActionExecutor.class); private final Option> extraMetadata; + private final HoodieCompactor compactor; - public FlinkScheduleCompactionActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, - HoodieTable>, List, List> table, - String instantTime, - Option> extraMetadata) { - super(context, config, table, instantTime, extraMetadata); + public ScheduleCompactionActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + Option> extraMetadata, + HoodieCompactor compactor) { + super(context, config, table, instantTime); this.extraMetadata = extraMetadata; + this.compactor = compactor; } @Override - protected HoodieCompactionPlan scheduleCompaction() { + public Option execute() { + if (!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() + && !config.getFailedWritesCleanPolicy().isLazy()) { + // if there are inflight writes, their instantTime must not be less than that of compaction instant time + table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant() + .ifPresent(earliestInflight -> ValidationUtils.checkArgument( + HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), HoodieTimeline.GREATER_THAN, instantTime), + "Earliest write inflight instant time must be later than compaction time. 
Earliest :" + earliestInflight + + ", Compaction scheduled at " + instantTime)); + // Committed and pending compaction instants should have strictly lower timestamps + List conflictingInstants = table.getActiveTimeline() + .getWriteTimeline().filterCompletedAndCompactionInstants().getInstants() + .filter(instant -> HoodieTimeline.compareTimestamps( + instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime)) + .collect(Collectors.toList()); + ValidationUtils.checkArgument(conflictingInstants.isEmpty(), + "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :" + + conflictingInstants); + } + + HoodieCompactionPlan plan = scheduleCompaction(); + if (plan != null && (plan.getOperations() != null) && (!plan.getOperations().isEmpty())) { + extraMetadata.ifPresent(plan::setExtraMetadata); + HoodieInstant compactionInstant = + new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime); + try { + table.getActiveTimeline().saveToCompactionRequested(compactionInstant, + TimelineMetadataUtils.serializeCompactionPlan(plan)); + } catch (IOException ioe) { + throw new HoodieIOException("Exception scheduling compaction", ioe); + } + return Option.of(plan); + } + return Option.empty(); + } + + private HoodieCompactionPlan scheduleCompaction() { LOG.info("Checking if compaction needs to be run on " + config.getBasePath()); // judge if we need to compact according to num delta commits and time elapsed boolean compactable = needCompact(config.getInlineCompactTriggerStrategy()); if (compactable) { LOG.info("Generating compaction plan for merge on read table " + config.getBasePath()); - HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor(); try { SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView(); Set fgInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations() @@ -86,7 +123,7 @@ public class FlinkScheduleCompactionActionExecutor getLatestDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) { + private Pair getLatestDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) { Option lastCompaction = table.getActiveTimeline().getCommitTimeline() .filterCompletedInstants().lastInstant(); HoodieTimeline deltaCommits = table.getActiveTimeline().getDeltaCommitTimeline(); @@ -103,7 +140,7 @@ public class FlinkScheduleCompactionActionExecutor latestDeltaCommitInfo = getLatestDeltaCommitInfo(compactionTriggerStrategy); @@ -144,7 +181,7 @@ public class FlinkScheduleCompactionActionExecutor extends List writeStatuses, Option> extraMetadata) throws IOException { HoodieFlinkTable table = getHoodieTable(); - HoodieCommitMetadata metadata = FlinkCompactHelpers.newInstance().createCompactionMetadata( - table, compactionInstantTime, writeStatuses, config.getSchema()); + HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata( + table, compactionInstantTime, HoodieList.of(writeStatuses), config.getSchema()); extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata)); completeCompaction(metadata, writeStatuses, table, compactionInstantTime); } @@ -364,7 +365,7 @@ public class HoodieFlinkWriteClient extends // commit to data table after committing to metadata table. 
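Because the shared CompactHelpers and compactor now speak HoodieData, the list-backed Flink client wraps and unwraps its write statuses at the boundary, as the HoodieList.of and HoodieList.getList calls in these hunks do. A minimal sketch of that bridge, with a hypothetical helper name:

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieList;

import java.util.List;

// Hypothetical helper, for illustration only: the wrap/unwrap calls are the ones
// the Flink client uses around CompactHelpers in this patch.
final class WriteStatusBridge {

  private WriteStatusBridge() {
  }

  // Hand engine-native statuses to the engine-agnostic helpers.
  static HoodieData<WriteStatus> wrap(List<WriteStatus> statuses) {
    return HoodieList.of(statuses);
  }

  // Pull them back into a plain list once the shared code is done.
  static List<WriteStatus> unwrap(HoodieData<WriteStatus> statuses) {
    return HoodieList.getList(statuses);
  }
}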
finalizeWrite(table, compactionCommitTime, writeStats); LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata); - FlinkCompactHelpers.newInstance().completeInflightCompaction(table, compactionCommitTime, metadata); + CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); if (compactionTimer != null) { long durationInMs = metrics.getDurationInMs(compactionTimer.stop()); @@ -383,7 +384,8 @@ public class HoodieFlinkWriteClient extends protected List compact(String compactionInstantTime, boolean shouldComplete) { // only used for metadata table, the compaction happens in single thread try { - List writeStatuses = FlinkCompactHelpers.compact(compactionInstantTime, this); + List writeStatuses = + getHoodieTable().compact(context, compactionInstantTime).getWriteStatuses(); commitCompaction(compactionInstantTime, writeStatuses, Option.empty()); return writeStatuses; } catch (IOException e) { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java index 687ecc194..c0bbd0878 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java @@ -20,6 +20,10 @@ package org.apache.hudi.client.common; import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.data.HoodieAccumulator; +import org.apache.hudi.common.data.HoodieAtomicLongAccumulator; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.data.HoodieList; import org.apache.hudi.common.engine.EngineProperty; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.TaskContextSupplier; @@ -32,6 +36,7 @@ import org.apache.hudi.common.util.Option; import org.apache.flink.api.common.functions.RuntimeContext; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -72,6 +77,21 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext { this.runtimeContext = ((FlinkTaskContextSupplier) taskContextSupplier).getFlinkRuntimeContext(); } + @Override + public HoodieAccumulator newAccumulator() { + return HoodieAtomicLongAccumulator.create(); + } + + @Override + public HoodieData emptyHoodieData() { + return HoodieList.of(Collections.emptyList()); + } + + @Override + public HoodieData parallelize(List data) { + return HoodieList.of(data); + } + public RuntimeContext getRuntimeContext() { return this.runtimeContext; } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index 2238ac391..e30f2d4bc 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -74,7 +74,8 @@ import java.util.Map; *

* UPDATES - Produce a new version of the file, just replacing the updated records with new values */ -public class HoodieFlinkCopyOnWriteTable extends HoodieFlinkTable { +public class HoodieFlinkCopyOnWriteTable + extends HoodieFlinkTable implements HoodieCompactionHandler { private static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCopyOnWriteTable.class); @@ -265,7 +266,8 @@ public class HoodieFlinkCopyOnWriteTable extends } @Override - public HoodieWriteMetadata> compact(HoodieEngineContext context, String compactionInstantTime) { + public HoodieWriteMetadata> compact( + HoodieEngineContext context, String compactionInstantTime) { throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table"); } @@ -329,9 +331,10 @@ public class HoodieFlinkCopyOnWriteTable extends // ------------------------------------------------------------------------- // Used for compaction // ------------------------------------------------------------------------- - - public Iterator> handleUpdate(String instantTime, String partitionPath, String fileId, - Map> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException { + @Override + public Iterator> handleUpdate( + String instantTime, String partitionPath, String fileId, + Map> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException { // these are updates HoodieMergeHandle upsertHandle = getUpdateHandle(instantTime, partitionPath, fileId, keyToNewRecords, oldDataFile); return handleUpdateInternal(upsertHandle, instantTime, fileId); @@ -366,9 +369,11 @@ public class HoodieFlinkCopyOnWriteTable extends } } - public Iterator> handleInsert(String instantTime, String partitionPath, String fileId, - Map> recordMap) { - HoodieCreateHandle createHandle = + @Override + public Iterator> handleInsert( + String instantTime, String partitionPath, String fileId, + Map> recordMap) { + HoodieCreateHandle createHandle = new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordMap, taskContextSupplier); createHandle.write(); return Collections.singletonList(createHandle.close()).iterator(); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java index f4a4b0eb4..b165c844c 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java @@ -30,14 +30,14 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.io.FlinkAppendHandle; import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.commit.delta.FlinkUpsertDeltaCommitActionExecutor; import org.apache.hudi.table.action.commit.delta.FlinkUpsertPreppedDeltaCommitActionExecutor; -import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor; -import org.apache.hudi.table.action.compact.FlinkScheduleCompactionActionExecutor; +import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor; +import org.apache.hudi.table.action.compact.RunCompactionActionExecutor; +import 
org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor; import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor; import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor; @@ -97,15 +97,19 @@ public class HoodieFlinkMergeOnReadTable HoodieEngineContext context, String instantTime, Option> extraMetadata) { - BaseScheduleCompactionActionExecutor scheduleCompactionExecutor = new FlinkScheduleCompactionActionExecutor( - context, config, this, instantTime, extraMetadata); + ScheduleCompactionActionExecutor scheduleCompactionExecutor = new ScheduleCompactionActionExecutor( + context, config, this, instantTime, extraMetadata, + new HoodieFlinkMergeOnReadTableCompactor()); return scheduleCompactionExecutor.execute(); } @Override - public HoodieWriteMetadata> compact(HoodieEngineContext context, String compactionInstantTime) { - throw new HoodieNotSupportedException("Compaction is supported as a separate pipeline, " - + "should not invoke directly through HoodieFlinkMergeOnReadTable"); + public HoodieWriteMetadata> compact( + HoodieEngineContext context, String compactionInstantTime) { + RunCompactionActionExecutor compactionExecutor = new RunCompactionActionExecutor( + context, config, this, compactionInstantTime, new HoodieFlinkMergeOnReadTableCompactor(), + new HoodieFlinkCopyOnWriteTable(config, context, getMetaClient())); + return convertMetadata(compactionExecutor.execute()); } @Override diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java index d7eed45df..475ca32b1 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java @@ -20,6 +20,7 @@ package org.apache.hudi.table; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; @@ -35,12 +36,15 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadataWriter; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hadoop.fs.Path; import java.io.IOException; import java.util.List; +import static org.apache.hudi.common.data.HoodieList.getList; + public abstract class HoodieFlinkTable extends HoodieTable>, List, List> implements ExplicitWriteHandleTable { @@ -87,6 +91,11 @@ public abstract class HoodieFlinkTable return hoodieFlinkTable; } + public static HoodieWriteMetadata> convertMetadata( + HoodieWriteMetadata> metadata) { + return metadata.clone(getList(metadata.getWriteStatuses())); + } + @Override protected HoodieIndex>, List, List> getIndex(HoodieWriteConfig config, HoodieEngineContext context) { return FlinkHoodieIndex.createIndex((HoodieFlinkEngineContext) context, config); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkCompactHelpers.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkCompactHelpers.java deleted file mode 100644 index 68a42a557..000000000 --- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkCompactHelpers.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.action.compact; - -import org.apache.hudi.avro.model.HoodieCompactionPlan; -import org.apache.hudi.client.HoodieFlinkWriteClient; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.model.CompactionOperation; -import org.apache.hudi.common.model.HoodieCommitMetadata; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.HoodieWriteStat; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; -import org.apache.hudi.common.util.CompactionUtils; -import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable; -import org.apache.hudi.table.HoodieFlinkTable; -import org.apache.hudi.table.HoodieTable; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; - -import static java.util.stream.Collectors.toList; - -/** - * A flink implementation of {@link AbstractCompactHelpers}. 
- * - * @param - */ -public class FlinkCompactHelpers extends - AbstractCompactHelpers>, List, List> { - private static final Logger LOG = LoggerFactory.getLogger(FlinkCompactHelpers.class); - - private FlinkCompactHelpers() { - } - - private static class CompactHelperHolder { - private static final FlinkCompactHelpers FLINK_COMPACT_HELPERS = new FlinkCompactHelpers(); - } - - public static FlinkCompactHelpers newInstance() { - return CompactHelperHolder.FLINK_COMPACT_HELPERS; - } - - @Override - public HoodieCommitMetadata createCompactionMetadata(HoodieTable>, List, List> table, - String compactionInstantTime, - List writeStatuses, - String schema) throws IOException { - byte[] planBytes = table.getActiveTimeline().readCompactionPlanAsBytes( - HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get(); - HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(planBytes); - List updateStatusMap = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()); - org.apache.hudi.common.model.HoodieCommitMetadata metadata = new org.apache.hudi.common.model.HoodieCommitMetadata(true); - for (HoodieWriteStat stat : updateStatusMap) { - metadata.addWriteStat(stat.getPartitionPath(), stat); - } - metadata.addMetadata(org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY, schema); - if (compactionPlan.getExtraMetadata() != null) { - compactionPlan.getExtraMetadata().forEach(metadata::addMetadata); - } - return metadata; - } - - @SuppressWarnings("unchecked, rawtypes") - public static List compact( - HoodieFlinkWriteClient writeClient, - String compactInstantTime, - CompactionOperation compactionOperation) throws IOException { - HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor(); - return compactor.compact( - new HoodieFlinkCopyOnWriteTable<>( - writeClient.getConfig(), - writeClient.getEngineContext(), - writeClient.getHoodieTable().getMetaClient()), - writeClient.getHoodieTable().getMetaClient(), - writeClient.getConfig(), - compactionOperation, - compactInstantTime); - } - - /** - * Called by the metadata table compactor code path. - */ - @SuppressWarnings("unchecked, rawtypes") - public static List compact(String compactionInstantTime, HoodieFlinkWriteClient writeClient) throws IOException { - HoodieFlinkTable table = writeClient.getHoodieTable(); - HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); - HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime); - if (pendingCompactionTimeline.containsInstant(inflightInstant)) { - writeClient.rollbackInflightCompaction(inflightInstant, table); - table.getMetaClient().reloadActiveTimeline(); - } - - // generate compaction plan - // should support configurable commit metadata - HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan( - table.getMetaClient(), compactionInstantTime); - - if (compactionPlan == null || (compactionPlan.getOperations() == null) - || (compactionPlan.getOperations().isEmpty())) { - // do nothing. 
- LOG.info("No compaction plan for instant " + compactionInstantTime); - return Collections.emptyList(); - } else { - HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime); - // Mark instant as compaction inflight - table.getActiveTimeline().transitionCompactionRequestedToInflight(instant); - table.getMetaClient().reloadActiveTimeline(); - - List operations = compactionPlan.getOperations().stream() - .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList()); - LOG.info("Compacting " + operations + " files"); - List writeStatusList = new ArrayList<>(); - for (CompactionOperation operation : operations) { - List statuses = compact(writeClient, compactionInstantTime, operation); - writeStatusList.addAll(statuses); - } - return writeStatusList; - } - } -} - diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java index 1f4a52484..03b9f8e7e 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java @@ -18,54 +18,17 @@ package org.apache.hudi.table.action.compact; -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.avro.model.HoodieCompactionOperation; -import org.apache.hudi.avro.model.HoodieCompactionPlan; -import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.CompactionOperation; -import org.apache.hudi.common.model.HoodieBaseFile; -import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.view.TableFileSystemView.SliceView; -import org.apache.hudi.common.util.CollectionUtils; -import org.apache.hudi.common.util.CompactionUtils; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.io.IOUtils; -import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; -import org.apache.avro.Schema; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; import java.util.List; -import java.util.Set; -import 
java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; - -import static java.util.stream.Collectors.toList; /** * Compacts a hoodie table with merge on read storage. Computes all possible compactions, @@ -75,164 +38,21 @@ import static java.util.stream.Collectors.toList; *
<p>
Note: the compaction logic is invoked through the flink pipeline. */ @SuppressWarnings("checkstyle:LineLength") -public class HoodieFlinkMergeOnReadTableCompactor implements HoodieCompactor>, List, List> { - - private static final Logger LOG = LogManager.getLogger(HoodieFlinkMergeOnReadTableCompactor.class); - - // Accumulator to keep track of total log files for a table - private AtomicLong totalLogFiles; - // Accumulator to keep track of total log file slices for a table - private AtomicLong totalFileSlices; +public class HoodieFlinkMergeOnReadTableCompactor + extends HoodieCompactor>, List, List> { @Override - public List compact(HoodieEngineContext context, HoodieCompactionPlan compactionPlan, - HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException { - throw new UnsupportedOperationException("HoodieFlinkMergeOnReadTableCompactor does not support compact directly, " - + "the function works as a separate pipeline"); - } - - public List compact(HoodieFlinkCopyOnWriteTable hoodieCopyOnWriteTable, - HoodieTableMetaClient metaClient, - HoodieWriteConfig config, - CompactionOperation operation, - String instantTime) throws IOException { - FileSystem fs = metaClient.getFs(); - - Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField()); - LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames() - + " for commit " + instantTime); - // TODO - FIX THIS - // Reads the entire avro file. Always only specific blocks should be read from the avro file - // (failure recover). - // Load all the delta commits since the last compaction commit and get all the blocks to be - // loaded and load it using CompositeAvroLogReader - // Since a DeltaCommit is not defined yet, reading all the records. revisit this soon. 
- String maxInstantTime = metaClient - .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, - HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION)) - .filterCompletedInstants().lastInstant().get().getTimestamp(); - // TODO(danny): make it configurable - long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new FlinkTaskContextSupplier(null), config); - LOG.info("MaxMemoryPerCompaction => " + maxMemoryPerCompaction); - - List logFiles = operation.getDeltaFileNames().stream().map( - p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString()) - .collect(toList()); - HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder() - .withFileSystem(fs) - .withBasePath(metaClient.getBasePath()) - .withLogFilePaths(logFiles) - .withReaderSchema(readerSchema) - .withLatestInstantTime(maxInstantTime) - .withMaxMemorySizeInBytes(maxMemoryPerCompaction) - .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled()) - .withReverseReader(config.getCompactionReverseLogReadEnabled()) - .withBufferSize(config.getMaxDFSStreamBufferSize()) - .withSpillableMapBasePath(config.getSpillableMapBasePath()) - .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType()) - .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled()) - .build(); - if (!scanner.iterator().hasNext()) { - return new ArrayList<>(); + public void preCompact( + HoodieTable table, HoodieTimeline pendingCompactionTimeline, String compactionInstantTime) { + HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime); + if (pendingCompactionTimeline.containsInstant(inflightInstant)) { + table.rollbackInflightCompaction(inflightInstant); + table.getMetaClient().reloadActiveTimeline(); } - - Option oldDataFileOpt = - operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath()); - - // Compacting is very similar to applying updates to existing file - Iterator> result; - // If the dataFile is present, perform updates else perform inserts into a new base file. 
- if (oldDataFileOpt.isPresent()) { - result = hoodieCopyOnWriteTable.handleUpdate(instantTime, operation.getPartitionPath(), - operation.getFileId(), scanner.getRecords(), - oldDataFileOpt.get()); - } else { - result = hoodieCopyOnWriteTable.handleInsert(instantTime, operation.getPartitionPath(), operation.getFileId(), - scanner.getRecords()); - } - Iterable> resultIterable = () -> result; - return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream).peek(s -> { - s.getStat().setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog()); - s.getStat().setTotalLogFilesCompacted(scanner.getTotalLogFiles()); - s.getStat().setTotalLogRecords(scanner.getTotalLogRecords()); - s.getStat().setPartitionPath(operation.getPartitionPath()); - s.getStat() - .setTotalLogSizeCompacted(operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue()); - s.getStat().setTotalLogBlocks(scanner.getTotalLogBlocks()); - s.getStat().setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks()); - s.getStat().setTotalRollbackBlocks(scanner.getTotalRollbacks()); - RuntimeStats runtimeStats = new RuntimeStats(); - runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks()); - s.getStat().setRuntimeStats(runtimeStats); - scanner.close(); - }).collect(toList()); } @Override - public HoodieCompactionPlan generateCompactionPlan(HoodieEngineContext context, - HoodieTable>, List, List> hoodieTable, - HoodieWriteConfig config, String compactionCommitTime, - Set fgIdsInPendingCompactionAndClustering) - throws IOException { - totalLogFiles = new AtomicLong(0); - totalFileSlices = new AtomicLong(0); - - ValidationUtils.checkArgument(hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ, - "Can only compact table of type " + HoodieTableType.MERGE_ON_READ + " and not " - + hoodieTable.getMetaClient().getTableType().name()); - - // TODO : check if maxMemory is not greater than JVM or flink.executor memory - // TODO - rollback any compactions in flight - HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); - LOG.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime); - List partitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), metaClient.getBasePath()); - - // filter the partition paths if needed to reduce list status - partitionPaths = config.getCompactionStrategy().filterPartitionPaths(config, partitionPaths); - - if (partitionPaths.isEmpty()) { - // In case no partitions could be picked, return no compaction plan - return null; - } - - SliceView fileSystemView = hoodieTable.getSliceView(); - LOG.info("Compaction looking for files to compact in " + partitionPaths + " partitions"); - context.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact"); - - List operations = context.flatMap(partitionPaths, partitionPath -> fileSystemView - .getLatestFileSlices(partitionPath) - .filter(slice -> !fgIdsInPendingCompactionAndClustering.contains(slice.getFileGroupId())) - .map(s -> { - List logFiles = - s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); - totalLogFiles.addAndGet(logFiles.size()); - totalFileSlices.addAndGet(1L); - // Avro generated classes are not inheriting Serializable. Using CompactionOperation POJO - // for flink Map operations and collecting them finally in Avro generated classes for storing - // into meta files. 
- Option dataFile = s.getBaseFile(); - return new CompactionOperation(dataFile, partitionPath, logFiles, - config.getCompactionStrategy().captureMetrics(config, s)); - }) - .filter(c -> !c.getDeltaFileNames().isEmpty()), partitionPaths.size()).stream().map(CompactionUtils::buildHoodieCompactionOperation).collect(toList()); - - LOG.info("Total of " + operations.size() + " compactions are retrieved"); - LOG.info("Total number of latest files slices " + totalFileSlices.get()); - LOG.info("Total number of log files " + totalLogFiles.get()); - LOG.info("Total number of file slices " + totalFileSlices.get()); - // Filter the compactions with the passed in filter. This lets us choose most effective - // compactions only - HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations, - CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList())); - ValidationUtils.checkArgument( - compactionPlan.getOperations().stream().noneMatch( - op -> fgIdsInPendingCompactionAndClustering.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))), - "Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. " - + "Please fix your strategy implementation. FileIdsWithPendingCompactions :" + fgIdsInPendingCompactionAndClustering - + ", Selected workload :" + compactionPlan); - if (compactionPlan.getOperations().isEmpty()) { - LOG.warn("After filtering, Nothing to compact for " + metaClient.getBasePath()); - } - return compactionPlan; + public void maybePersist(HoodieData writeStatus, HoodieWriteConfig config) { + // No OP } } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java index bdc2a851c..2fdd86e0f 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java @@ -21,6 +21,10 @@ package org.apache.hudi.client.common; import org.apache.hadoop.conf.Configuration; import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.data.HoodieAccumulator; +import org.apache.hudi.common.data.HoodieAtomicLongAccumulator; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.data.HoodieList; import org.apache.hudi.common.engine.EngineProperty; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.TaskContextSupplier; @@ -34,6 +38,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -62,6 +67,21 @@ public class HoodieJavaEngineContext extends HoodieEngineContext { super(new SerializableConfiguration(conf), taskContextSupplier); } + @Override + public HoodieAccumulator newAccumulator() { + return HoodieAtomicLongAccumulator.create(); + } + + @Override + public HoodieData emptyHoodieData() { + return HoodieList.of(Collections.emptyList()); + } + + @Override + public HoodieData parallelize(List data) { + return HoodieList.of(data); + } + @Override public List map(List data, SerializableFunction func, int parallelism) { return 
data.stream().parallel().map(throwingMapWrapper(func)).collect(toList()); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index dd9f43d16..6c71d7548 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -40,6 +40,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.exception.HoodieClusteringException; import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.index.HoodieIndex; @@ -51,7 +52,7 @@ import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; -import org.apache.hudi.table.action.compact.SparkCompactHelpers; +import org.apache.hudi.table.action.compact.CompactHelpers; import org.apache.hudi.table.marker.WriteMarkersFactory; import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper; import org.apache.hudi.table.upgrade.UpgradeDowngrade; @@ -291,8 +292,8 @@ public class SparkRDDWriteClient extends @Override public void commitCompaction(String compactionInstantTime, JavaRDD writeStatuses, Option> extraMetadata) throws IOException { HoodieSparkTable table = HoodieSparkTable.create(config, context); - HoodieCommitMetadata metadata = SparkCompactHelpers.newInstance().createCompactionMetadata( - table, compactionInstantTime, writeStatuses, config.getSchema()); + HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata( + table, compactionInstantTime, HoodieJavaRDD.of(writeStatuses), config.getSchema()); extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata)); completeCompaction(metadata, writeStatuses, table, compactionInstantTime); } @@ -307,7 +308,7 @@ public class SparkRDDWriteClient extends // commit to data table after committing to metadata table. finalizeWrite(table, compactionCommitTime, writeStats); LOG.info("Committing Compaction " + compactionCommitTime + ". 
Finished with result " + metadata); - SparkCompactHelpers.newInstance().completeInflightCompaction(table, compactionCommitTime, metadata); + CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); if (compactionTimer != null) { @@ -330,11 +331,12 @@ public class SparkRDDWriteClient extends HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime); if (pendingCompactionTimeline.containsInstant(inflightInstant)) { - rollbackInflightCompaction(inflightInstant, table); + table.rollbackInflightCompaction(inflightInstant); table.getMetaClient().reloadActiveTimeline(); } compactionTimer = metrics.getCompactionCtx(); - HoodieWriteMetadata> compactionMetadata = table.compact(context, compactionInstantTime); + HoodieWriteMetadata> compactionMetadata = + table.compact(context, compactionInstantTime); JavaRDD statuses = compactionMetadata.getWriteStatuses(); if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) { completeTableService(TableServiceType.COMPACT, compactionMetadata.getCommitMetadata().get(), statuses, table, compactionInstantTime); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java index 416992e05..1c7f1c8e2 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java @@ -20,6 +20,8 @@ package org.apache.hudi.client.common; import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.data.HoodieAccumulator; +import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.EngineProperty; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.function.SerializableBiFunction; @@ -30,6 +32,8 @@ import org.apache.hudi.common.function.SerializablePairFunction; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.data.HoodieJavaRDD; +import org.apache.hudi.data.HoodieSparkLongAccumulator; import org.apache.hudi.exception.HoodieException; import org.apache.spark.api.java.JavaSparkContext; @@ -74,6 +78,23 @@ public class HoodieSparkEngineContext extends HoodieEngineContext { return ((HoodieSparkEngineContext) context).getJavaSparkContext(); } + @Override + public HoodieAccumulator newAccumulator() { + HoodieSparkLongAccumulator accumulator = HoodieSparkLongAccumulator.create(); + javaSparkContext.sc().register(accumulator.getAccumulator()); + return accumulator; + } + + @Override + public HoodieData emptyHoodieData() { + return HoodieJavaRDD.of(javaSparkContext.emptyRDD()); + } + + @Override + public HoodieData parallelize(List data) { + return HoodieJavaRDD.of(javaSparkContext.parallelize(data, data.size())); + } + @Override public List map(List data, SerializableFunction func, int parallelism) { return javaSparkContext.parallelize(data, 
parallelism).map(func::apply).collect(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java new file mode 100644 index 000000000..e6defd49f --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.data; + +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.function.SerializableFunction; + +import org.apache.spark.api.java.JavaRDD; + +import java.util.Iterator; +import java.util.List; + +/** + * Holds a {@link JavaRDD} of objects. + * + * @param type of object. + */ +public class HoodieJavaRDD extends HoodieData { + + private final JavaRDD rddData; + + private HoodieJavaRDD(JavaRDD rddData) { + this.rddData = rddData; + } + + /** + * @param rddData a {@link JavaRDD} of objects in type T. + * @param type of object. + * @return a new instance containing the {@link JavaRDD} reference. + */ + public static HoodieJavaRDD of(JavaRDD rddData) { + return new HoodieJavaRDD<>(rddData); + } + + /** + * @param data a {@link List} of objects in type T. + * @param context {@link HoodieSparkEngineContext} to use. + * @param parallelism parallelism for the {@link JavaRDD}. + * @param type of object. + * @return a new instance containing the {@link JavaRDD} instance. + */ + public static HoodieJavaRDD of( + List data, HoodieSparkEngineContext context, int parallelism) { + return new HoodieJavaRDD<>(context.getJavaSparkContext().parallelize(data, parallelism)); + } + + /** + * @param hoodieData {@link HoodieJavaRDD } instance containing the {@link JavaRDD} of objects. + * @param type of object. + * @return the a {@link JavaRDD} of objects in type T. 
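HoodieJavaRDD is the Spark-side holder behind the engine-neutral HoodieData: callers wrap a JavaRDD with of(...) and unwrap it again with getJavaRDD(...) at Spark-specific boundaries, as the SparkRDDWriteClient change above does for commitCompaction. A short usage sketch follows, assuming WriteStatus as the element type; the wrapper class name is illustrative only.

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.spark.api.java.JavaRDD;

final class HoodieJavaRDDUsageSketch {
  // Wrap a Spark RDD so engine-agnostic code can work against HoodieData.
  static HoodieData<WriteStatus> wrap(JavaRDD<WriteStatus> statuses) {
    return HoodieJavaRDD.of(statuses);
  }

  // Unwrap back to the Spark type where an RDD is required.
  static JavaRDD<WriteStatus> unwrap(HoodieData<WriteStatus> statuses) {
    return HoodieJavaRDD.getJavaRDD(statuses);
  }
}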
+ */ + public static JavaRDD getJavaRDD(HoodieData hoodieData) { + return ((HoodieJavaRDD) hoodieData).get(); + } + + @Override + public JavaRDD get() { + return rddData; + } + + @Override + public boolean isEmpty() { + return rddData.isEmpty(); + } + + @Override + public HoodieData map(SerializableFunction func) { + return HoodieJavaRDD.of(rddData.map(func::apply)); + } + + @Override + public HoodieData flatMap(SerializableFunction> func) { + return HoodieJavaRDD.of(rddData.flatMap(func::apply)); + } + + @Override + public List collectAsList() { + return rddData.collect(); + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieSparkLongAccumulator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieSparkLongAccumulator.java new file mode 100644 index 000000000..10027a282 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieSparkLongAccumulator.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.data; + +import org.apache.hudi.common.data.HoodieAccumulator; + +import org.apache.spark.util.AccumulatorV2; +import org.apache.spark.util.LongAccumulator; + +/** + * An accumulator on counts based on Spark {@link AccumulatorV2} implementation. + */ +public class HoodieSparkLongAccumulator extends HoodieAccumulator { + + private final AccumulatorV2 accumulator; + + private HoodieSparkLongAccumulator() { + accumulator = new LongAccumulator(); + } + + public static HoodieSparkLongAccumulator create() { + return new HoodieSparkLongAccumulator(); + } + + @Override + public long value() { + return accumulator.value(); + } + + @Override + public void add(long increment) { + accumulator.add(increment); + } + + public AccumulatorV2 getAccumulator() { + return accumulator; + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index a9b36a8e9..6f5611f88 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -87,7 +87,8 @@ import java.util.Map; *
<p>
* UPDATES - Produce a new version of the file, just replacing the updated records with new values */ -public class HoodieSparkCopyOnWriteTable extends HoodieSparkTable { +public class HoodieSparkCopyOnWriteTable + extends HoodieSparkTable implements HoodieCompactionHandler { private static final Logger LOG = LogManager.getLogger(HoodieSparkCopyOnWriteTable.class); @@ -157,7 +158,8 @@ public class HoodieSparkCopyOnWriteTable extends } @Override - public HoodieWriteMetadata> compact(HoodieEngineContext context, String compactionInstantTime) { + public HoodieWriteMetadata> compact( + HoodieEngineContext context, String compactionInstantTime) { throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table"); } @@ -191,20 +193,22 @@ public class HoodieSparkCopyOnWriteTable extends @Override public Option scheduleRollback(HoodieEngineContext context, - String instantTime, - HoodieInstant instantToRollback, boolean skipTimelinePublish) { + String instantTime, + HoodieInstant instantToRollback, boolean skipTimelinePublish) { return new BaseRollbackPlanActionExecutor<>(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute(); } - public Iterator> handleUpdate(String instantTime, String partitionPath, String fileId, - Map> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException { + @Override + public Iterator> handleUpdate( + String instantTime, String partitionPath, String fileId, + Map> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException { // these are updates HoodieMergeHandle upsertHandle = getUpdateHandle(instantTime, partitionPath, fileId, keyToNewRecords, oldDataFile); return handleUpdateInternal(upsertHandle, instantTime, fileId); } - protected Iterator> handleUpdateInternal(HoodieMergeHandle upsertHandle, String instantTime, - String fileId) throws IOException { + protected Iterator> handleUpdateInternal(HoodieMergeHandle upsertHandle, String instantTime, + String fileId) throws IOException { if (upsertHandle.getOldFilePath() == null) { throw new HoodieUpsertException( "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId); @@ -241,9 +245,11 @@ public class HoodieSparkCopyOnWriteTable extends } } - public Iterator> handleInsert(String instantTime, String partitionPath, String fileId, - Map> recordMap) { - HoodieCreateHandle createHandle = + @Override + public Iterator> handleInsert( + String instantTime, String partitionPath, String fileId, + Map> recordMap) { + HoodieCreateHandle createHandle = new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordMap, taskContextSupplier); createHandle.write(); return Collections.singletonList(createHandle.close()).iterator(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java index b4b106c16..30984e010 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java @@ -39,9 +39,9 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.bootstrap.SparkBootstrapDeltaCommitActionExecutor; import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata; -import 
org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor; -import org.apache.hudi.table.action.compact.SparkRunCompactionActionExecutor; -import org.apache.hudi.table.action.compact.SparkScheduleCompactionActionExecutor; +import org.apache.hudi.table.action.compact.HoodieSparkMergeOnReadTableCompactor; +import org.apache.hudi.table.action.compact.RunCompactionActionExecutor; +import org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor; import org.apache.hudi.table.action.deltacommit.SparkBulkInsertDeltaCommitActionExecutor; import org.apache.hudi.table.action.deltacommit.SparkBulkInsertPreppedDeltaCommitActionExecutor; import org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor; @@ -123,15 +123,19 @@ public class HoodieSparkMergeOnReadTable extends @Override public Option scheduleCompaction(HoodieEngineContext context, String instantTime, Option> extraMetadata) { - BaseScheduleCompactionActionExecutor scheduleCompactionExecutor = new SparkScheduleCompactionActionExecutor( - context, config, this, instantTime, extraMetadata); + ScheduleCompactionActionExecutor scheduleCompactionExecutor = new ScheduleCompactionActionExecutor( + context, config, this, instantTime, extraMetadata, + new HoodieSparkMergeOnReadTableCompactor()); return scheduleCompactionExecutor.execute(); } @Override - public HoodieWriteMetadata> compact(HoodieEngineContext context, String compactionInstantTime) { - SparkRunCompactionActionExecutor compactionExecutor = new SparkRunCompactionActionExecutor((HoodieSparkEngineContext) context, config, this, compactionInstantTime); - return compactionExecutor.execute(); + public HoodieWriteMetadata> compact( + HoodieEngineContext context, String compactionInstantTime) { + RunCompactionActionExecutor compactionExecutor = new RunCompactionActionExecutor( + context, config, this, compactionInstantTime, new HoodieSparkMergeOnReadTableCompactor(), + new HoodieSparkCopyOnWriteTable(config, context, getMetaClient())); + return convertMetadata(compactionExecutor.execute()); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java index a7b14be5f..cf18ef283 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java @@ -20,6 +20,7 @@ package org.apache.hudi.table; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; @@ -35,12 +36,15 @@ import org.apache.hudi.index.SparkHoodieIndex; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaRDD; import java.io.IOException; +import static org.apache.hudi.data.HoodieJavaRDD.getJavaRDD; + public abstract class HoodieSparkTable extends HoodieTable>, JavaRDD, JavaRDD> { @@ -91,6 +95,11 @@ public abstract class HoodieSparkTable return hoodieSparkTable; } + public static HoodieWriteMetadata> convertMetadata( + 
HoodieWriteMetadata> metadata) { + return metadata.clone(getJavaRDD(metadata.getWriteStatuses())); + } + @Override protected HoodieIndex>, JavaRDD, JavaRDD> getIndex(HoodieWriteConfig config, HoodieEngineContext context) { return SparkHoodieIndex.createIndex(config); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java index 2785403ba..6ca4408a7 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java @@ -18,248 +18,41 @@ package org.apache.hudi.table.action.compact; -import org.apache.avro.Schema; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.avro.model.HoodieCompactionOperation; -import org.apache.hudi.avro.model.HoodieCompactionPlan; -import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.CompactionOperation; -import org.apache.hudi.common.model.HoodieBaseFile; -import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.client.utils.SparkMemoryUtils; +import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.TableSchemaResolver; -import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.view.TableFileSystemView.SliceView; -import org.apache.hudi.common.util.CollectionUtils; -import org.apache.hudi.common.util.CompactionUtils; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.io.IOUtils; -import org.apache.hudi.table.HoodieSparkCopyOnWriteTable; +import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; + import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.util.AccumulatorV2; -import org.apache.spark.util.LongAccumulator; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; - -import static java.util.stream.Collectors.toList; /** * Compacts a hoodie table 
with merge on read storage. Computes all possible compactions, * passes it through a CompactionFilter and executes all the compactions and writes a new version of base files and make * a normal commit - * */ @SuppressWarnings("checkstyle:LineLength") -public class HoodieSparkMergeOnReadTableCompactor implements HoodieCompactor>, JavaRDD, JavaRDD> { - - private static final Logger LOG = LogManager.getLogger(HoodieSparkMergeOnReadTableCompactor.class); - // Accumulator to keep track of total log files for a table - private AccumulatorV2 totalLogFiles; - // Accumulator to keep track of total log file slices for a table - private AccumulatorV2 totalFileSlices; +public class HoodieSparkMergeOnReadTableCompactor + extends HoodieCompactor>, JavaRDD, JavaRDD> { @Override - public JavaRDD compact(HoodieEngineContext context, HoodieCompactionPlan compactionPlan, - HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException { - JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context); - if (compactionPlan == null || (compactionPlan.getOperations() == null) - || (compactionPlan.getOperations().isEmpty())) { - return jsc.emptyRDD(); + public void preCompact( + HoodieTable table, HoodieTimeline pendingCompactionTimeline, String compactionInstantTime) { + HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime); + if (!pendingCompactionTimeline.containsInstant(instant)) { + throw new IllegalStateException( + "No Compaction request available at " + compactionInstantTime + " to run compaction"); } - HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); - TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient); - - // Here we firstly use the table schema as the reader schema to read - // log file.That is because in the case of MergeInto, the config.getSchema may not - // the same with the table schema. - try { - Schema readerSchema = schemaUtil.getTableAvroSchema(false); - config.setSchema(readerSchema.toString()); - } catch (Exception e) { - // If there is no commit in the table, just ignore the exception. - } - - // Compacting is very similar to applying updates to existing file - HoodieSparkCopyOnWriteTable table = new HoodieSparkCopyOnWriteTable(config, context, metaClient); - List operations = compactionPlan.getOperations().stream() - .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList()); - LOG.info("Compactor compacting " + operations + " files"); - - context.setJobStatus(this.getClass().getSimpleName(), "Compacting file slices"); - return jsc.parallelize(operations, operations.size()) - .map(s -> compact(table, metaClient, config, s, compactionInstantTime)).flatMap(List::iterator); - } - - private List compact(HoodieSparkCopyOnWriteTable hoodieCopyOnWriteTable, HoodieTableMetaClient metaClient, - HoodieWriteConfig config, CompactionOperation operation, String instantTime) throws IOException { - FileSystem fs = metaClient.getFs(); - Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); - LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames() - + " for commit " + instantTime); - // TODO - FIX THIS - // Reads the entire avro file. Always only specific blocks should be read from the avro file - // (failure recover). 
- // Load all the delta commits since the last compaction commit and get all the blocks to be - // loaded and load it using CompositeAvroLogReader - // Since a DeltaCommit is not defined yet, reading all the records. revisit this soon. - String maxInstantTime = metaClient - .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, - HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION)) - .filterCompletedInstants().lastInstant().get().getTimestamp(); - long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), config); - LOG.info("MaxMemoryPerCompaction => " + maxMemoryPerCompaction); - - List logFiles = operation.getDeltaFileNames().stream().map( - p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString()) - .collect(toList()); - HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder() - .withFileSystem(fs) - .withBasePath(metaClient.getBasePath()) - .withLogFilePaths(logFiles) - .withReaderSchema(readerSchema) - .withLatestInstantTime(maxInstantTime) - .withMaxMemorySizeInBytes(maxMemoryPerCompaction) - .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled()) - .withReverseReader(config.getCompactionReverseLogReadEnabled()) - .withBufferSize(config.getMaxDFSStreamBufferSize()) - .withSpillableMapBasePath(config.getSpillableMapBasePath()) - .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType()) - .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled()) - .build(); - if (!scanner.iterator().hasNext()) { - return new ArrayList<>(); - } - - Option oldDataFileOpt = - operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath()); - - // Compacting is very similar to applying updates to existing file - Iterator> result; - // If the dataFile is present, perform updates else perform inserts into a new base file. 
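The comment above captures the core dispatch in compaction: when a base file exists the operation is handled as an update (a merge into a new file version), otherwise as an insert into a brand new base file. Under this refactor the same dispatch is expressed against the HoodieCompactionHandler interface, which HoodieSparkCopyOnWriteTable now implements via the handleUpdate and handleInsert overrides earlier in this diff. A minimal sketch follows, assuming the interface is parameterized on the payload type; the helper class and parameter names are illustrative only.

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.HoodieCompactionHandler;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class CompactionDispatchSketch {
  // Route one compaction operation: merge into the existing base file if there is one,
  // otherwise write the merged log records into a new base file.
  static <T extends HoodieRecordPayload> Iterator<List<WriteStatus>> compactOne(
      HoodieCompactionHandler<T> handler, String instantTime, String partitionPath, String fileId,
      Map<String, HoodieRecord<T>> mergedRecords, Option<HoodieBaseFile> baseFile) throws IOException {
    return baseFile.isPresent()
        ? handler.handleUpdate(instantTime, partitionPath, fileId, mergedRecords, baseFile.get())
        : handler.handleInsert(instantTime, partitionPath, fileId, mergedRecords);
  }
}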
- if (oldDataFileOpt.isPresent()) { - result = hoodieCopyOnWriteTable.handleUpdate(instantTime, operation.getPartitionPath(), - operation.getFileId(), scanner.getRecords(), - oldDataFileOpt.get()); - } else { - result = hoodieCopyOnWriteTable.handleInsert(instantTime, operation.getPartitionPath(), operation.getFileId(), - scanner.getRecords()); - } - Iterable> resultIterable = () -> result; - return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream).peek(s -> { - s.getStat().setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog()); - s.getStat().setTotalLogFilesCompacted(scanner.getTotalLogFiles()); - s.getStat().setTotalLogRecords(scanner.getTotalLogRecords()); - s.getStat().setPartitionPath(operation.getPartitionPath()); - s.getStat() - .setTotalLogSizeCompacted(operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue()); - s.getStat().setTotalLogBlocks(scanner.getTotalLogBlocks()); - s.getStat().setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks()); - s.getStat().setTotalRollbackBlocks(scanner.getTotalRollbacks()); - RuntimeStats runtimeStats = new RuntimeStats(); - runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks()); - s.getStat().setRuntimeStats(runtimeStats); - scanner.close(); - }).collect(toList()); } @Override - public HoodieCompactionPlan generateCompactionPlan(HoodieEngineContext context, - HoodieTable>, JavaRDD, JavaRDD> hoodieTable, - HoodieWriteConfig config, String compactionCommitTime, - Set fgIdsInPendingCompactionAndClustering) - throws IOException { - JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context); - totalLogFiles = new LongAccumulator(); - totalFileSlices = new LongAccumulator(); - jsc.sc().register(totalLogFiles); - jsc.sc().register(totalFileSlices); - - ValidationUtils.checkArgument(hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ, - "Can only compact table of type " + HoodieTableType.MERGE_ON_READ + " and not " - + hoodieTable.getMetaClient().getTableType().name()); - - // TODO : check if maxMemory is not greater than JVM or spark.executor memory - // TODO - rollback any compactions in flight - HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); - LOG.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime); - List partitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), metaClient.getBasePath()); - - // filter the partition paths if needed to reduce list status - partitionPaths = config.getCompactionStrategy().filterPartitionPaths(config, partitionPaths); - - if (partitionPaths.isEmpty()) { - // In case no partitions could be picked, return no compaction plan - return null; - } - - SliceView fileSystemView = hoodieTable.getSliceView(); - LOG.info("Compaction looking for files to compact in " + partitionPaths + " partitions"); - context.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact"); - - List operations = context.flatMap(partitionPaths, partitionPath -> { - return fileSystemView - .getLatestFileSlices(partitionPath) - .filter(slice -> !fgIdsInPendingCompactionAndClustering.contains(slice.getFileGroupId())) - .map(s -> { - List logFiles = - s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); - totalLogFiles.add((long) logFiles.size()); - totalFileSlices.add(1L); - // Avro generated classes are not inheriting Serializable. 
Using CompactionOperation POJO - // for spark Map operations and collecting them finally in Avro generated classes for storing - // into meta files. - Option dataFile = s.getBaseFile(); - return new CompactionOperation(dataFile, partitionPath, logFiles, - config.getCompactionStrategy().captureMetrics(config, s)); - }) - .filter(c -> !c.getDeltaFileNames().isEmpty()); - }, partitionPaths.size()).stream().map(CompactionUtils::buildHoodieCompactionOperation).collect(toList()); - - LOG.info("Total of " + operations.size() + " compactions are retrieved"); - LOG.info("Total number of latest files slices " + totalFileSlices.value()); - LOG.info("Total number of log files " + totalLogFiles.value()); - LOG.info("Total number of file slices " + totalFileSlices.value()); - // Filter the compactions with the passed in filter. This lets us choose most effective - // compactions only - HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations, - CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList())); - ValidationUtils.checkArgument( - compactionPlan.getOperations().stream().noneMatch( - op -> fgIdsInPendingCompactionAndClustering.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))), - "Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. " - + "Please fix your strategy implementation. FileIdsWithPendingCompactions :" + fgIdsInPendingCompactionAndClustering - + ", Selected workload :" + compactionPlan); - if (compactionPlan.getOperations().isEmpty()) { - LOG.warn("After filtering, Nothing to compact for " + metaClient.getBasePath()); - } - return compactionPlan; + public void maybePersist(HoodieData writeStatus, HoodieWriteConfig config) { + HoodieJavaRDD.getJavaRDD(writeStatus).persist(SparkMemoryUtils.getWriteStatusStorageLevel(config.getProps())); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkCompactHelpers.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkCompactHelpers.java deleted file mode 100644 index 107f533f2..000000000 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkCompactHelpers.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.table.action.compact; - -import org.apache.hudi.avro.model.HoodieCompactionPlan; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.model.HoodieCommitMetadata; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.HoodieWriteStat; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; -import org.apache.hudi.table.HoodieTable; - -import org.apache.spark.api.java.JavaRDD; - -import java.io.IOException; -import java.util.List; - -/** - * A spark implementation of {@link AbstractCompactHelpers}. - * - * @param - */ -public class SparkCompactHelpers extends - AbstractCompactHelpers>, JavaRDD, JavaRDD> { - - private SparkCompactHelpers() { - } - - private static class CompactHelperHolder { - private static final SparkCompactHelpers SPARK_COMPACT_HELPERS = new SparkCompactHelpers(); - } - - public static SparkCompactHelpers newInstance() { - return CompactHelperHolder.SPARK_COMPACT_HELPERS; - } - - @Override - public HoodieCommitMetadata createCompactionMetadata(HoodieTable>, JavaRDD, JavaRDD> table, - String compactionInstantTime, - JavaRDD writeStatuses, - String schema) throws IOException { - byte[] planBytes = table.getActiveTimeline().readCompactionPlanAsBytes( - HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get(); - HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(planBytes); - List updateStatusMap = writeStatuses.map(WriteStatus::getStat).collect(); - org.apache.hudi.common.model.HoodieCommitMetadata metadata = new org.apache.hudi.common.model.HoodieCommitMetadata(true); - for (HoodieWriteStat stat : updateStatusMap) { - metadata.addWriteStat(stat.getPartitionPath(), stat); - } - metadata.addMetadata(org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY, schema); - if (compactionPlan.getExtraMetadata() != null) { - compactionPlan.getExtraMetadata().forEach(metadata::addMetadata); - } - return metadata; - } -} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java deleted file mode 100644 index 9c44499a8..000000000 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.table.action.compact; - -import org.apache.hudi.avro.model.HoodieCompactionPlan; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.HoodieFileGroupId; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.view.SyncableFileSystemView; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieCompactionException; -import org.apache.hudi.table.HoodieTable; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.apache.spark.api.java.JavaRDD; - -import java.io.IOException; -import java.text.ParseException; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - -@SuppressWarnings("checkstyle:LineLength") -public class SparkScheduleCompactionActionExecutor extends - BaseScheduleCompactionActionExecutor>, JavaRDD, JavaRDD> { - - private static final Logger LOG = LogManager.getLogger(SparkScheduleCompactionActionExecutor.class); - - public SparkScheduleCompactionActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, - HoodieTable>, JavaRDD, JavaRDD> table, - String instantTime, - Option> extraMetadata) { - super(context, config, table, instantTime, extraMetadata); - } - - @Override - protected HoodieCompactionPlan scheduleCompaction() { - LOG.info("Checking if compaction needs to be run on " + config.getBasePath()); - // judge if we need to compact according to num delta commits and time elapsed - boolean compactable = needCompact(config.getInlineCompactTriggerStrategy()); - if (compactable) { - LOG.info("Generating compaction plan for merge on read table " + config.getBasePath()); - HoodieSparkMergeOnReadTableCompactor compactor = new HoodieSparkMergeOnReadTableCompactor(); - try { - SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView(); - Set fgInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations() - .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId()) - .collect(Collectors.toSet()); - // exclude files in pending clustering from compaction. 
- fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet())); - return compactor.generateCompactionPlan(context, table, config, instantTime, fgInPendingCompactionAndClustering); - } catch (IOException e) { - throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e); - } - } - - return new HoodieCompactionPlan(); - } - - public Pair getLatestDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) { - Option lastCompaction = table.getActiveTimeline().getCommitTimeline() - .filterCompletedInstants().lastInstant(); - HoodieTimeline deltaCommits = table.getActiveTimeline().getDeltaCommitTimeline(); - - String latestInstantTs; - int deltaCommitsSinceLastCompaction = 0; - if (lastCompaction.isPresent()) { - latestInstantTs = lastCompaction.get().getTimestamp(); - deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfter(latestInstantTs, Integer.MAX_VALUE).countInstants(); - } else { - latestInstantTs = deltaCommits.firstInstant().get().getTimestamp(); - deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfterOrEquals(latestInstantTs, Integer.MAX_VALUE).countInstants(); - } - return Pair.of(deltaCommitsSinceLastCompaction, latestInstantTs); - } - - public boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy) { - boolean compactable; - // get deltaCommitsSinceLastCompaction and lastCompactionTs - Pair latestDeltaCommitInfo = getLatestDeltaCommitInfo(compactionTriggerStrategy); - int inlineCompactDeltaCommitMax = config.getInlineCompactDeltaCommitMax(); - int inlineCompactDeltaSecondsMax = config.getInlineCompactDeltaSecondsMax(); - switch (compactionTriggerStrategy) { - case NUM_COMMITS: - compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft(); - if (compactable) { - LOG.info(String.format("The delta commits >= %s, trigger compaction scheduler.", inlineCompactDeltaCommitMax)); - } - break; - case TIME_ELAPSED: - compactable = inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - parsedToSeconds(latestDeltaCommitInfo.getRight()); - if (compactable) { - LOG.info(String.format("The elapsed time >=%ss, trigger compaction scheduler.", inlineCompactDeltaSecondsMax)); - } - break; - case NUM_OR_TIME: - compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft() - || inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - parsedToSeconds(latestDeltaCommitInfo.getRight()); - if (compactable) { - LOG.info(String.format("The delta commits >= %s or elapsed_time >=%ss, trigger compaction scheduler.", inlineCompactDeltaCommitMax, - inlineCompactDeltaSecondsMax)); - } - break; - case NUM_AND_TIME: - compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft() - && inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - parsedToSeconds(latestDeltaCommitInfo.getRight()); - if (compactable) { - LOG.info(String.format("The delta commits >= %s and elapsed_time >=%ss, trigger compaction scheduler.", inlineCompactDeltaCommitMax, - inlineCompactDeltaSecondsMax)); - } - break; - default: - throw new HoodieCompactionException("Unsupported compaction trigger strategy: " + config.getInlineCompactTriggerStrategy()); - } - return compactable; - } - - public Long parsedToSeconds(String time) { - long timestamp; - try { - timestamp = HoodieActiveTimeline.COMMIT_FORMATTER.parse(time).getTime() / 1000; - } catch (ParseException e) { - throw new HoodieCompactionException(e.getMessage(), e); - 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index 608d9ca07..c2879fb1a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -89,8 +89,8 @@ public class TestAsyncCompaction extends CompactionTestBase {
     metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
-    client.rollbackInflightCompaction(
-        new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), hoodieTable);
+    hoodieTable.rollbackInflightCompaction(
+        new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime));
     metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     pendingCompactionInstant = metaClient.getCommitsAndCompactionTimeline().filterPendingCompactionTimeline()
         .getInstants().findFirst().get();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index cad426492..36a70d71b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -208,7 +208,8 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
     String compactionInstantTime = "102";
     table.scheduleCompaction(context, compactionInstantTime, Option.empty());
     table.getMetaClient().reloadActiveTimeline();
-    JavaRDD<WriteStatus> result = (JavaRDD<WriteStatus>) table.compact(context, compactionInstantTime).getWriteStatuses();
+    JavaRDD<WriteStatus> result = (JavaRDD<WriteStatus>) table.compact(
+        context, compactionInstantTime).getWriteStatuses();
 
     // Verify that all partition paths are present in the WriteStatus result
     for (String partitionPath : dataGen.getPartitionPaths()) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index b1aebc748..6bbb0f655 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -279,8 +279,8 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunctionalTestHarness {
       final String compactedCommitTime = metaClient.getActiveTimeline().reload().lastInstant().get().getTimestamp();
       assertTrue(Arrays.stream(listAllBaseFilesInPath(hoodieTable))
          .anyMatch(file -> compactedCommitTime.equals(new HoodieBaseFile(file).getCommitTime())));
-      thirdClient.rollbackInflightCompaction(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactedCommitTime),
-          hoodieTable);
+      hoodieTable.rollbackInflightCompaction(new HoodieInstant(
+          HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactedCommitTime));
       allFiles = listAllBaseFilesInPath(hoodieTable);
       metaClient = HoodieTableMetaClient.reload(metaClient);
       tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
@@ -611,7 +611,8 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunctionalTestHarness {
      //writeClient.commitCompaction(newCommitTime, statuses, Option.empty());
      // Trigger a rollback of compaction
      table.getActiveTimeline().reload();
-      writeClient.rollbackInflightCompaction(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, newCommitTime), table);
+      table.rollbackInflightCompaction(new HoodieInstant(
+          HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, newCommitTime));
 
      metaClient = HoodieTableMetaClient.reload(metaClient);
      table = HoodieSparkTable.create(config, context(), metaClient);
@@ -619,7 +620,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunctionalTestHarness {
      ((SyncableFileSystemView) tableRTFileSystemView).reset();
 
      for (String partitionPath : dataGen.getPartitionPaths()) {
-        List<FileSlice> fileSlices = getFileSystemViewWithUnCommittedSlices(metaClient)
+        List<FileSlice> fileSlices = getFileSystemViewWithUnCommittedSlices(metaClient)
            .getAllFileSlices(partitionPath).filter(fs -> fs.getBaseInstantTime().equals("100")).collect(Collectors.toList());
        assertTrue(fileSlices.stream().noneMatch(fileSlice -> fileSlice.getBaseFile().isPresent()));
        assertTrue(fileSlices.stream().anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));
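The three test changes above all make the same substitution: the inflight compaction is rolled back through the table rather than the write client. A minimal sketch of the new call path, assuming the Spark engine and that the write config, engine context, Hadoop configuration and compaction instant are already in hand:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.table.HoodieSparkTable;
    import org.apache.hudi.table.HoodieTable;

    public class RollbackInflightCompactionSketch {
      // Mirrors the pattern used in the tests above; all arguments are assumed to be supplied by the caller.
      static void rollbackPendingCompaction(HoodieWriteConfig cfg, HoodieEngineContext engineContext,
                                            Configuration hadoopConf, String compactionInstantTime) {
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
        HoodieTable hoodieTable = HoodieSparkTable.create(cfg, engineContext, metaClient);
        // Revert the .inflight compaction back to .requested on the timeline.
        hoodieTable.rollbackInflightCompaction(
            new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime));
      }
    }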
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieAccumulator.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieAccumulator.java
new file mode 100644
index 000000000..61fb98e1a
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieAccumulator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.data;
+
+import java.io.Serializable;
+
+/**
+ * An abstraction for accumulator on counts.
+ */
+public abstract class HoodieAccumulator implements Serializable {
+  /**
+   * @return the count.
+   */
+  public abstract long value();
+
+  /**
+   * Increments the count based on the input.
+   *
+   * @param increment the value to add.
+   */
+  public abstract void add(long increment);
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieAtomicLongAccumulator.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieAtomicLongAccumulator.java
new file mode 100644
index 000000000..3ace1c7a4
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieAtomicLongAccumulator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.data;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An accumulator on counts based on {@link AtomicLong} implementation.
+ */
+public class HoodieAtomicLongAccumulator extends HoodieAccumulator {
+
+  private final AtomicLong accumulator;
+
+  private HoodieAtomicLongAccumulator() {
+    accumulator = new AtomicLong(0L);
+  }
+
+  public static HoodieAtomicLongAccumulator create() {
+    return new HoodieAtomicLongAccumulator();
+  }
+
+  @Override
+  public long value() {
+    return accumulator.get();
+  }
+
+  @Override
+  public void add(long increment) {
+    accumulator.addAndGet(increment);
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
new file mode 100644
index 000000000..f26a42035
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.data;
+
+import org.apache.hudi.common.function.SerializableFunction;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * An abstraction for a data collection of objects in type T to store the reference
+ * and do transformation.
+ *
+ * @param <T> type of object.
+ */
+public abstract class HoodieData<T> implements Serializable {
+  /**
+   * @return the collection of objects.
+   */
+  public abstract Object get();
+
+  /**
+   * @return whether the collection is empty.
+   */
+  public abstract boolean isEmpty();
+
+  /**
+   * @param func serializable map function.
+   * @param <O>  output object type.
+   * @return {@link HoodieData} containing the result. Actual execution may be deferred.
+   */
+  public abstract <O> HoodieData<O> map(SerializableFunction<T, O> func);
+
+  /**
+   * @param func serializable flatmap function.
+   * @param <O>  output object type.
+   * @return {@link HoodieData} containing the result. Actual execution may be deferred.
+   */
+  public abstract <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func);
+
+  /**
+   * @return collected results in {@link List}.
+   */
+  public abstract List<T> collectAsList();
+}
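A small usage sketch of the accumulator contract introduced above (illustration only, not part of the patch); HoodieAtomicLongAccumulator simply delegates to an AtomicLong, so increments are safe across threads:

    import org.apache.hudi.common.data.HoodieAccumulator;
    import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;

    public class AccumulatorSketch {
      public static void main(String[] args) {
        HoodieAccumulator recordsWritten = HoodieAtomicLongAccumulator.create();
        recordsWritten.add(3); // e.g. records flushed by one handle
        recordsWritten.add(2); // e.g. records flushed by another handle
        System.out.println(recordsWritten.value()); // 5
      }
    }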
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
new file mode 100644
index 000000000..2dd8c2ec5
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.data;
+
+import org.apache.hudi.common.function.SerializableFunction;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper;
+
+/**
+ * Holds a {@link List} of objects.
+ *
+ * @param <T> type of object.
+ */
+public class HoodieList<T> extends HoodieData<T> {
+
+  private final List<T> listData;
+
+  private HoodieList(List<T> listData) {
+    this.listData = listData;
+  }
+
+  /**
+   * @param listData a {@link List} of objects in type T.
+   * @param <T>      type of object.
+   * @return a new instance containing the {@link List} reference.
+   */
+  public static <T> HoodieList<T> of(List<T> listData) {
+    return new HoodieList<>(listData);
+  }
+
+  /**
+   * @param hoodieData {@link HoodieList} instance containing the {@link List} of objects.
+   * @param <T>        type of object.
+   * @return the {@link List} of objects in type T.
+   */
+  public static <T> List<T> getList(HoodieData<T> hoodieData) {
+    return ((HoodieList<T>) hoodieData).get();
+  }
+
+  @Override
+  public List<T> get() {
+    return listData;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return listData.isEmpty();
+  }
+
+  @Override
+  public <O> HoodieData<O> map(SerializableFunction<T, O> func) {
+    return HoodieList.of(listData.stream().parallel()
+        .map(throwingMapWrapper(func)).collect(Collectors.toList()));
+  }
+
+  @Override
+  public <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func) {
+    Function<T, Iterator<O>> throwableFunc = throwingMapWrapper(func);
+    return HoodieList.of(listData.stream().flatMap(e -> {
+      List<O> result = new ArrayList<>();
+      Iterator<O> iterator = throwableFunc.apply(e);
+      iterator.forEachRemaining(result::add);
+      return result.stream();
+    }).collect(Collectors.toList()));
+  }
+
+  @Override
+  public List<T> collectAsList() {
+    return listData;
+  }
+}
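To make the list-backed semantics concrete, a short usage sketch (assumed driver-side code, not part of the patch): map and flatMap evaluate eagerly over a stream of the backing list, and collectAsList returns that list directly:

    import org.apache.hudi.common.data.HoodieData;
    import org.apache.hudi.common.data.HoodieList;

    import java.util.Arrays;

    public class HoodieListSketch {
      public static void main(String[] args) {
        HoodieData<String> partitions = HoodieList.of(Arrays.asList("2021/07/01", "2021/07/02"));
        // map: one output per input element
        HoodieData<Integer> pathLengths = partitions.map(String::length);
        // flatMap: each input expands into an iterator of outputs
        HoodieData<String> pathParts = partitions.flatMap(p -> Arrays.asList(p.split("/")).iterator());
        System.out.println(pathLengths.collectAsList());          // [10, 10]
        System.out.println(HoodieList.getList(pathParts).size()); // 6
      }
    }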
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index fde34b609..4e5120ab1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -19,6 +19,8 @@
 package org.apache.hudi.common.engine;
 
 import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.data.HoodieAccumulator;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.function.SerializableBiFunction;
 import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
@@ -59,6 +61,12 @@ public abstract class HoodieEngineContext {
     return taskContextSupplier;
   }
 
+  public abstract HoodieAccumulator newAccumulator();
+
+  public abstract <T> HoodieData<T> emptyHoodieData();
+
+  public abstract <T> HoodieData<T> parallelize(List<T> data);
+
   public abstract <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism);
 
   public abstract <I, K, V> List<V> mapToPairAndReduceByKey(
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
index ca032e78a..61cbaed02 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
@@ -21,6 +21,10 @@
 package org.apache.hudi.common.engine;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.data.HoodieAccumulator;
+import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieList;
 import org.apache.hudi.common.function.SerializableBiFunction;
 import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
@@ -31,6 +35,7 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -59,6 +64,21 @@ public final class HoodieLocalEngineContext extends HoodieEngineContext {
     super(new SerializableConfiguration(conf), taskContextSupplier);
   }
 
+  @Override
+  public HoodieAccumulator newAccumulator() {
+    return HoodieAtomicLongAccumulator.create();
+  }
+
+  @Override
+  public <T> HoodieData<T> emptyHoodieData() {
+    return HoodieList.of(Collections.emptyList());
+  }
+
+  @Override
+  public <T> HoodieData<T> parallelize(List<T> data) {
+    return HoodieList.of(data);
+  }
+
   @Override
   public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
     return data.stream().parallel().map(throwingMapWrapper(func)).collect(toList());
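With newAccumulator, emptyHoodieData and parallelize on HoodieEngineContext, code shared in hudi-common and hudi-client-common can be written once against HoodieData and run under any engine. A hedged sketch of such engine-neutral code, exercised here with the local in-process context (assuming its single-argument Configuration constructor):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.data.HoodieAccumulator;
    import org.apache.hudi.common.data.HoodieData;
    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.common.engine.HoodieLocalEngineContext;

    import java.util.Arrays;
    import java.util.List;

    public class EngineContextSketch {
      // Engine-agnostic: the caller decides whether the context is local, Spark or Flink backed.
      static long totalNameLength(HoodieEngineContext context, List<String> fileNames) {
        HoodieAccumulator totalChars = context.newAccumulator();
        HoodieData<String> files = context.parallelize(fileNames);
        for (int len : files.map(String::length).collectAsList()) {
          totalChars.add(len); // accumulate on the driver after collecting
        }
        return totalChars.value();
      }

      public static void main(String[] args) {
        HoodieEngineContext context = new HoodieLocalEngineContext(new Configuration());
        System.out.println(totalNameLength(context, Arrays.asList("a.parquet", "bb.parquet"))); // 19
      }
    }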
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
index 57b79df1c..98726d273 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
@@ -22,7 +22,8 @@
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.sink.utils.NonThrownExecutor;
-import org.apache.hudi.table.action.compact.FlinkCompactHelpers;
+import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
+import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
@@ -98,7 +99,17 @@ public class CompactFunction extends ProcessFunction
       Collector collector) throws IOException {
-    List<WriteStatus> writeStatuses = FlinkCompactHelpers.compact(writeClient, instantTime, compactionOperation);
+    HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
+    List<WriteStatus> writeStatuses = compactor.compact(
+        new HoodieFlinkCopyOnWriteTable<>(
+            writeClient.getConfig(),
+            writeClient.getEngineContext(),
+            writeClient.getHoodieTable().getMetaClient()),
+        writeClient.getHoodieTable().getMetaClient(),
+        writeClient.getConfig(),
+        compactionOperation,
+        instantTime,
+        writeClient.getHoodieTable().getTaskContextSupplier());
     collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), writeStatuses, taskID));
   }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index 325d88f3e..945d4288b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -84,7 +84,7 @@ public class CompactionPlanOperator extends AbstractStreamOperator
-  public static void rollbackCompaction(HoodieFlinkTable table, HoodieFlinkWriteClient writeClient, Configuration conf) {
+  public static void rollbackCompaction(HoodieFlinkTable table, Configuration conf) {
     String curInstantTime = HoodieActiveTimeline.createNewInstantTime();
     int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS);
     HoodieTimeline inflightCompactionTimeline = table.getActiveTimeline()
@@ -121,7 +120,7 @@
         && StreamerUtil.instantTimeDiffSeconds(curInstantTime, instant.getTimestamp()) >= deltaSeconds);
     inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
       LOG.info("Rollback the pending compaction instant: " + inflightInstant);
-      writeClient.rollbackInflightCompaction(inflightInstant, table);
+      table.rollbackInflightCompaction(inflightInstant);
       table.getMetaClient().reloadActiveTimeline();
     });
   }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
index 073ae27bc..473a33e8c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
@@ -78,7 +78,7 @@ public class TestCompactionUtil {
     HoodieInstant instant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().orElse(null);
     assertThat(instant.getTimestamp(), is(instantTime));
 
-    CompactionUtil.rollbackCompaction(table, writeClient, conf);
+    CompactionUtil.rollbackCompaction(table, conf);
     HoodieInstant rollbackInstant = table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().get();
     assertThat(rollbackInstant.getState(), is(HoodieInstant.State.REQUESTED));
     assertThat(rollbackInstant.getTimestamp(), is(instantTime));