1
0

Introduce getCommitsAndCompactionsTimeline() explicitly & adjust usage across code base

This commit is contained in:
Vinoth Chandar
2017-04-26 13:36:49 -07:00
committed by prazanna
parent bae0528013
commit da17c5c607
15 changed files with 42 additions and 48 deletions

View File

@@ -40,7 +40,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@Component
@@ -71,7 +70,7 @@ public class CommitsCommand implements CommandMarker {
"limit"}, mandatory = false, help = "Limit commits", unspecifiedDefaultValue = "10")
final Integer limit) throws IOException {
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
HoodieTimeline timeline = activeTimeline.getCommitsAndCompactionsTimeline().filterCompletedInstants();
List<HoodieInstant> commits = timeline.getInstants().collect(Collectors.toList());
String[][] rows = new String[commits.size()][];
Collections.reverse(commits);
@@ -109,7 +108,7 @@ public class CommitsCommand implements CommandMarker {
@CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path")
final String sparkPropertiesPath) throws Exception {
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
HoodieTimeline timeline = activeTimeline.getCommitsAndCompactionsTimeline().filterCompletedInstants();
HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime);
if (!timeline.containsInstant(commitInstant)) {
@@ -136,7 +135,7 @@ public class CommitsCommand implements CommandMarker {
@CliOption(key = {"commit"}, help = "Commit to show")
final String commitTime) throws Exception {
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
HoodieTimeline timeline = activeTimeline.getCommitsAndCompactionsTimeline().filterCompletedInstants();
HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime);
if (!timeline.containsInstant(commitInstant)) {
@@ -184,7 +183,7 @@ public class CommitsCommand implements CommandMarker {
@CliOption(key = {"commit"}, help = "Commit to show")
final String commitTime) throws Exception {
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
HoodieTimeline timeline = activeTimeline.getCommitsAndCompactionsTimeline().filterCompletedInstants();
HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime);
if (!timeline.containsInstant(commitInstant)) {
@@ -220,9 +219,9 @@ public class CommitsCommand implements CommandMarker {
@CliOption(key = {"path"}, help = "Path of the dataset to compare to")
final String path) throws Exception {
HoodieTableMetaClient target = new HoodieTableMetaClient(HoodieCLI.fs, path);
HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitTimeline().filterCompletedInstants();;
HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants();;
HoodieTableMetaClient source = HoodieCLI.tableMetadata;
HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitTimeline().filterCompletedInstants();;
HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants();;
String targetLatestCommit =
targetTimeline.getInstants().iterator().hasNext() ? "0" : targetTimeline.lastInstant().get().getTimestamp();
String sourceLatestCommit =

View File

@@ -61,9 +61,9 @@ public class HoodieSyncCommand implements CommandMarker {
"hivePass"}, mandatory = true, unspecifiedDefaultValue = "", help = "hive password to connect to")
final String hivePass) throws Exception {
HoodieTableMetaClient target = HoodieCLI.syncTableMetadata;
HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitTimeline();
HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsAndCompactionsTimeline();
HoodieTableMetaClient source = HoodieCLI.tableMetadata;
HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitTimeline();
HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitsAndCompactionsTimeline();
long sourceCount = 0;
long targetCount = 0;
if ("complete".equals(mode)) {

View File

@@ -16,7 +16,6 @@
package com.uber.hoodie.cli.commands;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.avro.model.HoodieSavepointMetadata;
import com.uber.hoodie.cli.HoodieCLI;
import com.uber.hoodie.cli.HoodiePrintHelper;
import com.uber.hoodie.cli.utils.InputStreamConsumer;
@@ -38,7 +37,6 @@ import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

View File

@@ -484,7 +484,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
List<String> latestFiles =
view.getLatestVersionInPartition(partitionPath, commitTime)
.map(HoodieDataFile::getFileName).collect(Collectors.toList());
return new Tuple2<String, List<String>>(partitionPath, latestFiles);
return new Tuple2<>(partitionPath, latestFiles);
}).collectAsMap();
HoodieSavepointMetadata metadata =
@@ -558,7 +558,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
// Make sure the rollback was successful
Optional<HoodieInstant> lastInstant =
activeTimeline.reload().getCommitTimeline().filterCompletedInstants().lastInstant();
activeTimeline.reload().getCommitsAndCompactionsTimeline().filterCompletedInstants().lastInstant();
Preconditions.checkArgument(lastInstant.isPresent());
Preconditions.checkArgument(lastInstant.get().getTimestamp().equals(savepointTime),
savepointTime + "is not the last commit after rolling back " + commitsToRollback

View File

@@ -35,7 +35,6 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -92,8 +91,6 @@ public class HoodieCommitArchiveLog {
log.info("Deleting commits " + commitsToArchive);
HoodieTableMetaClient metaClient =
new HoodieTableMetaClient(fs, config.getBasePath(), true);
HoodieTimeline commitTimeline =
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
boolean success = true;
for (HoodieInstant commitToArchive : commitsToArchive) {
@@ -126,7 +123,7 @@ public class HoodieCommitArchiveLog {
HoodieTableMetaClient metaClient =
new HoodieTableMetaClient(fs, config.getBasePath(), true);
HoodieTimeline commitTimeline =
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
metaClient.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants();
HoodieAppendLog.Writer writer = null;
try {

View File

@@ -197,9 +197,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
case MERGE_ON_READ:
// We need to include the parquet files written out in delta commits
// Include commit action to be able to start doing a MOR over a COW dataset - no migration required
return getActiveTimeline().getTimelineOfActions(
Sets.newHashSet(HoodieActiveTimeline.COMMIT_ACTION, HoodieActiveTimeline.COMPACTION_ACTION,
HoodieActiveTimeline.DELTA_COMMIT_ACTION));
return getActiveTimeline().getCommitsAndCompactionsTimeline();
default:
throw new HoodieException("Unsupported table type :"+ metaClient.getTableType());
}
@@ -222,7 +220,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
public HoodieTimeline getCompactionCommitTimeline() {
switch (metaClient.getTableType()) {
case COPY_ON_WRITE:
return getActiveTimeline().getCommitTimeline();
return getActiveTimeline().getCommitsAndCompactionsTimeline();
case MERGE_ON_READ:
// We need to include the parquet files written out in delta commits in tagging
return getActiveTimeline().getTimelineOfActions(

View File

@@ -166,7 +166,6 @@ public class TestMergeOnReadTable {
compactor.compact(jsc, getConfig(), table);
metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath());
allFiles = HoodieTestUtils.listAllDataFilesInPath(fs, cfg.getBasePath());
dataFilesToRead = fsView.getLatestVersions(allFiles);
assertTrue(dataFilesToRead.findAny().isPresent());

View File

@@ -80,13 +80,13 @@ public class TestHoodieCommitArchiveLog {
HoodieTestDataGenerator.createCommitFile(basePath, "103");
HoodieTimeline timeline =
metadata.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
metadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants();
assertEquals("Loaded 4 commits and the count should match", 4, timeline.countInstants());
boolean result = archiveLog.archiveIfRequired();
assertTrue(result);
timeline =
metadata.getActiveTimeline().reload().getCommitTimeline().filterCompletedInstants();
metadata.getActiveTimeline().reload().getCommitsAndCompactionsTimeline().filterCompletedInstants();
assertEquals("Should not archive commits when maxCommitsToKeep is 5", 4,
timeline.countInstants());
}
@@ -107,14 +107,14 @@ public class TestHoodieCommitArchiveLog {
HoodieTestDataGenerator.createCommitFile(basePath, "105");
HoodieTimeline timeline =
metadata.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
metadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants();
List<HoodieInstant> originalCommits = timeline.getInstants().collect(Collectors.toList());
assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
boolean result = archiveLog.archiveIfRequired();
assertTrue(result);
timeline =
metadata.getActiveTimeline().reload().getCommitTimeline().filterCompletedInstants();
metadata.getActiveTimeline().reload().getCommitsAndCompactionsTimeline().filterCompletedInstants();
assertEquals(
"Should archive commits when maxCommitsToKeep is 5 and now the commits length should be minCommitsToKeep which is 2",
2, timeline.countInstants());
@@ -159,12 +159,12 @@ public class TestHoodieCommitArchiveLog {
HoodieTestDataGenerator.createCommitFile(basePath, "105");
HoodieTimeline timeline =
metadata.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
metadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants();
assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
boolean result = archiveLog.archiveIfRequired();
assertTrue(result);
timeline =
metadata.getActiveTimeline().reload().getCommitTimeline().filterCompletedInstants();
metadata.getActiveTimeline().reload().getCommitsAndCompactionsTimeline().filterCompletedInstants();
assertTrue("Archived commits should always be safe",
timeline.containsOrBeforeTimelineStarts("100"));
assertTrue("Archived commits should always be safe",

View File

@@ -122,16 +122,25 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
}
/**
* Get only the commits (inflight and completed) in the active timeline
* Get all instants (commits, delta commits, compactions) that produce new data, in the active timeline
**
* @return
*/
public HoodieTimeline getCommitsAndCompactionsTimeline() {
return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, COMPACTION_ACTION, DELTA_COMMIT_ACTION));
}
/**
* Get only pure commits (inflight and completed) in the active timeline
*
* @return
*/
public HoodieTimeline getCommitTimeline() {
return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, COMPACTION_ACTION));
return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION));
}
/**
* Get only the commits (inflight and completed) in the active timeline
* Get only the delta commits (inflight and completed) in the active timeline
*
* @return
*/

View File

@@ -99,8 +99,8 @@ public class HoodieInputFormat extends MapredParquetInputFormat
}
String tableName = metadata.getTableConfig().getTableName();
String mode = HoodieHiveUtil.readMode(Job.getInstance(job), tableName);
// FIXME(VC): This is incorrect and needs to change to include commits, delta commits, compactions, as all of them produce a base parquet file today
HoodieTimeline timeline = metadata.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
// Get all commits, delta commits, compactions, as all of them produce a base parquet file today
HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants();
TableFileSystemView fsView = new HoodieTableFileSystemView(metadata, timeline);
if (HoodieHiveUtil.INCREMENTAL_SCAN_MODE.equals(mode)) {

View File

@@ -21,12 +21,10 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.exception.DatasetNotFoundException;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.InvalidDatasetException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
@@ -36,7 +34,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Given a path is a part of

View File

@@ -271,7 +271,7 @@ public class HiveIncrementalPuller {
HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, targetDataPath);
Optional<HoodieInstant>
lastCommit = metadata.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
lastCommit = metadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants().lastInstant();
if(lastCommit.isPresent()) {
return lastCommit.get().getTimestamp();
}
@@ -306,12 +306,12 @@ public class HiveIncrementalPuller {
private String getLastCommitTimePulled(FileSystem fs, String sourceTableLocation) throws IOException {
HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, sourceTableLocation);
List<String> commitsToSync = metadata.getActiveTimeline().getCommitTimeline().filterCompletedInstants()
List<String> commitsToSync = metadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants()
.findInstantsAfter(config.fromCommitTime, config.maxCommits).getInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
if (commitsToSync.isEmpty()) {
log.warn("Nothing to sync. All commits in " + config.sourceTable + " are " + metadata
.getActiveTimeline().getCommitTimeline().filterCompletedInstants().getInstants()
.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants().getInstants()
.collect(Collectors.toList()) + " and from commit time is "
+ config.fromCommitTime);
return null;

View File

@@ -30,27 +30,23 @@ import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.table.HoodieTable;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
@@ -74,10 +70,10 @@ public class HoodieSnapshotCopier implements Serializable {
FileSystem fs = FSUtils.getFs();
final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs, baseDir);
final TableFileSystemView fsView = new HoodieTableFileSystemView(tableMetadata,
tableMetadata.getActiveTimeline().getCommitTimeline().filterCompletedInstants());
tableMetadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants());
// Get the latest commit
Optional<HoodieInstant> latestCommit = tableMetadata.getActiveTimeline()
.getCommitTimeline().filterCompletedInstants().lastInstant();
.getCommitsAndCompactionsTimeline().filterCompletedInstants().lastInstant();
if(!latestCommit.isPresent()) {
logger.warn("No commits present. Nothing to snapshot");
return;

View File

@@ -119,7 +119,7 @@ public class HoodieDeltaStreamer implements Serializable {
if (fs.exists(new Path(cfg.targetBasePath))) {
HoodieTableMetaClient meta = new HoodieTableMetaClient(fs, cfg.targetBasePath);
this.commitTimelineOpt = Optional.of(meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants());
this.commitTimelineOpt = Optional.of(meta.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants());
} else {
this.commitTimelineOpt = Optional.empty();
}

View File

@@ -263,6 +263,7 @@
<exclude>**/test/resources/*.schema</exclude>
<exclude>**/test/resources/*.csv</exclude>
<exclude>**/main/avro/*.avsc</exclude>
<exclude>**/target/*</exclude>
</excludes>
</configuration>
<executions>