1
0

Removing compaction action type and associated compaction timeline operations, replace with commit action type

This commit is contained in:
Nishith Agarwal
2017-12-05 00:58:53 -08:00
committed by vinoth chandar
parent a1c0d0dbad
commit 44839b88c6
34 changed files with 265 additions and 450 deletions

View File

@@ -94,12 +94,6 @@ public class ArchivedCommitsCommand implements CommandMarker {
commitDetails.add(record.get("hoodieCommitMetadata").toString()); commitDetails.add(record.get("hoodieCommitMetadata").toString());
break; break;
} }
case HoodieTimeline.COMPACTION_ACTION: {
commitDetails.add(record.get("commitTime").toString());
commitDetails.add(record.get("actionType").toString());
commitDetails.add(record.get("hoodieCompactionMetadata").toString());
break;
}
case HoodieTimeline.DELTA_COMMIT_ACTION: { case HoodieTimeline.DELTA_COMMIT_ACTION: {
commitDetails.add(record.get("commitTime").toString()); commitDetails.add(record.get("commitTime").toString());
commitDetails.add(record.get("actionType").toString()); commitDetails.add(record.get("actionType").toString());

View File

@@ -69,7 +69,7 @@ public class CommitsCommand implements CommandMarker {
"limit"}, mandatory = false, help = "Limit commits", unspecifiedDefaultValue = "10") "limit"}, mandatory = false, help = "Limit commits", unspecifiedDefaultValue = "10")
final Integer limit) throws IOException { final Integer limit) throws IOException {
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitsAndCompactionsTimeline() HoodieTimeline timeline = activeTimeline.getCommitsTimeline()
.filterCompletedInstants(); .filterCompletedInstants();
List<HoodieInstant> commits = timeline.getInstants().collect(Collectors.toList()); List<HoodieInstant> commits = timeline.getInstants().collect(Collectors.toList());
String[][] rows = new String[commits.size()][]; String[][] rows = new String[commits.size()][];
@@ -108,7 +108,7 @@ public class CommitsCommand implements CommandMarker {
@CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path") @CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path")
final String sparkPropertiesPath) throws Exception { final String sparkPropertiesPath) throws Exception {
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitsAndCompactionsTimeline() HoodieTimeline timeline = activeTimeline.getCommitsTimeline()
.filterCompletedInstants(); .filterCompletedInstants();
HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION,
commitTime); commitTime);
@@ -137,7 +137,7 @@ public class CommitsCommand implements CommandMarker {
@CliOption(key = {"commit"}, help = "Commit to show") @CliOption(key = {"commit"}, help = "Commit to show")
final String commitTime) throws Exception { final String commitTime) throws Exception {
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitsAndCompactionsTimeline() HoodieTimeline timeline = activeTimeline.getCommitsTimeline()
.filterCompletedInstants(); .filterCompletedInstants();
HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION,
commitTime); commitTime);
@@ -187,7 +187,7 @@ public class CommitsCommand implements CommandMarker {
@CliOption(key = {"commit"}, help = "Commit to show") @CliOption(key = {"commit"}, help = "Commit to show")
final String commitTime) throws Exception { final String commitTime) throws Exception {
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitsAndCompactionsTimeline() HoodieTimeline timeline = activeTimeline.getCommitsTimeline()
.filterCompletedInstants(); .filterCompletedInstants();
HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION,
commitTime); commitTime);
@@ -225,11 +225,11 @@ public class CommitsCommand implements CommandMarker {
@CliOption(key = {"path"}, help = "Path of the dataset to compare to") @CliOption(key = {"path"}, help = "Path of the dataset to compare to")
final String path) throws Exception { final String path) throws Exception {
HoodieTableMetaClient target = new HoodieTableMetaClient(HoodieCLI.fs, path); HoodieTableMetaClient target = new HoodieTableMetaClient(HoodieCLI.fs, path);
HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsAndCompactionsTimeline() HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants(); .filterCompletedInstants();
; ;
HoodieTableMetaClient source = HoodieCLI.tableMetadata; HoodieTableMetaClient source = HoodieCLI.tableMetadata;
HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitsAndCompactionsTimeline() HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants(); .filterCompletedInstants();
; ;
String targetLatestCommit = String targetLatestCommit =

View File

@@ -61,9 +61,9 @@ public class HoodieSyncCommand implements CommandMarker {
"hivePass"}, mandatory = true, unspecifiedDefaultValue = "", help = "hive password to connect to") "hivePass"}, mandatory = true, unspecifiedDefaultValue = "", help = "hive password to connect to")
final String hivePass) throws Exception { final String hivePass) throws Exception {
HoodieTableMetaClient target = HoodieCLI.syncTableMetadata; HoodieTableMetaClient target = HoodieCLI.syncTableMetadata;
HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsAndCompactionsTimeline(); HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsTimeline();
HoodieTableMetaClient source = HoodieCLI.tableMetadata; HoodieTableMetaClient source = HoodieCLI.tableMetadata;
HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitsAndCompactionsTimeline(); HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitsTimeline();
long sourceCount = 0; long sourceCount = 0;
long targetCount = 0; long targetCount = 0;
if ("complete".equals(mode)) { if ("complete".equals(mode)) {

View File

@@ -71,7 +71,7 @@ public class HoodieReadClient implements Serializable {
// Create a Hoodie table which encapsulated the commits and files visible // Create a Hoodie table which encapsulated the commits and files visible
this.hoodieTable = HoodieTable this.hoodieTable = HoodieTable
.getHoodieTable(new HoodieTableMetaClient(fs, basePath, true), null); .getHoodieTable(new HoodieTableMetaClient(fs, basePath, true), null);
this.commitTimeline = hoodieTable.getCompletedCompactionCommitTimeline(); this.commitTimeline = hoodieTable.getCommitTimeline().filterCompletedInstants();
this.index = this.index =
new HoodieBloomIndex(HoodieWriteConfig.newBuilder().withPath(basePath).build(), jsc); new HoodieBloomIndex(HoodieWriteConfig.newBuilder().withPath(basePath).build(), jsc);
this.sqlContextOpt = Optional.absent(); this.sqlContextOpt = Optional.absent();

View File

@@ -25,7 +25,6 @@ import com.uber.hoodie.avro.model.HoodieSavepointMetadata;
import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.HoodieCleanStat;
import com.uber.hoodie.common.HoodieRollbackStat; import com.uber.hoodie.common.HoodieRollbackStat;
import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieCompactionMetadata;
import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecord;
@@ -54,17 +53,6 @@ import com.uber.hoodie.table.HoodieTable;
import com.uber.hoodie.table.UserDefinedBulkInsertPartitioner; import com.uber.hoodie.table.UserDefinedBulkInsertPartitioner;
import com.uber.hoodie.table.WorkloadProfile; import com.uber.hoodie.table.WorkloadProfile;
import com.uber.hoodie.table.WorkloadStat; import com.uber.hoodie.table.WorkloadStat;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
@@ -77,6 +65,18 @@ import org.apache.spark.storage.StorageLevel;
import scala.Option; import scala.Option;
import scala.Tuple2; import scala.Tuple2;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/** /**
* Hoodie Write Client helps you build datasets on HDFS [insert()] and then perform efficient * Hoodie Write Client helps you build datasets on HDFS [insert()] and then perform efficient
* mutations on a HDFS dataset [upsert()] * mutations on a HDFS dataset [upsert()]
@@ -605,7 +605,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
HoodieTable<T> table = HoodieTable HoodieTable<T> table = HoodieTable
.getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieTimeline commitTimeline = table.getCommitTimeline(); HoodieTimeline commitTimeline = table.getCommitsTimeline();
HoodieInstant savePoint = HoodieInstant savePoint =
new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime); new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
@@ -624,7 +624,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
// Make sure the rollback was successful // Make sure the rollback was successful
Optional<HoodieInstant> lastInstant = Optional<HoodieInstant> lastInstant =
activeTimeline.reload().getCommitsAndCompactionsTimeline().filterCompletedInstants() activeTimeline.reload().getCommitsTimeline().filterCompletedInstants()
.lastInstant(); .lastInstant();
Preconditions.checkArgument(lastInstant.isPresent()); Preconditions.checkArgument(lastInstant.isPresent());
Preconditions.checkArgument(lastInstant.get().getTimestamp().equals(savepointTime), Preconditions.checkArgument(lastInstant.get().getTimestamp().equals(savepointTime),
@@ -829,7 +829,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
// Create a Hoodie table which encapsulated the commits and files visible // Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable HoodieTable<T> table = HoodieTable
.getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);
Optional<HoodieCompactionMetadata> compactionMetadata = table.compact(jsc, compactionCommitTime); Optional<HoodieCommitMetadata> compactionMetadata = table.compact(jsc, compactionCommitTime);
if (compactionMetadata.isPresent()) { if (compactionMetadata.isPresent()) {
logger.info("Compacted successfully on commit " + compactionCommitTime); logger.info("Compacted successfully on commit " + compactionCommitTime);
} else { } else {
@@ -878,7 +878,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
private void rollbackInflightCommits() { private void rollbackInflightCommits() {
HoodieTable<T> table = HoodieTable HoodieTable<T> table = HoodieTable
.getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);
HoodieTimeline inflightTimeline = table.getCommitTimeline().filterInflights(); HoodieTimeline inflightTimeline = table.getCommitsTimeline().filterInflights();
List<String> commits = inflightTimeline.getInstants().map(HoodieInstant::getTimestamp) List<String> commits = inflightTimeline.getInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList()); .collect(Collectors.toList());
Collections.reverse(commits); Collections.reverse(commits);

View File

@@ -239,7 +239,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
.parallelize(partitions, Math.max(partitions.size(), 1)) .parallelize(partitions, Math.max(partitions.size(), 1))
.flatMapToPair(partitionPath -> { .flatMapToPair(partitionPath -> {
java.util.Optional<HoodieInstant> latestCommitTime = java.util.Optional<HoodieInstant> latestCommitTime =
hoodieTable.getCommitTimeline().filterCompletedInstants().lastInstant(); hoodieTable.getCommitsTimeline().filterCompletedInstants().lastInstant();
List<Tuple2<String, HoodieDataFile>> filteredFiles = new ArrayList<>(); List<Tuple2<String, HoodieDataFile>> filteredFiles = new ArrayList<>();
if (latestCommitTime.isPresent()) { if (latestCommitTime.isPresent()) {
filteredFiles = filteredFiles =

View File

@@ -26,7 +26,6 @@ import com.uber.hoodie.avro.model.HoodieSavepointMetadata;
import com.uber.hoodie.common.model.ActionType; import com.uber.hoodie.common.model.ActionType;
import com.uber.hoodie.common.model.HoodieArchivedLogFile; import com.uber.hoodie.common.model.HoodieArchivedLogFile;
import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieCompactionMetadata;
import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.log.HoodieLogFormat; import com.uber.hoodie.common.table.log.HoodieLogFormat;
@@ -39,12 +38,6 @@ import com.uber.hoodie.exception.HoodieCommitException;
import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.table.HoodieTable; import com.uber.hoodie.table.HoodieTable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord; import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@@ -52,6 +45,13 @@ import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/** /**
* Archiver to bound the growth of <action>.commit files * Archiver to bound the growth of <action>.commit files
*/ */
@@ -228,14 +228,6 @@ public class HoodieCommitArchiveLog {
archivedMetaWrapper.setActionType(ActionType.commit.name()); archivedMetaWrapper.setActionType(ActionType.commit.name());
break; break;
} }
case HoodieTimeline.COMPACTION_ACTION: {
com.uber.hoodie.common.model.HoodieCompactionMetadata compactionMetadata = com.uber.hoodie.common.model.HoodieCompactionMetadata
.fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get());
archivedMetaWrapper
.setHoodieCompactionMetadata(compactionMetadataConverter(compactionMetadata));
archivedMetaWrapper.setActionType(ActionType.compaction.name());
break;
}
case HoodieTimeline.ROLLBACK_ACTION: { case HoodieTimeline.ROLLBACK_ACTION: {
archivedMetaWrapper.setHoodieRollbackMetadata(AvroUtils archivedMetaWrapper.setHoodieRollbackMetadata(AvroUtils
.deserializeAvroMetadata(commitTimeline.getInstantDetails(hoodieInstant).get(), .deserializeAvroMetadata(commitTimeline.getInstantDetails(hoodieInstant).get(),
@@ -271,14 +263,4 @@ public class HoodieCommitArchiveLog {
com.uber.hoodie.avro.model.HoodieCommitMetadata.class); com.uber.hoodie.avro.model.HoodieCommitMetadata.class);
return avroMetaData; return avroMetaData;
} }
private com.uber.hoodie.avro.model.HoodieCompactionMetadata compactionMetadataConverter(
HoodieCompactionMetadata hoodieCompactionMetadata) {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
com.uber.hoodie.avro.model.HoodieCompactionMetadata avroMetaData = mapper
.convertValue(hoodieCompactionMetadata,
com.uber.hoodie.avro.model.HoodieCompactionMetadata.class);
return avroMetaData;
}
} }

View File

@@ -16,17 +16,16 @@
package com.uber.hoodie.io.compact; package com.uber.hoodie.io.compact;
import com.uber.hoodie.common.model.HoodieCompactionMetadata; import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.table.HoodieTable; import com.uber.hoodie.table.HoodieTable;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.Serializable; import java.io.Serializable;
import java.util.Date; import java.util.Date;
import java.util.Optional;
import org.apache.spark.api.java.JavaSparkContext;
/** /**
* A HoodieCompactor runs compaction on a hoodie table * A HoodieCompactor runs compaction on a hoodie table
@@ -36,8 +35,8 @@ public interface HoodieCompactor extends Serializable {
/** /**
* Compact the delta files with the data files * Compact the delta files with the data files
*/ */
HoodieCompactionMetadata compact(JavaSparkContext jsc, final HoodieWriteConfig config, HoodieCommitMetadata compact(JavaSparkContext jsc, final HoodieWriteConfig config,
HoodieTable hoodieTable, String compactionCommitTime) throws Exception; HoodieTable hoodieTable, String compactionCommitTime) throws Exception;
// Helper methods // Helper methods
@@ -45,7 +44,7 @@ public interface HoodieCompactor extends Serializable {
String commitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()); String commitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date());
HoodieActiveTimeline activeTimeline = hoodieTable.getActiveTimeline(); HoodieActiveTimeline activeTimeline = hoodieTable.getActiveTimeline();
activeTimeline activeTimeline
.createInflight(new HoodieInstant(true, HoodieTimeline.COMPACTION_ACTION, commitTime)); .createInflight(new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime));
return commitTime; return commitTime;
} }
} }

