1
0

Fixes needed to run merge-on-read testing on production scale data

This commit is contained in:
Prasanna Rajaperumal
2017-03-31 01:02:02 -07:00
committed by prazanna
parent 57ab7a2405
commit aee136777b
26 changed files with 659 additions and 199 deletions

View File

@@ -26,6 +26,7 @@ import com.uber.hoodie.avro.model.HoodieSavepointMetadata;
import com.uber.hoodie.common.HoodieCleanStat;
import com.uber.hoodie.common.HoodieRollbackStat;
import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieCompactionMetadata;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
@@ -39,6 +40,7 @@ import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.AvroUtils;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieCommitException;
import com.uber.hoodie.exception.HoodieIOException;
@@ -57,7 +59,6 @@ import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@@ -365,6 +366,18 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
new HoodieInstant(true, actionType, commitTime),
Optional.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
// Save was a success
// Do an inline compaction if enabled
if (config.isInlineCompaction()) {
Optional<HoodieCompactionMetadata> compactionMetadata = table.compact(jsc);
if (compactionMetadata.isPresent()) {
logger.info("Compacted successfully on commit " + commitTime);
metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
} else {
logger.info("Compaction did not run for commit " + commitTime);
metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
}
}
// We cannot have unbounded commit files. Archive commits if we have to archive
archiveLog.archiveIfRequired();
if(config.isAutoClean()) {
@@ -785,11 +798,12 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
public void startCommitWithTime(String commitTime) {
logger.info("Generate a new commit time " + commitTime);
HoodieTableMetaClient metaClient =
new HoodieTableMetaClient(fs, config.getBasePath(), true);
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
HoodieTable<T> table = HoodieTable
.getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
String commitActionType = table.getCommitActionType();
activeTimeline.createInflight(
new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime));
new HoodieInstant(true, commitActionType, commitTime));
}
public static SparkConf registerClasses(SparkConf conf) {
@@ -829,11 +843,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
* @throws IOException
*/
private void rollbackInflightCommits() {
HoodieTableMetaClient metaClient =
new HoodieTableMetaClient(fs, config.getBasePath(), true);
HoodieTimeline inflightTimeline =
metaClient.getActiveTimeline().getCommitTimeline().filterInflights();
HoodieTable<T> table = HoodieTable
.getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);
HoodieTimeline inflightTimeline = table.getCommitTimeline().filterInflights();
List<String> commits = inflightTimeline.getInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
Collections.reverse(commits);

View File

@@ -37,6 +37,14 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
public static final String AUTO_CLEAN_PROP = "hoodie.clean.automatic";
private static final String DEFAULT_AUTO_CLEAN = "true";
// Turn on inline compaction - after a few delta commits an inline compaction will be run
public static final String INLINE_COMPACT_PROP = "hoodie.compact.inline";
private static final String DEFAULT_INLINE_COMPACT = "true";
// Run a compaction every N delta commits
public static final String INLINE_COMPACT_NUM_DELTA_COMMITS_PROP = "hoodie.compact.inline.max.delta.commits";
private static final String DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS = "4";
public static final String CLEANER_FILE_VERSIONS_RETAINED_PROP =
"hoodie.cleaner.fileversions.retained";
private static final String DEFAULT_CLEANER_FILE_VERSIONS_RETAINED = "3";
@@ -102,6 +110,16 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
return this;
}
/**
 * Records whether inline compaction is enabled.
 *
 * @param inlineCompaction value stored (as a string) under INLINE_COMPACT_PROP
 * @return this builder, for chaining
 */
public Builder withInlineCompaction(Boolean inlineCompaction) {
  String inlineCompactionFlag = String.valueOf(inlineCompaction);
  props.setProperty(INLINE_COMPACT_PROP, inlineCompactionFlag);
  return this;
}
/**
 * Sets how many delta commits should accumulate before an inline compaction is run.
 *
 * @param deltaCommits number of delta commits between inline compactions
 * @return this builder, for chaining
 */
public Builder inlineCompactionEvery(int deltaCommits) {
  // The count belongs under its own key. Writing it to INLINE_COMPACT_PROP would overwrite
  // the boolean on/off flag with a number and leave the threshold at its default.
  props.setProperty(INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, String.valueOf(deltaCommits));
  return this;
}
public Builder withCleanerPolicy(HoodieCleaningPolicy policy) {
props.setProperty(CLEANER_POLICY_PROP, policy.name());
return this;
@@ -153,6 +171,10 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
HoodieCompactionConfig config = new HoodieCompactionConfig(props);
setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP),
AUTO_CLEAN_PROP, DEFAULT_AUTO_CLEAN);
setDefaultOnCondition(props, !props.containsKey(INLINE_COMPACT_PROP),
INLINE_COMPACT_PROP, DEFAULT_INLINE_COMPACT);
setDefaultOnCondition(props, !props.containsKey(INLINE_COMPACT_NUM_DELTA_COMMITS_PROP),
INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS);
setDefaultOnCondition(props, !props.containsKey(CLEANER_POLICY_PROP),
CLEANER_POLICY_PROP, DEFAULT_CLEANER_POLICY);
setDefaultOnCondition(props, !props.containsKey(CLEANER_FILE_VERSIONS_RETAINED_PROP),

View File

@@ -19,6 +19,7 @@ package com.uber.hoodie.config;
import com.google.common.base.Preconditions;
import com.uber.hoodie.common.model.HoodieCleaningPolicy;
import com.uber.hoodie.config.HoodieCompactionConfig.Builder;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.io.HoodieCleaner;
import com.uber.hoodie.metrics.MetricsReporterType;
@@ -148,6 +149,15 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.AUTO_CLEAN_PROP));
}
/**
 * Tells whether inline compaction is switched on.
 *
 * @return the INLINE_COMPACT_PROP property parsed as a boolean
 */
