diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/ArchivedCommitsCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/ArchivedCommitsCommand.java index 069c6564a..e1b03b271 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/ArchivedCommitsCommand.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/ArchivedCommitsCommand.java @@ -94,12 +94,6 @@ public class ArchivedCommitsCommand implements CommandMarker { commitDetails.add(record.get("hoodieCommitMetadata").toString()); break; } - case HoodieTimeline.COMPACTION_ACTION: { - commitDetails.add(record.get("commitTime").toString()); - commitDetails.add(record.get("actionType").toString()); - commitDetails.add(record.get("hoodieCompactionMetadata").toString()); - break; - } case HoodieTimeline.DELTA_COMMIT_ACTION: { commitDetails.add(record.get("commitTime").toString()); commitDetails.add(record.get("actionType").toString()); diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CommitsCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CommitsCommand.java index c1a9e6dd9..d6446a2c4 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CommitsCommand.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CommitsCommand.java @@ -69,7 +69,7 @@ public class CommitsCommand implements CommandMarker { "limit"}, mandatory = false, help = "Limit commits", unspecifiedDefaultValue = "10") final Integer limit) throws IOException { HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); - HoodieTimeline timeline = activeTimeline.getCommitsAndCompactionsTimeline() + HoodieTimeline timeline = activeTimeline.getCommitsTimeline() .filterCompletedInstants(); List commits = timeline.getInstants().collect(Collectors.toList()); String[][] rows = new String[commits.size()][]; @@ -108,7 +108,7 @@ public class CommitsCommand implements CommandMarker { @CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path") final String sparkPropertiesPath) throws Exception { HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); - HoodieTimeline timeline = activeTimeline.getCommitsAndCompactionsTimeline() + HoodieTimeline timeline = activeTimeline.getCommitsTimeline() .filterCompletedInstants(); HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime); @@ -137,7 +137,7 @@ public class CommitsCommand implements CommandMarker { @CliOption(key = {"commit"}, help = "Commit to show") final String commitTime) throws Exception { HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); - HoodieTimeline timeline = activeTimeline.getCommitsAndCompactionsTimeline() + HoodieTimeline timeline = activeTimeline.getCommitsTimeline() .filterCompletedInstants(); HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime); @@ -187,7 +187,7 @@ public class CommitsCommand implements CommandMarker { @CliOption(key = {"commit"}, help = "Commit to show") final String commitTime) throws Exception { HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); - HoodieTimeline timeline = activeTimeline.getCommitsAndCompactionsTimeline() + HoodieTimeline timeline = activeTimeline.getCommitsTimeline() .filterCompletedInstants(); HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime); @@ -225,11 +225,11 @@ public class CommitsCommand implements CommandMarker { @CliOption(key = {"path"}, help = "Path of the dataset to compare to") final String path) throws Exception { HoodieTableMetaClient target = new HoodieTableMetaClient(HoodieCLI.fs, path); - HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsAndCompactionsTimeline() + HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsTimeline() .filterCompletedInstants(); ; HoodieTableMetaClient source = HoodieCLI.tableMetadata; - HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitsAndCompactionsTimeline() + HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitsTimeline() .filterCompletedInstants(); ; String targetLatestCommit = diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HoodieSyncCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HoodieSyncCommand.java index b19608bed..dd9560a4a 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HoodieSyncCommand.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HoodieSyncCommand.java @@ -61,9 +61,9 @@ public class HoodieSyncCommand implements CommandMarker { "hivePass"}, mandatory = true, unspecifiedDefaultValue = "", help = "hive password to connect to") final String hivePass) throws Exception { HoodieTableMetaClient target = HoodieCLI.syncTableMetadata; - HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsAndCompactionsTimeline(); + HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsTimeline(); HoodieTableMetaClient source = HoodieCLI.tableMetadata; - HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitsAndCompactionsTimeline(); + HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitsTimeline(); long sourceCount = 0; long targetCount = 0; if ("complete".equals(mode)) { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java index 851947286..6c329c330 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java @@ -71,7 +71,7 @@ public class HoodieReadClient implements Serializable { // Create a Hoodie table which encapsulated the commits and files visible this.hoodieTable = HoodieTable .getHoodieTable(new HoodieTableMetaClient(fs, basePath, true), null); - this.commitTimeline = hoodieTable.getCompletedCompactionCommitTimeline(); + this.commitTimeline = hoodieTable.getCommitTimeline().filterCompletedInstants(); this.index = new HoodieBloomIndex(HoodieWriteConfig.newBuilder().withPath(basePath).build(), jsc); this.sqlContextOpt = Optional.absent(); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index aa2e3f479..0de261c8f 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -25,7 +25,6 @@ import com.uber.hoodie.avro.model.HoodieSavepointMetadata; import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.HoodieRollbackStat; import com.uber.hoodie.common.model.HoodieCommitMetadata; -import com.uber.hoodie.common.model.HoodieCompactionMetadata; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; @@ -54,17 +53,6 @@ import com.uber.hoodie.table.HoodieTable; import com.uber.hoodie.table.UserDefinedBulkInsertPartitioner; import com.uber.hoodie.table.WorkloadProfile; import com.uber.hoodie.table.WorkloadStat; -import java.io.IOException; -import java.io.Serializable; -import java.nio.charset.StandardCharsets; -import java.text.ParseException; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -77,6 +65,18 @@ import org.apache.spark.storage.StorageLevel; import scala.Option; import scala.Tuple2; +import java.io.IOException; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.text.ParseException; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + /** * Hoodie Write Client helps you build datasets on HDFS [insert()] and then perform efficient * mutations on a HDFS dataset [upsert()] @@ -605,7 +605,7 @@ public class HoodieWriteClient implements Seriali HoodieTable table = HoodieTable .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); - HoodieTimeline commitTimeline = table.getCommitTimeline(); + HoodieTimeline commitTimeline = table.getCommitsTimeline(); HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime); @@ -624,7 +624,7 @@ public class HoodieWriteClient implements Seriali // Make sure the rollback was successful Optional lastInstant = - activeTimeline.reload().getCommitsAndCompactionsTimeline().filterCompletedInstants() + activeTimeline.reload().getCommitsTimeline().filterCompletedInstants() .lastInstant(); Preconditions.checkArgument(lastInstant.isPresent()); Preconditions.checkArgument(lastInstant.get().getTimestamp().equals(savepointTime), @@ -829,7 +829,7 @@ public class HoodieWriteClient implements Seriali // Create a Hoodie table which encapsulated the commits and files visible HoodieTable table = HoodieTable .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); - Optional compactionMetadata = table.compact(jsc, compactionCommitTime); + Optional compactionMetadata = table.compact(jsc, compactionCommitTime); if (compactionMetadata.isPresent()) { logger.info("Compacted successfully on commit " + compactionCommitTime); } else { @@ -878,7 +878,7 @@ public class HoodieWriteClient implements Seriali private void rollbackInflightCommits() { HoodieTable table = HoodieTable .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); - HoodieTimeline inflightTimeline = table.getCommitTimeline().filterInflights(); + HoodieTimeline inflightTimeline = table.getCommitsTimeline().filterInflights(); List commits = inflightTimeline.getInstants().map(HoodieInstant::getTimestamp) .collect(Collectors.toList()); Collections.reverse(commits); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java index 44dc910c1..5a0d69002 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java @@ -239,7 +239,7 @@ public class HoodieBloomIndex extends HoodieIndex .parallelize(partitions, Math.max(partitions.size(), 1)) .flatMapToPair(partitionPath -> { java.util.Optional latestCommitTime = - hoodieTable.getCommitTimeline().filterCompletedInstants().lastInstant(); + hoodieTable.getCommitsTimeline().filterCompletedInstants().lastInstant(); List> filteredFiles = new ArrayList<>(); if (latestCommitTime.isPresent()) { filteredFiles = diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java index a707590c4..3cb697c7a 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java @@ -26,7 +26,6 @@ import com.uber.hoodie.avro.model.HoodieSavepointMetadata; import com.uber.hoodie.common.model.ActionType; import com.uber.hoodie.common.model.HoodieArchivedLogFile; import com.uber.hoodie.common.model.HoodieCommitMetadata; -import com.uber.hoodie.common.model.HoodieCompactionMetadata; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.log.HoodieLogFormat; @@ -39,12 +38,6 @@ import com.uber.hoodie.exception.HoodieCommitException; import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.table.HoodieTable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileSystem; @@ -52,6 +45,13 @@ import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + /** * Archiver to bound the growth of .commit files */ @@ -228,14 +228,6 @@ public class HoodieCommitArchiveLog { archivedMetaWrapper.setActionType(ActionType.commit.name()); break; } - case HoodieTimeline.COMPACTION_ACTION: { - com.uber.hoodie.common.model.HoodieCompactionMetadata compactionMetadata = com.uber.hoodie.common.model.HoodieCompactionMetadata - .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get()); - archivedMetaWrapper - .setHoodieCompactionMetadata(compactionMetadataConverter(compactionMetadata)); - archivedMetaWrapper.setActionType(ActionType.compaction.name()); - break; - } case HoodieTimeline.ROLLBACK_ACTION: { archivedMetaWrapper.setHoodieRollbackMetadata(AvroUtils .deserializeAvroMetadata(commitTimeline.getInstantDetails(hoodieInstant).get(), @@ -271,14 +263,4 @@ public class HoodieCommitArchiveLog { com.uber.hoodie.avro.model.HoodieCommitMetadata.class); return avroMetaData; } - - private com.uber.hoodie.avro.model.HoodieCompactionMetadata compactionMetadataConverter( - HoodieCompactionMetadata hoodieCompactionMetadata) { - ObjectMapper mapper = new ObjectMapper(); - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - com.uber.hoodie.avro.model.HoodieCompactionMetadata avroMetaData = mapper - .convertValue(hoodieCompactionMetadata, - com.uber.hoodie.avro.model.HoodieCompactionMetadata.class); - return avroMetaData; - } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java index d5bcd9ee6..fe8227f2d 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java @@ -16,17 +16,16 @@ package com.uber.hoodie.io.compact; -import com.uber.hoodie.common.model.HoodieCompactionMetadata; +import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.table.HoodieTable; +import org.apache.spark.api.java.JavaSparkContext; + import java.io.Serializable; import java.util.Date; -import java.util.Optional; - -import org.apache.spark.api.java.JavaSparkContext; /** * A HoodieCompactor runs compaction on a hoodie table @@ -36,8 +35,8 @@ public interface HoodieCompactor extends Serializable { /** * Compact the delta files with the data files */ - HoodieCompactionMetadata compact(JavaSparkContext jsc, final HoodieWriteConfig config, - HoodieTable hoodieTable, String compactionCommitTime) throws Exception; + HoodieCommitMetadata compact(JavaSparkContext jsc, final HoodieWriteConfig config, + HoodieTable hoodieTable, String compactionCommitTime) throws Exception; // Helper methods @@ -45,7 +44,7 @@ public interface HoodieCompactor extends Serializable { String commitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()); HoodieActiveTimeline activeTimeline = hoodieTable.getActiveTimeline(); activeTimeline - .createInflight(new HoodieInstant(true, HoodieTimeline.COMPACTION_ACTION, commitTime)); + .createInflight(new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime)); return commitTime; } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java index e4da087c1..92107c0cc 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java @@ -16,15 +16,13 @@ package com.uber.hoodie.io.compact; -import static java.util.stream.Collectors.toList; - import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.uber.hoodie.WriteStatus; -import com.uber.hoodie.common.model.CompactionWriteStat; -import com.uber.hoodie.common.model.HoodieCompactionMetadata; +import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieTableType; +import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner; @@ -36,6 +34,13 @@ import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieCompactionException; import com.uber.hoodie.table.HoodieCopyOnWriteTable; import com.uber.hoodie.table.HoodieTable; +import org.apache.avro.Schema; +import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FlatMapFunction; + import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Collection; @@ -44,12 +49,8 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import org.apache.avro.Schema; -import org.apache.hadoop.fs.FileSystem; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FlatMapFunction; + +import static java.util.stream.Collectors.toList; /** * HoodieRealtimeTableCompactor compacts a hoodie table with merge on read storage. Computes all @@ -63,8 +64,8 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { private static Logger log = LogManager.getLogger(HoodieRealtimeTableCompactor.class); @Override - public HoodieCompactionMetadata compact(JavaSparkContext jsc, HoodieWriteConfig config, - HoodieTable hoodieTable, String compactionCommitTime) throws IOException { + public HoodieCommitMetadata compact(JavaSparkContext jsc, HoodieWriteConfig config, + HoodieTable hoodieTable, String compactionCommitTime) throws IOException { Preconditions.checkArgument( hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ, "HoodieRealtimeTableCompactor can only compact table of type " @@ -99,20 +100,20 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { } log.info("After filtering, Compacting " + operations + " files"); - List updateStatusMap = + List updateStatusMap = jsc.parallelize(operations, operations.size()) .map(s -> executeCompaction(metaClient, config, s, compactionCommitTime)) - .flatMap(new FlatMapFunction, CompactionWriteStat>() { + .flatMap(new FlatMapFunction, HoodieWriteStat>() { @Override - public Iterator call( - List compactionWriteStats) + public Iterator call( + List hoodieWriteStats) throws Exception { - return compactionWriteStats.iterator(); + return hoodieWriteStats.iterator(); } }).collect(); - HoodieCompactionMetadata metadata = new HoodieCompactionMetadata(); - for (CompactionWriteStat stat : updateStatusMap) { + HoodieCommitMetadata metadata = new HoodieCommitMetadata(true); + for (HoodieWriteStat stat : updateStatusMap) { metadata.addWriteStat(stat.getPartitionPath(), stat); } @@ -128,13 +129,13 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { return metadata; } - private boolean isCompactionSucceeded(HoodieCompactionMetadata result) { + private boolean isCompactionSucceeded(HoodieCommitMetadata result) { //TODO figure out a success factor for a compaction return true; } - private List executeCompaction(HoodieTableMetaClient metaClient, - HoodieWriteConfig config, CompactionOperation operation, String commitTime) + private List executeCompaction(HoodieTableMetaClient metaClient, + HoodieWriteConfig config, CompactionOperation operation, String commitTime) throws IOException { FileSystem fs = FSUtils.getFs(); Schema readerSchema = @@ -150,7 +151,6 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { String maxInstantTime = metaClient.getActiveTimeline() .getTimelineOfActions( Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, - HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION)) .filterCompletedInstants().lastInstant().get().getTimestamp(); @@ -170,22 +170,23 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { return StreamSupport.stream(resultIterable.spliterator(), false) .flatMap(Collection::stream) .map(WriteStatus::getStat) - .map(s -> CompactionWriteStat.newBuilder().withHoodieWriteStat(s) - .setTotalRecordsToUpdate(scanner.getTotalRecordsToUpdate()) - .setTotalLogFiles(scanner.getTotalLogFiles()) - .setTotalLogRecords(scanner.getTotalLogRecords()) - .onPartition(operation.getPartitionPath()).build()) + .map(s -> { + s.setTotalRecordsToBeUpdate(scanner.getTotalRecordsToUpdate()); + s.setTotalLogFiles(scanner.getTotalLogFiles()); + s.setTotalLogRecords(scanner.getTotalLogRecords()); + s.setPartitionPath(operation.getPartitionPath()); + return s;}) .collect(toList()); } public boolean commitCompaction(String commitTime, HoodieTableMetaClient metaClient, - HoodieCompactionMetadata metadata) { + HoodieCommitMetadata metadata) { log.info("Committing Compaction " + commitTime); HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); try { activeTimeline.saveAsComplete( - new HoodieInstant(true, HoodieTimeline.COMPACTION_ACTION, commitTime), + new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime), Optional.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); } catch (IOException e) { throw new HoodieCompactionException( diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index 543b4c26d..a509411d7 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -22,7 +22,6 @@ import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.HoodieRollbackStat; import com.uber.hoodie.common.model.HoodieCommitMetadata; -import com.uber.hoodie.common.model.HoodieCompactionMetadata; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; @@ -486,7 +485,7 @@ public class HoodieCopyOnWriteTable extends Hoodi } @Override - public Optional compact(JavaSparkContext jsc, String commitCompactionTime) { + public Optional compact(JavaSparkContext jsc, String commitCompactionTime) { logger.info("Nothing to compact in COW storage format"); return Optional.empty(); } @@ -544,7 +543,7 @@ public class HoodieCopyOnWriteTable extends Hoodi @Override public List rollback(JavaSparkContext jsc, List commits) throws IOException { - String actionType = this.getCompactedCommitActionType(); + String actionType = this.getCommitActionType(); HoodieActiveTimeline activeTimeline = this.getActiveTimeline(); List inflights = this.getInflightCommitTimeline().getInstants() .map(HoodieInstant::getTimestamp) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java index 3be0479c7..19461b025 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java @@ -21,7 +21,6 @@ import com.google.common.collect.Sets; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.HoodieRollbackStat; import com.uber.hoodie.common.model.HoodieCommitMetadata; -import com.uber.hoodie.common.model.HoodieCompactionMetadata; import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordPayload; @@ -93,9 +92,9 @@ public class HoodieMergeOnReadTable extends } @Override - public Optional compact(JavaSparkContext jsc, String compactionCommitTime) { + public Optional compact(JavaSparkContext jsc, String compactionCommitTime) { logger.info("Checking if compaction needs to be run on " + config.getBasePath()); - Optional lastCompaction = getActiveTimeline().getCompactionTimeline() + Optional lastCompaction = getActiveTimeline().getCommitTimeline() .filterCompletedInstants().lastInstant(); String deltaCommitsSinceTs = "0"; if (lastCompaction.isPresent()) { @@ -130,8 +129,7 @@ public class HoodieMergeOnReadTable extends } Map commitsAndCompactions = this.getActiveTimeline() - .getTimelineOfActions(Sets.newHashSet(HoodieActiveTimeline.COMMIT_ACTION, - HoodieActiveTimeline.COMPACTION_ACTION, HoodieActiveTimeline.DELTA_COMMIT_ACTION)) + .getTimelineOfActions(Sets.newHashSet(HoodieActiveTimeline.COMMIT_ACTION, HoodieActiveTimeline.DELTA_COMMIT_ACTION)) .getInstants() .filter(i -> commits.contains(i.getTimestamp())) .collect(Collectors.toMap(i -> i.getTimestamp(), i -> i)); @@ -149,11 +147,10 @@ public class HoodieMergeOnReadTable extends List stats = null; switch (instant.getAction()) { case HoodieTimeline.COMMIT_ACTION: - case HoodieTimeline.COMPACTION_ACTION: try { logger.info("Starting to rollback Commit/Compaction " + instant); HoodieCommitMetadata commitMetadata = HoodieCommitMetadata - .fromBytes(this.getCommitTimeline().getInstantDetails( + .fromBytes(this.getCommitsTimeline().getInstantDetails( new HoodieInstant(true, instant.getAction(), instant.getTimestamp())).get()); stats = jsc.parallelize(commitMetadata.getPartitionToWriteStats().keySet().stream() @@ -174,7 +171,7 @@ public class HoodieMergeOnReadTable extends logger.info("Starting to rollback delta commit " + instant); HoodieCommitMetadata commitMetadata = HoodieCommitMetadata - .fromBytes(this.getCommitTimeline().getInstantDetails( + .fromBytes(this.getCommitsTimeline().getInstantDetails( new HoodieInstant(true, instant.getAction(), instant.getTimestamp())).get()); stats = jsc.parallelize(commitMetadata.getPartitionToWriteStats().keySet().stream() diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java index b434c7509..76ab92d99 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java @@ -16,12 +16,11 @@ package com.uber.hoodie.table; -import com.google.common.collect.Sets; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.avro.model.HoodieSavepointMetadata; import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.HoodieRollbackStat; -import com.uber.hoodie.common.model.HoodieCompactionMetadata; +import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.table.HoodieTableMetaClient; @@ -35,6 +34,12 @@ import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieCommitException; import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieSavepointException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.Partitioner; +import org.apache.spark.api.java.JavaSparkContext; + import java.io.IOException; import java.io.Serializable; import java.util.Iterator; @@ -42,11 +47,6 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.apache.spark.Partitioner; -import org.apache.spark.api.java.JavaSparkContext; /** * Abstract implementation of a HoodieTable @@ -116,21 +116,21 @@ public abstract class HoodieTable implements Seri * Get the completed (commit + compaction) view of the file system for this table */ public TableFileSystemView getCompletedFileSystemView() { - return new HoodieTableFileSystemView(metaClient, getCommitTimeline()); + return new HoodieTableFileSystemView(metaClient, getCommitsTimeline()); } /** * Get only the completed (no-inflights) commit timeline */ public HoodieTimeline getCompletedCommitTimeline() { - return getCommitTimeline().filterCompletedInstants(); + return getCommitsTimeline().filterCompletedInstants(); } /** * Get only the inflights (no-completed) commit timeline */ public HoodieTimeline getInflightCommitTimeline() { - return getCommitTimeline().filterInflights(); + return getCommitsTimeline().filterInflights(); } @@ -185,38 +185,28 @@ public abstract class HoodieTable implements Seri /** * Get the commit timeline visible for this table */ - public HoodieTimeline getCommitTimeline() { + public HoodieTimeline getCommitsTimeline() { switch (metaClient.getTableType()) { case COPY_ON_WRITE: return getActiveTimeline().getCommitTimeline(); case MERGE_ON_READ: // We need to include the parquet files written out in delta commits // Include commit action to be able to start doing a MOR over a COW dataset - no migration required - return getActiveTimeline().getCommitsAndCompactionsTimeline(); + return getActiveTimeline().getCommitsTimeline(); default: throw new HoodieException("Unsupported table type :" + metaClient.getTableType()); } } - /** - * Get only the completed (no-inflights) compaction commit timeline - */ - public HoodieTimeline getCompletedCompactionCommitTimeline() { - return getCompactionCommitTimeline().filterCompletedInstants(); - } - - /** * Get the compacted commit timeline visible for this table */ - public HoodieTimeline getCompactionCommitTimeline() { + public HoodieTimeline getCommitTimeline() { switch (metaClient.getTableType()) { case COPY_ON_WRITE: - return getActiveTimeline().getCommitsAndCompactionsTimeline(); case MERGE_ON_READ: // We need to include the parquet files written out in delta commits in tagging - return getActiveTimeline().getTimelineOfActions( - Sets.newHashSet(HoodieActiveTimeline.COMPACTION_ACTION)); + return getActiveTimeline().getCommitTimeline(); default: throw new HoodieException("Unsupported table type :" + metaClient.getTableType()); } @@ -236,20 +226,6 @@ public abstract class HoodieTable implements Seri "Could not commit on unknown storage type " + metaClient.getTableType()); } - /** - * Gets the action type for a compaction commit - */ - public String getCompactedCommitActionType() { - switch (metaClient.getTableType()) { - case COPY_ON_WRITE: - return HoodieTimeline.COMMIT_ACTION; - case MERGE_ON_READ: - return HoodieTimeline.COMPACTION_ACTION; - } - throw new HoodieException("Unsupported table type :" + metaClient.getTableType()); - } - - /** * Perform the ultimate IO for a given upserted (RDD) partition */ @@ -279,8 +255,8 @@ public abstract class HoodieTable implements Seri * Run Compaction on the table. Compaction arranges the data so that it is optimized for data * access */ - public abstract Optional compact(JavaSparkContext jsc, - String commitCompactionTime); + public abstract Optional compact(JavaSparkContext jsc, + String commitCompactionTime); /** * Clean partition paths according to cleaning policy and returns the number of files cleaned. diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java index ccdd12839..f6995762d 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java @@ -649,7 +649,7 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath); table = HoodieTable.getHoodieTable(metadata, getConfig()); - timeline = table.getCommitTimeline(); + timeline = table.getCommitsTimeline(); TableFileSystemView fsView = table.getFileSystemView(); // Need to ensure the following @@ -1493,10 +1493,10 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { HoodieTestUtils.doesCommitExist(basePath, commitTime)); // Get parquet file paths from commit metadata - String actionType = table.getCompactedCommitActionType(); + String actionType = table.getCommitActionType(); HoodieInstant commitInstant = new HoodieInstant(false, actionType, commitTime); - HoodieTimeline commitTimeline = table.getCompletedCompactionCommitTimeline(); + HoodieTimeline commitTimeline = table.getCommitTimeline().filterCompletedInstants(); HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commitInstant).get()); String basePath = table.getMetaClient().getBasePath(); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java index fb19bf7e8..5106e8eea 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java @@ -84,7 +84,7 @@ public class TestHoodieCommitArchiveLog { HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath); HoodieTimeline timeline = - metadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants(); + metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants()); @@ -155,13 +155,13 @@ public class TestHoodieCommitArchiveLog { HoodieTestDataGenerator.createCommitFile(basePath, "103"); HoodieTimeline timeline = - metadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants(); + metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); assertEquals("Loaded 4 commits and the count should match", 4, timeline.countInstants()); boolean result = archiveLog.archiveIfRequired(); assertTrue(result); timeline = - metadata.getActiveTimeline().reload().getCommitsAndCompactionsTimeline() + metadata.getActiveTimeline().reload().getCommitsTimeline() .filterCompletedInstants(); assertEquals("Should not archive commits when maxCommitsToKeep is 5", 4, timeline.countInstants()); @@ -183,12 +183,12 @@ public class TestHoodieCommitArchiveLog { HoodieTestDataGenerator.createCommitFile(basePath, "105"); HoodieTimeline timeline = - metadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants(); + metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants()); boolean result = archiveLog.archiveIfRequired(); assertTrue(result); timeline = - metadata.getActiveTimeline().reload().getCommitsAndCompactionsTimeline() + metadata.getActiveTimeline().reload().getCommitsTimeline() .filterCompletedInstants(); assertTrue("Archived commits should always be safe", timeline.containsOrBeforeTimelineStarts("100")); @@ -217,12 +217,12 @@ public class TestHoodieCommitArchiveLog { HoodieTestDataGenerator.createCommitFile(basePath, "105"); HoodieTimeline timeline = - metadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants(); + metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants()); boolean result = archiveLog.archiveIfRequired(); assertTrue(result); timeline = - metadata.getActiveTimeline().reload().getCommitsAndCompactionsTimeline() + metadata.getActiveTimeline().reload().getCommitsTimeline() .filterCompletedInstants(); assertEquals( "Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)", diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java index e8ea13055..c842bf592 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java @@ -21,7 +21,7 @@ import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.HoodieClientTestUtils; import com.uber.hoodie.common.HoodieTestDataGenerator; import com.uber.hoodie.common.model.FileSlice; -import com.uber.hoodie.common.model.HoodieCompactionMetadata; +import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.model.HoodieTestUtils; @@ -49,7 +49,6 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; import java.util.List; -import java.util.Optional; import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; @@ -124,7 +123,7 @@ public class TestHoodieCompactor { JavaRDD recordsRDD = jsc.parallelize(records, 1); writeClient.insert(recordsRDD, newCommitTime).collect(); - HoodieCompactionMetadata result = + HoodieCommitMetadata result = compactor.compact(jsc, getConfig(), table, HoodieActiveTimeline.createNewCommitTime()); String basePath = table.getMetaClient().getBasePath(); assertTrue("If there is nothing to compact, result will be empty", @@ -178,7 +177,7 @@ public class TestHoodieCompactor { metaClient = new HoodieTableMetaClient(fs, basePath); table = HoodieTable.getHoodieTable(metaClient, config); - HoodieCompactionMetadata result = + HoodieCommitMetadata result = compactor.compact(jsc, getConfig(), table, HoodieActiveTimeline.createNewCommitTime()); // Verify that recently written compacted data file has no log file @@ -199,7 +198,7 @@ public class TestHoodieCompactor { "After compaction there should be no log files visiable on a Realtime view", slice.getLogFiles().collect(Collectors.toList()).isEmpty()); } - assertTrue(result.getPartitionToCompactionWriteStats().containsKey(partitionPath)); + assertTrue(result.getPartitionToWriteStats().containsKey(partitionPath)); } } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java index 337d06610..42649159c 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java @@ -182,7 +182,7 @@ public class TestMergeOnReadTable { FileStatus[] allFiles = HoodieTestUtils .listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metaClient, - hoodieTable.getCompletedCompactionCommitTimeline(), allFiles); + hoodieTable.getCommitTimeline().filterCompletedInstants(), allFiles); Stream dataFilesToRead = roView.getLatestDataFiles(); assertTrue(!dataFilesToRead.findAny().isPresent()); @@ -231,7 +231,7 @@ public class TestMergeOnReadTable { // verify that there is a commit table = HoodieTable .getHoodieTable(new HoodieTableMetaClient(fs, cfg.getBasePath(), true), getConfig(false)); - HoodieTimeline timeline = table.getCompletedCompactionCommitTimeline(); + HoodieTimeline timeline = table.getCommitTimeline().filterCompletedInstants(); assertEquals("Expecting a single commit.", 1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants()); String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp(); @@ -299,7 +299,7 @@ public class TestMergeOnReadTable { FileStatus[] allFiles = HoodieTestUtils .listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metaClient, - hoodieTable.getCompletedCompactionCommitTimeline(), allFiles); + hoodieTable.getCommitTimeline().filterCompletedInstants(), allFiles); Stream dataFilesToRead = roView.getLatestDataFiles(); assertTrue(!dataFilesToRead.findAny().isPresent()); @@ -455,7 +455,7 @@ public class TestMergeOnReadTable { FileStatus[] allFiles = HoodieTestUtils .listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metaClient, - hoodieTable.getCompletedCompactionCommitTimeline(), allFiles); + hoodieTable.getCommitTimeline().filterCompletedInstants(), allFiles); Stream dataFilesToRead = roView.getLatestDataFiles(); assertTrue(!dataFilesToRead.findAny().isPresent()); @@ -524,11 +524,11 @@ public class TestMergeOnReadTable { allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath()); hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg); - roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompactionCommitTimeline(), + roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCommitsTimeline(), allFiles); final String compactedCommitTime = metaClient.getActiveTimeline().reload() - .getCommitsAndCompactionsTimeline().lastInstant().get().getTimestamp(); + .getCommitsTimeline().lastInstant().get().getTimestamp(); assertTrue(roView.getLatestDataFiles().filter(file -> { if (compactedCommitTime.equals(file.getCommitTime())) { @@ -543,7 +543,7 @@ public class TestMergeOnReadTable { allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath()); hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg); - roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompactionCommitTimeline(), + roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCommitsTimeline(), allFiles); assertFalse(roView.getLatestDataFiles().filter(file -> { diff --git a/hoodie-common/src/main/avro/HoodieCommitMetadata.avsc b/hoodie-common/src/main/avro/HoodieCommitMetadata.avsc index b8eced141..ae3df4ffc 100644 --- a/hoodie-common/src/main/avro/HoodieCommitMetadata.avsc +++ b/hoodie-common/src/main/avro/HoodieCommitMetadata.avsc @@ -44,6 +44,22 @@ { "name":"totalWriteErrors", "type":["null","long"] + }, + { + "name":"partitionPath", + "type":["null","string"] + }, + { + "name":"totalLogRecords", + "type":["null","long"] + }, + { + "name":"totalLogFiles", + "type":["null","long"] + }, + { + "name":"totalRecordsToBeUpdate", + "type":["null","long"] } ] } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionWriteStat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionWriteStat.java deleted file mode 100644 index 40f7fc363..000000000 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionWriteStat.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.uber.hoodie.common.model; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import java.io.Serializable; - -@JsonIgnoreProperties(ignoreUnknown = true) -public class CompactionWriteStat implements Serializable { - - private HoodieWriteStat writeStat; - private String partitionPath; - private long totalLogRecords; - private long totalLogFiles; - private long totalRecordsToBeUpdate; - - public CompactionWriteStat(HoodieWriteStat writeStat, String partitionPath, long totalLogFiles, - long totalLogRecords, - long totalRecordsToUpdate) { - this.writeStat = writeStat; - this.partitionPath = partitionPath; - this.totalLogFiles = totalLogFiles; - this.totalLogRecords = totalLogRecords; - this.totalRecordsToBeUpdate = totalRecordsToUpdate; - } - - public CompactionWriteStat() { - // For de-serialization - } - - public long getTotalLogRecords() { - return totalLogRecords; - } - - public long getTotalLogFiles() { - return totalLogFiles; - } - - public long getTotalRecordsToBeUpdate() { - return totalRecordsToBeUpdate; - } - - public HoodieWriteStat getHoodieWriteStat() { - return writeStat; - } - - public String getPartitionPath() { - return partitionPath; - } - - public static Builder newBuilder() { - return new Builder(); - } - - public static class Builder { - - private HoodieWriteStat writeStat; - private long totalLogRecords; - private long totalRecordsToUpdate; - private long totalLogFiles; - private String partitionPath; - - - public Builder withHoodieWriteStat(HoodieWriteStat writeStat) { - this.writeStat = writeStat; - return this; - } - - public Builder setTotalLogRecords(long records) { - this.totalLogRecords = records; - return this; - } - - public Builder setTotalLogFiles(long totalLogFiles) { - this.totalLogFiles = totalLogFiles; - return this; - } - - public Builder setTotalRecordsToUpdate(long records) { - this.totalRecordsToUpdate = records; - return this; - } - - public Builder onPartition(String path) { - this.partitionPath = path; - return this; - } - - public CompactionWriteStat build() { - return new CompactionWriteStat(writeStat, partitionPath, totalLogFiles, totalLogRecords, - totalRecordsToUpdate); - } - } -} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java index 47253637b..8a49c5c39 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java @@ -40,12 +40,19 @@ public class HoodieCommitMetadata implements Serializable { private static volatile Logger log = LogManager.getLogger(HoodieCommitMetadata.class); protected Map> partitionToWriteStats; + protected Boolean compacted; private Map extraMetadataMap; + // for ser/deser public HoodieCommitMetadata() { + this(false); + } + + public HoodieCommitMetadata(boolean compacted) { extraMetadataMap = new HashMap<>(); partitionToWriteStats = new HashMap<>(); + this.compacted = compacted; } public void addWriteStat(String partitionPath, HoodieWriteStat stat) { @@ -75,6 +82,14 @@ public class HoodieCommitMetadata implements Serializable { return extraMetadataMap.get(metaKey); } + public Boolean getCompacted() { + return compacted; + } + + public void setCompacted(Boolean compacted) { + this.compacted = compacted; + } + public HashMap getFileIdAndRelativePaths() { HashMap filePaths = new HashMap<>(); // list all partitions paths @@ -200,24 +215,21 @@ public class HoodieCommitMetadata implements Serializable { @Override public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; HoodieCommitMetadata that = (HoodieCommitMetadata) o; - return partitionToWriteStats != null ? - partitionToWriteStats.equals(that.partitionToWriteStats) : - that.partitionToWriteStats == null; + if (!partitionToWriteStats.equals(that.partitionToWriteStats)) return false; + return compacted.equals(that.compacted); } @Override public int hashCode() { - return partitionToWriteStats != null ? partitionToWriteStats.hashCode() : 0; + int result = partitionToWriteStats.hashCode(); + result = 31 * result + compacted.hashCode(); + return result; } public static HoodieCommitMetadata fromBytes(byte[] bytes) throws IOException { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCompactionMetadata.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCompactionMetadata.java deleted file mode 100644 index 043098f36..000000000 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCompactionMetadata.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.uber.hoodie.common.model; - -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.codehaus.jackson.annotate.JsonAutoDetect; -import org.codehaus.jackson.annotate.JsonMethod; -import org.codehaus.jackson.map.DeserializationConfig.Feature; -import org.codehaus.jackson.map.ObjectMapper; - -/** - * Place holder for the compaction specific meta-data, uses all the details used in a normal - * HoodieCommitMetadata - */ -public class HoodieCompactionMetadata extends HoodieCommitMetadata { - - private static volatile Logger log = LogManager.getLogger(HoodieCompactionMetadata.class); - protected HashMap> partitionToCompactionWriteStats; - - public HoodieCompactionMetadata() { - partitionToCompactionWriteStats = new HashMap<>(); - } - - public void addWriteStat(String partitionPath, CompactionWriteStat stat) { - addWriteStat(partitionPath, stat.getHoodieWriteStat()); - if (!partitionToCompactionWriteStats.containsKey(partitionPath)) { - partitionToCompactionWriteStats.put(partitionPath, new ArrayList<>()); - } - partitionToCompactionWriteStats.get(partitionPath).add(stat); - } - - public List getCompactionWriteStats(String partitionPath) { - return partitionToCompactionWriteStats.get(partitionPath); - } - - public Map> getPartitionToCompactionWriteStats() { - return partitionToCompactionWriteStats; - } - - public String toJsonString() throws IOException { - if (partitionToCompactionWriteStats.containsKey(null)) { - log.info("partition path is null for " + partitionToCompactionWriteStats.get(null)); - partitionToCompactionWriteStats.remove(null); - } - ObjectMapper mapper = new ObjectMapper(); - mapper.setVisibility(JsonMethod.FIELD, JsonAutoDetect.Visibility.ANY); - return mapper.defaultPrettyPrintingWriter().writeValueAsString(this); - } - - public static HoodieCompactionMetadata fromJsonString(String jsonStr) throws IOException { - if (jsonStr == null || jsonStr.isEmpty()) { - // For empty commit file (no data or somethings bad happen). - return new HoodieCompactionMetadata(); - } - ObjectMapper mapper = new ObjectMapper(); - mapper.configure(Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); - mapper.setVisibility(JsonMethod.FIELD, JsonAutoDetect.Visibility.ANY); - return mapper.readValue(jsonStr, HoodieCompactionMetadata.class); - } - - public static HoodieCompactionMetadata fromBytes(byte[] bytes) throws IOException { - return fromJsonString(new String(bytes, Charset.forName("utf-8"))); - } - -} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java index b69aed36c..f1a58f740 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java @@ -17,6 +17,8 @@ package com.uber.hoodie.common.model; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import javax.annotation.Nullable; import java.io.Serializable; /** @@ -68,6 +70,34 @@ public class HoodieWriteStat implements Serializable { */ private long totalWriteErrors; + /** + * Following properties are associated only with the result of a Compaction Operation + */ + + /** + * Partition Path associated with this writeStat + */ + @Nullable + private String partitionPath; + + /** + * Total number of log records that were compacted by a compaction operation + */ + @Nullable + private Long totalLogRecords; + + /** + * Total number of log files that were compacted by a compaction operation + */ + @Nullable + private Long totalLogFiles; + + /** + * Total number of records updated by a compaction operation + */ + @Nullable + private Long totalRecordsToBeUpdate; + public HoodieWriteStat() { // called by jackson json lib } @@ -136,6 +166,37 @@ public class HoodieWriteStat implements Serializable { return path; } + public String getPartitionPath() { + return partitionPath; + } + + public void setPartitionPath(String partitionPath) { + this.partitionPath = partitionPath; + } + + public Long getTotalLogRecords() { + return totalLogRecords; + } + + public void setTotalLogRecords(Long totalLogRecords) { + this.totalLogRecords = totalLogRecords; + } + + public Long getTotalLogFiles() { + return totalLogFiles; + } + + public void setTotalLogFiles(Long totalLogFiles) { + this.totalLogFiles = totalLogFiles; + } + + public Long getTotalRecordsToBeUpdate() { + return totalRecordsToBeUpdate; + } + + public void setTotalRecordsToBeUpdate(Long totalRecordsToBeUpdate) { + this.totalRecordsToBeUpdate = totalRecordsToBeUpdate; + } @Override public String toString() { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java index e2001a2c7..867fa5a10 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java @@ -41,7 +41,6 @@ public interface HoodieTimeline extends Serializable { String CLEAN_ACTION = "clean"; String ROLLBACK_ACTION = "rollback"; String SAVEPOINT_ACTION = "savepoint"; - String COMPACTION_ACTION = "compaction"; String INFLIGHT_EXTENSION = ".inflight"; String COMMIT_EXTENSION = "." + COMMIT_ACTION; @@ -49,14 +48,12 @@ public interface HoodieTimeline extends Serializable { String CLEAN_EXTENSION = "." + CLEAN_ACTION; String ROLLBACK_EXTENSION = "." + ROLLBACK_ACTION; String SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION; - String COMPACTION_EXTENSION = "." + COMPACTION_ACTION; //this is to preserve backwards compatibility on commit in-flight filenames String INFLIGHT_COMMIT_EXTENSION = INFLIGHT_EXTENSION; String INFLIGHT_DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION + INFLIGHT_EXTENSION; String INFLIGHT_CLEAN_EXTENSION = "." + CLEAN_ACTION + INFLIGHT_EXTENSION; String INFLIGHT_ROLLBACK_EXTENSION = "." + ROLLBACK_ACTION + INFLIGHT_EXTENSION; String INFLIGHT_SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION + INFLIGHT_EXTENSION; - String INFLIGHT_COMPACTION_EXTENSION = "." + COMPACTION_ACTION + INFLIGHT_EXTENSION; /** * Filter this timeline to just include the in-flights @@ -197,14 +194,6 @@ public interface HoodieTimeline extends Serializable { return commitTime + HoodieTimeline.SAVEPOINT_EXTENSION; } - static String makeInflightCompactionFileName(String commitTime) { - return commitTime + HoodieTimeline.INFLIGHT_COMPACTION_EXTENSION; - } - - static String makeCompactionFileName(String commitTime) { - return commitTime + HoodieTimeline.COMPACTION_EXTENSION; - } - static String makeInflightDeltaFileName(String commitTime) { return commitTime + HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION; } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java index 6848d4a21..ae004991c 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java @@ -94,8 +94,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { public HoodieActiveTimeline(FileSystem fs, String metaPath) { this(fs, metaPath, new String[]{COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION, - INFLIGHT_DELTA_COMMIT_EXTENSION, COMPACTION_EXTENSION, - INFLIGHT_COMPACTION_EXTENSION, SAVEPOINT_EXTENSION, INFLIGHT_SAVEPOINT_EXTENSION, + INFLIGHT_DELTA_COMMIT_EXTENSION, SAVEPOINT_EXTENSION, INFLIGHT_SAVEPOINT_EXTENSION, CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION}); } @@ -119,21 +118,21 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { } /** - * Get all instants (commits, delta commits, compactions) that produce new data, in the active + * Get all instants (commits, delta commits) that produce new data, in the active * timeline * */ - public HoodieTimeline getCommitsAndCompactionsTimeline() { + public HoodieTimeline getCommitsTimeline() { return getTimelineOfActions( - Sets.newHashSet(COMMIT_ACTION, COMPACTION_ACTION, DELTA_COMMIT_ACTION)); + Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION)); } /** - * Get all instants (commits, delta commits, compactions, clean, savepoint, rollback) that result + * Get all instants (commits, delta commits, clean, savepoint, rollback) that result * in actions, in the active timeline * */ public HoodieTimeline getAllCommitsTimeline() { return getTimelineOfActions( - Sets.newHashSet(COMMIT_ACTION, COMPACTION_ACTION, DELTA_COMMIT_ACTION, CLEAN_ACTION, + Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, CLEAN_ACTION, SAVEPOINT_ACTION, ROLLBACK_ACTION)); } @@ -152,14 +151,6 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { (Function> & Serializable) this::getInstantDetails); } - /** - * Get only the commits (inflight and completed) in the compaction timeline - */ - public HoodieTimeline getCompactionTimeline() { - return new HoodieDefaultTimeline(filterInstantsByAction(COMPACTION_ACTION), - (Function> & Serializable) this::getInstantDetails); - } - /** * Get a timeline of a specific set of actions. useful to create a merged timeline of multiple * actions diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java index bf27b7db2..1891c9807 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java @@ -94,10 +94,6 @@ public class HoodieInstant implements Serializable { return isInflight ? HoodieTimeline.makeInflightSavePointFileName(timestamp) : HoodieTimeline.makeSavePointFileName(timestamp); - } else if (HoodieTimeline.COMPACTION_ACTION.equals(action)) { - return isInflight ? - HoodieTimeline.makeInflightCompactionFileName(timestamp) : - HoodieTimeline.makeCompactionFileName(timestamp); } else if (HoodieTimeline.DELTA_COMMIT_ACTION.equals(action)) { return isInflight ? HoodieTimeline.makeInflightDeltaFileName(timestamp) : diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java index 1ccca51b2..9489e57ab 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java @@ -158,7 +158,7 @@ public class HoodieTestUtils { for (String commitTime : commitTimes) { boolean createFile = fs.createNewFile(new Path( basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline - .makeCompactionFileName(commitTime))); + .makeCommitFileName(commitTime))); if (!createFile) { throw new IOException("cannot create commit file for commit " + commitTime); } diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java index 03e86d3bd..ee4f5a954 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java @@ -100,7 +100,7 @@ public class HoodieInputFormat extends MapredParquetInputFormat String tableName = metadata.getTableConfig().getTableName(); String mode = HoodieHiveUtil.readMode(Job.getInstance(job), tableName); // Get all commits, delta commits, compactions, as all of them produce a base parquet file today - HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsAndCompactionsTimeline() + HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsTimeline() .filterCompletedInstants(); TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metadata, timeline, statuses); diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java index c92f0e593..6add30565 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java @@ -125,7 +125,6 @@ public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Conf String maxCommitTime = metaClient.getActiveTimeline() .getTimelineOfActions( Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, - HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION)) .filterCompletedInstants().lastInstant().get().getTimestamp(); rtSplits.add( diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java index 00d8ac5c9..dede5e5f5 100644 --- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java @@ -21,7 +21,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.uber.hoodie.common.model.HoodieCommitMetadata; -import com.uber.hoodie.common.model.HoodieCompactionMetadata; import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.table.HoodieTableMetaClient; @@ -35,17 +34,6 @@ import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.InvalidDatasetException; import com.uber.hoodie.hive.util.SchemaUtil; -import java.io.IOException; -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; import org.apache.commons.dbcp.BasicDataSource; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -63,6 +51,18 @@ import parquet.hadoop.ParquetFileReader; import parquet.hadoop.metadata.ParquetMetadata; import parquet.schema.MessageType; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + @SuppressWarnings("ConstantConditions") public class HoodieHiveClient { @@ -111,7 +111,7 @@ public class HoodieHiveClient { e); } - activeTimeline = metaClient.getActiveTimeline().getCommitsAndCompactionsTimeline() + activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline() .filterCompletedInstants(); } @@ -323,7 +323,7 @@ public class HoodieHiveClient { // If this is MOR, depending on whether the latest commit is a delta commit or compaction commit // Get a datafile written and get the schema from that file Optional lastCompactionCommit = metaClient.getActiveTimeline() - .getCompactionTimeline().filterCompletedInstants().lastInstant(); + .getCommitTimeline().filterCompletedInstants().lastInstant(); LOG.info("Found the last compaction commit as " + lastCompactionCommit); Optional lastDeltaCommit; @@ -379,7 +379,7 @@ public class HoodieHiveClient { + syncConfig.basePath)); // Read from the compacted file wrote - HoodieCompactionMetadata compactionMetadata = HoodieCompactionMetadata + HoodieCommitMetadata compactionMetadata = HoodieCommitMetadata .fromBytes(activeTimeline.getInstantDetails(lastCompactionCommit).get()); String filePath = compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values() .stream().findAny() diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java index 2707377af..f2eb5e4f1 100644 --- a/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java +++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java @@ -16,9 +16,6 @@ package com.uber.hoodie.hive; -import static com.uber.hoodie.common.model.HoodieTestUtils.DEFAULT_TASK_PARTITIONID; -import static org.junit.Assert.fail; - import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -26,10 +23,8 @@ import com.uber.hoodie.avro.HoodieAvroWriteSupport; import com.uber.hoodie.common.BloomFilter; import com.uber.hoodie.common.minicluster.HdfsTestService; import com.uber.hoodie.common.minicluster.ZookeeperTestService; -import com.uber.hoodie.common.model.CompactionWriteStat; import com.uber.hoodie.common.model.HoodieAvroPayload; import com.uber.hoodie.common.model.HoodieCommitMetadata; -import com.uber.hoodie.common.model.HoodieCompactionMetadata; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieDeltaWriteStat; import com.uber.hoodie.common.model.HoodieLogFile; @@ -44,15 +39,6 @@ import com.uber.hoodie.common.table.log.block.HoodieLogBlock; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.SchemaTestUtil; import com.uber.hoodie.hive.util.HiveTestService; -import java.io.File; -import java.io.IOException; -import java.net.URISyntaxException; -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.UUID; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.commons.io.FileUtils; @@ -72,6 +58,19 @@ import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; import org.junit.runners.model.InitializationError; +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; + +import static com.uber.hoodie.common.model.HoodieTestUtils.DEFAULT_TASK_PARTITIONID; +import static org.junit.Assert.fail; + @SuppressWarnings("SameParameterValue") public class TestUtil { @@ -182,9 +181,9 @@ public class TestUtil { createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_REALTIME_TABLE); - HoodieCompactionMetadata compactionMetadata = new HoodieCompactionMetadata(); + HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata(); commitMetadata.getPartitionToWriteStats() - .forEach((key, value) -> value.stream().map(k -> new CompactionWriteStat(k, key, 0, 0, 0)) + .forEach((key, value) -> value.stream() .forEach(l -> compactionMetadata.addWriteStat(key, l))); createCompactionCommitFile(compactionMetadata, commitTime); // Write a delta commit @@ -211,9 +210,9 @@ public class TestUtil { createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_REALTIME_TABLE); - HoodieCompactionMetadata compactionMetadata = new HoodieCompactionMetadata(); + HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata(); commitMetadata.getPartitionToWriteStats() - .forEach((key, value) -> value.stream().map(k -> new CompactionWriteStat(k, key, 0, 0, 0)) + .forEach((key, value) -> value.stream() .forEach(l -> compactionMetadata.addWriteStat(key, l))); createCompactionCommitFile(compactionMetadata, commitTime); HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), @@ -342,12 +341,12 @@ public class TestUtil { } private static void createCompactionCommitFile( - HoodieCompactionMetadata commitMetadata, String commitTime) + HoodieCommitMetadata commitMetadata, String commitTime) throws IOException { byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8); Path fullPath = new Path( hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline - .makeCompactionFileName(commitTime)); + .makeCommitFileName(commitTime)); FSDataOutputStream fsout = fileSystem.create(fullPath, true); fsout.write(bytes); fsout.close(); diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/HoodieDataSourceHelpers.java b/hoodie-spark/src/main/java/com/uber/hoodie/HoodieDataSourceHelpers.java index fb8db4a94..b74d91707 100644 --- a/hoodie-spark/src/main/java/com/uber/hoodie/HoodieDataSourceHelpers.java +++ b/hoodie-spark/src/main/java/com/uber/hoodie/HoodieDataSourceHelpers.java @@ -70,11 +70,10 @@ public class HoodieDataSourceHelpers { .getHoodieTable(new HoodieTableMetaClient(fs, basePath, true), null); if (table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ)) { return table.getActiveTimeline().getTimelineOfActions( - Sets.newHashSet(HoodieActiveTimeline.COMPACTION_ACTION, - HoodieActiveTimeline.DELTA_COMMIT_ACTION) - ); + Sets.newHashSet(HoodieActiveTimeline.COMMIT_ACTION, + HoodieActiveTimeline.DELTA_COMMIT_ACTION)); } else { - return table.getCompletedCompactionCommitTimeline(); + return table.getCommitTimeline().filterCompletedInstants(); } } } diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/IncrementalRelation.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/IncrementalRelation.scala index 6adab8d84..4aca81f17 100644 --- a/hoodie-spark/src/main/scala/com/uber/hoodie/IncrementalRelation.scala +++ b/hoodie-spark/src/main/scala/com/uber/hoodie/IncrementalRelation.scala @@ -53,7 +53,7 @@ class IncrementalRelation(val sqlContext: SQLContext, throw new HoodieException("Incremental view not implemented yet, for merge-on-read datasets") } val hoodieTable = HoodieTable.getHoodieTable(metaClient, null) - val commitTimeline = hoodieTable.getCompletedCompactionCommitTimeline(); + val commitTimeline = hoodieTable.getCommitTimeline.filterCompletedInstants(); if (commitTimeline.empty()) { throw new HoodieException("No instants to incrementally pull") } diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HiveIncrementalPuller.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HiveIncrementalPuller.java index 61aec29fb..5bcba8cb8 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HiveIncrementalPuller.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HiveIncrementalPuller.java @@ -294,7 +294,7 @@ public class HiveIncrementalPuller { HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, targetDataPath); Optional - lastCommit = metadata.getActiveTimeline().getCommitsAndCompactionsTimeline() + lastCommit = metadata.getActiveTimeline().getCommitsTimeline() .filterCompletedInstants().lastInstant(); if (lastCommit.isPresent()) { return lastCommit.get().getTimestamp(); @@ -332,14 +332,14 @@ public class HiveIncrementalPuller { private String getLastCommitTimePulled(FileSystem fs, String sourceTableLocation) throws IOException { HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, sourceTableLocation); - List commitsToSync = metadata.getActiveTimeline().getCommitsAndCompactionsTimeline() + List commitsToSync = metadata.getActiveTimeline().getCommitsTimeline() .filterCompletedInstants() .findInstantsAfter(config.fromCommitTime, config.maxCommits).getInstants() .map(HoodieInstant::getTimestamp) .collect(Collectors.toList()); if (commitsToSync.isEmpty()) { log.warn("Nothing to sync. All commits in " + config.sourceTable + " are " + metadata - .getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants() + .getActiveTimeline().getCommitsTimeline().filterCompletedInstants() .getInstants() .collect(Collectors.toList()) + " and from commit time is " + config.fromCommitTime); diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java index 0a74e2036..3d199e0f5 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java @@ -74,11 +74,11 @@ public class HoodieSnapshotCopier implements Serializable { final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs, baseDir); final TableFileSystemView.ReadOptimizedView fsView = new HoodieTableFileSystemView( tableMetadata, - tableMetadata.getActiveTimeline().getCommitsAndCompactionsTimeline() + tableMetadata.getActiveTimeline().getCommitsTimeline() .filterCompletedInstants()); // Get the latest commit Optional latestCommit = tableMetadata.getActiveTimeline() - .getCommitsAndCompactionsTimeline().filterCompletedInstants().lastInstant(); + .getCommitsTimeline().filterCompletedInstants().lastInstant(); if (!latestCommit.isPresent()) { logger.warn("No commits present. Nothing to snapshot"); return; diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java index ad8ccd2ed..2baafd036 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -120,7 +120,7 @@ public class HoodieDeltaStreamer implements Serializable { if (fs.exists(new Path(cfg.targetBasePath))) { HoodieTableMetaClient meta = new HoodieTableMetaClient(fs, cfg.targetBasePath); this.commitTimelineOpt = Optional - .of(meta.getActiveTimeline().getCommitsAndCompactionsTimeline() + .of(meta.getActiveTimeline().getCommitsTimeline() .filterCompletedInstants()); } else { this.commitTimelineOpt = Optional.empty();