View File

@@ -16,15 +16,13 @@
package com.uber.hoodie.io.compact; package com.uber.hoodie.io.compact;
import static java.util.stream.Collectors.toList;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.uber.hoodie.WriteStatus; import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.CompactionWriteStat; import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieCompactionMetadata;
import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner; import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner;
@@ -36,6 +34,13 @@ import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieCompactionException; import com.uber.hoodie.exception.HoodieCompactionException;
import com.uber.hoodie.table.HoodieCopyOnWriteTable; import com.uber.hoodie.table.HoodieCopyOnWriteTable;
import com.uber.hoodie.table.HoodieTable; import com.uber.hoodie.table.HoodieTable;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Collection; import java.util.Collection;
@@ -44,12 +49,8 @@ import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem; import static java.util.stream.Collectors.toList;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
/** /**
* HoodieRealtimeTableCompactor compacts a hoodie table with merge on read storage. Computes all * HoodieRealtimeTableCompactor compacts a hoodie table with merge on read storage. Computes all
@@ -63,8 +64,8 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
private static Logger log = LogManager.getLogger(HoodieRealtimeTableCompactor.class); private static Logger log = LogManager.getLogger(HoodieRealtimeTableCompactor.class);
@Override @Override
public HoodieCompactionMetadata compact(JavaSparkContext jsc, HoodieWriteConfig config, public HoodieCommitMetadata compact(JavaSparkContext jsc, HoodieWriteConfig config,
HoodieTable hoodieTable, String compactionCommitTime) throws IOException { HoodieTable hoodieTable, String compactionCommitTime) throws IOException {
Preconditions.checkArgument( Preconditions.checkArgument(
hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ, hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ,
"HoodieRealtimeTableCompactor can only compact table of type " "HoodieRealtimeTableCompactor can only compact table of type "
@@ -99,20 +100,20 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
} }
log.info("After filtering, Compacting " + operations + " files"); log.info("After filtering, Compacting " + operations + " files");
List<CompactionWriteStat> updateStatusMap = List<HoodieWriteStat> updateStatusMap =
jsc.parallelize(operations, operations.size()) jsc.parallelize(operations, operations.size())
.map(s -> executeCompaction(metaClient, config, s, compactionCommitTime)) .map(s -> executeCompaction(metaClient, config, s, compactionCommitTime))
.flatMap(new FlatMapFunction<List<CompactionWriteStat>, CompactionWriteStat>() { .flatMap(new FlatMapFunction<List<HoodieWriteStat>, HoodieWriteStat>() {
@Override @Override
public Iterator<CompactionWriteStat> call( public Iterator<HoodieWriteStat> call(
List<CompactionWriteStat> compactionWriteStats) List<HoodieWriteStat> hoodieWriteStats)
throws Exception { throws Exception {
return compactionWriteStats.iterator(); return hoodieWriteStats.iterator();
} }
}).collect(); }).collect();
HoodieCompactionMetadata metadata = new HoodieCompactionMetadata(); HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
for (CompactionWriteStat stat : updateStatusMap) { for (HoodieWriteStat stat : updateStatusMap) {
metadata.addWriteStat(stat.getPartitionPath(), stat); metadata.addWriteStat(stat.getPartitionPath(), stat);
} }
@@ -128,13 +129,13 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
return metadata; return metadata;
} }
private boolean isCompactionSucceeded(HoodieCompactionMetadata result) { private boolean isCompactionSucceeded(HoodieCommitMetadata result) {
//TODO figure out a success factor for a compaction //TODO figure out a success factor for a compaction
return true; return true;
} }
private List<CompactionWriteStat> executeCompaction(HoodieTableMetaClient metaClient, private List<HoodieWriteStat> executeCompaction(HoodieTableMetaClient metaClient,
HoodieWriteConfig config, CompactionOperation operation, String commitTime) HoodieWriteConfig config, CompactionOperation operation, String commitTime)
throws IOException { throws IOException {
FileSystem fs = FSUtils.getFs(); FileSystem fs = FSUtils.getFs();
Schema readerSchema = Schema readerSchema =
@@ -150,7 +151,6 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
String maxInstantTime = metaClient.getActiveTimeline() String maxInstantTime = metaClient.getActiveTimeline()
.getTimelineOfActions( .getTimelineOfActions(
Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, Sets.newHashSet(HoodieTimeline.COMMIT_ACTION,
HoodieTimeline.COMPACTION_ACTION,
HoodieTimeline.DELTA_COMMIT_ACTION)) HoodieTimeline.DELTA_COMMIT_ACTION))
.filterCompletedInstants().lastInstant().get().getTimestamp(); .filterCompletedInstants().lastInstant().get().getTimestamp();
@@ -170,22 +170,23 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
return StreamSupport.stream(resultIterable.spliterator(), false) return StreamSupport.stream(resultIterable.spliterator(), false)
.flatMap(Collection::stream) .flatMap(Collection::stream)
.map(WriteStatus::getStat) .map(WriteStatus::getStat)
.map(s -> CompactionWriteStat.newBuilder().withHoodieWriteStat(s) .map(s -> {
.setTotalRecordsToUpdate(scanner.getTotalRecordsToUpdate()) s.setTotalRecordsToBeUpdate(scanner.getTotalRecordsToUpdate());
.setTotalLogFiles(scanner.getTotalLogFiles()) s.setTotalLogFiles(scanner.getTotalLogFiles());
.setTotalLogRecords(scanner.getTotalLogRecords()) s.setTotalLogRecords(scanner.getTotalLogRecords());
.onPartition(operation.getPartitionPath()).build()) s.setPartitionPath(operation.getPartitionPath());
return s;})
.collect(toList()); .collect(toList());
} }
public boolean commitCompaction(String commitTime, HoodieTableMetaClient metaClient, public boolean commitCompaction(String commitTime, HoodieTableMetaClient metaClient,
HoodieCompactionMetadata metadata) { HoodieCommitMetadata metadata) {
log.info("Committing Compaction " + commitTime); log.info("Committing Compaction " + commitTime);
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
try { try {
activeTimeline.saveAsComplete( activeTimeline.saveAsComplete(
new HoodieInstant(true, HoodieTimeline.COMPACTION_ACTION, commitTime), new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime),
Optional.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); Optional.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
} catch (IOException e) { } catch (IOException e) {
throw new HoodieCompactionException( throw new HoodieCompactionException(

View File

@@ -22,7 +22,6 @@ import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.HoodieCleanStat;
import com.uber.hoodie.common.HoodieRollbackStat; import com.uber.hoodie.common.HoodieRollbackStat;
import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieCompactionMetadata;
import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecord;
@@ -486,7 +485,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
} }
@Override @Override
public Optional<HoodieCompactionMetadata> compact(JavaSparkContext jsc, String commitCompactionTime) { public Optional<HoodieCommitMetadata> compact(JavaSparkContext jsc, String commitCompactionTime) {
logger.info("Nothing to compact in COW storage format"); logger.info("Nothing to compact in COW storage format");
return Optional.empty(); return Optional.empty();
} }
@@ -544,7 +543,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
@Override @Override
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> commits) public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> commits)
throws IOException { throws IOException {
String actionType = this.getCompactedCommitActionType(); String actionType = this.getCommitActionType();
HoodieActiveTimeline activeTimeline = this.getActiveTimeline(); HoodieActiveTimeline activeTimeline = this.getActiveTimeline();
List<String> inflights = this.getInflightCommitTimeline().getInstants() List<String> inflights = this.getInflightCommitTimeline().getInstants()
.map(HoodieInstant::getTimestamp) .map(HoodieInstant::getTimestamp)

View File

@@ -21,7 +21,6 @@ import com.google.common.collect.Sets;
import com.uber.hoodie.WriteStatus; import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.HoodieRollbackStat; import com.uber.hoodie.common.HoodieRollbackStat;
import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieCompactionMetadata;
import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.model.HoodieRecordPayload;
@@ -93,9 +92,9 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
} }
@Override @Override
public Optional<HoodieCompactionMetadata> compact(JavaSparkContext jsc, String compactionCommitTime) { public Optional<HoodieCommitMetadata> compact(JavaSparkContext jsc, String compactionCommitTime) {
logger.info("Checking if compaction needs to be run on " + config.getBasePath()); logger.info("Checking if compaction needs to be run on " + config.getBasePath());
Optional<HoodieInstant> lastCompaction = getActiveTimeline().getCompactionTimeline() Optional<HoodieInstant> lastCompaction = getActiveTimeline().getCommitTimeline()
.filterCompletedInstants().lastInstant(); .filterCompletedInstants().lastInstant();
String deltaCommitsSinceTs = "0"; String deltaCommitsSinceTs = "0";
if (lastCompaction.isPresent()) { if (lastCompaction.isPresent()) {
@@ -130,8 +129,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
} }
Map<String, HoodieInstant> commitsAndCompactions = Map<String, HoodieInstant> commitsAndCompactions =
this.getActiveTimeline() this.getActiveTimeline()
.getTimelineOfActions(Sets.newHashSet(HoodieActiveTimeline.COMMIT_ACTION, .getTimelineOfActions(Sets.newHashSet(HoodieActiveTimeline.COMMIT_ACTION, HoodieActiveTimeline.DELTA_COMMIT_ACTION))
HoodieActiveTimeline.COMPACTION_ACTION, HoodieActiveTimeline.DELTA_COMMIT_ACTION))
.getInstants() .getInstants()
.filter(i -> commits.contains(i.getTimestamp())) .filter(i -> commits.contains(i.getTimestamp()))
.collect(Collectors.toMap(i -> i.getTimestamp(), i -> i)); .collect(Collectors.toMap(i -> i.getTimestamp(), i -> i));
@@ -149,11 +147,10 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
List<HoodieRollbackStat> stats = null; List<HoodieRollbackStat> stats = null;
switch (instant.getAction()) { switch (instant.getAction()) {
case HoodieTimeline.COMMIT_ACTION: case HoodieTimeline.COMMIT_ACTION:
case HoodieTimeline.COMPACTION_ACTION:
try { try {
logger.info("Starting to rollback Commit/Compaction " + instant); logger.info("Starting to rollback Commit/Compaction " + instant);
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(this.getCommitTimeline().getInstantDetails( .fromBytes(this.getCommitsTimeline().getInstantDetails(
new HoodieInstant(true, instant.getAction(), instant.getTimestamp())).get()); new HoodieInstant(true, instant.getAction(), instant.getTimestamp())).get());
stats = jsc.parallelize(commitMetadata.getPartitionToWriteStats().keySet().stream() stats = jsc.parallelize(commitMetadata.getPartitionToWriteStats().keySet().stream()
@@ -174,7 +171,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
logger.info("Starting to rollback delta commit " + instant); logger.info("Starting to rollback delta commit " + instant);
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(this.getCommitTimeline().getInstantDetails( .fromBytes(this.getCommitsTimeline().getInstantDetails(
new HoodieInstant(true, instant.getAction(), instant.getTimestamp())).get()); new HoodieInstant(true, instant.getAction(), instant.getTimestamp())).get());
stats = jsc.parallelize(commitMetadata.getPartitionToWriteStats().keySet().stream() stats = jsc.parallelize(commitMetadata.getPartitionToWriteStats().keySet().stream()

View File

@@ -16,12 +16,11 @@
package com.uber.hoodie.table; package com.uber.hoodie.table;
import com.google.common.collect.Sets;
import com.uber.hoodie.WriteStatus; import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.avro.model.HoodieSavepointMetadata; import com.uber.hoodie.avro.model.HoodieSavepointMetadata;
import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.HoodieCleanStat;
import com.uber.hoodie.common.HoodieRollbackStat; import com.uber.hoodie.common.HoodieRollbackStat;
import com.uber.hoodie.common.model.HoodieCompactionMetadata; import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTableMetaClient;
@@ -35,6 +34,12 @@ import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieCommitException; import com.uber.hoodie.exception.HoodieCommitException;
import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieSavepointException; import com.uber.hoodie.exception.HoodieSavepointException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.Iterator; import java.util.Iterator;
@@ -42,11 +47,6 @@ import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaSparkContext;
/** /**
* Abstract implementation of a HoodieTable * Abstract implementation of a HoodieTable
@@ -116,21 +116,21 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
* Get the completed (commit + compaction) view of the file system for this table * Get the completed (commit + compaction) view of the file system for this table
*/ */
public TableFileSystemView getCompletedFileSystemView() { public TableFileSystemView getCompletedFileSystemView() {
return new HoodieTableFileSystemView(metaClient, getCommitTimeline()); return new HoodieTableFileSystemView(metaClient, getCommitsTimeline());
} }
/** /**
* Get only the completed (no-inflights) commit timeline * Get only the completed (no-inflights) commit timeline
*/ */
public HoodieTimeline getCompletedCommitTimeline() { public HoodieTimeline getCompletedCommitTimeline() {
return getCommitTimeline().filterCompletedInstants(); return getCommitsTimeline().filterCompletedInstants();
} }
/** /**
* Get only the inflights (no-completed) commit timeline * Get only the inflights (no-completed) commit timeline
*/ */
public HoodieTimeline getInflightCommitTimeline() { public HoodieTimeline getInflightCommitTimeline() {
return getCommitTimeline().filterInflights(); return getCommitsTimeline().filterInflights();
} }
@@ -185,38 +185,28 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
/** /**
* Get the commit timeline visible for this table * Get the commit timeline visible for this table
*/ */
public HoodieTimeline getCommitTimeline() { public HoodieTimeline getCommitsTimeline() {
switch (metaClient.getTableType()) { switch (metaClient.getTableType()) {
case COPY_ON_WRITE: case COPY_ON_WRITE:
return getActiveTimeline().getCommitTimeline(); return getActiveTimeline().getCommitTimeline();
case MERGE_ON_READ: case MERGE_ON_READ:
// We need to include the parquet files written out in delta commits // We need to include the parquet files written out in delta commits
// Include commit action to be able to start doing a MOR over a COW dataset - no migration required // Include commit action to be able to start doing a MOR over a COW dataset - no migration required
return getActiveTimeline().getCommitsAndCompactionsTimeline(); return getActiveTimeline().getCommitsTimeline();
default: default:
throw new HoodieException("Unsupported table type :" + metaClient.getTableType()); throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
} }
} }
/**
* Get only the completed (no-inflights) compaction commit timeline
*/
public HoodieTimeline getCompletedCompactionCommitTimeline() {
return getCompactionCommitTimeline().filterCompletedInstants();
}
/** /**
* Get the compacted commit timeline visible for this table * Get the compacted commit timeline visible for this table
*/ */
public HoodieTimeline getCompactionCommitTimeline() { public HoodieTimeline getCommitTimeline() {
switch (metaClient.getTableType()) { switch (metaClient.getTableType()) {
case COPY_ON_WRITE: case COPY_ON_WRITE:
return getActiveTimeline().getCommitsAndCompactionsTimeline();
case MERGE_ON_READ: case MERGE_ON_READ:
// We need to include the parquet files written out in delta commits in tagging // We need to include the parquet files written out in delta commits in tagging
return getActiveTimeline().getTimelineOfActions( return getActiveTimeline().getCommitTimeline();
Sets.newHashSet(HoodieActiveTimeline.COMPACTION_ACTION));
default: default:
throw new HoodieException("Unsupported table type :" + metaClient.getTableType()); throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
} }
@@ -236,20 +226,6 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
"Could not commit on unknown storage type " + metaClient.getTableType()); "Could not commit on unknown storage type " + metaClient.getTableType());
} }
/**
* Gets the action type for a compaction commit
*/
public String getCompactedCommitActionType() {
switch (metaClient.getTableType()) {
case COPY_ON_WRITE:
return HoodieTimeline.COMMIT_ACTION;
case MERGE_ON_READ:
return HoodieTimeline.COMPACTION_ACTION;
}
throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
}
/** /**
* Perform the ultimate IO for a given upserted (RDD) partition * Perform the ultimate IO for a given upserted (RDD) partition
*/ */
@@ -279,8 +255,8 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
* Run Compaction on the table. Compaction arranges the data so that it is optimized for data * Run Compaction on the table. Compaction arranges the data so that it is optimized for data
* access * access
*/ */
public abstract Optional<HoodieCompactionMetadata> compact(JavaSparkContext jsc, public abstract Optional<HoodieCommitMetadata> compact(JavaSparkContext jsc,
String commitCompactionTime); String commitCompactionTime);
/** /**
* Clean partition paths according to cleaning policy and returns the number of files cleaned. * Clean partition paths according to cleaning policy and returns the number of files cleaned.

View File

@@ -649,7 +649,7 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable {
HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath); HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath);
table = HoodieTable.getHoodieTable(metadata, getConfig()); table = HoodieTable.getHoodieTable(metadata, getConfig());
timeline = table.getCommitTimeline(); timeline = table.getCommitsTimeline();
TableFileSystemView fsView = table.getFileSystemView(); TableFileSystemView fsView = table.getFileSystemView();
// Need to ensure the following // Need to ensure the following
@@ -1493,10 +1493,10 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable {
HoodieTestUtils.doesCommitExist(basePath, commitTime)); HoodieTestUtils.doesCommitExist(basePath, commitTime));
// Get parquet file paths from commit metadata // Get parquet file paths from commit metadata
String actionType = table.getCompactedCommitActionType(); String actionType = table.getCommitActionType();
HoodieInstant commitInstant = HoodieInstant commitInstant =
new HoodieInstant(false, actionType, commitTime); new HoodieInstant(false, actionType, commitTime);
HoodieTimeline commitTimeline = table.getCompletedCompactionCommitTimeline(); HoodieTimeline commitTimeline = table.getCommitTimeline().filterCompletedInstants();
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commitInstant).get()); HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commitInstant).get());
String basePath = table.getMetaClient().getBasePath(); String basePath = table.getMetaClient().getBasePath();

View File

@@ -84,7 +84,7 @@ public class TestHoodieCommitArchiveLog {
HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath); HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath);
HoodieTimeline timeline = HoodieTimeline timeline =
metadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants(); metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants()); assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
@@ -155,13 +155,13 @@ public class TestHoodieCommitArchiveLog {
HoodieTestDataGenerator.createCommitFile(basePath, "103"); HoodieTestDataGenerator.createCommitFile(basePath, "103");
HoodieTimeline timeline = HoodieTimeline timeline =
metadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants(); metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals("Loaded 4 commits and the count should match", 4, timeline.countInstants()); assertEquals("Loaded 4 commits and the count should match", 4, timeline.countInstants());
boolean result = archiveLog.archiveIfRequired(); boolean result = archiveLog.archiveIfRequired();
assertTrue(result); assertTrue(result);
timeline = timeline =
metadata.getActiveTimeline().reload().getCommitsAndCompactionsTimeline() metadata.getActiveTimeline().reload().getCommitsTimeline()
.filterCompletedInstants(); .filterCompletedInstants();
assertEquals("Should not archive commits when maxCommitsToKeep is 5", 4, assertEquals("Should not archive commits when maxCommitsToKeep is 5", 4,
timeline.countInstants()); timeline.countInstants());
@@ -183,12 +183,12 @@ public class TestHoodieCommitArchiveLog {
HoodieTestDataGenerator.createCommitFile(basePath, "105"); HoodieTestDataGenerator.createCommitFile(basePath, "105");
HoodieTimeline timeline = HoodieTimeline timeline =
metadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants(); metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants()); assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
boolean result = archiveLog.archiveIfRequired(); boolean result = archiveLog.archiveIfRequired();
assertTrue(result); assertTrue(result);
timeline = timeline =
metadata.getActiveTimeline().reload().getCommitsAndCompactionsTimeline() metadata.getActiveTimeline().reload().getCommitsTimeline()
.filterCompletedInstants(); .filterCompletedInstants();
assertTrue("Archived commits should always be safe", assertTrue("Archived commits should always be safe",
timeline.containsOrBeforeTimelineStarts("100")); timeline.containsOrBeforeTimelineStarts("100"));
@@ -217,12 +217,12 @@ public class TestHoodieCommitArchiveLog {
HoodieTestDataGenerator.createCommitFile(basePath, "105"); HoodieTestDataGenerator.createCommitFile(basePath, "105");
HoodieTimeline timeline = HoodieTimeline timeline =
metadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants(); metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants()); assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
boolean result = archiveLog.archiveIfRequired(); boolean result = archiveLog.archiveIfRequired();
assertTrue(result); assertTrue(result);
timeline = timeline =
metadata.getActiveTimeline().reload().getCommitsAndCompactionsTimeline() metadata.getActiveTimeline().reload().getCommitsTimeline()
.filterCompletedInstants(); .filterCompletedInstants();
assertEquals( assertEquals(
"Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)", "Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)",

View File

@@ -21,7 +21,7 @@ import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.HoodieClientTestUtils; import com.uber.hoodie.common.HoodieClientTestUtils;
import com.uber.hoodie.common.HoodieTestDataGenerator; import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.model.FileSlice; import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieCompactionMetadata; import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.model.HoodieTestUtils;
@@ -49,7 +49,6 @@ import org.junit.rules.TemporaryFolder;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@@ -124,7 +123,7 @@ public class TestHoodieCompactor {
JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1); JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
writeClient.insert(recordsRDD, newCommitTime).collect(); writeClient.insert(recordsRDD, newCommitTime).collect();
HoodieCompactionMetadata result = HoodieCommitMetadata result =
compactor.compact(jsc, getConfig(), table, HoodieActiveTimeline.createNewCommitTime()); compactor.compact(jsc, getConfig(), table, HoodieActiveTimeline.createNewCommitTime());
String basePath = table.getMetaClient().getBasePath(); String basePath = table.getMetaClient().getBasePath();
assertTrue("If there is nothing to compact, result will be empty", assertTrue("If there is nothing to compact, result will be empty",
@@ -178,7 +177,7 @@ public class TestHoodieCompactor {
metaClient = new HoodieTableMetaClient(fs, basePath); metaClient = new HoodieTableMetaClient(fs, basePath);
table = HoodieTable.getHoodieTable(metaClient, config); table = HoodieTable.getHoodieTable(metaClient, config);
HoodieCompactionMetadata result = HoodieCommitMetadata result =
compactor.compact(jsc, getConfig(), table, HoodieActiveTimeline.createNewCommitTime()); compactor.compact(jsc, getConfig(), table, HoodieActiveTimeline.createNewCommitTime());
// Verify that recently written compacted data file has no log file // Verify that recently written compacted data file has no log file
@@ -199,7 +198,7 @@ public class TestHoodieCompactor {
"After compaction there should be no log files visiable on a Realtime view", "After compaction there should be no log files visiable on a Realtime view",
slice.getLogFiles().collect(Collectors.toList()).isEmpty()); slice.getLogFiles().collect(Collectors.toList()).isEmpty());
} }
assertTrue(result.getPartitionToCompactionWriteStats().containsKey(partitionPath)); assertTrue(result.getPartitionToWriteStats().containsKey(partitionPath));
} }
} }

View File

@@ -182,7 +182,7 @@ public class TestMergeOnReadTable {
FileStatus[] allFiles = HoodieTestUtils FileStatus[] allFiles = HoodieTestUtils
.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); .listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metaClient, TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metaClient,
hoodieTable.getCompletedCompactionCommitTimeline(), allFiles); hoodieTable.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles(); Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
assertTrue(!dataFilesToRead.findAny().isPresent()); assertTrue(!dataFilesToRead.findAny().isPresent());
@@ -231,7 +231,7 @@ public class TestMergeOnReadTable {
// verify that there is a commit // verify that there is a commit
table = HoodieTable table = HoodieTable
.getHoodieTable(new HoodieTableMetaClient(fs, cfg.getBasePath(), true), getConfig(false)); .getHoodieTable(new HoodieTableMetaClient(fs, cfg.getBasePath(), true), getConfig(false));
HoodieTimeline timeline = table.getCompletedCompactionCommitTimeline(); HoodieTimeline timeline = table.getCommitTimeline().filterCompletedInstants();
assertEquals("Expecting a single commit.", 1, assertEquals("Expecting a single commit.", 1,
timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants()); timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants());
String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp(); String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
@@ -299,7 +299,7 @@ public class TestMergeOnReadTable {
FileStatus[] allFiles = HoodieTestUtils FileStatus[] allFiles = HoodieTestUtils
.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); .listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metaClient, TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metaClient,
hoodieTable.getCompletedCompactionCommitTimeline(), allFiles); hoodieTable.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles(); Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
assertTrue(!dataFilesToRead.findAny().isPresent()); assertTrue(!dataFilesToRead.findAny().isPresent());
@@ -455,7 +455,7 @@ public class TestMergeOnReadTable {
FileStatus[] allFiles = HoodieTestUtils FileStatus[] allFiles = HoodieTestUtils
.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); .listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metaClient, TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metaClient,
hoodieTable.getCompletedCompactionCommitTimeline(), allFiles); hoodieTable.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles(); Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
assertTrue(!dataFilesToRead.findAny().isPresent()); assertTrue(!dataFilesToRead.findAny().isPresent());
@@ -524,11 +524,11 @@ public class TestMergeOnReadTable {
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath()); metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath());
hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg); hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg);
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompactionCommitTimeline(), roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCommitsTimeline(),
allFiles); allFiles);
final String compactedCommitTime = metaClient.getActiveTimeline().reload() final String compactedCommitTime = metaClient.getActiveTimeline().reload()
.getCommitsAndCompactionsTimeline().lastInstant().get().getTimestamp(); .getCommitsTimeline().lastInstant().get().getTimestamp();
assertTrue(roView.getLatestDataFiles().filter(file -> { assertTrue(roView.getLatestDataFiles().filter(file -> {
if (compactedCommitTime.equals(file.getCommitTime())) { if (compactedCommitTime.equals(file.getCommitTime())) {
@@ -543,7 +543,7 @@ public class TestMergeOnReadTable {
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath()); metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath());
hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg); hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg);
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompactionCommitTimeline(), roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCommitsTimeline(),
allFiles); allFiles);
assertFalse(roView.getLatestDataFiles().filter(file -> { assertFalse(roView.getLatestDataFiles().filter(file -> {

View File

@@ -44,6 +44,22 @@
{ {
"name":"totalWriteErrors", "name":"totalWriteErrors",
"type":["null","long"] "type":["null","long"]
},
{
"name":"partitionPath",
"type":["null","string"]
},
{
"name":"totalLogRecords",
"type":["null","long"]
},
{
"name":"totalLogFiles",
"type":["null","long"]
},
{
"name":"totalRecordsToBeUpdate",
"type":["null","long"]
} }
] ]
} }

View File

@@ -1,108 +0,0 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import java.io.Serializable;
@JsonIgnoreProperties(ignoreUnknown = true)
public class CompactionWriteStat implements Serializable {
private HoodieWriteStat writeStat;
private String partitionPath;
private long totalLogRecords;
private long totalLogFiles;
private long totalRecordsToBeUpdate;
public CompactionWriteStat(HoodieWriteStat writeStat, String partitionPath, long totalLogFiles,
long totalLogRecords,
long totalRecordsToUpdate) {
this.writeStat = writeStat;
this.partitionPath = partitionPath;
this.totalLogFiles = totalLogFiles;
this.totalLogRecords = totalLogRecords;
this.totalRecordsToBeUpdate = totalRecordsToUpdate;
}
public CompactionWriteStat() {
// For de-serialization
}
public long getTotalLogRecords() {
return totalLogRecords;
}
public long getTotalLogFiles() {
return totalLogFiles;
}
public long getTotalRecordsToBeUpdate() {
return totalRecordsToBeUpdate;
}
public HoodieWriteStat getHoodieWriteStat() {
return writeStat;
}
public String getPartitionPath() {
return partitionPath;
}
public static Builder newBuilder() {
return new Builder();
}
public static class Builder {
private HoodieWriteStat writeStat;
private long totalLogRecords;
private long totalRecordsToUpdate;
private long totalLogFiles;
private String partitionPath;
public Builder withHoodieWriteStat(HoodieWriteStat writeStat) {
this.writeStat = writeStat;
return this;
}
public Builder setTotalLogRecords(long records) {
this.totalLogRecords = records;
return this;
}
public Builder setTotalLogFiles(long totalLogFiles) {
this.totalLogFiles = totalLogFiles;
return this;
}
public Builder setTotalRecordsToUpdate(long records) {
this.totalRecordsToUpdate = records;
return this;
}
public Builder onPartition(String path) {
this.partitionPath = path;
return this;
}
public CompactionWriteStat build() {
return new CompactionWriteStat(writeStat, partitionPath, totalLogFiles, totalLogRecords,
totalRecordsToUpdate);
}
}
}

View File

@@ -40,12 +40,19 @@ public class HoodieCommitMetadata implements Serializable {
private static volatile Logger log = LogManager.getLogger(HoodieCommitMetadata.class); private static volatile Logger log = LogManager.getLogger(HoodieCommitMetadata.class);
protected Map<String, List<HoodieWriteStat>> partitionToWriteStats; protected Map<String, List<HoodieWriteStat>> partitionToWriteStats;
protected Boolean compacted;
private Map<String, String> extraMetadataMap; private Map<String, String> extraMetadataMap;
// for ser/deser
public HoodieCommitMetadata() { public HoodieCommitMetadata() {
this(false);
}
public HoodieCommitMetadata(boolean compacted) {
extraMetadataMap = new HashMap<>(); extraMetadataMap = new HashMap<>();
partitionToWriteStats = new HashMap<>(); partitionToWriteStats = new HashMap<>();
this.compacted = compacted;
} }
public void addWriteStat(String partitionPath, HoodieWriteStat stat) { public void addWriteStat(String partitionPath, HoodieWriteStat stat) {
@@ -75,6 +82,14 @@ public class HoodieCommitMetadata implements Serializable {
return extraMetadataMap.get(metaKey); return extraMetadataMap.get(metaKey);
} }
public Boolean getCompacted() {
return compacted;
}
public void setCompacted(Boolean compacted) {
this.compacted = compacted;
}
public HashMap<String, String> getFileIdAndRelativePaths() { public HashMap<String, String> getFileIdAndRelativePaths() {
HashMap<String, String> filePaths = new HashMap<>(); HashMap<String, String> filePaths = new HashMap<>();
// list all partitions paths // list all partitions paths
@@ -200,24 +215,21 @@ public class HoodieCommitMetadata implements Serializable {
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) { if (this == o) return true;
return true; if (o == null || getClass() != o.getClass()) return false;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
HoodieCommitMetadata that = (HoodieCommitMetadata) o; HoodieCommitMetadata that = (HoodieCommitMetadata) o;
return partitionToWriteStats != null ? if (!partitionToWriteStats.equals(that.partitionToWriteStats)) return false;
partitionToWriteStats.equals(that.partitionToWriteStats) : return compacted.equals(that.compacted);
that.partitionToWriteStats == null;
} }
@Override @Override
public int hashCode() { public int hashCode() {
return partitionToWriteStats != null ? partitionToWriteStats.hashCode() : 0; int result = partitionToWriteStats.hashCode();
result = 31 * result + compacted.hashCode();
return result;
} }
public static HoodieCommitMetadata fromBytes(byte[] bytes) throws IOException { public static HoodieCommitMetadata fromBytes(byte[] bytes) throws IOException {

View File

@@ -1,86 +0,0 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.model;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.codehaus.jackson.annotate.JsonAutoDetect;
import org.codehaus.jackson.annotate.JsonMethod;
import org.codehaus.jackson.map.DeserializationConfig.Feature;
import org.codehaus.jackson.map.ObjectMapper;
/**
* Place holder for the compaction specific meta-data, uses all the details used in a normal
* HoodieCommitMetadata
*/
public class HoodieCompactionMetadata extends HoodieCommitMetadata {