public boolean isInlineCompaction() {
  String inlineCompactionFlag = props.getProperty(HoodieCompactionConfig.INLINE_COMPACT_PROP);
  return Boolean.parseBoolean(inlineCompactionFlag);
}
/**
 * Reads the configured number of delta commits between inline compactions.
 *
 * @return the INLINE_COMPACT_NUM_DELTA_COMMITS_PROP property parsed as an int
 */
public int getInlineCompactDeltaCommitMax() {
  String maxDeltaCommits =
      props.getProperty(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP);
  return Integer.parseInt(maxDeltaCommits);
}
/**
* index properties
**/

View File

@@ -0,0 +1,28 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.exception;
/**
 * Exception raised for failures during compaction.
 */
public class HoodieCompactionException extends HoodieException {

  /**
   * @param msg description of the failure
   * @param e underlying cause
   */
  public HoodieCompactionException(String msg, Throwable e) {
    super(msg, e);
  }

  /**
   * @param msg description of the failure
   */
  public HoodieCompactionException(String msg) {
    super(msg);
  }
}

View File

@@ -101,7 +101,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
throw new HoodieUpsertException(
"Failed to initialize HoodieUpdateHandle for FileId: " + fileId
+ " on commit " + commitTime + " on HDFS path " + hoodieTable
.getMetaClient().getBasePath());
.getMetaClient().getBasePath() + partitionPath, e);
}
writeStatus.getStat().setFullPath(currentLogFile.getPath().toString());
}

View File

@@ -0,0 +1,99 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.io;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.util.AvroUtils;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
/**
 * Loads HoodieRecords from a list of avro log files, de-duplicates them by record key and
 * maintains the merged state in memory. Records sharing a key are folded together with
 * preCombine. This is useful for compaction and for the record reader.
 */
public class HoodieAvroReader implements Iterable<HoodieRecord<HoodieAvroPayload>> {

  // One merged record per distinct record key
  private final Collection<HoodieRecord<HoodieAvroPayload>> records;
  // Number of log files read by the constructor
  private AtomicLong totalLogFiles = new AtomicLong(0);
  // Number of records read across all log files, before de-duplication
  private AtomicLong totalLogRecords = new AtomicLong(0);
  // Number of distinct record keys left after merging
  private long totalRecordsToUpdate;

  /**
   * Reads every log file eagerly and merges the records by key.
   *
   * @param fs file system passed to AvroUtils.loadFromFile
   * @param logFilePaths paths of the log files to read, in merge order
   * @param readerSchema schema passed to AvroUtils.loadFromFile
   */
  public HoodieAvroReader(FileSystem fs, List<String> logFilePaths, Schema readerSchema) {
    Map<String, HoodieRecord<HoodieAvroPayload>> mergedByKey = Maps.newHashMap();
    for (String logFilePath : logFilePaths) {
      totalLogFiles.incrementAndGet();
      List<HoodieRecord<HoodieAvroPayload>> fileRecords =
          AvroUtils.loadFromFile(fs, logFilePath, readerSchema);
      totalLogRecords.addAndGet(fileRecords.size());
      for (HoodieRecord<HoodieAvroPayload> incoming : fileRecords) {
        String recordKey = incoming.getRecordKey();
        HoodieRecord<HoodieAvroPayload> existing = mergedByKey.get(recordKey);
        if (existing == null) {
          // First time this key is seen: keep the record untouched
          mergedByKey.put(recordKey, incoming);
          continue;
        }
        // Key already present: combine the stored payload with the incoming one
        HoodieAvroPayload combinedValue = existing.getData().preCombine(incoming.getData());
        HoodieKey mergedKey = new HoodieKey(recordKey, incoming.getPartitionPath());
        mergedByKey.put(recordKey, new HoodieRecord<>(mergedKey, combinedValue));
      }
    }
    this.records = mergedByKey.values();
    this.totalRecordsToUpdate = mergedByKey.size();
  }

  /** Iterates over the merged records. */
  @Override
  public Iterator<HoodieRecord<HoodieAvroPayload>> iterator() {
    return records.iterator();
  }

  /** Applies the consumer to each merged record. */
  @Override
  public void forEach(Consumer<? super HoodieRecord<HoodieAvroPayload>> consumer) {
    records.forEach(consumer);
  }

  /** Spliterator over the merged records. */
  @Override
  public Spliterator<HoodieRecord<HoodieAvroPayload>> spliterator() {
    return records.spliterator();
  }

  /** @return number of log files that were read */
  public long getTotalLogFiles() {
    return totalLogFiles.get();
  }

  /** @return number of records read from the log files, before merging */
  public long getTotalLogRecords() {
    return totalLogRecords.get();
  }

  /** @return number of distinct record keys after merging */
  public long getTotalRecordsToUpdate() {
    return totalRecordsToUpdate;
  }
}

View File

@@ -22,8 +22,10 @@ import com.uber.hoodie.common.HoodieCleanStat;
import com.uber.hoodie.common.model.HoodieCleaningPolicy;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.log.HoodieLogFile;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
@@ -107,8 +109,18 @@ public class HoodieCleaner<T extends HoodieRecordPayload<T>> {
}
// Delete the remaining files
while (commitItr.hasNext()) {
HoodieDataFile nextRecord = commitItr.next();
deletePaths.add(String.format("%s/%s/%s", config.getBasePath(), partitionPath,
commitItr.next().getFileName()));
nextRecord.getFileName()));
if (hoodieTable.getMetaClient().getTableType()
== HoodieTableType.MERGE_ON_READ) {
// If merge on read, then clean the log files for the commits as well
deletePaths.add(String
.format("%s/%s/%s", config.getBasePath(), partitionPath,
FSUtils.maskWithoutLogVersion(nextRecord.getCommitTime(),
nextRecord.getFileId(),
HoodieLogFile.DELTA_EXTENSION)));
}
}
}
return deletePaths;
@@ -182,6 +194,14 @@ public class HoodieCleaner<T extends HoodieRecordPayload<T>> {
deletePaths.add(String
.format("%s/%s/%s", config.getBasePath(), partitionPath, FSUtils
.maskWithoutTaskPartitionId(fileCommitTime, afile.getFileId())));
if (hoodieTable.getMetaClient().getTableType()
== HoodieTableType.MERGE_ON_READ) {
// If merge on read, then clean the log files for the commits as well
deletePaths.add(String
.format("%s/%s/%s", config.getBasePath(), partitionPath,
FSUtils.maskWithoutLogVersion(fileCommitTime, afile.getFileId(),
HoodieLogFile.DELTA_EXTENSION)));
}
}
}
}

