1
0

[HUDI-308] Avoid Renames for tracking state transitions of all actions on dataset

This commit is contained in:
Balaji Varadarajan
2019-12-04 01:02:17 -08:00
committed by Balaji Varadarajan
parent 8963a68e6a
commit 9a1f698eef
47 changed files with 1121 additions and 403 deletions

View File

@@ -179,9 +179,10 @@ public class CompactionAdminClient extends AbstractHoodieClient {
// revert if in inflight state
metaClient.getActiveTimeline().revertCompactionInflightToRequested(inflight);
}
// Overwrite compaction plan with updated info
metaClient.getActiveTimeline().saveToCompactionRequested(
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, compactionOperationWithInstant.getLeft()),
AvroUtils.serializeCompactionPlan(newPlan));
AvroUtils.serializeCompactionPlan(newPlan), true);
}
return res;
}
@@ -218,8 +219,9 @@ public class CompactionAdminClient extends AbstractHoodieClient {
*/
private static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, String compactionInstant)
throws IOException {
HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(metaClient.getActiveTimeline()
.getInstantAuxiliaryDetails(HoodieTimeline.getCompactionRequestedInstant(compactionInstant)).get());
HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(
metaClient.getActiveTimeline().readPlanAsBytes(
HoodieTimeline.getCompactionRequestedInstant(compactionInstant)).get());
return compactionPlan;
}

View File