  // final instead of volatile: assigned once at class initialization, never reassigned.
  private static final Logger log = LogManager.getLogger(HoodieCompactionMetadata.class);

  // Compaction write stats grouped by partition path.
  protected HashMap<String, List<CompactionWriteStat>> partitionToCompactionWriteStats;

  public HoodieCompactionMetadata() {
    partitionToCompactionWriteStats = new HashMap<>();
  }

  /**
   * Records a compaction write stat for the given partition. The underlying HoodieWriteStat is
   * also forwarded to the parent commit metadata so both views stay in sync.
   */
  public void addWriteStat(String partitionPath, CompactionWriteStat stat) {
    addWriteStat(partitionPath, stat.getHoodieWriteStat());
    partitionToCompactionWriteStats
        .computeIfAbsent(partitionPath, k -> new ArrayList<>())
        .add(stat);
  }

  /**
   * @return the compaction write stats recorded for the given partition, or null if none exist
   */
  public List<CompactionWriteStat> getCompactionWriteStats(String partitionPath) {
    return partitionToCompactionWriteStats.get(partitionPath);
  }

  public Map<String, List<CompactionWriteStat>> getPartitionToCompactionWriteStats() {
    return partitionToCompactionWriteStats;
  }

  /**
   * Serializes this metadata as pretty-printed JSON. Stats recorded under a null partition path
   * are logged and dropped before serialization, since they cannot be mapped to a partition.
   */
  public String toJsonString() throws IOException {
    if (partitionToCompactionWriteStats.containsKey(null)) {
      log.info("partition path is null for " + partitionToCompactionWriteStats.get(null));
      partitionToCompactionWriteStats.remove(null);
    }
    ObjectMapper mapper = new ObjectMapper();
    mapper.setVisibility(JsonMethod.FIELD, JsonAutoDetect.Visibility.ANY);
    return mapper.defaultPrettyPrintingWriter().writeValueAsString(this);
  }