View File

@@ -1,26 +0,0 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.io.compact;
import com.uber.hoodie.common.model.HoodieCommitMetadata;
/**
 * Placeholder for compaction-specific metadata. It declares no members of its own and
 * inherits everything from HoodieCommitMetadata.
 */
public class HoodieCompactionMetadata extends HoodieCommitMetadata {
}

View File

@@ -16,6 +16,7 @@
package com.uber.hoodie.io.compact;
import com.uber.hoodie.common.model.HoodieCompactionMetadata;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;

View File

@@ -16,9 +16,14 @@
package com.uber.hoodie.io.compact;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.CompactionWriteStat;
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieCompactionMetadata;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
@@ -28,18 +33,19 @@ import com.uber.hoodie.common.util.AvroUtils;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieCommitException;
import com.uber.hoodie.exception.HoodieCompactionException;
import com.uber.hoodie.io.HoodieAvroReader;
import com.uber.hoodie.table.HoodieCopyOnWriteTable;
import com.uber.hoodie.table.HoodieMergeOnReadTable;
import com.uber.hoodie.table.HoodieTable;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.avro.Schema;
import org.apache.commons.collections.IteratorUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
@@ -48,7 +54,6 @@ import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.*;
@@ -60,118 +65,129 @@ import static java.util.stream.Collectors.*;
* @see HoodieCompactor
*/
public class HoodieRealtimeTableCompactor implements HoodieCompactor {
private static Logger log = LogManager.getLogger(HoodieRealtimeTableCompactor.class);
@Override
public HoodieCompactionMetadata compact(JavaSparkContext jsc, HoodieWriteConfig config,
HoodieTable hoodieTable,
CompactionFilter compactionFilter) throws Exception {
// TODO - rollback any compactions in flight
private static Logger log = LogManager.getLogger(HoodieRealtimeTableCompactor.class);
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
String compactionCommit = startCompactionCommit(hoodieTable);
log.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommit);
List<String> partitionPaths =
FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath());
@Override
public HoodieCompactionMetadata compact(JavaSparkContext jsc, HoodieWriteConfig config,
HoodieTable hoodieTable, CompactionFilter compactionFilter) throws IOException {
Preconditions.checkArgument(
hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ,
"HoodieRealtimeTableCompactor can only compact table of type "
+ HoodieTableType.MERGE_ON_READ + " and not " + hoodieTable.getMetaClient()
.getTableType().name());
log.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
List<CompactionOperation> operations =
jsc.parallelize(partitionPaths, partitionPaths.size())
.flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> hoodieTable.getFileSystemView()
.groupLatestDataFileWithLogFiles(partitionPath).entrySet()
.stream()
.map(s -> new CompactionOperation(s.getKey(), partitionPath, s.getValue()))
.collect(toList()).iterator()).collect();
log.info("Total of " + operations.size() + " compactions are retrieved");
// TODO - rollback any compactions in flight
// Filter the compactions with the passed in filter. This lets us choose most effective compactions only
operations = compactionFilter.filter(operations);
if(operations.isEmpty()) {
log.warn("After filtering, Nothing to compact for " + metaClient.getBasePath());
return null;
}
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
String compactionCommit = startCompactionCommit(hoodieTable);
log.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommit);
List<String> partitionPaths =
FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath());
log.info("After filtering, Compacting " + operations + " files");
List<Tuple2<String, HoodieWriteStat>> updateStatusMap =
jsc.parallelize(operations, operations.size()).map(
(Function<CompactionOperation, Iterator<List<WriteStatus>>>) compactionOperation -> executeCompaction(
metaClient, config, compactionOperation, compactionCommit)).flatMap(
(FlatMapFunction<Iterator<List<WriteStatus>>, WriteStatus>) listIterator -> {
List<List<WriteStatus>> collected = IteratorUtils.toList(listIterator);
return collected.stream().flatMap(List::stream).collect(toList()).iterator();
}).mapToPair(new PairFunction<WriteStatus, String, HoodieWriteStat>() {
@Override
public Tuple2<String, HoodieWriteStat> call(WriteStatus writeStatus)
throws Exception {
return new Tuple2<>(writeStatus.getPartitionPath(), writeStatus.getStat());
}
log.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
List<CompactionOperation> operations =
jsc.parallelize(partitionPaths, partitionPaths.size())
.flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> hoodieTable
.getFileSystemView()
.groupLatestDataFileWithLogFiles(partitionPath).entrySet()
.stream()
.map(s -> new CompactionOperation(s.getKey(), partitionPath, s.getValue()))
.collect(toList()).iterator()).collect();
log.info("Total of " + operations.size() + " compactions are retrieved");
// Filter the compactions with the passed in filter. This lets us choose most effective compactions only
operations = compactionFilter.filter(operations);
if (operations.isEmpty()) {
log.warn("After filtering, Nothing to compact for " + metaClient.getBasePath());
return null;
}
log.info("After filtering, Compacting " + operations + " files");
List<CompactionWriteStat> updateStatusMap =
jsc.parallelize(operations, operations.size())
.map(s -> executeCompaction(metaClient, config, s, compactionCommit))
.flatMap(new FlatMapFunction<List<CompactionWriteStat>, CompactionWriteStat>() {
@Override
public Iterator<CompactionWriteStat> call(
List<CompactionWriteStat> compactionWriteStats)
throws Exception {
return compactionWriteStats.iterator();
}
}).collect();
HoodieCompactionMetadata metadata = new HoodieCompactionMetadata();
for (Tuple2<String, HoodieWriteStat> stat : updateStatusMap) {
metadata.addWriteStat(stat._1(), stat._2());
}
log.info("Compaction finished with result " + metadata);
HoodieCompactionMetadata metadata = new HoodieCompactionMetadata();
for (CompactionWriteStat stat : updateStatusMap) {
metadata.addWriteStat(stat.getPartitionPath(), stat);
}
log.info("Compaction finished with result " + metadata);
//noinspection ConstantConditions
if (isCompactionSucceeded(metadata)) {
log.info("Compaction succeeded " + compactionCommit);
commitCompaction(compactionCommit, metaClient, metadata);
} else {
log.info("Compaction failed " + compactionCommit);
}
return metadata;
//noinspection ConstantConditions
if (isCompactionSucceeded(metadata)) {
log.info("Compaction succeeded " + compactionCommit);
commitCompaction(compactionCommit, metaClient, metadata);
} else {
log.info("Compaction failed " + compactionCommit);
}
return metadata;
}
private boolean isCompactionSucceeded(HoodieCompactionMetadata result) {
//TODO figure out a success factor for a compaction
return true;
}
private List<CompactionWriteStat> executeCompaction(HoodieTableMetaClient metaClient,
HoodieWriteConfig config, CompactionOperation operation, String commitTime)
throws IOException {
FileSystem fs = FSUtils.getFs();
Schema readerSchema =
HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
log.info("Compacting base " + operation.getDataFilePath() + " with delta files " + operation
.getDeltaFilePaths() + " for commit " + commitTime);
// TODO - FIX THIS
// Reads the entire avro file. Always only specific blocks should be read from the avro file (failure recover).
// Load all the delta commits since the last compaction commit and get all the blocks to be loaded and load it using CompositeAvroLogReader
// Since a DeltaCommit is not defined yet, reading all the records. revisit this soon.
HoodieAvroReader avroReader = new HoodieAvroReader(fs, operation.getDeltaFilePaths(),
readerSchema);
if (!avroReader.iterator().hasNext()) {
return Lists.newArrayList();
}
private boolean isCompactionSucceeded(HoodieCompactionMetadata result) {
//TODO figure out a success factor for a compaction
return true;
}
private Iterator<List<WriteStatus>> executeCompaction(HoodieTableMetaClient metaClient,
HoodieWriteConfig config, CompactionOperation operation, String commitTime)
throws IOException {
FileSystem fs = FSUtils.getFs();
Schema schema =
HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
log.info("Compacting base " + operation.getDataFilePath() + " with delta files " + operation
.getDeltaFilePaths() + " for commit " + commitTime);
// TODO - FIX THIS
// 1. Reads the entire avro file. Always only specific blocks should be read from the avro file (failure recover).
// Load all the delta commits since the last compaction commit and get all the blocks to be loaded and load it using CompositeAvroLogReader
// Since a DeltaCommit is not defined yet, reading all the records. revisit this soon.
// 2. naively loads all the delta records in memory to merge it,
// since we only need a iterator, we could implement a lazy iterator to load from one delta file at a time
List<HoodieRecord<HoodieAvroPayload>> readDeltaFilesInMemory =
AvroUtils.loadFromFiles(fs, operation.getDeltaFilePaths(), schema);
if(readDeltaFilesInMemory.isEmpty()) {
return IteratorUtils.emptyIterator();
}
// Compacting is very similar to applying updates to existing file
HoodieCopyOnWriteTable<HoodieAvroPayload> table =
new HoodieCopyOnWriteTable<>(config, metaClient);
return table
.handleUpdate(commitTime, operation.getFileId(), readDeltaFilesInMemory.iterator());
}
public boolean commitCompaction(String commitTime, HoodieTableMetaClient metaClient,
HoodieCompactionMetadata metadata) {
log.info("Comitting " + commitTime);
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
try {
activeTimeline.saveAsComplete(
new HoodieInstant(true, HoodieTimeline.COMPACTION_ACTION, commitTime),
Optional.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
} catch (IOException e) {
throw new HoodieCommitException(
"Failed to commit " + metaClient.getBasePath() + " at time " + commitTime, e);
}
return true;
// Compacting is very similar to applying updates to existing file
HoodieCopyOnWriteTable<HoodieAvroPayload> table =
new HoodieCopyOnWriteTable<>(config, metaClient);
Iterator<List<WriteStatus>> result = table
.handleUpdate(commitTime, operation.getFileId(), avroReader.iterator());
Iterable<List<WriteStatus>> resultIterable = () -> result;
return StreamSupport.stream(resultIterable.spliterator(), false)
.flatMap(Collection::stream)
.map(WriteStatus::getStat)
.map(s -> CompactionWriteStat.newBuilder().withHoodieWriteStat(s)
.setTotalRecordsToUpdate(avroReader.getTotalRecordsToUpdate())
.setTotalLogFiles(avroReader.getTotalLogFiles())
.setTotalLogRecords(avroReader.getTotalLogRecords())
.onPartition(operation.getPartitionPath()).build())
.collect(toList());
}
/**
 * Marks the given compaction as complete on the active timeline, storing the metadata
 * serialized as UTF-8 JSON alongside it.
 *
 * @param commitTime timestamp of the compaction instant being completed
 * @param metaClient meta client supplying the active timeline and base path
 * @param metadata compaction metadata to persist with the instant
 * @return always true; failures surface as HoodieCompactionException
 */
public boolean commitCompaction(String commitTime, HoodieTableMetaClient metaClient,
    HoodieCompactionMetadata metadata) {
  log.info("Committing Compaction " + commitTime);
  HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
  try {
    HoodieInstant compactionInstant =
        new HoodieInstant(true, HoodieTimeline.COMPACTION_ACTION, commitTime);
    byte[] serializedMetadata = metadata.toJsonString().getBytes(StandardCharsets.UTF_8);
    timeline.saveAsComplete(compactionInstant, Optional.of(serializedMetadata));
  } catch (IOException e) {
    throw new HoodieCompactionException(
        "Failed to commit " + metaClient.getBasePath() + " at time " + commitTime, e);
  }
  return true;
}
}

View File

@@ -16,6 +16,7 @@
package com.uber.hoodie.table;
import com.uber.hoodie.common.model.HoodieCompactionMetadata;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
@@ -32,6 +33,7 @@ import com.uber.hoodie.exception.HoodieUpsertException;
import com.uber.hoodie.func.LazyInsertIterable;
import com.uber.hoodie.io.HoodieUpdateHandle;
import java.util.Optional;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
@@ -55,6 +57,7 @@ import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Option;
import scala.Tuple2;
@@ -470,4 +473,10 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
Partitioner partitioner) {
return handleUpsertPartition(commitTime, partition, recordItr, partitioner);
}
/**
 * Compaction is a no-op for this table implementation: it only logs and reports that
 * nothing was compacted.
 *
 * @param jsc unused
 * @return always an empty Optional
 */
