diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java index 4a9f4b780..3d1059249 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java @@ -288,8 +288,7 @@ public class CompactionCommand implements CommandMarker { String message = "\n\n\t COMPACTION PLAN " + (valid ? "VALID" : "INVALID") + "\n\n"; List rows = new ArrayList<>(); res.stream().forEach(r -> { - Comparable[] row = new Comparable[]{r.getOperation().getFileId(), - r.getOperation().getBaseInstantTime(), + Comparable[] row = new Comparable[] {r.getOperation().getFileId(), r.getOperation().getBaseInstantTime(), r.getOperation().getDataFileName().isPresent() ? r.getOperation().getDataFileName().get() : "", r.getOperation().getDeltaFileNames().size(), r.isSuccess(), r.getException().isPresent() ? r.getException().get().getMessage() : ""}; diff --git a/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java b/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java index 5401c73a4..72af619d6 100644 --- a/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java @@ -239,9 +239,8 @@ public class CompactionAdminClient extends AbstractHoodieClient { FileSlice merged = fileSystemView.getLatestMergedFileSlicesBeforeOrOn(op.getPartitionPath(), lastInstant.getTimestamp()) .filter(fs -> fs.getFileId().equals(op.getFileId())).findFirst().get(); - final int maxVersion = - op.getDeltaFileNames().stream().map(lf -> FSUtils.getFileVersionFromLog(new Path(lf))) - .reduce((x, y) -> x > y ? x : y).orElse(0); + final int maxVersion = op.getDeltaFileNames().stream().map(lf -> FSUtils.getFileVersionFromLog(new Path(lf))) + .reduce((x, y) -> x > y ? x : y).orElse(0); List logFilesToBeMoved = merged.getLogFiles().filter(lf -> lf.getLogVersion() > maxVersion).collect(Collectors.toList()); return logFilesToBeMoved.stream().map(lf -> { @@ -293,33 +292,31 @@ public class CompactionAdminClient extends AbstractHoodieClient { FileSlice fs = fileSliceOptional.get(); Option df = fs.getDataFile(); if (operation.getDataFileName().isPresent()) { - String expPath = metaClient.getFs().getFileStatus(new Path( - FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), - new Path(operation.getDataFileName().get()))).getPath() - .toString(); - Preconditions.checkArgument(df.isPresent(), "Data File must be present. File Slice was : " - + fs + ", operation :" + operation); + String expPath = metaClient.getFs() + .getFileStatus( + new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), + new Path(operation.getDataFileName().get()))) + .getPath().toString(); + Preconditions.checkArgument(df.isPresent(), + "Data File must be present. File Slice was : " + fs + ", operation :" + operation); Preconditions.checkArgument(df.get().getPath().equals(expPath), "Base Path in operation is specified as " + expPath + " but got path " + df.get().getPath()); } Set logFilesInFileSlice = fs.getLogFiles().collect(Collectors.toSet()); - Set logFilesInCompactionOp = operation.getDeltaFileNames().stream() - .map(dp -> { - try { - FileStatus[] fileStatuses = metaClient.getFs().listStatus( - new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), - new Path(dp))); - Preconditions.checkArgument(fileStatuses.length == 1, "Expect only 1 file-status"); - return new HoodieLogFile(fileStatuses[0]); - } catch (FileNotFoundException fe) { - throw new CompactionValidationException(fe.getMessage()); - } catch (IOException ioe) { - throw new HoodieIOException(ioe.getMessage(), ioe); - } - }).collect(Collectors.toSet()); - Set missing = - logFilesInCompactionOp.stream().filter(lf -> !logFilesInFileSlice.contains(lf)) - .collect(Collectors.toSet()); + Set logFilesInCompactionOp = operation.getDeltaFileNames().stream().map(dp -> { + try { + FileStatus[] fileStatuses = metaClient.getFs().listStatus(new Path( + FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), new Path(dp))); + Preconditions.checkArgument(fileStatuses.length == 1, "Expect only 1 file-status"); + return new HoodieLogFile(fileStatuses[0]); + } catch (FileNotFoundException fe) { + throw new CompactionValidationException(fe.getMessage()); + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } + }).collect(Collectors.toSet()); + Set missing = logFilesInCompactionOp.stream().filter(lf -> !logFilesInFileSlice.contains(lf)) + .collect(Collectors.toSet()); Preconditions.checkArgument(missing.isEmpty(), "All log files specified in compaction operation is not present. Missing :" + missing + ", Exp :" + logFilesInCompactionOp + ", Got :" + logFilesInFileSlice); diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java new file mode 100644 index 000000000..97c67882d --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi; + +import com.codahale.metrics.Timer; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.List; +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieCleanerPlan; +import org.apache.hudi.client.embedded.EmbeddedTimelineService; +import org.apache.hudi.common.HoodieCleanStat; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTimeline; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieInstant.State; +import org.apache.hudi.common.util.AvroUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.metrics.HoodieMetrics; +import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; + +public class HoodieCleanClient extends AbstractHoodieClient { + + private static Logger logger = LogManager.getLogger(HoodieCleanClient.class); + private final transient HoodieMetrics metrics; + + public HoodieCleanClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, HoodieMetrics metrics) { + this(jsc, clientConfig, metrics, Option.empty()); + } + + public HoodieCleanClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, HoodieMetrics metrics, + Option timelineService) { + super(jsc, clientConfig, timelineService); + this.metrics = metrics; + } + + /** + * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the + * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be + * cleaned) + */ + public void clean() throws HoodieIOException { + String startCleanTime = HoodieActiveTimeline.createNewCommitTime(); + clean(startCleanTime); + } + + /** + * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the + * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be + * cleaned) + * + * @param startCleanTime Cleaner Instant Timestamp + * @throws HoodieIOException in case of any IOException + */ + protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOException { + // Create a Hoodie table which encapsulated the commits and files visible + final HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); + + // If there are inflight(failed) or previously requested clean operation, first perform them + table.getCleanTimeline().filterInflightsAndRequested().getInstants().forEach(hoodieInstant -> { + logger.info("There were previously unfinished cleaner operations. Finishing Instant=" + hoodieInstant); + runClean(table, hoodieInstant.getTimestamp()); + }); + + Option cleanerPlanOpt = scheduleClean(startCleanTime); + + if (cleanerPlanOpt.isPresent()) { + HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get(); + if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null) + && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) { + final HoodieTable hoodieTable = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); + return runClean(hoodieTable, startCleanTime); + } + } + return null; + } + + /** + * Creates a Cleaner plan if there are files to be cleaned and stores them in instant file + * + * @param startCleanTime Cleaner Instant Time + * @return Cleaner Plan if generated + */ + @VisibleForTesting + protected Option scheduleClean(String startCleanTime) { + // Create a Hoodie table which encapsulated the commits and files visible + HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); + + HoodieCleanerPlan cleanerPlan = table.scheduleClean(jsc); + + if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null) + && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) { + + HoodieInstant cleanInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime); + // Save to both aux and timeline folder + try { + table.getActiveTimeline().saveToCleanRequested(cleanInstant, AvroUtils.serializeCleanerPlan(cleanerPlan)); + logger.info("Requesting Cleaning with instant time " + cleanInstant); + } catch (IOException e) { + logger.error("Got exception when saving cleaner requested file", e); + throw new HoodieIOException(e.getMessage(), e); + } + return Option.of(cleanerPlan); + } + return Option.empty(); + } + + /** + * Executes the Cleaner plan stored in the instant metadata + * + * @param table Hoodie Table + * @param cleanInstantTs Cleaner Instant Timestamp + */ + @VisibleForTesting + protected HoodieCleanMetadata runClean(HoodieTable table, String cleanInstantTs) { + HoodieInstant cleanInstant = + table.getCleanTimeline().getInstants().filter(x -> x.getTimestamp().equals(cleanInstantTs)).findFirst().get(); + + Preconditions.checkArgument( + cleanInstant.getState().equals(State.REQUESTED) || cleanInstant.getState().equals(State.INFLIGHT)); + + try { + logger.info("Cleaner started"); + final Timer.Context context = metrics.getCleanCtx(); + + if (!cleanInstant.isInflight()) { + // Mark as inflight first + cleanInstant = table.getActiveTimeline().transitionCleanRequestedToInflight(cleanInstant); + } + + List cleanStats = table.clean(jsc, cleanInstant); + + if (cleanStats.isEmpty()) { + return HoodieCleanMetadata.newBuilder().build(); + } + + // Emit metrics (duration, numFilesDeleted) if needed + Option durationInMs = Option.empty(); + if (context != null) { + durationInMs = Option.of(metrics.getDurationInMs(context.stop())); + logger.info("cleanerElaspsedTime (Minutes): " + durationInMs.get() / (1000 * 60)); + } + + // Create the metadata and save it + HoodieCleanMetadata metadata = + AvroUtils.convertCleanMetadata(cleanInstant.getTimestamp(), durationInMs, cleanStats); + logger.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"); + metrics.updateCleanMetrics(durationInMs.orElseGet(() -> -1L), metadata.getTotalFilesDeleted()); + + table.getActiveTimeline().transitionCleanInflightToComplete( + new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, cleanInstant.getTimestamp()), + AvroUtils.serializeCleanMetadata(metadata)); + logger.info("Marked clean started on " + cleanInstant.getTimestamp() + " as complete"); + return metadata; + } catch (IOException e) { + throw new HoodieIOException("Failed to clean up after commit", e); + } + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java index da1d52c99..a147df751 100644 --- a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java @@ -38,7 +38,6 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.client.embedded.EmbeddedTimelineService; -import org.apache.hudi.common.HoodieCleanStat; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieDataFile; @@ -98,6 +97,7 @@ public class HoodieWriteClient extends AbstractHo private final boolean rollbackInFlight; private final transient HoodieMetrics metrics; private final transient HoodieIndex index; + private final transient HoodieCleanClient cleanClient; private transient Timer.Context writeContext = null; private transient Timer.Context compactionTimer; private transient Timer.Context indexTimer = null; @@ -131,6 +131,7 @@ public class HoodieWriteClient extends AbstractHo this.index = index; this.metrics = new HoodieMetrics(config, config.getTableName()); this.rollbackInFlight = rollbackInFlight; + this.cleanClient = new HoodieCleanClient<>(jsc, config, metrics, timelineService); } public static SparkConf registerClasses(SparkConf conf) { @@ -918,6 +919,7 @@ public class HoodieWriteClient extends AbstractHo public void close() { // Stop timeline-server if running super.close(); + this.cleanClient.close(); // Calling this here releases any resources used by your index, so make sure to finish any related operations // before this point this.index.close(); @@ -927,55 +929,24 @@ public class HoodieWriteClient extends AbstractHo * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be * cleaned) + * + * @throws HoodieIOException */ public void clean() throws HoodieIOException { - String startCleanTime = HoodieActiveTimeline.createNewCommitTime(); - clean(startCleanTime); + cleanClient.clean(); } /** * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be * cleaned) + * + * @param startCleanTime Cleaner Instant Timestamp + * @return + * @throws HoodieIOException in case of any IOException */ - private void clean(String startCleanTime) throws HoodieIOException { - try { - logger.info("Cleaner started"); - final Timer.Context context = metrics.getCleanCtx(); - - // Create a Hoodie table which encapsulated the commits and files visible - HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); - - List cleanStats = table.clean(jsc); - if (cleanStats.isEmpty()) { - return; - } - - // Emit metrics (duration, numFilesDeleted) if needed - Option durationInMs = Option.empty(); - if (context != null) { - durationInMs = Option.of(metrics.getDurationInMs(context.stop())); - logger.info("cleanerElaspsedTime (Minutes): " + durationInMs.get() / (1000 * 60)); - } - - // Create the metadata and save it - HoodieCleanMetadata metadata = AvroUtils.convertCleanMetadata(startCleanTime, durationInMs, cleanStats); - logger.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"); - metrics.updateCleanMetrics(durationInMs.orElseGet(() -> -1L), metadata.getTotalFilesDeleted()); - - table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, startCleanTime), - AvroUtils.serializeCleanMetadata(metadata)); - logger.info("Marked clean started on " + startCleanTime + " as complete"); - - if (!table.getActiveTimeline().getCleanerTimeline().empty()) { - // Cleanup of older cleaner meta files - // TODO - make the commit archival generic and archive clean metadata - FSUtils.deleteOlderCleanMetaFiles(fs, table.getMetaClient().getMetaPath(), - table.getActiveTimeline().getCleanerTimeline().getInstants()); - } - } catch (IOException e) { - throw new HoodieIOException("Failed to clean up after commit", e); - } + protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOException { + return cleanClient.clean(startCleanTime); } /** @@ -1176,8 +1147,8 @@ public class HoodieWriteClient extends AbstractHo private JavaRDD runCompaction(HoodieInstant compactionInstant, HoodieActiveTimeline activeTimeline, boolean autoCommit) throws IOException { HoodieTableMetaClient metaClient = createMetaClient(true); - HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(metaClient, - compactionInstant.getTimestamp()); + HoodieCompactionPlan compactionPlan = + CompactionUtils.getCompactionPlan(metaClient, compactionInstant.getTimestamp()); // Mark instant as compaction inflight activeTimeline.transitionCompactionRequestedToInflight(compactionInstant); diff --git a/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java b/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java index 46247c17b..9f7b03b37 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java @@ -97,9 +97,11 @@ public class EmbeddedTimelineService { public void stop() { if (null != server) { + logger.info("Closing Timeline server"); this.server.close(); this.server = null; this.viewManager = null; + logger.info("Closed Timeline server"); } } } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java index 89ab363fc..53e08bd67 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java @@ -19,6 +19,7 @@ package org.apache.hudi.io; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -52,7 +53,7 @@ import org.apache.log4j.Logger; *

* TODO: Should all cleaning be done based on {@link HoodieCommitMetadata} */ -public class HoodieCleanHelper> { +public class HoodieCleanHelper> implements Serializable { private static Logger logger = LogManager.getLogger(HoodieCleanHelper.class); @@ -114,12 +115,11 @@ public class HoodieCleanHelper> { FileSlice nextSlice = fileSliceIterator.next(); if (nextSlice.getDataFile().isPresent()) { HoodieDataFile dataFile = nextSlice.getDataFile().get(); - deletePaths.add(dataFile.getPath()); + deletePaths.add(dataFile.getFileName()); } if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { // If merge on read, then clean the log files for the commits as well - deletePaths - .addAll(nextSlice.getLogFiles().map(file -> file.getPath().toString()).collect(Collectors.toList())); + deletePaths.addAll(nextSlice.getLogFiles().map(file -> file.getFileName()).collect(Collectors.toList())); } } } @@ -187,11 +187,10 @@ public class HoodieCleanHelper> { if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline .compareTimestamps(earliestCommitToRetain.getTimestamp(), fileCommitTime, HoodieTimeline.GREATER)) { // this is a commit, that should be cleaned. - aFile.ifPresent(hoodieDataFile -> deletePaths.add(hoodieDataFile.getPath())); + aFile.ifPresent(hoodieDataFile -> deletePaths.add(hoodieDataFile.getFileName())); if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { // If merge on read, then clean the log files for the commits as well - deletePaths - .addAll(aSlice.getLogFiles().map(file -> file.getPath().toString()).collect(Collectors.toList())); + deletePaths.addAll(aSlice.getLogFiles().map(file -> file.getFileName()).collect(Collectors.toList())); } } } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 095dbd5d7..81178ec05 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -195,9 +195,8 @@ public class HoodieMergeHandle extends HoodieWrit // Load the new records in a map long memoryForMerge = config.getMaxMemoryPerPartitionMerge(); logger.info("MaxMemoryPerPartitionMerge => " + memoryForMerge); - this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, - config.getSpillableMapBasePath(), new DefaultSizeEstimator(), - new HoodieRecordSizeEstimator(originalSchema)); + this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(), + new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(originalSchema)); } catch (IOException io) { throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io); } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java b/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java index 3fece32f0..dc68c5b98 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java @@ -101,8 +101,8 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { FileSystem fs = metaClient.getFs(); Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); - log.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation - .getDeltaFileNames() + " for commit " + commitTime); + log.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames() + + " for commit " + commitTime); // TODO - FIX THIS // Reads the entire avro file. Always only specific blocks should be read from the avro file // (failure recover). @@ -115,20 +115,19 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { .filterCompletedInstants().lastInstant().get().getTimestamp(); log.info("MaxMemoryPerCompaction => " + config.getMaxMemoryPerCompaction()); - List logFiles = operation.getDeltaFileNames().stream() - .map(p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), - p).toString()).collect(toList()); - HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, - metaClient.getBasePath(), logFiles, readerSchema, maxInstantTime, - config.getMaxMemoryPerCompaction(), config.getCompactionLazyBlockReadEnabled(), + List logFiles = operation.getDeltaFileNames().stream().map( + p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString()) + .collect(toList()); + HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, metaClient.getBasePath(), logFiles, + readerSchema, maxInstantTime, config.getMaxMemoryPerCompaction(), config.getCompactionLazyBlockReadEnabled(), config.getCompactionReverseLogReadEnabled(), config.getMaxDFSStreamBufferSize(), config.getSpillableMapBasePath()); if (!scanner.iterator().hasNext()) { return Lists.newArrayList(); } - Option oldDataFileOpt = operation.getBaseFile(metaClient.getBasePath(), - operation.getPartitionPath()); + Option oldDataFileOpt = + operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath()); // Compacting is very similar to applying updates to existing file Iterator> result; @@ -189,28 +188,22 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { RealtimeView fileSystemView = hoodieTable.getRTFileSystemView(); log.info("Compaction looking for files to compact in " + partitionPaths + " partitions"); - List operations = - jsc.parallelize(partitionPaths, partitionPaths.size()) - .flatMap((FlatMapFunction) partitionPath -> fileSystemView - .getLatestFileSlices(partitionPath) - .filter(slice -> - !fgIdsInPendingCompactions.contains(slice.getFileGroupId())) - .map( - s -> { - List logFiles = s.getLogFiles().sorted(HoodieLogFile - .getLogFileComparator()).collect(Collectors.toList()); - totalLogFiles.add((long) logFiles.size()); - totalFileSlices.add(1L); - // Avro generated classes are not inheriting Serializable. Using CompactionOperation POJO - // for spark Map operations and collecting them finally in Avro generated classes for storing - // into meta files. - Option dataFile = s.getDataFile(); - return new CompactionOperation(dataFile, partitionPath, logFiles, - config.getCompactionStrategy().captureMetrics(config, dataFile, partitionPath, logFiles)); - }) - .filter(c -> !c.getDeltaFileNames().isEmpty()) - .collect(toList()).iterator()).collect().stream().map(CompactionUtils::buildHoodieCompactionOperation) - .collect(toList()); + List operations = jsc.parallelize(partitionPaths, partitionPaths.size()) + .flatMap((FlatMapFunction) partitionPath -> fileSystemView + .getLatestFileSlices(partitionPath) + .filter(slice -> !fgIdsInPendingCompactions.contains(slice.getFileGroupId())).map(s -> { + List logFiles = + s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); + totalLogFiles.add((long) logFiles.size()); + totalFileSlices.add(1L); + // Avro generated classes are not inheriting Serializable. Using CompactionOperation POJO + // for spark Map operations and collecting them finally in Avro generated classes for storing + // into meta files. + Option dataFile = s.getDataFile(); + return new CompactionOperation(dataFile, partitionPath, logFiles, + config.getCompactionStrategy().captureMetrics(config, dataFile, partitionPath, logFiles)); + }).filter(c -> !c.getDeltaFileNames().isEmpty()).collect(toList()).iterator()) + .collect().stream().map(CompactionUtils::buildHoodieCompactionOperation).collect(toList()); log.info("Total of " + operations.size() + " compactions are retrieved"); log.info("Total number of latest files slices " + totalFileSlices.value()); log.info("Total number of log files " + totalLogFiles.value()); diff --git a/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/CompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/CompactionStrategy.java index 8fa2b57d5..9d7de66e0 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/CompactionStrategy.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/CompactionStrategy.java @@ -96,8 +96,7 @@ public abstract class CompactionStrategy implements Serializable { // Strategy implementation can overload this method to set specific compactor-id return HoodieCompactionPlan.newBuilder() .setOperations(orderAndFilter(writeConfig, operations, pendingCompactionPlans)) - .setVersion(CompactionUtils.LATEST_COMPACTION_METADATA_VERSION) - .build(); + .setVersion(CompactionUtils.LATEST_COMPACTION_METADATA_VERSION).build(); } /** diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java index e18578d4c..6ebdc6d48 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java @@ -19,6 +19,7 @@ package org.apache.hudi.table; import com.google.common.hash.Hashing; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; @@ -35,9 +36,12 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hudi.WriteStatus; +import org.apache.hudi.avro.model.HoodieActionInstant; +import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.common.HoodieCleanStat; import org.apache.hudi.common.HoodieRollbackStat; +import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieDataFile; import org.apache.hudi.common.model.HoodieKey; @@ -48,6 +52,7 @@ import org.apache.hudi.common.model.HoodieRollingStatMetadata; import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.AvroUtils; import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; @@ -72,10 +77,10 @@ import org.apache.parquet.hadoop.ParquetReader; import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFlatMapFunction; import scala.Tuple2; + /** * Implementation of a very heavily read-optimized Hoodie Table where *

@@ -97,10 +102,12 @@ public class HoodieCopyOnWriteTable extends Hoodi Map partitionCleanStatMap = new HashMap<>(); FileSystem fs = table.getMetaClient().getFs(); + Path basePath = new Path(table.getMetaClient().getBasePath()); while (iter.hasNext()) { Tuple2 partitionDelFileTuple = iter.next(); String partitionPath = partitionDelFileTuple._1(); - String deletePathStr = partitionDelFileTuple._2(); + String delFileName = partitionDelFileTuple._2(); + String deletePathStr = new Path(new Path(basePath, partitionPath), delFileName).toString(); Boolean deletedFileResult = deleteFileAndGetResult(fs, deletePathStr); if (!partitionCleanStatMap.containsKey(partitionPath)) { partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath)); @@ -109,29 +116,24 @@ public class HoodieCopyOnWriteTable extends Hoodi partitionCleanStat.addDeleteFilePatterns(deletePathStr); partitionCleanStat.addDeletedFileResult(deletePathStr, deletedFileResult); } - return partitionCleanStatMap.entrySet().stream().map(e -> new Tuple2<>(e.getKey(), e.getValue())) .collect(Collectors.toList()).iterator(); }; } - private static PairFlatMapFunction getFilesToDeleteFunc(HoodieTable table, - HoodieWriteConfig config) { - return (PairFlatMapFunction) partitionPathToClean -> { - HoodieCleanHelper cleaner = new HoodieCleanHelper(table, config); - return cleaner.getDeletePaths(partitionPathToClean).stream() - .map(deleteFile -> new Tuple2<>(partitionPathToClean, deleteFile.toString())).iterator(); - }; - } - private static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException { Path deletePath = new Path(deletePathStr); logger.debug("Working on delete path :" + deletePath); - boolean deleteResult = fs.delete(deletePath, false); - if (deleteResult) { - logger.debug("Cleaned file at path :" + deletePath); + try { + boolean deleteResult = fs.delete(deletePath, false); + if (deleteResult) { + logger.debug("Cleaned file at path :" + deletePath); + } + return deleteResult; + } catch (FileNotFoundException fio) { + // With cleanPlan being used for retried cleaning operations, its possible to clean a file twice + return false; } - return deleteResult; } @Override @@ -268,6 +270,40 @@ public class HoodieCopyOnWriteTable extends Hoodi return handleUpsertPartition(commitTime, partition, recordItr, partitioner); } + /** + * Generates List of files to be cleaned + * + * @param jsc JavaSparkContext + * @return Cleaner Plan + */ + public HoodieCleanerPlan scheduleClean(JavaSparkContext jsc) { + try { + FileSystem fs = getMetaClient().getFs(); + + List partitionsToClean = + FSUtils.getAllPartitionPaths(fs, getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()); + if (partitionsToClean.isEmpty()) { + logger.info("Nothing to clean here. It is already clean"); + return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build(); + } + logger.info( + "Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy()); + int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism()); + logger.info("Using cleanerParallelism: " + cleanerParallelism); + HoodieCleanHelper cleaner = new HoodieCleanHelper(this, config); + Option earliestInstant = cleaner.getEarliestCommitToRetain(); + + Map> cleanOps = jsc.parallelize(partitionsToClean, cleanerParallelism) + .map(partitionPathToClean -> Pair.of(partitionPathToClean, cleaner.getDeletePaths(partitionPathToClean))) + .collect().stream().collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + return new HoodieCleanerPlan(earliestInstant + .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null), + config.getCleanerPolicy().name(), cleanOps, 1); + } catch (IOException e) { + throw new HoodieIOException("Failed to schedule clean operation", e); + } + } + /** * Performs cleaning of partition paths according to cleaning policy and returns the number of files cleaned. Handles * skews in partitions to clean by making files to clean as the unit of task distribution. @@ -275,17 +311,40 @@ public class HoodieCopyOnWriteTable extends Hoodi * @throws IllegalArgumentException if unknown cleaning policy is provided */ @Override - public List clean(JavaSparkContext jsc) { + public List clean(JavaSparkContext jsc, HoodieInstant cleanInstant) { try { - FileSystem fs = getMetaClient().getFs(); - List partitionsToClean = - FSUtils.getAllPartitionPaths(fs, getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()); - logger.info("Partitions to clean up : " + partitionsToClean + ", with policy " + config.getCleanerPolicy()); - if (partitionsToClean.isEmpty()) { - logger.info("Nothing to clean here mom. It is already clean"); - return Collections.emptyList(); - } - return cleanPartitionPaths(partitionsToClean, jsc); + HoodieCleanerPlan cleanerPlan = AvroUtils.deserializeCleanerPlan(getActiveTimeline() + .getInstantAuxiliaryDetails(HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp())).get()); + + int cleanerParallelism = Math.min( + (int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(x -> x.size()).count()), + config.getCleanerParallelism()); + logger.info("Using cleanerParallelism: " + cleanerParallelism); + List> partitionCleanStats = jsc + .parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream() + .flatMap(x -> x.getValue().stream().map(y -> new Tuple2(x.getKey(), y))) + .collect(Collectors.toList()), cleanerParallelism) + .mapPartitionsToPair(deleteFilesFunc(this)).reduceByKey((e1, e2) -> e1.merge(e2)).collect(); + + Map partitionCleanStatsMap = + partitionCleanStats.stream().collect(Collectors.toMap(Tuple2::_1, Tuple2::_2)); + + // Return PartitionCleanStat for each partition passed. + return cleanerPlan.getFilesToBeDeletedPerPartition().keySet().stream().map(partitionPath -> { + PartitionCleanStat partitionCleanStat = + (partitionCleanStatsMap.containsKey(partitionPath)) ? partitionCleanStatsMap.get(partitionPath) + : new PartitionCleanStat(partitionPath); + HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain(); + return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath) + .withEarliestCommitRetained(Option.ofNullable( + actionInstant != null + ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()), + actionInstant.getAction(), actionInstant.getTimestamp()) + : null)) + .withDeletePathPattern(partitionCleanStat.deletePathPatterns) + .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles) + .withFailedDeletes(partitionCleanStat.failedDeleteFiles).build(); + }).collect(Collectors.toList()); } catch (IOException e) { throw new HoodieIOException("Failed to clean up after commit", e); } @@ -311,7 +370,7 @@ public class HoodieCopyOnWriteTable extends Hoodi logger.info("Clean out all parquet files generated for commit: " + commit); List rollbackRequests = generateRollbackRequests(instantToRollback); - //TODO: We need to persist this as rollback workload and use it in case of partial failures + // TODO: We need to persist this as rollback workload and use it in case of partial failures List stats = new RollbackExecutor(metaClient, config).performRollback(jsc, instantToRollback, rollbackRequests); @@ -321,8 +380,7 @@ public class HoodieCopyOnWriteTable extends Hoodi return stats; } - private List generateRollbackRequests(HoodieInstant instantToRollback) - throws IOException { + private List generateRollbackRequests(HoodieInstant instantToRollback) throws IOException { return FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()).stream().map(partitionPath -> { return RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback); @@ -350,34 +408,6 @@ public class HoodieCopyOnWriteTable extends Hoodi } } - private List cleanPartitionPaths(List partitionsToClean, JavaSparkContext jsc) { - int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism()); - logger.info("Using cleanerParallelism: " + cleanerParallelism); - List> partitionCleanStats = jsc - .parallelize(partitionsToClean, cleanerParallelism).flatMapToPair(getFilesToDeleteFunc(this, config)) - .repartition(cleanerParallelism) // repartition to remove skews - .mapPartitionsToPair(deleteFilesFunc(this)).reduceByKey( - // merge partition level clean stats below - (Function2) (e1, e2) -> e1.merge(e2)) - .collect(); - - Map partitionCleanStatsMap = - partitionCleanStats.stream().collect(Collectors.toMap(Tuple2::_1, Tuple2::_2)); - - HoodieCleanHelper cleaner = new HoodieCleanHelper(this, config); - // Return PartitionCleanStat for each partition passed. - return partitionsToClean.stream().map(partitionPath -> { - PartitionCleanStat partitionCleanStat = - (partitionCleanStatsMap.containsKey(partitionPath)) ? partitionCleanStatsMap.get(partitionPath) - : new PartitionCleanStat(partitionPath); - return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath) - .withEarliestCommitRetained(cleaner.getEarliestCommitToRetain()) - .withDeletePathPattern(partitionCleanStat.deletePathPatterns) - .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles) - .withFailedDeletes(partitionCleanStat.failedDeleteFiles).build(); - }).collect(Collectors.toList()); - } - enum BucketType { UPDATE, INSERT } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java index 8dcb3bfe3..185681c4a 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java @@ -185,12 +185,12 @@ public class HoodieMergeOnReadTable extends Hoodi logger.info("Unpublished " + commit); Long startTime = System.currentTimeMillis(); List rollbackRequests = generateRollbackRequests(jsc, instantToRollback); - //TODO: We need to persist this as rollback workload and use it in case of partial failures + // TODO: We need to persist this as rollback workload and use it in case of partial failures List allRollbackStats = new RollbackExecutor(metaClient, config).performRollback(jsc, instantToRollback, rollbackRequests); // Delete Inflight instants if enabled - deleteInflightInstant(deleteInstants, this.getActiveTimeline(), new HoodieInstant(true, instantToRollback - .getAction(), instantToRollback.getTimestamp())); + deleteInflightInstant(deleteInstants, this.getActiveTimeline(), + new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp())); logger.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime)); @@ -200,6 +200,7 @@ public class HoodieMergeOnReadTable extends Hoodi /** * Generate all rollback requests that we need to perform for rolling back this action without actually performing * rolling back + * * @param jsc JavaSparkContext * @param instantToRollback Instant to Rollback * @return list of rollback requests @@ -211,92 +212,92 @@ public class HoodieMergeOnReadTable extends Hoodi List partitions = FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()); int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1); - return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)) - .flatMap(partitionPath -> { - HoodieActiveTimeline activeTimeline = this.getActiveTimeline().reload(); - List partitionRollbackRequests = new ArrayList<>(); - switch (instantToRollback.getAction()) { - case HoodieTimeline.COMMIT_ACTION: - logger.info("Rolling back commit action. There are higher delta commits. So only rolling back this " - + "instant"); - partitionRollbackRequests.add(RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction( - partitionPath, instantToRollback)); - break; - case HoodieTimeline.COMPACTION_ACTION: - // If there is no delta commit present after the current commit (if compaction), no action, else we - // need to make sure that a compaction commit rollback also deletes any log files written as part of the - // succeeding deltacommit. - boolean higherDeltaCommits = !activeTimeline.getDeltaCommitTimeline() - .filterCompletedInstants().findInstantsAfter(commit, 1).empty(); - if (higherDeltaCommits) { - // Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled - // and has not yet finished. In this scenario we should delete only the newly created parquet files - // and not corresponding base commit log files created with this as baseCommit since updates would - // have been written to the log files. - logger.info("Rolling back compaction. There are higher delta commits. So only deleting data files"); - partitionRollbackRequests.add(RollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction( - partitionPath, instantToRollback)); - } else { - // No deltacommits present after this compaction commit (inflight or requested). In this case, we - // can also delete any log files that were created with this compaction commit as base - // commit. - logger.info("Rolling back compaction plan. There are NO higher delta commits. So deleting both data and" - + " log files"); - partitionRollbackRequests.add( - RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, - instantToRollback)); - } - break; - case HoodieTimeline.DELTA_COMMIT_ACTION: - // -------------------------------------------------------------------------------------------------- - // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal - // -------------------------------------------------------------------------------------------------- - // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In - // this scenario we would want to delete these log files. - // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario, - // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks. - // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is - // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime and - // and hence will end up deleting these log files. This is done so there are no orphan log files - // lying around. - // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions - // taken in this scenario is a combination of (A.2) and (A.3) - // --------------------------------------------------------------------------------------------------- - // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal - // --------------------------------------------------------------------------------------------------- - // (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no entries. - // In this scenario, we delete all the parquet files written for the failed commit. - // (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In - // this scenario, perform (A.1) and for updates written to log files, write rollback blocks. - // (B.3) Rollback triggered for first commit - Same as (B.1) - // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files - // as well if the base parquet file gets deleted. - try { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - metaClient.getCommitTimeline().getInstantDetails( - new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp())) - .get(), HoodieCommitMetadata.class); - - // In case all data was inserts and the commit failed, delete the file belonging to that commit - // We do not know fileIds for inserts (first inserts are either log files or parquet files), - // delete all files for the corresponding failed commit, if present (same as COW) - partitionRollbackRequests.add(RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction( - partitionPath, instantToRollback)); - - // append rollback blocks for updates - if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { - partitionRollbackRequests - .addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata)); - } - break; - } catch (IOException io) { - throw new UncheckedIOException("Failed to collect rollback actions for commit " + commit, io); - } - default: - break; + return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)).flatMap(partitionPath -> { + HoodieActiveTimeline activeTimeline = this.getActiveTimeline().reload(); + List partitionRollbackRequests = new ArrayList<>(); + switch (instantToRollback.getAction()) { + case HoodieTimeline.COMMIT_ACTION: + logger.info( + "Rolling back commit action. There are higher delta commits. So only rolling back this " + "instant"); + partitionRollbackRequests.add( + RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback)); + break; + case HoodieTimeline.COMPACTION_ACTION: + // If there is no delta commit present after the current commit (if compaction), no action, else we + // need to make sure that a compaction commit rollback also deletes any log files written as part of the + // succeeding deltacommit. + boolean higherDeltaCommits = + !activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit, 1).empty(); + if (higherDeltaCommits) { + // Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled + // and has not yet finished. In this scenario we should delete only the newly created parquet files + // and not corresponding base commit log files created with this as baseCommit since updates would + // have been written to the log files. + logger.info("Rolling back compaction. There are higher delta commits. So only deleting data files"); + partitionRollbackRequests.add( + RollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(partitionPath, instantToRollback)); + } else { + // No deltacommits present after this compaction commit (inflight or requested). In this case, we + // can also delete any log files that were created with this compaction commit as base + // commit. + logger.info("Rolling back compaction plan. There are NO higher delta commits. So deleting both data and" + + " log files"); + partitionRollbackRequests.add( + RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback)); } - return partitionRollbackRequests.iterator(); - }).filter(Objects::nonNull).collect(); + break; + case HoodieTimeline.DELTA_COMMIT_ACTION: + // -------------------------------------------------------------------------------------------------- + // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal + // -------------------------------------------------------------------------------------------------- + // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In + // this scenario we would want to delete these log files. + // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario, + // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks. + // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is + // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime and + // and hence will end up deleting these log files. This is done so there are no orphan log files + // lying around. + // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions + // taken in this scenario is a combination of (A.2) and (A.3) + // --------------------------------------------------------------------------------------------------- + // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal + // --------------------------------------------------------------------------------------------------- + // (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no entries. + // In this scenario, we delete all the parquet files written for the failed commit. + // (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In + // this scenario, perform (A.1) and for updates written to log files, write rollback blocks. + // (B.3) Rollback triggered for first commit - Same as (B.1) + // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files + // as well if the base parquet file gets deleted. + try { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + metaClient.getCommitTimeline() + .getInstantDetails( + new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp())) + .get(), + HoodieCommitMetadata.class); + + // In case all data was inserts and the commit failed, delete the file belonging to that commit + // We do not know fileIds for inserts (first inserts are either log files or parquet files), + // delete all files for the corresponding failed commit, if present (same as COW) + partitionRollbackRequests.add( + RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback)); + + // append rollback blocks for updates + if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { + partitionRollbackRequests + .addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata)); + } + break; + } catch (IOException io) { + throw new UncheckedIOException("Failed to collect rollback actions for commit " + commit, io); + } + default: + break; + } + return partitionRollbackRequests.iterator(); + }).filter(Objects::nonNull).collect(); } @Override @@ -428,27 +429,27 @@ public class HoodieMergeOnReadTable extends Hoodi // baseCommit always by listing the file slice Map fileIdToBaseCommitTimeForLogMap = this.getRTFileSystemView().getLatestFileSlices(partitionPath) .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime)); - return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream() - .filter(wStat -> { + return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> { - // Filter out stats without prevCommit since they are all inserts - boolean validForRollback = (wStat != null) && (wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT) - && (wStat.getPrevCommit() != null) && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId()); + // Filter out stats without prevCommit since they are all inserts + boolean validForRollback = (wStat != null) && (wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT) + && (wStat.getPrevCommit() != null) && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId()); - if (validForRollback) { - // For sanity, log instant time can never be less than base-commit on which we are rolling back - Preconditions.checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get( - wStat.getFileId()), rollbackInstant.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL)); - } + if (validForRollback) { + // For sanity, log instant time can never be less than base-commit on which we are rolling back + Preconditions + .checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()), + rollbackInstant.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL)); + } - return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get( - // Base Ts should be strictly less. If equal (for inserts-to-logs), the caller employs another option - // to delete and we should not step on it - wStat.getFileId()), rollbackInstant.getTimestamp(), HoodieTimeline.LESSER); - }).map(wStat -> { - String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()); - return RollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(), - baseCommitTime, rollbackInstant); - }).collect(Collectors.toList()); + return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get( + // Base Ts should be strictly less. If equal (for inserts-to-logs), the caller employs another option + // to delete and we should not step on it + wStat.getFileId()), rollbackInstant.getTimestamp(), HoodieTimeline.LESSER); + }).map(wStat -> { + String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()); + return RollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(), + baseCommitTime, rollbackInstant); + }).collect(Collectors.toList()); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java index 30f74def8..09007b984 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hudi.WriteStatus; +import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.client.utils.ClientUtils; @@ -190,6 +191,13 @@ public abstract class HoodieTable implements Seri return getActiveTimeline().getCleanerTimeline().filterCompletedInstants(); } + /** + * Get clean timeline + */ + public HoodieTimeline getCleanTimeline() { + return getActiveTimeline().getCleanerTimeline(); + } + /** * Get only the completed (no-inflights) savepoint timeline */ @@ -265,9 +273,21 @@ public abstract class HoodieTable implements Seri HoodieCompactionPlan compactionPlan); /** - * Clean partition paths according to cleaning policy and returns the number of files cleaned. + * Generates list of files that are eligible for cleaning + * + * @param jsc Java Spark Context + * @return Cleaner Plan containing list of files to be deleted. */ - public abstract List clean(JavaSparkContext jsc); + public abstract HoodieCleanerPlan scheduleClean(JavaSparkContext jsc); + + /** + * Cleans the files listed in the cleaner plan associated with clean instant + * + * @param jsc Java Spark Context + * @param cleanInstant Clean Instant + * @return list of Clean Stats + */ + public abstract List clean(JavaSparkContext jsc, HoodieInstant cleanInstant); /** * Rollback the (inflight/committed) record changes with the given commit time. Four steps: (1) Atomically unpublish diff --git a/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java index 8bdf371b8..1ba9b43a0 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java @@ -65,8 +65,8 @@ public class RollbackExecutor implements Serializable { /** * Performs all rollback actions that we have collected in parallel. */ - public List performRollback(JavaSparkContext jsc, - HoodieInstant instantToRollback, List rollbackRequests) { + public List performRollback(JavaSparkContext jsc, HoodieInstant instantToRollback, + List rollbackRequests) { SerializablePathFilter filter = (path) -> { if (path.toString().contains(".parquet")) { @@ -101,11 +101,10 @@ public class RollbackExecutor implements Serializable { Writer writer = null; boolean success = false; try { - writer = HoodieLogFormat.newWriterBuilder().onParentPath( - FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath())) + writer = HoodieLogFormat.newWriterBuilder() + .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath())) .withFileId(rollbackRequest.getFileId().get()) - .overBaseCommit(rollbackRequest.getLatestBaseInstant().get()) - .withFs(metaClient.getFs()) + .overBaseCommit(rollbackRequest.getLatestBaseInstant().get()).withFs(metaClient.getFs()) .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); // generate metadata @@ -114,8 +113,7 @@ public class RollbackExecutor implements Serializable { writer = writer.appendBlock(new HoodieCommandBlock(header)); success = true; } catch (IOException | InterruptedException io) { - throw new HoodieRollbackException( - "Failed to rollback for instant " + instantToRollback, io); + throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io); } finally { try { if (writer != null) { @@ -130,8 +128,7 @@ public class RollbackExecutor implements Serializable { // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in // cloud-storage : HUDI-168 Map filesToNumBlocksRollback = new HashMap<>(); - filesToNumBlocksRollback.put(metaClient.getFs() - .getFileStatus(writer.getLogFile().getPath()), 1L); + filesToNumBlocksRollback.put(metaClient.getFs().getFileStatus(writer.getLogFile().getPath()), 1L); return new Tuple2(rollbackRequest.getPartitionPath(), HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) .withRollbackBlockAppendResults(filesToNumBlocksRollback).build()); @@ -180,8 +177,7 @@ public class RollbackExecutor implements Serializable { * Common method used for cleaning out parquet files under a partition path during rollback of a set of commits */ private Map deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, - Map results, String partitionPath, - PathFilter filter) throws IOException { + Map results, String partitionPath, PathFilter filter) throws IOException { logger.info("Cleaning path " + partitionPath); FileSystem fs = metaClient.getFs(); FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter); diff --git a/hudi-client/src/main/java/org/apache/hudi/table/RollbackRequest.java b/hudi-client/src/main/java/org/apache/hudi/table/RollbackRequest.java index 0e619883d..326f347ad 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/RollbackRequest.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/RollbackRequest.java @@ -30,9 +30,7 @@ public class RollbackRequest { * Rollback Action Types */ public enum RollbackAction { - DELETE_DATA_FILES_ONLY, - DELETE_DATA_AND_LOG_FILES, - APPEND_ROLLBACK_BLOCK + DELETE_DATA_FILES_ONLY, DELETE_DATA_AND_LOG_FILES, APPEND_ROLLBACK_BLOCK } /** @@ -60,8 +58,8 @@ public class RollbackRequest { */ private final RollbackAction rollbackAction; - public RollbackRequest(String partitionPath, HoodieInstant rollbackInstant, - Option fileId, Option latestBaseInstant, RollbackAction rollbackAction) { + public RollbackRequest(String partitionPath, HoodieInstant rollbackInstant, Option fileId, + Option latestBaseInstant, RollbackAction rollbackAction) { this.partitionPath = partitionPath; this.rollbackInstant = rollbackInstant; this.fileId = fileId; diff --git a/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java index 4b20608f6..d23b15000 100644 --- a/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java +++ b/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; @@ -52,6 +53,11 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im protected transient HoodieTestDataGenerator dataGen = null; protected transient ExecutorService executorService; protected transient HoodieTableMetaClient metaClient; + private static AtomicInteger instantGen = new AtomicInteger(1); + + public String getNextInstant() { + return String.format("%09d", instantGen.getAndIncrement()); + } // dfs protected String dfsBasePath; diff --git a/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java index 10f160786..301fdf4da 100644 --- a/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java +++ b/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeSet; import java.util.function.Predicate; @@ -37,6 +38,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.common.HoodieCleanStat; import org.apache.hudi.common.HoodieTestDataGenerator; @@ -69,12 +72,8 @@ import org.apache.hudi.table.HoodieTable; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.scheduler.SparkListener; -import org.apache.spark.scheduler.SparkListenerTaskEnd; -import org.apache.spark.util.AccumulatorV2; import org.junit.Assert; import org.junit.Test; -import scala.collection.Iterator; /** * Test Cleaning related logic @@ -396,6 +395,62 @@ public class TestCleaner extends TestHoodieClientBase { }); } + /** + * Helper to run cleaner and collect Clean Stats + * + * @param config HoodieWriteConfig + */ + private List runCleaner(HoodieWriteConfig config) { + return runCleaner(config, false); + } + + /** + * Helper to run cleaner and collect Clean Stats + * + * @param config HoodieWriteConfig + */ + private List runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure) { + HoodieCleanClient writeClient = getHoodieCleanClient(config); + + String cleanInstantTs = getNextInstant(); + HoodieCleanMetadata cleanMetadata1 = writeClient.clean(cleanInstantTs); + + if (null == cleanMetadata1) { + return new ArrayList<>(); + } + + if (simulateRetryFailure) { + metaClient.reloadActiveTimeline() + .revertToInflight(new HoodieInstant(State.COMPLETED, HoodieTimeline.CLEAN_ACTION, cleanInstantTs)); + final HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieCleanMetadata cleanMetadata2 = writeClient.runClean(table, cleanInstantTs); + Assert.assertTrue( + Objects.equals(cleanMetadata1.getEarliestCommitToRetain(), cleanMetadata2.getEarliestCommitToRetain())); + Assert.assertEquals(new Integer(0), cleanMetadata2.getTotalFilesDeleted()); + Assert.assertEquals(cleanMetadata1.getPartitionMetadata().keySet(), + cleanMetadata2.getPartitionMetadata().keySet()); + cleanMetadata1.getPartitionMetadata().keySet().stream().forEach(k -> { + HoodieCleanPartitionMetadata p1 = cleanMetadata1.getPartitionMetadata().get(k); + HoodieCleanPartitionMetadata p2 = cleanMetadata2.getPartitionMetadata().get(k); + Assert.assertEquals(p1.getDeletePathPatterns(), p2.getDeletePathPatterns()); + Assert.assertEquals(p1.getSuccessDeleteFiles(), p2.getFailedDeleteFiles()); + Assert.assertEquals(p1.getPartitionPath(), p2.getPartitionPath()); + Assert.assertEquals(k, p1.getPartitionPath()); + }); + } + List stats = cleanMetadata1.getPartitionMetadata().values().stream() + .map(x -> new HoodieCleanStat.Builder().withPartitionPath(x.getPartitionPath()) + .withFailedDeletes(x.getFailedDeleteFiles()).withSuccessfulDeletes(x.getSuccessDeleteFiles()) + .withPolicy(HoodieCleaningPolicy.valueOf(x.getPolicy())).withDeletePathPattern(x.getDeletePathPatterns()) + .withEarliestCommitRetained(Option.ofNullable(cleanMetadata1.getEarliestCommitToRetain() != null + ? new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "000") + : null)) + .build()) + .collect(Collectors.toList()); + + return stats; + } + /** * Test HoodieTable.clean() Cleaning by versions logic */ @@ -417,7 +472,7 @@ public class TestCleaner extends TestHoodieClientBase { metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); - List hoodieCleanStatsOne = table.clean(jsc); + List hoodieCleanStatsOne = runCleaner(config); assertEquals("Must not clean any files", 0, getCleanStat(hoodieCleanStatsOne, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() .size()); @@ -441,7 +496,7 @@ public class TestCleaner extends TestHoodieClientBase { HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update - List hoodieCleanStatsTwo = table.clean(jsc); + List hoodieCleanStatsTwo = runCleaner(config);; assertEquals("Must clean 1 file", 1, getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() .size()); @@ -467,7 +522,7 @@ public class TestCleaner extends TestHoodieClientBase { String file3P0C2 = HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002"); - List hoodieCleanStatsThree = table.clean(jsc); + List hoodieCleanStatsThree = runCleaner(config);; assertEquals("Must clean two files", 2, getCleanStat(hoodieCleanStatsThree, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) .getSuccessDeleteFiles().size()); @@ -480,7 +535,7 @@ public class TestCleaner extends TestHoodieClientBase { // No cleaning on partially written file, with no commit. HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file3P0C2); // update - List hoodieCleanStatsFour = table.clean(jsc); + List hoodieCleanStatsFour = runCleaner(config); assertEquals("Must not clean any files", 0, getCleanStat(hoodieCleanStatsFour, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() .size()); @@ -525,7 +580,7 @@ public class TestCleaner extends TestHoodieClientBase { HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "001"); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); - List hoodieCleanStats = table.clean(jsc); + List hoodieCleanStats = runCleaner(config);; assertEquals("Must clean three files, one parquet and 2 log files", 3, getCleanStat(hoodieCleanStats, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() .size()); @@ -542,6 +597,22 @@ public class TestCleaner extends TestHoodieClientBase { */ @Test public void testKeepLatestCommits() throws IOException { + testKeepLatestCommits(false); + } + + /** + * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. Here the operations are simulated + * such that first clean attempt failed after files were cleaned and a subsequent cleanup succeeds. + */ + @Test + public void testKeepLatestCommitsWithFailureRetry() throws IOException { + testKeepLatestCommits(true); + } + + /** + * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. + */ + private void testKeepLatestCommits(boolean simulateFailureRetry) throws IOException { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()) @@ -558,7 +629,7 @@ public class TestCleaner extends TestHoodieClientBase { metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); - List hoodieCleanStatsOne = table.clean(jsc); + List hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry); assertEquals("Must not clean any files", 0, getCleanStat(hoodieCleanStatsOne, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() .size()); @@ -582,7 +653,7 @@ public class TestCleaner extends TestHoodieClientBase { HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update - List hoodieCleanStatsTwo = table.clean(jsc); + List hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry); assertEquals("Must not clean any files", 0, getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() .size()); @@ -608,7 +679,7 @@ public class TestCleaner extends TestHoodieClientBase { String file3P0C2 = HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002"); - List hoodieCleanStatsThree = table.clean(jsc); + List hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry); assertEquals("Must not clean any file. We have to keep 1 version before the latest commit time to keep", 0, getCleanStat(hoodieCleanStatsThree, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) .getSuccessDeleteFiles().size()); @@ -626,7 +697,7 @@ public class TestCleaner extends TestHoodieClientBase { String file4P0C3 = HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003"); - List hoodieCleanStatsFour = table.clean(jsc); + List hoodieCleanStatsFour = runCleaner(config, simulateFailureRetry); assertEquals("Must not clean one old file", 1, getCleanStat(hoodieCleanStatsFour, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() .size()); @@ -648,7 +719,7 @@ public class TestCleaner extends TestHoodieClientBase { // No cleaning on partially written file, with no commit. HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "004", file3P0C2); // update - List hoodieCleanStatsFive = table.clean(jsc); + List hoodieCleanStatsFive = runCleaner(config, simulateFailureRetry); assertEquals("Must not clean any files", 0, getCleanStat(hoodieCleanStatsFive, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() .size()); @@ -694,88 +765,10 @@ public class TestCleaner extends TestHoodieClientBase { metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); - List hoodieCleanStatsOne = table.clean(jsc); + List hoodieCleanStatsOne = runCleaner(config); assertTrue("HoodieCleanStats should be empty for a table with empty partitionPaths", hoodieCleanStatsOne.isEmpty()); } - /** - * Test Clean-by-commits behavior in the presence of skewed partitions - */ - @Test - public void testCleaningSkewedPartitons() throws IOException { - HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) - .withCompactionConfig(HoodieCompactionConfig.newBuilder() - .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()) - .build(); - Map stageOneShuffleReadTaskRecordsCountMap = new HashMap<>(); - - // Since clean involves repartition in order to uniformly distribute data, - // we can inspect the number of records read by various tasks in stage 1. - // There should not be skew in the number of records read in the task. - - // SparkListener below listens to the stage end events and captures number of - // records read by various tasks in stage-1. - jsc.sc().addSparkListener(new SparkListener() { - - @Override - public void onTaskEnd(SparkListenerTaskEnd taskEnd) { - - Iterator> iterator = taskEnd.taskMetrics().accumulators().iterator(); - while (iterator.hasNext()) { - AccumulatorV2 accumulator = iterator.next(); - if (taskEnd.stageId() == 1 && accumulator.isRegistered() && accumulator.name().isDefined() - && accumulator.name().get().equals("internal.metrics.shuffle.read.recordsRead")) { - stageOneShuffleReadTaskRecordsCountMap.put(taskEnd.taskInfo().taskId(), (Long) accumulator.value()); - } - } - } - }); - - // make 1 commit, with 100 files in one partition and 10 in other two - HoodieTestUtils.createCommitFiles(basePath, "000"); - List filesP0C0 = createFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", 100); - List filesP1C0 = createFilesInPartition(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000", 10); - List filesP2C0 = createFilesInPartition(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "000", 10); - - HoodieTestUtils.createCommitFiles(basePath, "001"); - updateAllFilesInPartition(filesP0C0, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); - updateAllFilesInPartition(filesP1C0, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001"); - updateAllFilesInPartition(filesP2C0, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "001"); - - HoodieTestUtils.createCommitFiles(basePath, "002"); - updateAllFilesInPartition(filesP0C0, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002"); - updateAllFilesInPartition(filesP1C0, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "002"); - updateAllFilesInPartition(filesP2C0, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "002"); - - HoodieTestUtils.createCommitFiles(basePath, "003"); - updateAllFilesInPartition(filesP0C0, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003"); - updateAllFilesInPartition(filesP1C0, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "003"); - updateAllFilesInPartition(filesP2C0, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "003"); - - metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); - List hoodieCleanStats = table.clean(jsc); - - assertEquals(100, getCleanStat(hoodieCleanStats, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) - .getSuccessDeleteFiles().size()); - assertEquals(10, getCleanStat(hoodieCleanStats, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH) - .getSuccessDeleteFiles().size()); - assertEquals(10, getCleanStat(hoodieCleanStats, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH) - .getSuccessDeleteFiles().size()); - - // 3 tasks are expected since the number of partitions is 3 - assertEquals(3, stageOneShuffleReadTaskRecordsCountMap.keySet().size()); - // Sum of all records processed = total number of files to clean - assertEquals(120, - stageOneShuffleReadTaskRecordsCountMap.values().stream().reduce((a, b) -> a + b).get().intValue()); - assertTrue( - "The skew in handling files to clean is not removed. " - + "Each task should handle more records than the partitionPath with least files " - + "and less records than the partitionPath with most files.", - stageOneShuffleReadTaskRecordsCountMap.values().stream().filter(a -> a > 10 && a < 100).count() == 3); - } - - /** * Test Keep Latest Commits when there are pending compactions */ @@ -794,14 +787,28 @@ public class TestCleaner extends TestHoodieClientBase { // FileId3 1 2 3 001 // FileId2 0 0 0 000 // FileId1 0 0 0 000 - testPendingCompactions(config, 48, 18); + testPendingCompactions(config, 48, 18, false); } + /** + * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. Here the operations are simulated + * such that first clean attempt failed after files were cleaned and a subsequent cleanup succeeds. + */ + @Test + public void testKeepLatestVersionsWithPendingCompactions() throws IOException { + testKeepLatestVersionsWithPendingCompactions(false); + } + + /** * Test Keep Latest Versions when there are pending compactions */ @Test - public void testKeepLatestVersionsWithPendingCompactions() throws IOException { + public void testKeepLatestVersionsWithPendingCompactionsAndFailureRetry() throws IOException { + testKeepLatestVersionsWithPendingCompactions(true); + } + + private void testKeepLatestVersionsWithPendingCompactions(boolean retryFailure) throws IOException { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) .withCompactionConfig(HoodieCompactionConfig.newBuilder() @@ -816,7 +823,7 @@ public class TestCleaner extends TestHoodieClientBase { // FileId3 0 0 0 000, 001 // FileId2 0 0 0 000 // FileId1 0 0 0 000 - testPendingCompactions(config, 36, 9); + testPendingCompactions(config, 36, 9, retryFailure); } /** @@ -825,8 +832,8 @@ public class TestCleaner extends TestHoodieClientBase { * @param config Hoodie Write Config * @param expNumFilesDeleted Number of files deleted */ - public void testPendingCompactions(HoodieWriteConfig config, int expNumFilesDeleted, - int expNumFilesUnderCompactionDeleted) throws IOException { + private void testPendingCompactions(HoodieWriteConfig config, int expNumFilesDeleted, + int expNumFilesUnderCompactionDeleted, boolean retryFailure) throws IOException { HoodieTableMetaClient metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ); String[] instants = new String[] {"000", "001", "003", "005", "007", "009", "011", "013"}; @@ -897,7 +904,7 @@ public class TestCleaner extends TestHoodieClientBase { // Clean now metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); - List hoodieCleanStats = table.clean(jsc); + List hoodieCleanStats = runCleaner(config, retryFailure); // Test for safety final HoodieTableMetaClient newMetaClient = HoodieTableMetaClient.reload(metaClient); diff --git a/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java b/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java index ff9f798c7..7274cad93 100644 --- a/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java +++ b/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java @@ -52,6 +52,7 @@ import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex.IndexType; +import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.table.HoodieTable; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -77,6 +78,10 @@ public class TestHoodieClientBase extends HoodieClientTestHarness { cleanupResources(); } + protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) { + return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, cfg.getTableName())); + } + protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) { return getHoodieWriteClient(cfg, false); } diff --git a/hudi-common/src/main/avro/HoodieCleanerPlan.avsc b/hudi-common/src/main/avro/HoodieCleanerPlan.avsc new file mode 100644 index 000000000..b87ed7718 --- /dev/null +++ b/hudi-common/src/main/avro/HoodieCleanerPlan.avsc @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ + "namespace": "org.apache.hudi.avro.model", + "type": "record", + "name": "HoodieCleanerPlan", + "fields": [ + { + "name": "earliestInstantToRetain", + "type":["null", { + "type": "record", + "name": "HoodieActionInstant", + "fields": [ + { + "name": "timestamp", + "type": "string" + }, + { + "name": "action", + "type": "string" + }, + { + "name": "state", + "type": "string" + } + ] + }], + "default" : null + }, + { + "name": "policy", + "type": "string" + }, + { + "name": "filesToBeDeletedPerPartition", + "type": [ + "null", { + "type":"map", + "values": { + "type":"array", + "items":{ + "name":"filePath", + "type": "string" + } + }}], + "default" : null + }, + { + "name":"version", + "type":["int", "null"], + "default": 1 + } + ] +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java b/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java index d5c07d40a..f9c39ab63 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java @@ -134,14 +134,9 @@ public class CompactionOperation implements Serializable { @Override public String toString() { - return "CompactionOperation{" - + "baseInstantTime='" + baseInstantTime + '\'' - + ", dataFileCommitTime=" + dataFileCommitTime - + ", deltaFileNames=" + deltaFileNames - + ", dataFileName=" + dataFileName - + ", id='" + id + '\'' - + ", metrics=" + metrics - + '}'; + return "CompactionOperation{" + "baseInstantTime='" + baseInstantTime + '\'' + ", dataFileCommitTime=" + + dataFileCommitTime + ", deltaFileNames=" + deltaFileNames + ", dataFileName=" + dataFileName + ", id='" + id + + '\'' + ", metrics=" + metrics + '}'; } @Override @@ -156,8 +151,7 @@ public class CompactionOperation implements Serializable { return Objects.equals(baseInstantTime, operation.baseInstantTime) && Objects.equals(dataFileCommitTime, operation.dataFileCommitTime) && Objects.equals(deltaFileNames, operation.deltaFileNames) - && Objects.equals(dataFileName, operation.dataFileName) - && Objects.equals(id, operation.id); + && Objects.equals(dataFileName, operation.dataFileName) && Objects.equals(id, operation.id); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java index 4e42c9910..ccc7e2e97 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java @@ -63,6 +63,7 @@ public interface HoodieTimeline extends Serializable { String INFLIGHT_COMMIT_EXTENSION = INFLIGHT_EXTENSION; String INFLIGHT_DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION + INFLIGHT_EXTENSION; String INFLIGHT_CLEAN_EXTENSION = "." + CLEAN_ACTION + INFLIGHT_EXTENSION; + String REQUESTED_CLEAN_EXTENSION = "." + CLEAN_ACTION + REQUESTED_EXTENSION; String INFLIGHT_ROLLBACK_EXTENSION = "." + ROLLBACK_ACTION + INFLIGHT_EXTENSION; String INFLIGHT_SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION + INFLIGHT_EXTENSION; String REQUESTED_COMPACTION_SUFFIX = StringUtils.join(COMPACTION_ACTION, REQUESTED_EXTENSION); @@ -80,6 +81,13 @@ public interface HoodieTimeline extends Serializable { */ HoodieTimeline filterInflights(); + /** + * Filter this timeline to include requested and in-flights + * + * @return New instance of HoodieTimeline with just in-flights and requested instants + */ + HoodieTimeline filterInflightsAndRequested(); + /** * Filter this timeline to just include the in-flights excluding compaction instants * @@ -219,7 +227,19 @@ public interface HoodieTimeline extends Serializable { } static HoodieInstant getCompletedInstant(final HoodieInstant instant) { - return new HoodieInstant(false, instant.getAction(), instant.getTimestamp()); + return new HoodieInstant(State.COMPLETED, instant.getAction(), instant.getTimestamp()); + } + + static HoodieInstant getRequestedInstant(final HoodieInstant instant) { + return new HoodieInstant(State.REQUESTED, instant.getAction(), instant.getTimestamp()); + } + + static HoodieInstant getCleanRequestedInstant(final String timestamp) { + return new HoodieInstant(State.REQUESTED, CLEAN_ACTION, timestamp); + } + + static HoodieInstant getCleanInflightInstant(final String timestamp) { + return new HoodieInstant(State.INFLIGHT, CLEAN_ACTION, timestamp); } static HoodieInstant getCompactionRequestedInstant(final String timestamp) { @@ -246,6 +266,10 @@ public interface HoodieTimeline extends Serializable { return StringUtils.join(instant, HoodieTimeline.CLEAN_EXTENSION); } + static String makeRequestedCleanerFileName(String instant) { + return StringUtils.join(instant, HoodieTimeline.REQUESTED_CLEAN_EXTENSION); + } + static String makeInflightCleanerFileName(String instant) { return StringUtils.join(instant, HoodieTimeline.INFLIGHT_CLEAN_EXTENSION); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 140d4113d..412851df8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -58,10 +58,11 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { public static final SimpleDateFormat COMMIT_FORMATTER = new SimpleDateFormat("yyyyMMddHHmmss"); - public static final Set VALID_EXTENSIONS_IN_ACTIVE_TIMELINE = new HashSet<>(Arrays.asList(new String[] { - COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION, INFLIGHT_DELTA_COMMIT_EXTENSION, - SAVEPOINT_EXTENSION, INFLIGHT_SAVEPOINT_EXTENSION, CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION, - INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION})); + public static final Set VALID_EXTENSIONS_IN_ACTIVE_TIMELINE = + new HashSet<>(Arrays.asList(new String[] {COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION, + INFLIGHT_DELTA_COMMIT_EXTENSION, SAVEPOINT_EXTENSION, INFLIGHT_SAVEPOINT_EXTENSION, CLEAN_EXTENSION, + INFLIGHT_CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_COMPACTION_EXTENSION, + REQUESTED_COMPACTION_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION})); private static final transient Logger log = LogManager.getLogger(HoodieActiveTimeline.class); protected HoodieTableMetaClient metaClient; @@ -212,11 +213,19 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { } public void revertToInflight(HoodieInstant instant) { - log.info("Reverting instant to inflight " + instant); - revertCompleteToInflight(instant, HoodieTimeline.getInflightInstant(instant)); + log.info("Reverting " + instant + " to inflight "); + revertStateTransition(instant, HoodieTimeline.getInflightInstant(instant)); log.info("Reverted " + instant + " to inflight"); } + public HoodieInstant revertToRequested(HoodieInstant instant) { + log.warn("Reverting " + instant + " to requested "); + HoodieInstant requestedInstant = HoodieTimeline.getRequestedInstant(instant); + revertStateTransition(instant, HoodieTimeline.getRequestedInstant(instant)); + log.warn("Reverted " + instant + " to requested"); + return requestedInstant; + } + public void deleteInflight(HoodieInstant instant) { Preconditions.checkArgument(instant.isInflight()); deleteInstantFile(instant); @@ -311,6 +320,39 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { * END - COMPACTION RELATED META-DATA MANAGEMENT **/ + /** + * Transition Clean State from inflight to Committed + * + * @param inflightInstant Inflight instant + * @param data Extra Metadata + * @return commit instant + */ + public HoodieInstant transitionCleanInflightToComplete(HoodieInstant inflightInstant, Option data) { + Preconditions.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.CLEAN_ACTION)); + Preconditions.checkArgument(inflightInstant.isInflight()); + HoodieInstant commitInstant = new HoodieInstant(State.COMPLETED, CLEAN_ACTION, inflightInstant.getTimestamp()); + // First write metadata to aux folder + createFileInAuxiliaryFolder(commitInstant, data); + // Then write to timeline + transitionState(inflightInstant, commitInstant, data); + return commitInstant; + } + + /** + * Transition Clean State from requested to inflight + * + * @param requestedInstant requested instant + * @return commit instant + */ + public HoodieInstant transitionCleanRequestedToInflight(HoodieInstant requestedInstant) { + Preconditions.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.CLEAN_ACTION)); + Preconditions.checkArgument(requestedInstant.isRequested()); + HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, CLEAN_ACTION, requestedInstant.getTimestamp()); + transitionState(requestedInstant, inflight, Option.empty()); + return inflight; + } + + private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option data) { Preconditions.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp())); Path commitFilePath = new Path(metaClient.getMetaPath(), toInstant.getFileName()); @@ -327,19 +369,20 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { } } - private void revertCompleteToInflight(HoodieInstant completed, HoodieInstant inflight) { - Preconditions.checkArgument(completed.getTimestamp().equals(inflight.getTimestamp())); - Path inFlightCommitFilePath = new Path(metaClient.getMetaPath(), inflight.getFileName()); + private void revertStateTransition(HoodieInstant curr, HoodieInstant revert) { + Preconditions.checkArgument(curr.getTimestamp().equals(revert.getTimestamp())); + Path revertFilePath = new Path(metaClient.getMetaPath(), revert.getFileName()); try { - if (!metaClient.getFs().exists(inFlightCommitFilePath)) { - Path commitFilePath = new Path(metaClient.getMetaPath(), completed.getFileName()); - boolean success = metaClient.getFs().rename(commitFilePath, inFlightCommitFilePath); + if (!metaClient.getFs().exists(revertFilePath)) { + Path currFilePath = new Path(metaClient.getMetaPath(), curr.getFileName()); + boolean success = metaClient.getFs().rename(currFilePath, revertFilePath); if (!success) { - throw new HoodieIOException("Could not rename " + commitFilePath + " to " + inFlightCommitFilePath); + throw new HoodieIOException("Could not rename " + currFilePath + " to " + revertFilePath); } + log.info("Renamed " + currFilePath + " to " + revertFilePath); } } catch (IOException e) { - throw new HoodieIOException("Could not complete revert " + completed, e); + throw new HoodieIOException("Could not complete revert " + curr, e); } } @@ -355,6 +398,15 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { createFileInMetaPath(instant.getFileName(), content); } + public void saveToCleanRequested(HoodieInstant instant, Option content) { + Preconditions.checkArgument(instant.getAction().equals(HoodieTimeline.CLEAN_ACTION)); + Preconditions.checkArgument(instant.getState().equals(State.REQUESTED)); + // Write workload to auxiliary folder + createFileInAuxiliaryFolder(instant, content); + // Plan is only stored in auxiliary folder + createFileInMetaPath(instant.getFileName(), Option.empty()); + } + private void createFileInMetaPath(String filename, Option content) { Path fullPath = new Path(metaClient.getMetaPath(), filename); createFileInPath(fullPath, content); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java index 937bbfc70..72318dd0d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java @@ -30,6 +30,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hudi.common.table.HoodieTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.HoodieException; @@ -83,6 +84,13 @@ public class HoodieDefaultTimeline implements HoodieTimeline { return new HoodieDefaultTimeline(instants.stream().filter(HoodieInstant::isInflight), details); } + @Override + public HoodieTimeline filterInflightsAndRequested() { + return new HoodieDefaultTimeline( + instants.stream().filter(i -> i.getState().equals(State.REQUESTED) || i.getState().equals(State.INFLIGHT)), + details); + } + @Override public HoodieTimeline filterInflightsExcludingCompaction() { return new HoodieDefaultTimeline(instants.stream().filter(instant -> { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java index e3a268301..471d2d0d2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java @@ -69,7 +69,7 @@ public class HoodieInstant implements Serializable { } else if (action.contains(HoodieTimeline.INFLIGHT_EXTENSION)) { state = State.INFLIGHT; action = action.replace(HoodieTimeline.INFLIGHT_EXTENSION, ""); - } else if (action.equals(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX)) { + } else if (action.contains(HoodieTimeline.REQUESTED_EXTENSION)) { state = State.REQUESTED; action = action.replace(HoodieTimeline.REQUESTED_EXTENSION, ""); } @@ -117,7 +117,8 @@ public class HoodieInstant implements Serializable { : HoodieTimeline.makeCommitFileName(timestamp); } else if (HoodieTimeline.CLEAN_ACTION.equals(action)) { return isInflight() ? HoodieTimeline.makeInflightCleanerFileName(timestamp) - : HoodieTimeline.makeCleanerFileName(timestamp); + : isRequested() ? HoodieTimeline.makeRequestedCleanerFileName(timestamp) + : HoodieTimeline.makeCleanerFileName(timestamp); } else if (HoodieTimeline.ROLLBACK_ACTION.equals(action)) { return isInflight() ? HoodieTimeline.makeInflightRollbackFileName(timestamp) : HoodieTimeline.makeRollbackFileName(timestamp); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java index 837d45689..8580653c8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java @@ -333,8 +333,10 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste @Override public void close() { + log.info("Closing Rocksdb !!"); closed = true; rocksDB.close(); + log.info("Closed Rocksdb !!"); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroUtils.java index a0185b922..243a1a3c7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroUtils.java @@ -36,6 +36,7 @@ import org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.specific.SpecificRecordBase; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata; +import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; @@ -110,6 +111,11 @@ public class AvroUtils { return serializeAvroMetadata(compactionWorkload, HoodieCompactionPlan.class); } + + public static Option serializeCleanerPlan(HoodieCleanerPlan cleanPlan) throws IOException { + return serializeAvroMetadata(cleanPlan, HoodieCleanerPlan.class); + } + public static Option serializeCleanMetadata(HoodieCleanMetadata metadata) throws IOException { return serializeAvroMetadata(metadata, HoodieCleanMetadata.class); } @@ -137,6 +143,10 @@ public class AvroUtils { return Option.of(baos.toByteArray()); } + public static HoodieCleanerPlan deserializeCleanerPlan(byte[] bytes) throws IOException { + return deserializeAvroMetadata(bytes, HoodieCleanerPlan.class); + } + public static HoodieCompactionPlan deserializeCompactionPlan(byte[] bytes) throws IOException { return deserializeAvroMetadata(bytes, HoodieCompactionPlan.class); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java index 788e6070d..7baf368e5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java @@ -141,10 +141,9 @@ public class ParquetUtils { * Read out the bloom filter from the parquet file meta data. */ public static BloomFilter readBloomFilterFromParquetMetadata(Configuration configuration, Path parquetFilePath) { - Map footerVals = - readParquetFooter(configuration, false, parquetFilePath, - HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, - HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY); + Map footerVals = readParquetFooter(configuration, false, parquetFilePath, + HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, + HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY); String footerVal = footerVals.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY); if (null == footerVal) { // We use old style key "com.uber.hoodie.bloomfilter" diff --git a/hudi-common/src/main/java/org/apache/hudi/common/versioning/MetadataMigrator.java b/hudi-common/src/main/java/org/apache/hudi/common/versioning/MetadataMigrator.java index e348084ca..2c2c5b58f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/versioning/MetadataMigrator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/versioning/MetadataMigrator.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.util.collection.Pair; /** * Migrates a specific metadata type stored in .hoodie folder to latest version + * * @param */ public class MetadataMigrator { @@ -36,15 +37,16 @@ public class MetadataMigrator { private final Integer oldestVersion; public MetadataMigrator(HoodieTableMetaClient metaClient, List> migratorList) { - migrators = migratorList.stream().map(m -> - Pair.of(m.getManagedVersion(), m)).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + migrators = migratorList.stream().map(m -> Pair.of(m.getManagedVersion(), m)) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); latestVersion = migrators.keySet().stream().reduce((x, y) -> x > y ? x : y).get(); oldestVersion = migrators.keySet().stream().reduce((x, y) -> x < y ? x : y).get(); } /** * Upgrade Metadata version to its latest - * @param metadata Metadata + * + * @param metadata Metadata * @param metadataVersion Current version of metadata * @return Metadata conforming to the latest version of this metadata */ @@ -64,9 +66,10 @@ public class MetadataMigrator { /** * Migrate metadata to a specific version - * @param metadata Hoodie Table Meta Client - * @param metadataVersion Metadata Version - * @param targetVersion Target Version + * + * @param metadata Hoodie Table Meta Client + * @param metadataVersion Metadata Version + * @param targetVersion Target Version * @return Metadata conforming to the target version */ public T migrateToVersion(T metadata, int metadataVersion, int targetVersion) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/versioning/VersionMigrator.java b/hudi-common/src/main/java/org/apache/hudi/common/versioning/VersionMigrator.java index cac865055..4f6269365 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/versioning/VersionMigrator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/versioning/VersionMigrator.java @@ -22,18 +22,21 @@ import java.io.Serializable; /** * Responsible for upgrading and downgrading metadata versions for a specific metadata + * * @param Metadata Type */ public interface VersionMigrator extends Serializable { /** * Version of Metadata that this class will handle + * * @return */ Integer getManagedVersion(); /** * Upgrades metadata of type T from previous version to this version + * * @param input Metadata as of previous version. * @return Metadata compatible with the version managed by this class */ @@ -41,6 +44,7 @@ public interface VersionMigrator extends Serializable { /** * Downgrades metadata of type T from next version to this version + * * @param input Metadata as of next highest version * @return Metadata compatible with the version managed by this class */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionPlanMigrator.java b/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionPlanMigrator.java index ae6525c40..f8a4fb422 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionPlanMigrator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionPlanMigrator.java @@ -29,8 +29,7 @@ import org.apache.hudi.common.versioning.MetadataMigrator; public class CompactionPlanMigrator extends MetadataMigrator { public CompactionPlanMigrator(HoodieTableMetaClient metaClient) { - super(metaClient, Arrays.asList( - new CompactionV1MigrationHandler(metaClient), - new CompactionV2MigrationHandler(metaClient))); + super(metaClient, + Arrays.asList(new CompactionV1MigrationHandler(metaClient), new CompactionV2MigrationHandler(metaClient))); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionV1MigrationHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionV1MigrationHandler.java index 893b02562..67cb21398 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionV1MigrationHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionV1MigrationHandler.java @@ -52,21 +52,17 @@ public class CompactionV1MigrationHandler extends AbstractMigratorBase v1CompactionOperationList = new ArrayList<>(); if (null != input.getOperations()) { v1CompactionOperationList = input.getOperations().stream().map(inp -> { - return HoodieCompactionOperation.newBuilder() - .setBaseInstantTime(inp.getBaseInstantTime()) - .setFileId(inp.getFileId()) - .setPartitionPath(inp.getPartitionPath()) - .setMetrics(inp.getMetrics()) + return HoodieCompactionOperation.newBuilder().setBaseInstantTime(inp.getBaseInstantTime()) + .setFileId(inp.getFileId()).setPartitionPath(inp.getPartitionPath()).setMetrics(inp.getMetrics()) .setDataFilePath(convertToV1Path(basePath, inp.getPartitionPath(), inp.getDataFilePath())) - .setDeltaFilePaths(inp.getDeltaFilePaths().stream().map(s -> convertToV1Path(basePath, - inp.getPartitionPath(), s)).collect(Collectors.toList())) + .setDeltaFilePaths(inp.getDeltaFilePaths().stream() + .map(s -> convertToV1Path(basePath, inp.getPartitionPath(), s)).collect(Collectors.toList())) .build(); }).collect(Collectors.toList()); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionV2MigrationHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionV2MigrationHandler.java index 7a5416afb..e1d60ba6c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionV2MigrationHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionV2MigrationHandler.java @@ -46,20 +46,15 @@ public class CompactionV2MigrationHandler extends AbstractMigratorBase v2CompactionOperationList = new ArrayList<>(); if (null != input.getOperations()) { v2CompactionOperationList = input.getOperations().stream().map(inp -> { - return HoodieCompactionOperation.newBuilder() - .setBaseInstantTime(inp.getBaseInstantTime()) - .setFileId(inp.getFileId()) - .setPartitionPath(inp.getPartitionPath()) - .setMetrics(inp.getMetrics()) - .setDataFilePath(new Path(inp.getDataFilePath()).getName()) - .setDeltaFilePaths(inp.getDeltaFilePaths().stream().map(s -> new Path(s).getName()) - .collect(Collectors.toList())) + return HoodieCompactionOperation.newBuilder().setBaseInstantTime(inp.getBaseInstantTime()) + .setFileId(inp.getFileId()).setPartitionPath(inp.getPartitionPath()).setMetrics(inp.getMetrics()) + .setDataFilePath(new Path(inp.getDataFilePath()).getName()).setDeltaFilePaths( + inp.getDeltaFilePaths().stream().map(s -> new Path(s).getName()).collect(Collectors.toList())) .build(); }).collect(Collectors.toList()); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java index 71373409a..ad269b36f 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java @@ -114,8 +114,7 @@ public class TestCompactionUtils extends HoodieCommonTestHarness { fileSlice.addLogFile( new HoodieLogFile(new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN)))); op = CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], fileSlice, Option.of(metricsCaptureFn)); - testFileSliceCompactionOpEquality(fileSlice, op, DEFAULT_PARTITION_PATHS[0], - LATEST_COMPACTION_METADATA_VERSION); + testFileSliceCompactionOpEquality(fileSlice, op, DEFAULT_PARTITION_PATHS[0], LATEST_COMPACTION_METADATA_VERSION); } /** @@ -126,17 +125,17 @@ public class TestCompactionUtils extends HoodieCommonTestHarness { FileSlice emptyFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "empty1"); FileSlice fileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1"); fileSlice.setDataFile(new TestHoodieDataFile(fullPartitionPath.toString() + "/data1_1_000.parquet")); - fileSlice.addLogFile( - new HoodieLogFile(new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN))))); - fileSlice.addLogFile( - new HoodieLogFile(new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN))))); + fileSlice.addLogFile(new HoodieLogFile( + new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN))))); + fileSlice.addLogFile(new HoodieLogFile( + new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN))))); FileSlice noLogFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noLog1"); noLogFileSlice.setDataFile(new TestHoodieDataFile(fullPartitionPath.toString() + "/noLog_1_000.parquet")); FileSlice noDataFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1"); - noDataFileSlice.addLogFile( - new HoodieLogFile(new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN))))); - noDataFileSlice.addLogFile( - new HoodieLogFile(new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN))))); + noDataFileSlice.addLogFile(new HoodieLogFile( + new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN))))); + noDataFileSlice.addLogFile(new HoodieLogFile( + new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN))))); List fileSliceList = Arrays.asList(emptyFileSlice, noDataFileSlice, fileSlice, noLogFileSlice); List> input = fileSliceList.stream().map(f -> Pair.of(DEFAULT_PARTITION_PATHS[0], f)).collect(Collectors.toList()); @@ -221,9 +220,8 @@ public class TestCompactionUtils extends HoodieCommonTestHarness { */ private void testFileSlicesCompactionPlanEquality(List> input, HoodieCompactionPlan plan) { Assert.assertEquals("All file-slices present", input.size(), plan.getOperations().size()); - IntStream.range(0, input.size()).boxed().forEach(idx -> - testFileSliceCompactionOpEquality(input.get(idx).getValue(), plan.getOperations().get(idx), - input.get(idx).getKey(), plan.getVersion())); + IntStream.range(0, input.size()).boxed().forEach(idx -> testFileSliceCompactionOpEquality(input.get(idx).getValue(), + plan.getOperations().get(idx), input.get(idx).getKey(), plan.getVersion())); } /** @@ -233,15 +231,15 @@ public class TestCompactionUtils extends HoodieCommonTestHarness { * @param op HoodieCompactionOperation * @param expPartitionPath Partition path */ - private void testFileSliceCompactionOpEquality(FileSlice slice, HoodieCompactionOperation op, - String expPartitionPath, int version) { + private void testFileSliceCompactionOpEquality(FileSlice slice, HoodieCompactionOperation op, String expPartitionPath, + int version) { Assert.assertEquals("Partition path is correct", expPartitionPath, op.getPartitionPath()); Assert.assertEquals("Same base-instant", slice.getBaseInstantTime(), op.getBaseInstantTime()); Assert.assertEquals("Same file-id", slice.getFileId(), op.getFileId()); if (slice.getDataFile().isPresent()) { HoodieDataFile df = slice.getDataFile().get(); - Assert.assertEquals("Same data-file", - version == COMPACTION_METADATA_VERSION_1 ? df.getPath() : df.getFileName(), op.getDataFilePath()); + Assert.assertEquals("Same data-file", version == COMPACTION_METADATA_VERSION_1 ? df.getPath() : df.getFileName(), + op.getDataFilePath()); } List paths = slice.getLogFiles().map(l -> l.getPath().toString()).collect(Collectors.toList()); IntStream.range(0, paths.size()).boxed().forEach(idx -> { diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index fa2c8b6c5..40ac91e8e 100644 --- a/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -62,25 +62,30 @@ public class HiveSyncTool { } public void syncHoodieTable() throws ClassNotFoundException { - switch (hoodieHiveClient.getTableType()) { - case COPY_ON_WRITE: - syncHoodieTable(false); - break; - case MERGE_ON_READ: - // sync a RO table for MOR - syncHoodieTable(false); - String originalTableName = cfg.tableName; - // TODO : Make realtime table registration optional using a config param - cfg.tableName = cfg.tableName + SUFFIX_REALTIME_TABLE; - // sync a RT table for MOR - syncHoodieTable(true); - cfg.tableName = originalTableName; - break; - default: - LOG.error("Unknown table type " + hoodieHiveClient.getTableType()); - throw new InvalidDatasetException(hoodieHiveClient.getBasePath()); + try { + switch (hoodieHiveClient.getTableType()) { + case COPY_ON_WRITE: + syncHoodieTable(false); + break; + case MERGE_ON_READ: + // sync a RO table for MOR + syncHoodieTable(false); + String originalTableName = cfg.tableName; + // TODO : Make realtime table registration optional using a config param + cfg.tableName = cfg.tableName + SUFFIX_REALTIME_TABLE; + // sync a RT table for MOR + syncHoodieTable(true); + cfg.tableName = originalTableName; + break; + default: + LOG.error("Unknown table type " + hoodieHiveClient.getTableType()); + throw new InvalidDatasetException(hoodieHiveClient.getBasePath()); + } + } catch (RuntimeException re) { + LOG.error("Got runtime exception when hive syncing", re); + } finally { + hoodieHiveClient.close(); } - hoodieHiveClient.close(); } private void syncHoodieTable(boolean isRealTime) throws ClassNotFoundException { diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java index 531917310..2272757c7 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java @@ -138,9 +138,11 @@ public class TimelineService { } public void close() { + log.info("Closing Timeline Service"); this.app.stop(); this.app = null; this.fsViewsManager.close(); + log.info("Closed Timeline Service"); } public Configuration getConf() {