From 1032fc3e549aa70a8817fd4782e971de976b682a Mon Sep 17 00:00:00 2001 From: Balaji Varadarajan Date: Mon, 28 Oct 2019 18:54:48 -0700 Subject: [PATCH] [HUDI-137] Hudi cleaning state changes should be consistent with compaction actions Before this change, Cleaner performs cleaning of old file versions and then stores the deleted files in .clean files. With this setup, we will not be able to track file deletions if a cleaner fails after deleting files but before writing .clean metadata. This is fine for regular file-system view generation but Incremental timeline syncing relies on clean/commit/compaction metadata to keep a consistent file-system view. Cleaner state transitions is now similar to that of compaction. 1. Requested : HoodieWriteClient.scheduleClean() selects the list of files that needs to be deleted and stores them in metadata 2. Inflight : HoodieWriteClient marks the state to be inflight before it starts deleting 3. Completed : HoodieWriteClient marks the state after completing the deletion according to the cleaner plan --- .../hudi/cli/commands/CompactionCommand.java | 3 +- .../apache/hudi/CompactionAdminClient.java | 49 ++-- .../org/apache/hudi/HoodieCleanClient.java | 182 +++++++++++++++ .../org/apache/hudi/HoodieWriteClient.java | 57 ++--- .../embedded/EmbeddedTimelineService.java | 2 + .../org/apache/hudi/io/HoodieCleanHelper.java | 13 +- .../org/apache/hudi/io/HoodieMergeHandle.java | 5 +- .../compact/HoodieRealtimeTableCompactor.java | 57 ++--- .../compact/strategy/CompactionStrategy.java | 3 +- .../hudi/table/HoodieCopyOnWriteTable.java | 144 +++++++----- .../hudi/table/HoodieMergeOnReadTable.java | 215 +++++++++--------- .../org/apache/hudi/table/HoodieTable.java | 24 +- .../apache/hudi/table/RollbackExecutor.java | 20 +- .../apache/hudi/table/RollbackRequest.java | 8 +- .../apache/hudi/HoodieClientTestHarness.java | 6 + .../java/org/apache/hudi/TestCleaner.java | 205 +++++++++-------- .../org/apache/hudi/TestHoodieClientBase.java | 5 + 
.../src/main/avro/HoodieCleanerPlan.avsc | 69 ++++++ .../common/model/CompactionOperation.java | 14 +- .../hudi/common/table/HoodieTimeline.java | 26 ++- .../table/timeline/HoodieActiveTimeline.java | 80 +++++-- .../table/timeline/HoodieDefaultTimeline.java | 8 + .../common/table/timeline/HoodieInstant.java | 5 +- .../view/RocksDbBasedFileSystemView.java | 2 + .../apache/hudi/common/util/AvroUtils.java | 10 + .../apache/hudi/common/util/ParquetUtils.java | 7 +- .../common/versioning/MetadataMigrator.java | 15 +- .../common/versioning/VersionMigrator.java | 4 + .../compaction/CompactionPlanMigrator.java | 5 +- .../CompactionV1MigrationHandler.java | 14 +- .../CompactionV2MigrationHandler.java | 15 +- .../hudi/common/util/TestCompactionUtils.java | 32 ++- .../org/apache/hudi/hive/HiveSyncTool.java | 41 ++-- .../timeline/service/TimelineService.java | 2 + 34 files changed, 856 insertions(+), 491 deletions(-) create mode 100644 hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java create mode 100644 hudi-common/src/main/avro/HoodieCleanerPlan.avsc diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java index 4a9f4b780..3d1059249 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java @@ -288,8 +288,7 @@ public class CompactionCommand implements CommandMarker { String message = "\n\n\t COMPACTION PLAN " + (valid ? "VALID" : "INVALID") + "\n\n"; List rows = new ArrayList<>(); res.stream().forEach(r -> { - Comparable[] row = new Comparable[]{r.getOperation().getFileId(), - r.getOperation().getBaseInstantTime(), + Comparable[] row = new Comparable[] {r.getOperation().getFileId(), r.getOperation().getBaseInstantTime(), r.getOperation().getDataFileName().isPresent() ? 
r.getOperation().getDataFileName().get() : "", r.getOperation().getDeltaFileNames().size(), r.isSuccess(), r.getException().isPresent() ? r.getException().get().getMessage() : ""}; diff --git a/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java b/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java index 5401c73a4..72af619d6 100644 --- a/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java @@ -239,9 +239,8 @@ public class CompactionAdminClient extends AbstractHoodieClient { FileSlice merged = fileSystemView.getLatestMergedFileSlicesBeforeOrOn(op.getPartitionPath(), lastInstant.getTimestamp()) .filter(fs -> fs.getFileId().equals(op.getFileId())).findFirst().get(); - final int maxVersion = - op.getDeltaFileNames().stream().map(lf -> FSUtils.getFileVersionFromLog(new Path(lf))) - .reduce((x, y) -> x > y ? x : y).orElse(0); + final int maxVersion = op.getDeltaFileNames().stream().map(lf -> FSUtils.getFileVersionFromLog(new Path(lf))) + .reduce((x, y) -> x > y ? x : y).orElse(0); List logFilesToBeMoved = merged.getLogFiles().filter(lf -> lf.getLogVersion() > maxVersion).collect(Collectors.toList()); return logFilesToBeMoved.stream().map(lf -> { @@ -293,33 +292,31 @@ public class CompactionAdminClient extends AbstractHoodieClient { FileSlice fs = fileSliceOptional.get(); Option df = fs.getDataFile(); if (operation.getDataFileName().isPresent()) { - String expPath = metaClient.getFs().getFileStatus(new Path( - FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), - new Path(operation.getDataFileName().get()))).getPath() - .toString(); - Preconditions.checkArgument(df.isPresent(), "Data File must be present. 
File Slice was : " - + fs + ", operation :" + operation); + String expPath = metaClient.getFs() + .getFileStatus( + new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), + new Path(operation.getDataFileName().get()))) + .getPath().toString(); + Preconditions.checkArgument(df.isPresent(), + "Data File must be present. File Slice was : " + fs + ", operation :" + operation); Preconditions.checkArgument(df.get().getPath().equals(expPath), "Base Path in operation is specified as " + expPath + " but got path " + df.get().getPath()); } Set logFilesInFileSlice = fs.getLogFiles().collect(Collectors.toSet()); - Set logFilesInCompactionOp = operation.getDeltaFileNames().stream() - .map(dp -> { - try { - FileStatus[] fileStatuses = metaClient.getFs().listStatus( - new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), - new Path(dp))); - Preconditions.checkArgument(fileStatuses.length == 1, "Expect only 1 file-status"); - return new HoodieLogFile(fileStatuses[0]); - } catch (FileNotFoundException fe) { - throw new CompactionValidationException(fe.getMessage()); - } catch (IOException ioe) { - throw new HoodieIOException(ioe.getMessage(), ioe); - } - }).collect(Collectors.toSet()); - Set missing = - logFilesInCompactionOp.stream().filter(lf -> !logFilesInFileSlice.contains(lf)) - .collect(Collectors.toSet()); + Set logFilesInCompactionOp = operation.getDeltaFileNames().stream().map(dp -> { + try { + FileStatus[] fileStatuses = metaClient.getFs().listStatus(new Path( + FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), new Path(dp))); + Preconditions.checkArgument(fileStatuses.length == 1, "Expect only 1 file-status"); + return new HoodieLogFile(fileStatuses[0]); + } catch (FileNotFoundException fe) { + throw new CompactionValidationException(fe.getMessage()); + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } + 
}).collect(Collectors.toSet()); + Set missing = logFilesInCompactionOp.stream().filter(lf -> !logFilesInFileSlice.contains(lf)) + .collect(Collectors.toSet()); Preconditions.checkArgument(missing.isEmpty(), "All log files specified in compaction operation is not present. Missing :" + missing + ", Exp :" + logFilesInCompactionOp + ", Got :" + logFilesInFileSlice); diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java new file mode 100644 index 000000000..97c67882d --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi; + +import com.codahale.metrics.Timer; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.List; +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieCleanerPlan; +import org.apache.hudi.client.embedded.EmbeddedTimelineService; +import org.apache.hudi.common.HoodieCleanStat; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTimeline; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieInstant.State; +import org.apache.hudi.common.util.AvroUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.metrics.HoodieMetrics; +import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; + +public class HoodieCleanClient extends AbstractHoodieClient { + + private static Logger logger = LogManager.getLogger(HoodieCleanClient.class); + private final transient HoodieMetrics metrics; + + public HoodieCleanClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, HoodieMetrics metrics) { + this(jsc, clientConfig, metrics, Option.empty()); + } + + public HoodieCleanClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, HoodieMetrics metrics, + Option timelineService) { + super(jsc, clientConfig, timelineService); + this.metrics = metrics; + } + + /** + * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the + * configurations and CleaningPolicy used. 
(typically files that no longer can be used by a running query can be + * cleaned) + */ + public void clean() throws HoodieIOException { + String startCleanTime = HoodieActiveTimeline.createNewCommitTime(); + clean(startCleanTime); + } + + /** + * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the + * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be + * cleaned) + * + * @param startCleanTime Cleaner Instant Timestamp + * @throws HoodieIOException in case of any IOException + */ + protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOException { + // Create a Hoodie table which encapsulated the commits and files visible + final HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); + + // If there are inflight(failed) or previously requested clean operation, first perform them + table.getCleanTimeline().filterInflightsAndRequested().getInstants().forEach(hoodieInstant -> { + logger.info("There were previously unfinished cleaner operations. 
Finishing Instant=" + hoodieInstant); + runClean(table, hoodieInstant.getTimestamp()); + }); + + Option cleanerPlanOpt = scheduleClean(startCleanTime); + + if (cleanerPlanOpt.isPresent()) { + HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get(); + if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null) + && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) { + final HoodieTable hoodieTable = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); + return runClean(hoodieTable, startCleanTime); + } + } + return null; + } + + /** + * Creates a Cleaner plan if there are files to be cleaned and stores them in instant file + * + * @param startCleanTime Cleaner Instant Time + * @return Cleaner Plan if generated + */ + @VisibleForTesting + protected Option scheduleClean(String startCleanTime) { + // Create a Hoodie table which encapsulated the commits and files visible + HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); + + HoodieCleanerPlan cleanerPlan = table.scheduleClean(jsc); + + if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null) + && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) { + + HoodieInstant cleanInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime); + // Save to both aux and timeline folder + try { + table.getActiveTimeline().saveToCleanRequested(cleanInstant, AvroUtils.serializeCleanerPlan(cleanerPlan)); + logger.info("Requesting Cleaning with instant time " + cleanInstant); + } catch (IOException e) { + logger.error("Got exception when saving cleaner requested file", e); + throw new HoodieIOException(e.getMessage(), e); + } + return Option.of(cleanerPlan); + } + return Option.empty(); + } + + /** + * Executes the Cleaner plan stored in the instant metadata + * + * @param table Hoodie Table + * @param cleanInstantTs Cleaner Instant Timestamp + */ + @VisibleForTesting + protected HoodieCleanMetadata runClean(HoodieTable table, String 
cleanInstantTs) { + HoodieInstant cleanInstant = + table.getCleanTimeline().getInstants().filter(x -> x.getTimestamp().equals(cleanInstantTs)).findFirst().get(); + + Preconditions.checkArgument( + cleanInstant.getState().equals(State.REQUESTED) || cleanInstant.getState().equals(State.INFLIGHT)); + + try { + logger.info("Cleaner started"); + final Timer.Context context = metrics.getCleanCtx(); + + if (!cleanInstant.isInflight()) { + // Mark as inflight first + cleanInstant = table.getActiveTimeline().transitionCleanRequestedToInflight(cleanInstant); + } + + List cleanStats = table.clean(jsc, cleanInstant); + + if (cleanStats.isEmpty()) { + return HoodieCleanMetadata.newBuilder().build(); + } + + // Emit metrics (duration, numFilesDeleted) if needed + Option durationInMs = Option.empty(); + if (context != null) { + durationInMs = Option.of(metrics.getDurationInMs(context.stop())); + logger.info("cleanerElaspsedTime (Minutes): " + durationInMs.get() / (1000 * 60)); + } + + // Create the metadata and save it + HoodieCleanMetadata metadata = + AvroUtils.convertCleanMetadata(cleanInstant.getTimestamp(), durationInMs, cleanStats); + logger.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"); + metrics.updateCleanMetrics(durationInMs.orElseGet(() -> -1L), metadata.getTotalFilesDeleted()); + + table.getActiveTimeline().transitionCleanInflightToComplete( + new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, cleanInstant.getTimestamp()), + AvroUtils.serializeCleanMetadata(metadata)); + logger.info("Marked clean started on " + cleanInstant.getTimestamp() + " as complete"); + return metadata; + } catch (IOException e) { + throw new HoodieIOException("Failed to clean up after commit", e); + } + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java index da1d52c99..a147df751 100644 --- a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java +++ 
b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java @@ -38,7 +38,6 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.client.embedded.EmbeddedTimelineService; -import org.apache.hudi.common.HoodieCleanStat; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieDataFile; @@ -98,6 +97,7 @@ public class HoodieWriteClient extends AbstractHo private final boolean rollbackInFlight; private final transient HoodieMetrics metrics; private final transient HoodieIndex index; + private final transient HoodieCleanClient cleanClient; private transient Timer.Context writeContext = null; private transient Timer.Context compactionTimer; private transient Timer.Context indexTimer = null; @@ -131,6 +131,7 @@ public class HoodieWriteClient extends AbstractHo this.index = index; this.metrics = new HoodieMetrics(config, config.getTableName()); this.rollbackInFlight = rollbackInFlight; + this.cleanClient = new HoodieCleanClient<>(jsc, config, metrics, timelineService); } public static SparkConf registerClasses(SparkConf conf) { @@ -918,6 +919,7 @@ public class HoodieWriteClient extends AbstractHo public void close() { // Stop timeline-server if running super.close(); + this.cleanClient.close(); // Calling this here releases any resources used by your index, so make sure to finish any related operations // before this point this.index.close(); @@ -927,55 +929,24 @@ public class HoodieWriteClient extends AbstractHo * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the * configurations and CleaningPolicy used. 
(typically files that no longer can be used by a running query can be * cleaned) + * + * @throws HoodieIOException */ public void clean() throws HoodieIOException { - String startCleanTime = HoodieActiveTimeline.createNewCommitTime(); - clean(startCleanTime); + cleanClient.clean(); } /** * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be * cleaned) + * + * @param startCleanTime Cleaner Instant Timestamp + * @return + * @throws HoodieIOException in case of any IOException */ - private void clean(String startCleanTime) throws HoodieIOException { - try { - logger.info("Cleaner started"); - final Timer.Context context = metrics.getCleanCtx(); - - // Create a Hoodie table which encapsulated the commits and files visible - HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); - - List cleanStats = table.clean(jsc); - if (cleanStats.isEmpty()) { - return; - } - - // Emit metrics (duration, numFilesDeleted) if needed - Option durationInMs = Option.empty(); - if (context != null) { - durationInMs = Option.of(metrics.getDurationInMs(context.stop())); - logger.info("cleanerElaspsedTime (Minutes): " + durationInMs.get() / (1000 * 60)); - } - - // Create the metadata and save it - HoodieCleanMetadata metadata = AvroUtils.convertCleanMetadata(startCleanTime, durationInMs, cleanStats); - logger.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"); - metrics.updateCleanMetrics(durationInMs.orElseGet(() -> -1L), metadata.getTotalFilesDeleted()); - - table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, startCleanTime), - AvroUtils.serializeCleanMetadata(metadata)); - logger.info("Marked clean started on " + startCleanTime + " as complete"); - - if (!table.getActiveTimeline().getCleanerTimeline().empty()) { - // Cleanup of older cleaner meta 
files - // TODO - make the commit archival generic and archive clean metadata - FSUtils.deleteOlderCleanMetaFiles(fs, table.getMetaClient().getMetaPath(), - table.getActiveTimeline().getCleanerTimeline().getInstants()); - } - } catch (IOException e) { - throw new HoodieIOException("Failed to clean up after commit", e); - } + protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOException { + return cleanClient.clean(startCleanTime); } /** @@ -1176,8 +1147,8 @@ public class HoodieWriteClient extends AbstractHo private JavaRDD runCompaction(HoodieInstant compactionInstant, HoodieActiveTimeline activeTimeline, boolean autoCommit) throws IOException { HoodieTableMetaClient metaClient = createMetaClient(true); - HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(metaClient, - compactionInstant.getTimestamp()); + HoodieCompactionPlan compactionPlan = + CompactionUtils.getCompactionPlan(metaClient, compactionInstant.getTimestamp()); // Mark instant as compaction inflight activeTimeline.transitionCompactionRequestedToInflight(compactionInstant); diff --git a/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java b/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java index 46247c17b..9f7b03b37 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java @@ -97,9 +97,11 @@ public class EmbeddedTimelineService { public void stop() { if (null != server) { + logger.info("Closing Timeline server"); this.server.close(); this.server = null; this.viewManager = null; + logger.info("Closed Timeline server"); } } } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java index 89ab363fc..53e08bd67 100644 --- 
a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java @@ -19,6 +19,7 @@ package org.apache.hudi.io; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -52,7 +53,7 @@ import org.apache.log4j.Logger; *

* TODO: Should all cleaning be done based on {@link HoodieCommitMetadata} */ -public class HoodieCleanHelper> { +public class HoodieCleanHelper> implements Serializable { private static Logger logger = LogManager.getLogger(HoodieCleanHelper.class); @@ -114,12 +115,11 @@ public class HoodieCleanHelper> { FileSlice nextSlice = fileSliceIterator.next(); if (nextSlice.getDataFile().isPresent()) { HoodieDataFile dataFile = nextSlice.getDataFile().get(); - deletePaths.add(dataFile.getPath()); + deletePaths.add(dataFile.getFileName()); } if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { // If merge on read, then clean the log files for the commits as well - deletePaths - .addAll(nextSlice.getLogFiles().map(file -> file.getPath().toString()).collect(Collectors.toList())); + deletePaths.addAll(nextSlice.getLogFiles().map(file -> file.getFileName()).collect(Collectors.toList())); } } } @@ -187,11 +187,10 @@ public class HoodieCleanHelper> { if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline .compareTimestamps(earliestCommitToRetain.getTimestamp(), fileCommitTime, HoodieTimeline.GREATER)) { // this is a commit, that should be cleaned. 
- aFile.ifPresent(hoodieDataFile -> deletePaths.add(hoodieDataFile.getPath())); + aFile.ifPresent(hoodieDataFile -> deletePaths.add(hoodieDataFile.getFileName())); if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { // If merge on read, then clean the log files for the commits as well - deletePaths - .addAll(aSlice.getLogFiles().map(file -> file.getPath().toString()).collect(Collectors.toList())); + deletePaths.addAll(aSlice.getLogFiles().map(file -> file.getFileName()).collect(Collectors.toList())); } } } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 095dbd5d7..81178ec05 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -195,9 +195,8 @@ public class HoodieMergeHandle extends HoodieWrit // Load the new records in a map long memoryForMerge = config.getMaxMemoryPerPartitionMerge(); logger.info("MaxMemoryPerPartitionMerge => " + memoryForMerge); - this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, - config.getSpillableMapBasePath(), new DefaultSizeEstimator(), - new HoodieRecordSizeEstimator(originalSchema)); + this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(), + new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(originalSchema)); } catch (IOException io) { throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io); } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java b/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java index 3fece32f0..dc68c5b98 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java @@ -101,8 
+101,8 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { FileSystem fs = metaClient.getFs(); Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); - log.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation - .getDeltaFileNames() + " for commit " + commitTime); + log.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames() + + " for commit " + commitTime); // TODO - FIX THIS // Reads the entire avro file. Always only specific blocks should be read from the avro file // (failure recover). @@ -115,20 +115,19 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { .filterCompletedInstants().lastInstant().get().getTimestamp(); log.info("MaxMemoryPerCompaction => " + config.getMaxMemoryPerCompaction()); - List logFiles = operation.getDeltaFileNames().stream() - .map(p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), - p).toString()).collect(toList()); - HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, - metaClient.getBasePath(), logFiles, readerSchema, maxInstantTime, - config.getMaxMemoryPerCompaction(), config.getCompactionLazyBlockReadEnabled(), + List logFiles = operation.getDeltaFileNames().stream().map( + p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString()) + .collect(toList()); + HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, metaClient.getBasePath(), logFiles, + readerSchema, maxInstantTime, config.getMaxMemoryPerCompaction(), config.getCompactionLazyBlockReadEnabled(), config.getCompactionReverseLogReadEnabled(), config.getMaxDFSStreamBufferSize(), config.getSpillableMapBasePath()); if (!scanner.iterator().hasNext()) { return Lists.newArrayList(); } - Option oldDataFileOpt = operation.getBaseFile(metaClient.getBasePath(), - 
operation.getPartitionPath()); + Option oldDataFileOpt = + operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath()); // Compacting is very similar to applying updates to existing file Iterator> result; @@ -189,28 +188,22 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { RealtimeView fileSystemView = hoodieTable.getRTFileSystemView(); log.info("Compaction looking for files to compact in " + partitionPaths + " partitions"); - List operations = - jsc.parallelize(partitionPaths, partitionPaths.size()) - .flatMap((FlatMapFunction) partitionPath -> fileSystemView - .getLatestFileSlices(partitionPath) - .filter(slice -> - !fgIdsInPendingCompactions.contains(slice.getFileGroupId())) - .map( - s -> { - List logFiles = s.getLogFiles().sorted(HoodieLogFile - .getLogFileComparator()).collect(Collectors.toList()); - totalLogFiles.add((long) logFiles.size()); - totalFileSlices.add(1L); - // Avro generated classes are not inheriting Serializable. Using CompactionOperation POJO - // for spark Map operations and collecting them finally in Avro generated classes for storing - // into meta files. 
- Option dataFile = s.getDataFile(); - return new CompactionOperation(dataFile, partitionPath, logFiles, - config.getCompactionStrategy().captureMetrics(config, dataFile, partitionPath, logFiles)); - }) - .filter(c -> !c.getDeltaFileNames().isEmpty()) - .collect(toList()).iterator()).collect().stream().map(CompactionUtils::buildHoodieCompactionOperation) - .collect(toList()); + List operations = jsc.parallelize(partitionPaths, partitionPaths.size()) + .flatMap((FlatMapFunction) partitionPath -> fileSystemView + .getLatestFileSlices(partitionPath) + .filter(slice -> !fgIdsInPendingCompactions.contains(slice.getFileGroupId())).map(s -> { + List logFiles = + s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); + totalLogFiles.add((long) logFiles.size()); + totalFileSlices.add(1L); + // Avro generated classes are not inheriting Serializable. Using CompactionOperation POJO + // for spark Map operations and collecting them finally in Avro generated classes for storing + // into meta files. 
+ Option dataFile = s.getDataFile(); + return new CompactionOperation(dataFile, partitionPath, logFiles, + config.getCompactionStrategy().captureMetrics(config, dataFile, partitionPath, logFiles)); + }).filter(c -> !c.getDeltaFileNames().isEmpty()).collect(toList()).iterator()) + .collect().stream().map(CompactionUtils::buildHoodieCompactionOperation).collect(toList()); log.info("Total of " + operations.size() + " compactions are retrieved"); log.info("Total number of latest files slices " + totalFileSlices.value()); log.info("Total number of log files " + totalLogFiles.value()); diff --git a/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/CompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/CompactionStrategy.java index 8fa2b57d5..9d7de66e0 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/CompactionStrategy.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/CompactionStrategy.java @@ -96,8 +96,7 @@ public abstract class CompactionStrategy implements Serializable { // Strategy implementation can overload this method to set specific compactor-id return HoodieCompactionPlan.newBuilder() .setOperations(orderAndFilter(writeConfig, operations, pendingCompactionPlans)) - .setVersion(CompactionUtils.LATEST_COMPACTION_METADATA_VERSION) - .build(); + .setVersion(CompactionUtils.LATEST_COMPACTION_METADATA_VERSION).build(); } /** diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java index e18578d4c..6ebdc6d48 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java @@ -19,6 +19,7 @@ package org.apache.hudi.table; import com.google.common.hash.Hashing; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.Serializable; import 
java.nio.charset.StandardCharsets; @@ -35,9 +36,12 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hudi.WriteStatus; +import org.apache.hudi.avro.model.HoodieActionInstant; +import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.common.HoodieCleanStat; import org.apache.hudi.common.HoodieRollbackStat; +import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieDataFile; import org.apache.hudi.common.model.HoodieKey; @@ -48,6 +52,7 @@ import org.apache.hudi.common.model.HoodieRollingStatMetadata; import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.AvroUtils; import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; @@ -72,10 +77,10 @@ import org.apache.parquet.hadoop.ParquetReader; import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFlatMapFunction; import scala.Tuple2; + /** * Implementation of a very heavily read-optimized Hoodie Table where *

@@ -97,10 +102,12 @@ public class HoodieCopyOnWriteTable extends Hoodi Map partitionCleanStatMap = new HashMap<>(); FileSystem fs = table.getMetaClient().getFs(); + Path basePath = new Path(table.getMetaClient().getBasePath()); while (iter.hasNext()) { Tuple2 partitionDelFileTuple = iter.next(); String partitionPath = partitionDelFileTuple._1(); - String deletePathStr = partitionDelFileTuple._2(); + String delFileName = partitionDelFileTuple._2(); + String deletePathStr = new Path(new Path(basePath, partitionPath), delFileName).toString(); Boolean deletedFileResult = deleteFileAndGetResult(fs, deletePathStr); if (!partitionCleanStatMap.containsKey(partitionPath)) { partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath)); @@ -109,29 +116,24 @@ public class HoodieCopyOnWriteTable extends Hoodi partitionCleanStat.addDeleteFilePatterns(deletePathStr); partitionCleanStat.addDeletedFileResult(deletePathStr, deletedFileResult); } - return partitionCleanStatMap.entrySet().stream().map(e -> new Tuple2<>(e.getKey(), e.getValue())) .collect(Collectors.toList()).iterator(); }; } - private static PairFlatMapFunction getFilesToDeleteFunc(HoodieTable table, - HoodieWriteConfig config) { - return (PairFlatMapFunction) partitionPathToClean -> { - HoodieCleanHelper cleaner = new HoodieCleanHelper(table, config); - return cleaner.getDeletePaths(partitionPathToClean).stream() - .map(deleteFile -> new Tuple2<>(partitionPathToClean, deleteFile.toString())).iterator(); - }; - } - private static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException { Path deletePath = new Path(deletePathStr); logger.debug("Working on delete path :" + deletePath); - boolean deleteResult = fs.delete(deletePath, false); - if (deleteResult) { - logger.debug("Cleaned file at path :" + deletePath); + try { + boolean deleteResult = fs.delete(deletePath, false); + if (deleteResult) { + logger.debug("Cleaned file at path :" + deletePath); + } + return 
deleteResult; + } catch (FileNotFoundException fio) { + // With cleanPlan being used for retried cleaning operations, its possible to clean a file twice + return false; } - return deleteResult; } @Override @@ -268,6 +270,40 @@ public class HoodieCopyOnWriteTable extends Hoodi return handleUpsertPartition(commitTime, partition, recordItr, partitioner); } + /** + * Generates List of files to be cleaned + * + * @param jsc JavaSparkContext + * @return Cleaner Plan + */ + public HoodieCleanerPlan scheduleClean(JavaSparkContext jsc) { + try { + FileSystem fs = getMetaClient().getFs(); + + List partitionsToClean = + FSUtils.getAllPartitionPaths(fs, getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()); + if (partitionsToClean.isEmpty()) { + logger.info("Nothing to clean here. It is already clean"); + return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build(); + } + logger.info( + "Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy()); + int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism()); + logger.info("Using cleanerParallelism: " + cleanerParallelism); + HoodieCleanHelper cleaner = new HoodieCleanHelper(this, config); + Option earliestInstant = cleaner.getEarliestCommitToRetain(); + + Map> cleanOps = jsc.parallelize(partitionsToClean, cleanerParallelism) + .map(partitionPathToClean -> Pair.of(partitionPathToClean, cleaner.getDeletePaths(partitionPathToClean))) + .collect().stream().collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + return new HoodieCleanerPlan(earliestInstant + .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null), + config.getCleanerPolicy().name(), cleanOps, 1); + } catch (IOException e) { + throw new HoodieIOException("Failed to schedule clean operation", e); + } + } + /** * Performs cleaning of partition paths according to cleaning policy and 
returns the number of files cleaned. Handles * skews in partitions to clean by making files to clean as the unit of task distribution. @@ -275,17 +311,40 @@ public class HoodieCopyOnWriteTable extends Hoodi * @throws IllegalArgumentException if unknown cleaning policy is provided */ @Override - public List clean(JavaSparkContext jsc) { + public List clean(JavaSparkContext jsc, HoodieInstant cleanInstant) { try { - FileSystem fs = getMetaClient().getFs(); - List partitionsToClean = - FSUtils.getAllPartitionPaths(fs, getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()); - logger.info("Partitions to clean up : " + partitionsToClean + ", with policy " + config.getCleanerPolicy()); - if (partitionsToClean.isEmpty()) { - logger.info("Nothing to clean here mom. It is already clean"); - return Collections.emptyList(); - } - return cleanPartitionPaths(partitionsToClean, jsc); + HoodieCleanerPlan cleanerPlan = AvroUtils.deserializeCleanerPlan(getActiveTimeline() + .getInstantAuxiliaryDetails(HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp())).get()); + + int cleanerParallelism = Math.min( + (int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(x -> x.size()).count()), + config.getCleanerParallelism()); + logger.info("Using cleanerParallelism: " + cleanerParallelism); + List> partitionCleanStats = jsc + .parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream() + .flatMap(x -> x.getValue().stream().map(y -> new Tuple2(x.getKey(), y))) + .collect(Collectors.toList()), cleanerParallelism) + .mapPartitionsToPair(deleteFilesFunc(this)).reduceByKey((e1, e2) -> e1.merge(e2)).collect(); + + Map partitionCleanStatsMap = + partitionCleanStats.stream().collect(Collectors.toMap(Tuple2::_1, Tuple2::_2)); + + // Return PartitionCleanStat for each partition passed. 
+ return cleanerPlan.getFilesToBeDeletedPerPartition().keySet().stream().map(partitionPath -> { + PartitionCleanStat partitionCleanStat = + (partitionCleanStatsMap.containsKey(partitionPath)) ? partitionCleanStatsMap.get(partitionPath) + : new PartitionCleanStat(partitionPath); + HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain(); + return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath) + .withEarliestCommitRetained(Option.ofNullable( + actionInstant != null + ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()), + actionInstant.getAction(), actionInstant.getTimestamp()) + : null)) + .withDeletePathPattern(partitionCleanStat.deletePathPatterns) + .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles) + .withFailedDeletes(partitionCleanStat.failedDeleteFiles).build(); + }).collect(Collectors.toList()); } catch (IOException e) { throw new HoodieIOException("Failed to clean up after commit", e); } @@ -311,7 +370,7 @@ public class HoodieCopyOnWriteTable extends Hoodi logger.info("Clean out all parquet files generated for commit: " + commit); List rollbackRequests = generateRollbackRequests(instantToRollback); - //TODO: We need to persist this as rollback workload and use it in case of partial failures + // TODO: We need to persist this as rollback workload and use it in case of partial failures List stats = new RollbackExecutor(metaClient, config).performRollback(jsc, instantToRollback, rollbackRequests); @@ -321,8 +380,7 @@ public class HoodieCopyOnWriteTable extends Hoodi return stats; } - private List generateRollbackRequests(HoodieInstant instantToRollback) - throws IOException { + private List generateRollbackRequests(HoodieInstant instantToRollback) throws IOException { return FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()).stream().map(partitionPath -> { return 
RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback); @@ -350,34 +408,6 @@ public class HoodieCopyOnWriteTable extends Hoodi } } - private List cleanPartitionPaths(List partitionsToClean, JavaSparkContext jsc) { - int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism()); - logger.info("Using cleanerParallelism: " + cleanerParallelism); - List> partitionCleanStats = jsc - .parallelize(partitionsToClean, cleanerParallelism).flatMapToPair(getFilesToDeleteFunc(this, config)) - .repartition(cleanerParallelism) // repartition to remove skews - .mapPartitionsToPair(deleteFilesFunc(this)).reduceByKey( - // merge partition level clean stats below - (Function2) (e1, e2) -> e1.merge(e2)) - .collect(); - - Map partitionCleanStatsMap = - partitionCleanStats.stream().collect(Collectors.toMap(Tuple2::_1, Tuple2::_2)); - - HoodieCleanHelper cleaner = new HoodieCleanHelper(this, config); - // Return PartitionCleanStat for each partition passed. - return partitionsToClean.stream().map(partitionPath -> { - PartitionCleanStat partitionCleanStat = - (partitionCleanStatsMap.containsKey(partitionPath)) ? 
partitionCleanStatsMap.get(partitionPath) - : new PartitionCleanStat(partitionPath); - return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath) - .withEarliestCommitRetained(cleaner.getEarliestCommitToRetain()) - .withDeletePathPattern(partitionCleanStat.deletePathPatterns) - .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles) - .withFailedDeletes(partitionCleanStat.failedDeleteFiles).build(); - }).collect(Collectors.toList()); - } - enum BucketType { UPDATE, INSERT } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java index 8dcb3bfe3..185681c4a 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java @@ -185,12 +185,12 @@ public class HoodieMergeOnReadTable extends Hoodi logger.info("Unpublished " + commit); Long startTime = System.currentTimeMillis(); List rollbackRequests = generateRollbackRequests(jsc, instantToRollback); - //TODO: We need to persist this as rollback workload and use it in case of partial failures + // TODO: We need to persist this as rollback workload and use it in case of partial failures List allRollbackStats = new RollbackExecutor(metaClient, config).performRollback(jsc, instantToRollback, rollbackRequests); // Delete Inflight instants if enabled - deleteInflightInstant(deleteInstants, this.getActiveTimeline(), new HoodieInstant(true, instantToRollback - .getAction(), instantToRollback.getTimestamp())); + deleteInflightInstant(deleteInstants, this.getActiveTimeline(), + new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp())); logger.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime)); @@ -200,6 +200,7 @@ public class HoodieMergeOnReadTable extends Hoodi /** * Generate all rollback 
requests that we need to perform for rolling back this action without actually performing * rolling back + * * @param jsc JavaSparkContext * @param instantToRollback Instant to Rollback * @return list of rollback requests @@ -211,92 +212,92 @@ public class HoodieMergeOnReadTable extends Hoodi List partitions = FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()); int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1); - return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)) - .flatMap(partitionPath -> { - HoodieActiveTimeline activeTimeline = this.getActiveTimeline().reload(); - List partitionRollbackRequests = new ArrayList<>(); - switch (instantToRollback.getAction()) { - case HoodieTimeline.COMMIT_ACTION: - logger.info("Rolling back commit action. There are higher delta commits. So only rolling back this " - + "instant"); - partitionRollbackRequests.add(RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction( - partitionPath, instantToRollback)); - break; - case HoodieTimeline.COMPACTION_ACTION: - // If there is no delta commit present after the current commit (if compaction), no action, else we - // need to make sure that a compaction commit rollback also deletes any log files written as part of the - // succeeding deltacommit. - boolean higherDeltaCommits = !activeTimeline.getDeltaCommitTimeline() - .filterCompletedInstants().findInstantsAfter(commit, 1).empty(); - if (higherDeltaCommits) { - // Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled - // and has not yet finished. In this scenario we should delete only the newly created parquet files - // and not corresponding base commit log files created with this as baseCommit since updates would - // have been written to the log files. - logger.info("Rolling back compaction. There are higher delta commits. 
So only deleting data files"); - partitionRollbackRequests.add(RollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction( - partitionPath, instantToRollback)); - } else { - // No deltacommits present after this compaction commit (inflight or requested). In this case, we - // can also delete any log files that were created with this compaction commit as base - // commit. - logger.info("Rolling back compaction plan. There are NO higher delta commits. So deleting both data and" - + " log files"); - partitionRollbackRequests.add( - RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, - instantToRollback)); - } - break; - case HoodieTimeline.DELTA_COMMIT_ACTION: - // -------------------------------------------------------------------------------------------------- - // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal - // -------------------------------------------------------------------------------------------------- - // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In - // this scenario we would want to delete these log files. - // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario, - // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks. - // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is - // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime and - // and hence will end up deleting these log files. This is done so there are no orphan log files - // lying around. 
- // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions - // taken in this scenario is a combination of (A.2) and (A.3) - // --------------------------------------------------------------------------------------------------- - // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal - // --------------------------------------------------------------------------------------------------- - // (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no entries. - // In this scenario, we delete all the parquet files written for the failed commit. - // (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In - // this scenario, perform (A.1) and for updates written to log files, write rollback blocks. - // (B.3) Rollback triggered for first commit - Same as (B.1) - // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files - // as well if the base parquet file gets deleted. 
- try { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - metaClient.getCommitTimeline().getInstantDetails( - new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp())) - .get(), HoodieCommitMetadata.class); - - // In case all data was inserts and the commit failed, delete the file belonging to that commit - // We do not know fileIds for inserts (first inserts are either log files or parquet files), - // delete all files for the corresponding failed commit, if present (same as COW) - partitionRollbackRequests.add(RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction( - partitionPath, instantToRollback)); - - // append rollback blocks for updates - if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { - partitionRollbackRequests - .addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata)); - } - break; - } catch (IOException io) { - throw new UncheckedIOException("Failed to collect rollback actions for commit " + commit, io); - } - default: - break; + return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)).flatMap(partitionPath -> { + HoodieActiveTimeline activeTimeline = this.getActiveTimeline().reload(); + List partitionRollbackRequests = new ArrayList<>(); + switch (instantToRollback.getAction()) { + case HoodieTimeline.COMMIT_ACTION: + logger.info( + "Rolling back commit action. There are higher delta commits. So only rolling back this " + "instant"); + partitionRollbackRequests.add( + RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback)); + break; + case HoodieTimeline.COMPACTION_ACTION: + // If there is no delta commit present after the current commit (if compaction), no action, else we + // need to make sure that a compaction commit rollback also deletes any log files written as part of the + // succeeding deltacommit. 
+ boolean higherDeltaCommits = + !activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit, 1).empty(); + if (higherDeltaCommits) { + // Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled + // and has not yet finished. In this scenario we should delete only the newly created parquet files + // and not corresponding base commit log files created with this as baseCommit since updates would + // have been written to the log files. + logger.info("Rolling back compaction. There are higher delta commits. So only deleting data files"); + partitionRollbackRequests.add( + RollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(partitionPath, instantToRollback)); + } else { + // No deltacommits present after this compaction commit (inflight or requested). In this case, we + // can also delete any log files that were created with this compaction commit as base + // commit. + logger.info("Rolling back compaction plan. There are NO higher delta commits. So deleting both data and" + + " log files"); + partitionRollbackRequests.add( + RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback)); } - return partitionRollbackRequests.iterator(); - }).filter(Objects::nonNull).collect(); + break; + case HoodieTimeline.DELTA_COMMIT_ACTION: + // -------------------------------------------------------------------------------------------------- + // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal + // -------------------------------------------------------------------------------------------------- + // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In + // this scenario we would want to delete these log files. + // (A.2) Failed recurring commit - Inserts/Updates written to log files. 
In this scenario, + // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks. + // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is + // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime and + // and hence will end up deleting these log files. This is done so there are no orphan log files + // lying around. + // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions + // taken in this scenario is a combination of (A.2) and (A.3) + // --------------------------------------------------------------------------------------------------- + // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal + // --------------------------------------------------------------------------------------------------- + // (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no entries. + // In this scenario, we delete all the parquet files written for the failed commit. + // (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In + // this scenario, perform (A.1) and for updates written to log files, write rollback blocks. + // (B.3) Rollback triggered for first commit - Same as (B.1) + // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files + // as well if the base parquet file gets deleted. 
+ try { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + metaClient.getCommitTimeline() + .getInstantDetails( + new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp())) + .get(), + HoodieCommitMetadata.class); + + // In case all data was inserts and the commit failed, delete the file belonging to that commit + // We do not know fileIds for inserts (first inserts are either log files or parquet files), + // delete all files for the corresponding failed commit, if present (same as COW) + partitionRollbackRequests.add( + RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback)); + + // append rollback blocks for updates + if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { + partitionRollbackRequests + .addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata)); + } + break; + } catch (IOException io) { + throw new UncheckedIOException("Failed to collect rollback actions for commit " + commit, io); + } + default: + break; + } + return partitionRollbackRequests.iterator(); + }).filter(Objects::nonNull).collect(); } @Override @@ -428,27 +429,27 @@ public class HoodieMergeOnReadTable extends Hoodi // baseCommit always by listing the file slice Map fileIdToBaseCommitTimeForLogMap = this.getRTFileSystemView().getLatestFileSlices(partitionPath) .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime)); - return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream() - .filter(wStat -> { + return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> { - // Filter out stats without prevCommit since they are all inserts - boolean validForRollback = (wStat != null) && (wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT) - && (wStat.getPrevCommit() != null) && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId()); + // Filter out stats 
without prevCommit since they are all inserts + boolean validForRollback = (wStat != null) && (wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT) + && (wStat.getPrevCommit() != null) && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId()); - if (validForRollback) { - // For sanity, log instant time can never be less than base-commit on which we are rolling back - Preconditions.checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get( - wStat.getFileId()), rollbackInstant.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL)); - } + if (validForRollback) { + // For sanity, log instant time can never be less than base-commit on which we are rolling back + Preconditions + .checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()), + rollbackInstant.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL)); + } - return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get( - // Base Ts should be strictly less. If equal (for inserts-to-logs), the caller employs another option - // to delete and we should not step on it - wStat.getFileId()), rollbackInstant.getTimestamp(), HoodieTimeline.LESSER); - }).map(wStat -> { - String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()); - return RollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(), - baseCommitTime, rollbackInstant); - }).collect(Collectors.toList()); + return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get( + // Base Ts should be strictly less. 
If equal (for inserts-to-logs), the caller employs another option + // to delete and we should not step on it + wStat.getFileId()), rollbackInstant.getTimestamp(), HoodieTimeline.LESSER); + }).map(wStat -> { + String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()); + return RollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(), + baseCommitTime, rollbackInstant); + }).collect(Collectors.toList()); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java index 30f74def8..09007b984 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hudi.WriteStatus; +import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.client.utils.ClientUtils; @@ -190,6 +191,13 @@ public abstract class HoodieTable implements Seri return getActiveTimeline().getCleanerTimeline().filterCompletedInstants(); } + /** + * Get clean timeline + */ + public HoodieTimeline getCleanTimeline() { + return getActiveTimeline().getCleanerTimeline(); + } + /** * Get only the completed (no-inflights) savepoint timeline */ @@ -265,9 +273,21 @@ public abstract class HoodieTable implements Seri HoodieCompactionPlan compactionPlan); /** - * Clean partition paths according to cleaning policy and returns the number of files cleaned. + * Generates list of files that are eligible for cleaning + * + * @param jsc Java Spark Context + * @return Cleaner Plan containing list of files to be deleted. 
*/ - public abstract List clean(JavaSparkContext jsc); + public abstract HoodieCleanerPlan scheduleClean(JavaSparkContext jsc); + + /** + * Cleans the files listed in the cleaner plan associated with clean instant + * + * @param jsc Java Spark Context + * @param cleanInstant Clean Instant + * @return list of Clean Stats + */ + public abstract List clean(JavaSparkContext jsc, HoodieInstant cleanInstant); /** * Rollback the (inflight/committed) record changes with the given commit time. Four steps: (1) Atomically unpublish diff --git a/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java index 8bdf371b8..1ba9b43a0 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java @@ -65,8 +65,8 @@ public class RollbackExecutor implements Serializable { /** * Performs all rollback actions that we have collected in parallel. 
*/ - public List performRollback(JavaSparkContext jsc, - HoodieInstant instantToRollback, List rollbackRequests) { + public List performRollback(JavaSparkContext jsc, HoodieInstant instantToRollback, + List rollbackRequests) { SerializablePathFilter filter = (path) -> { if (path.toString().contains(".parquet")) { @@ -101,11 +101,10 @@ public class RollbackExecutor implements Serializable { Writer writer = null; boolean success = false; try { - writer = HoodieLogFormat.newWriterBuilder().onParentPath( - FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath())) + writer = HoodieLogFormat.newWriterBuilder() + .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath())) .withFileId(rollbackRequest.getFileId().get()) - .overBaseCommit(rollbackRequest.getLatestBaseInstant().get()) - .withFs(metaClient.getFs()) + .overBaseCommit(rollbackRequest.getLatestBaseInstant().get()).withFs(metaClient.getFs()) .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); // generate metadata @@ -114,8 +113,7 @@ public class RollbackExecutor implements Serializable { writer = writer.appendBlock(new HoodieCommandBlock(header)); success = true; } catch (IOException | InterruptedException io) { - throw new HoodieRollbackException( - "Failed to rollback for instant " + instantToRollback, io); + throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io); } finally { try { if (writer != null) { @@ -130,8 +128,7 @@ public class RollbackExecutor implements Serializable { // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in // cloud-storage : HUDI-168 Map filesToNumBlocksRollback = new HashMap<>(); - filesToNumBlocksRollback.put(metaClient.getFs() - .getFileStatus(writer.getLogFile().getPath()), 1L); + filesToNumBlocksRollback.put(metaClient.getFs().getFileStatus(writer.getLogFile().getPath()), 1L); return new Tuple2(rollbackRequest.getPartitionPath(), 
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) .withRollbackBlockAppendResults(filesToNumBlocksRollback).build()); @@ -180,8 +177,7 @@ public class RollbackExecutor implements Serializable { * Common method used for cleaning out parquet files under a partition path during rollback of a set of commits */ private Map deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, - Map results, String partitionPath, - PathFilter filter) throws IOException { + Map results, String partitionPath, PathFilter filter) throws IOException { logger.info("Cleaning path " + partitionPath); FileSystem fs = metaClient.getFs(); FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter); diff --git a/hudi-client/src/main/java/org/apache/hudi/table/RollbackRequest.java b/hudi-client/src/main/java/org/apache/hudi/table/RollbackRequest.java index 0e619883d..326f347ad 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/RollbackRequest.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/RollbackRequest.java @@ -30,9 +30,7 @@ public class RollbackRequest { * Rollback Action Types */ public enum RollbackAction { - DELETE_DATA_FILES_ONLY, - DELETE_DATA_AND_LOG_FILES, - APPEND_ROLLBACK_BLOCK + DELETE_DATA_FILES_ONLY, DELETE_DATA_AND_LOG_FILES, APPEND_ROLLBACK_BLOCK } /** @@ -60,8 +58,8 @@ public class RollbackRequest { */ private final RollbackAction rollbackAction; - public RollbackRequest(String partitionPath, HoodieInstant rollbackInstant, - Option fileId, Option latestBaseInstant, RollbackAction rollbackAction) { + public RollbackRequest(String partitionPath, HoodieInstant rollbackInstant, Option fileId, + Option latestBaseInstant, RollbackAction rollbackAction) { this.partitionPath = partitionPath; this.rollbackInstant = rollbackInstant; this.fileId = fileId; diff --git a/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java 
b/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java index 4b20608f6..d23b15000 100644 --- a/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java +++ b/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; @@ -52,6 +53,11 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im protected transient HoodieTestDataGenerator dataGen = null; protected transient ExecutorService executorService; protected transient HoodieTableMetaClient metaClient; + private static AtomicInteger instantGen = new AtomicInteger(1); + + public String getNextInstant() { + return String.format("%09d", instantGen.getAndIncrement()); + } // dfs protected String dfsBasePath; diff --git a/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java index 10f160786..301fdf4da 100644 --- a/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java +++ b/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeSet; import java.util.function.Predicate; @@ -37,6 +38,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.common.HoodieCleanStat; import 
org.apache.hudi.common.HoodieTestDataGenerator; @@ -69,12 +72,8 @@ import org.apache.hudi.table.HoodieTable; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.scheduler.SparkListener; -import org.apache.spark.scheduler.SparkListenerTaskEnd; -import org.apache.spark.util.AccumulatorV2; import org.junit.Assert; import org.junit.Test; -import scala.collection.Iterator; /** * Test Cleaning related logic @@ -396,6 +395,62 @@ public class TestCleaner extends TestHoodieClientBase { }); } + /** + * Helper to run cleaner and collect Clean Stats + * + * @param config HoodieWriteConfig + */ + private List runCleaner(HoodieWriteConfig config) { + return runCleaner(config, false); + } + + /** + * Helper to run cleaner and collect Clean Stats + * + * @param config HoodieWriteConfig + */ + private List runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure) { + HoodieCleanClient writeClient = getHoodieCleanClient(config); + + String cleanInstantTs = getNextInstant(); + HoodieCleanMetadata cleanMetadata1 = writeClient.clean(cleanInstantTs); + + if (null == cleanMetadata1) { + return new ArrayList<>(); + } + + if (simulateRetryFailure) { + metaClient.reloadActiveTimeline() + .revertToInflight(new HoodieInstant(State.COMPLETED, HoodieTimeline.CLEAN_ACTION, cleanInstantTs)); + final HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieCleanMetadata cleanMetadata2 = writeClient.runClean(table, cleanInstantTs); + Assert.assertTrue( + Objects.equals(cleanMetadata1.getEarliestCommitToRetain(), cleanMetadata2.getEarliestCommitToRetain())); + Assert.assertEquals(new Integer(0), cleanMetadata2.getTotalFilesDeleted()); + Assert.assertEquals(cleanMetadata1.getPartitionMetadata().keySet(), + cleanMetadata2.getPartitionMetadata().keySet()); + cleanMetadata1.getPartitionMetadata().keySet().stream().forEach(k -> { + HoodieCleanPartitionMetadata p1 = 
cleanMetadata1.getPartitionMetadata().get(k); + HoodieCleanPartitionMetadata p2 = cleanMetadata2.getPartitionMetadata().get(k); + Assert.assertEquals(p1.getDeletePathPatterns(), p2.getDeletePathPatterns()); + Assert.assertEquals(p1.getSuccessDeleteFiles(), p2.getFailedDeleteFiles()); + Assert.assertEquals(p1.getPartitionPath(), p2.getPartitionPath()); + Assert.assertEquals(k, p1.getPartitionPath()); + }); + } + List stats = cleanMetadata1.getPartitionMetadata().values().stream() + .map(x -> new HoodieCleanStat.Builder().withPartitionPath(x.getPartitionPath()) + .withFailedDeletes(x.getFailedDeleteFiles()).withSuccessfulDeletes(x.getSuccessDeleteFiles()) + .withPolicy(HoodieCleaningPolicy.valueOf(x.getPolicy())).withDeletePathPattern(x.getDeletePathPatterns()) + .withEarliestCommitRetained(Option.ofNullable(cleanMetadata1.getEarliestCommitToRetain() != null + ? new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "000") + : null)) + .build()) + .collect(Collectors.toList()); + + return stats; + } + /** * Test HoodieTable.clean() Cleaning by versions logic */ @@ -417,7 +472,7 @@ public class TestCleaner extends TestHoodieClientBase { metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); - List hoodieCleanStatsOne = table.clean(jsc); + List hoodieCleanStatsOne = runCleaner(config); assertEquals("Must not clean any files", 0, getCleanStat(hoodieCleanStatsOne, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() .size()); @@ -441,7 +496,7 @@ public class TestCleaner extends TestHoodieClientBase { HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update - List hoodieCleanStatsTwo = table.clean(jsc); + List hoodieCleanStatsTwo = runCleaner(config); assertEquals("Must clean 
1 file", 1, getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() .size()); @@ -467,7 +522,7 @@ public class TestCleaner extends TestHoodieClientBase { String file3P0C2 = HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002"); - List hoodieCleanStatsThree = table.clean(jsc); + List hoodieCleanStatsThree = runCleaner(config);; assertEquals("Must clean two files", 2, getCleanStat(hoodieCleanStatsThree, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) .getSuccessDeleteFiles().size()); @@ -480,7 +535,7 @@ public class TestCleaner extends TestHoodieClientBase { // No cleaning on partially written file, with no commit. HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file3P0C2); // update - List hoodieCleanStatsFour = table.clean(jsc); + List hoodieCleanStatsFour = runCleaner(config); assertEquals("Must not clean any files", 0, getCleanStat(hoodieCleanStatsFour, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() .size()); @@ -525,7 +580,7 @@ public class TestCleaner extends TestHoodieClientBase { HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "001"); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); - List hoodieCleanStats = table.clean(jsc); + List hoodieCleanStats = runCleaner(config);; assertEquals("Must clean three files, one parquet and 2 log files", 3, getCleanStat(hoodieCleanStats, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() .size()); @@ -542,6 +597,22 @@ public class TestCleaner extends TestHoodieClientBase { */ @Test public void testKeepLatestCommits() throws IOException { + testKeepLatestCommits(false); + } + + /** + * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. 
Here the operations are simulated + * such that first clean attempt failed after files were cleaned and a subsequent cleanup succeeds. + */ + @Test + public void testKeepLatestCommitsWithFailureRetry() throws IOException { + testKeepLatestCommits(true); + } + + /** + * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. + */ + private void testKeepLatestCommits(boolean simulateFailureRetry) throws IOException { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()) @@ -558,7 +629,7 @@ public class TestCleaner extends TestHoodieClientBase { metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); - List hoodieCleanStatsOne = table.clean(jsc); + List hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry); assertEquals("Must not clean any files", 0, getCleanStat(hoodieCleanStatsOne, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() .size()); @@ -582,7 +653,7 @@ public class TestCleaner extends TestHoodieClientBase { HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update - List hoodieCleanStatsTwo = table.clean(jsc); + List hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry); assertEquals("Must not clean any files", 0, getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() .size()); @@ -608,7 +679,7 @@ public class TestCleaner extends TestHoodieClientBase { String file3P0C2 = HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002"); - 
List hoodieCleanStatsThree = table.clean(jsc); + List hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry); assertEquals("Must not clean any file. We have to keep 1 version before the latest commit time to keep", 0, getCleanStat(hoodieCleanStatsThree, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) .getSuccessDeleteFiles().size()); @@ -626,7 +697,7 @@ public class TestCleaner extends TestHoodieClientBase { String file4P0C3 = HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003"); - List hoodieCleanStatsFour = table.clean(jsc); + List hoodieCleanStatsFour = runCleaner(config, simulateFailureRetry); assertEquals("Must not clean one old file", 1, getCleanStat(hoodieCleanStatsFour, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() .size()); @@ -648,7 +719,7 @@ public class TestCleaner extends TestHoodieClientBase { // No cleaning on partially written file, with no commit. HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "004", file3P0C2); // update - List hoodieCleanStatsFive = table.clean(jsc); + List hoodieCleanStatsFive = runCleaner(config, simulateFailureRetry); assertEquals("Must not clean any files", 0, getCleanStat(hoodieCleanStatsFive, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() .size()); @@ -694,88 +765,10 @@ public class TestCleaner extends TestHoodieClientBase { metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); - List hoodieCleanStatsOne = table.clean(jsc); + List hoodieCleanStatsOne = runCleaner(config); assertTrue("HoodieCleanStats should be empty for a table with empty partitionPaths", hoodieCleanStatsOne.isEmpty()); } - /** - * Test Clean-by-commits behavior in the presence of skewed partitions - */ - @Test - public void testCleaningSkewedPartitons() throws IOException { - HoodieWriteConfig config = 
HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) - .withCompactionConfig(HoodieCompactionConfig.newBuilder() - .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()) - .build(); - Map stageOneShuffleReadTaskRecordsCountMap = new HashMap<>(); - - // Since clean involves repartition in order to uniformly distribute data, - // we can inspect the number of records read by various tasks in stage 1. - // There should not be skew in the number of records read in the task. - - // SparkListener below listens to the stage end events and captures number of - // records read by various tasks in stage-1. - jsc.sc().addSparkListener(new SparkListener() { - - @Override - public void onTaskEnd(SparkListenerTaskEnd taskEnd) { - - Iterator> iterator = taskEnd.taskMetrics().accumulators().iterator(); - while (iterator.hasNext()) { - AccumulatorV2 accumulator = iterator.next(); - if (taskEnd.stageId() == 1 && accumulator.isRegistered() && accumulator.name().isDefined() - && accumulator.name().get().equals("internal.metrics.shuffle.read.recordsRead")) { - stageOneShuffleReadTaskRecordsCountMap.put(taskEnd.taskInfo().taskId(), (Long) accumulator.value()); - } - } - } - }); - - // make 1 commit, with 100 files in one partition and 10 in other two - HoodieTestUtils.createCommitFiles(basePath, "000"); - List filesP0C0 = createFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", 100); - List filesP1C0 = createFilesInPartition(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000", 10); - List filesP2C0 = createFilesInPartition(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "000", 10); - - HoodieTestUtils.createCommitFiles(basePath, "001"); - updateAllFilesInPartition(filesP0C0, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); - updateAllFilesInPartition(filesP1C0, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001"); - updateAllFilesInPartition(filesP2C0, 
HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "001"); - - HoodieTestUtils.createCommitFiles(basePath, "002"); - updateAllFilesInPartition(filesP0C0, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002"); - updateAllFilesInPartition(filesP1C0, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "002"); - updateAllFilesInPartition(filesP2C0, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "002"); - - HoodieTestUtils.createCommitFiles(basePath, "003"); - updateAllFilesInPartition(filesP0C0, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003"); - updateAllFilesInPartition(filesP1C0, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "003"); - updateAllFilesInPartition(filesP2C0, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "003"); - - metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); - List hoodieCleanStats = table.clean(jsc); - - assertEquals(100, getCleanStat(hoodieCleanStats, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) - .getSuccessDeleteFiles().size()); - assertEquals(10, getCleanStat(hoodieCleanStats, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH) - .getSuccessDeleteFiles().size()); - assertEquals(10, getCleanStat(hoodieCleanStats, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH) - .getSuccessDeleteFiles().size()); - - // 3 tasks are expected since the number of partitions is 3 - assertEquals(3, stageOneShuffleReadTaskRecordsCountMap.keySet().size()); - // Sum of all records processed = total number of files to clean - assertEquals(120, - stageOneShuffleReadTaskRecordsCountMap.values().stream().reduce((a, b) -> a + b).get().intValue()); - assertTrue( - "The skew in handling files to clean is not removed. 
" - + "Each task should handle more records than the partitionPath with least files " - + "and less records than the partitionPath with most files.", - stageOneShuffleReadTaskRecordsCountMap.values().stream().filter(a -> a > 10 && a < 100).count() == 3); - } - - /** * Test Keep Latest Commits when there are pending compactions */ @@ -794,14 +787,28 @@ public class TestCleaner extends TestHoodieClientBase { // FileId3 1 2 3 001 // FileId2 0 0 0 000 // FileId1 0 0 0 000 - testPendingCompactions(config, 48, 18); + testPendingCompactions(config, 48, 18, false); } + /** + * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. Here the operations are simulated + * such that first clean attempt failed after files were cleaned and a subsequent cleanup succeeds. + */ + @Test + public void testKeepLatestVersionsWithPendingCompactions() throws IOException { + testKeepLatestVersionsWithPendingCompactions(false); + } + + /** * Test Keep Latest Versions when there are pending compactions */ @Test - public void testKeepLatestVersionsWithPendingCompactions() throws IOException { + public void testKeepLatestVersionsWithPendingCompactionsAndFailureRetry() throws IOException { + testKeepLatestVersionsWithPendingCompactions(true); + } + + private void testKeepLatestVersionsWithPendingCompactions(boolean retryFailure) throws IOException { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) .withCompactionConfig(HoodieCompactionConfig.newBuilder() @@ -816,7 +823,7 @@ public class TestCleaner extends TestHoodieClientBase { // FileId3 0 0 0 000, 001 // FileId2 0 0 0 000 // FileId1 0 0 0 000 - testPendingCompactions(config, 36, 9); + testPendingCompactions(config, 36, 9, retryFailure); } /** @@ -825,8 +832,8 @@ public class TestCleaner extends TestHoodieClientBase { * @param config Hoodie Write Config * @param expNumFilesDeleted Number of files deleted */ - public void 
testPendingCompactions(HoodieWriteConfig config, int expNumFilesDeleted, - int expNumFilesUnderCompactionDeleted) throws IOException { + private void testPendingCompactions(HoodieWriteConfig config, int expNumFilesDeleted, + int expNumFilesUnderCompactionDeleted, boolean retryFailure) throws IOException { HoodieTableMetaClient metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ); String[] instants = new String[] {"000", "001", "003", "005", "007", "009", "011", "013"}; @@ -897,7 +904,7 @@ public class TestCleaner extends TestHoodieClientBase { // Clean now metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); - List hoodieCleanStats = table.clean(jsc); + List hoodieCleanStats = runCleaner(config, retryFailure); // Test for safety final HoodieTableMetaClient newMetaClient = HoodieTableMetaClient.reload(metaClient); diff --git a/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java b/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java index ff9f798c7..7274cad93 100644 --- a/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java +++ b/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java @@ -52,6 +52,7 @@ import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex.IndexType; +import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.table.HoodieTable; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -77,6 +78,10 @@ public class TestHoodieClientBase extends HoodieClientTestHarness { cleanupResources(); } + protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) { + return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, cfg.getTableName())); + } + protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) 
{ return getHoodieWriteClient(cfg, false); } diff --git a/hudi-common/src/main/avro/HoodieCleanerPlan.avsc b/hudi-common/src/main/avro/HoodieCleanerPlan.avsc new file mode 100644 index 000000000..b87ed7718 --- /dev/null +++ b/hudi-common/src/main/avro/HoodieCleanerPlan.avsc @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +{ + "namespace": "org.apache.hudi.avro.model", + "type": "record", + "name": "HoodieCleanerPlan", + "fields": [ + { + "name": "earliestInstantToRetain", + "type":["null", { + "type": "record", + "name": "HoodieActionInstant", + "fields": [ + { + "name": "timestamp", + "type": "string" + }, + { + "name": "action", + "type": "string" + }, + { + "name": "state", + "type": "string" + } + ] + }], + "default" : null + }, + { + "name": "policy", + "type": "string" + }, + { + "name": "filesToBeDeletedPerPartition", + "type": [ + "null", { + "type":"map", + "values": { + "type":"array", + "items":{ + "name":"filePath", + "type": "string" + } + }}], + "default" : null + }, + { + "name":"version", + "type":["int", "null"], + "default": 1 + } + ] +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java b/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java index d5c07d40a..f9c39ab63 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java @@ -134,14 +134,9 @@ public class CompactionOperation implements Serializable { @Override public String toString() { - return "CompactionOperation{" - + "baseInstantTime='" + baseInstantTime + '\'' - + ", dataFileCommitTime=" + dataFileCommitTime - + ", deltaFileNames=" + deltaFileNames - + ", dataFileName=" + dataFileName - + ", id='" + id + '\'' - + ", metrics=" + metrics - + '}'; + return "CompactionOperation{" + "baseInstantTime='" + baseInstantTime + '\'' + ", dataFileCommitTime=" + + dataFileCommitTime + ", deltaFileNames=" + deltaFileNames + ", dataFileName=" + dataFileName + ", id='" + id + + '\'' + ", metrics=" + metrics + '}'; } @Override @@ -156,8 +151,7 @@ public class CompactionOperation implements Serializable { return Objects.equals(baseInstantTime, operation.baseInstantTime) && Objects.equals(dataFileCommitTime, 
operation.dataFileCommitTime) && Objects.equals(deltaFileNames, operation.deltaFileNames) - && Objects.equals(dataFileName, operation.dataFileName) - && Objects.equals(id, operation.id); + && Objects.equals(dataFileName, operation.dataFileName) && Objects.equals(id, operation.id); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java index 4e42c9910..ccc7e2e97 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java @@ -63,6 +63,7 @@ public interface HoodieTimeline extends Serializable { String INFLIGHT_COMMIT_EXTENSION = INFLIGHT_EXTENSION; String INFLIGHT_DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION + INFLIGHT_EXTENSION; String INFLIGHT_CLEAN_EXTENSION = "." + CLEAN_ACTION + INFLIGHT_EXTENSION; + String REQUESTED_CLEAN_EXTENSION = "." + CLEAN_ACTION + REQUESTED_EXTENSION; String INFLIGHT_ROLLBACK_EXTENSION = "." + ROLLBACK_ACTION + INFLIGHT_EXTENSION; String INFLIGHT_SAVEPOINT_EXTENSION = "." 
+ SAVEPOINT_ACTION + INFLIGHT_EXTENSION; String REQUESTED_COMPACTION_SUFFIX = StringUtils.join(COMPACTION_ACTION, REQUESTED_EXTENSION); @@ -80,6 +81,13 @@ public interface HoodieTimeline extends Serializable { */ HoodieTimeline filterInflights(); + /** + * Filter this timeline to include requested and in-flights + * + * @return New instance of HoodieTimeline with just in-flights and requested instants + */ + HoodieTimeline filterInflightsAndRequested(); + /** * Filter this timeline to just include the in-flights excluding compaction instants * @@ -219,7 +227,19 @@ public interface HoodieTimeline extends Serializable { } static HoodieInstant getCompletedInstant(final HoodieInstant instant) { - return new HoodieInstant(false, instant.getAction(), instant.getTimestamp()); + return new HoodieInstant(State.COMPLETED, instant.getAction(), instant.getTimestamp()); + } + + static HoodieInstant getRequestedInstant(final HoodieInstant instant) { + return new HoodieInstant(State.REQUESTED, instant.getAction(), instant.getTimestamp()); + } + + static HoodieInstant getCleanRequestedInstant(final String timestamp) { + return new HoodieInstant(State.REQUESTED, CLEAN_ACTION, timestamp); + } + + static HoodieInstant getCleanInflightInstant(final String timestamp) { + return new HoodieInstant(State.INFLIGHT, CLEAN_ACTION, timestamp); } static HoodieInstant getCompactionRequestedInstant(final String timestamp) { @@ -246,6 +266,10 @@ public interface HoodieTimeline extends Serializable { return StringUtils.join(instant, HoodieTimeline.CLEAN_EXTENSION); } + static String makeRequestedCleanerFileName(String instant) { + return StringUtils.join(instant, HoodieTimeline.REQUESTED_CLEAN_EXTENSION); + } + static String makeInflightCleanerFileName(String instant) { return StringUtils.join(instant, HoodieTimeline.INFLIGHT_CLEAN_EXTENSION); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 140d4113d..412851df8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -58,10 +58,11 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { public static final SimpleDateFormat COMMIT_FORMATTER = new SimpleDateFormat("yyyyMMddHHmmss"); - public static final Set VALID_EXTENSIONS_IN_ACTIVE_TIMELINE = new HashSet<>(Arrays.asList(new String[] { - COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION, INFLIGHT_DELTA_COMMIT_EXTENSION, - SAVEPOINT_EXTENSION, INFLIGHT_SAVEPOINT_EXTENSION, CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION, - INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION})); + public static final Set VALID_EXTENSIONS_IN_ACTIVE_TIMELINE = + new HashSet<>(Arrays.asList(new String[] {COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION, + INFLIGHT_DELTA_COMMIT_EXTENSION, SAVEPOINT_EXTENSION, INFLIGHT_SAVEPOINT_EXTENSION, CLEAN_EXTENSION, + INFLIGHT_CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_COMPACTION_EXTENSION, + REQUESTED_COMPACTION_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION})); private static final transient Logger log = LogManager.getLogger(HoodieActiveTimeline.class); protected HoodieTableMetaClient metaClient; @@ -212,11 +213,19 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { } public void revertToInflight(HoodieInstant instant) { - log.info("Reverting instant to inflight " + instant); - revertCompleteToInflight(instant, HoodieTimeline.getInflightInstant(instant)); + log.info("Reverting " + instant + " to inflight "); + revertStateTransition(instant, HoodieTimeline.getInflightInstant(instant)); log.info("Reverted " + instant + " to inflight"); } + public HoodieInstant 
revertToRequested(HoodieInstant instant) { + log.warn("Reverting " + instant + " to requested "); + HoodieInstant requestedInstant = HoodieTimeline.getRequestedInstant(instant); + revertStateTransition(instant, HoodieTimeline.getRequestedInstant(instant)); + log.warn("Reverted " + instant + " to requested"); + return requestedInstant; + } + public void deleteInflight(HoodieInstant instant) { Preconditions.checkArgument(instant.isInflight()); deleteInstantFile(instant); @@ -311,6 +320,39 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { * END - COMPACTION RELATED META-DATA MANAGEMENT **/ + /** + * Transition Clean State from inflight to Committed + * + * @param inflightInstant Inflight instant + * @param data Extra Metadata + * @return commit instant + */ + public HoodieInstant transitionCleanInflightToComplete(HoodieInstant inflightInstant, Option data) { + Preconditions.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.CLEAN_ACTION)); + Preconditions.checkArgument(inflightInstant.isInflight()); + HoodieInstant commitInstant = new HoodieInstant(State.COMPLETED, CLEAN_ACTION, inflightInstant.getTimestamp()); + // First write metadata to aux folder + createFileInAuxiliaryFolder(commitInstant, data); + // Then write to timeline + transitionState(inflightInstant, commitInstant, data); + return commitInstant; + } + + /** + * Transition Clean State from requested to inflight + * + * @param requestedInstant requested instant + * @return commit instant + */ + public HoodieInstant transitionCleanRequestedToInflight(HoodieInstant requestedInstant) { + Preconditions.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.CLEAN_ACTION)); + Preconditions.checkArgument(requestedInstant.isRequested()); + HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, CLEAN_ACTION, requestedInstant.getTimestamp()); + transitionState(requestedInstant, inflight, Option.empty()); + return inflight; + } + + private void 
transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option data) { Preconditions.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp())); Path commitFilePath = new Path(metaClient.getMetaPath(), toInstant.getFileName()); @@ -327,19 +369,20 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { } } - private void revertCompleteToInflight(HoodieInstant completed, HoodieInstant inflight) { - Preconditions.checkArgument(completed.getTimestamp().equals(inflight.getTimestamp())); - Path inFlightCommitFilePath = new Path(metaClient.getMetaPath(), inflight.getFileName()); + private void revertStateTransition(HoodieInstant curr, HoodieInstant revert) { + Preconditions.checkArgument(curr.getTimestamp().equals(revert.getTimestamp())); + Path revertFilePath = new Path(metaClient.getMetaPath(), revert.getFileName()); try { - if (!metaClient.getFs().exists(inFlightCommitFilePath)) { - Path commitFilePath = new Path(metaClient.getMetaPath(), completed.getFileName()); - boolean success = metaClient.getFs().rename(commitFilePath, inFlightCommitFilePath); + if (!metaClient.getFs().exists(revertFilePath)) { + Path currFilePath = new Path(metaClient.getMetaPath(), curr.getFileName()); + boolean success = metaClient.getFs().rename(currFilePath, revertFilePath); if (!success) { - throw new HoodieIOException("Could not rename " + commitFilePath + " to " + inFlightCommitFilePath); + throw new HoodieIOException("Could not rename " + currFilePath + " to " + revertFilePath); } + log.info("Renamed " + currFilePath + " to " + revertFilePath); } } catch (IOException e) { - throw new HoodieIOException("Could not complete revert " + completed, e); + throw new HoodieIOException("Could not complete revert " + curr, e); } } @@ -355,6 +398,15 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { createFileInMetaPath(instant.getFileName(), content); } + public void saveToCleanRequested(HoodieInstant instant, Option content) { + 
Preconditions.checkArgument(instant.getAction().equals(HoodieTimeline.CLEAN_ACTION)); + Preconditions.checkArgument(instant.getState().equals(State.REQUESTED)); + // Write workload to auxiliary folder + createFileInAuxiliaryFolder(instant, content); + // Plan is only stored in auxiliary folder + createFileInMetaPath(instant.getFileName(), Option.empty()); + } + private void createFileInMetaPath(String filename, Option content) { Path fullPath = new Path(metaClient.getMetaPath(), filename); createFileInPath(fullPath, content); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java index 937bbfc70..72318dd0d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java @@ -30,6 +30,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hudi.common.table.HoodieTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.HoodieException; @@ -83,6 +84,13 @@ public class HoodieDefaultTimeline implements HoodieTimeline { return new HoodieDefaultTimeline(instants.stream().filter(HoodieInstant::isInflight), details); } + @Override + public HoodieTimeline filterInflightsAndRequested() { + return new HoodieDefaultTimeline( + instants.stream().filter(i -> i.getState().equals(State.REQUESTED) || i.getState().equals(State.INFLIGHT)), + details); + } + @Override public HoodieTimeline filterInflightsExcludingCompaction() { return new HoodieDefaultTimeline(instants.stream().filter(instant -> { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java index e3a268301..471d2d0d2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java @@ -69,7 +69,7 @@ public class HoodieInstant implements Serializable { } else if (action.contains(HoodieTimeline.INFLIGHT_EXTENSION)) { state = State.INFLIGHT; action = action.replace(HoodieTimeline.INFLIGHT_EXTENSION, ""); - } else if (action.equals(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX)) { + } else if (action.contains(HoodieTimeline.REQUESTED_EXTENSION)) { state = State.REQUESTED; action = action.replace(HoodieTimeline.REQUESTED_EXTENSION, ""); } @@ -117,7 +117,8 @@ public class HoodieInstant implements Serializable { : HoodieTimeline.makeCommitFileName(timestamp); } else if (HoodieTimeline.CLEAN_ACTION.equals(action)) { return isInflight() ? HoodieTimeline.makeInflightCleanerFileName(timestamp) - : HoodieTimeline.makeCleanerFileName(timestamp); + : isRequested() ? HoodieTimeline.makeRequestedCleanerFileName(timestamp) + : HoodieTimeline.makeCleanerFileName(timestamp); } else if (HoodieTimeline.ROLLBACK_ACTION.equals(action)) { return isInflight() ? 
HoodieTimeline.makeInflightRollbackFileName(timestamp) : HoodieTimeline.makeRollbackFileName(timestamp); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java index 837d45689..8580653c8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java @@ -333,8 +333,10 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste @Override public void close() { + log.info("Closing Rocksdb !!"); closed = true; rocksDB.close(); + log.info("Closed Rocksdb !!"); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroUtils.java index a0185b922..243a1a3c7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroUtils.java @@ -36,6 +36,7 @@ import org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.specific.SpecificRecordBase; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata; +import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; @@ -110,6 +111,11 @@ public class AvroUtils { return serializeAvroMetadata(compactionWorkload, HoodieCompactionPlan.class); } + + public static Option serializeCleanerPlan(HoodieCleanerPlan cleanPlan) throws IOException { + return serializeAvroMetadata(cleanPlan, HoodieCleanerPlan.class); + } + public static Option serializeCleanMetadata(HoodieCleanMetadata metadata) throws IOException { return 
serializeAvroMetadata(metadata, HoodieCleanMetadata.class); } @@ -137,6 +143,10 @@ public class AvroUtils { return Option.of(baos.toByteArray()); } + public static HoodieCleanerPlan deserializeCleanerPlan(byte[] bytes) throws IOException { + return deserializeAvroMetadata(bytes, HoodieCleanerPlan.class); + } + public static HoodieCompactionPlan deserializeCompactionPlan(byte[] bytes) throws IOException { return deserializeAvroMetadata(bytes, HoodieCompactionPlan.class); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java index 788e6070d..7baf368e5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java @@ -141,10 +141,9 @@ public class ParquetUtils { * Read out the bloom filter from the parquet file meta data. */ public static BloomFilter readBloomFilterFromParquetMetadata(Configuration configuration, Path parquetFilePath) { - Map footerVals = - readParquetFooter(configuration, false, parquetFilePath, - HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, - HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY); + Map footerVals = readParquetFooter(configuration, false, parquetFilePath, + HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, + HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY); String footerVal = footerVals.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY); if (null == footerVal) { // We use old style key "com.uber.hoodie.bloomfilter" diff --git a/hudi-common/src/main/java/org/apache/hudi/common/versioning/MetadataMigrator.java b/hudi-common/src/main/java/org/apache/hudi/common/versioning/MetadataMigrator.java index e348084ca..2c2c5b58f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/versioning/MetadataMigrator.java +++ 
b/hudi-common/src/main/java/org/apache/hudi/common/versioning/MetadataMigrator.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.util.collection.Pair; /** * Migrates a specific metadata type stored in .hoodie folder to latest version + * * @param */ public class MetadataMigrator { @@ -36,15 +37,16 @@ public class MetadataMigrator { private final Integer oldestVersion; public MetadataMigrator(HoodieTableMetaClient metaClient, List> migratorList) { - migrators = migratorList.stream().map(m -> - Pair.of(m.getManagedVersion(), m)).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + migrators = migratorList.stream().map(m -> Pair.of(m.getManagedVersion(), m)) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); latestVersion = migrators.keySet().stream().reduce((x, y) -> x > y ? x : y).get(); oldestVersion = migrators.keySet().stream().reduce((x, y) -> x < y ? x : y).get(); } /** * Upgrade Metadata version to its latest - * @param metadata Metadata + * + * @param metadata Metadata * @param metadataVersion Current version of metadata * @return Metadata conforming to the latest version of this metadata */ @@ -64,9 +66,10 @@ public class MetadataMigrator { /** * Migrate metadata to a specific version - * @param metadata Hoodie Table Meta Client - * @param metadataVersion Metadata Version - * @param targetVersion Target Version + * + * @param metadata Hoodie Table Meta Client + * @param metadataVersion Metadata Version + * @param targetVersion Target Version * @return Metadata conforming to the target version */ public T migrateToVersion(T metadata, int metadataVersion, int targetVersion) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/versioning/VersionMigrator.java b/hudi-common/src/main/java/org/apache/hudi/common/versioning/VersionMigrator.java index cac865055..4f6269365 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/versioning/VersionMigrator.java +++ 
b/hudi-common/src/main/java/org/apache/hudi/common/versioning/VersionMigrator.java @@ -22,18 +22,21 @@ import java.io.Serializable; /** * Responsible for upgrading and downgrading metadata versions for a specific metadata + * * @param Metadata Type */ public interface VersionMigrator extends Serializable { /** * Version of Metadata that this class will handle + * * @return */ Integer getManagedVersion(); /** * Upgrades metadata of type T from previous version to this version + * * @param input Metadata as of previous version. * @return Metadata compatible with the version managed by this class */ @@ -41,6 +44,7 @@ public interface VersionMigrator extends Serializable { /** * Downgrades metadata of type T from next version to this version + * * @param input Metadata as of next highest version * @return Metadata compatible with the version managed by this class */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionPlanMigrator.java b/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionPlanMigrator.java index ae6525c40..f8a4fb422 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionPlanMigrator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionPlanMigrator.java @@ -29,8 +29,7 @@ import org.apache.hudi.common.versioning.MetadataMigrator; public class CompactionPlanMigrator extends MetadataMigrator { public CompactionPlanMigrator(HoodieTableMetaClient metaClient) { - super(metaClient, Arrays.asList( - new CompactionV1MigrationHandler(metaClient), - new CompactionV2MigrationHandler(metaClient))); + super(metaClient, + Arrays.asList(new CompactionV1MigrationHandler(metaClient), new CompactionV2MigrationHandler(metaClient))); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionV1MigrationHandler.java 
b/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionV1MigrationHandler.java index 893b02562..67cb21398 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionV1MigrationHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionV1MigrationHandler.java @@ -52,21 +52,17 @@ public class CompactionV1MigrationHandler extends AbstractMigratorBase v1CompactionOperationList = new ArrayList<>(); if (null != input.getOperations()) { v1CompactionOperationList = input.getOperations().stream().map(inp -> { - return HoodieCompactionOperation.newBuilder() - .setBaseInstantTime(inp.getBaseInstantTime()) - .setFileId(inp.getFileId()) - .setPartitionPath(inp.getPartitionPath()) - .setMetrics(inp.getMetrics()) + return HoodieCompactionOperation.newBuilder().setBaseInstantTime(inp.getBaseInstantTime()) + .setFileId(inp.getFileId()).setPartitionPath(inp.getPartitionPath()).setMetrics(inp.getMetrics()) .setDataFilePath(convertToV1Path(basePath, inp.getPartitionPath(), inp.getDataFilePath())) - .setDeltaFilePaths(inp.getDeltaFilePaths().stream().map(s -> convertToV1Path(basePath, - inp.getPartitionPath(), s)).collect(Collectors.toList())) + .setDeltaFilePaths(inp.getDeltaFilePaths().stream() + .map(s -> convertToV1Path(basePath, inp.getPartitionPath(), s)).collect(Collectors.toList())) .build(); }).collect(Collectors.toList()); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionV2MigrationHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionV2MigrationHandler.java index 7a5416afb..e1d60ba6c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionV2MigrationHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionV2MigrationHandler.java @@ -46,20 +46,15 @@ public class CompactionV2MigrationHandler extends 
AbstractMigratorBase v2CompactionOperationList = new ArrayList<>(); if (null != input.getOperations()) { v2CompactionOperationList = input.getOperations().stream().map(inp -> { - return HoodieCompactionOperation.newBuilder() - .setBaseInstantTime(inp.getBaseInstantTime()) - .setFileId(inp.getFileId()) - .setPartitionPath(inp.getPartitionPath()) - .setMetrics(inp.getMetrics()) - .setDataFilePath(new Path(inp.getDataFilePath()).getName()) - .setDeltaFilePaths(inp.getDeltaFilePaths().stream().map(s -> new Path(s).getName()) - .collect(Collectors.toList())) + return HoodieCompactionOperation.newBuilder().setBaseInstantTime(inp.getBaseInstantTime()) + .setFileId(inp.getFileId()).setPartitionPath(inp.getPartitionPath()).setMetrics(inp.getMetrics()) + .setDataFilePath(new Path(inp.getDataFilePath()).getName()).setDeltaFilePaths( + inp.getDeltaFilePaths().stream().map(s -> new Path(s).getName()).collect(Collectors.toList())) .build(); }).collect(Collectors.toList()); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java index 71373409a..ad269b36f 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java @@ -114,8 +114,7 @@ public class TestCompactionUtils extends HoodieCommonTestHarness { fileSlice.addLogFile( new HoodieLogFile(new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN)))); op = CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], fileSlice, Option.of(metricsCaptureFn)); - testFileSliceCompactionOpEquality(fileSlice, op, DEFAULT_PARTITION_PATHS[0], - LATEST_COMPACTION_METADATA_VERSION); + testFileSliceCompactionOpEquality(fileSlice, op, DEFAULT_PARTITION_PATHS[0], LATEST_COMPACTION_METADATA_VERSION); } /** @@ -126,17 +125,17 @@ public class TestCompactionUtils extends HoodieCommonTestHarness 
{ FileSlice emptyFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "empty1"); FileSlice fileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1"); fileSlice.setDataFile(new TestHoodieDataFile(fullPartitionPath.toString() + "/data1_1_000.parquet")); - fileSlice.addLogFile( - new HoodieLogFile(new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN))))); - fileSlice.addLogFile( - new HoodieLogFile(new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN))))); + fileSlice.addLogFile(new HoodieLogFile( + new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN))))); + fileSlice.addLogFile(new HoodieLogFile( + new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN))))); FileSlice noLogFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noLog1"); noLogFileSlice.setDataFile(new TestHoodieDataFile(fullPartitionPath.toString() + "/noLog_1_000.parquet")); FileSlice noDataFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1"); - noDataFileSlice.addLogFile( - new HoodieLogFile(new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN))))); - noDataFileSlice.addLogFile( - new HoodieLogFile(new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN))))); + noDataFileSlice.addLogFile(new HoodieLogFile( + new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN))))); + noDataFileSlice.addLogFile(new HoodieLogFile( + new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN))))); List fileSliceList = Arrays.asList(emptyFileSlice, noDataFileSlice, fileSlice, noLogFileSlice); List> input = fileSliceList.stream().map(f -> 
Pair.of(DEFAULT_PARTITION_PATHS[0], f)).collect(Collectors.toList()); @@ -221,9 +220,8 @@ public class TestCompactionUtils extends HoodieCommonTestHarness { */ private void testFileSlicesCompactionPlanEquality(List> input, HoodieCompactionPlan plan) { Assert.assertEquals("All file-slices present", input.size(), plan.getOperations().size()); - IntStream.range(0, input.size()).boxed().forEach(idx -> - testFileSliceCompactionOpEquality(input.get(idx).getValue(), plan.getOperations().get(idx), - input.get(idx).getKey(), plan.getVersion())); + IntStream.range(0, input.size()).boxed().forEach(idx -> testFileSliceCompactionOpEquality(input.get(idx).getValue(), + plan.getOperations().get(idx), input.get(idx).getKey(), plan.getVersion())); } /** @@ -233,15 +231,15 @@ public class TestCompactionUtils extends HoodieCommonTestHarness { * @param op HoodieCompactionOperation * @param expPartitionPath Partition path */ - private void testFileSliceCompactionOpEquality(FileSlice slice, HoodieCompactionOperation op, - String expPartitionPath, int version) { + private void testFileSliceCompactionOpEquality(FileSlice slice, HoodieCompactionOperation op, String expPartitionPath, + int version) { Assert.assertEquals("Partition path is correct", expPartitionPath, op.getPartitionPath()); Assert.assertEquals("Same base-instant", slice.getBaseInstantTime(), op.getBaseInstantTime()); Assert.assertEquals("Same file-id", slice.getFileId(), op.getFileId()); if (slice.getDataFile().isPresent()) { HoodieDataFile df = slice.getDataFile().get(); - Assert.assertEquals("Same data-file", - version == COMPACTION_METADATA_VERSION_1 ? df.getPath() : df.getFileName(), op.getDataFilePath()); + Assert.assertEquals("Same data-file", version == COMPACTION_METADATA_VERSION_1 ? 
df.getPath() : df.getFileName(), + op.getDataFilePath()); } List paths = slice.getLogFiles().map(l -> l.getPath().toString()).collect(Collectors.toList()); IntStream.range(0, paths.size()).boxed().forEach(idx -> { diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index fa2c8b6c5..40ac91e8e 100644 --- a/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -62,25 +62,30 @@ public class HiveSyncTool { } public void syncHoodieTable() throws ClassNotFoundException { - switch (hoodieHiveClient.getTableType()) { - case COPY_ON_WRITE: - syncHoodieTable(false); - break; - case MERGE_ON_READ: - // sync a RO table for MOR - syncHoodieTable(false); - String originalTableName = cfg.tableName; - // TODO : Make realtime table registration optional using a config param - cfg.tableName = cfg.tableName + SUFFIX_REALTIME_TABLE; - // sync a RT table for MOR - syncHoodieTable(true); - cfg.tableName = originalTableName; - break; - default: - LOG.error("Unknown table type " + hoodieHiveClient.getTableType()); - throw new InvalidDatasetException(hoodieHiveClient.getBasePath()); + try { + switch (hoodieHiveClient.getTableType()) { + case COPY_ON_WRITE: + syncHoodieTable(false); + break; + case MERGE_ON_READ: + // sync a RO table for MOR + syncHoodieTable(false); + String originalTableName = cfg.tableName; + // TODO : Make realtime table registration optional using a config param + cfg.tableName = cfg.tableName + SUFFIX_REALTIME_TABLE; + // sync a RT table for MOR + syncHoodieTable(true); + cfg.tableName = originalTableName; + break; + default: + LOG.error("Unknown table type " + hoodieHiveClient.getTableType()); + throw new InvalidDatasetException(hoodieHiveClient.getBasePath()); + } + } catch (RuntimeException re) { + LOG.error("Got runtime exception when hive syncing", re); + } finally { + hoodieHiveClient.close(); } 
- hoodieHiveClient.close(); } private void syncHoodieTable(boolean isRealTime) throws ClassNotFoundException { diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java index 531917310..2272757c7 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java @@ -138,9 +138,11 @@ public class TimelineService { } public void close() { + log.info("Closing Timeline Service"); this.app.stop(); this.app = null; this.fsViewsManager.close(); + log.info("Closed Timeline Service"); } public Configuration getConf() {