@Override
public Optional<HoodieCompactionMetadata> compact(JavaSparkContext jsc) {
  Optional<HoodieCompactionMetadata> noCompaction = Optional.empty();
  logger.info("Nothing to compact in COW storage format");
  return noCompaction;
}
}

View File

@@ -17,11 +17,17 @@
package com.uber.hoodie.table;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieCompactionMetadata;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieCompactionException;
import com.uber.hoodie.io.HoodieAppendHandle;
import com.uber.hoodie.io.compact.CompactionFilter;
import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor;
import java.util.Optional;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -29,6 +35,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.JavaSparkContext;
/**
* Implementation of a more real-time read-optimized Hoodie Table where
@@ -59,4 +66,33 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus()))
.iterator();
}
/**
 * Runs a compaction when enough delta commits have accumulated since the last completed
 * compaction; otherwise does nothing.
 *
 * @param jsc Spark context handed to the compactor
 * @return metadata of the compaction that ran, or an empty Optional when the delta-commit
 *         threshold has not been reached or the compactor had nothing to compact
 * @throws HoodieCompactionException if the compactor fails with an IOException
 */
@Override
public Optional<HoodieCompactionMetadata> compact(JavaSparkContext jsc) {
  logger.info("Checking if compaction needs to be run on " + config.getBasePath());
  Optional<HoodieInstant> lastCompaction = getActiveTimeline().getCompactionTimeline()
      .filterCompletedInstants().lastInstant();
  // With no completed compaction yet, start from "0" so every delta commit is counted
  String deltaCommitsSinceTs = lastCompaction.map(HoodieInstant::getTimestamp).orElse("0");

  int deltaCommitsSinceLastCompaction = getActiveTimeline().getDeltaCommitTimeline()
      .findInstantsAfter(deltaCommitsSinceTs, Integer.MAX_VALUE).countInstants();
  int maxDeltaCommits = config.getInlineCompactDeltaCommitMax();
  if (maxDeltaCommits > deltaCommitsSinceLastCompaction) {
    logger.info("Not running compaction as only " + deltaCommitsSinceLastCompaction
        + " delta commits was found since last compaction " + deltaCommitsSinceTs
        + ". Waiting for " + maxDeltaCommits);
    return Optional.empty();
  }

  logger.info("Compacting merge on read table " + config.getBasePath());
  HoodieRealtimeTableCompactor compactor = new HoodieRealtimeTableCompactor();
  try {
    // The compactor returns null when nothing is left to compact after filtering.
    // Optional.of would throw a NullPointerException on that, so map null to empty.
    return Optional.ofNullable(
        compactor.compact(jsc, config, this, CompactionFilter.allowAll()));
  } catch (IOException e) {
    throw new HoodieCompactionException("Could not compact " + config.getBasePath(), e);
  }
}
}