@@ -67,7 +67,7 @@ public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHo
* cleaned)
*/
public void clean() throws HoodieIOException {
String startCleanTime = HoodieActiveTimeline.createNewCommitTime();
String startCleanTime = HoodieActiveTimeline.createNewInstantTime();
clean(startCleanTime);
}
@@ -86,7 +86,7 @@ public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHo
// If there are inflight(failed) or previously requested clean operation, first perform them
table.getCleanTimeline().filterInflightsAndRequested().getInstants().forEach(hoodieInstant -> {
LOG.info("There were previously unfinished cleaner operations. Finishing Instant=" + hoodieInstant);
runClean(table, hoodieInstant.getTimestamp());
runClean(table, hoodieInstant);
});
Option<HoodieCleanerPlan> cleanerPlanOpt = scheduleClean(startCleanTime);
@@ -96,7 +96,7 @@ public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHo
if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
&& !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
final HoodieTable<T> hoodieTable = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
return runClean(hoodieTable, startCleanTime);
return runClean(hoodieTable, HoodieTimeline.getCleanRequestedInstant(startCleanTime), cleanerPlan);
}
}
return null;
@@ -136,13 +136,20 @@ public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHo
* Executes the Cleaner plan stored in the instant metadata.
*
* @param table Hoodie Table
* @param cleanInstantTs Cleaner Instant Timestamp
* @param cleanInstant Cleaner Instant
*/
@VisibleForTesting
protected HoodieCleanMetadata runClean(HoodieTable<T> table, String cleanInstantTs) {
HoodieInstant cleanInstant =
table.getCleanTimeline().getInstants().filter(x -> x.getTimestamp().equals(cleanInstantTs)).findFirst().get();
protected HoodieCleanMetadata runClean(HoodieTable<T> table, HoodieInstant cleanInstant) {
  // Resolve the cleaner plan for this instant through CleanerUtils before delegating.
  final HoodieCleanerPlan planForInstant;
  try {
    planForInstant = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant);
  } catch (IOException ioe) {
    // Re-throw the checked I/O failure as HoodieIOException, keeping the original cause attached.
    throw new HoodieIOException(ioe.getMessage(), ioe);
  }
  // Hand off to the three-argument overload with the plan that was just loaded.
  return runClean(table, cleanInstant, planForInstant);
}
private HoodieCleanMetadata runClean(HoodieTable<T> table, HoodieInstant cleanInstant,
HoodieCleanerPlan cleanerPlan) {
Preconditions.checkArgument(
cleanInstant.getState().equals(State.REQUESTED) || cleanInstant.getState().equals(State.INFLIGHT));
@@ -152,10 +159,11 @@ public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHo
if (!cleanInstant.isInflight()) {
// Mark as inflight first
cleanInstant = table.getActiveTimeline().transitionCleanRequestedToInflight(cleanInstant);
cleanInstant = table.getActiveTimeline().transitionCleanRequestedToInflight(cleanInstant,
AvroUtils.serializeCleanerPlan(cleanerPlan));
}
List<HoodieCleanStat> cleanStats = table.clean(jsc, cleanInstant);
List<HoodieCleanStat> cleanStats = table.clean(jsc, cleanInstant, cleanerPlan);
if (cleanStats.isEmpty()) {
return HoodieCleanMetadata.newBuilder().build();

View File

@@ -80,8 +80,6 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -101,7 +99,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
private static final Logger LOG = LogManager.getLogger(HoodieWriteClient.class);
private static final String UPDATE_STR = "update";
private static final String LOOKUP_STR = "lookup";
private final boolean rollbackInFlight;
private final boolean rollbackPending;
private final transient HoodieMetrics metrics;
private final transient HoodieIndex<T> index;
private final transient HoodieCleanClient<T> cleanClient;
@@ -119,21 +117,21 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
/**
*
*/
public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackInFlight) {
this(jsc, clientConfig, rollbackInFlight, HoodieIndex.createIndex(clientConfig, jsc));
public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending) {
this(jsc, clientConfig, rollbackPending, HoodieIndex.createIndex(clientConfig, jsc));
}
@VisibleForTesting
HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackInFlight, HoodieIndex index) {
this(jsc, clientConfig, rollbackInFlight, index, Option.empty());
HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending, HoodieIndex index) {
this(jsc, clientConfig, rollbackPending, index, Option.empty());
}
public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackInFlight,
public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending,
HoodieIndex index, Option<EmbeddedTimelineService> timelineService) {
super(jsc, clientConfig, timelineService);
this.index = index;
this.metrics = new HoodieMetrics(config, config.getTableName());
this.rollbackInFlight = rollbackInFlight;
this.rollbackPending = rollbackPending;
this.cleanClient = new HoodieCleanClient<>(jsc, config, metrics, timelineService);
}
@@ -356,7 +354,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
return upsertRecordsInternal(taggedValidRecords, commitTime, table, true);
} else {
// if entire set of keys are non existent
JavaRDD<WriteStatus> writeStatusRDD = jsc.parallelize(Collections.EMPTY_LIST, 1);
saveWorkloadProfileMetadataToInflight(new WorkloadProfile(jsc.emptyRDD()), table, commitTime);
JavaRDD<WriteStatus> writeStatusRDD = jsc.emptyRDD();
commitOnAutoCommit(commitTime, writeStatusRDD, table.getMetaClient().getCommitActionType());
return writeStatusRDD;
}
@@ -388,6 +387,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
final List<String> fileIDPrefixes =
IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList());
table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
table.getMetaClient().getCommitActionType(), commitTime), Option.empty());
JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
.mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, table, fileIDPrefixes), true)
.flatMap(writeStatuses -> writeStatuses.iterator());
@@ -435,9 +437,10 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
});
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
Option<HoodieInstant> instant =
activeTimeline.getCommitsTimeline().filterInflightsExcludingCompaction().lastInstant();
activeTimeline.saveToInflight(instant.get(), Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
String commitActionType = table.getMetaClient().getCommitActionType();
HoodieInstant requested = new HoodieInstant(State.REQUESTED, commitActionType, commitTime);
activeTimeline.transitionRequestedToInflight(requested,
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
} catch (IOException io) {
throw new HoodieCommitException("Failed to commit " + commitTime + " unable to save inflight metadata ", io);
}
@@ -664,8 +667,11 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
HoodieSavepointMetadata metadata = AvroUtils.convertSavepointMetadata(user, comment, latestFilesMap);
// Nothing to save in the savepoint
table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, commitTime),
AvroUtils.serializeSavepointMetadata(metadata));
table.getActiveTimeline().createNewInstant(
new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, commitTime));
table.getActiveTimeline()
.saveAsComplete(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, commitTime),
AvroUtils.serializeSavepointMetadata(metadata));
LOG.info("Savepoint " + commitTime + " created");
return true;
} catch (IOException e) {
@@ -792,23 +798,25 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
.filter(instant -> HoodieActiveTimeline.GREATER.test(instant.getTimestamp(), instantTime))
.collect(Collectors.toList());
// Start a rollback instant for all commits to be rolled back
String startRollbackInstant = startInstant();
String startRollbackInstant = HoodieActiveTimeline.createNewInstantTime();
// Start the timer
final Timer.Context context = startContext();
ImmutableMap.Builder<String, List<HoodieRollbackStat>> instantsToStats = ImmutableMap.builder();
table.getActiveTimeline().createNewInstant(
new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRollbackInstant));
instantsToRollback.stream().forEach(instant -> {
try {
switch (instant.getAction()) {
case HoodieTimeline.COMMIT_ACTION:
case HoodieTimeline.DELTA_COMMIT_ACTION:
List<HoodieRollbackStat> statsForInstant = doRollbackAndGetStats(instant.getTimestamp());
List<HoodieRollbackStat> statsForInstant = doRollbackAndGetStats(instant);
instantsToStats.put(instant.getTimestamp(), statsForInstant);
break;
case HoodieTimeline.COMPACTION_ACTION:
// TODO : Get file status and create a rollback stat and file
// TODO : Delete the .aux files along with the instant file, okay for now since the archival process will
// delete these files when it does not see a corresponding instant file under .hoodie
List<HoodieRollbackStat> statsForCompaction = doRollbackAndGetStats(instant.getTimestamp());
List<HoodieRollbackStat> statsForCompaction = doRollbackAndGetStats(instant);
instantsToStats.put(instant.getTimestamp(), statsForCompaction);
LOG.info("Deleted compaction instant " + instant);
break;
@@ -828,17 +836,16 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
}
}
private String startInstant() {
return HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date());
}
/**
 * Obtains the rollback timer context from the metrics object.
 *
 * @return the value returned by {@code metrics.getRollbackCtx()}
 */
private Timer.Context startContext() {
  final Timer.Context rollbackTimerCtx = metrics.getRollbackCtx();
  return rollbackTimerCtx;
}
private List<HoodieRollbackStat> doRollbackAndGetStats(final String commitToRollback) throws IOException {
HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
HoodieTimeline inflightCommitTimeline = table.getInflightCommitTimeline();
private List<HoodieRollbackStat> doRollbackAndGetStats(final HoodieInstant instantToRollback) throws
IOException {
final String commitToRollback = instantToRollback.getTimestamp();
HoodieTable<T> table = HoodieTable.getHoodieTable(
createMetaClient(true), config, jsc);
HoodieTimeline inflightAndRequestedCommitTimeline = table.getPendingCommitTimeline();
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
// Check if any of the commits is a savepoint - do not allow rollback on those commits
List<String> savepoints = table.getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp)
@@ -850,7 +857,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
}
});
if (commitTimeline.empty() && inflightCommitTimeline.empty()) {
if (commitTimeline.empty() && inflightAndRequestedCommitTimeline.empty()) {
// nothing to rollback
LOG.info("No commits to rollback " + commitToRollback);
}
@@ -865,14 +872,14 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
"Found commits after time :" + lastCommit + ", please rollback greater commits first");
}
List<String> inflights =
inflightCommitTimeline.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
List<String> inflights = inflightAndRequestedCommitTimeline.getInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
if ((lastCommit != null) && !inflights.isEmpty() && (inflights.indexOf(lastCommit) != inflights.size() - 1)) {
throw new HoodieRollbackException(
"Found in-flight commits after time :" + lastCommit + ", please rollback greater commits first");
}
List<HoodieRollbackStat> stats = table.rollback(jsc, commitToRollback, true);
List<HoodieRollbackStat> stats = table.rollback(jsc, instantToRollback, true);
LOG.info("Deleted inflight commits " + commitToRollback);
@@ -893,9 +900,13 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
durationInMs = Option.of(metrics.getDurationInMs(context.stop()));
metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted);
}
HoodieRollbackMetadata rollbackMetadata =
AvroUtils.convertRollbackMetadata(startRollbackTime, durationInMs, commitsToRollback, rollbackStats);
table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.ROLLBACK_ACTION, startRollbackTime),
HoodieRollbackMetadata rollbackMetadata = AvroUtils
.convertRollbackMetadata(startRollbackTime, durationInMs, commitsToRollback, rollbackStats);
//TODO: varadarb - This will be fixed in subsequent PR when Rollback and Clean transition mimics that of commit
table.getActiveTimeline().createNewInstant(new HoodieInstant(State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION,
startRollbackTime));
table.getActiveTimeline().saveAsComplete(
new HoodieInstant(true, HoodieTimeline.ROLLBACK_ACTION, startRollbackTime),
AvroUtils.serializeRollbackMetadata(rollbackMetadata));
LOG.info("Commits " + commitsToRollback + " rollback is complete");
@@ -937,17 +948,22 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
}
private void rollbackInternal(String commitToRollback) {
if (commitToRollback.isEmpty()) {
LOG.info("List of commits to rollback is empty");
return;
}
final String startRollbackTime = startInstant();
final String startRollbackTime = HoodieActiveTimeline.createNewInstantTime();
final Timer.Context context = startContext();
// Create a Hoodie table which encapsulated the commits and files visible
try {
List<HoodieRollbackStat> stats = doRollbackAndGetStats(commitToRollback);
Map<String, List<HoodieRollbackStat>> statToCommit = new HashMap<>();
finishRollback(context, stats, Arrays.asList(commitToRollback), startRollbackTime);
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.getHoodieTable(
createMetaClient(true), config, jsc);
Option<HoodieInstant> rollbackInstantOpt =
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
.filter(instant -> HoodieActiveTimeline.EQUAL.test(instant.getTimestamp(), commitToRollback))
.findFirst());
if (rollbackInstantOpt.isPresent()) {
List<HoodieRollbackStat> stats = doRollbackAndGetStats(rollbackInstantOpt.get());
finishRollback(context, stats, Arrays.asList(commitToRollback), startRollbackTime);
}
} catch (IOException e) {
throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitToRollback,
e);
@@ -992,20 +1008,20 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
public String startCommit() {
// NOTE : Need to ensure that rollback is done before a new commit is started
if (rollbackInFlight) {
// Only rollback inflight commit/delta-commits. Do not touch compaction commits
rollbackInflightCommits();
if (rollbackPending) {
// Only rollback pending commit/delta-commits. Do not touch compaction commits
rollbackPendingCommits();
}
String commitTime = HoodieActiveTimeline.createNewCommitTime();
String commitTime = HoodieActiveTimeline.createNewInstantTime();
startCommit(commitTime);
return commitTime;
}
public void startCommitWithTime(String instantTime) {
// NOTE : Need to ensure that rollback is done before a new commit is started
if (rollbackInFlight) {
if (rollbackPending) {
// Only rollback pending commit/delta-commits. Do not touch compaction commits
rollbackInflightCommits();
rollbackPendingCommits();
}
startCommit(instantTime);
}
@@ -1023,14 +1039,14 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
String commitActionType = table.getMetaClient().getCommitActionType();
activeTimeline.createInflight(new HoodieInstant(true, commitActionType, instantTime));
activeTimeline.createNewInstant(new HoodieInstant(State.REQUESTED, commitActionType, instantTime));
}
/**
* Schedules a new compaction instant.
*/
public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws IOException {
String instantTime = HoodieActiveTimeline.createNewCommitTime();
String instantTime = HoodieActiveTimeline.createNewInstantTime();
LOG.info("Generate a new instant time " + instantTime);
boolean notEmpty = scheduleCompactionAtInstant(instantTime, extraMetadata);
return notEmpty ? Option.of(instantTime) : Option.empty();
@@ -1046,7 +1062,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
throws IOException {
HoodieTableMetaClient metaClient = createMetaClient(true);
// if there are inflight writes, their instantTime must not be less than that of compaction instant time
metaClient.getCommitsTimeline().filterInflightsExcludingCompaction().firstInstant().ifPresent(earliestInflight -> {
metaClient.getCommitsTimeline().filterPendingExcludingCompaction().firstInstant().ifPresent(earliestInflight -> {
Preconditions.checkArgument(
HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), instantTime, HoodieTimeline.GREATER),
"Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
@@ -1091,7 +1107,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(
timeline.getInstantAuxiliaryDetails(HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get());
timeline.readPlanAsBytes(HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get());
// Merge extra meta-data passed by user with the one already in inflight compaction
Option<Map<String, String>> mergedMetaData = extraMetadata.map(m -> {
Map<String, String> merged = new HashMap<>();
@@ -1141,11 +1157,11 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
}
/**
* Cleanup all inflight commits.
* Cleanup all pending commits.
*/
private void rollbackInflightCommits() {
private void rollbackPendingCommits() {
HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterInflightsExcludingCompaction();
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
List<String> commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
for (String commit : commits) {
@@ -1238,7 +1254,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
HoodieTableMetaClient metaClient = createMetaClient(true);
HoodieCompactionPlan compactionPlan =
CompactionUtils.getCompactionPlan(metaClient, compactionInstant.getTimestamp());
// Mark instant as compaction inflight
activeTimeline.transitionCompactionRequestedToInflight(compactionInstant);
compactionTimer = metrics.getCompactionCtx();
@@ -1306,7 +1321,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
@VisibleForTesting
void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table) throws IOException {
table.rollback(jsc, inflightInstant.getTimestamp(), false);
table.rollback(jsc, inflightInstant, false);
// Revert instant state file
table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
}

View File

@@ -18,7 +18,9 @@
package org.apache.hudi.client.utils;
import org.apache.hudi.common.model.TimelineLayoutVersion;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaSparkContext;
@@ -35,6 +37,6 @@ public class ClientUtils {
public static HoodieTableMetaClient createMetaClient(JavaSparkContext jsc, HoodieWriteConfig config,
boolean loadActiveTimelineOnLoad) {
return new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), loadActiveTimelineOnLoad,
config.getConsistencyGuardConfig());
config.getConsistencyGuardConfig(), Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())));
}
}

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.config;
import org.apache.hudi.HoodieWriteClient;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.ConsistencyGuardConfig;
import org.apache.hudi.common.util.ReflectionUtils;
@@ -48,6 +49,7 @@ import java.util.Properties;
public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String TABLE_NAME = "hoodie.table.name";
private static final String TIMELINE_LAYOUT_VERSION = "hoodie.timeline.layout.version";
private static final String BASE_PATH_PROP = "hoodie.base.path";
private static final String AVRO_SCHEMA = "hoodie.avro.schema";
private static final String DEFAULT_PARALLELISM = "1500";
@@ -141,6 +143,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Boolean.parseBoolean(props.getProperty(HOODIE_ASSUME_DATE_PARTITIONING_PROP));
}
/**
 * Reads the {@code TIMELINE_LAYOUT_VERSION} property and parses it as a decimal integer.
 *
 * @return the configured timeline layout version
 * @throws NumberFormatException if the property value is missing or is not a parsable integer
 */
public Integer getTimelineLayoutVersion() {
  final String rawLayoutVersion = props.getProperty(TIMELINE_LAYOUT_VERSION);
  return Integer.valueOf(rawLayoutVersion);
}
/**
 * Reads the {@code BULKINSERT_PARALLELISM} property and parses it as a decimal integer.
 *
 * @return the configured bulk-insert shuffle parallelism
 * @throws NumberFormatException if the property value is missing or is not a parsable integer
 */
public int getBulkInsertShuffleParallelism() {
  final String rawParallelism = props.getProperty(BULKINSERT_PARALLELISM);
  return Integer.parseInt(rawParallelism);
}
@@ -587,6 +593,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
/**
 * Stores the given timeline layout version under the {@code TIMELINE_LAYOUT_VERSION} property.
 *
 * @param version layout version number, written to the properties as its decimal string form
 * @return this builder, to allow call chaining
 */
public Builder withTimelineLayoutVersion(int version) {
  final String versionAsText = Integer.toString(version);
  props.setProperty(TIMELINE_LAYOUT_VERSION, versionAsText);
  return this;
}
public Builder withBulkInsertParallelism(int bulkInsertParallelism) {
props.setProperty(BULKINSERT_PARALLELISM, String.valueOf(bulkInsertParallelism));
return this;
@@ -693,7 +704,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(DELETE_PARALLELISM), DELETE_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM,
DEFAULT_ROLLBACK_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_INSERT_PROP), COMBINE_BEFORE_INSERT_PROP,
DEFAULT_COMBINE_BEFORE_INSERT);
setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_UPSERT_PROP), COMBINE_BEFORE_UPSERT_PROP,
@@ -733,6 +745,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
setDefaultOnCondition(props, !isConsistencyGuardSet,
ConsistencyGuardConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !props.containsKey(TIMELINE_LAYOUT_VERSION), TIMELINE_LAYOUT_VERSION,
String.valueOf(TimelineLayoutVersion.CURR_VERSION));
String layoutVersion = props.getProperty(TIMELINE_LAYOUT_VERSION);
// Ensure Layout Version is good
new TimelineLayoutVersion(Integer.parseInt(layoutVersion));
// Build WriteConfig at the end
HoodieWriteConfig config = new HoodieWriteConfig(props);
Preconditions.checkArgument(config.getBasePath() != null);

View File

@@ -19,7 +19,7 @@
package org.apache.hudi.io;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.model.ActionType;
@@ -37,7 +37,10 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.AvroUtils;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
@@ -111,15 +114,18 @@ public class HoodieCommitArchiveLog {
public boolean archiveIfRequired(final JavaSparkContext jsc) throws IOException {
try {
List<HoodieInstant> instantsToArchive = getInstantsToArchive(jsc).collect(Collectors.toList());
boolean success = true;
if (instantsToArchive.iterator().hasNext()) {
if (!instantsToArchive.isEmpty()) {
this.writer = openWriter();
LOG.info("Archiving instants " + instantsToArchive);
archive(instantsToArchive);
LOG.info("Deleting archived instants " + instantsToArchive);
success = deleteArchivedInstants(instantsToArchive);
} else {
LOG.info("No Instants to archive");
}
return success;
} finally {
close();
@@ -171,7 +177,15 @@ public class HoodieCommitArchiveLog {
}).limit(commitTimeline.countInstants() - minCommitsToKeep));
}
return instants;
// For archiving and cleaning instants, we need to include intermediate state files if they exist
HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
Map<Pair<String, String>, List<HoodieInstant>> groupByTsAction = rawActiveTimeline.getInstants()
.collect(Collectors.groupingBy(i -> Pair.of(i.getTimestamp(),
HoodieInstant.getComparableAction(i.getAction()))));
return instants.flatMap(hoodieInstant ->
groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
HoodieInstant.getComparableAction(hoodieInstant.getAction()))).stream());
}
private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants) throws IOException {
@@ -194,6 +208,7 @@ public class HoodieCommitArchiveLog {
return i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION)
|| (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)));
}).max(Comparator.comparing(HoodieInstant::getTimestamp)));
LOG.info("Latest Committed Instant=" + latestCommitted);
if (latestCommitted.isPresent()) {
success &= deleteAllInstantsOlderorEqualsInAuxMetaFolder(latestCommitted.get());
}
@@ -208,8 +223,8 @@ public class HoodieCommitArchiveLog {
* @throws IOException in case of error
*/
private boolean deleteAllInstantsOlderorEqualsInAuxMetaFolder(HoodieInstant thresholdInstant) throws IOException {
List<HoodieInstant> instants = HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(metaClient.getFs(),
new Path(metaClient.getMetaAuxiliaryPath()), HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE);
List<HoodieInstant> instants = metaClient.scanHoodieInstantsFromFileSystem(
new Path(metaClient.getMetaAuxiliaryPath()), HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE, false);
List<HoodieInstant> instantsToBeDeleted =
instants.stream().filter(instant1 -> HoodieTimeline.compareTimestamps(instant1.getTimestamp(),
@@ -270,10 +285,14 @@ public class HoodieCommitArchiveLog {
throws IOException {
HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry();
archivedMetaWrapper.setCommitTime(hoodieInstant.getTimestamp());
archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
switch (hoodieInstant.getAction()) {
case HoodieTimeline.CLEAN_ACTION: {
archivedMetaWrapper.setHoodieCleanMetadata(AvroUtils
.deserializeAvroMetadata(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieCleanMetadata.class));
if (hoodieInstant.isCompleted()) {
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient, hoodieInstant));
} else {
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient, hoodieInstant));
}
archivedMetaWrapper.setActionType(ActionType.clean.name());
break;
}
@@ -303,8 +322,15 @@ public class HoodieCommitArchiveLog {
archivedMetaWrapper.setActionType(ActionType.commit.name());
break;
}
default:
case HoodieTimeline.COMPACTION_ACTION: {
HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, hoodieInstant.getTimestamp());
archivedMetaWrapper.setHoodieCompactionPlan(plan);
archivedMetaWrapper.setActionType(ActionType.compaction.name());
break;
}
default: {
throw new UnsupportedOperationException("Action not fully supported yet");
}
}
return archivedMetaWrapper;
}

