
[HUDI-137] Hudi cleaning state changes should be consistent with compaction actions

Before this change, the Cleaner performed cleaning of old file versions and then stored the list of deleted files in .clean files.
With this setup, we cannot track file deletions if the Cleaner fails after deleting files but before writing the .clean metadata.
This is fine for regular file-system view generation, but incremental timeline syncing relies on clean/commit/compaction metadata to keep a consistent file-system view.

Cleaner state transitions are now similar to those of compaction:

1. Requested: HoodieWriteClient.scheduleClean() selects the list of files that need to be deleted and stores it in the instant metadata (the cleaner plan).
2. Inflight: HoodieWriteClient marks the instant as inflight before it starts deleting files.
3. Completed: HoodieWriteClient marks the instant as completed after performing the deletions recorded in the cleaner plan.
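For orientation (this sketch is not part of the commit's diff), the cleaner instant now moves through the same requested/inflight/completed states that a compaction does. A minimal illustration of the timeline transitions introduced here, assuming a HoodieTable named table, an instant time cleanTime, a HoodieCleanerPlan cleanerPlan and the resulting HoodieCleanMetadata cleanMetadata are already in hand (error handling omitted):

// Illustrative only: the three cleaner state transitions added by this change.
HoodieInstant requested = new HoodieInstant(State.REQUESTED, HoodieTimeline.CLEAN_ACTION, cleanTime);

// 1. Requested: persist the cleaner plan before any file is deleted
table.getActiveTimeline().saveToCleanRequested(requested, AvroUtils.serializeCleanerPlan(cleanerPlan));

// 2. Inflight: mark the instant inflight just before deletions start
HoodieInstant inflight = table.getActiveTimeline().transitionCleanRequestedToInflight(requested);

// 3. Completed: record what was actually deleted once the plan has been executed
table.getActiveTimeline().transitionCleanInflightToComplete(inflight, AvroUtils.serializeCleanMetadata(cleanMetadata));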
Authored by Balaji Varadarajan on 2019-10-28 18:54:48 -07:00
Committed by Balaji Varadarajan
parent 23b303e4b1
commit 1032fc3e54
34 changed files with 856 additions and 491 deletions


@@ -288,8 +288,7 @@ public class CompactionCommand implements CommandMarker {
       String message = "\n\n\t COMPACTION PLAN " + (valid ? "VALID" : "INVALID") + "\n\n";
       List<Comparable[]> rows = new ArrayList<>();
       res.stream().forEach(r -> {
-        Comparable[] row = new Comparable[]{r.getOperation().getFileId(),
-            r.getOperation().getBaseInstantTime(),
+        Comparable[] row = new Comparable[] {r.getOperation().getFileId(), r.getOperation().getBaseInstantTime(),
             r.getOperation().getDataFileName().isPresent() ? r.getOperation().getDataFileName().get() : "",
             r.getOperation().getDeltaFileNames().size(), r.isSuccess(),
             r.getException().isPresent() ? r.getException().get().getMessage() : ""};


@@ -239,9 +239,8 @@ public class CompactionAdminClient extends AbstractHoodieClient {
     FileSlice merged =
         fileSystemView.getLatestMergedFileSlicesBeforeOrOn(op.getPartitionPath(), lastInstant.getTimestamp())
             .filter(fs -> fs.getFileId().equals(op.getFileId())).findFirst().get();
-    final int maxVersion =
-        op.getDeltaFileNames().stream().map(lf -> FSUtils.getFileVersionFromLog(new Path(lf)))
-            .reduce((x, y) -> x > y ? x : y).orElse(0);
+    final int maxVersion = op.getDeltaFileNames().stream().map(lf -> FSUtils.getFileVersionFromLog(new Path(lf)))
+        .reduce((x, y) -> x > y ? x : y).orElse(0);
     List<HoodieLogFile> logFilesToBeMoved =
         merged.getLogFiles().filter(lf -> lf.getLogVersion() > maxVersion).collect(Collectors.toList());
     return logFilesToBeMoved.stream().map(lf -> {
@@ -293,33 +292,31 @@ public class CompactionAdminClient extends AbstractHoodieClient {
       FileSlice fs = fileSliceOptional.get();
       Option<HoodieDataFile> df = fs.getDataFile();
       if (operation.getDataFileName().isPresent()) {
-        String expPath = metaClient.getFs().getFileStatus(new Path(
-            FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()),
-            new Path(operation.getDataFileName().get()))).getPath()
-            .toString();
-        Preconditions.checkArgument(df.isPresent(), "Data File must be present. File Slice was : "
-            + fs + ", operation :" + operation);
+        String expPath = metaClient.getFs()
+            .getFileStatus(
+                new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()),
+                    new Path(operation.getDataFileName().get())))
+            .getPath().toString();
+        Preconditions.checkArgument(df.isPresent(),
+            "Data File must be present. File Slice was : " + fs + ", operation :" + operation);
         Preconditions.checkArgument(df.get().getPath().equals(expPath),
             "Base Path in operation is specified as " + expPath + " but got path " + df.get().getPath());
       }
       Set<HoodieLogFile> logFilesInFileSlice = fs.getLogFiles().collect(Collectors.toSet());
-      Set<HoodieLogFile> logFilesInCompactionOp = operation.getDeltaFileNames().stream()
-          .map(dp -> {
-            try {
-              FileStatus[] fileStatuses = metaClient.getFs().listStatus(
-                  new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()),
-                      new Path(dp)));
-              Preconditions.checkArgument(fileStatuses.length == 1, "Expect only 1 file-status");
-              return new HoodieLogFile(fileStatuses[0]);
-            } catch (FileNotFoundException fe) {
-              throw new CompactionValidationException(fe.getMessage());
-            } catch (IOException ioe) {
-              throw new HoodieIOException(ioe.getMessage(), ioe);
-            }
-          }).collect(Collectors.toSet());
-      Set<HoodieLogFile> missing =
-          logFilesInCompactionOp.stream().filter(lf -> !logFilesInFileSlice.contains(lf))
-              .collect(Collectors.toSet());
+      Set<HoodieLogFile> logFilesInCompactionOp = operation.getDeltaFileNames().stream().map(dp -> {
+        try {
+          FileStatus[] fileStatuses = metaClient.getFs().listStatus(new Path(
+              FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), new Path(dp)));
+          Preconditions.checkArgument(fileStatuses.length == 1, "Expect only 1 file-status");
+          return new HoodieLogFile(fileStatuses[0]);
+        } catch (FileNotFoundException fe) {
+          throw new CompactionValidationException(fe.getMessage());
+        } catch (IOException ioe) {
+          throw new HoodieIOException(ioe.getMessage(), ioe);
+        }
+      }).collect(Collectors.toSet());
+      Set<HoodieLogFile> missing = logFilesInCompactionOp.stream().filter(lf -> !logFilesInFileSlice.contains(lf))
+          .collect(Collectors.toSet());
       Preconditions.checkArgument(missing.isEmpty(),
           "All log files specified in compaction operation is not present. Missing :" + missing + ", Exp :"
               + logFilesInCompactionOp + ", Got :" + logFilesInFileSlice);


@@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.List;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.AvroUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {
private static Logger logger = LogManager.getLogger(HoodieCleanClient.class);
private final transient HoodieMetrics metrics;
public HoodieCleanClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, HoodieMetrics metrics) {
this(jsc, clientConfig, metrics, Option.empty());
}
public HoodieCleanClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, HoodieMetrics metrics,
Option<EmbeddedTimelineService> timelineService) {
super(jsc, clientConfig, timelineService);
this.metrics = metrics;
}
/**
* Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
* configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be
* cleaned)
*/
public void clean() throws HoodieIOException {
String startCleanTime = HoodieActiveTimeline.createNewCommitTime();
clean(startCleanTime);
}
/**
* Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
* configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be
* cleaned)
*
* @param startCleanTime Cleaner Instant Timestamp
* @throws HoodieIOException in case of any IOException
*/
protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOException {
// Create a Hoodie table which encapsulated the commits and files visible
final HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
// If there are inflight(failed) or previously requested clean operation, first perform them
table.getCleanTimeline().filterInflightsAndRequested().getInstants().forEach(hoodieInstant -> {
logger.info("There were previously unfinished cleaner operations. Finishing Instant=" + hoodieInstant);
runClean(table, hoodieInstant.getTimestamp());
});
Option<HoodieCleanerPlan> cleanerPlanOpt = scheduleClean(startCleanTime);
if (cleanerPlanOpt.isPresent()) {
HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get();
if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
&& !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
final HoodieTable<T> hoodieTable = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
return runClean(hoodieTable, startCleanTime);
}
}
return null;
}
/**
* Creates a Cleaner plan if there are files to be cleaned and stores them in instant file
*
* @param startCleanTime Cleaner Instant Time
* @return Cleaner Plan if generated
*/
@VisibleForTesting
protected Option<HoodieCleanerPlan> scheduleClean(String startCleanTime) {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
HoodieCleanerPlan cleanerPlan = table.scheduleClean(jsc);
if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
&& !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
HoodieInstant cleanInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
// Save to both aux and timeline folder
try {
table.getActiveTimeline().saveToCleanRequested(cleanInstant, AvroUtils.serializeCleanerPlan(cleanerPlan));
logger.info("Requesting Cleaning with instant time " + cleanInstant);
} catch (IOException e) {
logger.error("Got exception when saving cleaner requested file", e);
throw new HoodieIOException(e.getMessage(), e);
}
return Option.of(cleanerPlan);
}
return Option.empty();
}
/**
* Executes the Cleaner plan stored in the instant metadata
*
* @param table Hoodie Table
* @param cleanInstantTs Cleaner Instant Timestamp
*/
@VisibleForTesting
protected HoodieCleanMetadata runClean(HoodieTable<T> table, String cleanInstantTs) {
HoodieInstant cleanInstant =
table.getCleanTimeline().getInstants().filter(x -> x.getTimestamp().equals(cleanInstantTs)).findFirst().get();
Preconditions.checkArgument(
cleanInstant.getState().equals(State.REQUESTED) || cleanInstant.getState().equals(State.INFLIGHT));
try {
logger.info("Cleaner started");
final Timer.Context context = metrics.getCleanCtx();
if (!cleanInstant.isInflight()) {
// Mark as inflight first
cleanInstant = table.getActiveTimeline().transitionCleanRequestedToInflight(cleanInstant);
}
List<HoodieCleanStat> cleanStats = table.clean(jsc, cleanInstant);
if (cleanStats.isEmpty()) {
return HoodieCleanMetadata.newBuilder().build();
}
// Emit metrics (duration, numFilesDeleted) if needed
Option<Long> durationInMs = Option.empty();
if (context != null) {
durationInMs = Option.of(metrics.getDurationInMs(context.stop()));
logger.info("cleanerElaspsedTime (Minutes): " + durationInMs.get() / (1000 * 60));
}
// Create the metadata and save it
HoodieCleanMetadata metadata =
AvroUtils.convertCleanMetadata(cleanInstant.getTimestamp(), durationInMs, cleanStats);
logger.info("Cleaned " + metadata.getTotalFilesDeleted() + " files");
metrics.updateCleanMetrics(durationInMs.orElseGet(() -> -1L), metadata.getTotalFilesDeleted());
table.getActiveTimeline().transitionCleanInflightToComplete(
new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, cleanInstant.getTimestamp()),
AvroUtils.serializeCleanMetadata(metadata));
logger.info("Marked clean started on " + cleanInstant.getTimestamp() + " as complete");
return metadata;
} catch (IOException e) {
throw new HoodieIOException("Failed to clean up after commit", e);
}
}
}
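For context (again, not part of the diff), a minimal usage sketch of the new client; the payload type and the already-constructed jsc, writeConfig and metrics objects are assumptions made for illustration:

// HoodieWriteClient wires up an instance like this internally in this commit.
HoodieCleanClient<HoodieAvroPayload> cleaner = new HoodieCleanClient<>(jsc, writeConfig, metrics);
cleaner.clean();  // finishes any previously requested/inflight clean, then schedules and runs a new one
cleaner.close();  // HoodieWriteClient.close() now closes its internal clean client the same way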


@@ -38,7 +38,6 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
-import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieDataFile;
@@ -98,6 +97,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   private final boolean rollbackInFlight;
   private final transient HoodieMetrics metrics;
   private final transient HoodieIndex<T> index;
+  private final transient HoodieCleanClient<T> cleanClient;
   private transient Timer.Context writeContext = null;
   private transient Timer.Context compactionTimer;
   private transient Timer.Context indexTimer = null;
@@ -131,6 +131,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     this.index = index;
     this.metrics = new HoodieMetrics(config, config.getTableName());
     this.rollbackInFlight = rollbackInFlight;
+    this.cleanClient = new HoodieCleanClient<>(jsc, config, metrics, timelineService);
   }

   public static SparkConf registerClasses(SparkConf conf) {
@@ -918,6 +919,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   public void close() {
     // Stop timeline-server if running
     super.close();
+    this.cleanClient.close();
     // Calling this here releases any resources used by your index, so make sure to finish any related operations
     // before this point
     this.index.close();
@@ -927,55 +929,24 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
    * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be
    * cleaned)
-   *
-   * @throws HoodieIOException
    */
   public void clean() throws HoodieIOException {
-    String startCleanTime = HoodieActiveTimeline.createNewCommitTime();
-    clean(startCleanTime);
+    cleanClient.clean();
   }

   /**
    * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
    * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be
    * cleaned)
-   *
-   * @param startCleanTime Cleaner Instant Timestamp
-   * @return
-   * @throws HoodieIOException in case of any IOException
    */
-  private void clean(String startCleanTime) throws HoodieIOException {
-    try {
-      logger.info("Cleaner started");
-      final Timer.Context context = metrics.getCleanCtx();
-      // Create a Hoodie table which encapsulated the commits and files visible
-      HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
-      List<HoodieCleanStat> cleanStats = table.clean(jsc);
-      if (cleanStats.isEmpty()) {
-        return;
-      }
-      // Emit metrics (duration, numFilesDeleted) if needed
-      Option<Long> durationInMs = Option.empty();
-      if (context != null) {
-        durationInMs = Option.of(metrics.getDurationInMs(context.stop()));
-        logger.info("cleanerElaspsedTime (Minutes): " + durationInMs.get() / (1000 * 60));
-      }
-      // Create the metadata and save it
-      HoodieCleanMetadata metadata = AvroUtils.convertCleanMetadata(startCleanTime, durationInMs, cleanStats);
-      logger.info("Cleaned " + metadata.getTotalFilesDeleted() + " files");
-      metrics.updateCleanMetrics(durationInMs.orElseGet(() -> -1L), metadata.getTotalFilesDeleted());
-      table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, startCleanTime),
-          AvroUtils.serializeCleanMetadata(metadata));
-      logger.info("Marked clean started on " + startCleanTime + " as complete");
-      if (!table.getActiveTimeline().getCleanerTimeline().empty()) {
-        // Cleanup of older cleaner meta files
-        // TODO - make the commit archival generic and archive clean metadata
-        FSUtils.deleteOlderCleanMetaFiles(fs, table.getMetaClient().getMetaPath(),
-            table.getActiveTimeline().getCleanerTimeline().getInstants());
-      }
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to clean up after commit", e);
-    }
+  protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOException {
+    return cleanClient.clean(startCleanTime);
   }

   /**
@@ -1176,8 +1147,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   private JavaRDD<WriteStatus> runCompaction(HoodieInstant compactionInstant, HoodieActiveTimeline activeTimeline,
       boolean autoCommit) throws IOException {
     HoodieTableMetaClient metaClient = createMetaClient(true);
-    HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(metaClient,
-        compactionInstant.getTimestamp());
+    HoodieCompactionPlan compactionPlan =
+        CompactionUtils.getCompactionPlan(metaClient, compactionInstant.getTimestamp());
     // Mark instant as compaction inflight
     activeTimeline.transitionCompactionRequestedToInflight(compactionInstant);


@@ -97,9 +97,11 @@ public class EmbeddedTimelineService {
   public void stop() {
     if (null != server) {
+      logger.info("Closing Timeline server");
       this.server.close();
       this.server = null;
       this.viewManager = null;
+      logger.info("Closed Timeline server");
     }
   }
 }


@@ -19,6 +19,7 @@
 package org.apache.hudi.io;

 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -52,7 +53,7 @@ import org.apache.log4j.Logger;
  * <p>
  * TODO: Should all cleaning be done based on {@link HoodieCommitMetadata}
  */
-public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
+public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> implements Serializable {

   private static Logger logger = LogManager.getLogger(HoodieCleanHelper.class);
@@ -114,12 +115,11 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
         FileSlice nextSlice = fileSliceIterator.next();
         if (nextSlice.getDataFile().isPresent()) {
           HoodieDataFile dataFile = nextSlice.getDataFile().get();
-          deletePaths.add(dataFile.getPath());
+          deletePaths.add(dataFile.getFileName());
         }
         if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
           // If merge on read, then clean the log files for the commits as well
-          deletePaths
-              .addAll(nextSlice.getLogFiles().map(file -> file.getPath().toString()).collect(Collectors.toList()));
+          deletePaths.addAll(nextSlice.getLogFiles().map(file -> file.getFileName()).collect(Collectors.toList()));
         }
       }
     }
@@ -187,11 +187,10 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
         if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline
             .compareTimestamps(earliestCommitToRetain.getTimestamp(), fileCommitTime, HoodieTimeline.GREATER)) {
           // this is a commit, that should be cleaned.
-          aFile.ifPresent(hoodieDataFile -> deletePaths.add(hoodieDataFile.getPath()));
+          aFile.ifPresent(hoodieDataFile -> deletePaths.add(hoodieDataFile.getFileName()));
           if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
             // If merge on read, then clean the log files for the commits as well
-            deletePaths
-                .addAll(aSlice.getLogFiles().map(file -> file.getPath().toString()).collect(Collectors.toList()));
+            deletePaths.addAll(aSlice.getLogFiles().map(file -> file.getFileName()).collect(Collectors.toList()));
           }
         }
       }


@@ -195,9 +195,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
       // Load the new records in a map
       long memoryForMerge = config.getMaxMemoryPerPartitionMerge();
       logger.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
-      this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge,
-          config.getSpillableMapBasePath(), new DefaultSizeEstimator(),
-          new HoodieRecordSizeEstimator(originalSchema));
+      this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(),
+          new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(originalSchema));
     } catch (IOException io) {
       throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
     }


@@ -101,8 +101,8 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
     FileSystem fs = metaClient.getFs();
     Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
-    log.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation
-        .getDeltaFileNames() + " for commit " + commitTime);
+    log.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames()
+        + " for commit " + commitTime);
     // TODO - FIX THIS
     // Reads the entire avro file. Always only specific blocks should be read from the avro file
     // (failure recover).
@@ -115,20 +115,19 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
         .filterCompletedInstants().lastInstant().get().getTimestamp();
     log.info("MaxMemoryPerCompaction => " + config.getMaxMemoryPerCompaction());
-    List<String> logFiles = operation.getDeltaFileNames().stream()
-        .map(p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()),
-            p).toString()).collect(toList());
-    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs,
-        metaClient.getBasePath(), logFiles, readerSchema, maxInstantTime,
-        config.getMaxMemoryPerCompaction(), config.getCompactionLazyBlockReadEnabled(),
+    List<String> logFiles = operation.getDeltaFileNames().stream().map(
+        p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString())
+        .collect(toList());
+    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, metaClient.getBasePath(), logFiles,
+        readerSchema, maxInstantTime, config.getMaxMemoryPerCompaction(), config.getCompactionLazyBlockReadEnabled(),
         config.getCompactionReverseLogReadEnabled(), config.getMaxDFSStreamBufferSize(),
         config.getSpillableMapBasePath());
     if (!scanner.iterator().hasNext()) {
       return Lists.<WriteStatus>newArrayList();
     }
-    Option<HoodieDataFile> oldDataFileOpt = operation.getBaseFile(metaClient.getBasePath(),
-        operation.getPartitionPath());
+    Option<HoodieDataFile> oldDataFileOpt =
+        operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath());
     // Compacting is very similar to applying updates to existing file
     Iterator<List<WriteStatus>> result;
@@ -189,28 +188,22 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
     RealtimeView fileSystemView = hoodieTable.getRTFileSystemView();
     log.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
-    List<HoodieCompactionOperation> operations =
-        jsc.parallelize(partitionPaths, partitionPaths.size())
-            .flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> fileSystemView
-                .getLatestFileSlices(partitionPath)
-                .filter(slice ->
-                    !fgIdsInPendingCompactions.contains(slice.getFileGroupId()))
-                .map(
-                    s -> {
-                      List<HoodieLogFile> logFiles = s.getLogFiles().sorted(HoodieLogFile
-                          .getLogFileComparator()).collect(Collectors.toList());
-                      totalLogFiles.add((long) logFiles.size());
-                      totalFileSlices.add(1L);
-                      // Avro generated classes are not inheriting Serializable. Using CompactionOperation POJO
-                      // for spark Map operations and collecting them finally in Avro generated classes for storing
-                      // into meta files.
-                      Option<HoodieDataFile> dataFile = s.getDataFile();
-                      return new CompactionOperation(dataFile, partitionPath, logFiles,
-                          config.getCompactionStrategy().captureMetrics(config, dataFile, partitionPath, logFiles));
-                    })
-                .filter(c -> !c.getDeltaFileNames().isEmpty())
-                .collect(toList()).iterator()).collect().stream().map(CompactionUtils::buildHoodieCompactionOperation)
-        .collect(toList());
+    List<HoodieCompactionOperation> operations = jsc.parallelize(partitionPaths, partitionPaths.size())
+        .flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> fileSystemView
+            .getLatestFileSlices(partitionPath)
+            .filter(slice -> !fgIdsInPendingCompactions.contains(slice.getFileGroupId())).map(s -> {
+              List<HoodieLogFile> logFiles =
+                  s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+              totalLogFiles.add((long) logFiles.size());
+              totalFileSlices.add(1L);
+              // Avro generated classes are not inheriting Serializable. Using CompactionOperation POJO
+              // for spark Map operations and collecting them finally in Avro generated classes for storing
+              // into meta files.
+              Option<HoodieDataFile> dataFile = s.getDataFile();
+              return new CompactionOperation(dataFile, partitionPath, logFiles,
+                  config.getCompactionStrategy().captureMetrics(config, dataFile, partitionPath, logFiles));
+            }).filter(c -> !c.getDeltaFileNames().isEmpty()).collect(toList()).iterator())
+        .collect().stream().map(CompactionUtils::buildHoodieCompactionOperation).collect(toList());
     log.info("Total of " + operations.size() + " compactions are retrieved");
     log.info("Total number of latest files slices " + totalFileSlices.value());
     log.info("Total number of log files " + totalLogFiles.value());


@@ -96,8 +96,7 @@ public abstract class CompactionStrategy implements Serializable {
     // Strategy implementation can overload this method to set specific compactor-id
     return HoodieCompactionPlan.newBuilder()
         .setOperations(orderAndFilter(writeConfig, operations, pendingCompactionPlans))
-        .setVersion(CompactionUtils.LATEST_COMPACTION_METADATA_VERSION)
-        .build();
+        .setVersion(CompactionUtils.LATEST_COMPACTION_METADATA_VERSION).build();
   }

   /**


@@ -19,6 +19,7 @@
 package org.apache.hudi.table;

 import com.google.common.hash.Hashing;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
@@ -35,9 +36,12 @@ import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.WriteStatus;
+import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieDataFile;
 import org.apache.hudi.common.model.HoodieKey;
@@ -48,6 +52,7 @@ import org.apache.hudi.common.model.HoodieRollingStatMetadata;
 import org.apache.hudi.common.table.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.AvroUtils;
 import org.apache.hudi.common.util.FSUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -72,10 +77,10 @@ import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import scala.Tuple2;

 /**
  * Implementation of a very heavily read-optimized Hoodie Table where
  * <p>
@@ -97,10 +102,12 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
       Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
       FileSystem fs = table.getMetaClient().getFs();
+      Path basePath = new Path(table.getMetaClient().getBasePath());
       while (iter.hasNext()) {
         Tuple2<String, String> partitionDelFileTuple = iter.next();
         String partitionPath = partitionDelFileTuple._1();
-        String deletePathStr = partitionDelFileTuple._2();
+        String delFileName = partitionDelFileTuple._2();
+        String deletePathStr = new Path(new Path(basePath, partitionPath), delFileName).toString();
         Boolean deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
         if (!partitionCleanStatMap.containsKey(partitionPath)) {
           partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath));
@@ -109,29 +116,24 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
         partitionCleanStat.addDeleteFilePatterns(deletePathStr);
         partitionCleanStat.addDeletedFileResult(deletePathStr, deletedFileResult);
       }
       return partitionCleanStatMap.entrySet().stream().map(e -> new Tuple2<>(e.getKey(), e.getValue()))
           .collect(Collectors.toList()).iterator();
     };
   }

-  private static PairFlatMapFunction<String, String, String> getFilesToDeleteFunc(HoodieTable table,
-      HoodieWriteConfig config) {
-    return (PairFlatMapFunction<String, String, String>) partitionPathToClean -> {
-      HoodieCleanHelper cleaner = new HoodieCleanHelper(table, config);
-      return cleaner.getDeletePaths(partitionPathToClean).stream()
-          .map(deleteFile -> new Tuple2<>(partitionPathToClean, deleteFile.toString())).iterator();
-    };
-  }
-
   private static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException {
     Path deletePath = new Path(deletePathStr);
     logger.debug("Working on delete path :" + deletePath);
-    boolean deleteResult = fs.delete(deletePath, false);
-    if (deleteResult) {
-      logger.debug("Cleaned file at path :" + deletePath);
+    try {
+      boolean deleteResult = fs.delete(deletePath, false);
+      if (deleteResult) {
+        logger.debug("Cleaned file at path :" + deletePath);
+      }
+      return deleteResult;
+    } catch (FileNotFoundException fio) {
+      // With cleanPlan being used for retried cleaning operations, its possible to clean a file twice
+      return false;
     }
-    return deleteResult;
   }

   @Override
@@ -268,6 +270,40 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
     return handleUpsertPartition(commitTime, partition, recordItr, partitioner);
   }

+  /**
+   * Generates List of files to be cleaned
+   *
+   * @param jsc JavaSparkContext
+   * @return Cleaner Plan
+   */
+  public HoodieCleanerPlan scheduleClean(JavaSparkContext jsc) {
+    try {
+      FileSystem fs = getMetaClient().getFs();
+      List<String> partitionsToClean =
+          FSUtils.getAllPartitionPaths(fs, getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning());
+      if (partitionsToClean.isEmpty()) {
+        logger.info("Nothing to clean here. It is already clean");
+        return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
+      }
+      logger.info(
+          "Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy());
+      int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
+      logger.info("Using cleanerParallelism: " + cleanerParallelism);
+      HoodieCleanHelper cleaner = new HoodieCleanHelper(this, config);
+      Option<HoodieInstant> earliestInstant = cleaner.getEarliestCommitToRetain();
+      Map<String, List<String>> cleanOps = jsc.parallelize(partitionsToClean, cleanerParallelism)
+          .map(partitionPathToClean -> Pair.of(partitionPathToClean, cleaner.getDeletePaths(partitionPathToClean)))
+          .collect().stream().collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+      return new HoodieCleanerPlan(earliestInstant
+          .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
+          config.getCleanerPolicy().name(), cleanOps, 1);
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to schedule clean operation", e);
+    }
+  }
+
   /**
    * Performs cleaning of partition paths according to cleaning policy and returns the number of files cleaned. Handles
    * skews in partitions to clean by making files to clean as the unit of task distribution.
@@ -275,17 +311,40 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
    * @throws IllegalArgumentException if unknown cleaning policy is provided
    */
   @Override
-  public List<HoodieCleanStat> clean(JavaSparkContext jsc) {
+  public List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieInstant cleanInstant) {
     try {
-      FileSystem fs = getMetaClient().getFs();
-      List<String> partitionsToClean =
-          FSUtils.getAllPartitionPaths(fs, getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning());
-      logger.info("Partitions to clean up : " + partitionsToClean + ", with policy " + config.getCleanerPolicy());
-      if (partitionsToClean.isEmpty()) {
-        logger.info("Nothing to clean here mom. It is already clean");
-        return Collections.emptyList();
-      }
-      return cleanPartitionPaths(partitionsToClean, jsc);
+      HoodieCleanerPlan cleanerPlan = AvroUtils.deserializeCleanerPlan(getActiveTimeline()
+          .getInstantAuxiliaryDetails(HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp())).get());
+      int cleanerParallelism = Math.min(
+          (int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(x -> x.size()).count()),
+          config.getCleanerParallelism());
+      logger.info("Using cleanerParallelism: " + cleanerParallelism);
+      List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
+          .parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream()
+              .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<String, String>(x.getKey(), y)))
+              .collect(Collectors.toList()), cleanerParallelism)
+          .mapPartitionsToPair(deleteFilesFunc(this)).reduceByKey((e1, e2) -> e1.merge(e2)).collect();
+      Map<String, PartitionCleanStat> partitionCleanStatsMap =
+          partitionCleanStats.stream().collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
+      // Return PartitionCleanStat for each partition passed.
+      return cleanerPlan.getFilesToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
+        PartitionCleanStat partitionCleanStat =
+            (partitionCleanStatsMap.containsKey(partitionPath)) ? partitionCleanStatsMap.get(partitionPath)
+                : new PartitionCleanStat(partitionPath);
+        HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
+        return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
+            .withEarliestCommitRetained(Option.ofNullable(
+                actionInstant != null
+                    ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
+                        actionInstant.getAction(), actionInstant.getTimestamp())
+                    : null))
+            .withDeletePathPattern(partitionCleanStat.deletePathPatterns)
+            .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles)
+            .withFailedDeletes(partitionCleanStat.failedDeleteFiles).build();
+      }).collect(Collectors.toList());
     } catch (IOException e) {
       throw new HoodieIOException("Failed to clean up after commit", e);
     }
@@ -311,7 +370,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
logger.info("Clean out all parquet files generated for commit: " + commit); logger.info("Clean out all parquet files generated for commit: " + commit);
List<RollbackRequest> rollbackRequests = generateRollbackRequests(instantToRollback); List<RollbackRequest> rollbackRequests = generateRollbackRequests(instantToRollback);
//TODO: We need to persist this as rollback workload and use it in case of partial failures // TODO: We need to persist this as rollback workload and use it in case of partial failures
List<HoodieRollbackStat> stats = List<HoodieRollbackStat> stats =
new RollbackExecutor(metaClient, config).performRollback(jsc, instantToRollback, rollbackRequests); new RollbackExecutor(metaClient, config).performRollback(jsc, instantToRollback, rollbackRequests);
@@ -321,8 +380,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
     return stats;
   }

-  private List<RollbackRequest> generateRollbackRequests(HoodieInstant instantToRollback)
-      throws IOException {
+  private List<RollbackRequest> generateRollbackRequests(HoodieInstant instantToRollback) throws IOException {
     return FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
         config.shouldAssumeDatePartitioning()).stream().map(partitionPath -> {
           return RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback);
@@ -350,34 +408,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
     }
   }

-  private List<HoodieCleanStat> cleanPartitionPaths(List<String> partitionsToClean, JavaSparkContext jsc) {
-    int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
-    logger.info("Using cleanerParallelism: " + cleanerParallelism);
-    List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
-        .parallelize(partitionsToClean, cleanerParallelism).flatMapToPair(getFilesToDeleteFunc(this, config))
-        .repartition(cleanerParallelism) // repartition to remove skews
-        .mapPartitionsToPair(deleteFilesFunc(this)).reduceByKey(
-            // merge partition level clean stats below
-            (Function2<PartitionCleanStat, PartitionCleanStat, PartitionCleanStat>) (e1, e2) -> e1.merge(e2))
-        .collect();
-    Map<String, PartitionCleanStat> partitionCleanStatsMap =
-        partitionCleanStats.stream().collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
-    HoodieCleanHelper cleaner = new HoodieCleanHelper(this, config);
-    // Return PartitionCleanStat for each partition passed.
-    return partitionsToClean.stream().map(partitionPath -> {
-      PartitionCleanStat partitionCleanStat =
-          (partitionCleanStatsMap.containsKey(partitionPath)) ? partitionCleanStatsMap.get(partitionPath)
-              : new PartitionCleanStat(partitionPath);
-      return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
-          .withEarliestCommitRetained(cleaner.getEarliestCommitToRetain())
-          .withDeletePathPattern(partitionCleanStat.deletePathPatterns)
-          .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles)
-          .withFailedDeletes(partitionCleanStat.failedDeleteFiles).build();
-    }).collect(Collectors.toList());
-  }
-
   enum BucketType {
     UPDATE, INSERT
   }


@@ -185,12 +185,12 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
logger.info("Unpublished " + commit); logger.info("Unpublished " + commit);
Long startTime = System.currentTimeMillis(); Long startTime = System.currentTimeMillis();
List<RollbackRequest> rollbackRequests = generateRollbackRequests(jsc, instantToRollback); List<RollbackRequest> rollbackRequests = generateRollbackRequests(jsc, instantToRollback);
//TODO: We need to persist this as rollback workload and use it in case of partial failures // TODO: We need to persist this as rollback workload and use it in case of partial failures
List<HoodieRollbackStat> allRollbackStats = List<HoodieRollbackStat> allRollbackStats =
new RollbackExecutor(metaClient, config).performRollback(jsc, instantToRollback, rollbackRequests); new RollbackExecutor(metaClient, config).performRollback(jsc, instantToRollback, rollbackRequests);
// Delete Inflight instants if enabled // Delete Inflight instants if enabled
deleteInflightInstant(deleteInstants, this.getActiveTimeline(), new HoodieInstant(true, instantToRollback deleteInflightInstant(deleteInstants, this.getActiveTimeline(),
.getAction(), instantToRollback.getTimestamp())); new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp()));
logger.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime)); logger.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
@@ -200,6 +200,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
   /**
    * Generate all rollback requests that we need to perform for rolling back this action without actually performing
    * rolling back
+   *
    * @param jsc JavaSparkContext
    * @param instantToRollback Instant to Rollback
    * @return list of rollback requests
@@ -211,92 +212,92 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
List<String> partitions = FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(), List<String> partitions = FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning()); config.shouldAssumeDatePartitioning());
int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1); int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)) return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)).flatMap(partitionPath -> {
.flatMap(partitionPath -> { HoodieActiveTimeline activeTimeline = this.getActiveTimeline().reload();
HoodieActiveTimeline activeTimeline = this.getActiveTimeline().reload(); List<RollbackRequest> partitionRollbackRequests = new ArrayList<>();
List<RollbackRequest> partitionRollbackRequests = new ArrayList<>(); switch (instantToRollback.getAction()) {
switch (instantToRollback.getAction()) { case HoodieTimeline.COMMIT_ACTION:
case HoodieTimeline.COMMIT_ACTION: logger.info(
logger.info("Rolling back commit action. There are higher delta commits. So only rolling back this " "Rolling back commit action. There are higher delta commits. So only rolling back this " + "instant");
+ "instant"); partitionRollbackRequests.add(
partitionRollbackRequests.add(RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction( RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback));
partitionPath, instantToRollback)); break;
break; case HoodieTimeline.COMPACTION_ACTION:
case HoodieTimeline.COMPACTION_ACTION: // If there is no delta commit present after the current commit (if compaction), no action, else we
// If there is no delta commit present after the current commit (if compaction), no action, else we // need to make sure that a compaction commit rollback also deletes any log files written as part of the
// need to make sure that a compaction commit rollback also deletes any log files written as part of the // succeeding deltacommit.
// succeeding deltacommit. boolean higherDeltaCommits =
boolean higherDeltaCommits = !activeTimeline.getDeltaCommitTimeline() !activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit, 1).empty();
.filterCompletedInstants().findInstantsAfter(commit, 1).empty(); if (higherDeltaCommits) {
if (higherDeltaCommits) { // Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled
// Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled // and has not yet finished. In this scenario we should delete only the newly created parquet files
// and has not yet finished. In this scenario we should delete only the newly created parquet files // and not corresponding base commit log files created with this as baseCommit since updates would
// and not corresponding base commit log files created with this as baseCommit since updates would // have been written to the log files.
// have been written to the log files. logger.info("Rolling back compaction. There are higher delta commits. So only deleting data files");
logger.info("Rolling back compaction. There are higher delta commits. So only deleting data files"); partitionRollbackRequests.add(
partitionRollbackRequests.add(RollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction( RollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(partitionPath, instantToRollback));
partitionPath, instantToRollback)); } else {
} else { // No deltacommits present after this compaction commit (inflight or requested). In this case, we
// No deltacommits present after this compaction commit (inflight or requested). In this case, we // can also delete any log files that were created with this compaction commit as base
// can also delete any log files that were created with this compaction commit as base // commit.
// commit.
logger.info("Rolling back compaction plan. There are NO higher delta commits. So deleting both data and"
    + " log files");
partitionRollbackRequests.add(
    RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback));
          }
          break;
        case HoodieTimeline.DELTA_COMMIT_ACTION:
          // --------------------------------------------------------------------------------------------------
          // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal
          // --------------------------------------------------------------------------------------------------
          // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In
          // this scenario we would want to delete these log files.
          // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario,
          // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks.
          // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is
          // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime and
          // hence we will end up deleting these log files. This is done so there are no orphan log files
          // lying around.
          // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back; the actions
          // taken in this scenario are a combination of (A.2) and (A.3)
          // ---------------------------------------------------------------------------------------------------
          // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal
          // ---------------------------------------------------------------------------------------------------
          // (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no entries.
          // In this scenario, we delete all the parquet files written for the failed commit.
          // (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In
          // this scenario, perform (A.1) and for updates written to log files, write rollback blocks.
          // (B.3) Rollback triggered for first commit - Same as (B.1)
          // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files
          // as well if the base parquet file gets deleted.
          try {
            HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
                metaClient.getCommitTimeline()
                    .getInstantDetails(
                        new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp()))
                    .get(),
                HoodieCommitMetadata.class);
            // In case all data was inserts and the commit failed, delete the files belonging to that commit.
            // We do not know fileIds for inserts (first inserts are either log files or parquet files), so
            // delete all files for the corresponding failed commit, if present (same as COW)
            partitionRollbackRequests.add(
                RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback));
            // append rollback blocks for updates
            if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
              partitionRollbackRequests
                  .addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata));
            }
            break;
          } catch (IOException io) {
            throw new UncheckedIOException("Failed to collect rollback actions for commit " + commit, io);
          }
        default:
          break;
      }
      return partitionRollbackRequests.iterator();
    }).filter(Objects::nonNull).collect();
  }
  @Override
@@ -428,27 +429,27 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
    // baseCommit always by listing the file slice
    Map<String, String> fileIdToBaseCommitTimeForLogMap = this.getRTFileSystemView().getLatestFileSlices(partitionPath)
        .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));
    return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> {
      // Filter out stats without prevCommit since they are all inserts
      boolean validForRollback = (wStat != null) && (wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT)
          && (wStat.getPrevCommit() != null) && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId());
      if (validForRollback) {
        // For sanity, log instant time can never be less than base-commit on which we are rolling back
        Preconditions
            .checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()),
                rollbackInstant.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL));
      }
      // Base Ts should be strictly less. If equal (for inserts-to-logs), the caller employs another option
      // to delete and we should not step on it
      return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(
          wStat.getFileId()), rollbackInstant.getTimestamp(), HoodieTimeline.LESSER);
    }).map(wStat -> {
      String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
      return RollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(),
          baseCommitTime, rollbackInstant);
    }).collect(Collectors.toList());
  }
}

View File

@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.utils.ClientUtils;
@@ -190,6 +191,13 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
    return getActiveTimeline().getCleanerTimeline().filterCompletedInstants();
  }

  /**
   * Get clean timeline
   */
  public HoodieTimeline getCleanTimeline() {
    return getActiveTimeline().getCleanerTimeline();
  }

  /**
   * Get only the completed (no-inflights) savepoint timeline
   */
@@ -265,9 +273,21 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
      HoodieCompactionPlan compactionPlan);

  /**
   * Generates a list of files that are eligible for cleaning.
   *
   * @param jsc Java Spark Context
   * @return Cleaner Plan containing list of files to be deleted.
   */
  public abstract HoodieCleanerPlan scheduleClean(JavaSparkContext jsc);

  /**
   * Cleans the files listed in the cleaner plan associated with the clean instant.
   *
   * @param jsc Java Spark Context
   * @param cleanInstant Clean Instant
   * @return list of Clean Stats
   */
  public abstract List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieInstant cleanInstant);

  /**
   * Rollback the (inflight/committed) record changes with the given commit time. Four steps: (1) Atomically unpublish
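The two abstract methods above are what split cleaning into a planning step and an execution step. A rough sketch of how a caller could string them together with the new timeline states is shown below; the real orchestration lives in HoodieWriteClient/HoodieCleanClient, instant-time generation and error handling are elided, and the assumption is that getActiveTimeline() on the table exposes the HoodieActiveTimeline methods introduced later in this change.

    // Illustrative sketch only, not the exact client code
    void cleanSketch(JavaSparkContext jsc, HoodieTableMetaClient metaClient, HoodieWriteConfig config,
        String cleanInstantTime) throws IOException {
      HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);

      // Requested: compute the cleaner plan and persist it as <ts>.clean.requested
      HoodieCleanerPlan plan = table.scheduleClean(jsc);
      table.getActiveTimeline().saveToCleanRequested(
          HoodieTimeline.getCleanRequestedInstant(cleanInstantTime), AvroUtils.serializeCleanerPlan(plan));

      // Inflight: transition before any file is deleted
      HoodieInstant inflight = table.getActiveTimeline()
          .transitionCleanRequestedToInflight(HoodieTimeline.getCleanRequestedInstant(cleanInstantTime));

      // Completed: delete exactly the files in the plan; the clean metadata is then published for this instant
      List<HoodieCleanStat> stats = table.clean(jsc, inflight);
    }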

View File

@@ -65,8 +65,8 @@ public class RollbackExecutor implements Serializable {
  /**
   * Performs all rollback actions that we have collected in parallel.
   */
  public List<HoodieRollbackStat> performRollback(JavaSparkContext jsc, HoodieInstant instantToRollback,
      List<RollbackRequest> rollbackRequests) {

    SerializablePathFilter filter = (path) -> {
      if (path.toString().contains(".parquet")) {
@@ -101,11 +101,10 @@ public class RollbackExecutor implements Serializable {
      Writer writer = null;
      boolean success = false;
      try {
        writer = HoodieLogFormat.newWriterBuilder()
            .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
            .withFileId(rollbackRequest.getFileId().get())
            .overBaseCommit(rollbackRequest.getLatestBaseInstant().get()).withFs(metaClient.getFs())
            .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();

        // generate metadata
@@ -114,8 +113,7 @@ public class RollbackExecutor implements Serializable {
        writer = writer.appendBlock(new HoodieCommandBlock(header));
        success = true;
      } catch (IOException | InterruptedException io) {
        throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
      } finally {
        try {
          if (writer != null) {
@@ -130,8 +128,7 @@ public class RollbackExecutor implements Serializable {
        // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
        // cloud-storage : HUDI-168
        Map<FileStatus, Long> filesToNumBlocksRollback = new HashMap<>();
        filesToNumBlocksRollback.put(metaClient.getFs().getFileStatus(writer.getLogFile().getPath()), 1L);
        return new Tuple2<String, HoodieRollbackStat>(rollbackRequest.getPartitionPath(),
            HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
                .withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
@@ -180,8 +177,7 @@ public class RollbackExecutor implements Serializable {
  /**
   * Common method used for cleaning out parquet files under a partition path during rollback of a set of commits.
   */
  private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
      Map<FileStatus, Boolean> results, String partitionPath, PathFilter filter) throws IOException {
    logger.info("Cleaning path " + partitionPath);
    FileSystem fs = metaClient.getFs();
    FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);

View File

@@ -30,9 +30,7 @@ public class RollbackRequest {
   * Rollback Action Types
   */
  public enum RollbackAction {
    DELETE_DATA_FILES_ONLY, DELETE_DATA_AND_LOG_FILES, APPEND_ROLLBACK_BLOCK
  }

  /**
@@ -60,8 +58,8 @@ public class RollbackRequest {
   */
  private final RollbackAction rollbackAction;

  public RollbackRequest(String partitionPath, HoodieInstant rollbackInstant, Option<String> fileId,
      Option<String> latestBaseInstant, RollbackAction rollbackAction) {
    this.partitionPath = partitionPath;
    this.rollbackInstant = rollbackInstant;
    this.fileId = fileId;

View File

@@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
@@ -52,6 +53,11 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
  protected transient HoodieTestDataGenerator dataGen = null;
  protected transient ExecutorService executorService;
  protected transient HoodieTableMetaClient metaClient;
private static AtomicInteger instantGen = new AtomicInteger(1);
public String getNextInstant() {
return String.format("%09d", instantGen.getAndIncrement());
}
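The zero padding matters because timeline instants are compared as strings, so a padded counter keeps the generated test instants in the same order numerically and lexicographically. For example:

    String first = String.format("%09d", 7);   // "000000007"
    String second = String.format("%09d", 12); // "000000012", which also sorts after "000000007" as a string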
  // dfs
  protected String dfsBasePath;

View File

@@ -30,6 +30,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;
@@ -37,6 +38,8 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.HoodieTestDataGenerator;
@@ -69,12 +72,8 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.util.AccumulatorV2;
import org.junit.Assert;
import org.junit.Test;
import scala.collection.Iterator;
/**
 * Test Cleaning related logic
@@ -396,6 +395,62 @@ public class TestCleaner extends TestHoodieClientBase {
    });
  }
/**
* Helper to run cleaner and collect Clean Stats
*
* @param config HoodieWriteConfig
*/
private List<HoodieCleanStat> runCleaner(HoodieWriteConfig config) {
return runCleaner(config, false);
}
/**
* Helper to run cleaner and collect Clean Stats
*
* @param config HoodieWriteConfig
*/
private List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure) {
HoodieCleanClient writeClient = getHoodieCleanClient(config);
String cleanInstantTs = getNextInstant();
HoodieCleanMetadata cleanMetadata1 = writeClient.clean(cleanInstantTs);
if (null == cleanMetadata1) {
return new ArrayList<>();
}
if (simulateRetryFailure) {
metaClient.reloadActiveTimeline()
.revertToInflight(new HoodieInstant(State.COMPLETED, HoodieTimeline.CLEAN_ACTION, cleanInstantTs));
final HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieCleanMetadata cleanMetadata2 = writeClient.runClean(table, cleanInstantTs);
Assert.assertTrue(
Objects.equals(cleanMetadata1.getEarliestCommitToRetain(), cleanMetadata2.getEarliestCommitToRetain()));
Assert.assertEquals(new Integer(0), cleanMetadata2.getTotalFilesDeleted());
Assert.assertEquals(cleanMetadata1.getPartitionMetadata().keySet(),
cleanMetadata2.getPartitionMetadata().keySet());
cleanMetadata1.getPartitionMetadata().keySet().stream().forEach(k -> {
HoodieCleanPartitionMetadata p1 = cleanMetadata1.getPartitionMetadata().get(k);
HoodieCleanPartitionMetadata p2 = cleanMetadata2.getPartitionMetadata().get(k);
Assert.assertEquals(p1.getDeletePathPatterns(), p2.getDeletePathPatterns());
Assert.assertEquals(p1.getSuccessDeleteFiles(), p2.getFailedDeleteFiles());
Assert.assertEquals(p1.getPartitionPath(), p2.getPartitionPath());
Assert.assertEquals(k, p1.getPartitionPath());
});
}
List<HoodieCleanStat> stats = cleanMetadata1.getPartitionMetadata().values().stream()
.map(x -> new HoodieCleanStat.Builder().withPartitionPath(x.getPartitionPath())
.withFailedDeletes(x.getFailedDeleteFiles()).withSuccessfulDeletes(x.getSuccessDeleteFiles())
.withPolicy(HoodieCleaningPolicy.valueOf(x.getPolicy())).withDeletePathPattern(x.getDeletePathPatterns())
.withEarliestCommitRetained(Option.ofNullable(cleanMetadata1.getEarliestCommitToRetain() != null
? new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "000")
: null))
.build())
.collect(Collectors.toList());
return stats;
}
  /**
   * Test HoodieTable.clean() Cleaning by versions logic
   */
@@ -417,7 +472,7 @@ public class TestCleaner extends TestHoodieClientBase {
    metaClient = HoodieTableMetaClient.reload(metaClient);
    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
    List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config);
    assertEquals("Must not clean any files", 0,
        getCleanStat(hoodieCleanStatsOne, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
            .size());
@@ -441,7 +496,7 @@ public class TestCleaner extends TestHoodieClientBase {
    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update
    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update
    List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config);
    assertEquals("Must clean 1 file", 1,
        getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
            .size());
@@ -467,7 +522,7 @@ public class TestCleaner extends TestHoodieClientBase {
    String file3P0C2 =
        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002");
    List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config);
    assertEquals("Must clean two files", 2,
        getCleanStat(hoodieCleanStatsThree, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
            .getSuccessDeleteFiles().size());
@@ -480,7 +535,7 @@ public class TestCleaner extends TestHoodieClientBase {
    // No cleaning on partially written file, with no commit.
    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file3P0C2); // update
    List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
    assertEquals("Must not clean any files", 0,
        getCleanStat(hoodieCleanStatsFour, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
            .size());
@@ -525,7 +580,7 @@ public class TestCleaner extends TestHoodieClientBase {
    HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "001");
    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
    List<HoodieCleanStat> hoodieCleanStats = runCleaner(config);
    assertEquals("Must clean three files, one parquet and 2 log files", 3,
        getCleanStat(hoodieCleanStats, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
            .size());
@@ -542,6 +597,22 @@ public class TestCleaner extends TestHoodieClientBase {
   */
  @Test
  public void testKeepLatestCommits() throws IOException {
testKeepLatestCommits(false);
}
/**
* Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. Here the operations are simulated
* such that first clean attempt failed after files were cleaned and a subsequent cleanup succeeds.
*/
@Test
public void testKeepLatestCommitsWithFailureRetry() throws IOException {
testKeepLatestCommits(true);
}
/**
* Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files.
*/
private void testKeepLatestCommits(boolean simulateFailureRetry) throws IOException {
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
@@ -558,7 +629,7 @@ public class TestCleaner extends TestHoodieClientBase {
    metaClient = HoodieTableMetaClient.reload(metaClient);
    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
    List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry);
    assertEquals("Must not clean any files", 0,
        getCleanStat(hoodieCleanStatsOne, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
            .size());
@@ -582,7 +653,7 @@ public class TestCleaner extends TestHoodieClientBase {
    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update
    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update
    List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry);
    assertEquals("Must not clean any files", 0,
        getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
            .size());
@@ -608,7 +679,7 @@ public class TestCleaner extends TestHoodieClientBase {
    String file3P0C2 =
        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002");
    List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry);
    assertEquals("Must not clean any file. We have to keep 1 version before the latest commit time to keep", 0,
        getCleanStat(hoodieCleanStatsThree, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
            .getSuccessDeleteFiles().size());
@@ -626,7 +697,7 @@ public class TestCleaner extends TestHoodieClientBase {
    String file4P0C3 =
        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003");
    List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config, simulateFailureRetry);
    assertEquals("Must clean one old file", 1,
        getCleanStat(hoodieCleanStatsFour, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
            .size());
@@ -648,7 +719,7 @@ public class TestCleaner extends TestHoodieClientBase {
    // No cleaning on partially written file, with no commit.
    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "004", file3P0C2); // update
    List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, simulateFailureRetry);
    assertEquals("Must not clean any files", 0,
        getCleanStat(hoodieCleanStatsFive, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
            .size());
@@ -694,88 +765,10 @@ public class TestCleaner extends TestHoodieClientBase {
    metaClient = HoodieTableMetaClient.reload(metaClient);
    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
    List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config);
    assertTrue("HoodieCleanStats should be empty for a table with empty partitionPaths", hoodieCleanStatsOne.isEmpty());
  }
/**
* Test Clean-by-commits behavior in the presence of skewed partitions
*/
@Test
public void testCleaningSkewedPartitons() throws IOException {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
.build();
Map<Long, Long> stageOneShuffleReadTaskRecordsCountMap = new HashMap<>();
// Since clean involves repartition in order to uniformly distribute data,
// we can inspect the number of records read by various tasks in stage 1.
// There should not be skew in the number of records read in the task.
// SparkListener below listens to the stage end events and captures number of
// records read by various tasks in stage-1.
jsc.sc().addSparkListener(new SparkListener() {
@Override
public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
Iterator<AccumulatorV2<?, ?>> iterator = taskEnd.taskMetrics().accumulators().iterator();
while (iterator.hasNext()) {
AccumulatorV2 accumulator = iterator.next();
if (taskEnd.stageId() == 1 && accumulator.isRegistered() && accumulator.name().isDefined()
&& accumulator.name().get().equals("internal.metrics.shuffle.read.recordsRead")) {
stageOneShuffleReadTaskRecordsCountMap.put(taskEnd.taskInfo().taskId(), (Long) accumulator.value());
}
}
}
});
// make 1 commit, with 100 files in one partition and 10 in other two
HoodieTestUtils.createCommitFiles(basePath, "000");
List<String> filesP0C0 = createFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", 100);
List<String> filesP1C0 = createFilesInPartition(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000", 10);
List<String> filesP2C0 = createFilesInPartition(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "000", 10);
HoodieTestUtils.createCommitFiles(basePath, "001");
updateAllFilesInPartition(filesP0C0, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001");
updateAllFilesInPartition(filesP1C0, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001");
updateAllFilesInPartition(filesP2C0, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "001");
HoodieTestUtils.createCommitFiles(basePath, "002");
updateAllFilesInPartition(filesP0C0, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002");
updateAllFilesInPartition(filesP1C0, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "002");
updateAllFilesInPartition(filesP2C0, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "002");
HoodieTestUtils.createCommitFiles(basePath, "003");
updateAllFilesInPartition(filesP0C0, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003");
updateAllFilesInPartition(filesP1C0, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "003");
updateAllFilesInPartition(filesP2C0, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "003");
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
List<HoodieCleanStat> hoodieCleanStats = table.clean(jsc);
assertEquals(100, getCleanStat(hoodieCleanStats, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
.getSuccessDeleteFiles().size());
assertEquals(10, getCleanStat(hoodieCleanStats, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)
.getSuccessDeleteFiles().size());
assertEquals(10, getCleanStat(hoodieCleanStats, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
.getSuccessDeleteFiles().size());
// 3 tasks are expected since the number of partitions is 3
assertEquals(3, stageOneShuffleReadTaskRecordsCountMap.keySet().size());
// Sum of all records processed = total number of files to clean
assertEquals(120,
stageOneShuffleReadTaskRecordsCountMap.values().stream().reduce((a, b) -> a + b).get().intValue());
assertTrue(
"The skew in handling files to clean is not removed. "
+ "Each task should handle more records than the partitionPath with least files "
+ "and less records than the partitionPath with most files.",
stageOneShuffleReadTaskRecordsCountMap.values().stream().filter(a -> a > 10 && a < 100).count() == 3);
}
  /**
   * Test Keep Latest Commits when there are pending compactions
   */
@@ -794,14 +787,28 @@ public class TestCleaner extends TestHoodieClientBase {
    // FileId3 1 2 3 001
    // FileId2 0 0 0 000
    // FileId1 0 0 0 000
    testPendingCompactions(config, 48, 18, false);
  }
/**
   * Test Keep Latest Versions when there are pending compactions
*/
@Test
public void testKeepLatestVersionsWithPendingCompactions() throws IOException {
testKeepLatestVersionsWithPendingCompactions(false);
}
  /**
   * Test Keep Latest Versions when there are pending compactions. Here the operations are simulated such that the
   * first clean attempt fails after files were cleaned and a subsequent cleanup retries and succeeds.
   */
  @Test
  public void testKeepLatestVersionsWithPendingCompactionsAndFailureRetry() throws IOException {
    testKeepLatestVersionsWithPendingCompactions(true);
  }

  private void testKeepLatestVersionsWithPendingCompactions(boolean retryFailure) throws IOException {
    HoodieWriteConfig config =
        HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
@@ -816,7 +823,7 @@ public class TestCleaner extends TestHoodieClientBase {
    // FileId3 0 0 0 000, 001
    // FileId2 0 0 0 000
    // FileId1 0 0 0 000
    testPendingCompactions(config, 36, 9, retryFailure);
  }
  /**
@@ -825,8 +832,8 @@ public class TestCleaner extends TestHoodieClientBase {
   * @param config Hoodie Write Config
   * @param expNumFilesDeleted Number of files deleted
   */
  private void testPendingCompactions(HoodieWriteConfig config, int expNumFilesDeleted,
      int expNumFilesUnderCompactionDeleted, boolean retryFailure) throws IOException {
    HoodieTableMetaClient metaClient =
        HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ);
    String[] instants = new String[] {"000", "001", "003", "005", "007", "009", "011", "013"};
@@ -897,7 +904,7 @@ public class TestCleaner extends TestHoodieClientBase {
    // Clean now
    metaClient = HoodieTableMetaClient.reload(metaClient);
    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
    List<HoodieCleanStat> hoodieCleanStats = runCleaner(config, retryFailure);
    // Test for safety
    final HoodieTableMetaClient newMetaClient = HoodieTableMetaClient.reload(metaClient);

View File

@@ -52,6 +52,7 @@ import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -77,6 +78,10 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
    cleanupResources();
  }

  protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) {
    return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, cfg.getTableName()));
  }

  protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
    return getHoodieWriteClient(cfg, false);
  }

View File

@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace": "org.apache.hudi.avro.model",
"type": "record",
"name": "HoodieCleanerPlan",
"fields": [
{
"name": "earliestInstantToRetain",
"type":["null", {
"type": "record",
"name": "HoodieActionInstant",
"fields": [
{
"name": "timestamp",
"type": "string"
},
{
"name": "action",
"type": "string"
},
{
"name": "state",
"type": "string"
}
]
}],
"default" : null
},
{
"name": "policy",
"type": "string"
},
{
"name": "filesToBeDeletedPerPartition",
"type": [
"null", {
"type":"map",
"values": {
"type":"array",
"items":{
"name":"filePath",
"type": "string"
}
}}],
"default" : null
},
{
"name":"version",
"type":["int", "null"],
"default": 1
}
]
}
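For illustration, a plan matching this schema could be built roughly as below, assuming the standard Avro-generated builders for HoodieCleanerPlan and HoodieActionInstant; the partition path, file names and policy value are invented, and checked exceptions are omitted.

    HoodieCleanerPlan plan = HoodieCleanerPlan.newBuilder()
        .setEarliestInstantToRetain(HoodieActionInstant.newBuilder()
            .setTimestamp("20191028185448").setAction("commit").setState("COMPLETED").build())
        .setPolicy("KEEP_LATEST_COMMITS")
        .setFilesToBeDeletedPerPartition(Collections.singletonMap("2019/10/28",
            Arrays.asList("old-file-1.parquet", "old-file-2.parquet")))
        .setVersion(1)
        .build();
    Option<byte[]> bytes = AvroUtils.serializeCleanerPlan(plan); // stored against the <ts>.clean.requested instant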

View File

@@ -134,14 +134,9 @@ public class CompactionOperation implements Serializable {
  @Override
  public String toString() {
    return "CompactionOperation{" + "baseInstantTime='" + baseInstantTime + '\'' + ", dataFileCommitTime="
        + dataFileCommitTime + ", deltaFileNames=" + deltaFileNames + ", dataFileName=" + dataFileName + ", id='" + id
        + '\'' + ", metrics=" + metrics + '}';
  }

  @Override
@@ -156,8 +151,7 @@ public class CompactionOperation implements Serializable {
    return Objects.equals(baseInstantTime, operation.baseInstantTime)
        && Objects.equals(dataFileCommitTime, operation.dataFileCommitTime)
        && Objects.equals(deltaFileNames, operation.deltaFileNames)
        && Objects.equals(dataFileName, operation.dataFileName) && Objects.equals(id, operation.id);
  }

  @Override

View File

@@ -63,6 +63,7 @@ public interface HoodieTimeline extends Serializable {
  String INFLIGHT_COMMIT_EXTENSION = INFLIGHT_EXTENSION;
  String INFLIGHT_DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION + INFLIGHT_EXTENSION;
  String INFLIGHT_CLEAN_EXTENSION = "." + CLEAN_ACTION + INFLIGHT_EXTENSION;
  String REQUESTED_CLEAN_EXTENSION = "." + CLEAN_ACTION + REQUESTED_EXTENSION;
  String INFLIGHT_ROLLBACK_EXTENSION = "." + ROLLBACK_ACTION + INFLIGHT_EXTENSION;
  String INFLIGHT_SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION + INFLIGHT_EXTENSION;
  String REQUESTED_COMPACTION_SUFFIX = StringUtils.join(COMPACTION_ACTION, REQUESTED_EXTENSION);
@@ -80,6 +81,13 @@ public interface HoodieTimeline extends Serializable {
   */
  HoodieTimeline filterInflights();

  /**
   * Filter this timeline to include requested and in-flights
   *
   * @return New instance of HoodieTimeline with just in-flights and requested instants
   */
  HoodieTimeline filterInflightsAndRequested();

  /**
   * Filter this timeline to just include the in-flights excluding compaction instants
   *
@@ -219,7 +227,19 @@ public interface HoodieTimeline extends Serializable {
  }

  static HoodieInstant getCompletedInstant(final HoodieInstant instant) {
    return new HoodieInstant(State.COMPLETED, instant.getAction(), instant.getTimestamp());
  }

  static HoodieInstant getRequestedInstant(final HoodieInstant instant) {
    return new HoodieInstant(State.REQUESTED, instant.getAction(), instant.getTimestamp());
  }

  static HoodieInstant getCleanRequestedInstant(final String timestamp) {
    return new HoodieInstant(State.REQUESTED, CLEAN_ACTION, timestamp);
  }

  static HoodieInstant getCleanInflightInstant(final String timestamp) {
    return new HoodieInstant(State.INFLIGHT, CLEAN_ACTION, timestamp);
  }

  static HoodieInstant getCompactionRequestedInstant(final String timestamp) {
@@ -246,6 +266,10 @@ public interface HoodieTimeline extends Serializable {
    return StringUtils.join(instant, HoodieTimeline.CLEAN_EXTENSION);
  }

  static String makeRequestedCleanerFileName(String instant) {
    return StringUtils.join(instant, HoodieTimeline.REQUESTED_CLEAN_EXTENSION);
  }

  static String makeInflightCleanerFileName(String instant) {
    return StringUtils.join(instant, HoodieTimeline.INFLIGHT_CLEAN_EXTENSION);
  }
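Taken together, these helpers give clean the same three on-disk markers that compaction already has. As a rough illustration, assuming CLEAN_ACTION resolves to "clean" and the requested/inflight extensions to ".requested"/".inflight", the mapping is:

    HoodieTimeline.getCleanRequestedInstant("001");                  // meta file "001.clean.requested"
    HoodieTimeline.getCleanInflightInstant("001");                   // meta file "001.clean.inflight"
    HoodieTimeline.getCompletedInstant(
        HoodieTimeline.getCleanInflightInstant("001"));              // meta file "001.clean"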

View File

@@ -58,10 +58,11 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
  public static final SimpleDateFormat COMMIT_FORMATTER = new SimpleDateFormat("yyyyMMddHHmmss");
  public static final Set<String> VALID_EXTENSIONS_IN_ACTIVE_TIMELINE =
      new HashSet<>(Arrays.asList(new String[] {COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION,
          INFLIGHT_DELTA_COMMIT_EXTENSION, SAVEPOINT_EXTENSION, INFLIGHT_SAVEPOINT_EXTENSION, CLEAN_EXTENSION,
          INFLIGHT_CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_COMPACTION_EXTENSION,
          REQUESTED_COMPACTION_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION}));

  private static final transient Logger log = LogManager.getLogger(HoodieActiveTimeline.class);
  protected HoodieTableMetaClient metaClient;
@@ -212,11 +213,19 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
  }

  public void revertToInflight(HoodieInstant instant) {
    log.info("Reverting " + instant + " to inflight");
    revertStateTransition(instant, HoodieTimeline.getInflightInstant(instant));
    log.info("Reverted " + instant + " to inflight");
  }

  public HoodieInstant revertToRequested(HoodieInstant instant) {
    log.warn("Reverting " + instant + " to requested");
    HoodieInstant requestedInstant = HoodieTimeline.getRequestedInstant(instant);
    revertStateTransition(instant, HoodieTimeline.getRequestedInstant(instant));
    log.warn("Reverted " + instant + " to requested");
    return requestedInstant;
  }

  public void deleteInflight(HoodieInstant instant) {
    Preconditions.checkArgument(instant.isInflight());
    deleteInstantFile(instant);
/**
* Transition Clean State from inflight to Committed
*
* @param inflightInstant Inflight instant
* @param data Extra Metadata
* @return commit instant
*/
public HoodieInstant transitionCleanInflightToComplete(HoodieInstant inflightInstant, Option<byte[]> data) {
Preconditions.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.CLEAN_ACTION));
Preconditions.checkArgument(inflightInstant.isInflight());
HoodieInstant commitInstant = new HoodieInstant(State.COMPLETED, CLEAN_ACTION, inflightInstant.getTimestamp());
// First write metadata to aux folder
createFileInAuxiliaryFolder(commitInstant, data);
// Then write to timeline
transitionState(inflightInstant, commitInstant, data);
return commitInstant;
}
/**
* Transition Clean State from requested to inflight
*
* @param requestedInstant requested instant
* @return commit instant
*/
public HoodieInstant transitionCleanRequestedToInflight(HoodieInstant requestedInstant) {
Preconditions.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.CLEAN_ACTION));
Preconditions.checkArgument(requestedInstant.isRequested());
HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, CLEAN_ACTION, requestedInstant.getTimestamp());
transitionState(requestedInstant, inflight, Option.empty());
return inflight;
}
  private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data) {
    Preconditions.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp()));
    Path commitFilePath = new Path(metaClient.getMetaPath(), toInstant.getFileName());
@@ -327,19 +369,20 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
      }
    }

  private void revertStateTransition(HoodieInstant curr, HoodieInstant revert) {
    Preconditions.checkArgument(curr.getTimestamp().equals(revert.getTimestamp()));
    Path revertFilePath = new Path(metaClient.getMetaPath(), revert.getFileName());
    try {
      if (!metaClient.getFs().exists(revertFilePath)) {
        Path currFilePath = new Path(metaClient.getMetaPath(), curr.getFileName());
        boolean success = metaClient.getFs().rename(currFilePath, revertFilePath);
        if (!success) {
          throw new HoodieIOException("Could not rename " + currFilePath + " to " + revertFilePath);
        }
        log.info("Renamed " + currFilePath + " to " + revertFilePath);
      }
    } catch (IOException e) {
      throw new HoodieIOException("Could not complete revert " + curr, e);
    }
  }
@@ -355,6 +398,15 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
    createFileInMetaPath(instant.getFileName(), content);
  }
public void saveToCleanRequested(HoodieInstant instant, Option<byte[]> content) {
Preconditions.checkArgument(instant.getAction().equals(HoodieTimeline.CLEAN_ACTION));
Preconditions.checkArgument(instant.getState().equals(State.REQUESTED));
// Write workload to auxiliary folder
createFileInAuxiliaryFolder(instant, content);
// Plan is only stored in auxiliary folder
createFileInMetaPath(instant.getFileName(), Option.empty());
}
  private void createFileInMetaPath(String filename, Option<byte[]> content) {
    Path fullPath = new Path(metaClient.getMetaPath(), filename);
    createFileInPath(fullPath, content);
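The completion half of this state machine, plus the retry path exercised by TestCleaner.runCleaner, looks roughly like the sketch below; the HoodieCleanMetadata construction is elided and cleanMetadata is only a placeholder.

    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
    HoodieInstant inflight = HoodieTimeline.getCleanInflightInstant("001");
    // Metadata is written to the auxiliary folder first, then "001.clean" is published on the timeline
    HoodieInstant completed = timeline.transitionCleanInflightToComplete(inflight,
        AvroUtils.serializeCleanMetadata(cleanMetadata));
    // A failed or suspect attempt can be moved back and re-run against the same persisted plan
    timeline.revertToInflight(completed);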

View File

@@ -30,6 +30,7 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
@@ -83,6 +84,13 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
    return new HoodieDefaultTimeline(instants.stream().filter(HoodieInstant::isInflight), details);
  }

  @Override
  public HoodieTimeline filterInflightsAndRequested() {
    return new HoodieDefaultTimeline(
        instants.stream().filter(i -> i.getState().equals(State.REQUESTED) || i.getState().equals(State.INFLIGHT)),
        details);
  }

  @Override
  public HoodieTimeline filterInflightsExcludingCompaction() {
    return new HoodieDefaultTimeline(instants.stream().filter(instant -> {

View File

@@ -69,7 +69,7 @@ public class HoodieInstant implements Serializable {
    } else if (action.contains(HoodieTimeline.INFLIGHT_EXTENSION)) {
      state = State.INFLIGHT;
      action = action.replace(HoodieTimeline.INFLIGHT_EXTENSION, "");
    } else if (action.contains(HoodieTimeline.REQUESTED_EXTENSION)) {
      state = State.REQUESTED;
      action = action.replace(HoodieTimeline.REQUESTED_EXTENSION, "");
    }
@@ -117,7 +117,8 @@
          : HoodieTimeline.makeCommitFileName(timestamp);
    } else if (HoodieTimeline.CLEAN_ACTION.equals(action)) {
      return isInflight() ? HoodieTimeline.makeInflightCleanerFileName(timestamp)
          : isRequested() ? HoodieTimeline.makeRequestedCleanerFileName(timestamp)
              : HoodieTimeline.makeCleanerFileName(timestamp);
    } else if (HoodieTimeline.ROLLBACK_ACTION.equals(action)) {
      return isInflight() ? HoodieTimeline.makeInflightRollbackFileName(timestamp)
          : HoodieTimeline.makeRollbackFileName(timestamp);

View File

@@ -333,8 +333,10 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste
  @Override
  public void close() {
    log.info("Closing Rocksdb !!");
    closed = true;
    rocksDB.close();
    log.info("Closed Rocksdb !!");
  }

  @Override

View File

@@ -36,6 +36,7 @@ import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase; import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata; import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -110,6 +111,11 @@ public class AvroUtils {
return serializeAvroMetadata(compactionWorkload, HoodieCompactionPlan.class); return serializeAvroMetadata(compactionWorkload, HoodieCompactionPlan.class);
} }
public static Option<byte[]> serializeCleanerPlan(HoodieCleanerPlan cleanPlan) throws IOException {
return serializeAvroMetadata(cleanPlan, HoodieCleanerPlan.class);
}
public static Option<byte[]> serializeCleanMetadata(HoodieCleanMetadata metadata) throws IOException { public static Option<byte[]> serializeCleanMetadata(HoodieCleanMetadata metadata) throws IOException {
return serializeAvroMetadata(metadata, HoodieCleanMetadata.class); return serializeAvroMetadata(metadata, HoodieCleanMetadata.class);
} }
@@ -137,6 +143,10 @@ public class AvroUtils {
return Option.of(baos.toByteArray()); return Option.of(baos.toByteArray());
} }
public static HoodieCleanerPlan deserializeCleanerPlan(byte[] bytes) throws IOException {
return deserializeAvroMetadata(bytes, HoodieCleanerPlan.class);
}
public static HoodieCompactionPlan deserializeCompactionPlan(byte[] bytes) throws IOException { public static HoodieCompactionPlan deserializeCompactionPlan(byte[] bytes) throws IOException {
return deserializeAvroMetadata(bytes, HoodieCompactionPlan.class); return deserializeAvroMetadata(bytes, HoodieCompactionPlan.class);
} }
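These two helpers give the cleaner plan the same Avro round trip that compaction plans already have, so the requested-clean file can carry a serialized HoodieCleanerPlan. A minimal round-trip sketch; in practice the plan is built from the files selected at schedule time rather than constructed empty as here:

    HoodieCleanerPlan plan = new HoodieCleanerPlan();                 // illustrative empty plan
    Option<byte[]> bytes = AvroUtils.serializeCleanerPlan(plan);      // bytes stored in the requested-clean file
    HoodieCleanerPlan restored = AvroUtils.deserializeCleanerPlan(bytes.get());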

View File

@@ -141,10 +141,9 @@ public class ParquetUtils {
* Read out the bloom filter from the parquet file meta data. * Read out the bloom filter from the parquet file meta data.
*/ */
public static BloomFilter readBloomFilterFromParquetMetadata(Configuration configuration, Path parquetFilePath) { public static BloomFilter readBloomFilterFromParquetMetadata(Configuration configuration, Path parquetFilePath) {
Map<String, String> footerVals = Map<String, String> footerVals = readParquetFooter(configuration, false, parquetFilePath,
readParquetFooter(configuration, false, parquetFilePath, HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
String footerVal = footerVals.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY); String footerVal = footerVals.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
if (null == footerVal) { if (null == footerVal) {
// We use old style key "com.uber.hoodie.bloomfilter" // We use old style key "com.uber.hoodie.bloomfilter"

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.collection.Pair;
/** /**
* Migrates a specific metadata type stored in .hoodie folder to latest version * Migrates a specific metadata type stored in .hoodie folder to latest version
*
* @param <T> * @param <T>
*/ */
public class MetadataMigrator<T> { public class MetadataMigrator<T> {
@@ -36,15 +37,16 @@ public class MetadataMigrator<T> {
private final Integer oldestVersion; private final Integer oldestVersion;
public MetadataMigrator(HoodieTableMetaClient metaClient, List<VersionMigrator<T>> migratorList) { public MetadataMigrator(HoodieTableMetaClient metaClient, List<VersionMigrator<T>> migratorList) {
migrators = migratorList.stream().map(m -> migrators = migratorList.stream().map(m -> Pair.of(m.getManagedVersion(), m))
Pair.of(m.getManagedVersion(), m)).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
latestVersion = migrators.keySet().stream().reduce((x, y) -> x > y ? x : y).get(); latestVersion = migrators.keySet().stream().reduce((x, y) -> x > y ? x : y).get();
oldestVersion = migrators.keySet().stream().reduce((x, y) -> x < y ? x : y).get(); oldestVersion = migrators.keySet().stream().reduce((x, y) -> x < y ? x : y).get();
} }
/** /**
* Upgrade Metadata version to its latest * Upgrade Metadata version to its latest
* @param metadata Metadata *
* @param metadata Metadata
* @param metadataVersion Current version of metadata * @param metadataVersion Current version of metadata
* @return Metadata conforming to the latest version of this metadata * @return Metadata conforming to the latest version of this metadata
*/ */
@@ -64,9 +66,10 @@ public class MetadataMigrator<T> {
/** /**
* Migrate metadata to a specific version * Migrate metadata to a specific version
* @param metadata Hoodie Table Meta Client *
* @param metadataVersion Metadata Version * @param metadata Hoodie Table Meta Client
* @param targetVersion Target Version * @param metadataVersion Metadata Version
* @param targetVersion Target Version
* @return Metadata conforming to the target version * @return Metadata conforming to the target version
*/ */
public T migrateToVersion(T metadata, int metadataVersion, int targetVersion) { public T migrateToVersion(T metadata, int metadataVersion, int targetVersion) {
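MetadataMigrator keeps one handler per metadata version and walks the chain between versions when asked. A hedged usage sketch, assuming the upgrade entry point is upgradeToLatest(metadata, currentVersion) as the javadoc above suggests; the plan variable is illustrative:

    MetadataMigrator<HoodieCompactionPlan> migrator = new MetadataMigrator<>(metaClient,
        Arrays.asList(new CompactionV1MigrationHandler(metaClient), new CompactionV2MigrationHandler(metaClient)));
    HoodieCompactionPlan latest = migrator.upgradeToLatest(plan, 1);  // plan currently at version 1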

View File

@@ -22,18 +22,21 @@ import java.io.Serializable;
/** /**
* Responsible for upgrading and downgrading metadata versions for a specific metadata * Responsible for upgrading and downgrading metadata versions for a specific metadata
*
* @param <T> Metadata Type * @param <T> Metadata Type
*/ */
public interface VersionMigrator<T> extends Serializable { public interface VersionMigrator<T> extends Serializable {
/** /**
* Version of Metadata that this class will handle * Version of Metadata that this class will handle
*
* @return * @return
*/ */
Integer getManagedVersion(); Integer getManagedVersion();
/** /**
* Upgrades metadata of type T from previous version to this version * Upgrades metadata of type T from previous version to this version
*
* @param input Metadata as of previous version. * @param input Metadata as of previous version.
* @return Metadata compatible with the version managed by this class * @return Metadata compatible with the version managed by this class
*/ */
@@ -41,6 +44,7 @@ public interface VersionMigrator<T> extends Serializable {
/** /**
* Downgrades metadata of type T from next version to this version * Downgrades metadata of type T from next version to this version
*
* @param input Metadata as of next highest version * @param input Metadata as of next highest version
* @return Metadata compatible with the version managed by this class * @return Metadata compatible with the version managed by this class
*/ */

View File

@@ -29,8 +29,7 @@ import org.apache.hudi.common.versioning.MetadataMigrator;
public class CompactionPlanMigrator extends MetadataMigrator<HoodieCompactionPlan> { public class CompactionPlanMigrator extends MetadataMigrator<HoodieCompactionPlan> {
public CompactionPlanMigrator(HoodieTableMetaClient metaClient) { public CompactionPlanMigrator(HoodieTableMetaClient metaClient) {
super(metaClient, Arrays.asList( super(metaClient,
new CompactionV1MigrationHandler(metaClient), Arrays.asList(new CompactionV1MigrationHandler(metaClient), new CompactionV2MigrationHandler(metaClient)));
new CompactionV2MigrationHandler(metaClient)));
} }
} }

View File

@@ -52,21 +52,17 @@ public class CompactionV1MigrationHandler extends AbstractMigratorBase<HoodieCom
@Override @Override
public HoodieCompactionPlan downgradeFrom(HoodieCompactionPlan input) { public HoodieCompactionPlan downgradeFrom(HoodieCompactionPlan input) {
Preconditions.checkArgument(input.getVersion() == 2, "Input version is " Preconditions.checkArgument(input.getVersion() == 2, "Input version is " + input.getVersion() + ". Must be 2");
+ input.getVersion() + ". Must be 2");
HoodieCompactionPlan compactionPlan = new HoodieCompactionPlan(); HoodieCompactionPlan compactionPlan = new HoodieCompactionPlan();
final Path basePath = new Path(metaClient.getBasePath()); final Path basePath = new Path(metaClient.getBasePath());
List<HoodieCompactionOperation> v1CompactionOperationList = new ArrayList<>(); List<HoodieCompactionOperation> v1CompactionOperationList = new ArrayList<>();
if (null != input.getOperations()) { if (null != input.getOperations()) {
v1CompactionOperationList = input.getOperations().stream().map(inp -> { v1CompactionOperationList = input.getOperations().stream().map(inp -> {
return HoodieCompactionOperation.newBuilder() return HoodieCompactionOperation.newBuilder().setBaseInstantTime(inp.getBaseInstantTime())
.setBaseInstantTime(inp.getBaseInstantTime()) .setFileId(inp.getFileId()).setPartitionPath(inp.getPartitionPath()).setMetrics(inp.getMetrics())
.setFileId(inp.getFileId())
.setPartitionPath(inp.getPartitionPath())
.setMetrics(inp.getMetrics())
.setDataFilePath(convertToV1Path(basePath, inp.getPartitionPath(), inp.getDataFilePath())) .setDataFilePath(convertToV1Path(basePath, inp.getPartitionPath(), inp.getDataFilePath()))
.setDeltaFilePaths(inp.getDeltaFilePaths().stream().map(s -> convertToV1Path(basePath, .setDeltaFilePaths(inp.getDeltaFilePaths().stream()
inp.getPartitionPath(), s)).collect(Collectors.toList())) .map(s -> convertToV1Path(basePath, inp.getPartitionPath(), s)).collect(Collectors.toList()))
.build(); .build();
}).collect(Collectors.toList()); }).collect(Collectors.toList());
} }

View File

@@ -46,20 +46,15 @@ public class CompactionV2MigrationHandler extends AbstractMigratorBase<HoodieCom
@Override @Override
public HoodieCompactionPlan upgradeFrom(HoodieCompactionPlan input) { public HoodieCompactionPlan upgradeFrom(HoodieCompactionPlan input) {
Preconditions.checkArgument(input.getVersion() == 1, "Input version is " Preconditions.checkArgument(input.getVersion() == 1, "Input version is " + input.getVersion() + ". Must be 1");
+ input.getVersion() + ". Must be 1");
HoodieCompactionPlan compactionPlan = new HoodieCompactionPlan(); HoodieCompactionPlan compactionPlan = new HoodieCompactionPlan();
List<HoodieCompactionOperation> v2CompactionOperationList = new ArrayList<>(); List<HoodieCompactionOperation> v2CompactionOperationList = new ArrayList<>();
if (null != input.getOperations()) { if (null != input.getOperations()) {
v2CompactionOperationList = input.getOperations().stream().map(inp -> { v2CompactionOperationList = input.getOperations().stream().map(inp -> {
return HoodieCompactionOperation.newBuilder() return HoodieCompactionOperation.newBuilder().setBaseInstantTime(inp.getBaseInstantTime())
.setBaseInstantTime(inp.getBaseInstantTime()) .setFileId(inp.getFileId()).setPartitionPath(inp.getPartitionPath()).setMetrics(inp.getMetrics())
.setFileId(inp.getFileId()) .setDataFilePath(new Path(inp.getDataFilePath()).getName()).setDeltaFilePaths(
.setPartitionPath(inp.getPartitionPath()) inp.getDeltaFilePaths().stream().map(s -> new Path(s).getName()).collect(Collectors.toList()))
.setMetrics(inp.getMetrics())
.setDataFilePath(new Path(inp.getDataFilePath()).getName())
.setDeltaFilePaths(inp.getDeltaFilePaths().stream().map(s -> new Path(s).getName())
.collect(Collectors.toList()))
.build(); .build();
}).collect(Collectors.toList()); }).collect(Collectors.toList());
} }
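Read together, the two handlers amount to a path-format conversion: V1 plans store full paths resolved against the base path, while V2 plans store only file names relative to the partition. Illustrative values, not taken from this patch:

    V1: /data/hoodie/table/2019/10/28/abc123_1_000.parquet
    V2: abc123_1_000.parquet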

View File

@@ -114,8 +114,7 @@ public class TestCompactionUtils extends HoodieCommonTestHarness {
fileSlice.addLogFile( fileSlice.addLogFile(
new HoodieLogFile(new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN)))); new HoodieLogFile(new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN))));
op = CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], fileSlice, Option.of(metricsCaptureFn)); op = CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], fileSlice, Option.of(metricsCaptureFn));
testFileSliceCompactionOpEquality(fileSlice, op, DEFAULT_PARTITION_PATHS[0], testFileSliceCompactionOpEquality(fileSlice, op, DEFAULT_PARTITION_PATHS[0], LATEST_COMPACTION_METADATA_VERSION);
LATEST_COMPACTION_METADATA_VERSION);
} }
/** /**
@@ -126,17 +125,17 @@ public class TestCompactionUtils extends HoodieCommonTestHarness {
FileSlice emptyFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "empty1"); FileSlice emptyFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "empty1");
FileSlice fileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1"); FileSlice fileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1");
fileSlice.setDataFile(new TestHoodieDataFile(fullPartitionPath.toString() + "/data1_1_000.parquet")); fileSlice.setDataFile(new TestHoodieDataFile(fullPartitionPath.toString() + "/data1_1_000.parquet"));
fileSlice.addLogFile( fileSlice.addLogFile(new HoodieLogFile(
new HoodieLogFile(new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN))))); new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN)))));
fileSlice.addLogFile( fileSlice.addLogFile(new HoodieLogFile(
new HoodieLogFile(new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN))))); new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN)))));
FileSlice noLogFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noLog1"); FileSlice noLogFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noLog1");
noLogFileSlice.setDataFile(new TestHoodieDataFile(fullPartitionPath.toString() + "/noLog_1_000.parquet")); noLogFileSlice.setDataFile(new TestHoodieDataFile(fullPartitionPath.toString() + "/noLog_1_000.parquet"));
FileSlice noDataFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1"); FileSlice noDataFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1");
noDataFileSlice.addLogFile( noDataFileSlice.addLogFile(new HoodieLogFile(
new HoodieLogFile(new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN))))); new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN)))));
noDataFileSlice.addLogFile( noDataFileSlice.addLogFile(new HoodieLogFile(
new HoodieLogFile(new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN))))); new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN)))));
List<FileSlice> fileSliceList = Arrays.asList(emptyFileSlice, noDataFileSlice, fileSlice, noLogFileSlice); List<FileSlice> fileSliceList = Arrays.asList(emptyFileSlice, noDataFileSlice, fileSlice, noLogFileSlice);
List<Pair<String, FileSlice>> input = List<Pair<String, FileSlice>> input =
fileSliceList.stream().map(f -> Pair.of(DEFAULT_PARTITION_PATHS[0], f)).collect(Collectors.toList()); fileSliceList.stream().map(f -> Pair.of(DEFAULT_PARTITION_PATHS[0], f)).collect(Collectors.toList());
@@ -221,9 +220,8 @@ public class TestCompactionUtils extends HoodieCommonTestHarness {
*/ */
private void testFileSlicesCompactionPlanEquality(List<Pair<String, FileSlice>> input, HoodieCompactionPlan plan) { private void testFileSlicesCompactionPlanEquality(List<Pair<String, FileSlice>> input, HoodieCompactionPlan plan) {
Assert.assertEquals("All file-slices present", input.size(), plan.getOperations().size()); Assert.assertEquals("All file-slices present", input.size(), plan.getOperations().size());
IntStream.range(0, input.size()).boxed().forEach(idx -> IntStream.range(0, input.size()).boxed().forEach(idx -> testFileSliceCompactionOpEquality(input.get(idx).getValue(),
testFileSliceCompactionOpEquality(input.get(idx).getValue(), plan.getOperations().get(idx), plan.getOperations().get(idx), input.get(idx).getKey(), plan.getVersion()));
input.get(idx).getKey(), plan.getVersion()));
} }
/** /**
@@ -233,15 +231,15 @@ public class TestCompactionUtils extends HoodieCommonTestHarness {
* @param op HoodieCompactionOperation * @param op HoodieCompactionOperation
* @param expPartitionPath Partition path * @param expPartitionPath Partition path
*/ */
private void testFileSliceCompactionOpEquality(FileSlice slice, HoodieCompactionOperation op, private void testFileSliceCompactionOpEquality(FileSlice slice, HoodieCompactionOperation op, String expPartitionPath,
String expPartitionPath, int version) { int version) {
Assert.assertEquals("Partition path is correct", expPartitionPath, op.getPartitionPath()); Assert.assertEquals("Partition path is correct", expPartitionPath, op.getPartitionPath());
Assert.assertEquals("Same base-instant", slice.getBaseInstantTime(), op.getBaseInstantTime()); Assert.assertEquals("Same base-instant", slice.getBaseInstantTime(), op.getBaseInstantTime());
Assert.assertEquals("Same file-id", slice.getFileId(), op.getFileId()); Assert.assertEquals("Same file-id", slice.getFileId(), op.getFileId());
if (slice.getDataFile().isPresent()) { if (slice.getDataFile().isPresent()) {
HoodieDataFile df = slice.getDataFile().get(); HoodieDataFile df = slice.getDataFile().get();
Assert.assertEquals("Same data-file", Assert.assertEquals("Same data-file", version == COMPACTION_METADATA_VERSION_1 ? df.getPath() : df.getFileName(),
version == COMPACTION_METADATA_VERSION_1 ? df.getPath() : df.getFileName(), op.getDataFilePath()); op.getDataFilePath());
} }
List<String> paths = slice.getLogFiles().map(l -> l.getPath().toString()).collect(Collectors.toList()); List<String> paths = slice.getLogFiles().map(l -> l.getPath().toString()).collect(Collectors.toList());
IntStream.range(0, paths.size()).boxed().forEach(idx -> { IntStream.range(0, paths.size()).boxed().forEach(idx -> {

View File

@@ -62,25 +62,30 @@ public class HiveSyncTool {
} }
public void syncHoodieTable() throws ClassNotFoundException { public void syncHoodieTable() throws ClassNotFoundException {
switch (hoodieHiveClient.getTableType()) { try {
case COPY_ON_WRITE: switch (hoodieHiveClient.getTableType()) {
syncHoodieTable(false); case COPY_ON_WRITE:
break; syncHoodieTable(false);
case MERGE_ON_READ: break;
// sync a RO table for MOR case MERGE_ON_READ:
syncHoodieTable(false); // sync a RO table for MOR
String originalTableName = cfg.tableName; syncHoodieTable(false);
// TODO : Make realtime table registration optional using a config param String originalTableName = cfg.tableName;
cfg.tableName = cfg.tableName + SUFFIX_REALTIME_TABLE; // TODO : Make realtime table registration optional using a config param
// sync a RT table for MOR cfg.tableName = cfg.tableName + SUFFIX_REALTIME_TABLE;
syncHoodieTable(true); // sync a RT table for MOR
cfg.tableName = originalTableName; syncHoodieTable(true);
break; cfg.tableName = originalTableName;
default: break;
LOG.error("Unknown table type " + hoodieHiveClient.getTableType()); default:
throw new InvalidDatasetException(hoodieHiveClient.getBasePath()); LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
throw new InvalidDatasetException(hoodieHiveClient.getBasePath());
}
} catch (RuntimeException re) {
LOG.error("Got runtime exception when hive syncing", re);
} finally {
hoodieHiveClient.close();
} }
hoodieHiveClient.close();
} }
private void syncHoodieTable(boolean isRealTime) throws ClassNotFoundException { private void syncHoodieTable(boolean isRealTime) throws ClassNotFoundException {
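The try/catch/finally guarantees hoodieHiveClient.close() runs on both the success and failure paths, and a RuntimeException thrown during sync is now logged and swallowed instead of propagating with the client left open. The resulting control flow, simplified; doSync() is a hypothetical stand-in for the switch above:

    try {
      doSync();                                                  // may throw a RuntimeException
    } catch (RuntimeException re) {
      LOG.error("Got runtime exception when hive syncing", re);  // logged, not rethrown
    } finally {
      hoodieHiveClient.close();                                  // previously skipped when doSync() threw
    }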

View File

@@ -138,9 +138,11 @@ public class TimelineService {
} }
public void close() { public void close() {
log.info("Closing Timeline Service");
this.app.stop(); this.app.stop();
this.app = null; this.app = null;
this.fsViewsManager.close(); this.fsViewsManager.close();
log.info("Closed Timeline Service");
} }
public Configuration getConf() { public Configuration getConf() {