View File

@@ -18,7 +18,7 @@ package com.uber.hoodie.table;
import com.google.common.collect.Sets;
import com.uber.hoodie.avro.model.HoodieSavepointMetadata;
import com.uber.hoodie.avro.model.HoodieSavepointPartitionMetadata;
import com.uber.hoodie.common.model.HoodieCompactionMetadata;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.TableFileSystemView;
@@ -34,6 +34,7 @@ import com.uber.hoodie.exception.HoodieCommitException;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieSavepointException;
import java.util.Optional;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.Partitioner;
@@ -41,9 +42,9 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.spark.api.java.JavaSparkContext;
/**
* Abstract implementation of a HoodieTable
@@ -195,8 +196,9 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
return getActiveTimeline().getCommitTimeline();
case MERGE_ON_READ:
// We need to include the parquet files written out in delta commits
// Include commit action to be able to start doing a MOR over a COW dataset - no migration required
return getActiveTimeline().getTimelineOfActions(
Sets.newHashSet(HoodieActiveTimeline.COMPACTION_ACTION,
Sets.newHashSet(HoodieActiveTimeline.COMMIT_ACTION, HoodieActiveTimeline.COMPACTION_ACTION,
HoodieActiveTimeline.DELTA_COMMIT_ACTION));
default:
throw new HoodieException("Unsupported table type :"+ metaClient.getTableType());
@@ -293,4 +295,10 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
}
}
/**
* Run Compaction on the table.
* Compaction arranges the data so that it is optimized for data access
*
* @param jsc the Spark context made available to the compaction implementation
* @return the metadata produced by the compaction, wrapped in an Optional.
*         NOTE(review): presumably empty when there was nothing to compact - confirm against the concrete table implementations
*/
public abstract Optional<HoodieCompactionMetadata> compact(JavaSparkContext jsc);
}

View File