  /**
   * Deserializes compaction metadata from a JSON string. A null or empty input yields an empty
   * instance (the empty-commit-file case). Unknown JSON properties are ignored for forward
   * compatibility.
   */
  public static HoodieCompactionMetadata fromJsonString(String jsonStr) throws IOException {
    if (jsonStr == null || jsonStr.isEmpty()) {
      // For empty commit file (no data or somethings bad happen).
      return new HoodieCompactionMetadata();
    }
    ObjectMapper mapper = new ObjectMapper();
    mapper.configure(Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    mapper.setVisibility(JsonMethod.FIELD, JsonAutoDetect.Visibility.ANY);
    return mapper.readValue(jsonStr, HoodieCompactionMetadata.class);
  }

  /**
   * Deserializes compaction metadata from raw UTF-8 encoded bytes.
   */
  public static HoodieCompactionMetadata fromBytes(byte[] bytes) throws IOException {
    // StandardCharsets.UTF_8 avoids the runtime charset lookup done by Charset.forName("utf-8").
    return fromJsonString(new String(bytes, StandardCharsets.UTF_8));
  }
}

View File

@@ -17,6 +17,8 @@
package com.uber.hoodie.common.model; package com.uber.hoodie.common.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import javax.annotation.Nullable;
import java.io.Serializable; import java.io.Serializable;
/** /**
@@ -68,6 +70,34 @@ public class HoodieWriteStat implements Serializable {
*/ */
private long totalWriteErrors; private long totalWriteErrors;
/**
* Following properties are associated only with the result of a Compaction Operation
*/
/**
* Partition Path associated with this writeStat
*/
@Nullable
private String partitionPath;
/**
* Total number of log records that were compacted by a compaction operation
*/
@Nullable
private Long totalLogRecords;
/**
* Total number of log files that were compacted by a compaction operation
*/
@Nullable
private Long totalLogFiles;
/**
* Total number of records updated by a compaction operation
*/
@Nullable
private Long totalRecordsToBeUpdate;
public HoodieWriteStat() { public HoodieWriteStat() {
// called by jackson json lib // called by jackson json lib
} }
@@ -136,6 +166,37 @@ public class HoodieWriteStat implements Serializable {
return path; return path;
} }
public String getPartitionPath() {
return partitionPath;
}
public void setPartitionPath(String partitionPath) {
this.partitionPath = partitionPath;
}
public Long getTotalLogRecords() {
return totalLogRecords;
}
public void setTotalLogRecords(Long totalLogRecords) {
this.totalLogRecords = totalLogRecords;
}
public Long getTotalLogFiles() {
return totalLogFiles;
}
public void setTotalLogFiles(Long totalLogFiles) {
this.totalLogFiles = totalLogFiles;
}
public Long getTotalRecordsToBeUpdate() {
return totalRecordsToBeUpdate;
}
public void setTotalRecordsToBeUpdate(Long totalRecordsToBeUpdate) {
this.totalRecordsToBeUpdate = totalRecordsToBeUpdate;
}
@Override @Override
public String toString() { public String toString() {

View File

@@ -41,7 +41,6 @@ public interface HoodieTimeline extends Serializable {
String CLEAN_ACTION = "clean"; String CLEAN_ACTION = "clean";
String ROLLBACK_ACTION = "rollback"; String ROLLBACK_ACTION = "rollback";
String SAVEPOINT_ACTION = "savepoint"; String SAVEPOINT_ACTION = "savepoint";
String COMPACTION_ACTION = "compaction";
String INFLIGHT_EXTENSION = ".inflight"; String INFLIGHT_EXTENSION = ".inflight";
String COMMIT_EXTENSION = "." + COMMIT_ACTION; String COMMIT_EXTENSION = "." + COMMIT_ACTION;
@@ -49,14 +48,12 @@ public interface HoodieTimeline extends Serializable {
String CLEAN_EXTENSION = "." + CLEAN_ACTION; String CLEAN_EXTENSION = "." + CLEAN_ACTION;
String ROLLBACK_EXTENSION = "." + ROLLBACK_ACTION; String ROLLBACK_EXTENSION = "." + ROLLBACK_ACTION;
String SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION; String SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION;
String COMPACTION_EXTENSION = "." + COMPACTION_ACTION;
//this is to preserve backwards compatibility on commit in-flight filenames //this is to preserve backwards compatibility on commit in-flight filenames
String INFLIGHT_COMMIT_EXTENSION = INFLIGHT_EXTENSION; String INFLIGHT_COMMIT_EXTENSION = INFLIGHT_EXTENSION;
String INFLIGHT_DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION + INFLIGHT_EXTENSION; String INFLIGHT_DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION + INFLIGHT_EXTENSION;
String INFLIGHT_CLEAN_EXTENSION = "." + CLEAN_ACTION + INFLIGHT_EXTENSION; String INFLIGHT_CLEAN_EXTENSION = "." + CLEAN_ACTION + INFLIGHT_EXTENSION;
String INFLIGHT_ROLLBACK_EXTENSION = "." + ROLLBACK_ACTION + INFLIGHT_EXTENSION; String INFLIGHT_ROLLBACK_EXTENSION = "." + ROLLBACK_ACTION + INFLIGHT_EXTENSION;
String INFLIGHT_SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION + INFLIGHT_EXTENSION; String INFLIGHT_SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION + INFLIGHT_EXTENSION;
String INFLIGHT_COMPACTION_EXTENSION = "." + COMPACTION_ACTION + INFLIGHT_EXTENSION;
/** /**
* Filter this timeline to just include the in-flights * Filter this timeline to just include the in-flights
@@ -197,14 +194,6 @@ public interface HoodieTimeline extends Serializable {
return commitTime + HoodieTimeline.SAVEPOINT_EXTENSION; return commitTime + HoodieTimeline.SAVEPOINT_EXTENSION;
} }
static String makeInflightCompactionFileName(String commitTime) {
return commitTime + HoodieTimeline.INFLIGHT_COMPACTION_EXTENSION;
}
static String makeCompactionFileName(String commitTime) {
return commitTime + HoodieTimeline.COMPACTION_EXTENSION;
}
static String makeInflightDeltaFileName(String commitTime) { static String makeInflightDeltaFileName(String commitTime) {
return commitTime + HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION; return commitTime + HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION;
} }

View File

@@ -94,8 +94,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
public HoodieActiveTimeline(FileSystem fs, String metaPath) { public HoodieActiveTimeline(FileSystem fs, String metaPath) {
this(fs, metaPath, this(fs, metaPath,
new String[]{COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION, new String[]{COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION,
INFLIGHT_DELTA_COMMIT_EXTENSION, COMPACTION_EXTENSION, INFLIGHT_DELTA_COMMIT_EXTENSION, SAVEPOINT_EXTENSION, INFLIGHT_SAVEPOINT_EXTENSION,
INFLIGHT_COMPACTION_EXTENSION, SAVEPOINT_EXTENSION, INFLIGHT_SAVEPOINT_EXTENSION,
CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION}); CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION});
} }
@@ -119,21 +118,21 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
} }
/** /**
* Get all instants (commits, delta commits, compactions) that produce new data, in the active * Get all instants (commits, delta commits) that produce new data, in the active
* timeline * * timeline *
*/ */
public HoodieTimeline getCommitsAndCompactionsTimeline() { public HoodieTimeline getCommitsTimeline() {
return getTimelineOfActions( return getTimelineOfActions(
Sets.newHashSet(COMMIT_ACTION, COMPACTION_ACTION, DELTA_COMMIT_ACTION)); Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION));
} }
/** /**
* Get all instants (commits, delta commits, compactions, clean, savepoint, rollback) that result * Get all instants (commits, delta commits, clean, savepoint, rollback) that result
* in actions, in the active timeline * * in actions, in the active timeline *
*/ */
public HoodieTimeline getAllCommitsTimeline() { public HoodieTimeline getAllCommitsTimeline() {
return getTimelineOfActions( return getTimelineOfActions(
Sets.newHashSet(COMMIT_ACTION, COMPACTION_ACTION, DELTA_COMMIT_ACTION, CLEAN_ACTION, Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, CLEAN_ACTION,
SAVEPOINT_ACTION, ROLLBACK_ACTION)); SAVEPOINT_ACTION, ROLLBACK_ACTION));
} }
@@ -152,14 +151,6 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
(Function<HoodieInstant, Optional<byte[]>> & Serializable) this::getInstantDetails); (Function<HoodieInstant, Optional<byte[]>> & Serializable) this::getInstantDetails);
} }
/**
* Get only the commits (inflight and completed) in the compaction timeline
*/
public HoodieTimeline getCompactionTimeline() {
return new HoodieDefaultTimeline(filterInstantsByAction(COMPACTION_ACTION),
(Function<HoodieInstant, Optional<byte[]>> & Serializable) this::getInstantDetails);
}
/** /**
* Get a timeline of a specific set of actions. useful to create a merged timeline of multiple * Get a timeline of a specific set of actions. useful to create a merged timeline of multiple
* actions * actions

View File

@@ -94,10 +94,6 @@ public class HoodieInstant implements Serializable {
return isInflight ? return isInflight ?
HoodieTimeline.makeInflightSavePointFileName(timestamp) : HoodieTimeline.makeInflightSavePointFileName(timestamp) :
HoodieTimeline.makeSavePointFileName(timestamp); HoodieTimeline.makeSavePointFileName(timestamp);
} else if (HoodieTimeline.COMPACTION_ACTION.equals(action)) {
return isInflight ?
HoodieTimeline.makeInflightCompactionFileName(timestamp) :
HoodieTimeline.makeCompactionFileName(timestamp);
} else if (HoodieTimeline.DELTA_COMMIT_ACTION.equals(action)) { } else if (HoodieTimeline.DELTA_COMMIT_ACTION.equals(action)) {
return isInflight ? return isInflight ?
HoodieTimeline.makeInflightDeltaFileName(timestamp) : HoodieTimeline.makeInflightDeltaFileName(timestamp) :

View File

@@ -158,7 +158,7 @@ public class HoodieTestUtils {
for (String commitTime : commitTimes) { for (String commitTime : commitTimes) {
boolean createFile = fs.createNewFile(new Path( boolean createFile = fs.createNewFile(new Path(
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline
.makeCompactionFileName(commitTime))); .makeCommitFileName(commitTime)));
if (!createFile) { if (!createFile) {
throw new IOException("cannot create commit file for commit " + commitTime); throw new IOException("cannot create commit file for commit " + commitTime);
} }

View File

@@ -100,7 +100,7 @@ public class HoodieInputFormat extends MapredParquetInputFormat
String tableName = metadata.getTableConfig().getTableName(); String tableName = metadata.getTableConfig().getTableName();
String mode = HoodieHiveUtil.readMode(Job.getInstance(job), tableName); String mode = HoodieHiveUtil.readMode(Job.getInstance(job), tableName);
// Get all commits, delta commits, compactions, as all of them produce a base parquet file today // Get all commits, delta commits, compactions, as all of them produce a base parquet file today
HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsAndCompactionsTimeline() HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants(); .filterCompletedInstants();
TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metadata, TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metadata,
timeline, statuses); timeline, statuses);

View File

@@ -125,7 +125,6 @@ public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Conf
String maxCommitTime = metaClient.getActiveTimeline() String maxCommitTime = metaClient.getActiveTimeline()
.getTimelineOfActions( .getTimelineOfActions(
Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, Sets.newHashSet(HoodieTimeline.COMMIT_ACTION,
HoodieTimeline.COMPACTION_ACTION,
HoodieTimeline.DELTA_COMMIT_ACTION)) HoodieTimeline.DELTA_COMMIT_ACTION))
.filterCompletedInstants().lastInstant().get().getTimestamp(); .filterCompletedInstants().lastInstant().get().getTimestamp();
rtSplits.add( rtSplits.add(

View File

@@ -21,7 +21,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieCompactionMetadata;
import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTableMetaClient;
@@ -35,17 +34,6 @@ import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.InvalidDatasetException; import com.uber.hoodie.exception.InvalidDatasetException;
import com.uber.hoodie.hive.util.SchemaUtil; import com.uber.hoodie.hive.util.SchemaUtil;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.dbcp.BasicDataSource; import org.apache.commons.dbcp.BasicDataSource;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@@ -63,6 +51,18 @@ import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.ParquetMetadata; import parquet.hadoop.metadata.ParquetMetadata;
import parquet.schema.MessageType; import parquet.schema.MessageType;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@SuppressWarnings("ConstantConditions") @SuppressWarnings("ConstantConditions")
public class HoodieHiveClient { public class HoodieHiveClient {
@@ -111,7 +111,7 @@ public class HoodieHiveClient {
e); e);
} }
activeTimeline = metaClient.getActiveTimeline().getCommitsAndCompactionsTimeline() activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants(); .filterCompletedInstants();
} }
@@ -323,7 +323,7 @@ public class HoodieHiveClient {
// If this is MOR, depending on whether the latest commit is a delta commit or compaction commit // If this is MOR, depending on whether the latest commit is a delta commit or compaction commit
// Get a datafile written and get the schema from that file // Get a datafile written and get the schema from that file
Optional<HoodieInstant> lastCompactionCommit = metaClient.getActiveTimeline() Optional<HoodieInstant> lastCompactionCommit = metaClient.getActiveTimeline()
.getCompactionTimeline().filterCompletedInstants().lastInstant(); .getCommitTimeline().filterCompletedInstants().lastInstant();
LOG.info("Found the last compaction commit as " + lastCompactionCommit); LOG.info("Found the last compaction commit as " + lastCompactionCommit);
Optional<HoodieInstant> lastDeltaCommit; Optional<HoodieInstant> lastDeltaCommit;
@@ -379,7 +379,7 @@ public class HoodieHiveClient {
+ syncConfig.basePath)); + syncConfig.basePath));
// Read from the compacted file wrote // Read from the compacted file wrote
HoodieCompactionMetadata compactionMetadata = HoodieCompactionMetadata HoodieCommitMetadata compactionMetadata = HoodieCommitMetadata
.fromBytes(activeTimeline.getInstantDetails(lastCompactionCommit).get()); .fromBytes(activeTimeline.getInstantDetails(lastCompactionCommit).get());
String filePath = compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values() String filePath = compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values()
.stream().findAny() .stream().findAny()

View File

@@ -16,9 +16,6 @@
package com.uber.hoodie.hive; package com.uber.hoodie.hive;
import static com.uber.hoodie.common.model.HoodieTestUtils.DEFAULT_TASK_PARTITIONID;
import static org.junit.Assert.fail;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
@@ -26,10 +23,8 @@ import com.uber.hoodie.avro.HoodieAvroWriteSupport;
import com.uber.hoodie.common.BloomFilter; import com.uber.hoodie.common.BloomFilter;
import com.uber.hoodie.common.minicluster.HdfsTestService; import com.uber.hoodie.common.minicluster.HdfsTestService;
import com.uber.hoodie.common.minicluster.ZookeeperTestService; import com.uber.hoodie.common.minicluster.ZookeeperTestService;
import com.uber.hoodie.common.model.CompactionWriteStat;
import com.uber.hoodie.common.model.HoodieAvroPayload; import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieCompactionMetadata;
import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieDeltaWriteStat; import com.uber.hoodie.common.model.HoodieDeltaWriteStat;
import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieLogFile;
@@ -44,15 +39,6 @@ import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.SchemaTestUtil; import com.uber.hoodie.common.util.SchemaTestUtil;
import com.uber.hoodie.hive.util.HiveTestService; import com.uber.hoodie.hive.util.HiveTestService;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord; import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
@@ -72,6 +58,19 @@ import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.DateTimeFormatter;
import org.junit.runners.model.InitializationError; import org.junit.runners.model.InitializationError;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import static com.uber.hoodie.common.model.HoodieTestUtils.DEFAULT_TASK_PARTITIONID;
import static org.junit.Assert.fail;
@SuppressWarnings("SameParameterValue") @SuppressWarnings("SameParameterValue")
public class TestUtil { public class TestUtil {
@@ -182,9 +181,9 @@ public class TestUtil {
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName
+ HiveSyncTool.SUFFIX_REALTIME_TABLE); + HiveSyncTool.SUFFIX_REALTIME_TABLE);
HoodieCompactionMetadata compactionMetadata = new HoodieCompactionMetadata(); HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
commitMetadata.getPartitionToWriteStats() commitMetadata.getPartitionToWriteStats()
.forEach((key, value) -> value.stream().map(k -> new CompactionWriteStat(k, key, 0, 0, 0)) .forEach((key, value) -> value.stream()
.forEach(l -> compactionMetadata.addWriteStat(key, l))); .forEach(l -> compactionMetadata.addWriteStat(key, l)));
createCompactionCommitFile(compactionMetadata, commitTime); createCompactionCommitFile(compactionMetadata, commitTime);
// Write a delta commit // Write a delta commit
@@ -211,9 +210,9 @@ public class TestUtil {
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName
+ HiveSyncTool.SUFFIX_REALTIME_TABLE); + HiveSyncTool.SUFFIX_REALTIME_TABLE);
HoodieCompactionMetadata compactionMetadata = new HoodieCompactionMetadata(); HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
commitMetadata.getPartitionToWriteStats() commitMetadata.getPartitionToWriteStats()
.forEach((key, value) -> value.stream().map(k -> new CompactionWriteStat(k, key, 0, 0, 0)) .forEach((key, value) -> value.stream()
.forEach(l -> compactionMetadata.addWriteStat(key, l))); .forEach(l -> compactionMetadata.addWriteStat(key, l)));
createCompactionCommitFile(compactionMetadata, commitTime); createCompactionCommitFile(compactionMetadata, commitTime);
HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(),
@@ -342,12 +341,12 @@ public class TestUtil {
} }
private static void createCompactionCommitFile( private static void createCompactionCommitFile(
HoodieCompactionMetadata commitMetadata, String commitTime) HoodieCommitMetadata commitMetadata, String commitTime)
throws IOException { throws IOException {
byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8); byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
Path fullPath = new Path( Path fullPath = new Path(
hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline
.makeCompactionFileName(commitTime)); .makeCommitFileName(commitTime));
FSDataOutputStream fsout = fileSystem.create(fullPath, true); FSDataOutputStream fsout = fileSystem.create(fullPath, true);
fsout.write(bytes); fsout.write(bytes);
fsout.close(); fsout.close();

View File

@@ -70,11 +70,10 @@ public class HoodieDataSourceHelpers {
.getHoodieTable(new HoodieTableMetaClient(fs, basePath, true), null); .getHoodieTable(new HoodieTableMetaClient(fs, basePath, true), null);
if (table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ)) { if (table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ)) {
return table.getActiveTimeline().getTimelineOfActions( return table.getActiveTimeline().getTimelineOfActions(
Sets.newHashSet(HoodieActiveTimeline.COMPACTION_ACTION, Sets.newHashSet(HoodieActiveTimeline.COMMIT_ACTION,
HoodieActiveTimeline.DELTA_COMMIT_ACTION) HoodieActiveTimeline.DELTA_COMMIT_ACTION));
);
} else { } else {
return table.getCompletedCompactionCommitTimeline(); return table.getCommitTimeline().filterCompletedInstants();
} }
} }
} }

View File

@@ -53,7 +53,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
throw new HoodieException("Incremental view not implemented yet, for merge-on-read datasets") throw new HoodieException("Incremental view not implemented yet, for merge-on-read datasets")
} }
val hoodieTable = HoodieTable.getHoodieTable(metaClient, null) val hoodieTable = HoodieTable.getHoodieTable(metaClient, null)
val commitTimeline = hoodieTable.getCompletedCompactionCommitTimeline(); val commitTimeline = hoodieTable.getCommitTimeline.filterCompletedInstants();
if (commitTimeline.empty()) { if (commitTimeline.empty()) {
throw new HoodieException("No instants to incrementally pull") throw new HoodieException("No instants to incrementally pull")
} }

View File

@@ -294,7 +294,7 @@ public class HiveIncrementalPuller {
HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, targetDataPath); HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, targetDataPath);
Optional<HoodieInstant> Optional<HoodieInstant>
lastCommit = metadata.getActiveTimeline().getCommitsAndCompactionsTimeline() lastCommit = metadata.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants().lastInstant(); .filterCompletedInstants().lastInstant();
if (lastCommit.isPresent()) { if (lastCommit.isPresent()) {
return lastCommit.get().getTimestamp(); return lastCommit.get().getTimestamp();
@@ -332,14 +332,14 @@ public class HiveIncrementalPuller {
private String getLastCommitTimePulled(FileSystem fs, String sourceTableLocation) private String getLastCommitTimePulled(FileSystem fs, String sourceTableLocation)
throws IOException { throws IOException {
HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, sourceTableLocation); HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, sourceTableLocation);
List<String> commitsToSync = metadata.getActiveTimeline().getCommitsAndCompactionsTimeline() List<String> commitsToSync = metadata.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants() .filterCompletedInstants()
.findInstantsAfter(config.fromCommitTime, config.maxCommits).getInstants() .findInstantsAfter(config.fromCommitTime, config.maxCommits).getInstants()
.map(HoodieInstant::getTimestamp) .map(HoodieInstant::getTimestamp)
.collect(Collectors.toList()); .collect(Collectors.toList());
if (commitsToSync.isEmpty()) { if (commitsToSync.isEmpty()) {
log.warn("Nothing to sync. All commits in " + config.sourceTable + " are " + metadata log.warn("Nothing to sync. All commits in " + config.sourceTable + " are " + metadata
.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants() .getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
.getInstants() .getInstants()
.collect(Collectors.toList()) + " and from commit time is " .collect(Collectors.toList()) + " and from commit time is "
+ config.fromCommitTime); + config.fromCommitTime);

View File

@@ -74,11 +74,11 @@ public class HoodieSnapshotCopier implements Serializable {
final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs, baseDir); final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs, baseDir);
final TableFileSystemView.ReadOptimizedView fsView = new HoodieTableFileSystemView( final TableFileSystemView.ReadOptimizedView fsView = new HoodieTableFileSystemView(
tableMetadata, tableMetadata,
tableMetadata.getActiveTimeline().getCommitsAndCompactionsTimeline() tableMetadata.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants()); .filterCompletedInstants());
// Get the latest commit // Get the latest commit
Optional<HoodieInstant> latestCommit = tableMetadata.getActiveTimeline() Optional<HoodieInstant> latestCommit = tableMetadata.getActiveTimeline()
.getCommitsAndCompactionsTimeline().filterCompletedInstants().lastInstant(); .getCommitsTimeline().filterCompletedInstants().lastInstant();
if (!latestCommit.isPresent()) { if (!latestCommit.isPresent()) {
logger.warn("No commits present. Nothing to snapshot"); logger.warn("No commits present. Nothing to snapshot");
return; return;

View File

@@ -120,7 +120,7 @@ public class HoodieDeltaStreamer implements Serializable {
if (fs.exists(new Path(cfg.targetBasePath))) { if (fs.exists(new Path(cfg.targetBasePath))) {
HoodieTableMetaClient meta = new HoodieTableMetaClient(fs, cfg.targetBasePath); HoodieTableMetaClient meta = new HoodieTableMetaClient(fs, cfg.targetBasePath);
this.commitTimelineOpt = Optional this.commitTimelineOpt = Optional
.of(meta.getActiveTimeline().getCommitsAndCompactionsTimeline() .of(meta.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants()); .filterCompletedInstants());
} else { } else {
this.commitTimelineOpt = Optional.empty(); this.commitTimelineOpt = Optional.empty();