
[HUDI-785] Refactor compaction/savepoint execution based on ActionExecutor abstraction (#1548)

- Savepoint and compaction classes moved to table.action.* packages
- HoodieWriteClient#savepoint(...) now returns void (usage sketch below)
- Renamed HoodieCommitArchiveLog -> HoodieTimelineArchiveLog
- Fixed tests to account for the additional validation now performed
- Moved helper code into CompactHelpers and SavepointHelpers
Author: vinoth chandar
Date: 2020-04-25 18:26:44 -07:00 (committed by GitHub)
Parent: 19cc15c098
Commit: 19ca0b5629
58 changed files with 789 additions and 601 deletions
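For orientation, a rough sketch of how the reworked savepoint/restore surface of HoodieWriteClient is driven after this change. The Spark context, write config, payload type and the instant-time strings are placeholders for illustration; only the method names and the void return types come from this diff.

import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaSparkContext;

class SavepointUsageSketch {
  static void demo(JavaSparkContext jsc, String basePath) {
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
    HoodieWriteClient<OverwriteWithLatestAvroPayload> client = new HoodieWriteClient<>(jsc, config);

    // savepoint(...) no longer returns a boolean; problems now surface as exceptions
    // thrown from the SavepointActionExecutor that HoodieTable#savepoint delegates to.
    client.savepoint("ops-user", "before risky backfill");

    // restoreToSavepoint(...) is void as well; SavepointHelpers checks that the
    // savepoint exists before restoring and that it is the latest instant afterwards.
    client.restoreToSavepoint("20200425182644");

    // Savepoints still have to be removed manually once they are no longer needed.
    client.deleteSavepoint("20200425182644");
  }
}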

View File: org.apache.hudi.client.AbstractHoodieWriteClient

@@ -20,7 +20,6 @@ package org.apache.hudi.client;
import com.codahale.metrics.Timer;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.utils.SparkConfigUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieRollingStat;
@@ -57,7 +56,6 @@ import java.util.Map;
public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {
private static final Logger LOG = LogManager.getLogger(AbstractHoodieWriteClient.class);
private static final String UPDATE_STR = "update";
private final transient HoodieMetrics metrics;
private final transient HoodieIndex<T> index;
@@ -96,32 +94,6 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
return commit(instantTime, writeStatuses, extraMetadata, metaClient.getCommitActionType());
}
protected JavaRDD<WriteStatus> updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD, HoodieTable<T> table,
String instantTime) {
// cache writeStatusRDD before updating index, so that all actions before this are not triggered again for future
// RDD actions that are performed after updating the index.
writeStatusRDD = writeStatusRDD.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
Timer.Context indexTimer = metrics.getIndexCtx();
// Update the index back
JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, jsc, table);
metrics.updateIndexMetrics(UPDATE_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
// Trigger the insert and collect statuses
commitOnAutoCommit(instantTime, statuses, table.getMetaClient().getCommitActionType());
return statuses;
}
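The caching comment above is the usual Spark idiom of persisting an RDD that more than one downstream action will consume, so the write lineage is not recomputed when the index update and the subsequent commit each trigger an action. A minimal, Hudi-agnostic illustration (names are made up for the example):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;

class PersistBeforeActionsSketch {
  static void demo(JavaRDD<String> writeResults) {
    // Persist once, up front, so the partitions are materialized on first use.
    JavaRDD<String> cached = writeResults.persist(StorageLevel.MEMORY_AND_DISK());
    long total = cached.count();                                   // first action computes and caches
    long failed = cached.filter(s -> s.startsWith("ERR")).count(); // second action reuses the cache
  }
}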
protected void commitOnAutoCommit(String instantTime, JavaRDD<WriteStatus> resultRDD, String actionType) {
if (config.shouldAutoCommit()) {
LOG.info("Auto commit enabled: Committing " + instantTime);
boolean commitResult = commit(instantTime, resultRDD, Option.empty(), actionType);
if (!commitResult) {
throw new HoodieCommitException("Failed to commit " + instantTime);
}
} else {
LOG.info("Auto commit disabled for " + instantTime);
}
}
private boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
Option<Map<String, String>> extraMetadata, String actionType) {
@@ -131,7 +103,6 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
updateMetadataAndRollingStats(actionType, metadata, stats);
@@ -149,10 +120,8 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
try {
activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
postCommit(metadata, instantTime, extraMetadata);
emitCommitMetrics(instantTime, metadata, actionType);
LOG.info("Committed " + instantTime);
} catch (IOException e) {
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
@@ -182,8 +151,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
* @param instantTime Instant Time
* @param extraMetadata Additional Metadata passed by user
*/
protected abstract void postCommit(HoodieCommitMetadata metadata, String instantTime,
Option<Map<String, String>> extraMetadata);
protected abstract void postCommit(HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata);
/**
* Finalize Write operation.
@@ -260,7 +228,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
protected HoodieTable getTableAndInitCtx(WriteOperationType operationType) {
HoodieTableMetaClient metaClient = createMetaClient(true);
if (operationType == WriteOperationType.DELETE) {
setWriteSchemaFromLastInstant(metaClient);
setWriteSchemaForDeletes(metaClient);
}
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable table = HoodieTable.create(metaClient, config, jsc);
@@ -275,7 +243,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
/**
* Sets write schema from last instant since deletes may not have schema set in the config.
*/
private void setWriteSchemaFromLastInstant(HoodieTableMetaClient metaClient) {
private void setWriteSchemaForDeletes(HoodieTableMetaClient metaClient) {
try {
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
Option<HoodieInstant> lastInstant =

View File: org.apache.hudi.client.CompactionAdminClient

@@ -18,9 +18,10 @@
package org.apache.hudi.client;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
@@ -41,10 +42,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.compact.OperationResult;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.table.action.compact.OperationResult;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
@@ -71,10 +69,6 @@ public class CompactionAdminClient extends AbstractHoodieClient {
super(jsc, HoodieWriteConfig.newBuilder().withPath(basePath).build());
}
public CompactionAdminClient(JavaSparkContext jsc, String basePath, Option<EmbeddedTimelineService> timelineServer) {
super(jsc, HoodieWriteConfig.newBuilder().withPath(basePath).build(), timelineServer);
}
/**
* Validate all compaction operations in a compaction plan. Verifies the file-slices are consistent with corresponding
* compaction operations.

View File: org.apache.hudi.client.HoodieWriteClient

@@ -19,64 +19,48 @@
package org.apache.hudi.client;
import com.codahale.metrics.Timer;
import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.utils.SparkConfigUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieRestoreException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.HoodieCommitArchiveLog;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.table.action.savepoint.SavepointHelpers;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -171,9 +155,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT);
validateSchema(table, true);
table.validateUpsertSchema();
setOperationType(WriteOperationType.UPSERT);
HoodieWriteMetadata result = table.upsert(jsc,instantTime, records);
HoodieWriteMetadata result = table.upsert(jsc, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
}
@@ -191,7 +175,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED);
validateSchema(table, true);
table.validateUpsertSchema();
setOperationType(WriteOperationType.UPSERT_PREPPED);
HoodieWriteMetadata result = table.upsertPrepped(jsc,instantTime, preppedRecords);
return postWrite(result, instantTime, table);
@@ -209,7 +193,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT);
validateSchema(table, false);
table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT);
HoodieWriteMetadata result = table.insert(jsc,instantTime, records);
return postWrite(result, instantTime, table);
@@ -228,7 +212,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED);
validateSchema(table, false);
table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT_PREPPED);
HoodieWriteMetadata result = table.insertPrepped(jsc,instantTime, preppedRecords);
return postWrite(result, instantTime, table);
@@ -267,6 +251,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String instantTime,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT);
table.validateInsertSchema();
setOperationType(WriteOperationType.BULK_INSERT);
HoodieWriteMetadata result = table.bulkInsert(jsc,instantTime, records, bulkInsertPartitioner);
return postWrite(result, instantTime, table);
@@ -291,6 +276,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
public JavaRDD<WriteStatus> bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String instantTime,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED);
table.validateInsertSchema();
setOperationType(WriteOperationType.BULK_INSERT_PREPPED);
HoodieWriteMetadata result = table.bulkInsertPrepped(jsc,instantTime, preppedRecords, bulkInsertPartitioner);
return postWrite(result, instantTime, table);
@@ -344,12 +330,12 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
// Do an inline compaction if enabled
if (config.isInlineCompaction()) {
metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
forceCompact(extraMetadata);
inlineCompact(extraMetadata);
} else {
metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
}
// We cannot have unbounded commit files. Archive commits if we have to archive
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(config, createMetaClient(true));
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, createMetaClient(true));
archiveLog.archiveIfRequired(jsc);
if (config.isAutoClean()) {
// Call clean to cleanup if there is anything to cleanup after the commit,
@@ -364,35 +350,25 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
}
/**
* Savepoint a specific commit. Latest version of data files as of the passed in commitTime will be referenced in the
* savepoint and will never be cleaned. The savepointed commit will never be rolledback or archived.
* <p>
* This gives an option to rollback the state to the savepoint anytime. Savepoint needs to be manually created and
* deleted.
* <p>
* Savepoint should be on a commit that could not have been cleaned.
* Create a savepoint based on the latest commit action on the timeline.
*
* @param user - User creating the savepoint
* @param comment - Comment for the savepoint
* @return true if the savepoint was created successfully
*/
public boolean savepoint(String user, String comment) {
public void savepoint(String user, String comment) {
HoodieTable<T> table = HoodieTable.create(config, jsc);
if (table.getCompletedCommitsTimeline().empty()) {
throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
}
if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
}
String latestCommit = table.getCompletedCommitsTimeline().lastInstant().get().getTimestamp();
LOG.info("Savepointing latest commit " + latestCommit);
return savepoint(latestCommit, user, comment);
savepoint(latestCommit, user, comment);
}
/**
* Savepoint a specific commit. Latest version of data files as of the passed in instantTime will be referenced in the
* savepoint and will never be cleaned. The savepointed commit will never be rolledback or archived.
* Savepoint a specific commit instant time. Latest version of data files as of the passed in instantTime
* will be referenced in the savepoint and will never be cleaned. The savepointed commit will never be rolledback or archived.
* <p>
* This gives an option to rollback the state to the savepoint anytime. Savepoint needs to be manually created and
* deleted.
@@ -402,60 +378,10 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @param instantTime - commit that should be savepointed
* @param user - User creating the savepoint
* @param comment - Comment for the savepoint
* @return true if the savepoint was created successfully
*/
public boolean savepoint(String instantTime, String user, String comment) {
public void savepoint(String instantTime, String user, String comment) {
HoodieTable<T> table = HoodieTable.create(config, jsc);
if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
}
Option<HoodieInstant> cleanInstant = table.getCompletedCleanTimeline().lastInstant();
HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime);
if (!table.getCompletedCommitsTimeline().containsInstant(commitInstant)) {
throw new HoodieSavepointException("Could not savepoint non-existing commit " + commitInstant);
}
try {
// Check the last commit that was not cleaned and check if savepoint time is > that commit
String lastCommitRetained;
if (cleanInstant.isPresent()) {
HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
.deserializeHoodieCleanMetadata(table.getActiveTimeline().getInstantDetails(cleanInstant.get()).get());
lastCommitRetained = cleanMetadata.getEarliestCommitToRetain();
} else {
lastCommitRetained = table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp();
}
// Cannot allow savepoint time on a commit that could have been cleaned
ValidationUtils.checkArgument(
HoodieTimeline.compareTimestamps(instantTime, lastCommitRetained, HoodieTimeline.GREATER_OR_EQUAL),
"Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);
Map<String, List<String>> latestFilesMap = jsc
.parallelize(FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning()))
.mapToPair((PairFunction<String, String, List<String>>) partitionPath -> {
// Scan all partitions files with this commit time
LOG.info("Collecting latest files in partition path " + partitionPath);
BaseFileOnlyView view = table.getBaseFileOnlyView();
List<String> latestFiles = view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
.map(HoodieBaseFile::getFileName).collect(Collectors.toList());
return new Tuple2<>(partitionPath, latestFiles);
}).collectAsMap();
HoodieSavepointMetadata metadata = TimelineMetadataUtils.convertSavepointMetadata(user, comment, latestFilesMap);
// Nothing to save in the savepoint
table.getActiveTimeline().createNewInstant(
new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, instantTime));
table.getActiveTimeline()
.saveAsComplete(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, instantTime),
TimelineMetadataUtils.serializeSavepointMetadata(metadata));
LOG.info("Savepoint " + instantTime + " created");
return true;
} catch (IOException e) {
throw new HoodieSavepointException("Failed to savepoint " + instantTime, e);
}
table.savepoint(jsc, instantTime, user, comment);
}
/**
@@ -467,88 +393,24 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
public void deleteSavepoint(String savepointTime) {
HoodieTable<T> table = HoodieTable.create(config, jsc);
if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
}
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint);
if (!isSavepointPresent) {
LOG.warn("No savepoint present " + savepointTime);
return;
}
activeTimeline.revertToInflight(savePoint);
activeTimeline.deleteInflight(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, savepointTime));
LOG.info("Savepoint " + savepointTime + " deleted");
SavepointHelpers.deleteSavepoint(table, savepointTime);
}
/**
* Delete a compaction request that is pending.
* Restore the data to the savepoint.
*
* NOTE - This is an Admin operation. With async compaction, this is expected to be called with async compaction and
* write shutdown. Otherwise, async compactor could fail with errors
*
* @param compactionTime - delete the compaction time
*/
private void deleteRequestedCompaction(String compactionTime) {
HoodieTable<T> table = HoodieTable.create(config, jsc);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieInstant compactionRequestedInstant =
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionTime);
boolean isCompactionInstantInRequestedState =
table.getActiveTimeline().filterPendingCompactionTimeline().containsInstant(compactionRequestedInstant);
HoodieTimeline commitTimeline = table.getCompletedCommitTimeline();
if (commitTimeline.empty() && !commitTimeline.findInstantsAfter(compactionTime, Integer.MAX_VALUE).empty()) {
throw new HoodieRollbackException(
"Found commits after time :" + compactionTime + ", please rollback greater commits first");
}
if (isCompactionInstantInRequestedState) {
activeTimeline.deleteCompactionRequested(compactionRequestedInstant);
} else {
throw new IllegalArgumentException("Compaction is not in requested state " + compactionTime);
}
LOG.info("Compaction " + compactionTime + " deleted");
}
/**
* Restore the state to the savepoint. WARNING: This rollsback recent commits and deleted data files. Queries
* accessing the files will mostly fail. This should be done during a downtime.
* WARNING: This rolls back recent commits and deleted data files and also pending compactions after savepoint time.
* Queries accessing the files will mostly fail. This is expected to be a manual operation and no concurrent write or
* compaction is expected to be running
*
* @param savepointTime - savepoint time to rollback to
* @return true if the savepoint was rollecback to successfully
* @return true if the savepoint was restored to successfully
*/
public boolean restoreToSavepoint(String savepointTime) {
public void restoreToSavepoint(String savepointTime) {
HoodieTable<T> table = HoodieTable.create(config, jsc);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
// Rollback to savepoint is expected to be a manual operation and no concurrent write or compaction is expected
// to be running. Rollback to savepoint also removes any pending compaction actions that are generated after
// savepoint time. Allowing pending compaction to be retained is not safe as those workload could be referencing
// file-slices that will be rolled-back as part of this operation
HoodieTimeline instantTimeline = table.getMetaClient().getCommitsAndCompactionTimeline();
HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint);
if (!isSavepointPresent) {
throw new HoodieRollbackException("No savepoint for instantTime " + savepointTime);
}
List<String> commitsToRollback = instantTimeline.findInstantsAfter(savepointTime, Integer.MAX_VALUE).getInstants()
.map(HoodieInstant::getTimestamp).collect(Collectors.toList());
LOG.info("Rolling back commits " + commitsToRollback);
SavepointHelpers.validateSavepointPresence(table, savepointTime);
restoreToInstant(savepointTime);
// Make sure the rollback was successful
Option<HoodieInstant> lastInstant =
activeTimeline.reload().getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants().lastInstant();
ValidationUtils.checkArgument(lastInstant.isPresent());
ValidationUtils.checkArgument(lastInstant.get().getTimestamp().equals(savepointTime),
savepointTime + "is not the last commit after rolling back " + commitsToRollback + ", last commit was "
+ lastInstant.get().getTimestamp());
return true;
SavepointHelpers.validateSavepointRestore(table, savepointTime);
}
/**
@@ -574,7 +436,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
}
return true;
} else {
LOG.info("Cannot find instant " + commitInstantTime + " in the timeline, for rollback");
LOG.warn("Cannot find instant " + commitInstantTime + " in the timeline, for rollback");
return false;
}
} catch (Exception e) {
@@ -626,9 +488,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException {
LOG.info("Cleaner started");
final Timer.Context context = metrics.getCleanCtx();
HoodieCleanMetadata metadata = HoodieTable.create(config, jsc).clean(jsc, cleanInstantTime);
if (context != null) {
long durationMs = metrics.getDurationInMs(context.stop());
metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
@@ -636,7 +496,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
+ " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
+ " cleanerElaspsedMs" + durationMs);
}
return metadata;
}
@@ -692,11 +551,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*
* @param extraMetadata Extra Metadata to be stored
*/
public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws IOException {
public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
String instantTime = HoodieActiveTimeline.createNewInstantTime();
LOG.info("Generate a new instant time " + instantTime);
boolean notEmpty = scheduleCompactionAtInstant(instantTime, extraMetadata);
return notEmpty ? Option.of(instantTime) : Option.empty();
return scheduleCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
}
/**
@@ -705,35 +562,11 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @param instantTime Compaction Instant Time
* @param extraMetadata Extra Metadata to be stored
*/
public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata)
throws IOException {
HoodieTableMetaClient metaClient = createMetaClient(true);
// if there are inflight writes, their instantTime must not be less than that of compaction instant time
metaClient.getCommitsTimeline().filterPendingExcludingCompaction().firstInstant().ifPresent(earliestInflight -> {
ValidationUtils.checkArgument(
HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), instantTime, HoodieTimeline.GREATER),
"Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
+ ", Compaction scheduled at " + instantTime);
});
// Committed and pending compaction instants should have strictly lower timestamps
List<HoodieInstant> conflictingInstants = metaClient
.getActiveTimeline().getCommitsAndCompactionTimeline().getInstants().filter(instant -> HoodieTimeline
.compareTimestamps(instant.getTimestamp(), instantTime, HoodieTimeline.GREATER_OR_EQUAL))
.collect(Collectors.toList());
ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
"Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
+ conflictingInstants);
HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
HoodieCompactionPlan workload = table.scheduleCompaction(jsc, instantTime);
if (workload != null && (workload.getOperations() != null) && (!workload.getOperations().isEmpty())) {
extraMetadata.ifPresent(workload::setExtraMetadata);
HoodieInstant compactionInstant =
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime);
metaClient.getActiveTimeline().saveToCompactionRequested(compactionInstant,
TimelineMetadataUtils.serializeCompactionPlan(workload));
return true;
}
return false;
public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
LOG.info("Scheduling compaction at instant time :" + instantTime);
Option<HoodieCompactionPlan> plan = HoodieTable.create(config, jsc)
.scheduleCompaction(jsc, instantTime, extraMetadata);
return plan.isPresent();
}
/**
@@ -742,7 +575,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @param compactionInstantTime Compaction Instant Time
* @return RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> compact(String compactionInstantTime) throws IOException {
public JavaRDD<WriteStatus> compact(String compactionInstantTime) {
return compact(compactionInstantTime, config.shouldAutoCommit());
}
@@ -754,24 +587,47 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @param extraMetadata Extra Metadata to be stored
*/
public void commitCompaction(String compactionInstantTime, JavaRDD<WriteStatus> writeStatuses,
Option<Map<String, String>> extraMetadata) throws IOException {
HoodieTableMetaClient metaClient = createMetaClient(true);
HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(
timeline.readCompactionPlanAsBytes(HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get());
// Merge extra meta-data passed by user with the one already in inflight compaction
Option<Map<String, String>> mergedMetaData = extraMetadata.map(m -> {
Map<String, String> merged = new HashMap<>();
Map<String, String> extraMetaDataFromInstantFile = compactionPlan.getExtraMetadata();
if (extraMetaDataFromInstantFile != null) {
merged.putAll(extraMetaDataFromInstantFile);
Option<Map<String, String>> extraMetadata) throws IOException {
HoodieTable<T> table = HoodieTable.create(config, jsc);
HoodieCommitMetadata metadata = CompactHelpers.createCompactionMetadata(
table, compactionInstantTime, writeStatuses, config.getSchema());
extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
completeCompaction(metadata, writeStatuses, table, compactionInstantTime);
}
/**
* Commit Compaction and track metrics.
*/
protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD<WriteStatus> writeStatuses, HoodieTable<T> table,
String compactionCommitTime) {
List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
finalizeWrite(table, compactionCommitTime, writeStats);
LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
CompactHelpers.completeInflightCompaction(table, compactionCommitTime, metadata);
if (compactionTimer != null) {
long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
try {
metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(),
durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
} catch (ParseException e) {
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
+ config.getBasePath() + " at time " + compactionCommitTime, e);
}
// Overwrite/Merge with the user-passed meta-data
merged.putAll(m);
return Option.of(merged);
}).orElseGet(() -> Option.ofNullable(compactionPlan.getExtraMetadata()));
commitCompaction(writeStatuses, table, compactionInstantTime, true, mergedMetaData);
}
LOG.info("Compacted successfully on commit " + compactionCommitTime);
}
/**
* Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file
*
* @param inflightInstant Inflight Compaction Instant
* @param table Hoodie Table
*/
public void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table) {
table.rollback(jsc, HoodieActiveTimeline.createNewInstantTime(), inflightInstant, false);
table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
}
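Putting the pieces above together, asynchronous compaction is still driven in three steps from the client. A hedged sketch, assuming auto-commit is disabled in the write config so the caller completes the compaction itself; extra metadata and error handling are kept minimal:

import java.io.IOException;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.util.Option;
import org.apache.spark.api.java.JavaRDD;

class CompactionDriverSketch {
  static void run(HoodieWriteClient<?> client) throws IOException {
    // 1. Plan: returns an empty Option when ScheduleCompactionActionExecutor produced no plan.
    Option<String> instant = client.scheduleCompaction(Option.empty());
    if (instant.isPresent()) {
      // 2. Execute the plan for that instant.
      JavaRDD<WriteStatus> statuses = client.compact(instant.get());
      // 3. Commit, which routes through CompactHelpers/completeCompaction shown above.
      client.commitCompaction(instant.get(), statuses, Option.empty());
    }
  }
}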
/**
@@ -793,192 +649,32 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @param compactionInstantTime Compaction Instant Time
* @return RDD of Write Status
*/
private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean autoCommit) throws IOException {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTableMetaClient metaClient = createMetaClient(true);
HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
HoodieTimeline pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean shouldComplete) {
HoodieTable<T> table = HoodieTable.create(config, jsc);
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
rollbackInflightCompaction(inflightInstant, table);
metaClient.reloadActiveTimeline();
pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
table.getMetaClient().reloadActiveTimeline();
}
HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
if (pendingCompactionTimeline.containsInstant(instant)) {
return runCompaction(instant, metaClient.getActiveTimeline(), autoCommit);
} else {
throw new IllegalStateException(
"No Compaction request available at " + compactionInstantTime + " to run compaction");
}
}
/**
* Perform compaction operations as specified in the compaction commit file.
*
* @param compactionInstant Compaction Instant time
* @param activeTimeline Active Timeline
* @param autoCommit Commit after compaction
* @return RDD of Write Status
*/
private JavaRDD<WriteStatus> runCompaction(HoodieInstant compactionInstant, HoodieActiveTimeline activeTimeline,
boolean autoCommit) throws IOException {
HoodieTableMetaClient metaClient = createMetaClient(true);
HoodieCompactionPlan compactionPlan =
CompactionUtils.getCompactionPlan(metaClient, compactionInstant.getTimestamp());
// Mark instant as compaction inflight
activeTimeline.transitionCompactionRequestedToInflight(compactionInstant);
compactionTimer = metrics.getCompactionCtx();
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
JavaRDD<WriteStatus> statuses = table.compact(jsc, compactionInstant.getTimestamp(), compactionPlan);
// Force compaction action
statuses.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
// pass extra-metada so that it gets stored in commit file automatically
commitCompaction(statuses, table, compactionInstant.getTimestamp(), autoCommit,
Option.ofNullable(compactionPlan.getExtraMetadata()));
HoodieWriteMetadata compactionMetadata = table.compact(jsc, compactionInstantTime);
JavaRDD<WriteStatus> statuses = compactionMetadata.getWriteStatuses();
if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
completeCompaction(compactionMetadata.getCommitMetadata().get(), statuses, table, compactionInstantTime);
}
return statuses;
}
/**
* Commit Compaction and track metrics.
*
* @param compactedStatuses Compaction Write status
* @param table Hoodie Table
* @param compactionCommitTime Compaction Commit Time
* @param autoCommit Auto Commit
* @param extraMetadata Extra Metadata to store
*/
protected void commitCompaction(JavaRDD<WriteStatus> compactedStatuses, HoodieTable<T> table,
String compactionCommitTime, boolean autoCommit, Option<Map<String, String>> extraMetadata) {
if (autoCommit) {
HoodieCommitMetadata metadata = doCompactionCommit(table, compactedStatuses, compactionCommitTime, extraMetadata);
if (compactionTimer != null) {
long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
try {
metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(),
durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
} catch (ParseException e) {
throw new HoodieCommitException("Commit time is not of valid format.Failed to commit compaction "
+ config.getBasePath() + " at time " + compactionCommitTime, e);
}
}
LOG.info("Compacted successfully on commit " + compactionCommitTime);
} else {
LOG.info("Compaction did not run for commit " + compactionCommitTime);
}
}
/**
* Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file
*
* @param inflightInstant Inflight Compaction Instant
* @param table Hoodie Table
*/
public void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table) {
table.rollback(jsc, HoodieActiveTimeline.createNewInstantTime(), inflightInstant, false);
table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
}
private HoodieCommitMetadata doCompactionCommit(HoodieTable<T> table, JavaRDD<WriteStatus> writeStatuses,
String compactionCommitTime, Option<Map<String, String>> extraMetadata) {
HoodieTableMetaClient metaClient = table.getMetaClient();
List<HoodieWriteStat> updateStatusMap = writeStatuses.map(WriteStatus::getStat).collect();
HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
for (HoodieWriteStat stat : updateStatusMap) {
metadata.addWriteStat(stat.getPartitionPath(), stat);
}
metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
// Finalize write
finalizeWrite(table, compactionCommitTime, updateStatusMap);
// Copy extraMetadata
extraMetadata.ifPresent(m -> {
m.forEach(metadata::addMetadata);
});
LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
try {
activeTimeline.transitionCompactionInflightToComplete(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
} catch (IOException e) {
throw new HoodieCompactionException(
"Failed to commit " + metaClient.getBasePath() + " at time " + compactionCommitTime, e);
}
return metadata;
}
/**
* Performs a compaction operation on a table, serially before or after an insert/upsert action.
*/
private Option<String> forceCompact(Option<Map<String, String>> extraMetadata) throws IOException {
private Option<String> inlineCompact(Option<Map<String, String>> extraMetadata) {
Option<String> compactionInstantTimeOpt = scheduleCompaction(extraMetadata);
compactionInstantTimeOpt.ifPresent(compactionInstantTime -> {
try {
// inline compaction should auto commit as the user is never given control
compact(compactionInstantTime, true);
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
// inline compaction should auto commit as the user is never given control
compact(compactionInstantTime, true);
});
return compactionInstantTimeOpt;
}
/**
* Ensure that the current writerSchema is compatible with the latest schema of this dataset.
*
* When inserting/updating data, we read records using the last used schema and convert them to the
* GenericRecords with writerSchema. Hence, we need to ensure that this conversion can take place without errors.
*
* @param hoodieTable The Hoodie Table
* @param isUpsert If this is a check during upserts
* @throws HoodieUpsertException If schema check fails during upserts
* @throws HoodieInsertException If schema check fails during inserts
*/
private void validateSchema(HoodieTable<T> hoodieTable, final boolean isUpsert)
throws HoodieUpsertException, HoodieInsertException {
if (!getConfig().getAvroSchemaValidate()) {
// Check not required
return;
}
boolean isValid = false;
String errorMsg = "WriterSchema is not compatible with the schema present in the Table";
Throwable internalError = null;
Schema tableSchema = null;
Schema writerSchema = null;
try {
TableSchemaResolver schemaUtil = new TableSchemaResolver(hoodieTable.getMetaClient());
writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableSchemaFromCommitMetadata());
isValid = schemaUtil.isSchemaCompatible(tableSchema, writerSchema);
} catch (Exception e) {
// Two error cases are possible:
// 1. There was no schema as no data has been inserted yet (first time only)
// 2. Failure in reading the schema
isValid = hoodieTable.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 0;
errorMsg = "Failed to read latest schema on path " + basePath;
internalError = e;
}
if (!isValid) {
LOG.error(errorMsg);
LOG.warn("WriterSchema: " + writerSchema);
LOG.warn("Table latest schema: " + tableSchema);
if (isUpsert) {
throw new HoodieUpsertException(errorMsg, internalError);
} else {
throw new HoodieInsertException(errorMsg, internalError);
}
}
}
}

View File

@@ -22,8 +22,8 @@ import org.apache.hudi.common.config.DefaultHoodieConfig;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.table.compact.strategy.CompactionStrategy;
import org.apache.hudi.table.compact.strategy.LogFileSizeBasedCompactionStrategy;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
import javax.annotation.concurrent.Immutable;

View File

@@ -28,7 +28,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.table.compact.strategy.CompactionStrategy;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

View File: org.apache.hudi.table.HoodieCopyOnWriteTable

@@ -24,6 +24,7 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.ParquetReaderIterator;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -46,7 +47,7 @@ import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.table.action.clean.CleanActionExecutor;
import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.BulkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.BulkInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.DeleteCommitActionExecutor;
@@ -56,6 +57,7 @@ import org.apache.hudi.table.action.commit.UpsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.UpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroParquetReader;
@@ -130,14 +132,13 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
}
@Override
public HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String instantTime) {
throw new HoodieNotSupportedException("Compaction is not supported from a CopyOnWrite table");
public Option<HoodieCompactionPlan> scheduleCompaction(JavaSparkContext jsc, String instantTime, Option<Map<String, String>> extraMetadata) {
throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
}
@Override
public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String compactionInstantTime,
HoodieCompactionPlan compactionPlan) {
throw new HoodieNotSupportedException("Compaction is not supported from a CopyOnWrite table");
public HoodieWriteMetadata compact(JavaSparkContext jsc, String compactionInstantTime) {
throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
}
public Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath, String fileId,
@@ -202,6 +203,11 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
return new CopyOnWriteRollbackActionExecutor(jsc, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
}
@Override
public HoodieSavepointMetadata savepoint(JavaSparkContext jsc, String instantToSavepoint, String user, String comment) {
return new SavepointActionExecutor(jsc, config, this, instantToSavepoint, user, comment).execute();
}
public HoodieRestoreMetadata restore(JavaSparkContext jsc, String restoreInstantTime, String instantToRestore) {
return new CopyOnWriteRestoreActionExecutor(jsc, config, this, restoreInstantTime, instantToRestore).execute();
}
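These one-liners show the shape of the abstraction named in the commit title: each table action is packaged as an executor that takes (jsc, config, table, instant, ...) in its constructor and does all of its work in execute(). The class below is a hypothetical sketch of that pattern; its name, fields and any base class it would extend are illustrative only and not taken from this diff.

import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaSparkContext;

// Illustrative only: mirrors the calling convention of SavepointActionExecutor /
// CopyOnWriteRestoreActionExecutor above, not an actual class from this commit.
class ExampleActionExecutor<R> {
  private final JavaSparkContext jsc;
  private final HoodieWriteConfig config;
  private final HoodieTable<?> table;
  private final String instantTime;

  ExampleActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config,
                        HoodieTable<?> table, String instantTime) {
    this.jsc = jsc;
    this.config = config;
    this.table = table;
    this.instantTime = instantTime;
  }

  R execute() {
    // Transition the instant on the timeline, run the action against 'table',
    // persist the resulting metadata, and return it (e.g. HoodieSavepointMetadata).
    throw new UnsupportedOperationException("sketch only");
  }
}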

View File: org.apache.hudi.table.HoodieMergeOnReadTable

@@ -21,19 +21,17 @@ package org.apache.hudi.table;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.RunCompactionActionExecutor;
import org.apache.hudi.table.action.deltacommit.BulkInsertDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.BulkInsertPreppedDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.DeleteDeltaCommitActionExecutor;
@@ -41,17 +39,16 @@ import org.apache.hudi.table.action.deltacommit.InsertDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.InsertPreppedDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.UpsertDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.UpsertPreppedDeltaCommitActionExecutor;
import org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor;
import org.apache.hudi.table.action.restore.MergeOnReadRestoreActionExecutor;
import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
import org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.Map;
/**
* Implementation of a more real-time Hoodie Table the provides tradeoffs on read and write cost/amplification.
@@ -119,46 +116,16 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
}
@Override
public HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String instantTime) {
LOG.info("Checking if compaction needs to be run on " + config.getBasePath());
Option<HoodieInstant> lastCompaction =
getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
String deltaCommitsSinceTs = "0";
if (lastCompaction.isPresent()) {
deltaCommitsSinceTs = lastCompaction.get().getTimestamp();
}
int deltaCommitsSinceLastCompaction = getActiveTimeline().getDeltaCommitTimeline()
.findInstantsAfter(deltaCommitsSinceTs, Integer.MAX_VALUE).countInstants();
if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) {
LOG.info("Not running compaction as only " + deltaCommitsSinceLastCompaction
+ " delta commits was found since last compaction " + deltaCommitsSinceTs + ". Waiting for "
+ config.getInlineCompactDeltaCommitMax());
return new HoodieCompactionPlan();
}
LOG.info("Compacting merge on read table " + config.getBasePath());
HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor();
try {
return compactor.generateCompactionPlan(jsc, this, config, instantTime,
((SyncableFileSystemView) getSliceView()).getPendingCompactionOperations()
.map(instantTimeCompactionopPair -> instantTimeCompactionopPair.getValue().getFileGroupId())
.collect(Collectors.toSet()));
} catch (IOException e) {
throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
}
public Option<HoodieCompactionPlan> scheduleCompaction(JavaSparkContext jsc, String instantTime, Option<Map<String, String>> extraMetadata) {
ScheduleCompactionActionExecutor scheduleCompactionExecutor = new ScheduleCompactionActionExecutor(
jsc, config, this, instantTime, extraMetadata);
return scheduleCompactionExecutor.execute();
}
@Override
public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String compactionInstantTime,
HoodieCompactionPlan compactionPlan) {
HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor();
try {
return compactor.compact(jsc, compactionPlan, this, config, compactionInstantTime);
} catch (IOException e) {
throw new HoodieCompactionException("Could not compact " + config.getBasePath(), e);
}
public HoodieWriteMetadata compact(JavaSparkContext jsc, String compactionInstantTime) {
RunCompactionActionExecutor compactionExecutor = new RunCompactionActionExecutor(jsc, config, this, compactionInstantTime);
return compactionExecutor.execute();
}
@Override

View File: org.apache.hudi.table.HoodieTable

@@ -18,16 +18,17 @@
package org.apache.hudi.table;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.ConsistencyGuard;
import org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility;
@@ -38,10 +39,10 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -54,9 +55,10 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -295,24 +297,6 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
return getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
}
/**
* Get the list of data file names savepointed.
*/
public Stream<String> getSavepointedDataFiles(String savepointTime) {
if (!getSavepoints().contains(savepointTime)) {
throw new HoodieSavepointException(
"Could not get data files for savepoint " + savepointTime + ". No such savepoint.");
}
HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
HoodieSavepointMetadata metadata;
try {
metadata = TimelineMetadataUtils.deserializeHoodieSavepointMetadata(getActiveTimeline().getInstantDetails(instant).get());
} catch (IOException e) {
throw new HoodieSavepointException("Could not get savepointed data files for savepoint " + savepointTime, e);
}
return metadata.getPartitionMetadata().values().stream().flatMap(s -> s.getSavepointDataFile().stream());
}
public HoodieActiveTimeline getActiveTimeline() {
return metaClient.getActiveTimeline();
}
@@ -329,19 +313,21 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
*
* @param jsc Spark Context
* @param instantTime Instant Time for scheduling compaction
* @param extraMetadata additional metadata to write into plan
* @return
*/
public abstract HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String instantTime);
public abstract Option<HoodieCompactionPlan> scheduleCompaction(JavaSparkContext jsc,
String instantTime,
Option<Map<String, String>> extraMetadata);
/**
* Run Compaction on the table. Compaction arranges the data so that it is optimized for data access.
*
* @param jsc Spark Context
* @param compactionInstantTime Instant Time
* @param compactionPlan Compaction Plan
*/
public abstract JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String compactionInstantTime,
HoodieCompactionPlan compactionPlan);
public abstract HoodieWriteMetadata compact(JavaSparkContext jsc,
String compactionInstantTime);
/**
* Executes a new clean action.
@@ -365,6 +351,15 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
HoodieInstant commitInstant,
boolean deleteInstants);
/**
* Create a savepoint at the specified instant, so that the table can be restored
* to this point-in-timeline later if needed.
*/
public abstract HoodieSavepointMetadata savepoint(JavaSparkContext jsc,
String instantToSavepoint,
String user,
String comment);
/**
* Restore the table to the given instant. Note that this is a admin table recovery operation
* that would cause any running queries that are accessing file slices written after the instant to fail.
@@ -519,4 +514,52 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
public SparkTaskContextSupplier getSparkTaskContextSupplier() {
return sparkTaskContextSupplier;
}
/**
* Ensure that the current writerSchema is compatible with the latest schema of this dataset.
*
* When inserting/updating data, we read records using the last used schema and convert them to the
* GenericRecords with writerSchema. Hence, we need to ensure that this conversion can take place without errors.
*
*/
private void validateSchema() throws HoodieUpsertException, HoodieInsertException {
if (!config.getAvroSchemaValidate() || getActiveTimeline().getCommitsTimeline().filterCompletedInstants().empty()) {
// Check not required
return;
}
Schema tableSchema;
Schema writerSchema;
boolean isValid;
try {
TableSchemaResolver schemaUtil = new TableSchemaResolver(getMetaClient());
writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableSchemaFromCommitMetadata());
isValid = TableSchemaResolver.isSchemaCompatible(tableSchema, writerSchema);
} catch (Exception e) {
throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e);
}
if (!isValid) {
throw new HoodieException("Failed schema compatibility check for writerSchema :" + writerSchema
+ ", table schema :" + tableSchema + ", base path :" + metaClient.getBasePath());
}
}
public void validateUpsertSchema() throws HoodieUpsertException {
try {
validateSchema();
} catch (HoodieException e) {
throw new HoodieUpsertException("Failed upsert schema compatibility check.", e);
}
}
public void validateInsertSchema() throws HoodieInsertException {
try {
validateSchema();
} catch (HoodieException e) {
      throw new HoodieInsertException("Failed insert schema compatibility check.", e);
}
}
}
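For context, a small sketch of how these validation hooks are meant to be called before a write; the surrounding write steps are placeholders, not part of this change:

// Upsert path: fails fast with HoodieUpsertException if the writer schema is incompatible.
table.validateUpsertSchema();
// ... perform the upsert with the validated writer schema ...
// Insert path: the same underlying check, surfaced as HoodieInsertException.
table.validateInsertSchema();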

View File

@@ -68,18 +68,18 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Archiver to bound the growth of <action>.commit files.
* Archiver to bound the growth of files under .hoodie meta path.
*/
public class HoodieCommitArchiveLog {
public class HoodieTimelineArchiveLog {
private static final Logger LOG = LogManager.getLogger(HoodieCommitArchiveLog.class);
private static final Logger LOG = LogManager.getLogger(HoodieTimelineArchiveLog.class);
private final Path archiveFilePath;
private final HoodieTableMetaClient metaClient;
private final HoodieWriteConfig config;
private Writer writer;
public HoodieCommitArchiveLog(HoodieWriteConfig config, HoodieTableMetaClient metaClient) {
public HoodieTimelineArchiveLog(HoodieWriteConfig config, HoodieTableMetaClient metaClient) {
this.config = config;
this.metaClient = metaClient;
this.archiveFilePath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
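To illustrate the renamed archiver's entry point, a brief sketch mirroring the updated tests further below; cfg and metaClient stand for an existing HoodieWriteConfig and HoodieTableMetaClient:

HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
// Archives eligible completed instants out of the .hoodie meta path when retention thresholds are exceeded.
boolean archived = archiveLog.archiveIfRequired(jsc);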

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
package org.apache.hudi.table.action;
import java.util.List;
import org.apache.hudi.client.WriteStatus;

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.table.action.clean;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
@@ -39,6 +40,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -50,6 +52,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Cleaner is responsible for garbage collecting older files in a given partition path. Such that
@@ -81,6 +84,25 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}
/**
* Get the list of data file names savepointed.
*/
public Stream<String> getSavepointedDataFiles(String savepointTime) {
if (!hoodieTable.getSavepoints().contains(savepointTime)) {
throw new HoodieSavepointException(
"Could not get data files for savepoint " + savepointTime + ". No such savepoint.");
}
HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
HoodieSavepointMetadata metadata;
try {
metadata = TimelineMetadataUtils.deserializeHoodieSavepointMetadata(
hoodieTable.getActiveTimeline().getInstantDetails(instant).get());
} catch (IOException e) {
throw new HoodieSavepointException("Could not get savepointed data files for savepoint " + savepointTime, e);
}
return metadata.getPartitionMetadata().values().stream().flatMap(s -> s.getSavepointDataFile().stream());
}
/**
* Returns the list of partitions where clean operations need to be performed.
*
@@ -131,7 +153,8 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
List<String> deletePaths = new ArrayList<>();
// Collect all the datafiles savepointed by all the savepoints
List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
.flatMap(s -> hoodieTable.getSavepointedDataFiles(s)).collect(Collectors.toList());
.flatMap(this::getSavepointedDataFiles)
.collect(Collectors.toList());
for (HoodieFileGroup fileGroup : fileGroups) {
int keepVersions = config.getCleanerFileVersionsRetained();
@@ -190,7 +213,8 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
// Collect all the datafiles savepointed by all the savepoints
List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
.flatMap(s -> hoodieTable.getSavepointedDataFiles(s)).collect(Collectors.toList());
.flatMap(this::getSavepointedDataFiles)
.collect(Collectors.toList());
// determine if we have enough commits, to start cleaning.
if (commitTimeline.countInstants() > commitsRetained) {

View File

@@ -39,6 +39,7 @@ import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
@@ -165,8 +166,6 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>>
(HoodieTable<T>)table);
result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
result.setWriteStatuses(statuses);
// Trigger the insert and collect statuses
commitOnAutoCommit(result);
}
@@ -207,7 +206,6 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>>
try {
activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
LOG.info("Committed " + instantTime);
} catch (IOException e) {
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

View File

@@ -30,6 +30,7 @@ import org.apache.hudi.execution.BulkInsertMapFunction;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import java.util.List;

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

View File

@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -58,8 +59,8 @@ public class DeleteHelper<T extends HoodieRecordPayload<T>> {
}
public static <T extends HoodieRecordPayload<T>> HoodieWriteMetadata execute(String instantTime,
JavaRDD<HoodieKey> keys, JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<T> table,
CommitActionExecutor<T> deleteExecutor) {
JavaRDD<HoodieKey> keys, JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<T> table,
CommitActionExecutor<T> deleteExecutor) {
try {
HoodieWriteMetadata result = null;
// De-dupe/merge if needed

View File

@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

View File

@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

View File

@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

View File

@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

View File

@@ -25,6 +25,7 @@ import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -35,9 +36,9 @@ import scala.Tuple2;
public class WriteHelper<T extends HoodieRecordPayload<T>> {
public static <T extends HoodieRecordPayload<T>> HoodieWriteMetadata write(String instantTime,
JavaRDD<HoodieRecord<T>> inputRecordsRDD, JavaSparkContext jsc,
HoodieTable<T> table, boolean shouldCombine,
int shuffleParallelism, CommitActionExecutor<T> executor, boolean performTagging) {
JavaRDD<HoodieRecord<T>> inputRecordsRDD, JavaSparkContext jsc,
HoodieTable<T> table, boolean shouldCombine,
int shuffleParallelism, CommitActionExecutor<T> executor, boolean performTagging) {
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords =

View File

@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaRDD;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
public class CompactHelpers {
public static HoodieCommitMetadata createCompactionMetadata(HoodieTable<?> table,
String compactionInstantTime,
JavaRDD<WriteStatus> writeStatuses,
String schema) throws IOException {
byte[] planBytes = table.getActiveTimeline().readCompactionPlanAsBytes(
HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get();
HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(planBytes);
List<HoodieWriteStat> updateStatusMap = writeStatuses.map(WriteStatus::getStat).collect();
org.apache.hudi.common.model.HoodieCommitMetadata metadata = new org.apache.hudi.common.model.HoodieCommitMetadata(true);
for (HoodieWriteStat stat : updateStatusMap) {
metadata.addWriteStat(stat.getPartitionPath(), stat);
}
metadata.addMetadata(org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY, schema);
if (compactionPlan.getExtraMetadata() != null) {
compactionPlan.getExtraMetadata().forEach(metadata::addMetadata);
}
return metadata;
}
public static void completeInflightCompaction(HoodieTable<?> table, String compactionCommitTime, HoodieCommitMetadata commitMetadata) {
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
try {
activeTimeline.transitionCompactionInflightToComplete(
new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
} catch (IOException e) {
throw new HoodieCompactionException(
"Failed to commit " + table.getMetaClient().getBasePath() + " at time " + compactionCommitTime, e);
}
}
}

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hudi.table.compact;
package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hudi.table.compact;
package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
@@ -42,7 +42,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieCopyOnWriteTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.compact.strategy.CompactionStrategy;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
@@ -71,7 +71,6 @@ import static java.util.stream.Collectors.toList;
* passes it through a CompactionFilter, executes all the compactions, writes a new version of the base files and makes
* a normal commit
*
* @see HoodieCompactor
*/
public class HoodieMergeOnReadTableCompactor implements HoodieCompactor {
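As a rough sketch of the flow described in the javadoc above: under the new packaging the compactor is driven in two calls by the action executors introduced in this change. Here fgIdsInPendingCompactions is a hypothetical Set of file group ids already under pending compaction, and exception handling is elided:

HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor();
// Plan: select file slices to compact according to the configured CompactionStrategy.
HoodieCompactionPlan plan =
    compactor.generateCompactionPlan(jsc, table, config, instantTime, fgIdsInPendingCompactions);
// Execute: rewrite base files for the planned operations and return the per-file write statuses.
JavaRDD<WriteStatus> statuses = compactor.compact(jsc, plan, table, config, instantTime);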

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hudi.table.compact;
package org.apache.hudi.table.action.compact;
import org.apache.hudi.common.util.Option;

View File

@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.SparkConfigUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.List;
public class RunCompactionActionExecutor extends BaseActionExecutor<HoodieWriteMetadata> {
private static final Logger LOG = LogManager.getLogger(RunCompactionActionExecutor.class);
public RunCompactionActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config,
HoodieTable<?> table,
String instantTime) {
super(jsc, config, table, instantTime);
}
@Override
public HoodieWriteMetadata execute() {
HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(instantTime);
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
if (!pendingCompactionTimeline.containsInstant(instant)) {
throw new IllegalStateException(
"No Compaction request available at " + instantTime + " to run compaction");
}
HoodieWriteMetadata compactionMetadata = new HoodieWriteMetadata();
try {
HoodieActiveTimeline timeline = table.getActiveTimeline();
HoodieCompactionPlan compactionPlan =
CompactionUtils.getCompactionPlan(table.getMetaClient(), instantTime);
// Mark instant as compaction inflight
timeline.transitionCompactionRequestedToInflight(instant);
table.getMetaClient().reloadActiveTimeline();
HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor();
JavaRDD<WriteStatus> statuses = compactor.compact(jsc, compactionPlan, table, config, instantTime);
statuses.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
List<HoodieWriteStat> updateStatusMap = statuses.map(WriteStatus::getStat).collect();
HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
for (HoodieWriteStat stat : updateStatusMap) {
metadata.addWriteStat(stat.getPartitionPath(), stat);
}
metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
compactionMetadata.setWriteStatuses(statuses);
compactionMetadata.setCommitted(false);
compactionMetadata.setCommitMetadata(Option.of(metadata));
} catch (IOException e) {
throw new HoodieCompactionException("Could not compact " + config.getBasePath(), e);
}
return compactionMetadata;
}
}

View File

@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class ScheduleCompactionActionExecutor extends BaseActionExecutor<Option<HoodieCompactionPlan>> {
private static final Logger LOG = LogManager.getLogger(ScheduleCompactionActionExecutor.class);
private final Option<Map<String, String>> extraMetadata;
public ScheduleCompactionActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config,
HoodieTable<?> table,
String instantTime,
Option<Map<String, String>> extraMetadata) {
super(jsc, config, table, instantTime);
this.extraMetadata = extraMetadata;
}
private HoodieCompactionPlan scheduleCompaction() {
LOG.info("Checking if compaction needs to be run on " + config.getBasePath());
Option<HoodieInstant> lastCompaction = table.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
String deltaCommitsSinceTs = "0";
if (lastCompaction.isPresent()) {
deltaCommitsSinceTs = lastCompaction.get().getTimestamp();
}
int deltaCommitsSinceLastCompaction = table.getActiveTimeline().getDeltaCommitTimeline()
.findInstantsAfter(deltaCommitsSinceTs, Integer.MAX_VALUE).countInstants();
if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) {
LOG.info("Not running compaction as only " + deltaCommitsSinceLastCompaction
+ " delta commits was found since last compaction " + deltaCommitsSinceTs + ". Waiting for "
+ config.getInlineCompactDeltaCommitMax());
return new HoodieCompactionPlan();
}
LOG.info("Compacting merge on read table " + config.getBasePath());
HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor();
try {
return compactor.generateCompactionPlan(jsc, table, config, instantTime,
((SyncableFileSystemView) table.getSliceView()).getPendingCompactionOperations()
.map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
.collect(Collectors.toSet()));
} catch (IOException e) {
throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
}
}
@Override
public Option<HoodieCompactionPlan> execute() {
// if there are inflight writes, their instantTime must not be less than that of compaction instant time
table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant()
.ifPresent(earliestInflight -> ValidationUtils.checkArgument(
HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), instantTime, HoodieTimeline.GREATER),
"Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
+ ", Compaction scheduled at " + instantTime));
// Committed and pending compaction instants should have strictly lower timestamps
List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
.getCommitsAndCompactionTimeline().getInstants()
.filter(instant -> HoodieTimeline.compareTimestamps(
instant.getTimestamp(), instantTime, HoodieTimeline.GREATER_OR_EQUAL))
.collect(Collectors.toList());
ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
"Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
+ conflictingInstants);
HoodieCompactionPlan plan = scheduleCompaction();
if (plan != null && (plan.getOperations() != null) && (!plan.getOperations().isEmpty())) {
extraMetadata.ifPresent(plan::setExtraMetadata);
HoodieInstant compactionInstant =
new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime);
try {
table.getActiveTimeline().saveToCompactionRequested(compactionInstant,
TimelineMetadataUtils.serializeCompactionPlan(plan));
} catch (IOException ioe) {
throw new HoodieIOException("Exception scheduling compaction", ioe);
}
return Option.of(plan);
}
return Option.empty();
}
}

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hudi.table.compact.strategy;
package org.apache.hudi.table.action.compact.strategy;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hudi.table.compact.strategy;
package org.apache.hudi.table.action.compact.strategy;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hudi.table.compact.strategy;
package org.apache.hudi.table.action.compact.strategy;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
@@ -26,7 +26,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor;
import org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor;
import java.io.Serializable;
import java.util.HashMap;

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hudi.table.compact.strategy;
package org.apache.hudi.table.action.compact.strategy;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hudi.table.compact.strategy;
package org.apache.hudi.table.action.compact.strategy;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hudi.table.compact.strategy;
package org.apache.hudi.table.action.compact.strategy;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hudi.table.compact.strategy;
package org.apache.hudi.table.action.compact.strategy;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;

View File

@@ -28,7 +28,7 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
import org.apache.hudi.table.action.commit.BulkInsertHelper;
import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

View File

@@ -28,7 +28,7 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
import org.apache.hudi.table.action.commit.BulkInsertHelper;
import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

View File

@@ -25,7 +25,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.DeleteHelper;
import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

View File

@@ -24,7 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.WriteHelper;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

View File

@@ -23,7 +23,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

View File

@@ -24,7 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.WriteHelper;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

View File

@@ -23,7 +23,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

View File

@@ -118,7 +118,7 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto
config.shouldAssumeDatePartitioning());
int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)).flatMap(partitionPath -> {
HoodieActiveTimeline activeTimeline = table.getActiveTimeline().reload();
HoodieActiveTimeline activeTimeline = table.getMetaClient().reloadActiveTimeline();
List<RollbackRequest> partitionRollbackRequests = new ArrayList<>();
switch (instantToRollback.getAction()) {
case HoodieTimeline.COMMIT_ACTION:

View File

@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.savepoint;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class SavepointActionExecutor extends BaseActionExecutor<HoodieSavepointMetadata> {
private static final Logger LOG = LogManager.getLogger(SavepointActionExecutor.class);
private final String user;
private final String comment;
public SavepointActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config,
HoodieTable<?> table,
String instantTime,
String user,
String comment) {
super(jsc, config, table, instantTime);
this.user = user;
this.comment = comment;
}
@Override
public HoodieSavepointMetadata execute() {
if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
throw new UnsupportedOperationException("Savepointing is not supported for MergeOnRead table types");
}
Option<HoodieInstant> cleanInstant = table.getCompletedCleanTimeline().lastInstant();
HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime);
if (!table.getCompletedCommitsTimeline().containsInstant(commitInstant)) {
throw new HoodieSavepointException("Could not savepoint non-existing commit " + commitInstant);
}
try {
// Check the last commit that was not cleaned and check if savepoint time is > that commit
String lastCommitRetained;
if (cleanInstant.isPresent()) {
HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
.deserializeHoodieCleanMetadata(table.getActiveTimeline().getInstantDetails(cleanInstant.get()).get());
lastCommitRetained = cleanMetadata.getEarliestCommitToRetain();
} else {
lastCommitRetained = table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp();
}
// Cannot allow savepoint time on a commit that could have been cleaned
ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, lastCommitRetained, HoodieTimeline.GREATER_OR_EQUAL),
"Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);
Map<String, List<String>> latestFilesMap = jsc.parallelize(FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
table.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()))
.mapToPair(partitionPath -> {
// Scan all partitions files with this commit time
LOG.info("Collecting latest files in partition path " + partitionPath);
TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
List<String> latestFiles = view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
.map(HoodieBaseFile::getFileName).collect(Collectors.toList());
return new Tuple2<>(partitionPath, latestFiles);
})
.collectAsMap();
HoodieSavepointMetadata metadata = TimelineMetadataUtils.convertSavepointMetadata(user, comment, latestFilesMap);
// Nothing to save in the savepoint
table.getActiveTimeline().createNewInstant(
new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, instantTime));
table.getActiveTimeline()
.saveAsComplete(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, instantTime),
TimelineMetadataUtils.serializeSavepointMetadata(metadata));
LOG.info("Savepoint " + instantTime + " created");
return metadata;
} catch (IOException e) {
throw new HoodieSavepointException("Failed to savepoint " + instantTime, e);
}
}
}

View File

@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.savepoint;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
public class SavepointHelpers {
private static final Logger LOG = LogManager.getLogger(SavepointHelpers.class);
public static void deleteSavepoint(HoodieTable<?> table, String savepointTime) {
if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
throw new UnsupportedOperationException("Savepointing is not supported for MergeOnRead table types");
}
HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint);
if (!isSavepointPresent) {
LOG.warn("No savepoint present " + savepointTime);
return;
}
table.getActiveTimeline().revertToInflight(savePoint);
table.getActiveTimeline().deleteInflight(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, savepointTime));
LOG.info("Savepoint " + savepointTime + " deleted");
}
public static void validateSavepointRestore(HoodieTable<?> table, String savepointTime) {
// Make sure the restore was successful
table.getMetaClient().reloadActiveTimeline();
Option<HoodieInstant> lastInstant = table.getActiveTimeline()
.getCommitsAndCompactionTimeline()
.filterCompletedAndCompactionInstants()
.lastInstant();
ValidationUtils.checkArgument(lastInstant.isPresent());
ValidationUtils.checkArgument(lastInstant.get().getTimestamp().equals(savepointTime),
savepointTime + " is not the last commit after restoring to savepoint, last commit was "
+ lastInstant.get().getTimestamp());
}
public static void validateSavepointPresence(HoodieTable<?> table, String savepointTime) {
HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint);
if (!isSavepointPresent) {
throw new HoodieRollbackException("No savepoint for instantTime " + savepointTime);
}
}
}
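A hedged sketch of how these helpers bracket a restore-to-savepoint flow; the restore itself happens elsewhere and is shown only as a placeholder comment:

// Fail early if the requested savepoint does not exist.
SavepointHelpers.validateSavepointPresence(table, savepointTime);
// ... restore the table to savepointTime ...
// Confirm the last instant on the reloaded timeline is the savepointed one.
SavepointHelpers.validateSavepointRestore(table, savepointTime);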

View File

@@ -58,7 +58,7 @@ public class TestClientRollback extends TestHoodieClientBase {
public void testSavepointAndRollback() throws Exception {
HoodieWriteConfig cfg = getConfigBuilder().withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build()).build();
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
try (HoodieWriteClient client = getHoodieWriteClient(cfg)) {
HoodieTestDataGenerator.writePartitionMetadata(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);
/**

View File

@@ -32,7 +32,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.compact.OperationResult;
import org.apache.hudi.table.action.compact.OperationResult;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

View File

@@ -31,7 +31,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieCommitArchiveLog;
import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -79,7 +79,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2).forTable("test-trip-table").build();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
boolean result = archiveLog.archiveIfRequired(jsc);
assertTrue(result);
}
@@ -157,7 +157,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
verifyInflightInstants(metaClient, 2);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
assertTrue(archiveLog.archiveIfRequired(jsc));
@@ -216,7 +216,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
// Requested Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "100"), dfs.getConf());
@@ -281,7 +281,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "102", dfs.getConf());
@@ -307,7 +307,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
HoodieTestDataGenerator.createSavepointFile(basePath, "101", dfs.getConf());
@@ -339,7 +339,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCompactionRequestedFile(basePath, "101", dfs.getConf());
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
@@ -386,7 +386,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "1", dfs.getConf());
HoodieInstant instant1 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "1");
@@ -426,7 +426,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
org.apache.hudi.avro.model.HoodieCommitMetadata expectedCommitMetadata = archiveLog.convertCommitMetadata(hoodieCommitMetadata);
assertEquals(expectedCommitMetadata.getOperationType(), WriteOperationType.INSERT.toString());

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hudi.table.compact;
package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;

View File

@@ -16,8 +16,9 @@
* limitations under the License.
*/
package org.apache.hudi.table.compact;
package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieClientTestHarness;
@@ -29,6 +30,7 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
@@ -50,6 +52,7 @@ import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
public class TestHoodieCompactor extends HoodieClientTestHarness {
@@ -100,9 +103,10 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
@Test(expected = HoodieNotSupportedException.class)
public void testCompactionOnCopyOnWriteFail() throws Exception {
metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
HoodieTable table = HoodieTable.create(metaClient, getConfig(), jsc);
HoodieTable<?> table = HoodieTable.create(metaClient, getConfig(), jsc);
String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime));
table.scheduleCompaction(jsc, compactionInstantTime, Option.empty());
table.compact(jsc, compactionInstantTime);
}
@Test
@@ -118,9 +122,8 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
writeClient.insert(recordsRDD, newCommitTime).collect();
String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
JavaRDD<WriteStatus> result =
table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime));
assertTrue("If there is nothing to compact, result will be empty", result.isEmpty());
Option<HoodieCompactionPlan> plan = table.scheduleCompaction(jsc, compactionInstantTime, Option.empty());
assertFalse("If there is nothing to compact, result will be empty", plan.isPresent());
}
}
@@ -128,18 +131,16 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
public void testWriteStatusContentsAfterCompaction() throws Exception {
// insert 100 records
HoodieWriteConfig config = getConfig();
try (HoodieWriteClient writeClient = getWriteClient(config);) {
try (HoodieWriteClient writeClient = getWriteClient(config)) {
String newCommitTime = "100";
writeClient.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
List<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime).collect();
writeClient.insert(recordsRDD, newCommitTime).collect();
// Update all the 100 records
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.create(metaClient, config, jsc);
HoodieTable table = HoodieTable.create(config, jsc);
newCommitTime = "101";
writeClient.startCommitWithTime(newCommitTime);
@@ -153,8 +154,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, updatedRecords);
// Verify that all data file has one log file
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.create(metaClient, config, jsc);
table = HoodieTable.create(config, jsc);
for (String partitionPath : dataGen.getPartitionPaths()) {
List<FileSlice> groupedLogFiles =
table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
@@ -162,14 +162,14 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
assertEquals("There should be 1 log file written for every data file", 1, fileSlice.getLogFiles().count());
}
}
HoodieTestUtils.createDeltaCommitFiles(basePath, newCommitTime);
// Do a compaction
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.create(metaClient, config, jsc);
String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
JavaRDD<WriteStatus> result =
table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime));
table = HoodieTable.create(config, jsc);
String compactionInstantTime = "102";
table.scheduleCompaction(jsc, compactionInstantTime, Option.empty());
table.getMetaClient().reloadActiveTimeline();
JavaRDD<WriteStatus> result = table.compact(jsc, compactionInstantTime).getWriteStatuses();
// Verify that all partition paths are present in the WriteStatus result
for (String partitionPath : dataGen.getPartitionPaths()) {
@@ -184,8 +184,4 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
protected HoodieTableType getTableType() {
return HoodieTableType.MERGE_ON_READ;
}
// TODO - after modifying HoodieReadClient to support mor tables - add more tests to make
// sure the data read is the updated data (compaction correctness)
// TODO - add more test cases for compactions after a failed commit/compaction
}

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hudi.table.compact.strategy;
package org.apache.hudi.table.action.compact.strategy;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.common.model.HoodieBaseFile;