@@ -189,7 +189,8 @@ public class TestMergeOnReadTable {
return HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
.withInlineCompaction(false).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
.forTable("test-trip-table").withIndexConfig(

View File

@@ -20,6 +20,7 @@ import com.uber.hoodie.HoodieReadClient;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.model.HoodieCompactionMetadata;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTableType;
@@ -36,7 +37,6 @@ import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.HoodieBloomIndex;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.io.compact.CompactionFilter;
import com.uber.hoodie.io.compact.HoodieCompactionMetadata;
import com.uber.hoodie.io.compact.HoodieCompactor;
import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor;
import com.uber.hoodie.table.HoodieTable;
@@ -100,7 +100,8 @@ public class TestHoodieCompactor {
return HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
.withInlineCompaction(false).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
.forTable("test-trip-table").withIndexConfig(
HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build());
@@ -196,7 +197,7 @@ public class TestHoodieCompactor {
"After compaction there should be no log files visiable on a Realtime view",
logFiles.isEmpty());
}
assertTrue(result.getPartitionToWriteStats().containsKey(partitionPath));
assertTrue(result.getPartitionToCompactionWriteStats().containsKey(partitionPath));
}
}

View File

@@ -15,6 +15,7 @@
#
log4j.rootLogger=WARN, A1
log4j.category.com.uber=INFO
log4j.category.com.uber.hoodie.common.utils=WARN
log4j.category.org.apache.parquet.hadoop=WARN
# A1 is set to be a ConsoleAppender.

View File

@@ -118,15 +118,15 @@
<artifactId>kryo</artifactId>
<scope>test</scope>
</dependency>
<!--<dependency>-->
<!--<groupId>org.apache.avro</groupId>-->
<!--<artifactId>avro-mapred</artifactId>-->
<!--<exclusions>-->
<!--<exclusion>-->
<!--<groupId>org.mortbay.jetty</groupId>-->
<!--<artifactId>*</artifactId>-->
<!--</exclusion>-->
<!--</exclusions>-->
<!--</dependency>-->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,108 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.uber.hoodie.common.util.FSUtils;
import java.io.Serializable;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
/**
 * Write statistics for a single compaction write: the underlying {@link HoodieWriteStat}, the
 * partition it belongs to, and the log-file / log-record / to-be-updated-record counters handed
 * in by the caller.
 *
 * <p>Instances are normally created through {@link #newBuilder()}. The no-arg constructor exists
 * only so that JSON mappers can instantiate the class when reading compaction metadata back.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class CompactionWriteStat implements Serializable {

  // Fields are deliberately non-final: a JSON mapper creates the object through the no-arg
  // constructor and then populates the fields reflectively.
  private HoodieWriteStat writeStat;
  private String partitionPath;
  private long totalLogRecords;
  private long totalLogFiles;
  private long totalRecordsToBeUpdate;

  /**
   * For de-serialization only; application code should use {@link #newBuilder()} or the
   * all-args constructor.
   */
  public CompactionWriteStat() {
  }

  /**
   * @param writeStat the write stat of the file produced by the compaction
   * @param partitionPath partition the stat belongs to
   * @param totalLogFiles value reported by {@link #getTotalLogFiles()}
   * @param totalLogRecords value reported by {@link #getTotalLogRecords()}
   * @param totalRecordsToUpdate value reported by {@link #getTotalRecordsToBeUpdate()}
   */
  public CompactionWriteStat(HoodieWriteStat writeStat, String partitionPath, long totalLogFiles,
      long totalLogRecords, long totalRecordsToUpdate) {
    this.writeStat = writeStat;
    this.partitionPath = partitionPath;
    this.totalLogFiles = totalLogFiles;
    this.totalLogRecords = totalLogRecords;
    this.totalRecordsToBeUpdate = totalRecordsToUpdate;
  }

  public long getTotalLogRecords() {
    return totalLogRecords;
  }

  public long getTotalLogFiles() {
    return totalLogFiles;
  }

  public long getTotalRecordsToBeUpdate() {
    return totalRecordsToBeUpdate;
  }

  public HoodieWriteStat getHoodieWriteStat() {
    return writeStat;
  }

  public String getPartitionPath() {
    return partitionPath;
  }

  /**
   * @return a fresh, empty {@link Builder}
   */
  public static Builder newBuilder() {
    return new Builder();
  }

  /**
   * Fluent builder for {@link CompactionWriteStat}. Unset numeric values default to 0 and unset
   * references to null.
   */
  public static class Builder {

    private HoodieWriteStat writeStat;
    private long totalLogRecords;
    private long totalRecordsToUpdate;
    private long totalLogFiles;
    private String partitionPath;

    public Builder withHoodieWriteStat(HoodieWriteStat writeStat) {
      this.writeStat = writeStat;
      return this;
    }

    public Builder setTotalLogRecords(long records) {
      this.totalLogRecords = records;
      return this;
    }

    public Builder setTotalLogFiles(long totalLogFiles) {
      this.totalLogFiles = totalLogFiles;
      return this;
    }

    public Builder setTotalRecordsToUpdate(long records) {
      this.totalRecordsToUpdate = records;
      return this;
    }

    public Builder onPartition(String path) {
      this.partitionPath = path;
      return this;
    }

    /**
     * @return a {@link CompactionWriteStat} carrying the values set on this builder
     */
    public CompactionWriteStat build() {
      return new CompactionWriteStat(writeStat, partitionPath, totalLogFiles, totalLogRecords,
          totalRecordsToUpdate);
    }
  }
}

View File

@@ -18,10 +18,12 @@ package com.uber.hoodie.common.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.codehaus.jackson.annotate.JsonAutoDetect;
import org.codehaus.jackson.annotate.JsonMethod;
import org.codehaus.jackson.map.DeserializationConfig.Feature;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
@@ -38,7 +40,7 @@ import java.util.Map;
@JsonIgnoreProperties(ignoreUnknown = true)
public class HoodieCommitMetadata implements Serializable {
private static volatile Logger log = LogManager.getLogger(HoodieCommitMetadata.class);
private HashMap<String, List<HoodieWriteStat>> partitionToWriteStats;
protected HashMap<String, List<HoodieWriteStat>> partitionToWriteStats;
private HashMap<String, String> extraMetadataMap;
@@ -98,6 +100,7 @@ public class HoodieCommitMetadata implements Serializable {
return new HoodieCommitMetadata();
}
ObjectMapper mapper = new ObjectMapper();
mapper.configure(Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.setVisibility(JsonMethod.FIELD, JsonAutoDetect.Visibility.ANY);
return mapper.readValue(jsonStr, HoodieCommitMetadata.class);
}

View File

@@ -0,0 +1,87 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.model;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.codehaus.jackson.annotate.JsonAutoDetect;
import org.codehaus.jackson.annotate.JsonMethod;
import org.codehaus.jackson.map.DeserializationConfig.Feature;
import org.codehaus.jackson.map.ObjectMapper;
/**
 * Place holder for the compaction specific meta-data, uses all the details used in a normal
 * HoodieCommitMetadata and additionally keeps the {@link CompactionWriteStat}s per partition.
 */
public class HoodieCompactionMetadata extends HoodieCommitMetadata {

