[HUDI-756] Organize Cleaning Action execution into a single package in hudi-client (#1485)

- Introduced a thin abstraction, ActionExecutor, that all actions will implement
- Pulled cleaning code from the table and write client into a single package
- CleanHelper is now CleanPlanner; HoodieCleanClient has been removed
- Minor refactor of the HoodieTable factory method
- HoodieTable.create() methods with and without a metaClient passed in
- HoodieTable constructor no longer performs a redundant metaClient instantiation
- Fixed existing unit tests to work at the HoodieWriteClient level
vinoth chandar authored 2020-04-04 00:07:34 -07:00, committed by GitHub
parent 575d87cf7d · commit eaf6cc2d90
31 changed files with 585 additions and 555 deletions
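
Taken together, the diff below collapses two call patterns into one. A minimal before/after sketch of the client-facing surface (jsc, config, metrics, and writeClient are assumed to be in scope; T is the payload type; error handling elided):

// Before this commit: call sites re-read the meta client on every table
// construction, and cleaning went through a dedicated HoodieCleanClient.
HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
new HoodieCleanClient<T>(jsc, config, metrics).clean();

// After this commit: a single factory method, and cleaning runs through the
// write client, which delegates to CleanActionExecutor internally.
HoodieTable<T> table = HoodieTable.create(config, jsc);
HoodieCleanMetadata metadata = writeClient.clean();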

View File

@@ -118,10 +118,6 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
return config;
}
public Option<EmbeddedTimelineService> getTimelineServer() {
return timelineServer;
}
protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) {
return ClientUtils.createMetaClient(jsc, config, loadActiveTimelineOnLoad);
}

View File

@@ -140,9 +140,9 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
private boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
Option<Map<String, String>> extraMetadata, String actionType) {
LOG.info("Commiting " + instantTime);
LOG.info("Committing " + instantTime);
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
HoodieTable<T> table = HoodieTable.create(config, jsc);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
@@ -221,7 +221,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
// TODO : make sure we cannot rollback / archive last commit file
try {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
HoodieTable table = HoodieTable.create(config, jsc);
// 0. All of the rolling stat management is only done by the DELTA commit for MOR and COMMIT for COW; otherwise
// there may be race conditions
HoodieRollingStatMetadata rollingStatMetadata = new HoodieRollingStatMetadata(actionType);
@@ -272,7 +272,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
setWriteSchemaFromLastInstant(metaClient);
}
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieTable table = HoodieTable.create(metaClient, config, jsc);
if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) {
writeContext = metrics.getCommitCtx();
} else {
@@ -321,8 +321,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
// Create a Hoodie table which encapsulated the commits and files visible
try {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.getHoodieTable(
createMetaClient(true), config, jsc);
HoodieTable<T> table = HoodieTable.create(config, jsc);
Option<HoodieInstant> rollbackInstantOpt =
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
.filter(instant -> HoodieActiveTimeline.EQUAL.test(instant.getTimestamp(), commitToRollback))
@@ -341,8 +340,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
protected List<HoodieRollbackStat> doRollbackAndGetStats(final HoodieInstant instantToRollback) throws
IOException {
final String commitToRollback = instantToRollback.getTimestamp();
HoodieTable<T> table = HoodieTable.getHoodieTable(
createMetaClient(true), config, jsc);
HoodieTable<T> table = HoodieTable.create(config, jsc);
HoodieTimeline inflightAndRequestedCommitTimeline = table.getPendingCommitTimeline();
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
// Check if any of the commits is a savepoint - do not allow rollback on those commits
@@ -391,7 +389,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
private void finishRollback(final Timer.Context context, List<HoodieRollbackStat> rollbackStats,
List<String> commitsToRollback, final String startRollbackTime) throws IOException {
HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
HoodieTable<T> table = HoodieTable.create(config, jsc);
Option<Long> durationInMs = Option.empty();
long numFilesDeleted = rollbackStats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum();
if (context != null) {

View File

@@ -1,197 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.HoodieTable;
import com.codahale.metrics.Timer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.List;
public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(HoodieCleanClient.class);
private final transient HoodieMetrics metrics;
public HoodieCleanClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, HoodieMetrics metrics) {
this(jsc, clientConfig, metrics, Option.empty());
}
public HoodieCleanClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, HoodieMetrics metrics,
Option<EmbeddedTimelineService> timelineService) {
super(jsc, clientConfig, timelineService);
this.metrics = metrics;
}
/**
* Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
* configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be
* cleaned)
*/
public void clean() throws HoodieIOException {
String startCleanTime = HoodieActiveTimeline.createNewInstantTime();
clean(startCleanTime);
}
/**
* Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
* configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be
* cleaned)
*
* @param startCleanTime Cleaner Instant Timestamp
* @throws HoodieIOException in case of any IOException
*/
public HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOException {
// Create a Hoodie table which encapsulated the commits and files visible
final HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
// If there are inflight(failed) or previously requested clean operation, first perform them
table.getCleanTimeline().filterInflightsAndRequested().getInstants().forEach(hoodieInstant -> {
LOG.info("There were previously unfinished cleaner operations. Finishing Instant=" + hoodieInstant);
try {
runClean(table, hoodieInstant);
} catch (Exception e) {
LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
}
});
Option<HoodieCleanerPlan> cleanerPlanOpt = scheduleClean(startCleanTime);
if (cleanerPlanOpt.isPresent()) {
HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get();
if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
&& !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
final HoodieTable<T> hoodieTable = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
return runClean(hoodieTable, HoodieTimeline.getCleanRequestedInstant(startCleanTime), cleanerPlan);
}
}
return null;
}
/**
* Creates a Cleaner plan if there are files to be cleaned and stores them in instant file.
*
* @param startCleanTime Cleaner Instant Time
* @return Cleaner Plan if generated
*/
protected Option<HoodieCleanerPlan> scheduleClean(String startCleanTime) {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
HoodieCleanerPlan cleanerPlan = table.scheduleClean(jsc);
if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
&& !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
HoodieInstant cleanInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
// Save to both aux and timeline folder
try {
table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
LOG.info("Requesting Cleaning with instant time " + cleanInstant);
} catch (IOException e) {
LOG.error("Got exception when saving cleaner requested file", e);
throw new HoodieIOException(e.getMessage(), e);
}
return Option.of(cleanerPlan);
}
return Option.empty();
}
/**
* Executes the Cleaner plan stored in the instant metadata.
*
* @param table Hoodie Table
* @param cleanInstant Cleaner Instant
*/
public HoodieCleanMetadata runClean(HoodieTable<T> table, HoodieInstant cleanInstant) {
try {
HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant);
return runClean(table, cleanInstant, cleanerPlan);
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
}
private HoodieCleanMetadata runClean(HoodieTable<T> table, HoodieInstant cleanInstant,
HoodieCleanerPlan cleanerPlan) {
ValidationUtils.checkArgument(
cleanInstant.getState().equals(State.REQUESTED) || cleanInstant.getState().equals(State.INFLIGHT));
try {
LOG.info("Cleaner started");
final Timer.Context context = metrics.getCleanCtx();
if (!cleanInstant.isInflight()) {
// Mark as inflight first
cleanInstant = table.getActiveTimeline().transitionCleanRequestedToInflight(cleanInstant,
TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
}
List<HoodieCleanStat> cleanStats = table.clean(jsc, cleanInstant, cleanerPlan);
if (cleanStats.isEmpty()) {
return HoodieCleanMetadata.newBuilder().build();
}
// Emit metrics (duration, numFilesDeleted) if needed
Option<Long> durationInMs = Option.empty();
if (context != null) {
durationInMs = Option.of(metrics.getDurationInMs(context.stop()));
LOG.info("cleanerElaspsedTime (Minutes): " + durationInMs.get() / (1000 * 60));
}
HoodieTableMetaClient metaClient = createMetaClient(true);
// Create the metadata and save it
HoodieCleanMetadata metadata =
CleanerUtils.convertCleanMetadata(metaClient, cleanInstant.getTimestamp(), durationInMs, cleanStats);
LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files. Earliest Retained :" + metadata.getEarliestCommitToRetain());
metrics.updateCleanMetrics(durationInMs.orElseGet(() -> -1L), metadata.getTotalFilesDeleted());
table.getActiveTimeline().transitionCleanInflightToComplete(
new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, cleanInstant.getTimestamp()),
TimelineMetadataUtils.serializeCleanMetadata(metadata));
LOG.info("Marked clean started on " + cleanInstant.getTimestamp() + " as complete");
return metadata;
} catch (IOException e) {
throw new HoodieIOException("Failed to clean up after commit", e);
}
}
}

View File

@@ -96,7 +96,7 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializ
final String basePath = clientConfig.getBasePath();
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
this.hoodieTable = HoodieTable.getHoodieTable(metaClient, clientConfig, jsc);
this.hoodieTable = HoodieTable.create(metaClient, clientConfig, jsc);
this.index = HoodieIndex.createIndex(clientConfig, jsc);
this.sqlContextOpt = Option.empty();
}

View File

@@ -98,7 +98,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
private static final String LOOKUP_STR = "lookup";
private final boolean rollbackPending;
private final transient HoodieMetrics metrics;
private final transient HoodieCleanClient<T> cleanClient;
private transient Timer.Context compactionTimer;
/**
@@ -139,7 +138,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
super(jsc, index, clientConfig, timelineService);
this.metrics = new HoodieMetrics(config, config.getTableName());
this.rollbackPending = rollbackPending;
this.cleanClient = new HoodieCleanClient<>(jsc, config, metrics, timelineService);
}
/**
@@ -161,7 +159,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieRecords) {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
HoodieTable<T> table = HoodieTable.create(config, jsc);
Timer.Context indexTimer = metrics.getIndexCtx();
JavaRDD<HoodieRecord<T>> recordsWithLocation = getIndex().tagLocation(hoodieRecords, jsc, table);
metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
@@ -539,7 +537,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @return true if the savepoint was created successfully
*/
public boolean savepoint(String user, String comment) {
HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
HoodieTable<T> table = HoodieTable.create(config, jsc);
if (table.getCompletedCommitsTimeline().empty()) {
throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
}
@@ -567,7 +565,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @return true if the savepoint was created successfully
*/
public boolean savepoint(String instantTime, String user, String comment) {
HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
HoodieTable<T> table = HoodieTable.create(config, jsc);
if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
throw new UnsupportedOperationException("Savepointing is not supported for MergeOnRead table types");
}
@@ -628,7 +626,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @return true if the savepoint was deleted successfully
*/
public void deleteSavepoint(String savepointTime) {
HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
HoodieTable<T> table = HoodieTable.create(config, jsc);
if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
throw new UnsupportedOperationException("Savepointing is not supported for MergeOnRead table types");
}
@@ -655,7 +653,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @param compactionTime - delete the compaction time
*/
private void deleteRequestedCompaction(String compactionTime) {
HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
HoodieTable<T> table = HoodieTable.create(config, jsc);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieInstant compactionRequestedInstant =
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionTime);
@@ -682,7 +680,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @return true if the savepoint was rolled back to successfully
*/
public boolean rollbackToSavepoint(String savepointTime) {
HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
HoodieTable<T> table = HoodieTable.create(config, jsc);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
// Rollback to savepoint is expected to be a manual operation and no concurrent write or compaction is expected
@@ -737,7 +735,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
public void restoreToInstant(final String instantTime) throws HoodieRollbackException {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
HoodieTable<T> table = HoodieTable.create(config, jsc);
// Get all the commits on the timeline after the provided commit time
List<HoodieInstant> instantsToRollback = table.getActiveTimeline().getCommitsAndCompactionTimeline()
.getReverseOrderedInstants()
@@ -788,7 +786,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
private void finishRestore(final Timer.Context context, Map<String, List<HoodieRollbackStat>> commitToStats,
List<String> commitsToRollback, final String startRestoreTime, final String restoreToInstant) throws IOException {
HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
HoodieTable<T> table = HoodieTable.create(config, jsc);
Option<Long> durationInMs = Option.empty();
long numFilesDeleted = 0L;
for (Map.Entry<String, List<HoodieRollbackStat>> commitToStat : commitToStats.entrySet()) {
@@ -821,7 +819,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
public void close() {
// Stop timeline-server if running
super.close();
this.cleanClient.close();
}
/**
@@ -829,20 +826,25 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* configurations and CleaningPolicy used. (Typically, files that can no longer be used by a running query can be
* cleaned.)
*/
public void clean() throws HoodieIOException {
cleanClient.clean();
public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException {
LOG.info("Cleaner started");
final Timer.Context context = metrics.getCleanCtx();
HoodieCleanMetadata metadata = HoodieTable.create(config, jsc).clean(jsc, cleanInstantTime);
if (context != null) {
long durationMs = metrics.getDurationInMs(context.stop());
metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
+ " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
+ " cleanerElaspsedMs" + durationMs);
}
return metadata;
}
/**
* Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
* configurations and CleaningPolicy used. (Typically, files that can no longer be used by a running query can be
* cleaned.)
*
* @param startCleanTime Cleaner Instant Timestamp
* @throws HoodieIOException in case of any IOException
*/
protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOException {
return cleanClient.clean(startCleanTime);
public HoodieCleanMetadata clean() {
return clean(HoodieActiveTimeline.createNewInstantTime());
}
/**
@@ -882,7 +884,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), instantTime, HoodieTimeline.LESSER),
"Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :"
+ latestPending + ", Ingesting at " + instantTime));
HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
String commitActionType = table.getMetaClient().getCommitActionType();
activeTimeline.createNewInstant(new HoodieInstant(State.REQUESTED, commitActionType, instantTime));
@@ -924,7 +926,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
"Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
+ conflictingInstants);
HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
HoodieCompactionPlan workload = table.scheduleCompaction(jsc, instantTime);
if (workload != null && (workload.getOperations() != null) && (!workload.getOperations().isEmpty())) {
extraMetadata.ifPresent(workload::setExtraMetadata);
@@ -957,7 +959,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
public void commitCompaction(String compactionInstantTime, JavaRDD<WriteStatus> writeStatuses,
Option<Map<String, String>> extraMetadata) throws IOException {
HoodieTableMetaClient metaClient = createMetaClient(true);
HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(
timeline.readCompactionPlanAsBytes(HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get());
@@ -1020,7 +1022,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* Cleanup all pending commits.
*/
private void rollbackPendingCommits() {
HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
HoodieTable<T> table = HoodieTable.create(config, jsc);
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
List<String> commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
@@ -1038,7 +1040,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean autoCommit) throws IOException {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTableMetaClient metaClient = createMetaClient(true);
HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
HoodieTimeline pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
@@ -1046,7 +1048,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
rollbackInflightCompaction(inflightInstant, table);
// refresh table
metaClient = createMetaClient(true);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);
table = HoodieTable.create(metaClient, config, jsc);
pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
}
@@ -1076,7 +1078,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
activeTimeline.transitionCompactionRequestedToInflight(compactionInstant);
compactionTimer = metrics.getCompactionCtx();
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
JavaRDD<WriteStatus> statuses = table.compact(jsc, compactionInstant.getTimestamp(), compactionPlan);
// Force compaction action
statuses.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
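
The reworked clean() above is now the single entry point for cleaning. A hedged usage sketch from application code (the two-argument HoodieWriteClient constructor and the config builder calls are assumed from the public API of this release; the base path is made up):

HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie/sample-table")   // hypothetical base path
    .build();
try (HoodieWriteClient<HoodieRecordPayload> client = new HoodieWriteClient<>(jsc, config)) {
  // Generates a fresh instant time and delegates to CleanActionExecutor.
  HoodieCleanMetadata metadata = client.clean();
  System.out.println("Deleted " + metadata.getTotalFilesDeleted() + " files");
}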

View File

@@ -138,7 +138,7 @@ public class HoodieCommitArchiveLog {
int maxCommitsToKeep = config.getMaxCommitsToKeep();
int minCommitsToKeep = config.getMinCommitsToKeep();
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieTable table = HoodieTable.create(metaClient, config, jsc);
// GroupBy each action and limit each action timeline to maxCommitsToKeep
// TODO: Handle ROLLBACK_ACTION in future

View File

@@ -18,22 +18,22 @@
package org.apache.hudi.table;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.ParquetReaderIterator;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieRollingStatMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
@@ -45,20 +45,15 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable;
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.table.action.clean.CleanActionExecutor;
import org.apache.hudi.table.rollback.RollbackHelper;
import org.apache.hudi.table.rollback.RollbackRequest;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroParquetReader;
@@ -67,9 +62,9 @@ import org.apache.parquet.hadoop.ParquetReader;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -81,9 +76,6 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
/**
* Implementation of a very heavily read-optimized Hoodie Table where, all data is stored in base files, with
* zero read amplification.
@@ -97,49 +89,8 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
private static final Logger LOG = LogManager.getLogger(HoodieCopyOnWriteTable.class);
public HoodieCopyOnWriteTable(HoodieWriteConfig config, JavaSparkContext jsc) {
super(config, jsc);
}
private static PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, PartitionCleanStat> deleteFilesFunc(
HoodieTable table) {
return (PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, PartitionCleanStat>) iter -> {
Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
FileSystem fs = table.getMetaClient().getFs();
Path basePath = new Path(table.getMetaClient().getBasePath());
while (iter.hasNext()) {
Tuple2<String, String> partitionDelFileTuple = iter.next();
String partitionPath = partitionDelFileTuple._1();
String delFileName = partitionDelFileTuple._2();
Path deletePath = FSUtils.getPartitionPath(FSUtils.getPartitionPath(basePath, partitionPath), delFileName);
String deletePathStr = deletePath.toString();
Boolean deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
if (!partitionCleanStatMap.containsKey(partitionPath)) {
partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath));
}
PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath);
partitionCleanStat.addDeleteFilePatterns(deletePath.getName());
partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult);
}
return partitionCleanStatMap.entrySet().stream().map(e -> new Tuple2<>(e.getKey(), e.getValue()))
.collect(Collectors.toList()).iterator();
};
}
private static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException {
Path deletePath = new Path(deletePathStr);
LOG.debug("Working on delete path :" + deletePath);
try {
boolean deleteResult = fs.delete(deletePath, false);
if (deleteResult) {
LOG.debug("Cleaned file at path :" + deletePath);
}
return deleteResult;
} catch (FileNotFoundException fio) {
// With cleanPlan being used for retried cleaning operations, its possible to clean a file twice
return false;
}
public HoodieCopyOnWriteTable(HoodieWriteConfig config, JavaSparkContext jsc, HoodieTableMetaClient metaClient) {
super(config, jsc, metaClient);
}
@Override
@@ -278,77 +229,9 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
}
/**
* Generates List of files to be cleaned.
*
* @param jsc JavaSparkContext
* @return Cleaner Plan
*/
@Override
public HoodieCleanerPlan scheduleClean(JavaSparkContext jsc) {
try {
CleanHelper cleaner = new CleanHelper(this, config);
Option<HoodieInstant> earliestInstant = cleaner.getEarliestCommitToRetain();
List<String> partitionsToClean = cleaner.getPartitionPathsToClean(earliestInstant);
if (partitionsToClean.isEmpty()) {
LOG.info("Nothing to clean here. It is already clean");
return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
}
LOG.info(
"Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy());
int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
LOG.info("Using cleanerParallelism: " + cleanerParallelism);
Map<String, List<String>> cleanOps = jsc.parallelize(partitionsToClean, cleanerParallelism)
.map(partitionPathToClean -> Pair.of(partitionPathToClean, cleaner.getDeletePaths(partitionPathToClean)))
.collect().stream().collect(Collectors.toMap(Pair::getKey, Pair::getValue));
return new HoodieCleanerPlan(earliestInstant
.map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
config.getCleanerPolicy().name(), cleanOps, 1);
} catch (IOException e) {
throw new HoodieIOException("Failed to schedule clean operation", e);
}
}
/**
* Performs cleaning of partition paths according to cleaning policy and returns the number of files cleaned. Handles
* skews in partitions to clean by making files to clean as the unit of task distribution.
*
* @throws IllegalArgumentException if unknown cleaning policy is provided
*/
@Override
public List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieInstant cleanInstant, HoodieCleanerPlan cleanerPlan) {
int cleanerParallelism = Math.min(
(int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()),
config.getCleanerParallelism());
LOG.info("Using cleanerParallelism: " + cleanerParallelism);
List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
.parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream()
.flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), y)))
.collect(Collectors.toList()), cleanerParallelism)
.mapPartitionsToPair(deleteFilesFunc(this)).reduceByKey(PartitionCleanStat::merge).collect();
Map<String, PartitionCleanStat> partitionCleanStatsMap =
partitionCleanStats.stream().collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
// Return PartitionCleanStat for each partition passed.
return cleanerPlan.getFilesToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
PartitionCleanStat partitionCleanStat =
(partitionCleanStatsMap.containsKey(partitionPath)) ? partitionCleanStatsMap.get(partitionPath)
: new PartitionCleanStat(partitionPath);
HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
.withEarliestCommitRetained(Option.ofNullable(
actionInstant != null
? new HoodieInstant(State.valueOf(actionInstant.getState()),
actionInstant.getAction(), actionInstant.getTimestamp())
: null))
.withDeletePathPattern(partitionCleanStat.deletePathPatterns)
.withSuccessfulDeletes(partitionCleanStat.successDeleteFiles)
.withFailedDeletes(partitionCleanStat.failedDeleteFiles).build();
}).collect(Collectors.toList());
public HoodieCleanMetadata clean(JavaSparkContext jsc, String cleanInstantTime) {
return new CleanActionExecutor(jsc, config, this, cleanInstantTime).execute();
}
@Override
@@ -446,40 +329,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
}
}
private static class PartitionCleanStat implements Serializable {
private final String partitionPath;
private final List<String> deletePathPatterns = new ArrayList<>();
private final List<String> successDeleteFiles = new ArrayList<>();
private final List<String> failedDeleteFiles = new ArrayList<>();
private PartitionCleanStat(String partitionPath) {
this.partitionPath = partitionPath;
}
private void addDeletedFileResult(String deletePathStr, Boolean deletedFileResult) {
if (deletedFileResult) {
successDeleteFiles.add(deletePathStr);
} else {
failedDeleteFiles.add(deletePathStr);
}
}
private void addDeleteFilePatterns(String deletePathStr) {
deletePathPatterns.add(deletePathStr);
}
private PartitionCleanStat merge(PartitionCleanStat other) {
if (!this.partitionPath.equals(other.partitionPath)) {
throw new RuntimeException(
String.format("partitionPath is not a match: (%s, %s)", partitionPath, other.partitionPath));
}
successDeleteFiles.addAll(other.successDeleteFiles);
deletePathPatterns.addAll(other.deletePathPatterns);
failedDeleteFiles.addAll(other.failedDeleteFiles);
return this;
}
}
/**
* Helper class for a small file's location and its actual size on disk.

View File

@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -84,8 +85,8 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
// UpsertPartitioner for MergeOnRead table type
private MergeOnReadUpsertPartitioner mergeOnReadUpsertPartitioner;
public HoodieMergeOnReadTable(HoodieWriteConfig config, JavaSparkContext jsc) {
super(config, jsc);
HoodieMergeOnReadTable(HoodieWriteConfig config, JavaSparkContext jsc, HoodieTableMetaClient metaClient) {
super(config, jsc, metaClient);
}
@Override

View File

@@ -18,13 +18,14 @@
package org.apache.hudi.table;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.ClientUtils;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.ConsistencyGuard;
@@ -39,22 +40,20 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
@@ -87,12 +86,12 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
protected final SparkTaskContextSupplier sparkTaskContextSupplier = new SparkTaskContextSupplier();
protected HoodieTable(HoodieWriteConfig config, JavaSparkContext jsc) {
protected HoodieTable(HoodieWriteConfig config, JavaSparkContext jsc, HoodieTableMetaClient metaClient) {
this.config = config;
this.hadoopConfiguration = new SerializableConfiguration(jsc.hadoopConfiguration());
this.viewManager = FileSystemViewManager.createViewManager(new SerializableConfiguration(jsc.hadoopConfiguration()),
config.getViewStorageConfig());
this.metaClient = ClientUtils.createMetaClient(jsc, config, true);
this.metaClient = metaClient;
this.index = HoodieIndex.createIndex(config, jsc);
}
@@ -103,13 +102,25 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
return viewManager;
}
public static <T extends HoodieRecordPayload> HoodieTable<T> getHoodieTable(HoodieTableMetaClient metaClient,
HoodieWriteConfig config, JavaSparkContext jsc) {
public static <T extends HoodieRecordPayload> HoodieTable<T> create(HoodieWriteConfig config, JavaSparkContext jsc) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(
jsc.hadoopConfiguration(),
config.getBasePath(),
true,
config.getConsistencyGuardConfig(),
Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))
);
return HoodieTable.create(metaClient, config, jsc);
}
public static <T extends HoodieRecordPayload> HoodieTable<T> create(HoodieTableMetaClient metaClient,
HoodieWriteConfig config,
JavaSparkContext jsc) {
switch (metaClient.getTableType()) {
case COPY_ON_WRITE:
return new HoodieCopyOnWriteTable<>(config, jsc);
return new HoodieCopyOnWriteTable<>(config, jsc, metaClient);
case MERGE_ON_READ:
return new HoodieMergeOnReadTable<>(config, jsc);
return new HoodieMergeOnReadTable<>(config, jsc, metaClient);
default:
throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
}
@@ -280,23 +291,11 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
HoodieCompactionPlan compactionPlan);
/**
* Generates list of files that are eligible for cleaning.
*
* @param jsc Java Spark Context
* @return Cleaner Plan containing list of files to be deleted.
* Executes a new clean action.
*
* @return information on cleaned file slices
*/
public abstract HoodieCleanerPlan scheduleClean(JavaSparkContext jsc);
/**
* Cleans the files listed in the cleaner plan associated with clean instant.
*
* @param jsc Java Spark Context
* @param cleanInstant Clean Instant
* @param cleanerPlan Cleaner Plan
* @return list of Clean Stats
*/
public abstract List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieInstant cleanInstant,
HoodieCleanerPlan cleanerPlan);
public abstract HoodieCleanMetadata clean(JavaSparkContext jsc, String cleanInstantTime);
/**
* Rollback the (inflight/committed) record changes with the given commit time. Four steps: (1) Atomically unpublish
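
The two create(...) overloads above split meta-client construction from table construction. A short sketch of when each applies (payload type is a placeholder; createMetaClient(true) is the protected AbstractHoodieClient helper shown at the top of the diff, so the reuse form applies inside client code):

// One-shot: builds a fresh HoodieTableMetaClient from the config's base path.
HoodieTable<HoodieRecordPayload> table = HoodieTable.create(config, jsc);

// Reuse: callers that already hold a (possibly reloaded) meta client pass it in,
// which avoids the redundant instantiation the old constructor performed.
HoodieTableMetaClient metaClient = createMetaClient(true);
HoodieTable<HoodieRecordPayload> sameTable = HoodieTable.create(metaClient, config, jsc);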

View File

@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaSparkContext;
public abstract class BaseActionExecutor<R> {
protected final JavaSparkContext jsc;
protected final HoodieWriteConfig config;
protected final HoodieTable<?> table;
protected final String instantTime;
public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
this.jsc = jsc;
this.config = config;
this.table = table;
this.instantTime = instantTime;
}
public abstract R execute();
}
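
BaseActionExecutor pins down the four inputs every action needs and leaves a single execute() hook. A hypothetical subclass sketch (ArchiveActionExecutor is invented here for illustration; CleanActionExecutor below is the only real implementation in this commit):

public class ArchiveActionExecutor extends BaseActionExecutor<Boolean> {
  public ArchiveActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config,
      HoodieTable<?> table, String instantTime) {
    super(jsc, config, table, instantTime);
  }

  @Override
  public Boolean execute() {
    // A real executor would plan, persist the plan to the timeline, then act,
    // as CleanActionExecutor does below.
    return true;
  }
}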

View File

@@ -0,0 +1,280 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.clean;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(CleanActionExecutor.class);
/**
* Generates List of files to be cleaned.
*
* @param jsc JavaSparkContext
* @return Cleaner Plan
*/
HoodieCleanerPlan requestClean(JavaSparkContext jsc) {
try {
CleanPlanner<?> planner = new CleanPlanner<>(table, config);
Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
if (partitionsToClean.isEmpty()) {
LOG.info("Nothing to clean here. It is already clean");
return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
}
LOG.info("Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy());
int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
LOG.info("Using cleanerParallelism: " + cleanerParallelism);
Map<String, List<String>> cleanOps = jsc
.parallelize(partitionsToClean, cleanerParallelism)
.map(partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)))
.collect().stream()
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
return new HoodieCleanerPlan(earliestInstant
.map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
config.getCleanerPolicy().name(), cleanOps, 1);
} catch (IOException e) {
throw new HoodieIOException("Failed to schedule clean operation", e);
}
}
private static PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, PartitionCleanStat> deleteFilesFunc(
HoodieTable table) {
return (PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, PartitionCleanStat>) iter -> {
Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
FileSystem fs = table.getMetaClient().getFs();
Path basePath = new Path(table.getMetaClient().getBasePath());
while (iter.hasNext()) {
Tuple2<String, String> partitionDelFileTuple = iter.next();
String partitionPath = partitionDelFileTuple._1();
String delFileName = partitionDelFileTuple._2();
Path deletePath = FSUtils.getPartitionPath(FSUtils.getPartitionPath(basePath, partitionPath), delFileName);
String deletePathStr = deletePath.toString();
Boolean deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
if (!partitionCleanStatMap.containsKey(partitionPath)) {
partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath));
}
PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath);
partitionCleanStat.addDeleteFilePatterns(deletePath.getName());
partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult);
}
return partitionCleanStatMap.entrySet().stream().map(e -> new Tuple2<>(e.getKey(), e.getValue()))
.collect(Collectors.toList()).iterator();
};
}
private static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException {
Path deletePath = new Path(deletePathStr);
LOG.debug("Working on delete path :" + deletePath);
try {
boolean deleteResult = fs.delete(deletePath, false);
if (deleteResult) {
LOG.debug("Cleaned file at path :" + deletePath);
}
return deleteResult;
} catch (FileNotFoundException fio) {
// With cleanPlan being used for retried cleaning operations, it's possible to clean a file twice
return false;
}
}
/**
* Performs cleaning of partition paths according to cleaning policy and returns the number of files cleaned. Handles
* skews in partitions to clean by making files to clean as the unit of task distribution.
*
* @throws IllegalArgumentException if unknown cleaning policy is provided
*/
List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieCleanerPlan cleanerPlan) {
int cleanerParallelism = Math.min(
(int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()),
config.getCleanerParallelism());
LOG.info("Using cleanerParallelism: " + cleanerParallelism);
List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
.parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream()
.flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), y)))
.collect(Collectors.toList()), cleanerParallelism)
.mapPartitionsToPair(deleteFilesFunc(table))
.reduceByKey(PartitionCleanStat::merge).collect();
Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats.stream()
.collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
// Return PartitionCleanStat for each partition passed.
return cleanerPlan.getFilesToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath)
? partitionCleanStatsMap.get(partitionPath)
: new PartitionCleanStat(partitionPath);
HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
.withEarliestCommitRetained(Option.ofNullable(
actionInstant != null
? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
actionInstant.getAction(), actionInstant.getTimestamp())
: null))
.withDeletePathPattern(partitionCleanStat.deletePathPatterns())
.withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
.withFailedDeletes(partitionCleanStat.failedDeleteFiles())
.build();
}).collect(Collectors.toList());
}
public CleanActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
super(jsc, config, table, instantTime);
}
/**
* Creates a Cleaner plan if there are files to be cleaned and stores them in instant file.
*
* @param startCleanTime Cleaner Instant Time
* @return Cleaner Plan if generated
*/
Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
final HoodieCleanerPlan cleanerPlan = requestClean(jsc);
if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
&& !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
final HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
// Save to both aux and timeline folder
try {
table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
LOG.info("Requesting Cleaning with instant time " + cleanInstant);
} catch (IOException e) {
LOG.error("Got exception when saving cleaner requested file", e);
throw new HoodieIOException(e.getMessage(), e);
}
return Option.of(cleanerPlan);
}
return Option.empty();
}
/**
* Executes the Cleaner plan stored in the instant metadata.
*/
void runPendingClean(HoodieTable<?> table, HoodieInstant cleanInstant) {
try {
HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant);
runClean(table, cleanInstant, cleanerPlan);
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
}
private HoodieCleanMetadata runClean(HoodieTable<?> table, HoodieInstant cleanInstant, HoodieCleanerPlan cleanerPlan) {
ValidationUtils.checkArgument(cleanInstant.getState().equals(HoodieInstant.State.REQUESTED)
|| cleanInstant.getState().equals(HoodieInstant.State.INFLIGHT));
try {
final HoodieInstant inflightInstant;
final HoodieTimer timer = new HoodieTimer();
timer.startTimer();
if (cleanInstant.isRequested()) {
inflightInstant = table.getActiveTimeline().transitionCleanRequestedToInflight(cleanInstant,
TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
} else {
inflightInstant = cleanInstant;
}
List<HoodieCleanStat> cleanStats = clean(jsc, cleanerPlan);
if (cleanStats.isEmpty()) {
return HoodieCleanMetadata.newBuilder().build();
}
table.getMetaClient().reloadActiveTimeline();
HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata(
inflightInstant.getTimestamp(),
Option.of(timer.endTimer()),
cleanStats
);
table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant,
TimelineMetadataUtils.serializeCleanMetadata(metadata));
LOG.info("Marked clean started on " + inflightInstant.getTimestamp() + " as complete");
return metadata;
} catch (IOException e) {
throw new HoodieIOException("Failed to clean up after commit", e);
}
}
@Override
public HoodieCleanMetadata execute() {
// If there are inflight(failed) or previously requested clean operation, first perform them
List<HoodieInstant> pendingCleanInstants = table.getCleanTimeline()
.filterInflightsAndRequested().getInstants().collect(Collectors.toList());
if (pendingCleanInstants.size() > 0) {
pendingCleanInstants.forEach(hoodieInstant -> {
LOG.info("Finishing previously unfinished cleaner instant=" + hoodieInstant);
try {
runPendingClean(table, hoodieInstant);
} catch (Exception e) {
LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
}
});
table.getMetaClient().reloadActiveTimeline();
}
// Plan and execute a new clean action
Option<HoodieCleanerPlan> cleanerPlanOpt = requestClean(instantTime);
if (cleanerPlanOpt.isPresent()) {
table.getMetaClient().reloadActiveTimeline();
HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get();
if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null) && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
return runClean(table, HoodieTimeline.getCleanRequestedInstant(instantTime), cleanerPlan);
}
}
return null;
}
}

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hudi.table;
package org.apache.hudi.table.action.clean;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.common.fs.FSUtils;
@@ -39,6 +39,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -56,12 +57,10 @@ import java.util.stream.Collectors;
* 1) It provides sufficient time for existing queries running on older versions to close
* <p>
* 2) It bounds the growth of the files in the file system
* <p>
* TODO: Should all cleaning be done based on {@link HoodieCommitMetadata}
*/
public class CleanHelper<T extends HoodieRecordPayload<T>> implements Serializable {
public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializable {
private static final Logger LOG = LogManager.getLogger(CleanHelper.class);
private static final Logger LOG = LogManager.getLogger(CleanPlanner.class);
private final SyncableFileSystemView fileSystemView;
private final HoodieTimeline commitTimeline;
@@ -69,7 +68,7 @@ public class CleanHelper<T extends HoodieRecordPayload<T>> implements Serializab
private HoodieTable<T> hoodieTable;
private HoodieWriteConfig config;
public CleanHelper(HoodieTable<T> hoodieTable, HoodieWriteConfig config) {
public CleanPlanner(HoodieTable<T> hoodieTable, HoodieWriteConfig config) {
this.hoodieTable = hoodieTable;
this.fileSystemView = hoodieTable.getHoodieView();
this.commitTimeline = hoodieTable.getCompletedCommitTimeline();

View File

@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.clean;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
class PartitionCleanStat implements Serializable {
private final String partitionPath;
private final List<String> deletePathPatterns = new ArrayList<>();
private final List<String> successDeleteFiles = new ArrayList<>();
private final List<String> failedDeleteFiles = new ArrayList<>();
PartitionCleanStat(String partitionPath) {
this.partitionPath = partitionPath;
}
void addDeletedFileResult(String deletePathStr, Boolean deletedFileResult) {
if (deletedFileResult) {
successDeleteFiles.add(deletePathStr);
} else {
failedDeleteFiles.add(deletePathStr);
}
}
void addDeleteFilePatterns(String deletePathStr) {
deletePathPatterns.add(deletePathStr);
}
PartitionCleanStat merge(PartitionCleanStat other) {
if (!this.partitionPath.equals(other.partitionPath)) {
throw new RuntimeException(
String.format("partitionPath is not a match: (%s, %s)", partitionPath, other.partitionPath));
}
successDeleteFiles.addAll(other.successDeleteFiles);
deletePathPatterns.addAll(other.deletePathPatterns);
failedDeleteFiles.addAll(other.failedDeleteFiles);
return this;
}
public List<String> deletePathPatterns() {
return deletePathPatterns;
}
public List<String> successDeleteFiles() {
return successDeleteFiles;
}
public List<String> failedDeleteFiles() {
return failedDeleteFiles;
}
}
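
merge(...) is the combiner reduceByKey uses in CleanActionExecutor to fold per-task stats for one partition. A tiny sketch of its semantics (PartitionCleanStat is package-private, so this only compiles inside org.apache.hudi.table.action.clean; file names are made up):

PartitionCleanStat first = new PartitionCleanStat("2020/04/04");
first.addDeleteFilePatterns("file1.parquet");
first.addDeletedFileResult("file1.parquet", true);   // delete succeeded

PartitionCleanStat second = new PartitionCleanStat("2020/04/04");
second.addDeleteFilePatterns("file2.parquet");
second.addDeletedFileResult("file2.parquet", false); // delete failed

PartitionCleanStat merged = first.merge(second);
// merged.successDeleteFiles() -> [file1.parquet]
// merged.failedDeleteFiles()  -> [file2.parquet]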

View File

@@ -90,7 +90,7 @@ public class HoodieMergeOnReadTableCompactor implements HoodieCompactor {
}
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
// Compacting is very similar to applying updates to existing file
HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc, metaClient);
List<CompactionOperation> operations = compactionPlan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
LOG.info("Compactor compacting " + operations + " files");