View File

@@ -35,7 +35,7 @@ import org.apache.hudi.common.model.HoodieRollingStatMetadata;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.AvroUtils;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
@@ -109,14 +109,15 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
Tuple2<String, String> partitionDelFileTuple = iter.next();
String partitionPath = partitionDelFileTuple._1();
String delFileName = partitionDelFileTuple._2();
String deletePathStr = new Path(new Path(basePath, partitionPath), delFileName).toString();
Path deletePath = new Path(new Path(basePath, partitionPath), delFileName);
String deletePathStr = deletePath.toString();
Boolean deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
if (!partitionCleanStatMap.containsKey(partitionPath)) {
partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath));
}
PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath);
partitionCleanStat.addDeleteFilePatterns(deletePathStr);
partitionCleanStat.addDeletedFileResult(deletePathStr, deletedFileResult);
partitionCleanStat.addDeleteFilePatterns(deletePath.getName());
partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult);
}
return partitionCleanStatMap.entrySet().stream().map(e -> new Tuple2<>(e.getKey(), e.getValue()))
.collect(Collectors.toList()).iterator();
@@ -312,82 +313,78 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
* @throws IllegalArgumentException if unknown cleaning policy is provided
*/
@Override
public List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieInstant cleanInstant) {
try {
HoodieCleanerPlan cleanerPlan = AvroUtils.deserializeCleanerPlan(getActiveTimeline()
.getInstantAuxiliaryDetails(HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp())).get());
public List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieInstant cleanInstant, HoodieCleanerPlan cleanerPlan) {
int cleanerParallelism = Math.min(
(int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(x -> x.size()).count()),
config.getCleanerParallelism());
LOG.info("Using cleanerParallelism: " + cleanerParallelism);
List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
.parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream()
.flatMap(x -> x.getValue().stream().map(y -> new Tuple2<String, String>(x.getKey(), y)))
.collect(Collectors.toList()), cleanerParallelism)
.mapPartitionsToPair(deleteFilesFunc(this)).reduceByKey((e1, e2) -> e1.merge(e2)).collect();
int cleanerParallelism = Math.min(
(int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(x -> x.size()).count()),
config.getCleanerParallelism());
LOG.info("Using cleanerParallelism: " + cleanerParallelism);
List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
.parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream()
.flatMap(x -> x.getValue().stream().map(y -> new Tuple2<String, String>(x.getKey(), y)))
.collect(Collectors.toList()), cleanerParallelism)
.mapPartitionsToPair(deleteFilesFunc(this)).reduceByKey((e1, e2) -> e1.merge(e2)).collect();
Map<String, PartitionCleanStat> partitionCleanStatsMap =
partitionCleanStats.stream().collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
Map<String, PartitionCleanStat> partitionCleanStatsMap =
partitionCleanStats.stream().collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
// Return PartitionCleanStat for each partition passed.
return cleanerPlan.getFilesToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
PartitionCleanStat partitionCleanStat =
(partitionCleanStatsMap.containsKey(partitionPath)) ? partitionCleanStatsMap.get(partitionPath)
: new PartitionCleanStat(partitionPath);
HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
.withEarliestCommitRetained(Option.ofNullable(
actionInstant != null
? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
actionInstant.getAction(), actionInstant.getTimestamp())
: null))
.withDeletePathPattern(partitionCleanStat.deletePathPatterns)
.withSuccessfulDeletes(partitionCleanStat.successDeleteFiles)
.withFailedDeletes(partitionCleanStat.failedDeleteFiles).build();
}).collect(Collectors.toList());
} catch (IOException e) {
throw new HoodieIOException("Failed to clean up after commit", e);
}
// Return PartitionCleanStat for each partition passed.
return cleanerPlan.getFilesToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
PartitionCleanStat partitionCleanStat =
(partitionCleanStatsMap.containsKey(partitionPath)) ? partitionCleanStatsMap.get(partitionPath)
: new PartitionCleanStat(partitionPath);
HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
.withEarliestCommitRetained(Option.ofNullable(
actionInstant != null
? new HoodieInstant(State.valueOf(actionInstant.getState()),
actionInstant.getAction(), actionInstant.getTimestamp())
: null))
.withDeletePathPattern(partitionCleanStat.deletePathPatterns)
.withSuccessfulDeletes(partitionCleanStat.successDeleteFiles)
.withFailedDeletes(partitionCleanStat.failedDeleteFiles).build();
}).collect(Collectors.toList());
}
@Override
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, String commit, boolean deleteInstants)
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, HoodieInstant instant, boolean deleteInstants)
throws IOException {
Long startTime = System.currentTimeMillis();
List<HoodieRollbackStat> stats = new ArrayList<>();
String actionType = metaClient.getCommitActionType();
HoodieActiveTimeline activeTimeline = this.getActiveTimeline();
List<String> inflights =
this.getInflightCommitTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
// Atomically unpublish the commits
if (!inflights.contains(commit)) {
LOG.info("Unpublishing " + commit);
activeTimeline.revertToInflight(new HoodieInstant(false, actionType, commit));
if (instant.isCompleted()) {
LOG.info("Unpublishing instant " + instant);
instant = activeTimeline.revertToInflight(instant);
}
HoodieInstant instantToRollback = new HoodieInstant(false, actionType, commit);
Long startTime = System.currentTimeMillis();
// For Requested State (like failure during index lookup), there is nothing to roll back other than
// deleting the timeline file
if (!instant.isRequested()) {
String commit = instant.getTimestamp();
// delete all the data files for this commit
LOG.info("Clean out all parquet files generated for commit: " + commit);
List<RollbackRequest> rollbackRequests = generateRollbackRequests(instantToRollback);
// TODO: We need to persist this as rollback workload and use it in case of partial failures
List<HoodieRollbackStat> stats =
new RollbackExecutor(metaClient, config).performRollback(jsc, instantToRollback, rollbackRequests);
// delete all the data files for this commit
LOG.info("Clean out all parquet files generated for commit: " + commit);
List<RollbackRequest> rollbackRequests = generateRollbackRequests(instant);
//TODO: We need to persist this as rollback workload and use it in case of partial failures
stats = new RollbackExecutor(metaClient, config).performRollback(jsc, instant, rollbackRequests);
}
// Delete Inflight instant if enabled
deleteInflightInstant(deleteInstants, activeTimeline, new HoodieInstant(true, actionType, commit));
deleteInflightAndRequestedInstant(deleteInstants, activeTimeline, instant);
LOG.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
return stats;
}
private List<RollbackRequest> generateRollbackRequests(HoodieInstant instantToRollback) throws IOException {
private List<RollbackRequest> generateRollbackRequests(HoodieInstant instantToRollback)
throws IOException {
return FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning()).stream().map(partitionPath -> {
return RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback);
}).collect(Collectors.toList());
}
/**
* Delete Inflight instant if enabled.
*
@@ -395,15 +392,22 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
* @param activeTimeline Hoodie active timeline
* @param instantToBeDeleted Instant to be deleted
*/
protected void deleteInflightInstant(boolean deleteInstant, HoodieActiveTimeline activeTimeline,
protected void deleteInflightAndRequestedInstant(boolean deleteInstant, HoodieActiveTimeline activeTimeline,
HoodieInstant instantToBeDeleted) {
// Remove marker files always on rollback
deleteMarkerDir(instantToBeDeleted.getTimestamp());
// Remove the rolled back inflight commits
if (deleteInstant) {
activeTimeline.deleteInflight(instantToBeDeleted);
LOG.info("Deleted inflight commit " + instantToBeDeleted);
LOG.info("Deleting instant=" + instantToBeDeleted);
activeTimeline.deletePending(instantToBeDeleted);
if (instantToBeDeleted.isInflight() && !metaClient.getTimelineLayoutVersion().isNullVersion()) {
// Delete corresponding requested instant
instantToBeDeleted = new HoodieInstant(State.REQUESTED, instantToBeDeleted.getAction(),
instantToBeDeleted.getTimestamp());
activeTimeline.deletePending(instantToBeDeleted);
}
LOG.info("Deleted pending commit " + instantToBeDeleted);
} else {
LOG.warn("Rollback finished without deleting inflight instant file. Instant=" + instantToBeDeleted);
}

View File

@@ -44,7 +44,6 @@ import org.apache.hudi.io.compact.HoodieRealtimeTableCompactor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
@@ -167,32 +166,39 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
}
@Override
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, String commit, boolean deleteInstants)
throws IOException {
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, HoodieInstant instant,
boolean deleteInstants) throws IOException {
Long startTime = System.currentTimeMillis();
String commit = instant.getTimestamp();
LOG.error("Rolling back instant " + instant);
// Atomically un-publish all non-inflight commits
if (instant.isCompleted()) {
LOG.error("Un-publishing instant " + instant + ", deleteInstants=" + deleteInstants);
instant = this.getActiveTimeline().revertToInflight(instant);
}
List<HoodieRollbackStat> allRollbackStats = new ArrayList<>();
// At the moment, MOR table type does not support bulk nested rollbacks. Nested rollbacks are an experimental
// feature that is expensive. To perform nested rollbacks, initiate multiple requests of client.rollback
// (commitToRollback).
// NOTE {@link HoodieCompactionConfig#withCompactionLazyBlockReadEnabled} needs to be set to TRUE. This is
// required to avoid OOM when merging multiple LogBlocks performed during nested rollbacks.
// Atomically un-publish all non-inflight commits
Option<HoodieInstant> commitOrCompactionOption = Option.fromJavaOptional(this.getActiveTimeline()
.getTimelineOfActions(Sets.newHashSet(HoodieActiveTimeline.COMMIT_ACTION,
HoodieActiveTimeline.DELTA_COMMIT_ACTION, HoodieActiveTimeline.COMPACTION_ACTION))
.getInstants().filter(i -> commit.equals(i.getTimestamp())).findFirst());
HoodieInstant instantToRollback = commitOrCompactionOption.get();
// Atomically un-publish all non-inflight commits
if (!instantToRollback.isInflight()) {
this.getActiveTimeline().revertToInflight(instantToRollback);
// For Requested State (like failure during index lookup), there is nothing to roll back other than
// deleting the timeline file
if (!instant.isRequested()) {
LOG.info("Unpublished " + commit);
List<RollbackRequest> rollbackRequests = generateRollbackRequests(jsc, instant);
// TODO: We need to persist this as rollback workload and use it in case of partial failures
allRollbackStats = new RollbackExecutor(metaClient, config).performRollback(jsc, instant, rollbackRequests);
}
LOG.info("Unpublished " + commit);
Long startTime = System.currentTimeMillis();
List<RollbackRequest> rollbackRequests = generateRollbackRequests(jsc, instantToRollback);
// TODO: We need to persist this as rollback workload and use it in case of partial failures
List<HoodieRollbackStat> allRollbackStats =
new RollbackExecutor(metaClient, config).performRollback(jsc, instantToRollback, rollbackRequests);
// Delete Inflight instants if enabled
deleteInflightInstant(deleteInstants, this.getActiveTimeline(),
new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp()));
deleteInflightAndRequestedInstant(deleteInstants, this.getActiveTimeline(), instant);
LOG.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));