  private static final Logger log = LogManager.getLogger(HoodieCompactionMetadata.class);

  protected HashMap<String, List<CompactionWriteStat>> partitionToCompactionWriteStats;

  public HoodieCompactionMetadata() {
    partitionToCompactionWriteStats = new HashMap<>();
  }

  /**
   * Registers a compaction write stat for a partition. The wrapped HoodieWriteStat is also
   * registered with the inherited commit metadata under the same partition path.
   *
   * @param partitionPath partition the stat belongs to
   * @param stat the compaction write stat to record
   */
  public void addWriteStat(String partitionPath, CompactionWriteStat stat) {
    addWriteStat(partitionPath, stat.getHoodieWriteStat());
    partitionToCompactionWriteStats
        .computeIfAbsent(partitionPath, key -> new ArrayList<>())
        .add(stat);
  }

  /**
   * @param partitionPath partition to look up
   * @return the stats recorded for the partition, or null if none were recorded
   */
  public List<CompactionWriteStat> getCompactionWriteStats(String partitionPath) {
    return partitionToCompactionWriteStats.get(partitionPath);
  }

  /**
   * @return the live (not copied) partition-to-stats map
   */
  public Map<String, List<CompactionWriteStat>> getPartitionToCompactionWriteStats() {
    return partitionToCompactionWriteStats;
  }

  /**
   * Serializes this metadata, including the inherited commit metadata fields, to pretty-printed
   * JSON. Entries registered under a null partition path are dropped first, because a null map
   * key cannot be written as a JSON field name.
   *
   * @return the JSON representation
   * @throws IOException if serialization fails
   */
  public String toJsonString() throws IOException {
    if (partitionToCompactionWriteStats.containsKey(null)) {
      log.info("partition path is null for " + partitionToCompactionWriteStats.get(null));
      partitionToCompactionWriteStats.remove(null);
    }
    // addWriteStat(String, CompactionWriteStat) registers the wrapped write stat under the very
    // same key in the inherited map, so that map has to be scrubbed of the null key as well.
    if (partitionToWriteStats != null) {
      partitionToWriteStats.remove(null);
    }
    ObjectMapper mapper = new ObjectMapper();
    mapper.setVisibility(JsonMethod.FIELD, JsonAutoDetect.Visibility.ANY);
    return mapper.defaultPrettyPrintingWriter().writeValueAsString(this);
  }

  /**
   * Parses compaction metadata from JSON, ignoring unknown properties.
   *
   * @param jsonStr the JSON text; null or empty yields an empty metadata object
   * @return the parsed metadata
   * @throws IOException if the JSON cannot be parsed
   */
  public static HoodieCompactionMetadata fromJsonString(String jsonStr) throws IOException {
    if (jsonStr == null || jsonStr.isEmpty()) {
      // An empty commit file (no data was written, or something went wrong) maps to empty metadata.
      return new HoodieCompactionMetadata();
    }
    ObjectMapper mapper = new ObjectMapper();
    mapper.configure(Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    mapper.setVisibility(JsonMethod.FIELD, JsonAutoDetect.Visibility.ANY);
    return mapper.readValue(jsonStr, HoodieCompactionMetadata.class);
  }

  /**
   * Decodes the bytes as UTF-8 and delegates to {@link #fromJsonString(String)}.
   *
   * @param bytes UTF-8 encoded JSON
   * @return the parsed metadata
   * @throws IOException if the JSON cannot be parsed
   */
  public static HoodieCompactionMetadata fromBytes(byte[] bytes) throws IOException {
    return fromJsonString(new String(bytes, Charset.forName("utf-8")));
  }
}

View File

@@ -25,6 +25,7 @@ import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.fs.AvroFSInput;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
@@ -67,7 +68,7 @@ public class AvroLogAppender implements HoodieLogAppender<IndexedRecord> {
//TODO - check for log corruption and roll over if needed
log.info(config.getLogFile() + " exists. Appending to existing file");
// this log path exists, we will append to it
fs = FileSystem.get(fs.getConf());
// fs = FileSystem.get(fs.getConf());
try {
this.output = fs.append(path, config.getBufferSize());
} catch (RemoteException e) {
@@ -85,8 +86,9 @@ public class AvroLogAppender implements HoodieLogAppender<IndexedRecord> {
}
}
}
this.writer
.appendTo(new AvroFSInput(FileContext.getFileContext(fs.getConf()), path), output);
.appendTo(new FsInput(path, fs.getConf()), output);
// we always want to flush to disk every time an Avro block is written
this.writer.setFlushOnEveryBlock(true);
} else {

View File

@@ -16,7 +16,10 @@
package com.uber.hoodie.common.table.view;
import static java.util.stream.Collectors.toList;
import com.google.common.collect.Maps;
import com.uber.hoodie.common.model.HoodieCompactionMetadata;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
@@ -27,6 +30,7 @@ import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import java.util.function.BinaryOperator;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -199,23 +203,18 @@ public class HoodieTableFileSystemView implements TableFileSystemView, Serializa
// All the log files filtered from the above list, sorted by version numbers
List<HoodieLogFile> allLogFiles = Arrays.stream(files).filter(s -> s.getPath().getName()
.contains(metaClient.getTableConfig().getRTFileFormat().getFileExtension()))
.map(HoodieLogFile::new).collect(Collectors.collectingAndThen(Collectors.toList(),
.map(HoodieLogFile::new).collect(Collectors.collectingAndThen(toList(),
l -> l.stream().sorted(HoodieLogFile.getLogVersionComparator())
.collect(Collectors.toList())));
.collect(toList())));
// Filter the delta files by the commit time of the latest base file and collect as a list
Optional<HoodieInstant> lastTimestamp = metaClient.getActiveTimeline().lastInstant();
if (!lastTimestamp.isPresent()) {
return Maps.newHashMap();
}
return getLatestVersionInPartition(partitionPath, lastTimestamp.get().getTimestamp()).map(
return lastTimestamp.map(hoodieInstant -> getLatestVersionInPartition(partitionPath,
hoodieInstant.getTimestamp()).map(
hoodieDataFile -> Pair.of(hoodieDataFile, allLogFiles.stream().filter(
s -> s.getFileId().equals(hoodieDataFile.getFileId()) && s.getBaseCommitTime()
.equals(hoodieDataFile.getCommitTime())).collect(Collectors.toList()))).collect(
Collectors.toMap(
(Function<Pair<HoodieDataFile, List<HoodieLogFile>>, HoodieDataFile>) Pair::getKey,
(Function<Pair<HoodieDataFile, List<HoodieLogFile>>, List<HoodieLogFile>>) Pair::getRight));
Collectors.toMap(Pair::getKey, Pair::getRight))).orElseGet(Maps::newHashMap);
}
@@ -248,9 +247,9 @@ public class HoodieTableFileSystemView implements TableFileSystemView, Serializa
}
private Collector<HoodieDataFile, ?, List<HoodieDataFile>> toSortedFileStatus() {
return Collectors.collectingAndThen(Collectors.toList(),
return Collectors.collectingAndThen(toList(),
l -> l.stream().sorted(HoodieDataFile.getCommitTimeComparator())
.collect(Collectors.toList()));
.collect(toList()));
}

