
[HUDI-2501] Add HoodieData abstraction and refactor compaction actions in hudi-client module (#3741)

This commit is contained in:
Y Ethan Guo
2021-10-22 12:58:51 -07:00
committed by GitHub
parent 1e285dc399
commit 5ed35bff83
41 changed files with 1084 additions and 1019 deletions

View File

@@ -775,19 +775,6 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
  protected abstract void completeCompaction(HoodieCommitMetadata metadata, O writeStatuses,
                                             HoodieTable<T, I, K, O> table, String compactionCommitTime);

-  /**
-   * Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file
-   * TODO : Deprecate this method and make it protected
-   * @param inflightInstant Inflight Compaction Instant
-   * @param table Hoodie Table
-   */
-  public void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable<T, I, K, O> table) {
-    String commitTime = HoodieActiveTimeline.createNewInstantTime();
-    table.scheduleRollback(context, commitTime, inflightInstant, false);
-    table.rollback(context, commitTime, inflightInstant, false);
-    table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
-  }

  /**
   * Get inflight time line exclude compaction and clustering.
   * @param metaClient

View File

@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.table;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* Interface for insert and update operations in compaction.
*
* @param <T> HoodieRecordPayload type.
*/
public interface HoodieCompactionHandler<T extends HoodieRecordPayload> {
  /**
   * Handles updates to records against an existing base file during compaction.
   */
  Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath, String fileId,
                                           Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException;

  /**
   * Handles writing records into a new base file when the file group has no base file yet.
   */
  Iterator<List<WriteStatus>> handleInsert(String instantTime, String partitionPath, String fileId,
                                           Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap);
}
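
For orientation, this is roughly how the new engine-agnostic compactor dispatches on this handler (a sketch; the actual wiring is in HoodieCompactor.compact() further down): a file group that already has a base file is compacted via handleUpdate, one without a base file via handleInsert.

    // Sketch of the dispatch inside the generic compactor (see HoodieCompactor below).
    Option<HoodieBaseFile> oldDataFileOpt =
        operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath());
    Iterator<List<WriteStatus>> result = oldDataFileOpt.isPresent()
        ? compactionHandler.handleUpdate(instantTime, operation.getPartitionPath(),
            operation.getFileId(), scanner.getRecords(), oldDataFileOpt.get())
        : compactionHandler.handleInsert(instantTime, operation.getPartitionPath(),
            operation.getFileId(), scanner.getRecords());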

View File

@@ -365,12 +365,11 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
  /**
   * Run Compaction on the table. Compaction arranges the data so that it is optimized for data access.
   *
   * @param context HoodieEngineContext
   * @param compactionInstantTime Instant Time
   */
  public abstract HoodieWriteMetadata<O> compact(HoodieEngineContext context,
                                                 String compactionInstantTime);

  /**
   * Schedule clustering for the instant time.
@@ -471,11 +470,24 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
                                 String restoreInstantTime,
                                 String instantToRestore);
/**
* Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file
* to the .requested file.
*
* @param inflightInstant Inflight Compaction Instant
*/
public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
String commitTime = HoodieActiveTimeline.createNewInstantTime();
scheduleRollback(context, commitTime, inflightInstant, false);
rollback(context, commitTime, inflightInstant, false);
getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
}
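
Since this method now lives on HoodieTable (moved from AbstractHoodieWriteClient above), a call site presumably shrinks to the following (a sketch, assuming an inflight compaction instant in hand):

    // Hypothetical call site: roll back partial data from a failed compaction
    // and revert the instant from INFLIGHT back to REQUESTED.
    table.rollbackInflightCompaction(inflightInstant);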
  /**
   * Finalize the written data onto storage. Perform any final cleanups.
   *
   * @param context HoodieEngineContext
   * @param stats List of HoodieWriteStats
   * @throws HoodieIOException if some paths can't be finalized on storage
   */
  public void finalizeWrite(HoodieEngineContext context, String instantTs, List<HoodieWriteStat> stats) throws HoodieIOException {

View File

@@ -46,6 +46,36 @@ public class HoodieWriteMetadata<O> {
  public HoodieWriteMetadata() {
  }
/**
* Clones the write metadata with transformed write statuses.
*
* @param transformedWriteStatuses transformed write statuses
* @param <T> type of transformed write statuses
* @return Cloned {@link HoodieWriteMetadata<T>} instance
*/
public <T> HoodieWriteMetadata<T> clone(T transformedWriteStatuses) {
HoodieWriteMetadata<T> newMetadataInstance = new HoodieWriteMetadata<>();
newMetadataInstance.setWriteStatuses(transformedWriteStatuses);
if (indexLookupDuration.isPresent()) {
newMetadataInstance.setIndexLookupDuration(indexLookupDuration.get());
}
newMetadataInstance.setCommitted(isCommitted);
newMetadataInstance.setCommitMetadata(commitMetadata);
if (writeStats.isPresent()) {
newMetadataInstance.setWriteStats(writeStats.get());
}
if (indexUpdateDuration.isPresent()) {
newMetadataInstance.setIndexUpdateDuration(indexUpdateDuration.get());
}
if (finalizeDuration.isPresent()) {
newMetadataInstance.setFinalizeDuration(finalizeDuration.get());
}
if (partitionToReplaceFileIds.isPresent()) {
newMetadataInstance.setPartitionToReplaceFileIds(partitionToReplaceFileIds.get());
}
return newMetadataInstance;
}
  public O getWriteStatuses() {
    return writeStatuses;
  }
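
Engine adapters use clone() to unwrap the engine-agnostic HoodieData<WriteStatus> into the engine-native collection type, e.g. (mirroring HoodieFlinkTable.convertMetadata later in this commit):

    // Convert HoodieData-based metadata into List-based metadata for the Flink engine.
    HoodieWriteMetadata<List<WriteStatus>> listMetadata =
        metadata.clone(HoodieList.getList(metadata.getWriteStatuses()));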

View File

@@ -1,90 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public abstract class BaseScheduleCompactionActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieCompactionPlan>> {
private final Option<Map<String, String>> extraMetadata;
public BaseScheduleCompactionActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, I, K, O> table,
String instantTime,
Option<Map<String, String>> extraMetadata) {
super(context, config, table, instantTime);
this.extraMetadata = extraMetadata;
}
protected abstract HoodieCompactionPlan scheduleCompaction();
@Override
public Option<HoodieCompactionPlan> execute() {
if (!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
&& !config.getFailedWritesCleanPolicy().isLazy()) {
// if there are inflight writes, their instantTime must not be less than that of compaction instant time
table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant()
.ifPresent(earliestInflight -> ValidationUtils.checkArgument(
HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), HoodieTimeline.GREATER_THAN, instantTime),
"Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
+ ", Compaction scheduled at " + instantTime));
// Committed and pending compaction instants should have strictly lower timestamps
List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
.getWriteTimeline().filterCompletedAndCompactionInstants().getInstants()
.filter(instant -> HoodieTimeline.compareTimestamps(
instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime))
.collect(Collectors.toList());
ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
"Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
+ conflictingInstants);
}
HoodieCompactionPlan plan = scheduleCompaction();
if (plan != null && (plan.getOperations() != null) && (!plan.getOperations().isEmpty())) {
extraMetadata.ifPresent(plan::setExtraMetadata);
HoodieInstant compactionInstant =
new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime);
try {
table.getActiveTimeline().saveToCompactionRequested(compactionInstant,
TimelineMetadataUtils.serializeCompactionPlan(plan));
} catch (IOException ioe) {
throw new HoodieIOException("Exception scheduling compaction", ioe);
}
return Option.of(plan);
}
return Option.empty();
}
}

View File

@@ -18,17 +18,23 @@
package org.apache.hudi.table.action.compact;

import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.table.HoodieTable;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Base class helps to perform compact.
@@ -38,11 +44,34 @@ import java.nio.charset.StandardCharsets;
 * @param <K> Type of keys
 * @param <O> Type of outputs
 */
-public abstract class AbstractCompactHelpers<T extends HoodieRecordPayload, I, K, O> {
-  public abstract HoodieCommitMetadata createCompactionMetadata(HoodieTable<T, I, K, O> table,
-                                                                String compactionInstantTime,
-                                                                O writeStatuses,
-                                                                String schema) throws IOException;
public class CompactHelpers<T extends HoodieRecordPayload, I, K, O> {

  private static final CompactHelpers SINGLETON_INSTANCE = new CompactHelpers();

  private CompactHelpers() {
  }

  public static CompactHelpers getInstance() {
    return SINGLETON_INSTANCE;
  }

  public HoodieCommitMetadata createCompactionMetadata(
      HoodieTable table, String compactionInstantTime, HoodieData<WriteStatus> writeStatuses,
      String schema) throws IOException {
    byte[] planBytes = table.getActiveTimeline().readCompactionPlanAsBytes(
        HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get();
    HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(planBytes);
    List<HoodieWriteStat> updateStatusMap = writeStatuses.map(WriteStatus::getStat).collectAsList();
    HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
    for (HoodieWriteStat stat : updateStatusMap) {
      metadata.addWriteStat(stat.getPartitionPath(), stat);
    }
    metadata.addMetadata(org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY, schema);
    if (compactionPlan.getExtraMetadata() != null) {
      compactionPlan.getExtraMetadata().forEach(metadata::addMetadata);
    }
    return metadata;
  }

  public void completeInflightCompaction(HoodieTable table, String compactionCommitTime, HoodieCommitMetadata commitMetadata) {
    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
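
This singleton replaces the per-engine helper subclasses (e.g. FlinkCompactHelpers below); a typical call site now looks roughly like this (a sketch based on the HoodieFlinkWriteClient changes later in this commit):

    // Build commit metadata from the compaction write statuses, then
    // complete the inflight compaction on the timeline.
    HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata(
        table, compactionInstantTime, HoodieList.of(writeStatuses), config.getSchema());
    CompactHelpers.getInstance().completeInflightCompaction(table, compactionInstantTime, metadata);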

View File

@@ -18,39 +18,280 @@
package org.apache.hudi.table.action.compact;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.table.HoodieCompactionHandler;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.StreamSupport;

import static java.util.stream.Collectors.toList;

/**
 * A HoodieCompactor runs compaction on a hoodie table.
 */
-public interface HoodieCompactor<T extends HoodieRecordPayload, I, K, O> extends Serializable {
public abstract class HoodieCompactor<T extends HoodieRecordPayload, I, K, O> implements Serializable {

  private static final Logger LOG = LogManager.getLogger(HoodieCompactor.class);

-  /**
-   * Generate a new compaction plan for scheduling.
-   *
-   * @param context HoodieEngineContext
-   * @param hoodieTable Hoodie Table
-   * @param config Hoodie Write Configuration
-   * @param compactionCommitTime scheduled compaction commit time
-   * @param fgIdsInPendingCompactions partition-fileId pairs for which compaction is pending
-   * @return Compaction Plan
-   * @throws IOException when encountering errors
-   */
-  HoodieCompactionPlan generateCompactionPlan(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieTable, HoodieWriteConfig config,
-                                              String compactionCommitTime, Set<HoodieFileGroupId> fgIdsInPendingCompactions) throws IOException;

  /**
   * Handles the compaction timeline based on the compaction instant before actual compaction.
   *
   * @param table {@link HoodieTable} instance to use.
   * @param pendingCompactionTimeline pending compaction timeline.
   * @param compactionInstantTime compaction instant
   */
  public abstract void preCompact(
      HoodieTable table, HoodieTimeline pendingCompactionTimeline, String compactionInstantTime);
/**
* Maybe persist write status.
*
* @param writeStatus {@link HoodieData} of {@link WriteStatus}.
*/
public abstract void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieWriteConfig config);
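
The preCompact/maybePersist hooks are the only engine-specific pieces left in the compactor; a minimal engine subclass might look like this (a sketch, not part of the diff; the Flink variant is referenced as HoodieFlinkMergeOnReadTableCompactor later in this commit, and the pending-instant check mirrors the one removed from the old Spark executor below):

    public class HoodieFlinkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
        extends HoodieCompactor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {

      @Override
      public void preCompact(HoodieTable table, HoodieTimeline pendingCompactionTimeline,
                             String compactionInstantTime) {
        // Fail fast if no compaction was requested at this instant.
        HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
        if (!pendingCompactionTimeline.containsInstant(instant)) {
          throw new IllegalStateException(
              "No Compaction request available at " + compactionInstantTime + " to run compaction");
        }
      }

      @Override
      public void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieWriteConfig config) {
        // List-backed HoodieData is already materialized in memory; nothing to persist.
      }
    }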
  /**
   * Execute compaction operations and report back status.
   */
-  O compact(HoodieEngineContext context, HoodieCompactionPlan compactionPlan, HoodieTable<T, I, K, O> hoodieTable,
-            HoodieWriteConfig config, String compactionInstantTime) throws IOException;
  public HoodieData<WriteStatus> compact(
      HoodieEngineContext context, HoodieCompactionPlan compactionPlan,
      HoodieTable table, HoodieWriteConfig config, String compactionInstantTime,
      HoodieCompactionHandler compactionHandler) {
if (compactionPlan == null || (compactionPlan.getOperations() == null)
|| (compactionPlan.getOperations().isEmpty())) {
return context.emptyHoodieData();
}
HoodieActiveTimeline timeline = table.getActiveTimeline();
HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
// Mark instant as compaction inflight
timeline.transitionCompactionRequestedToInflight(instant);
table.getMetaClient().reloadActiveTimeline();
HoodieTableMetaClient metaClient = table.getMetaClient();
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
    // Here we first use the table schema as the reader schema to read the log
    // files. That is because, in the case of MergeInto, config.getSchema may
    // not be the same as the table schema.
try {
Schema readerSchema = schemaUtil.getTableAvroSchema(false);
config.setSchema(readerSchema.toString());
} catch (Exception e) {
// If there is no commit in the table, just ignore the exception.
}
// Compacting is very similar to applying updates to existing file
List<CompactionOperation> operations = compactionPlan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
LOG.info("Compactor compacting " + operations + " files");
context.setJobStatus(this.getClass().getSimpleName(), "Compacting file slices");
TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier();
return context.parallelize(operations).map(operation -> compact(
compactionHandler, metaClient, config, operation, compactionInstantTime, taskContextSupplier))
.flatMap(List::iterator);
}
/**
* Execute a single compaction operation and report back status.
*/
public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
HoodieTableMetaClient metaClient,
HoodieWriteConfig config,
CompactionOperation operation,
String instantTime,
TaskContextSupplier taskContextSupplier) throws IOException {
FileSystem fs = metaClient.getFs();
Schema readerSchema = HoodieAvroUtils.addMetadataFields(
new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames()
+ " for commit " + instantTime);
// TODO - FIX THIS
// Reads the entire avro file. Always only specific blocks should be read from the avro file
// (failure recover).
// Load all the delta commits since the last compaction commit and get all the blocks to be
// loaded and load it using CompositeAvroLogReader
// Since a DeltaCommit is not defined yet, reading all the records. revisit this soon.
String maxInstantTime = metaClient
.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
.filterCompletedInstants().lastInstant().get().getTimestamp();
long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(taskContextSupplier, config);
LOG.info("MaxMemoryPerCompaction => " + maxMemoryPerCompaction);
List<String> logFiles = operation.getDeltaFileNames().stream().map(
p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString())
.collect(toList());
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(metaClient.getBasePath())
.withLogFilePaths(logFiles)
.withReaderSchema(readerSchema)
.withLatestInstantTime(maxInstantTime)
.withMaxMemorySizeInBytes(maxMemoryPerCompaction)
.withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
.withReverseReader(config.getCompactionReverseLogReadEnabled())
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(config.getSpillableMapBasePath())
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.build();
if (!scanner.iterator().hasNext()) {
return new ArrayList<>();
}
Option<HoodieBaseFile> oldDataFileOpt =
operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath());
// Compacting is very similar to applying updates to existing file
Iterator<List<WriteStatus>> result;
// If the dataFile is present, perform updates else perform inserts into a new base file.
if (oldDataFileOpt.isPresent()) {
result = compactionHandler.handleUpdate(instantTime, operation.getPartitionPath(),
operation.getFileId(), scanner.getRecords(),
oldDataFileOpt.get());
} else {
result = compactionHandler.handleInsert(instantTime, operation.getPartitionPath(), operation.getFileId(),
scanner.getRecords());
}
Iterable<List<WriteStatus>> resultIterable = () -> result;
return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream).peek(s -> {
s.getStat().setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
s.getStat().setTotalLogFilesCompacted(scanner.getTotalLogFiles());
s.getStat().setTotalLogRecords(scanner.getTotalLogRecords());
s.getStat().setPartitionPath(operation.getPartitionPath());
s.getStat()
.setTotalLogSizeCompacted(operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue());
s.getStat().setTotalLogBlocks(scanner.getTotalLogBlocks());
s.getStat().setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
s.getStat().setTotalRollbackBlocks(scanner.getTotalRollbacks());
RuntimeStats runtimeStats = new RuntimeStats();
runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
s.getStat().setRuntimeStats(runtimeStats);
scanner.close();
}).collect(toList());
}
/**
* Generate a new compaction plan for scheduling.
*
* @param context HoodieEngineContext
* @param hoodieTable Hoodie Table
* @param config Hoodie Write Configuration
* @param compactionCommitTime scheduled compaction commit time
* @param fgIdsInPendingCompactionAndClustering partition-fileId pairs for which compaction is pending
* @return Compaction Plan
* @throws IOException when encountering errors
*/
HoodieCompactionPlan generateCompactionPlan(
HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieTable, HoodieWriteConfig config,
String compactionCommitTime, Set<HoodieFileGroupId> fgIdsInPendingCompactionAndClustering) throws IOException {
// Accumulator to keep track of total log files for a table
HoodieAccumulator totalLogFiles = context.newAccumulator();
// Accumulator to keep track of total log file slices for a table
HoodieAccumulator totalFileSlices = context.newAccumulator();
ValidationUtils.checkArgument(hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ,
"Can only compact table of type " + HoodieTableType.MERGE_ON_READ + " and not "
+ hoodieTable.getMetaClient().getTableType().name());
// TODO : check if maxMemory is not greater than JVM or executor memory
// TODO - rollback any compactions in flight
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
LOG.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime);
List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), metaClient.getBasePath());
// filter the partition paths if needed to reduce list status
partitionPaths = config.getCompactionStrategy().filterPartitionPaths(config, partitionPaths);
if (partitionPaths.isEmpty()) {
// In case no partitions could be picked, return no compaction plan
return null;
}
SliceView fileSystemView = hoodieTable.getSliceView();
LOG.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
context.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact");
List<HoodieCompactionOperation> operations = context.flatMap(partitionPaths, partitionPath -> fileSystemView
.getLatestFileSlices(partitionPath)
.filter(slice -> !fgIdsInPendingCompactionAndClustering.contains(slice.getFileGroupId()))
.map(s -> {
List<HoodieLogFile> logFiles =
s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(toList());
totalLogFiles.add(logFiles.size());
totalFileSlices.add(1L);
// Avro generated classes are not inheriting Serializable. Using CompactionOperation POJO
// for Map operations and collecting them finally in Avro generated classes for storing
// into meta files.
Option<HoodieBaseFile> dataFile = s.getBaseFile();
return new CompactionOperation(dataFile, partitionPath, logFiles,
config.getCompactionStrategy().captureMetrics(config, s));
})
.filter(c -> !c.getDeltaFileNames().isEmpty()), partitionPaths.size()).stream()
.map(CompactionUtils::buildHoodieCompactionOperation).collect(toList());
LOG.info("Total of " + operations.size() + " compactions are retrieved");
LOG.info("Total number of latest files slices " + totalFileSlices.value());
LOG.info("Total number of log files " + totalLogFiles.value());
LOG.info("Total number of file slices " + totalFileSlices.value());
// Filter the compactions with the passed in filter. This lets us choose most effective
// compactions only
HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations,
CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList()));
ValidationUtils.checkArgument(
compactionPlan.getOperations().stream().noneMatch(
op -> fgIdsInPendingCompactionAndClustering.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))),
"Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. "
+ "Please fix your strategy implementation. FileIdsWithPendingCompactions :" + fgIdsInPendingCompactionAndClustering
+ ", Selected workload :" + compactionPlan);
if (compactionPlan.getOperations().isEmpty()) {
LOG.warn("After filtering, Nothing to compact for " + metaClient.getBasePath());
}
return compactionPlan;
}
} }

View File

@@ -20,64 +20,62 @@ package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.client.utils.SparkMemoryUtils;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.table.HoodieCompactionHandler;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.spark.api.java.JavaRDD;

import java.io.IOException;
import java.util.List;

@SuppressWarnings("checkstyle:LineLength")
-public class SparkRunCompactionActionExecutor<T extends HoodieRecordPayload> extends
-    BaseActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, HoodieWriteMetadata<JavaRDD<WriteStatus>>> {
public class RunCompactionActionExecutor<T extends HoodieRecordPayload> extends
    BaseActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, HoodieWriteMetadata<HoodieData<WriteStatus>>> {

  private final HoodieCompactor compactor;
  private final HoodieCompactionHandler compactionHandler;

-  public SparkRunCompactionActionExecutor(HoodieSparkEngineContext context,
-                                          HoodieWriteConfig config,
-                                          HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
-                                          String instantTime) {
  public RunCompactionActionExecutor(HoodieEngineContext context,
                                     HoodieWriteConfig config,
                                     HoodieTable table,
                                     String instantTime,
                                     HoodieCompactor compactor,
                                     HoodieCompactionHandler compactionHandler) {
    super(context, config, table, instantTime);
    this.compactor = compactor;
    this.compactionHandler = compactionHandler;
  }

  @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
-    HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(instantTime);
  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
    HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
-    if (!pendingCompactionTimeline.containsInstant(instant)) {
-      throw new IllegalStateException(
-          "No Compaction request available at " + instantTime + " to run compaction");
-    }
    compactor.preCompact(table, pendingCompactionTimeline, instantTime);

-    HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = new HoodieWriteMetadata<>();
    HoodieWriteMetadata<HoodieData<WriteStatus>> compactionMetadata = new HoodieWriteMetadata<>();
    try {
-      HoodieActiveTimeline timeline = table.getActiveTimeline();
      // generate compaction plan
      // should support configurable commit metadata
      HoodieCompactionPlan compactionPlan =
          CompactionUtils.getCompactionPlan(table.getMetaClient(), instantTime);

-      // Mark instant as compaction inflight
-      timeline.transitionCompactionRequestedToInflight(instant);
-      table.getMetaClient().reloadActiveTimeline();
-
-      HoodieSparkMergeOnReadTableCompactor compactor = new HoodieSparkMergeOnReadTableCompactor();
-      JavaRDD<WriteStatus> statuses = compactor.compact(context, compactionPlan, table, config, instantTime);
-
-      statuses.persist(SparkMemoryUtils.getWriteStatusStorageLevel(config.getProps()));
      HoodieData<WriteStatus> statuses = compactor.compact(
          context, compactionPlan, table, config, instantTime, compactionHandler);

      compactor.maybePersist(statuses, config);
      context.setJobStatus(this.getClass().getSimpleName(), "Preparing compaction metadata");
-      List<HoodieWriteStat> updateStatusMap = statuses.map(WriteStatus::getStat).collect();
      List<HoodieWriteStat> updateStatusMap = statuses.map(WriteStatus::getStat).collectAsList();
      HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
      for (HoodieWriteStat stat : updateStatusMap) {
        metadata.addWriteStat(stat.getPartitionPath(), stat);

View File

@@ -19,21 +19,22 @@
package org.apache.hudi.table.action.compact;

import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -45,31 +46,67 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

-@SuppressWarnings("checkstyle:LineLength")
-public class FlinkScheduleCompactionActionExecutor<T extends HoodieRecordPayload> extends
-    BaseScheduleCompactionActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
public class ScheduleCompactionActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieCompactionPlan>> {

-  private static final Logger LOG = LogManager.getLogger(FlinkScheduleCompactionActionExecutor.class);
  private static final Logger LOG = LogManager.getLogger(ScheduleCompactionActionExecutor.class);

  private final Option<Map<String, String>> extraMetadata;
  private final HoodieCompactor compactor;

-  public FlinkScheduleCompactionActionExecutor(HoodieEngineContext context,
-                                               HoodieWriteConfig config,
-                                               HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
-                                               String instantTime,
-                                               Option<Map<String, String>> extraMetadata) {
-    super(context, config, table, instantTime, extraMetadata);
  public ScheduleCompactionActionExecutor(HoodieEngineContext context,
                                          HoodieWriteConfig config,
                                          HoodieTable<T, I, K, O> table,
                                          String instantTime,
                                          Option<Map<String, String>> extraMetadata,
                                          HoodieCompactor compactor) {
    super(context, config, table, instantTime);
    this.extraMetadata = extraMetadata;
    this.compactor = compactor;
  }

  @Override
-  protected HoodieCompactionPlan scheduleCompaction() {
  public Option<HoodieCompactionPlan> execute() {
if (!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
&& !config.getFailedWritesCleanPolicy().isLazy()) {
// if there are inflight writes, their instantTime must not be less than that of compaction instant time
table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant()
.ifPresent(earliestInflight -> ValidationUtils.checkArgument(
HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), HoodieTimeline.GREATER_THAN, instantTime),
"Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
+ ", Compaction scheduled at " + instantTime));
// Committed and pending compaction instants should have strictly lower timestamps
List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
.getWriteTimeline().filterCompletedAndCompactionInstants().getInstants()
.filter(instant -> HoodieTimeline.compareTimestamps(
instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime))
.collect(Collectors.toList());
ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
"Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
+ conflictingInstants);
}
HoodieCompactionPlan plan = scheduleCompaction();
if (plan != null && (plan.getOperations() != null) && (!plan.getOperations().isEmpty())) {
extraMetadata.ifPresent(plan::setExtraMetadata);
HoodieInstant compactionInstant =
new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime);
try {
table.getActiveTimeline().saveToCompactionRequested(compactionInstant,
TimelineMetadataUtils.serializeCompactionPlan(plan));
} catch (IOException ioe) {
throw new HoodieIOException("Exception scheduling compaction", ioe);
}
return Option.of(plan);
}
return Option.empty();
}
private HoodieCompactionPlan scheduleCompaction() {
LOG.info("Checking if compaction needs to be run on " + config.getBasePath()); LOG.info("Checking if compaction needs to be run on " + config.getBasePath());
// judge if we need to compact according to num delta commits and time elapsed // judge if we need to compact according to num delta commits and time elapsed
boolean compactable = needCompact(config.getInlineCompactTriggerStrategy()); boolean compactable = needCompact(config.getInlineCompactTriggerStrategy());
if (compactable) { if (compactable) {
LOG.info("Generating compaction plan for merge on read table " + config.getBasePath()); LOG.info("Generating compaction plan for merge on read table " + config.getBasePath());
HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
try { try {
SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView(); SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView();
Set<HoodieFileGroupId> fgInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations() Set<HoodieFileGroupId> fgInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations()
@@ -86,7 +123,7 @@ public class FlinkScheduleCompactionActionExecutor<T extends HoodieRecordPayload
    return new HoodieCompactionPlan();
  }

-  public Pair<Integer, String> getLatestDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) {
  private Pair<Integer, String> getLatestDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) {
    Option<HoodieInstant> lastCompaction = table.getActiveTimeline().getCommitTimeline()
        .filterCompletedInstants().lastInstant();
    HoodieTimeline deltaCommits = table.getActiveTimeline().getDeltaCommitTimeline();
@@ -103,7 +140,7 @@ public class FlinkScheduleCompactionActionExecutor<T extends HoodieRecordPayload
    return Pair.of(deltaCommitsSinceLastCompaction, latestInstantTs);
  }

-  public boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy) {
  private boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy) {
    boolean compactable;
    // get deltaCommitsSinceLastCompaction and lastCompactionTs
    Pair<Integer, String> latestDeltaCommitInfo = getLatestDeltaCommitInfo(compactionTriggerStrategy);
@@ -144,7 +181,7 @@ public class FlinkScheduleCompactionActionExecutor<T extends HoodieRecordPayload
    return compactable;
  }

-  public Long parsedToSeconds(String time) {
  private Long parsedToSeconds(String time) {
    long timestamp;
    try {
      timestamp = HoodieActiveTimeline.COMMIT_FORMATTER.parse(time).getTime() / 1000;

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.client;

import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.data.HoodieList;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
@@ -58,7 +59,7 @@ import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.compact.FlinkCompactHelpers;
import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
@@ -346,8 +347,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
                                  List<WriteStatus> writeStatuses,
                                  Option<Map<String, String>> extraMetadata) throws IOException {
    HoodieFlinkTable<T> table = getHoodieTable();
-    HoodieCommitMetadata metadata = FlinkCompactHelpers.newInstance().createCompactionMetadata(
-        table, compactionInstantTime, writeStatuses, config.getSchema());
    HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata(
        table, compactionInstantTime, HoodieList.of(writeStatuses), config.getSchema());
    extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
    completeCompaction(metadata, writeStatuses, table, compactionInstantTime);
  }
@@ -364,7 +365,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
    // commit to data table after committing to metadata table.
    finalizeWrite(table, compactionCommitTime, writeStats);
    LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
-    FlinkCompactHelpers.newInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
    CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);

    if (compactionTimer != null) {
      long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
@@ -383,7 +384,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
  protected List<WriteStatus> compact(String compactionInstantTime, boolean shouldComplete) {
    // only used for metadata table, the compaction happens in single thread
    try {
-      List<WriteStatus> writeStatuses = FlinkCompactHelpers.compact(compactionInstantTime, this);
      List<WriteStatus> writeStatuses =
          getHoodieTable().compact(context, compactionInstantTime).getWriteStatuses();
      commitCompaction(compactionInstantTime, writeStatuses, Option.empty());
      return writeStatuses;
    } catch (IOException e) {

View File

@@ -20,6 +20,10 @@ package org.apache.hudi.client.common;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieList;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -32,6 +36,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.flink.api.common.functions.RuntimeContext;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -72,6 +77,21 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
    this.runtimeContext = ((FlinkTaskContextSupplier) taskContextSupplier).getFlinkRuntimeContext();
  }
@Override
public HoodieAccumulator newAccumulator() {
return HoodieAtomicLongAccumulator.create();
}
@Override
public <T> HoodieData<T> emptyHoodieData() {
return HoodieList.of(Collections.emptyList());
}
@Override
public <T> HoodieData<T> parallelize(List<T> data) {
return HoodieList.of(data);
}
  public RuntimeContext getRuntimeContext() {
    return this.runtimeContext;
  }
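
The HoodieData interface itself is not part of this section of the diff; judging from the call sites in this commit (map, flatMap, collectAsList, and the HoodieList wrapper used above), its contract is roughly the following sketch (not the actual source):

    // Sketch of the engine-agnostic data abstraction inferred from its usage:
    // Spark backs it with an RDD, Flink/Java with an in-memory list (HoodieList).
    public abstract class HoodieData<T> implements Serializable {
      public abstract <O> HoodieData<O> map(SerializableFunction<T, O> func);
      public abstract <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func);
      public abstract List<T> collectAsList();
    }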

View File

@@ -74,7 +74,8 @@ import java.util.Map;
 * <p>
 * UPDATES - Produce a new version of the file, just replacing the updated records with new values
 */
-public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieFlinkTable<T> {
public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload>
    extends HoodieFlinkTable<T> implements HoodieCompactionHandler<T> {

  private static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCopyOnWriteTable.class);
@@ -265,7 +266,8 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
  }

  @Override
-  public HoodieWriteMetadata<List<WriteStatus>> compact(HoodieEngineContext context, String compactionInstantTime) {
  public HoodieWriteMetadata<List<WriteStatus>> compact(
      HoodieEngineContext context, String compactionInstantTime) {
    throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
  }
@@ -329,9 +331,10 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
  // -------------------------------------------------------------------------
  //  Used for compaction
  // -------------------------------------------------------------------------

-  public Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath, String fileId,
-                                                  Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
  @Override
  public Iterator<List<WriteStatus>> handleUpdate(
      String instantTime, String partitionPath, String fileId,
      Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
    // these are updates
    HoodieMergeHandle upsertHandle = getUpdateHandle(instantTime, partitionPath, fileId, keyToNewRecords, oldDataFile);
    return handleUpdateInternal(upsertHandle, instantTime, fileId);
@@ -366,9 +369,11 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
    }
  }

-  public Iterator<List<WriteStatus>> handleInsert(String instantTime, String partitionPath, String fileId,
-                                                  Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) {
-    HoodieCreateHandle<?,?,?,?> createHandle =
  @Override
  public Iterator<List<WriteStatus>> handleInsert(
      String instantTime, String partitionPath, String fileId,
      Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) {
    HoodieCreateHandle<?, ?, ?, ?> createHandle =
        new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordMap, taskContextSupplier);
    createHandle.write();
    return Collections.singletonList(createHandle.close()).iterator();

View File

@@ -30,14 +30,14 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.io.FlinkAppendHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.delta.FlinkUpsertDeltaCommitActionExecutor;
import org.apache.hudi.table.action.commit.delta.FlinkUpsertPreppedDeltaCommitActionExecutor;
-import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
-import org.apache.hudi.table.action.compact.FlinkScheduleCompactionActionExecutor;
import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
import org.apache.hudi.table.action.compact.RunCompactionActionExecutor;
import org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor;
import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
@@ -97,15 +97,19 @@ public class HoodieFlinkMergeOnReadTable<T extends HoodieRecordPayload>
      HoodieEngineContext context,
      String instantTime,
      Option<Map<String, String>> extraMetadata) {
-    BaseScheduleCompactionActionExecutor scheduleCompactionExecutor = new FlinkScheduleCompactionActionExecutor(
-        context, config, this, instantTime, extraMetadata);
    ScheduleCompactionActionExecutor scheduleCompactionExecutor = new ScheduleCompactionActionExecutor(
        context, config, this, instantTime, extraMetadata,
        new HoodieFlinkMergeOnReadTableCompactor());
    return scheduleCompactionExecutor.execute();
  }

  @Override
-  public HoodieWriteMetadata<List<WriteStatus>> compact(HoodieEngineContext context, String compactionInstantTime) {
-    throw new HoodieNotSupportedException("Compaction is supported as a separate pipeline, "
-        + "should not invoke directly through HoodieFlinkMergeOnReadTable");
  public HoodieWriteMetadata<List<WriteStatus>> compact(
      HoodieEngineContext context, String compactionInstantTime) {
    RunCompactionActionExecutor compactionExecutor = new RunCompactionActionExecutor(
        context, config, this, compactionInstantTime, new HoodieFlinkMergeOnReadTableCompactor(),
        new HoodieFlinkCopyOnWriteTable(config, context, getMetaClient()));
    return convertMetadata(compactionExecutor.execute());
  }

  @Override

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.table;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -35,12 +36,15 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter; import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import static org.apache.hudi.common.data.HoodieList.getList;
public abstract class HoodieFlinkTable<T extends HoodieRecordPayload> public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>
implements ExplicitWriteHandleTable<T> { implements ExplicitWriteHandleTable<T> {
@@ -87,6 +91,11 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
return hoodieFlinkTable; return hoodieFlinkTable;
} }
public static HoodieWriteMetadata<List<WriteStatus>> convertMetadata(
HoodieWriteMetadata<HoodieData<WriteStatus>> metadata) {
return metadata.clone(getList(metadata.getWriteStatuses()));
}
@Override @Override
protected HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) { protected HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
return FlinkHoodieIndex.createIndex((HoodieFlinkEngineContext) context, config); return FlinkHoodieIndex.createIndex((HoodieFlinkEngineContext) context, config);
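The convertMetadata helper added above is the bridge between the engine-neutral HoodieData<WriteStatus> produced by the shared compaction executors and the List-based type the Flink client exposes. A minimal round-trip sketch; the class and variable names here are illustrative, not part of the patch:

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieList;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;

import java.util.List;

// Hypothetical helper: wraps List-based write statuses into HoodieData for the
// shared executors, then unwraps the result via the new convertMetadata.
final class HoodieDataRoundTripSketch {
  static HoodieWriteMetadata<List<WriteStatus>> roundTrip(
      HoodieWriteMetadata<List<WriteStatus>> listMetadata) {
    // HoodieList is the eager, in-memory HoodieData implementation used off Spark.
    HoodieData<WriteStatus> neutral = HoodieList.of(listMetadata.getWriteStatuses());
    // clone(..) swaps the write-status payload while keeping the rest of the metadata.
    HoodieWriteMetadata<HoodieData<WriteStatus>> shared = listMetadata.clone(neutral);
    // convertMetadata (added in this patch) brings it back to the Flink-facing List form.
    return HoodieFlinkTable.convertMetadata(shared);
  }
}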


@@ -1,147 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.toList;
/**
* A flink implementation of {@link AbstractCompactHelpers}.
*
* @param <T>
*/
public class FlinkCompactHelpers<T extends HoodieRecordPayload> extends
AbstractCompactHelpers<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
private static final Logger LOG = LoggerFactory.getLogger(FlinkCompactHelpers.class);
private FlinkCompactHelpers() {
}
private static class CompactHelperHolder {
private static final FlinkCompactHelpers FLINK_COMPACT_HELPERS = new FlinkCompactHelpers();
}
public static FlinkCompactHelpers newInstance() {
return CompactHelperHolder.FLINK_COMPACT_HELPERS;
}
@Override
public HoodieCommitMetadata createCompactionMetadata(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
String compactionInstantTime,
List<WriteStatus> writeStatuses,
String schema) throws IOException {
byte[] planBytes = table.getActiveTimeline().readCompactionPlanAsBytes(
HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get();
HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(planBytes);
List<HoodieWriteStat> updateStatusMap = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
org.apache.hudi.common.model.HoodieCommitMetadata metadata = new org.apache.hudi.common.model.HoodieCommitMetadata(true);
for (HoodieWriteStat stat : updateStatusMap) {
metadata.addWriteStat(stat.getPartitionPath(), stat);
}
metadata.addMetadata(org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY, schema);
if (compactionPlan.getExtraMetadata() != null) {
compactionPlan.getExtraMetadata().forEach(metadata::addMetadata);
}
return metadata;
}
@SuppressWarnings("unchecked, rawtypes")
public static List<WriteStatus> compact(
HoodieFlinkWriteClient writeClient,
String compactInstantTime,
CompactionOperation compactionOperation) throws IOException {
HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
return compactor.compact(
new HoodieFlinkCopyOnWriteTable<>(
writeClient.getConfig(),
writeClient.getEngineContext(),
writeClient.getHoodieTable().getMetaClient()),
writeClient.getHoodieTable().getMetaClient(),
writeClient.getConfig(),
compactionOperation,
compactInstantTime);
}
/**
* Called by the metadata table compactor code path.
*/
@SuppressWarnings("unchecked, rawtypes")
public static List<WriteStatus> compact(String compactionInstantTime, HoodieFlinkWriteClient writeClient) throws IOException {
HoodieFlinkTable table = writeClient.getHoodieTable();
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
writeClient.rollbackInflightCompaction(inflightInstant, table);
table.getMetaClient().reloadActiveTimeline();
}
// generate compaction plan
// should support configurable commit metadata
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
table.getMetaClient(), compactionInstantTime);
if (compactionPlan == null || (compactionPlan.getOperations() == null)
|| (compactionPlan.getOperations().isEmpty())) {
// do nothing.
LOG.info("No compaction plan for instant " + compactionInstantTime);
return Collections.emptyList();
} else {
HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
// Mark instant as compaction inflight
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
table.getMetaClient().reloadActiveTimeline();
List<CompactionOperation> operations = compactionPlan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
LOG.info("Compacting " + operations + " files");
List<WriteStatus> writeStatusList = new ArrayList<>();
for (CompactionOperation operation : operations) {
List<WriteStatus> statuses = compact(writeClient, compactionInstantTime, operation);
writeStatusList.addAll(statuses);
}
return writeStatusList;
}
}
}


@@ -18,54 +18,17 @@
package org.apache.hudi.table.action.compact;

-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.avro.model.HoodieCompactionOperation;
-import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.CompactionOperation;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
-import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.CompactionUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.io.IOUtils;
-import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-import static java.util.stream.Collectors.toList;

/**
 * Compacts a hoodie table with merge on read storage. Computes all possible compactions,
@@ -75,164 +38,21 @@ import static java.util.stream.Collectors.toList;
 * <p>Note: the compaction logic is invoked through the flink pipeline.
 */
@SuppressWarnings("checkstyle:LineLength")
-public class HoodieFlinkMergeOnReadTableCompactor<T extends HoodieRecordPayload> implements HoodieCompactor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+public class HoodieFlinkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
+    extends HoodieCompactor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
-  private static final Logger LOG = LogManager.getLogger(HoodieFlinkMergeOnReadTableCompactor.class);
-  // Accumulator to keep track of total log files for a table
-  private AtomicLong totalLogFiles;
-  // Accumulator to keep track of total log file slices for a table
-  private AtomicLong totalFileSlices;

  @Override
+  public void preCompact(
+      HoodieTable table, HoodieTimeline pendingCompactionTimeline, String compactionInstantTime) {
+    HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
+    if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
+      table.rollbackInflightCompaction(inflightInstant);
+      table.getMetaClient().reloadActiveTimeline();
+    }
+  }
-  public List<WriteStatus> compact(HoodieEngineContext context, HoodieCompactionPlan compactionPlan,
-      HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException {
-    throw new UnsupportedOperationException("HoodieFlinkMergeOnReadTableCompactor does not support compact directly, "
-        + "the function works as a separate pipeline");
-  }
public List<WriteStatus> compact(HoodieFlinkCopyOnWriteTable hoodieCopyOnWriteTable,
HoodieTableMetaClient metaClient,
HoodieWriteConfig config,
CompactionOperation operation,
String instantTime) throws IOException {
FileSystem fs = metaClient.getFs();
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames()
+ " for commit " + instantTime);
// TODO - FIX THIS
// Reads the entire avro file. Always only specific blocks should be read from the avro file
// (failure recover).
// Load all the delta commits since the last compaction commit and get all the blocks to be
// loaded and load it using CompositeAvroLogReader
// Since a DeltaCommit is not defined yet, reading all the records. revisit this soon.
String maxInstantTime = metaClient
.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
.filterCompletedInstants().lastInstant().get().getTimestamp();
// TODO(danny): make it configurable
long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new FlinkTaskContextSupplier(null), config);
LOG.info("MaxMemoryPerCompaction => " + maxMemoryPerCompaction);
List<String> logFiles = operation.getDeltaFileNames().stream().map(
p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString())
.collect(toList());
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(metaClient.getBasePath())
.withLogFilePaths(logFiles)
.withReaderSchema(readerSchema)
.withLatestInstantTime(maxInstantTime)
.withMaxMemorySizeInBytes(maxMemoryPerCompaction)
.withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
.withReverseReader(config.getCompactionReverseLogReadEnabled())
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(config.getSpillableMapBasePath())
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.build();
if (!scanner.iterator().hasNext()) {
return new ArrayList<>();
} }
Option<HoodieBaseFile> oldDataFileOpt =
operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath());
// Compacting is very similar to applying updates to existing file
Iterator<List<WriteStatus>> result;
// If the dataFile is present, perform updates else perform inserts into a new base file.
if (oldDataFileOpt.isPresent()) {
result = hoodieCopyOnWriteTable.handleUpdate(instantTime, operation.getPartitionPath(),
operation.getFileId(), scanner.getRecords(),
oldDataFileOpt.get());
} else {
result = hoodieCopyOnWriteTable.handleInsert(instantTime, operation.getPartitionPath(), operation.getFileId(),
scanner.getRecords());
}
Iterable<List<WriteStatus>> resultIterable = () -> result;
return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream).peek(s -> {
s.getStat().setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
s.getStat().setTotalLogFilesCompacted(scanner.getTotalLogFiles());
s.getStat().setTotalLogRecords(scanner.getTotalLogRecords());
s.getStat().setPartitionPath(operation.getPartitionPath());
s.getStat()
.setTotalLogSizeCompacted(operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue());
s.getStat().setTotalLogBlocks(scanner.getTotalLogBlocks());
s.getStat().setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
s.getStat().setTotalRollbackBlocks(scanner.getTotalRollbacks());
RuntimeStats runtimeStats = new RuntimeStats();
runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
s.getStat().setRuntimeStats(runtimeStats);
scanner.close();
}).collect(toList());
  }

  @Override
+  public void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieWriteConfig config) {
+    // No OP
+  }
-  public HoodieCompactionPlan generateCompactionPlan(HoodieEngineContext context,
-      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable,
HoodieWriteConfig config, String compactionCommitTime,
Set<HoodieFileGroupId> fgIdsInPendingCompactionAndClustering)
throws IOException {
totalLogFiles = new AtomicLong(0);
totalFileSlices = new AtomicLong(0);
ValidationUtils.checkArgument(hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ,
"Can only compact table of type " + HoodieTableType.MERGE_ON_READ + " and not "
+ hoodieTable.getMetaClient().getTableType().name());
// TODO : check if maxMemory is not greater than JVM or flink.executor memory
// TODO - rollback any compactions in flight
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
LOG.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime);
List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), metaClient.getBasePath());
// filter the partition paths if needed to reduce list status
partitionPaths = config.getCompactionStrategy().filterPartitionPaths(config, partitionPaths);
if (partitionPaths.isEmpty()) {
// In case no partitions could be picked, return no compaction plan
return null;
}
SliceView fileSystemView = hoodieTable.getSliceView();
LOG.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
context.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact");
List<HoodieCompactionOperation> operations = context.flatMap(partitionPaths, partitionPath -> fileSystemView
.getLatestFileSlices(partitionPath)
.filter(slice -> !fgIdsInPendingCompactionAndClustering.contains(slice.getFileGroupId()))
.map(s -> {
List<HoodieLogFile> logFiles =
s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
totalLogFiles.addAndGet(logFiles.size());
totalFileSlices.addAndGet(1L);
// Avro generated classes are not inheriting Serializable. Using CompactionOperation POJO
// for flink Map operations and collecting them finally in Avro generated classes for storing
// into meta files.
Option<HoodieBaseFile> dataFile = s.getBaseFile();
return new CompactionOperation(dataFile, partitionPath, logFiles,
config.getCompactionStrategy().captureMetrics(config, s));
})
.filter(c -> !c.getDeltaFileNames().isEmpty()), partitionPaths.size()).stream().map(CompactionUtils::buildHoodieCompactionOperation).collect(toList());
LOG.info("Total of " + operations.size() + " compactions are retrieved");
LOG.info("Total number of latest files slices " + totalFileSlices.get());
LOG.info("Total number of log files " + totalLogFiles.get());
LOG.info("Total number of file slices " + totalFileSlices.get());
// Filter the compactions with the passed in filter. This lets us choose most effective
// compactions only
HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations,
CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList()));
ValidationUtils.checkArgument(
compactionPlan.getOperations().stream().noneMatch(
op -> fgIdsInPendingCompactionAndClustering.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))),
"Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. "
+ "Please fix your strategy implementation. FileIdsWithPendingCompactions :" + fgIdsInPendingCompactionAndClustering
+ ", Selected workload :" + compactionPlan);
if (compactionPlan.getOperations().isEmpty()) {
LOG.warn("After filtering, Nothing to compact for " + metaClient.getBasePath());
}
return compactionPlan;
  }
}
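The engine-specific compact() and generateCompactionPlan() bodies removed above now live in the shared ScheduleCompactionActionExecutor and RunCompactionActionExecutor, leaving only the two hooks. A rough sketch of the order in which the shared executor is expected to invoke them, inferred from the call sites visible in this diff; the driver class below is hypothetical:

package org.apache.hudi.table.action.compact;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;

// Hypothetical driver illustrating the hook order; the real logic lives in
// RunCompactionActionExecutor, which is not part of this hunk.
final class CompactorHookOrderSketch {
  @SuppressWarnings({"rawtypes", "unchecked"})
  static void run(HoodieCompactor compactor, HoodieTable table, HoodieWriteConfig config,
                  String compactionInstantTime, HoodieData<WriteStatus> statusesFromPlan) {
    HoodieTimeline pendingCompactionTimeline =
        table.getActiveTimeline().filterPendingCompactionTimeline();
    // Engine guard: Flink rolls back an inflight instant, Spark requires a
    // .requested instant to exist (see the preCompact bodies above).
    compactor.preCompact(table, pendingCompactionTimeline, compactionInstantTime);
    // ... the executor then runs the compaction plan, yielding statusesFromPlan ...
    // Engine persistence: Spark caches the RDD, Flink is a no-op.
    compactor.maybePersist(statusesFromPlan, config);
  }
}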


@@ -21,6 +21,10 @@ package org.apache.hudi.client.common;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.data.HoodieAccumulator;
+import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieList;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -34,6 +38,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;

+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -62,6 +67,21 @@ public class HoodieJavaEngineContext extends HoodieEngineContext {
    super(new SerializableConfiguration(conf), taskContextSupplier);
  }

+  @Override
+  public HoodieAccumulator newAccumulator() {
+    return HoodieAtomicLongAccumulator.create();
+  }
+
+  @Override
+  public <T> HoodieData<T> emptyHoodieData() {
+    return HoodieList.of(Collections.emptyList());
+  }
+
+  @Override
+  public <T> HoodieData<T> parallelize(List<T> data) {
+    return HoodieList.of(data);
+  }
+
  @Override
  public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
    return data.stream().parallel().map(throwingMapWrapper(func)).collect(toList());
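For the Java engine these new hooks stay eager and in-memory, which makes engine-neutral code easy to exercise in a plain JVM. A minimal sketch, assuming the single-argument Configuration constructor this class also provides (not shown in this hunk); the class name and values are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.data.HoodieData;

import java.util.Arrays;

// Illustrative only: exercises the new parallelize/map hooks on the Java engine.
final class JavaEngineContextSketch {
  public static void main(String[] args) {
    HoodieJavaEngineContext context = new HoodieJavaEngineContext(new Configuration());
    // parallelize is a plain in-memory wrap (HoodieList) for the Java engine.
    HoodieData<Integer> data = context.parallelize(Arrays.asList(1, 2, 3));
    // map runs on a parallel stream under the hood; collectAsList materializes it.
    System.out.println(data.map(x -> x * 2).collectAsList()); // [2, 4, 6]
  }
}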


@@ -40,6 +40,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.index.HoodieIndex;
@@ -51,7 +52,7 @@ import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.compact.SparkCompactHelpers;
+import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
@@ -291,8 +292,8 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
  @Override
  public void commitCompaction(String compactionInstantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata) throws IOException {
    HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
-    HoodieCommitMetadata metadata = SparkCompactHelpers.newInstance().createCompactionMetadata(
-        table, compactionInstantTime, writeStatuses, config.getSchema());
+    HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata(
+        table, compactionInstantTime, HoodieJavaRDD.of(writeStatuses), config.getSchema());
    extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
    completeCompaction(metadata, writeStatuses, table, compactionInstantTime);
  }
@@ -307,7 +308,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
    // commit to data table after committing to metadata table.
    finalizeWrite(table, compactionCommitTime, writeStats);
    LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
-    SparkCompactHelpers.newInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
+    CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
    WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime)
        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
    if (compactionTimer != null) {
@@ -330,11 +331,12 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
    HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
    HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
    if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
-      rollbackInflightCompaction(inflightInstant, table);
+      table.rollbackInflightCompaction(inflightInstant);
      table.getMetaClient().reloadActiveTimeline();
    }
    compactionTimer = metrics.getCompactionCtx();
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = table.compact(context, compactionInstantTime);
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata =
+        table.compact(context, compactionInstantTime);
    JavaRDD<WriteStatus> statuses = compactionMetadata.getWriteStatuses();
    if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
      completeTableService(TableServiceType.COMPACT, compactionMetadata.getCommitMetadata().get(), statuses, table, compactionInstantTime);


@@ -20,6 +20,8 @@ package org.apache.hudi.client.common;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.data.HoodieAccumulator;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableBiFunction;
@@ -30,6 +32,8 @@ import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.data.HoodieSparkLongAccumulator;
import org.apache.hudi.exception.HoodieException;
import org.apache.spark.api.java.JavaSparkContext;
@@ -74,6 +78,23 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
    return ((HoodieSparkEngineContext) context).getJavaSparkContext();
  }

+  @Override
+  public HoodieAccumulator newAccumulator() {
+    HoodieSparkLongAccumulator accumulator = HoodieSparkLongAccumulator.create();
+    javaSparkContext.sc().register(accumulator.getAccumulator());
+    return accumulator;
+  }
+
+  @Override
+  public <T> HoodieData<T> emptyHoodieData() {
+    return HoodieJavaRDD.of(javaSparkContext.emptyRDD());
+  }
+
+  @Override
+  public <T> HoodieData<T> parallelize(List<T> data) {
+    return HoodieJavaRDD.of(javaSparkContext.parallelize(data, data.size()));
+  }
+
  @Override
  public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
    return javaSparkContext.parallelize(data, parallelism).map(func::apply).collect();
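Note that newAccumulator registers the Spark accumulator with the SparkContext before handing it out, so increments from executor-side code are aggregated on the driver. A sketch under the assumption that the context wraps a live JavaSparkContext; the class below is illustrative:

package org.apache.hudi.client.common;

import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieData;

import java.util.Arrays;

// Illustrative only; a real caller would obtain the context from the write client.
final class SparkEngineContextSketch {
  static long countElements(HoodieSparkEngineContext context) {
    HoodieAccumulator counter = context.newAccumulator(); // already registered above
    HoodieData<Integer> data = context.parallelize(Arrays.asList(1, 2, 3));
    // The side-effecting map runs on executors; collectAsList forces evaluation.
    data.map(x -> {
      counter.add(1);
      return x;
    }).collectAsList();
    return counter.value(); // 3
  }
}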


@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.data;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.spark.api.java.JavaRDD;
import java.util.Iterator;
import java.util.List;
/**
* Holds a {@link JavaRDD} of objects.
*
* @param <T> type of object.
*/
public class HoodieJavaRDD<T> extends HoodieData<T> {
private final JavaRDD<T> rddData;
private HoodieJavaRDD(JavaRDD<T> rddData) {
this.rddData = rddData;
}
/**
* @param rddData a {@link JavaRDD} of objects in type T.
* @param <T> type of object.
* @return a new instance containing the {@link JavaRDD<T>} reference.
*/
public static <T> HoodieJavaRDD<T> of(JavaRDD<T> rddData) {
return new HoodieJavaRDD<>(rddData);
}
/**
* @param data a {@link List} of objects in type T.
* @param context {@link HoodieSparkEngineContext} to use.
* @param parallelism parallelism for the {@link JavaRDD<T>}.
* @param <T> type of object.
* @return a new instance containing the {@link JavaRDD<T>} instance.
*/
public static <T> HoodieJavaRDD<T> of(
List<T> data, HoodieSparkEngineContext context, int parallelism) {
return new HoodieJavaRDD<>(context.getJavaSparkContext().parallelize(data, parallelism));
}
/**
   * @param hoodieData {@link HoodieJavaRDD<T>} instance containing the {@link JavaRDD} of objects.
   * @param <T>        type of object.
   * @return the {@link JavaRDD} of objects in type T.
*/
public static <T> JavaRDD<T> getJavaRDD(HoodieData<T> hoodieData) {
return ((HoodieJavaRDD<T>) hoodieData).get();
}
@Override
public JavaRDD<T> get() {
return rddData;
}
@Override
public boolean isEmpty() {
return rddData.isEmpty();
}
@Override
public <O> HoodieData<O> map(SerializableFunction<T, O> func) {
return HoodieJavaRDD.of(rddData.map(func::apply));
}
@Override
public <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func) {
return HoodieJavaRDD.of(rddData.flatMap(func::apply));
}
@Override
public List<T> collectAsList() {
return rddData.collect();
}
}
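HoodieJavaRDD is intentionally a thin, lazy wrapper: map and flatMap defer to the RDD, and nothing is computed until collectAsList (or an unwrapped Spark action) runs. A usage sketch, assuming a JavaSparkContext supplied by the surrounding Spark application:

import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

// Minimal round-trip through the wrapper; jsc is assumed to be provided elsewhere.
final class HoodieJavaRDDSketch {
  static List<String> roundTrip(JavaSparkContext jsc) {
    HoodieData<String> data =
        HoodieJavaRDD.of(jsc.parallelize(Arrays.asList("a", "b")));
    HoodieData<String> upper = data.map(String::toUpperCase); // still lazy, still an RDD underneath
    JavaRDD<String> rdd = HoodieJavaRDD.getJavaRDD(upper);    // unwrap for Spark-only APIs
    return rdd.collect();                                     // ["A", "B"]
  }
}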


@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.data;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.spark.util.AccumulatorV2;
import org.apache.spark.util.LongAccumulator;
/**
* An accumulator on counts based on Spark {@link AccumulatorV2} implementation.
*/
public class HoodieSparkLongAccumulator extends HoodieAccumulator {
private final AccumulatorV2<Long, Long> accumulator;
private HoodieSparkLongAccumulator() {
accumulator = new LongAccumulator();
}
public static HoodieSparkLongAccumulator create() {
return new HoodieSparkLongAccumulator();
}
@Override
public long value() {
return accumulator.value();
}
@Override
public void add(long increment) {
accumulator.add(increment);
}
public AccumulatorV2<Long, Long> getAccumulator() {
return accumulator;
}
}
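The wrapper does not register itself; HoodieSparkEngineContext#newAccumulator does that against the SparkContext before tasks reference it. A minimal sketch of direct use, with jsc an assumed JavaSparkContext:

import org.apache.hudi.data.HoodieSparkLongAccumulator;
import org.apache.spark.api.java.JavaSparkContext;

final class AccumulatorRegistrationSketch {
  static long demo(JavaSparkContext jsc) {
    HoodieSparkLongAccumulator acc = HoodieSparkLongAccumulator.create();
    // Registration must happen before executor-side code uses the accumulator.
    jsc.sc().register(acc.getAccumulator());
    acc.add(5);
    return acc.value(); // 5
  }
}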


@@ -87,7 +87,8 @@ import java.util.Map;
 * <p>
 * UPDATES - Produce a new version of the file, just replacing the updated records with new values
 */
-public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieSparkTable<T> {
+public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
+    extends HoodieSparkTable<T> implements HoodieCompactionHandler<T> {

  private static final Logger LOG = LogManager.getLogger(HoodieSparkCopyOnWriteTable.class);
@@ -157,7 +158,8 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
  }

  @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(HoodieEngineContext context, String compactionInstantTime) {
+  public HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(
+      HoodieEngineContext context, String compactionInstantTime) {
    throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
  }
@@ -191,20 +193,22 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
  @Override
  public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context,
                                                     String instantTime,
                                                     HoodieInstant instantToRollback, boolean skipTimelinePublish) {
    return new BaseRollbackPlanActionExecutor<>(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute();
  }

-  public Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath, String fileId,
-      Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
+  @Override
+  public Iterator<List<WriteStatus>> handleUpdate(
+      String instantTime, String partitionPath, String fileId,
+      Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
    // these are updates
    HoodieMergeHandle upsertHandle = getUpdateHandle(instantTime, partitionPath, fileId, keyToNewRecords, oldDataFile);
    return handleUpdateInternal(upsertHandle, instantTime, fileId);
  }

-  protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?,?,?,?> upsertHandle, String instantTime,
+  protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String instantTime,
                                                             String fileId) throws IOException {
    if (upsertHandle.getOldFilePath() == null) {
      throw new HoodieUpsertException(
          "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
@@ -241,9 +245,11 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
    }
  }

-  public Iterator<List<WriteStatus>> handleInsert(String instantTime, String partitionPath, String fileId,
-      Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) {
-    HoodieCreateHandle<?,?,?,?> createHandle =
+  @Override
+  public Iterator<List<WriteStatus>> handleInsert(
+      String instantTime, String partitionPath, String fileId,
+      Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) {
+    HoodieCreateHandle<?, ?, ?, ?> createHandle =
        new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordMap, taskContextSupplier);
    createHandle.write();
    return Collections.singletonList(createHandle.close()).iterator();
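handleUpdate and handleInsert now satisfy the HoodieCompactionHandler contract, so the shared compactor can dispatch per file slice without knowing the engine. A sketch of that dispatch, mirroring the update-vs-insert branch in the removed per-engine compact() bodies; the class below is hypothetical:

package org.apache.hudi.table.action.compact;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.HoodieCompactionHandler;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical dispatch helper mirroring the decision the removed
// engine-specific compact() bodies made inline.
final class CompactionHandlerDispatchSketch {
  @SuppressWarnings({"unchecked", "rawtypes"})
  static <T extends HoodieRecordPayload> Iterator<List<WriteStatus>> dispatch(
      HoodieCompactionHandler<T> handler, String instantTime, String partitionPath,
      String fileId, Map<String, HoodieRecord<T>> records,
      Option<HoodieBaseFile> oldBaseFile) throws IOException {
    // A present base file means merging log records into it (update);
    // otherwise the records seed a brand new base file (insert).
    return oldBaseFile.isPresent()
        ? handler.handleUpdate(instantTime, partitionPath, fileId, records, oldBaseFile.get())
        : handler.handleInsert(instantTime, partitionPath, fileId, (Map) records);
  }
}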


@@ -39,9 +39,9 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.SparkBootstrapDeltaCommitActionExecutor;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
-import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
-import org.apache.hudi.table.action.compact.SparkRunCompactionActionExecutor;
-import org.apache.hudi.table.action.compact.SparkScheduleCompactionActionExecutor;
+import org.apache.hudi.table.action.compact.HoodieSparkMergeOnReadTableCompactor;
+import org.apache.hudi.table.action.compact.RunCompactionActionExecutor;
+import org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor;
import org.apache.hudi.table.action.deltacommit.SparkBulkInsertDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.SparkBulkInsertPreppedDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor;
@@ -123,15 +123,19 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends
  @Override
  public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
-    BaseScheduleCompactionActionExecutor scheduleCompactionExecutor = new SparkScheduleCompactionActionExecutor(
-        context, config, this, instantTime, extraMetadata);
+    ScheduleCompactionActionExecutor scheduleCompactionExecutor = new ScheduleCompactionActionExecutor(
+        context, config, this, instantTime, extraMetadata,
+        new HoodieSparkMergeOnReadTableCompactor());
    return scheduleCompactionExecutor.execute();
  }

  @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(HoodieEngineContext context, String compactionInstantTime) {
-    SparkRunCompactionActionExecutor compactionExecutor = new SparkRunCompactionActionExecutor((HoodieSparkEngineContext) context, config, this, compactionInstantTime);
-    return compactionExecutor.execute();
+  public HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(
+      HoodieEngineContext context, String compactionInstantTime) {
+    RunCompactionActionExecutor compactionExecutor = new RunCompactionActionExecutor(
+        context, config, this, compactionInstantTime, new HoodieSparkMergeOnReadTableCompactor(),
+        new HoodieSparkCopyOnWriteTable(config, context, getMetaClient()));
+    return convertMetadata(compactionExecutor.execute());
  }

  @Override


@@ -20,6 +20,7 @@ package org.apache.hudi.table;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -35,12 +36,15 @@ import org.apache.hudi.index.SparkHoodieIndex;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;

import java.io.IOException;

+import static org.apache.hudi.data.HoodieJavaRDD.getJavaRDD;
+
public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
    extends HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
@@ -91,6 +95,11 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
    return hoodieSparkTable;
  }

+  public static HoodieWriteMetadata<JavaRDD<WriteStatus>> convertMetadata(
+      HoodieWriteMetadata<HoodieData<WriteStatus>> metadata) {
+    return metadata.clone(getJavaRDD(metadata.getWriteStatuses()));
+  }
+
  @Override
  protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
    return SparkHoodieIndex.createIndex(config);


@@ -18,248 +18,41 @@
package org.apache.hudi.table.action.compact;

-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.avro.model.HoodieCompactionOperation;
-import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.client.utils.SparkMemoryUtils;
-import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.CompactionOperation;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
-import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
-import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.CompactionUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.data.HoodieJavaRDD;
-import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.util.AccumulatorV2;
-import org.apache.spark.util.LongAccumulator;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-import static java.util.stream.Collectors.toList;

/**
 * Compacts a hoodie table with merge on read storage. Computes all possible compactions,
 * passes it through a CompactionFilter and executes all the compactions and writes a new version of base files and make
 * a normal commit
- *
 */
@SuppressWarnings("checkstyle:LineLength")
-public class HoodieSparkMergeOnReadTableCompactor<T extends HoodieRecordPayload> implements HoodieCompactor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
+public class HoodieSparkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
+    extends HoodieCompactor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
-  private static final Logger LOG = LogManager.getLogger(HoodieSparkMergeOnReadTableCompactor.class);
-  // Accumulator to keep track of total log files for a table
-  private AccumulatorV2<Long, Long> totalLogFiles;
-  // Accumulator to keep track of total log file slices for a table
-  private AccumulatorV2<Long, Long> totalFileSlices;

  @Override
+  public void preCompact(
+      HoodieTable table, HoodieTimeline pendingCompactionTimeline, String compactionInstantTime) {
+    HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
+    if (!pendingCompactionTimeline.containsInstant(instant)) {
+      throw new IllegalStateException(
+          "No Compaction request available at " + compactionInstantTime + " to run compaction");
+    }
+  }
-  public JavaRDD<WriteStatus> compact(HoodieEngineContext context, HoodieCompactionPlan compactionPlan,
-      HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException {
-    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
-    if (compactionPlan == null || (compactionPlan.getOperations() == null)
-        || (compactionPlan.getOperations().isEmpty())) {
-      return jsc.emptyRDD();
-    }
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
// Here we firstly use the table schema as the reader schema to read
// log file.That is because in the case of MergeInto, the config.getSchema may not
// the same with the table schema.
try {
Schema readerSchema = schemaUtil.getTableAvroSchema(false);
config.setSchema(readerSchema.toString());
} catch (Exception e) {
// If there is no commit in the table, just ignore the exception.
}
// Compacting is very similar to applying updates to existing file
HoodieSparkCopyOnWriteTable table = new HoodieSparkCopyOnWriteTable(config, context, metaClient);
List<CompactionOperation> operations = compactionPlan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
LOG.info("Compactor compacting " + operations + " files");
context.setJobStatus(this.getClass().getSimpleName(), "Compacting file slices");
return jsc.parallelize(operations, operations.size())
.map(s -> compact(table, metaClient, config, s, compactionInstantTime)).flatMap(List::iterator);
}
private List<WriteStatus> compact(HoodieSparkCopyOnWriteTable hoodieCopyOnWriteTable, HoodieTableMetaClient metaClient,
HoodieWriteConfig config, CompactionOperation operation, String instantTime) throws IOException {
FileSystem fs = metaClient.getFs();
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames()
+ " for commit " + instantTime);
// TODO - FIX THIS
// Reads the entire avro file. Always only specific blocks should be read from the avro file
// (failure recover).
// Load all the delta commits since the last compaction commit and get all the blocks to be
// loaded and load it using CompositeAvroLogReader
// Since a DeltaCommit is not defined yet, reading all the records. revisit this soon.
String maxInstantTime = metaClient
.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
.filterCompletedInstants().lastInstant().get().getTimestamp();
long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), config);
LOG.info("MaxMemoryPerCompaction => " + maxMemoryPerCompaction);
List<String> logFiles = operation.getDeltaFileNames().stream().map(
p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString())
.collect(toList());
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(metaClient.getBasePath())
.withLogFilePaths(logFiles)
.withReaderSchema(readerSchema)
.withLatestInstantTime(maxInstantTime)
.withMaxMemorySizeInBytes(maxMemoryPerCompaction)
.withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
.withReverseReader(config.getCompactionReverseLogReadEnabled())
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(config.getSpillableMapBasePath())
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.build();
if (!scanner.iterator().hasNext()) {
return new ArrayList<>();
}
Option<HoodieBaseFile> oldDataFileOpt =
operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath());
// Compacting is very similar to applying updates to existing file
Iterator<List<WriteStatus>> result;
// If the dataFile is present, perform updates else perform inserts into a new base file.
if (oldDataFileOpt.isPresent()) {
result = hoodieCopyOnWriteTable.handleUpdate(instantTime, operation.getPartitionPath(),
operation.getFileId(), scanner.getRecords(),
oldDataFileOpt.get());
} else {
result = hoodieCopyOnWriteTable.handleInsert(instantTime, operation.getPartitionPath(), operation.getFileId(),
scanner.getRecords());
}
Iterable<List<WriteStatus>> resultIterable = () -> result;
return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream).peek(s -> {
s.getStat().setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
s.getStat().setTotalLogFilesCompacted(scanner.getTotalLogFiles());
s.getStat().setTotalLogRecords(scanner.getTotalLogRecords());
s.getStat().setPartitionPath(operation.getPartitionPath());
s.getStat()
.setTotalLogSizeCompacted(operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue());
s.getStat().setTotalLogBlocks(scanner.getTotalLogBlocks());
s.getStat().setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
s.getStat().setTotalRollbackBlocks(scanner.getTotalRollbacks());
RuntimeStats runtimeStats = new RuntimeStats();
runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
s.getStat().setRuntimeStats(runtimeStats);
scanner.close();
}).collect(toList());
  }

  @Override
+  public void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieWriteConfig config) {
+    HoodieJavaRDD.getJavaRDD(writeStatus).persist(SparkMemoryUtils.getWriteStatusStorageLevel(config.getProps()));
+  }
-  public HoodieCompactionPlan generateCompactionPlan(HoodieEngineContext context,
-      HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable,
HoodieWriteConfig config, String compactionCommitTime,
Set<HoodieFileGroupId> fgIdsInPendingCompactionAndClustering)
throws IOException {
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
totalLogFiles = new LongAccumulator();
totalFileSlices = new LongAccumulator();
jsc.sc().register(totalLogFiles);
jsc.sc().register(totalFileSlices);
ValidationUtils.checkArgument(hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ,
"Can only compact table of type " + HoodieTableType.MERGE_ON_READ + " and not "
+ hoodieTable.getMetaClient().getTableType().name());
// TODO : check if maxMemory is not greater than JVM or spark.executor memory
// TODO - rollback any compactions in flight
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
LOG.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime);
List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), metaClient.getBasePath());
// filter the partition paths if needed to reduce list status
partitionPaths = config.getCompactionStrategy().filterPartitionPaths(config, partitionPaths);
if (partitionPaths.isEmpty()) {
// In case no partitions could be picked, return no compaction plan
return null;
}
SliceView fileSystemView = hoodieTable.getSliceView();
LOG.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
context.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact");
List<HoodieCompactionOperation> operations = context.flatMap(partitionPaths, partitionPath -> {
return fileSystemView
.getLatestFileSlices(partitionPath)
.filter(slice -> !fgIdsInPendingCompactionAndClustering.contains(slice.getFileGroupId()))
.map(s -> {
List<HoodieLogFile> logFiles =
s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
totalLogFiles.add((long) logFiles.size());
totalFileSlices.add(1L);
// Avro generated classes are not serializable. Use the CompactionOperation POJO
// for Spark map operations, and collect the results into Avro generated classes
// at the end for storing into meta files.
Option<HoodieBaseFile> dataFile = s.getBaseFile();
return new CompactionOperation(dataFile, partitionPath, logFiles,
config.getCompactionStrategy().captureMetrics(config, s));
})
.filter(c -> !c.getDeltaFileNames().isEmpty());
}, partitionPaths.size()).stream().map(CompactionUtils::buildHoodieCompactionOperation).collect(toList());
LOG.info("Total of " + operations.size() + " compactions are retrieved");
LOG.info("Total number of latest files slices " + totalFileSlices.value());
LOG.info("Total number of log files " + totalLogFiles.value());
LOG.info("Total number of file slices " + totalFileSlices.value());
// Filter the compactions with the passed in filter. This lets us choose most effective
// compactions only
HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations,
CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList()));
ValidationUtils.checkArgument(
compactionPlan.getOperations().stream().noneMatch(
op -> fgIdsInPendingCompactionAndClustering.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))),
"Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. "
+ "Please fix your strategy implementation. FileIdsWithPendingCompactions :" + fgIdsInPendingCompactionAndClustering
+ ", Selected workload :" + compactionPlan);
if (compactionPlan.getOperations().isEmpty()) {
LOG.warn("After filtering, Nothing to compact for " + metaClient.getBasePath());
}
return compactionPlan;
}
}
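For orientation, the plan produced by generateCompactionPlan above is what the compact path at the top of this file later executes. A rough driver sketch, mirroring the test usage elsewhere in this commit (variable names are illustrative, and exact signatures vary across Hudi versions):

// Schedule a compaction at a fresh instant, then execute the generated plan.
// Call shapes are assumed from the tests included in this change.
String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
table.scheduleCompaction(context, compactionInstantTime, Option.empty());
table.getMetaClient().reloadActiveTimeline();
JavaRDD<WriteStatus> statuses =
    (JavaRDD<WriteStatus>) table.compact(context, compactionInstantTime).getWriteStatuses();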

View File

@@ -1,75 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaRDD;
import java.io.IOException;
import java.util.List;
/**
* A Spark implementation of {@link AbstractCompactHelpers}.
*
* @param <T> HoodieRecordPayload type.
*/
public class SparkCompactHelpers<T extends HoodieRecordPayload> extends
AbstractCompactHelpers<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
private SparkCompactHelpers() {
}
private static class CompactHelperHolder {
private static final SparkCompactHelpers SPARK_COMPACT_HELPERS = new SparkCompactHelpers();
}
public static SparkCompactHelpers newInstance() {
return CompactHelperHolder.SPARK_COMPACT_HELPERS;
}
@Override
public HoodieCommitMetadata createCompactionMetadata(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
String compactionInstantTime,
JavaRDD<WriteStatus> writeStatuses,
String schema) throws IOException {
byte[] planBytes = table.getActiveTimeline().readCompactionPlanAsBytes(
HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get();
HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(planBytes);
List<HoodieWriteStat> updateStatusMap = writeStatuses.map(WriteStatus::getStat).collect();
org.apache.hudi.common.model.HoodieCommitMetadata metadata = new org.apache.hudi.common.model.HoodieCommitMetadata(true);
for (HoodieWriteStat stat : updateStatusMap) {
metadata.addWriteStat(stat.getPartitionPath(), stat);
}
metadata.addMetadata(org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY, schema);
if (compactionPlan.getExtraMetadata() != null) {
compactionPlan.getExtraMetadata().forEach(metadata::addMetadata);
}
return metadata;
}
}

View File

@@ -1,152 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import java.io.IOException;
import java.text.ParseException;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@SuppressWarnings("checkstyle:LineLength")
public class SparkScheduleCompactionActionExecutor<T extends HoodieRecordPayload> extends
BaseScheduleCompactionActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
private static final Logger LOG = LogManager.getLogger(SparkScheduleCompactionActionExecutor.class);
public SparkScheduleCompactionActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
String instantTime,
Option<Map<String, String>> extraMetadata) {
super(context, config, table, instantTime, extraMetadata);
}
@Override
protected HoodieCompactionPlan scheduleCompaction() {
LOG.info("Checking if compaction needs to be run on " + config.getBasePath());
// decide whether we need to compact, based on the number of delta commits and the time elapsed
boolean compactable = needCompact(config.getInlineCompactTriggerStrategy());
if (compactable) {
LOG.info("Generating compaction plan for merge on read table " + config.getBasePath());
HoodieSparkMergeOnReadTableCompactor compactor = new HoodieSparkMergeOnReadTableCompactor();
try {
SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView();
Set<HoodieFileGroupId> fgInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations()
.map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
.collect(Collectors.toSet());
// exclude files in pending clustering from compaction.
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
return compactor.generateCompactionPlan(context, table, config, instantTime, fgInPendingCompactionAndClustering);
} catch (IOException e) {
throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
}
}
return new HoodieCompactionPlan();
}
public Pair<Integer, String> getLatestDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) {
Option<HoodieInstant> lastCompaction = table.getActiveTimeline().getCommitTimeline()
.filterCompletedInstants().lastInstant();
HoodieTimeline deltaCommits = table.getActiveTimeline().getDeltaCommitTimeline();
String latestInstantTs;
int deltaCommitsSinceLastCompaction = 0;
if (lastCompaction.isPresent()) {
latestInstantTs = lastCompaction.get().getTimestamp();
deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfter(latestInstantTs, Integer.MAX_VALUE).countInstants();
} else {
latestInstantTs = deltaCommits.firstInstant().get().getTimestamp();
deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfterOrEquals(latestInstantTs, Integer.MAX_VALUE).countInstants();
}
return Pair.of(deltaCommitsSinceLastCompaction, latestInstantTs);
}
public boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy) {
boolean compactable;
// get deltaCommitsSinceLastCompaction and lastCompactionTs
Pair<Integer, String> latestDeltaCommitInfo = getLatestDeltaCommitInfo(compactionTriggerStrategy);
int inlineCompactDeltaCommitMax = config.getInlineCompactDeltaCommitMax();
int inlineCompactDeltaSecondsMax = config.getInlineCompactDeltaSecondsMax();
switch (compactionTriggerStrategy) {
case NUM_COMMITS:
compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft();
if (compactable) {
LOG.info(String.format("The delta commits >= %s, trigger compaction scheduler.", inlineCompactDeltaCommitMax));
}
break;
case TIME_ELAPSED:
compactable = inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - parsedToSeconds(latestDeltaCommitInfo.getRight());
if (compactable) {
LOG.info(String.format("The elapsed time >=%ss, trigger compaction scheduler.", inlineCompactDeltaSecondsMax));
}
break;
case NUM_OR_TIME:
compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft()
|| inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - parsedToSeconds(latestDeltaCommitInfo.getRight());
if (compactable) {
LOG.info(String.format("The delta commits >= %s or elapsed_time >=%ss, trigger compaction scheduler.", inlineCompactDeltaCommitMax,
inlineCompactDeltaSecondsMax));
}
break;
case NUM_AND_TIME:
compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft()
&& inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - parsedToSeconds(latestDeltaCommitInfo.getRight());
if (compactable) {
LOG.info(String.format("The delta commits >= %s and elapsed_time >=%ss, trigger compaction scheduler.", inlineCompactDeltaCommitMax,
inlineCompactDeltaSecondsMax));
}
break;
default:
throw new HoodieCompactionException("Unsupported compaction trigger strategy: " + config.getInlineCompactTriggerStrategy());
}
return compactable;
}
public Long parsedToSeconds(String time) {
long timestamp;
try {
timestamp = HoodieActiveTimeline.COMMIT_FORMATTER.parse(time).getTime() / 1000;
} catch (ParseException e) {
throw new HoodieCompactionException(e.getMessage(), e);
}
return timestamp;
}
}
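Since needCompact above reads its thresholds from HoodieWriteConfig, the trigger behavior is purely configuration-driven. A hedged configuration sketch (builder method names are assumptions based on HoodieCompactionConfig of this era; basePath is illustrative):

// Illustrative only: schedule inline compaction once 5 delta commits have
// accumulated OR 3600 seconds have elapsed since the last compaction.
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withInlineCompaction(true)
        .withInlineCompactionTriggerStrategy(CompactionTriggerStrategy.NUM_OR_TIME)
        .withMaxNumDeltaCommitsBeforeCompaction(5)
        .withMaxDeltaSecondsBeforeCompaction(3600)
        .build())
    .build();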

View File

@@ -89,8 +89,8 @@ public class TestAsyncCompaction extends CompactionTestBase {
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
-client.rollbackInflightCompaction(
-    new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), hoodieTable);
+hoodieTable.rollbackInflightCompaction(
+    new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime));
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
pendingCompactionInstant = metaClient.getCommitsAndCompactionTimeline().filterPendingCompactionTimeline()
    .getInstants().findFirst().get();

View File

@@ -208,7 +208,8 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
String compactionInstantTime = "102"; String compactionInstantTime = "102";
table.scheduleCompaction(context, compactionInstantTime, Option.empty()); table.scheduleCompaction(context, compactionInstantTime, Option.empty());
table.getMetaClient().reloadActiveTimeline(); table.getMetaClient().reloadActiveTimeline();
JavaRDD<WriteStatus> result = (JavaRDD<WriteStatus>) table.compact(context, compactionInstantTime).getWriteStatuses(); JavaRDD<WriteStatus> result = (JavaRDD<WriteStatus>) table.compact(
context, compactionInstantTime).getWriteStatuses();
// Verify that all partition paths are present in the WriteStatus result // Verify that all partition paths are present in the WriteStatus result
for (String partitionPath : dataGen.getPartitionPaths()) { for (String partitionPath : dataGen.getPartitionPaths()) {

View File

@@ -279,8 +279,8 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
final String compactedCommitTime = metaClient.getActiveTimeline().reload().lastInstant().get().getTimestamp();
assertTrue(Arrays.stream(listAllBaseFilesInPath(hoodieTable))
    .anyMatch(file -> compactedCommitTime.equals(new HoodieBaseFile(file).getCommitTime())));
-thirdClient.rollbackInflightCompaction(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactedCommitTime),
-    hoodieTable);
+hoodieTable.rollbackInflightCompaction(new HoodieInstant(
+    HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactedCommitTime));
allFiles = listAllBaseFilesInPath(hoodieTable);
metaClient = HoodieTableMetaClient.reload(metaClient);
tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
@@ -611,7 +611,8 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
//writeClient.commitCompaction(newCommitTime, statuses, Option.empty());
// Trigger a rollback of compaction
table.getActiveTimeline().reload();
-writeClient.rollbackInflightCompaction(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, newCommitTime), table);
+table.rollbackInflightCompaction(new HoodieInstant(
+    HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, newCommitTime));
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieSparkTable.create(config, context(), metaClient);
@@ -619,7 +620,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
((SyncableFileSystemView) tableRTFileSystemView).reset();
for (String partitionPath : dataGen.getPartitionPaths()) {
List<FileSlice> fileSlices = getFileSystemViewWithUnCommittedSlices(metaClient)
    .getAllFileSlices(partitionPath).filter(fs -> fs.getBaseInstantTime().equals("100")).collect(Collectors.toList());
assertTrue(fileSlices.stream().noneMatch(fileSlice -> fileSlice.getBaseFile().isPresent()));
assertTrue(fileSlices.stream().anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));

View File

@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.common.data;
import java.io.Serializable;
/**
* An abstraction for an accumulator of counts.
*/
public abstract class HoodieAccumulator implements Serializable {
/**
* @return the count.
*/
public abstract long value();
/**
* Increments the count based on the input.
*
* @param increment the value to add.
*/
public abstract void add(long increment);
}

View File

@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.common.data;
import java.util.concurrent.atomic.AtomicLong;
/**
* An accumulator of counts backed by an {@link AtomicLong}.
*/
public class HoodieAtomicLongAccumulator extends HoodieAccumulator {
private final AtomicLong accumulator;
private HoodieAtomicLongAccumulator() {
accumulator = new AtomicLong(0L);
}
public static HoodieAtomicLongAccumulator create() {
return new HoodieAtomicLongAccumulator();
}
@Override
public long value() {
return accumulator.get();
}
@Override
public void add(long increment) {
accumulator.addAndGet(increment);
}
}
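A small, self-contained usage sketch for the accumulator pair above (class and file names are illustrative):

import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
import java.util.Arrays;
import java.util.List;

public class AccumulatorExample {
  public static void main(String[] args) {
    // Thread-safe counting, analogous to how the compactor counts log files and slices.
    HoodieAccumulator totalLogFiles = HoodieAtomicLongAccumulator.create();
    List<String> logFiles = Arrays.asList("f1.log", "f2.log", "f3.log");
    logFiles.parallelStream().forEach(f -> totalLogFiles.add(1));
    System.out.println("log files seen: " + totalLogFiles.value()); // prints 3
  }
}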

View File

@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.common.data;
import org.apache.hudi.common.function.SerializableFunction;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
/**
* An abstraction for a collection of objects of type T that stores a reference to
* the underlying data and supports transformations on it.
*
* @param <T> type of object.
*/
public abstract class HoodieData<T> implements Serializable {
/**
* @return the collection of objects.
*/
public abstract Object get();
/**
* @return whether the collection is empty.
*/
public abstract boolean isEmpty();
/**
* @param func serializable map function.
* @param <O> output object type.
* @return {@link HoodieData<O>} containing the result. Actual execution may be deferred.
*/
public abstract <O> HoodieData<O> map(SerializableFunction<T, O> func);
/**
* @param func serializable flatmap function.
* @param <O> output object type.
* @return {@link HoodieData<O>} containing the result. Actual execution may be deferred.
*/
public abstract <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func);
/**
* @return collected results in {@link List<T>}.
*/
public abstract List<T> collectAsList();
}
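The point of the abstraction is that transformation logic is written once against HoodieData and runs on whichever backing collection the engine supplies. A minimal sketch (the helper method is hypothetical):

import org.apache.hudi.common.data.HoodieData;

public class HoodieDataExample {
  // Works unchanged whether `paths` is list-backed (local/Flink engines) or
  // RDD-backed (Spark); per the contract above, execution may be deferred
  // until the result is collected.
  public static HoodieData<Integer> pathLengths(HoodieData<String> paths) {
    return paths.map(String::length);
  }
}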

View File

@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.common.data;
import org.apache.hudi.common.function.SerializableFunction;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper;
/**
* Holds a {@link List} of objects.
*
* @param <T> type of object.
*/
public class HoodieList<T> extends HoodieData<T> {
private final List<T> listData;
private HoodieList(List<T> listData) {
this.listData = listData;
}
/**
* @param listData a {@link List} of objects in type T.
* @param <T> type of object.
* @return a new instance containing the {@link List<T>} reference.
*/
public static <T> HoodieList<T> of(List<T> listData) {
return new HoodieList<>(listData);
}
/**
* @param hoodieData {@link HoodieList<T>} instance containing the {@link List} of objects.
* @param <T>        type of object.
* @return the {@link List} of objects in type T.
*/
public static <T> List<T> getList(HoodieData<T> hoodieData) {
return ((HoodieList<T>) hoodieData).get();
}
@Override
public List<T> get() {
return listData;
}
@Override
public boolean isEmpty() {
return listData.isEmpty();
}
@Override
public <O> HoodieData<O> map(SerializableFunction<T, O> func) {
return HoodieList.of(listData.stream().parallel()
.map(throwingMapWrapper(func)).collect(Collectors.toList()));
}
@Override
public <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func) {
Function<T, Iterator<O>> throwableFunc = throwingMapWrapper(func);
return HoodieList.of(listData.stream().flatMap(e -> {
List<O> result = new ArrayList<>();
Iterator<O> iterator = throwableFunc.apply(e);
iterator.forEachRemaining(result::add);
return result.stream();
}).collect(Collectors.toList()));
}
@Override
public List<T> collectAsList() {
return listData;
}
}
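A quick usage sketch of the list-backed implementation (execution is eager here, per the code above; class name is illustrative):

import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieList;
import java.util.Arrays;

public class HoodieListExample {
  public static void main(String[] args) {
    HoodieData<String> words = HoodieList.of(Arrays.asList("hudi", "compaction"));
    // map runs immediately over a parallel stream of the backing list
    HoodieData<Integer> lengths = words.map(String::length);
    // flatMap expands each element through the returned iterator
    HoodieData<Character> initials =
        words.flatMap(w -> Arrays.asList(w.charAt(0)).iterator());
    System.out.println(lengths.collectAsList());  // [4, 10]
    System.out.println(initials.collectAsList()); // [h, c]
  }
}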

View File

@@ -19,6 +19,8 @@
package org.apache.hudi.common.engine;
import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.data.HoodieAccumulator;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
@@ -59,6 +61,12 @@ public abstract class HoodieEngineContext {
return taskContextSupplier;
}
+public abstract HoodieAccumulator newAccumulator();
+public abstract <T> HoodieData<T> emptyHoodieData();
+public abstract <T> HoodieData<T> parallelize(List<T> data);
public abstract <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism);
public abstract <I, K, V> List<V> mapToPairAndReduceByKey(

View File

@@ -21,6 +21,10 @@ package org.apache.hudi.common.engine;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.data.HoodieAccumulator;
+import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieList;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
@@ -31,6 +35,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -59,6 +64,21 @@ public final class HoodieLocalEngineContext extends HoodieEngineContext {
super(new SerializableConfiguration(conf), taskContextSupplier);
}
+@Override
+public HoodieAccumulator newAccumulator() {
+  return HoodieAtomicLongAccumulator.create();
+}
+@Override
+public <T> HoodieData<T> emptyHoodieData() {
+  return HoodieList.of(Collections.emptyList());
+}
+@Override
+public <T> HoodieData<T> parallelize(List<T> data) {
+  return HoodieList.of(data);
+}
@Override
public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
return data.stream().parallel().map(throwingMapWrapper(func)).collect(toList());
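Putting the new context methods together on the local engine; a minimal sketch (assumes HoodieLocalEngineContext also exposes a single-argument Configuration constructor; class name is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import java.util.Arrays;

public class LocalEngineContextExample {
  public static void main(String[] args) {
    HoodieLocalEngineContext context = new HoodieLocalEngineContext(new Configuration());
    HoodieAccumulator seen = context.newAccumulator();
    // Locally this wraps a HoodieList; the Spark engine context returns an
    // RDD-backed HoodieData from the same call, keeping callers engine-agnostic.
    HoodieData<String> data = context.parallelize(Arrays.asList("a", "bb", "ccc"));
    HoodieData<Integer> lengths = data.map(s -> {
      seen.add(1);
      return s.length();
    });
    System.out.println(lengths.collectAsList() + ", seen=" + seen.value());
  }
}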

View File

@@ -22,7 +22,8 @@ import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.sink.utils.NonThrownExecutor;
-import org.apache.hudi.table.action.compact.FlinkCompactHelpers;
+import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
+import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
@@ -98,7 +99,17 @@ public class CompactFunction extends ProcessFunction<CompactionPlanEvent, Compac
}
private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector) throws IOException {
-List<WriteStatus> writeStatuses = FlinkCompactHelpers.compact(writeClient, instantTime, compactionOperation);
+HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
+List<WriteStatus> writeStatuses = compactor.compact(
+    new HoodieFlinkCopyOnWriteTable<>(
+        writeClient.getConfig(),
+        writeClient.getEngineContext(),
+        writeClient.getHoodieTable().getMetaClient()),
+    writeClient.getHoodieTable().getMetaClient(),
+    writeClient.getConfig(),
+    compactionOperation,
+    instantTime,
+    writeClient.getHoodieTable().getTaskContextSupplier());
collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), writeStatuses, taskID));
}

View File

@@ -84,7 +84,7 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
public void notifyCheckpointComplete(long checkpointId) {
try {
table.getMetaClient().reloadActiveTimeline();
-CompactionUtil.rollbackCompaction(table, writeClient, conf);
+CompactionUtil.rollbackCompaction(table, conf);
scheduleCompaction(table, checkpointId);
} catch (Throwable throwable) {
// make it fail-safe

View File

@@ -99,7 +99,7 @@ public class HoodieFlinkCompactor {
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
if (timeline.containsInstant(inflightInstant)) {
LOG.info("Rollback inflight compaction instant: [" + compactionInstantTime + "]");
-writeClient.rollbackInflightCompaction(inflightInstant, table);
+table.rollbackInflightCompaction(inflightInstant);
table.getMetaClient().reloadActiveTimeline();
}

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.util;
-import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
@@ -111,7 +110,7 @@ public class CompactionUtil {
}
}
-public static void rollbackCompaction(HoodieFlinkTable<?> table, HoodieFlinkWriteClient writeClient, Configuration conf) {
+public static void rollbackCompaction(HoodieFlinkTable<?> table, Configuration conf) {
String curInstantTime = HoodieActiveTimeline.createNewInstantTime();
int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS);
HoodieTimeline inflightCompactionTimeline = table.getActiveTimeline()
@@ -121,7 +120,7 @@ public class CompactionUtil {
&& StreamerUtil.instantTimeDiffSeconds(curInstantTime, instant.getTimestamp()) >= deltaSeconds); && StreamerUtil.instantTimeDiffSeconds(curInstantTime, instant.getTimestamp()) >= deltaSeconds);
inflightCompactionTimeline.getInstants().forEach(inflightInstant -> { inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
LOG.info("Rollback the pending compaction instant: " + inflightInstant); LOG.info("Rollback the pending compaction instant: " + inflightInstant);
writeClient.rollbackInflightCompaction(inflightInstant, table); table.rollbackInflightCompaction(inflightInstant);
table.getMetaClient().reloadActiveTimeline(); table.getMetaClient().reloadActiveTimeline();
}); });
} }

View File

@@ -78,7 +78,7 @@ public class TestCompactionUtil {
HoodieInstant instant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().orElse(null);
assertThat(instant.getTimestamp(), is(instantTime));
-CompactionUtil.rollbackCompaction(table, writeClient, conf);
+CompactionUtil.rollbackCompaction(table, conf);
HoodieInstant rollbackInstant = table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().get();
assertThat(rollbackInstant.getState(), is(HoodieInstant.State.REQUESTED));
assertThat(rollbackInstant.getTimestamp(), is(instantTime));