View File

@@ -182,8 +182,8 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
/**
* Get only the inflight (non-completed) commit timeline.
*/
public HoodieTimeline getInflightCommitTimeline() {
return metaClient.getCommitsTimeline().filterInflightsExcludingCompaction();
public HoodieTimeline getPendingCommitTimeline() {
return metaClient.getCommitsTimeline().filterPendingExcludingCompaction();
}
/**
@@ -287,16 +287,18 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
*
* @param jsc Java Spark Context
* @param cleanInstant Clean Instant
* @param cleanerPlan Cleaner Plan
* @return list of Clean Stats
*/
public abstract List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieInstant cleanInstant);
public abstract List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieInstant cleanInstant,
HoodieCleanerPlan cleanerPlan);
/**
* Rollback the (inflight/committed) record changes with the given commit time. Four steps: (1) Atomically unpublish
* this commit (2) clean indexing data (3) clean newly generated parquet files / log blocks (4) Finally, delete
* .<action>.commit or .<action>.inflight file if deleteInstants = true
*/
public abstract List<HoodieRollbackStat> rollback(JavaSparkContext jsc, String commit, boolean deleteInstants)
public abstract List<HoodieRollbackStat> rollback(JavaSparkContext jsc, HoodieInstant instant, boolean deleteInstants)
throws IOException;
/**

View File

@@ -116,6 +116,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
// Reload and rollback inflight compaction
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
// hoodieTable.rollback(jsc,
// new HoodieInstant(true, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), false);
client.rollbackInflightCompaction(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), hoodieTable);
@@ -166,7 +168,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
assertEquals("Pending Compaction instant has expected instant time", pendingCompactionInstant.getTimestamp(),
compactionInstantTime);
HoodieInstant inflightInstant =
metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get();
metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get();
assertEquals("inflight instant has expected instant time", inflightInstant.getTimestamp(), inflightInstantTime);
// This should rollback
@@ -174,10 +176,10 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
// Validate
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
inflightInstant = metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get();
inflightInstant = metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get();
assertEquals("inflight instant has expected instant time", inflightInstant.getTimestamp(), nextInflightInstantTime);
assertEquals("Expect only one inflight instant", 1, metaClient.getActiveTimeline()
.filterInflightsExcludingCompaction().getInstants().count());
.filterPendingExcludingCompaction().getInstants().count());
// Expect pending Compaction to be present
pendingCompactionInstant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
assertEquals("Pending Compaction instant has expected instant time", pendingCompactionInstant.getTimestamp(),
@@ -274,7 +276,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieInstant inflightInstant =
metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get();
metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get();
assertEquals("inflight instant has expected instant time", inflightInstant.getTimestamp(), inflightInstantTime);
boolean gotException = false;

View File

@@ -434,7 +434,8 @@ public class TestCleaner extends TestHoodieClientBase {
metaClient.reloadActiveTimeline()
.revertToInflight(new HoodieInstant(State.COMPLETED, HoodieTimeline.CLEAN_ACTION, cleanInstantTs));
final HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieCleanMetadata cleanMetadata2 = writeClient.runClean(table, cleanInstantTs);
HoodieCleanMetadata cleanMetadata2 = writeClient.runClean(table,
HoodieTimeline.getCleanInflightInstant(cleanInstantTs));
Assert.assertTrue(
Objects.equals(cleanMetadata1.getEarliestCommitToRetain(), cleanMetadata2.getEarliestCommitToRetain()));
Assert.assertEquals(new Integer(0), cleanMetadata2.getTotalFilesDeleted());
@@ -646,8 +647,11 @@ public class TestCleaner extends TestHoodieClientBase {
HoodieCleanMetadata metadata =
CleanerUtils.convertCleanMetadata(metaClient, commitTime, Option.of(0L), Arrays.asList(cleanStat1, cleanStat2));
metadata.setVersion(CleanerUtils.CLEAN_METADATA_VERSION_1);
Assert.assertEquals(CleanerUtils.LATEST_CLEAN_METADATA_VERSION, metadata.getVersion());
// Now upgrade and check
CleanMetadataMigrator metadataMigrator = new CleanMetadataMigrator(metaClient);
metadata = metadataMigrator.upgradeToLatest(metadata, metadata.getVersion());
testCleanMetadataPathEquality(metadata, newExpected);
CleanMetadataMigrator migrator = new CleanMetadataMigrator(metaClient);
@@ -736,7 +740,7 @@ public class TestCleaner extends TestHoodieClientBase {
.build();
// make 1 commit, with 1 file per partition
HoodieTestUtils.createCommitFiles(basePath, "000");
HoodieTestUtils.createInflightCommitFiles(basePath, "000");
String file1P0C0 =
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000");
@@ -769,7 +773,7 @@ public class TestCleaner extends TestHoodieClientBase {
file1P1C0));
// make next commit, with 1 insert & 1 update per partition
HoodieTestUtils.createCommitFiles(basePath, "001");
HoodieTestUtils.createInflightCommitFiles(basePath, "001");
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);
@@ -809,7 +813,7 @@ public class TestCleaner extends TestHoodieClientBase {
file1P1C0));
// make next commit, with 2 updates to existing files, and 1 insert
HoodieTestUtils.createCommitFiles(basePath, "002");
HoodieTestUtils.createInflightCommitFiles(basePath, "002");
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);
@@ -837,7 +841,7 @@ public class TestCleaner extends TestHoodieClientBase {
file1P0C0));
// make next commit, with 2 updates to existing files, and 1 insert
HoodieTestUtils.createCommitFiles(basePath, "003");
HoodieTestUtils.createInflightCommitFiles(basePath, "003");
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);
@@ -882,8 +886,10 @@ public class TestCleaner extends TestHoodieClientBase {
.put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
new ImmutableList.Builder<>().add(file3P0C2).build())
.build());
metaClient.getActiveTimeline().saveToInflight(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "004"),
metaClient.getActiveTimeline().createNewInstant(
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "004"));
metaClient.getActiveTimeline().transitionRequestedToInflight(
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "004"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, simulateFailureRetry);
HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
@@ -900,7 +906,6 @@ public class TestCleaner extends TestHoodieClientBase {
*/
@Test
public void testCleanMarkerDataFilesOnRollback() throws IOException {
HoodieTestUtils.createCommitFiles(basePath, "000");
List<String> markerFiles = createMarkerFiles("000", 10);
assertEquals("Some marker files are created.", 10, markerFiles.size());
assertEquals("Some marker files are created.", markerFiles.size(), getTotalTempFiles());
@@ -908,8 +913,12 @@ public class TestCleaner extends TestHoodieClientBase {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
table.rollback(jsc, "000", true);
table.getActiveTimeline().createNewInstant(new HoodieInstant(State.REQUESTED,
HoodieTimeline.COMMIT_ACTION, "000"));
table.getActiveTimeline().transitionRequestedToInflight(
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000"), Option.empty());
metaClient.reloadActiveTimeline();
table.rollback(jsc, new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"), true);
assertEquals("All temp files are deleted.", 0, getTotalTempFiles());
}

View File

@@ -263,6 +263,8 @@ public class TestClientRollback extends TestHoodieClientBase {
String commitTime1 = "20160501010101";
String commitTime2 = "20160502020601";
String commitTime3 = "20160506030611";
String commitTime4 = "20160506030621";
String commitTime5 = "20160506030631";
new File(basePath + "/.hoodie").mkdirs();
HoodieTestDataGenerator.writePartitionMetadata(fs, new String[] {"2016/05/01", "2016/05/02", "2016/05/06"},
basePath);
@@ -292,7 +294,7 @@ public class TestClientRollback extends TestHoodieClientBase {
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
try (HoodieWriteClient client = getHoodieWriteClient(config, false);) {
client.startCommitWithTime(commitTime4);
// Check results, nothing changed
assertTrue(HoodieTestUtils.doesCommitExist(basePath, commitTime1));
assertTrue(HoodieTestUtils.doesInflightExist(basePath, commitTime2));
@@ -310,7 +312,7 @@ public class TestClientRollback extends TestHoodieClientBase {
// Turn auto rollback on
try (HoodieWriteClient client = getHoodieWriteClient(config, true)) {
client.startCommit();
client.startCommitWithTime(commitTime5);
assertTrue(HoodieTestUtils.doesCommitExist(basePath, commitTime1));
assertFalse(HoodieTestUtils.doesInflightExist(basePath, commitTime2));
assertFalse(HoodieTestUtils.doesInflightExist(basePath, commitTime3));

View File

@@ -27,9 +27,11 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRollingStat;
import org.apache.hudi.common.model.HoodieRollingStatMetadata;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.model.TimelineLayoutVersion;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.TableFileSystemView.ReadOptimizedView;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ConsistencyGuardConfig;
import org.apache.hudi.common.util.FSUtils;
@@ -259,13 +261,16 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
/**
* Test one of HoodieWriteClient upsert(Prepped) APIs.
*
* @param hoodieWriteConfig Write Config
* @param config Write Config
* @param writeFn One of Hoodie Write Function API
* @throws Exception in case of error
*/
private void testUpsertsInternal(HoodieWriteConfig hoodieWriteConfig,
private void testUpsertsInternal(HoodieWriteConfig config,
Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPrepped)
throws Exception {
// Force using older timeline layout
HoodieWriteConfig hoodieWriteConfig = getConfigBuilder().withProps(config.getProps()).withTimelineLayoutVersion(
TimelineLayoutVersion.VERSION_0).build();
HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
// Write 1 (only inserts)
@@ -292,6 +297,44 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
deleteBatch(hoodieWriteConfig, client, newCommitTime, prevCommitTime,
initCommitTime, numRecords, HoodieWriteClient::delete, isPrepped, true,
0, 150);
// Now simulate an upgrade and perform a restore operation
HoodieWriteConfig newConfig = getConfigBuilder().withProps(config.getProps()).withTimelineLayoutVersion(
TimelineLayoutVersion.CURR_VERSION).build();
client = getHoodieWriteClient(newConfig, false);
client.restoreToInstant("004");
// Check the entire dataset has all records still
String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
for (int i = 0; i < fullPartitionPaths.length; i++) {
fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
}
assertEquals("Must contain " + 200 + " records", 200,
HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
// Perform Delete again on upgraded dataset.
prevCommitTime = newCommitTime;
newCommitTime = "006";
numRecords = 50;
deleteBatch(newConfig, client, newCommitTime, prevCommitTime,
initCommitTime, numRecords, HoodieWriteClient::delete, isPrepped, true,
0, 150);
HoodieActiveTimeline activeTimeline = new HoodieActiveTimeline(metaClient, false);
List<HoodieInstant> instants = activeTimeline.getCommitTimeline().getInstants().collect(Collectors.toList());
Assert.assertEquals(5, instants.size());
Assert.assertEquals(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "001"),
instants.get(0));
Assert.assertEquals(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "004"),
instants.get(1));
// New Format should have all states of instants
Assert.assertEquals(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "006"),
instants.get(2));
Assert.assertEquals(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "006"),
instants.get(3));
Assert.assertEquals(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "006"),
instants.get(4));
}
/**

View File

@@ -145,19 +145,30 @@ public class HoodieTestDataGenerator {
createCommitFile(basePath, commitTime, HoodieTestUtils.getDefaultHadoopConf());
}
public static void createCommitFile(String basePath, String commitTime, Configuration configuration)
throws IOException {
Path commitFile = new Path(
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCommitFileName(commitTime));
FileSystem fs = FSUtils.getFs(basePath, configuration);
FSDataOutputStream os = fs.create(commitFile, true);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
try {
// Write empty commit metadata
os.writeBytes(new String(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
} finally {
os.close();
}
public static void createCommitFile(String basePath, String commitTime, Configuration configuration) {
Arrays.asList(HoodieTimeline.makeCommitFileName(commitTime), HoodieTimeline.makeInflightCommitFileName(commitTime),
HoodieTimeline.makeRequestedCommitFileName(commitTime)).forEach(f -> {
Path commitFile = new Path(
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f);
FSDataOutputStream os = null;
try {
FileSystem fs = FSUtils.getFs(basePath, configuration);
os = fs.create(commitFile, true);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
// Write empty commit metadata
os.writeBytes(new String(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
} finally {
if (null != os) {
try {
os.close();
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
}
}
});
}
public static void createCompactionRequestedFile(String basePath, String commitTime, Configuration configuration)

View File

@@ -43,7 +43,7 @@ import static org.mockito.Mockito.when;
public class TestBoundedInMemoryExecutor extends HoodieClientTestHarness {
private final String commitTime = HoodieActiveTimeline.createNewCommitTime();
private final String commitTime = HoodieActiveTimeline.createNewInstantTime();
@Before
public void setUp() throws Exception {

View File

@@ -59,7 +59,7 @@ import static org.mockito.Mockito.when;
public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
private final String commitTime = HoodieActiveTimeline.createNewCommitTime();
private final String commitTime = HoodieActiveTimeline.createNewInstantTime();
@Before
public void setUp() throws Exception {

View File

@@ -37,6 +37,7 @@ import org.apache.hudi.index.hbase.HBaseIndexQPSResourceAllocator;
import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
@@ -131,7 +132,6 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
HoodieWriteConfig config = getConfig();
HBaseIndex index = new HBaseIndex(config);
try (HoodieWriteClient writeClient = getWriteClient(config);) {
writeClient.startCommit();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
@@ -140,6 +140,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
// Insert 200 records
writeClient.startCommitWithTime(newCommitTime);
JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
assertNoWriteErrors(writeStatues.collect());
@@ -171,13 +172,19 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
HoodieWriteConfig config = getConfig();
HBaseIndex index = new HBaseIndex(config);
HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
writeClient.startCommit();
writeClient.startCommitWithTime(newCommitTime);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
JavaRDD<HoodieRecord> javaRDD1 = index.tagLocation(writeRecords, jsc, hoodieTable);
// Duplicate upsert and ensure correctness is maintained
// We are trying to approximately imitate the case when the RDD is recomputed. When an RDD is recomputed, driver code
// is not re-executed. This includes the state transitions. We need to delete the inflight instant so that the
// subsequent upsert will not run into conflicts.
metaClient.getFs().delete(new Path(metaClient.getMetaPath(), "001.inflight"));
writeClient.upsert(writeRecords, newCommitTime);
assertNoWriteErrors(writeStatues.collect());

View File

@@ -45,8 +45,8 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
@@ -142,13 +142,12 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
HoodieTestUtils.createCleanFiles(metaClient, basePath, "100", dfs.getConf());
HoodieTestUtils.createInflightCleanFiles(basePath, dfs.getConf(), "101");
HoodieTestUtils.createCleanFiles(metaClient, basePath, "101", dfs.getConf());
HoodieTestUtils.createCleanFiles(metaClient, basePath, "102", dfs.getConf());
HoodieTestUtils.createCleanFiles(metaClient, basePath, "103", dfs.getConf());
HoodieTestUtils.createCleanFiles(metaClient, basePath, "104", dfs.getConf());
HoodieTestUtils.createCleanFiles(metaClient, basePath, "105", dfs.getConf());
HoodieTestUtils.createInflightCleanFiles(basePath, dfs.getConf(), "106", "107");
HoodieTestUtils.createPendingCleanFiles(metaClient, dfs.getConf(), "106", "107");
// reload the timeline and get all the commits before archive
timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
@@ -157,7 +156,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
assertEquals("Loaded 6 commits and the count should match", 12, timeline.countInstants());
// verify in-flight instants before archive
verifyInflightInstants(metaClient, 3);
verifyInflightInstants(metaClient, 2);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
@@ -169,8 +168,8 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
originalCommits.removeAll(timeline.getInstants().collect(Collectors.toList()));
// Check compaction instants
List<HoodieInstant> instants = HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(metaClient.getFs(),
new Path(metaClient.getMetaAuxiliaryPath()), HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE);
List<HoodieInstant> instants = metaClient.scanHoodieInstantsFromFileSystem(
new Path(metaClient.getMetaAuxiliaryPath()), HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE, false);
assertEquals("Should delete all compaction instants < 104", 4, instants.size());
assertFalse("Requested Compaction must be absent for 100",
instants.contains(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "100")));
@@ -201,30 +200,31 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
Reader reader =
HoodieLogFormat.newReader(dfs, new HoodieLogFile(new Path(basePath + "/.hoodie/.commits_.archive.1_1-0-1")),
HoodieArchivedMetaEntry.getClassSchema());
int archivedRecordsCount = 0;
List<IndexedRecord> readRecords = new ArrayList<>();
// read the avro blocks and validate the number of records written in each avro block
int numBlocks = 0;
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
List<IndexedRecord> records = blk.getRecords();
readRecords.addAll(records);
assertEquals("Archived and read records for each block are same", 8, records.size());
archivedRecordsCount += records.size();
numBlocks++;
}
assertEquals("Total archived records and total read records are the same count", 8, archivedRecordsCount);
System.out.println("Read Records :" + readRecords.stream().map(r -> (GenericRecord) r)
.map(r -> r.get("actionType") + "_" + r.get("actionState") + "_" + r.get("commitTime")).collect(Collectors.toList()));
assertEquals("Total archived records and total read records are the same count", 24, archivedRecordsCount);
assertTrue("Average Archived records per block is greater than 1", archivedRecordsCount / numBlocks > 1);
// make sure the archived commits are the same as the (originalcommits - commitsleft)
List<String> readCommits = readRecords.stream().map(r -> (GenericRecord) r).map(r -> {
Set<String> readCommits = readRecords.stream().map(r -> (GenericRecord) r).map(r -> {
return r.get("commitTime").toString();
}).collect(Collectors.toList());
Collections.sort(readCommits);
}).collect(Collectors.toSet());
assertEquals("Read commits map should match the originalCommits - commitsLoadedFromArchival",
originalCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()), readCommits);
originalCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), readCommits);
// verify in-flight instants after archive
verifyInflightInstants(metaClient, 3);
verifyInflightInstants(metaClient, 2);
reader.close();
}
@@ -272,8 +272,8 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
assertEquals("Should not archive commits when maxCommitsToKeep is 5", 4, timeline.countInstants());
List<HoodieInstant> instants = HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(metaClient.getFs(),
new Path(metaClient.getMetaAuxiliaryPath()), HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE);
List<HoodieInstant> instants = metaClient.scanHoodieInstantsFromFileSystem(
new Path(metaClient.getMetaAuxiliaryPath()), HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE, false);
assertEquals("Should not delete any aux compaction files when maxCommitsToKeep is 5", 8, instants.size());
assertTrue("Requested Compaction must be present for 100",
instants.contains(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "100")));

View File

@@ -101,7 +101,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
public void testCompactionOnCopyOnWriteFail() throws Exception {
// Initializes a COPY_ON_WRITE table, then tries to schedule and run a compaction on it.
// NOTE(review): the method name suggests this call is expected to fail; the expected-exception
// declaration is not visible in this hunk — confirm against the full file.
metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
// NOTE(review): the two declarations below look like the removed and added sides of this diff hunk
// (createNewCommitTime renamed to createNewInstantTime); only one of them exists in the real file.
String compactionInstantTime = HoodieActiveTimeline.createNewCommitTime();
String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
// The same instant time is used both to schedule the compaction and to execute it.
table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime));
}
@@ -117,7 +117,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
writeClient.insert(recordsRDD, newCommitTime).collect();
String compactionInstantTime = HoodieActiveTimeline.createNewCommitTime();
String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
JavaRDD<WriteStatus> result =
table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime));
assertTrue("If there is nothing to compact, result will be empty", result.isEmpty());
@@ -167,7 +167,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);
String compactionInstantTime = HoodieActiveTimeline.createNewCommitTime();
String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
JavaRDD<WriteStatus> result =
table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime));

View File

@@ -196,7 +196,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
client.startCommit();
client.startCommitWithTime(newCommitTime);
List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
assertNoWriteErrors(statuses);
@@ -821,6 +821,8 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
}
// Mark 2nd delta-instant as completed
metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(State.INFLIGHT,
HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime));
metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());
@@ -1009,6 +1011,8 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc);
tableRTFileSystemView = table.getRTFileSystemView();
((SyncableFileSystemView) tableRTFileSystemView).reset();
Option<HoodieInstant> lastInstant = ((SyncableFileSystemView) tableRTFileSystemView).getLastInstant();
System.out.println("Last Instant =" + lastInstant);
for (String partitionPath : dataGen.getPartitionPaths()) {
Assert.assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath)
.filter(fileSlice -> fileSlice.getDataFile().isPresent()).count() == 0);
@@ -1032,8 +1036,10 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
// Create a commit without rolling stats in metadata to test backwards compatibility
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
String commitActionType = table.getMetaClient().getCommitActionType();
HoodieInstant instant = new HoodieInstant(true, commitActionType, "000");
activeTimeline.createInflight(instant);
HoodieInstant instant = new HoodieInstant(State.REQUESTED, commitActionType, "000");
activeTimeline.createNewInstant(instant);
activeTimeline.transitionRequestedToInflight(instant, Option.empty());
instant = new HoodieInstant(State.INFLIGHT, commitActionType, "000");
activeTimeline.saveAsComplete(instant, Option.empty());
String commitTime = "001";