View File

@@ -47,6 +47,7 @@ import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.mapred.FsInput;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
@@ -67,33 +68,39 @@ public class AvroUtils {
public static List<HoodieRecord<HoodieAvroPayload>> loadFromFiles(FileSystem fs,
List<String> deltaFilePaths, Schema expectedSchema) {
List<HoodieRecord<HoodieAvroPayload>> loadedRecords = Lists.newArrayList();
deltaFilePaths.forEach(s -> {
Path path = new Path(s);
try {
SeekableInput input =
new AvroFSInput(FileContext.getFileContext(fs.getConf()), path);
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>();
// Set the expected schema to be the current schema to account for schema evolution
reader.setExpected(expectedSchema);
FileReader<GenericRecord> fileReader = DataFileReader.openReader(input, reader);
for (GenericRecord deltaRecord : fileReader) {
String key = deltaRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
String partitionPath =
deltaRecord.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
loadedRecords.add(new HoodieRecord<>(new HoodieKey(key, partitionPath),
new HoodieAvroPayload(Optional.of(deltaRecord))));
}
fileReader.close(); // also closes underlying FsInput
} catch (IOException e) {
throw new HoodieIOException("Could not read avro records from path " + s, e);
}
List<HoodieRecord<HoodieAvroPayload>> records = loadFromFile(fs, s, expectedSchema);
loadedRecords.addAll(records);
});
return loadedRecords;
}
/**
 * Reads every Avro record from a single delta file and wraps each one in a HoodieRecord keyed by
 * the record-key and partition-path metadata fields stored in the record.
 *
 * @param fs file system whose configuration is used to open the file
 * @param deltaFilePath path of the Avro file to read
 * @param expectedSchema reader schema, set as the expected schema to account for schema evolution
 * @return the records in file order; empty if the file holds no records
 * @throws HoodieIOException if the file cannot be opened, read or closed
 */
public static List<HoodieRecord<HoodieAvroPayload>> loadFromFile(FileSystem fs,
    String deltaFilePath, Schema expectedSchema) {
  List<HoodieRecord<HoodieAvroPayload>> loadedRecords = Lists.newArrayList();
  Path path = new Path(deltaFilePath);
  GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>();
  // Set the expected schema to be the current schema to account for schema evolution
  reader.setExpected(expectedSchema);
  // try-with-resources guarantees the input and the reader are released even when opening the
  // reader or iterating over the records fails part-way through.
  try (FsInput input = new FsInput(path, fs.getConf());
      FileReader<GenericRecord> fileReader = DataFileReader.openReader(input, reader)) {
    for (GenericRecord deltaRecord : fileReader) {
      String key = deltaRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
      String partitionPath =
          deltaRecord.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
      loadedRecords.add(new HoodieRecord<>(new HoodieKey(key, partitionPath),
          new HoodieAvroPayload(Optional.of(deltaRecord))));
    }
  } catch (IOException e) {
    throw new HoodieIOException("Could not read avro records from path " + deltaFilePath,
        e);
  }
  return loadedRecords;
}
public static HoodieCleanMetadata convertCleanMetadata(String startCleanTime,
Optional<Long> durationInMs, List<HoodieCleanStat> cleanStats) {

View File

@@ -210,6 +210,11 @@ public class FSUtils {
return String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version);
}
/**
 * Builds a glob mask matching every version of a log file.
 *
 * @param commitTime commit time embedded in the log file name
 * @param fileId id of the file the log belongs to
 * @param logFileExtension extension of the log file
 * @return {@code <fileId>_<commitTime><logFileExtension>*}
 */
public static String maskWithoutLogVersion(String commitTime, String fileId, String logFileExtension) {
  StringBuilder mask = new StringBuilder();
  mask.append(fileId).append('_').append(commitTime);
  mask.append(logFileExtension).append('*');
  return mask.toString();
}
/**
* Get the latest log file written from the list of log files passed in
*

View File

@@ -94,6 +94,16 @@
<artifactId>hadoop-common</artifactId>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>

View File

@@ -314,7 +314,8 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<version>${hadoop.version}-cdh${cdh.version}</version>
<scope>provided</scope>
</dependency>
<dependency>