Refactor hoodie-common and create right abstractions for Hoodie Storage V2.0
The following is the gist of the changes:
- All the low-level logic for creating a commit lived in HoodieWriteClient, which made it hard to share that code when there is a compaction commit.
- HoodieTableMetadata mixed table metadata with file filtering. (A few operations also required a FileSystem to be passed in, because they were called from task executors, while others used a FileSystem held as a member variable.) Merge-on-read needs much of that code, but will change slightly how it operates on the metadata and how it filters files, so the two sets of operations are split into HoodieTableMetaClient and TableFileSystemView.
- Everything in HoodieTableMetaClient (active commits, archived commits, the cleaner log, the savepoint log and, in future, delta and compaction commits) is a HoodieTimeline. A timeline is a series of instants, with a built-in notion of inflight and completed commit markers.
- A timeline can be queried for ranges and containment, and can be used to create new instants (e.g. a new commit). Creation and deletion of commits (and all the metadata above) is streamlined through the timeline; a minimal sketch follows below.
- Multiple timelines can be merged into a single timeline, giving an audit timeline of everything that has happened to a hoodie dataset. This also helps with #55.
- Move to Java 8 and introduce succinct Java 8 syntax in the refactored code.
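As a rough illustration of the timeline points above, the sketch below strings together the timeline calls that appear in the diff that follows (HoodieTableMetaClient, getActiveCommitTimeline, saveInstantAsInflight, saveInstantAsComplete, containsInstant, findInstantsAfter). It is a hand-written sketch against the API shown in this commit, not code taken from the commit itself.

    import java.util.Optional;
    import org.apache.hadoop.fs.FileSystem;
    import com.uber.hoodie.common.table.HoodieTableMetaClient;
    import com.uber.hoodie.common.table.HoodieTimeline;

    public class TimelineSketch {
      // Walks one commit through the timeline: inflight marker first, then an
      // atomic transition to completed, then containment/range queries.
      public static void commitRoundTrip(FileSystem fs, String basePath, String commitTime,
          byte[] commitMetadataJson) throws Exception {
        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath, true);
        HoodieTimeline commitTimeline = metaClient.getActiveCommitTimeline();

        // Every instant starts out as an inflight marker on the timeline ...
        commitTimeline.saveInstantAsInflight(commitTime);

        // ... and is published atomically with its metadata when the write succeeds.
        commitTimeline.saveInstantAsComplete(commitTime, Optional.of(commitMetadataJson));

        // The timeline can be queried for containment and for ranges of instants.
        boolean published = commitTimeline.containsInstant(commitTime);
        long newerCommits = commitTimeline.findInstantsAfter(commitTime, Integer.MAX_VALUE).count();
      }
    }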
@@ -22,8 +22,9 @@ import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieTableMetadata;
import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieCommitException;
@@ -31,7 +32,7 @@ import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.HoodieInsertException;
import com.uber.hoodie.exception.HoodieRollbackException;
import com.uber.hoodie.exception.HoodieUpsertException;
import com.uber.hoodie.func.BulkInsertMapFunction;
import com.uber.hoodie.func.InsertMapFunction;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.io.HoodieCleaner;
import com.uber.hoodie.io.HoodieCommitArchiveLog;
@@ -39,7 +40,6 @@ import com.uber.hoodie.metrics.HoodieMetrics;
import com.uber.hoodie.table.HoodieTable;
import com.uber.hoodie.table.WorkloadProfile;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -67,16 +67,19 @@ import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import scala.Option;
import scala.Tuple2;

/**
 * Hoodie Write Client helps you build datasets on HDFS [insert()] and then perform efficient
 * mutations on a HDFS dataset [upsert()]
 * Hoodie Write Client helps you build datasets on HDFS [insert()] and then
 * perform efficient mutations on a HDFS dataset [upsert()]
 *
 * Note that, at any given time, there can only be one Spark job performing
 * these operatons on a Hoodie dataset.
 *
 * Note that, at any given time, there can only be one Spark job performing these operatons on a
 * Hoodie dataset.
 */
public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable {

@@ -111,7 +114,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
    this.config = clientConfig;
    this.index = HoodieIndex.createIndex(config, jsc);
    this.metrics = new HoodieMetrics(config, config.getTableName());
    this.archiveLog = new HoodieCommitArchiveLog(clientConfig);
    this.archiveLog = new HoodieCommitArchiveLog(clientConfig, fs);

    if (rollbackInFlight) {
      rollbackInflightCommits();
    }
@@ -125,9 +129,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
   * @return A subset of hoodieRecords RDD, with existing records filtered out.
   */
  public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieRecords) {
    final HoodieTableMetadata metadata =
        new HoodieTableMetadata(fs, config.getBasePath(), config.getTableName());
    JavaRDD<HoodieRecord<T>> recordsWithLocation = index.tagLocation(hoodieRecords, metadata);
    HoodieTableMetaClient metaClient =
        new HoodieTableMetaClient(fs, config.getBasePath(), true);
    JavaRDD<HoodieRecord<T>> recordsWithLocation = index.tagLocation(hoodieRecords, metaClient);
    return recordsWithLocation.filter(new Function<HoodieRecord<T>, Boolean>() {
      @Override
      public Boolean call(HoodieRecord<T> v1) throws Exception {
@@ -140,19 +144,74 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
   * Upserts a bunch of new records into the Hoodie table, at the supplied commitTime
   */
  public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
    final HoodieTableMetadata metadata =
        new HoodieTableMetadata(fs, config.getBasePath(), config.getTableName());
    writeContext = metrics.getCommitCtx();
    HoodieTableMetaClient metaClient =
        new HoodieTableMetaClient(fs, config.getBasePath(), true);

    final HoodieTable table =
        HoodieTable.getHoodieTable(metaClient, commitTime, config);

    try {
      // De-dupe/merge if needed
      JavaRDD<HoodieRecord<T>> dedupedRecords =
          combineOnCondition(config.shouldCombineBeforeUpsert(), records,
              config.getUpsertShuffleParallelism());
          combineOnCondition(config.shouldCombineBeforeUpsert(), records,
              config.getUpsertShuffleParallelism());

      // perform index loop up to get existing location of records
      JavaRDD<HoodieRecord<T>> taggedRecords = index.tagLocation(dedupedRecords, metadata);
      return upsertRecordsInternal(taggedRecords, commitTime, metadata, true);
      JavaRDD<HoodieRecord<T>> taggedRecords = index.tagLocation(dedupedRecords, metaClient);

      // Cache the tagged records, so we don't end up computing both
      taggedRecords.persist(StorageLevel.MEMORY_AND_DISK_SER());

      WorkloadProfile profile = null;
      if (table.isWorkloadProfileNeeded()) {
        profile = new WorkloadProfile(taggedRecords);
        logger.info("Workload profile :" + profile);
      }

      // obtain the upsert partitioner, and the run the tagger records through that & get a partitioned RDD.
      final Partitioner upsertPartitioner = table.getUpsertPartitioner(profile);
      JavaRDD<HoodieRecord<T>> partitionedRecords = taggedRecords.mapToPair(
          new PairFunction<HoodieRecord<T>, Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>>() {
            @Override
            public Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> call(
                HoodieRecord<T> record) throws Exception {
              return new Tuple2<>(new Tuple2<>(record.getKey(),
                  Option.apply(record.getCurrentLocation())), record);
            }
          }).partitionBy(upsertPartitioner).map(
          new Function<Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>>, HoodieRecord<T>>() {
            @Override
            public HoodieRecord<T> call(
                Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> tuple)
                throws Exception {
              return tuple._2();
            }
          });

      // Perform the actual writing.
      JavaRDD<WriteStatus> upsertStatusRDD = partitionedRecords.mapPartitionsWithIndex(
          new Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<List<WriteStatus>>>() {
            @Override
            public Iterator<List<WriteStatus>> call(Integer partition,
                Iterator<HoodieRecord<T>> recordItr) throws Exception {
              return table.handleUpsertPartition(partition, recordItr, upsertPartitioner);
            }
          }, true).flatMap(new FlatMapFunction<List<WriteStatus>, WriteStatus>() {
        @Override
        public Iterable<WriteStatus> call(List<WriteStatus> writeStatuses)
            throws Exception {
          return writeStatuses;
        }
      });

      // Update the index back.
      JavaRDD<WriteStatus> resultRDD = index.updateLocation(upsertStatusRDD, metaClient);
      resultRDD = resultRDD.persist(config.getWriteStatusStorageLevel());
      commitOnAutoCommit(commitTime, resultRDD);
      return resultRDD;
    } catch (Throwable e) {
      if (e instanceof HoodieUpsertException) {
        throw (HoodieUpsertException) e;
@@ -161,38 +220,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
      }
    }

  /**
   * Inserts the given HoodieRecords, into the table. This API is intended to be used for normal
   * writes.
   *
   * This implementation skips the index check and is able to leverage benefits such as
   * small file handling/blocking alignment, as with upsert(), by profiling the workload
   *
   * @param records HoodieRecords to insert
   * @param commitTime Commit Time handle
   * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
   */
  public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
    final HoodieTableMetadata metadata =
        new HoodieTableMetadata(fs, config.getBasePath(), config.getTableName());
    writeContext = metrics.getCommitCtx();
    try {
      // De-dupe/merge if needed
      JavaRDD<HoodieRecord<T>> dedupedRecords =
          combineOnCondition(config.shouldCombineBeforeInsert(), records,
              config.getInsertShuffleParallelism());

      return upsertRecordsInternal(dedupedRecords, commitTime, metadata, false);
    } catch (Throwable e) {
      if (e instanceof HoodieInsertException) {
        throw e;
      }
      throw new HoodieInsertException("Failed to insert for commit time " + commitTime, e);
    }
  }

  private void commitOnAutoCommit(String commitTime, JavaRDD<WriteStatus> resultRDD) {
    if (config.shouldAutoCommit()) {
    if(config.shouldAutoCommit()) {
      logger.info("Auto commit enabled: Committing " + commitTime);
      boolean commitResult = commit(commitTime, resultRDD);
      if (!commitResult) {
@@ -204,146 +233,65 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
    }

  private JavaRDD<HoodieRecord<T>> combineOnCondition(boolean condition,
      JavaRDD<HoodieRecord<T>> records, int parallelism) {
    if (condition) {
      JavaRDD<HoodieRecord<T>> records, int parallelism) {
    if(condition) {
      return deduplicateRecords(records, parallelism);
    }
    return records;
  }

  private JavaRDD<HoodieRecord<T>> partition(JavaRDD<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
    return dedupedRecords.mapToPair(
        new PairFunction<HoodieRecord<T>, Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>>() {
          @Override
          public Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> call(
              HoodieRecord<T> record) throws Exception {
            return new Tuple2<>(new Tuple2<>(record.getKey(),
                Option.apply(record.getCurrentLocation())), record);
          }
        }).partitionBy(partitioner).map(
        new Function<Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>>, HoodieRecord<T>>() {
          @Override
          public HoodieRecord<T> call(
              Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> tuple)
              throws Exception {
            return tuple._2();
          }
        });
  }

  private Partitioner getPartitioner(HoodieTable table, boolean isUpsert, WorkloadProfile profile) {
    if (isUpsert) {
      return table.getUpsertPartitioner(profile);
    } else {
      return table.getInsertPartitioner(profile);
    }
  }

  private JavaRDD<WriteStatus> updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD,
      HoodieTableMetadata metadata,
      String commitTime) {
    // Update the index back
    JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, metadata);
    // Trigger the insert and collect statuses
    statuses = statuses.persist(config.getWriteStatusStorageLevel());
    commitOnAutoCommit(commitTime, statuses);
    return statuses;
  }

  private JavaRDD<WriteStatus> upsertRecordsInternal(JavaRDD<HoodieRecord<T>> preppedRecords,
      String commitTime,
      HoodieTableMetadata metadata,
      final boolean isUpsert) {

    final HoodieTable table =
        HoodieTable.getHoodieTable(metadata.getTableType(), commitTime, config, metadata);

    // Cache the tagged records, so we don't end up computing both
    preppedRecords.persist(StorageLevel.MEMORY_AND_DISK_SER());

    WorkloadProfile profile = null;
    if (table.isWorkloadProfileNeeded()) {
      profile = new WorkloadProfile(preppedRecords);
      logger.info("Workload profile :" + profile);
    }

    // partition using the insert partitioner
    final Partitioner partitioner = getPartitioner(table, isUpsert, profile);
    JavaRDD<HoodieRecord<T>> partitionedRecords = partition(preppedRecords, partitioner);
    JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords.mapPartitionsWithIndex(
        new Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<List<WriteStatus>>>() {
          @Override
          public Iterator<List<WriteStatus>> call(Integer partition,
              Iterator<HoodieRecord<T>> recordItr) throws Exception {
            if (isUpsert) {
              return table.handleUpsertPartition(partition, recordItr, partitioner);
            } else {
              return table.handleInsertPartition(partition, recordItr, partitioner);
            }
          }
        }, true).flatMap(new FlatMapFunction<List<WriteStatus>, WriteStatus>() {
      @Override
      public Iterator<WriteStatus> call(List<WriteStatus> writeStatuses)
          throws Exception {
        return writeStatuses.iterator();
      }
    });

    return updateIndexAndCommitIfNeeded(writeStatusRDD, metadata, commitTime);
  }

  /**
   * Loads the given HoodieRecords, as inserts into the table. This is suitable for doing big bulk
   * loads into a Hoodie table for the very first time (e.g: converting an existing dataset to
   * Hoodie).
   * Loads the given HoodieRecords, as inserts into the table.
   * (This implementation uses sortBy and attempts to control the numbers of files with less memory)
   *
   * This implementation uses sortBy (which does range partitioning based on reservoir sampling) and
   * attempts to control the numbers of files with less memory compared to the {@link
   * HoodieWriteClient#insert(JavaRDD, String)}
   *
   * @param records HoodieRecords to insert
   * @param records HoodieRecords to insert
   * @param commitTime Commit Time handle
   * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
   *
   */
  public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
    final HoodieTableMetadata metadata =
        new HoodieTableMetadata(fs, config.getBasePath(), config.getTableName());
  public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
    writeContext = metrics.getCommitCtx();
    HoodieTableMetaClient metaClient =
        new HoodieTableMetaClient(fs, config.getBasePath(), true);

    try {
      // De-dupe/merge if needed
      JavaRDD<HoodieRecord<T>> dedupedRecords =
          combineOnCondition(config.shouldCombineBeforeInsert(), records,
              config.getInsertShuffleParallelism());
          combineOnCondition(config.shouldCombineBeforeInsert(), records,
              config.getInsertShuffleParallelism());

      // Now, sort the records and line them up nicely for loading.
      JavaRDD<HoodieRecord<T>> sortedRecords =
          dedupedRecords.sortBy(new Function<HoodieRecord<T>, String>() {
            @Override
            public String call(HoodieRecord<T> record) {
              // Let's use "partitionPath + key" as the sort key. Spark, will ensure
              // the records split evenly across RDD partitions, such that small partitions fit
              // into 1 RDD partition, while big ones spread evenly across multiple RDD partitions
              return String
                  .format("%s+%s", record.getPartitionPath(), record.getRecordKey());
            }
          }, true, config.getInsertShuffleParallelism());
          dedupedRecords.sortBy(new Function<HoodieRecord<T>, String>() {
            @Override
            public String call(HoodieRecord<T> record) {
              // Let's use "partitionPath + key" as the sort key. Spark, will ensure
              // the records split evenly across RDD partitions, such that small partitions fit
              // into 1 RDD partition, while big ones spread evenly across multiple RDD partitions
              return String
                  .format("%s+%s", record.getPartitionPath(), record.getRecordKey());
            }
          }, true, config.getInsertShuffleParallelism());
      JavaRDD<WriteStatus> writeStatusRDD = sortedRecords
          .mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, metadata),
              true).flatMap(new FlatMapFunction<List<WriteStatus>, WriteStatus>() {
            @Override
            public Iterator<WriteStatus> call(List<WriteStatus> writeStatuses)
                throws Exception {
              return writeStatuses.iterator();
            }
          });

      return updateIndexAndCommitIfNeeded(writeStatusRDD, metadata, commitTime);
          .mapPartitionsWithIndex(new InsertMapFunction<T>(commitTime, config, metaClient),
              true).flatMap(new FlatMapFunction<List<WriteStatus>, WriteStatus>() {
            @Override
            public Iterable<WriteStatus> call(List<WriteStatus> writeStatuses)
                throws Exception {
              return writeStatuses;
            }
          });
      // Update the index back
      JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, metaClient);
      // Trigger the insert and collect statuses
      statuses = statuses.persist(config.getWriteStatusStorageLevel());
      commitOnAutoCommit(commitTime, statuses);
      return statuses;
    } catch (Throwable e) {
      if (e instanceof HoodieInsertException) {
        throw e;
      }
      throw new HoodieInsertException("Failed to bulk insert for commit time " + commitTime, e);
      throw new HoodieInsertException("Failed to insert for commit time " + commitTime, e);
    }
  }

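In line with the last bullet of the commit message (succinct Java 8 syntax), the anonymous Function and FlatMapFunction classes in the insert() chain above could be collapsed into lambdas. A hypothetical sketch, assuming the Spark 1.x FlatMapFunction whose call returns an Iterable:

    // Same sort key as above: "partitionPath + key", so records cluster by partition.
    JavaRDD<HoodieRecord<T>> sortedRecords = dedupedRecords.sortBy(
        record -> String.format("%s+%s", record.getPartitionPath(), record.getRecordKey()),
        true, config.getInsertShuffleParallelism());

    JavaRDD<WriteStatus> writeStatusRDD = sortedRecords
        .mapPartitionsWithIndex(new InsertMapFunction<T>(commitTime, config, metaClient), true)
        // The list of WriteStatus is itself an Iterable, so the lambda returns it directly.
        .flatMap(writeStatuses -> writeStatuses);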
@@ -352,118 +300,110 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
   */
  public boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses) {
    logger.info("Comitting " + commitTime);
    Path commitFile =
        new Path(config.getBasePath() + "/.hoodie/" + FSUtils.makeCommitFileName(commitTime));
    try {
      HoodieTableMetaClient metaClient =
          new HoodieTableMetaClient(fs, config.getBasePath(), true);
      HoodieTimeline commitTimeline = metaClient.getActiveCommitTimeline();

      if (fs.exists(commitFile)) {
        throw new HoodieCommitException("Duplicate commit found. " + commitTime);
      }

      List<Tuple2<String, HoodieWriteStat>> stats =
          writeStatuses.mapToPair(new PairFunction<WriteStatus, String, HoodieWriteStat>() {
            @Override
            public Tuple2<String, HoodieWriteStat> call(WriteStatus writeStatus)
                throws Exception {
              return new Tuple2<>(writeStatus.getPartitionPath(), writeStatus.getStat());
            }
          }).collect();

      HoodieCommitMetadata metadata = new HoodieCommitMetadata();
      for (Tuple2<String, HoodieWriteStat> stat : stats) {
        metadata.addWriteStat(stat._1(), stat._2());
      }

      // open a new file and write the commit metadata in
      Path inflightCommitFile = new Path(config.getBasePath() + "/.hoodie/" + FSUtils
          .makeInflightCommitFileName(commitTime));
      FSDataOutputStream fsout = fs.create(inflightCommitFile, true);
      fsout.writeBytes(new String(metadata.toJsonString().getBytes(StandardCharsets.UTF_8),
          StandardCharsets.UTF_8));
      fsout.close();

      boolean success = fs.rename(inflightCommitFile, commitFile);
      if (success) {
        // We cannot have unbounded commit files. Archive commits if we have to archive
        archiveLog.archiveIfRequired();
        // Call clean to cleanup if there is anything to cleanup after the commit,
        clean();
        if (writeContext != null) {
          long durationInMs = metrics.getDurationInMs(writeContext.stop());
          metrics.updateCommitMetrics(FORMATTER.parse(commitTime).getTime(), durationInMs,
              metadata);
          writeContext = null;
      List<Tuple2<String, HoodieWriteStat>> stats =
          writeStatuses.mapToPair(new PairFunction<WriteStatus, String, HoodieWriteStat>() {
            @Override
            public Tuple2<String, HoodieWriteStat> call(WriteStatus writeStatus)
                throws Exception {
              return new Tuple2<>(writeStatus.getPartitionPath(), writeStatus.getStat());
            }
          }).collect();

      HoodieCommitMetadata metadata = new HoodieCommitMetadata();
      for (Tuple2<String, HoodieWriteStat> stat : stats) {
        metadata.addWriteStat(stat._1(), stat._2());
      }

      try {
        commitTimeline.saveInstantAsComplete(commitTime,
            Optional.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
        // Save was a success
        // We cannot have unbounded commit files. Archive commits if we have to archive
        archiveLog.archiveIfRequired();
        // Call clean to cleanup if there is anything to cleanup after the commit,
        clean();
        if (writeContext != null) {
          long durationInMs = metrics.getDurationInMs(writeContext.stop());
          metrics.updateCommitMetrics(FORMATTER.parse(commitTime).getTime(), durationInMs,
              metadata);
          writeContext = null;
        }
        logger.info("Status of the commit " + commitTime + ": " + success);
        return success;
        logger.info("Status of the commit " + commitTime);
      } catch (IOException e) {
        throw new HoodieCommitException(
            "Failed to commit " + config.getBasePath() + " at time " + commitTime, e);
            "Failed to commit " + config.getBasePath() + " at time " + commitTime, e);
      } catch (ParseException e) {
        throw new HoodieCommitException(
            "Commit time is not of valid format.Failed to commit " + config.getBasePath()
                + " at time " + commitTime, e);
            "Commit time is not of valid format.Failed to commit " + config.getBasePath()
                + " at time " + commitTime, e);
      }
      return true;
  }

  /**
   * Rollback the (inflight/committed) record changes with the given commit time. Three steps: (0)
   * Obtain the commit or rollback file (1) clean indexing data, (2) clean new generated parquet
   * files. (3) Finally delete .commit or .inflight file,
   * Rollback the (inflight/committed) record changes with the given commit time.
   * Three steps:
   * (1) Atomically unpublish this commit
   * (2) clean indexing data,
   * (3) clean new generated parquet files.
   * (4) Finally delete .commit or .inflight file,
   */
  public boolean rollback(final String commitTime) throws HoodieRollbackException {

    final Timer.Context context = metrics.getRollbackCtx();
    final HoodieTableMetadata metadata =
        new HoodieTableMetadata(fs, config.getBasePath(), config.getTableName());
    final String metaPath = config.getBasePath() + "/" + HoodieTableMetadata.METAFOLDER_NAME;
    HoodieTableMetaClient metaClient =
        new HoodieTableMetaClient(fs, config.getBasePath(), true);
    HoodieTimeline commitTimeline = metaClient.getActiveCommitTimeline();

    try {
      // 0. Obtain the commit/.inflight file, to work on
      FileStatus[] commitFiles =
          fs.globStatus(new Path(metaPath + "/" + commitTime + ".*"));
      if (commitFiles.length != 1) {
        throw new HoodieRollbackException("Expected exactly one .commit or .inflight file for commitTime: " + commitTime);
      if (commitTimeline.lastInstant().isPresent()
          && commitTimeline.findInstantsAfter(commitTime, Integer.MAX_VALUE).count() > 0) {
        throw new HoodieRollbackException("Found commits after time :" + commitTime +
            ", please rollback greater commits first");
      }

      // we first need to unpublish the commit by making it .inflight again. (this will ensure no future queries see this data)
      Path filePath = commitFiles[0].getPath();
      if (filePath.getName().endsWith(HoodieTableMetadata.COMMIT_FILE_SUFFIX)) {
        if (metadata.findCommitsAfter(commitTime, Integer.MAX_VALUE).size() > 0) {
          throw new HoodieRollbackException("Found commits after time :" + commitTime +
      List<String> inflights =
          commitTimeline.getInflightInstants().collect(Collectors.toList());
      if (!inflights.isEmpty() && inflights.indexOf(commitTime) != inflights.size() - 1) {
        throw new HoodieRollbackException(
            "Found in-flight commits after time :" + commitTime +
                ", please rollback greater commits first");
      }
        Path newInflightPath = new Path(metaPath + "/" + commitTime + HoodieTableMetadata.INFLIGHT_FILE_SUFFIX);
        if (!fs.rename(filePath, newInflightPath)) {
          throw new HoodieRollbackException("Unable to rename .commit file to .inflight for commitTime:" + commitTime);
        }
        filePath = newInflightPath;
      }

      // 1. Revert the index changes
      logger.info("Clean out index changes at time: " + commitTime);
      if (!index.rollbackCommit(commitTime)) {
        throw new HoodieRollbackException("Clean out index changes failed, for time :" + commitTime);
      }
      if (inflights.contains(commitTime) || (commitTimeline.lastInstant().isPresent()
          && commitTimeline.lastInstant().get().equals(commitTime))) {
      // 1. Atomically unpublish this commit
      if(commitTimeline.containsInstant(commitTime)) {
        commitTimeline.revertInstantToInflight(commitTime);
      }
      // 2. Revert the index changes
      logger.info("Clean out index changes at time: " + commitTime);
      if (!index.rollbackCommit(commitTime)) {
        throw new HoodieRollbackException(
            "Clean out index changes failed, for time :" + commitTime);
      }

      // 2. Delete the new generated parquet files
      logger.info("Clean out all parquet files generated at time: " + commitTime);
      final Accumulator<Integer> numFilesDeletedAccu = jsc.accumulator(0);
      jsc.parallelize(FSUtils.getAllPartitionPaths(fs, metadata.getBasePath()))
      // 3. Delete the new generated parquet files
      logger.info("Clean out all parquet files generated at time: " + commitTime);
      final Accumulator<Integer> numFilesDeletedAccu = jsc.accumulator(0);
      jsc.parallelize(FSUtils.getAllPartitionPaths(fs, metaClient.getBasePath()))
          .foreach(new VoidFunction<String>() {
            @Override
            public void call(String partitionPath) throws Exception {
              // Scan all partitions files with this commit time
              FileSystem fs = FSUtils.getFs();
              FileStatus[] toBeDeleted =
                  fs.listStatus(new Path(config.getBasePath(), partitionPath),
                      new PathFilter() {
                        @Override
                        public boolean accept(Path path) {
                          return commitTime
                              .equals(FSUtils.getCommitTime(path.getName()));
                        }
                      });
                  fs.listStatus(new Path(config.getBasePath(), partitionPath),
                      new PathFilter() {
                        @Override
                        public boolean accept(Path path) {
                          return commitTime
                              .equals(FSUtils.getCommitTime(path.getName()));
                        }
                      });
              for (FileStatus file : toBeDeleted) {
                boolean success = fs.delete(file.getPath(), false);
                logger.info("Delete file " + file.getPath() + "\t" + success);
@@ -473,24 +413,20 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
              }
            }
          });
      // 4. Remove commit
      logger.info("Clean out metadata files at time: " + commitTime);
      commitTimeline.removeInflightFromTimeline(commitTime);

      // 3. Clean out metadata (.commit or .tmp)
      logger.info("Clean out metadata files at time: " + commitTime);
      if (!fs.delete(filePath, false)) {
        logger.error("Deleting file " + filePath + " failed.");
        throw new HoodieRollbackException("Delete file " + filePath + " failed.");
      if (context != null) {
        long durationInMs = metrics.getDurationInMs(context.stop());
        int numFilesDeleted = numFilesDeletedAccu.value();
        metrics.updateRollbackMetrics(durationInMs, numFilesDeleted);
      }
      }

      if (context != null) {
        long durationInMs = metrics.getDurationInMs(context.stop());
        int numFilesDeleted = numFilesDeletedAccu.value();
        metrics.updateRollbackMetrics(durationInMs, numFilesDeleted);
      }

      return true;
    } catch (IOException e) {
      throw new HoodieRollbackException("Failed to rollback " +
          config.getBasePath() + " at commit time" + commitTime, e);
          config.getBasePath() + " at commit time" + commitTime, e);
    }
  }

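Condensing the rollback hunk above: the new code path is driven entirely by the timeline. The sequence below is composed by hand from the method names in the hunk (containsInstant, revertInstantToInflight, removeInflightFromTimeline), as a sketch rather than the commit's exact control flow:

    // 1. Atomically unpublish: a completed commit becomes inflight again,
    //    so no future query sees its data.
    if (commitTimeline.containsInstant(commitTime)) {
      commitTimeline.revertInstantToInflight(commitTime);
    }
    // 2/3. The index entries and parquet files written at commitTime are cleaned here.
    // 4. Finally drop the inflight marker itself from the timeline.
    commitTimeline.removeInflightFromTimeline(commitTime);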
@@ -504,35 +440,38 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
  /**
   * Clean up any stale/old files/data lying around (either on file storage or index storage)
   */
  private void clean() throws HoodieIOException {
  private void clean() throws HoodieIOException {
    try {
      logger.info("Cleaner started");
      final Timer.Context context = metrics.getCleanCtx();
      final HoodieTableMetadata metadata = new HoodieTableMetadata(fs, config.getBasePath(), config.getTableName());
      List<String> partitionsToClean = FSUtils.getAllPartitionPaths(fs, metadata.getBasePath());
      HoodieTableMetaClient metaClient =
          new HoodieTableMetaClient(fs, config.getBasePath(), true);
      HoodieTimeline commitTimeline = metaClient.getActiveCommitTimeline();

      List<String> partitionsToClean = FSUtils.getAllPartitionPaths(fs, metaClient.getBasePath());
      // shuffle to distribute cleaning work across partitions evenly
      Collections.shuffle(partitionsToClean);
      logger.info("Partitions to clean up : " + partitionsToClean + ", with policy " + config.getCleanerPolicy());
      if (partitionsToClean.isEmpty()) {
      if(partitionsToClean.isEmpty()) {
        logger.info("Nothing to clean here mom. It is already clean");
        return;
      }

      int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
      int numFilesDeleted = jsc.parallelize(partitionsToClean, cleanerParallelism)
          .map(new Function<String, Integer>() {
            @Override
            public Integer call(String partitionPathToClean) throws Exception {
              FileSystem fs = FSUtils.getFs();
              HoodieCleaner cleaner = new HoodieCleaner(metadata, config, fs);
              return cleaner.clean(partitionPathToClean);
            }
          }).reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
              return v1 + v2;
            }
          });
          .map(new Function<String, Integer>() {
            @Override
            public Integer call(String partitionPathToClean) throws Exception {
              FileSystem fs = FSUtils.getFs();
              HoodieCleaner cleaner = new HoodieCleaner(metaClient, config, fs);
              return cleaner.clean(partitionPathToClean);
            }
          }).reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
              return v1 + v2;
            }
          });
      logger.info("Cleaned " + numFilesDeleted + " files");
      // Emit metrics (duration, numFilesDeleted) if needed
      if (context != null) {
@@ -556,21 +495,10 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali

  public void startCommitWithTime(String commitTime) {
    logger.info("Generate a new commit time " + commitTime);
    // Create the in-flight commit file
    Path inflightCommitFilePath = new Path(
        config.getBasePath() + "/.hoodie/" + FSUtils.makeInflightCommitFileName(commitTime));
    try {
      if (fs.createNewFile(inflightCommitFilePath)) {
        logger.info("Create an inflight commit file " + inflightCommitFilePath);
        return;
      }
      throw new HoodieCommitException(
          "Failed to create the inflight commit file " + inflightCommitFilePath);
    } catch (IOException e) {
      // handled below
      throw new HoodieCommitException(
          "Failed to create the inflight commit file " + inflightCommitFilePath, e);
    }
    HoodieTableMetaClient metaClient =
        new HoodieTableMetaClient(fs, config.getBasePath(), true);
    HoodieTimeline commitTimeline = metaClient.getActiveCommitTimeline();
    commitTimeline.saveInstantAsInflight(commitTime);
  }

  public static SparkConf registerClasses(SparkConf conf) {
@@ -606,10 +534,16 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali

  /**
   * Cleanup all inflight commits
   * @throws IOException
   */
  private void rollbackInflightCommits() {
    final HoodieTableMetadata metadata = new HoodieTableMetadata(fs, config.getBasePath(), config.getTableName());
    for (String commit : metadata.getAllInflightCommits()) {
    HoodieTableMetaClient metaClient =
        new HoodieTableMetaClient(fs, config.getBasePath(), true);
    HoodieTimeline commitTimeline = metaClient.getActiveCommitTimeline();

    List<String> commits = commitTimeline.getInflightInstants().collect(Collectors.toList());
    Collections.reverse(commits);
    for (String commit : commits) {
      rollback(commit);
    }
  }