From aee136777be6606b51c95768df9f0575dd34bf8c Mon Sep 17 00:00:00 2001 From: Prasanna Rajaperumal Date: Fri, 31 Mar 2017 01:02:02 -0700 Subject: [PATCH] Fixes needed to run merge-on-read testing on production scale data --- .../com/uber/hoodie/HoodieWriteClient.java | 32 ++- .../hoodie/config/HoodieCompactionConfig.java | 22 ++ .../uber/hoodie/config/HoodieWriteConfig.java | 10 + .../exception/HoodieCompactionException.java | 28 +++ .../uber/hoodie/io/HoodieAppendHandle.java | 2 +- .../com/uber/hoodie/io/HoodieAvroReader.java | 99 ++++++++ .../com/uber/hoodie/io/HoodieCleaner.java | 22 +- .../io/compact/HoodieCompactionMetadata.java | 26 -- .../hoodie/io/compact/HoodieCompactor.java | 1 + .../compact/HoodieRealtimeTableCompactor.java | 230 ++++++++++-------- .../hoodie/table/HoodieCopyOnWriteTable.java | 9 + .../hoodie/table/HoodieMergeOnReadTable.java | 36 +++ .../com/uber/hoodie/table/HoodieTable.java | 14 +- .../com/uber/hoodie/TestMergeOnReadTable.java | 3 +- .../uber/hoodie/io/TestHoodieCompactor.java | 7 +- .../test/resources/log4j-surefire.properties | 1 + hoodie-common/pom.xml | 20 +- .../common/model/CompactionWriteStat.java | 108 ++++++++ .../common/model/HoodieCommitMetadata.java | 5 +- .../model/HoodieCompactionMetadata.java | 87 +++++++ .../table/log/avro/AvroLogAppender.java | 6 +- .../table/view/HoodieTableFileSystemView.java | 23 +- .../uber/hoodie/common/util/AvroUtils.java | 49 ++-- .../com/uber/hoodie/common/util/FSUtils.java | 5 + hoodie-hive/pom.xml | 10 + pom.xml | 3 +- 26 files changed, 659 insertions(+), 199 deletions(-) create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/exception/HoodieCompactionException.java create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAvroReader.java delete mode 100644 hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactionMetadata.java create mode 100644 hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionWriteStat.java create mode 100644 hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCompactionMetadata.java diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index c96217522..677f78129 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -26,6 +26,7 @@ import com.uber.hoodie.avro.model.HoodieSavepointMetadata; import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.HoodieRollbackStat; import com.uber.hoodie.common.model.HoodieCommitMetadata; +import com.uber.hoodie.common.model.HoodieCompactionMetadata; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; @@ -39,6 +40,7 @@ import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.util.AvroUtils; import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.config.HoodieCompactionConfig; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieCommitException; import com.uber.hoodie.exception.HoodieIOException; @@ -57,7 +59,6 @@ import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.text.ParseException; -import java.text.SimpleDateFormat; import java.util.Collections; import java.util.Date; import 
java.util.HashMap; @@ -365,6 +366,18 @@ public class HoodieWriteClient implements Seriali new HoodieInstant(true, actionType, commitTime), Optional.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); // Save was a success + // Do an inline compaction if enabled + if (config.isInlineCompaction()) { + Optional compactionMetadata = table.compact(jsc); + if (compactionMetadata.isPresent()) { + logger.info("Compacted successfully on commit " + commitTime); + metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true"); + } else { + logger.info("Compaction did not run for commit " + commitTime); + metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false"); + } + } + // We cannot have unbounded commit files. Archive commits if we have to archive archiveLog.archiveIfRequired(); if(config.isAutoClean()) { @@ -785,11 +798,12 @@ public class HoodieWriteClient implements Seriali public void startCommitWithTime(String commitTime) { logger.info("Generate a new commit time " + commitTime); - HoodieTableMetaClient metaClient = - new HoodieTableMetaClient(fs, config.getBasePath(), true); - HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + HoodieTable table = HoodieTable + .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); + HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); + String commitActionType = table.getCommitActionType(); activeTimeline.createInflight( - new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime)); + new HoodieInstant(true, commitActionType, commitTime)); } public static SparkConf registerClasses(SparkConf conf) { @@ -829,11 +843,9 @@ public class HoodieWriteClient implements Seriali * @throws IOException */ private void rollbackInflightCommits() { - HoodieTableMetaClient metaClient = - new HoodieTableMetaClient(fs, config.getBasePath(), true); - HoodieTimeline inflightTimeline = - metaClient.getActiveTimeline().getCommitTimeline().filterInflights(); - + HoodieTable table = HoodieTable + .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); + HoodieTimeline inflightTimeline = table.getCommitTimeline().filterInflights(); List commits = inflightTimeline.getInstants().map(HoodieInstant::getTimestamp) .collect(Collectors.toList()); Collections.reverse(commits); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java index 6a798cecf..da365619b 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java @@ -37,6 +37,14 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { public static final String AUTO_CLEAN_PROP = "hoodie.clean.automatic"; private static final String DEFAULT_AUTO_CLEAN = "true"; + // Turn on inline compaction - after N delta commits, an inline compaction will be run + public static final String INLINE_COMPACT_PROP = "hoodie.compact.inline"; + private static final String DEFAULT_INLINE_COMPACT = "true"; + + // Run a compaction every N delta commits + public static final String INLINE_COMPACT_NUM_DELTA_COMMITS_PROP = "hoodie.compact.inline.max.delta.commits"; + private static final String DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS = "4"; + public static final String CLEANER_FILE_VERSIONS_RETAINED_PROP = "hoodie.cleaner.fileversions.retained"; private static final String
DEFAULT_CLEANER_FILE_VERSIONS_RETAINED = "3"; @@ -102,6 +110,16 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { return this; } + public Builder withInlineCompaction(Boolean inlineCompaction) { + props.setProperty(INLINE_COMPACT_PROP, String.valueOf(inlineCompaction)); + return this; + } + + public Builder inlineCompactionEvery(int deltaCommits) { + props.setProperty(INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, String.valueOf(deltaCommits)); + return this; + } + public Builder withCleanerPolicy(HoodieCleaningPolicy policy) { props.setProperty(CLEANER_POLICY_PROP, policy.name()); return this; } @@ -153,6 +171,10 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { HoodieCompactionConfig config = new HoodieCompactionConfig(props); setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP), AUTO_CLEAN_PROP, DEFAULT_AUTO_CLEAN); + setDefaultOnCondition(props, !props.containsKey(INLINE_COMPACT_PROP), + INLINE_COMPACT_PROP, DEFAULT_INLINE_COMPACT); + setDefaultOnCondition(props, !props.containsKey(INLINE_COMPACT_NUM_DELTA_COMMITS_PROP), + INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS); setDefaultOnCondition(props, !props.containsKey(CLEANER_POLICY_PROP), CLEANER_POLICY_PROP, DEFAULT_CLEANER_POLICY); setDefaultOnCondition(props, !props.containsKey(CLEANER_FILE_VERSIONS_RETAINED_PROP), diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index 8c3f9d8f8..f6f0d84f3 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -19,6 +19,7 @@ package com.uber.hoodie.config; import com.google.common.base.Preconditions; import com.uber.hoodie.common.model.HoodieCleaningPolicy; +import com.uber.hoodie.config.HoodieCompactionConfig.Builder; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.io.HoodieCleaner; import com.uber.hoodie.metrics.MetricsReporterType; @@ -148,6 +149,15 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.AUTO_CLEAN_PROP)); } + public boolean isInlineCompaction() { + return Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.INLINE_COMPACT_PROP)); + } + + public int getInlineCompactDeltaCommitMax() { + return Integer.parseInt( + props.getProperty(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP)); + } + /** * index properties **/ diff --git a/hoodie-client/src/main/java/com/uber/hoodie/exception/HoodieCompactionException.java b/hoodie-client/src/main/java/com/uber/hoodie/exception/HoodieCompactionException.java new file mode 100644 index 000000000..9d016ec6d --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/exception/HoodieCompactionException.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.exception; + +public class HoodieCompactionException extends HoodieException { + public HoodieCompactionException(String msg) { + super(msg); + } + + public HoodieCompactionException(String msg, Throwable e) { + super(msg, e); + } +} + diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java index a10a89431..0c97831f1 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java @@ -101,7 +101,7 @@ public class HoodieAppendHandle extends HoodieIOH throw new HoodieUpsertException( "Failed to initialize HoodieUpdateHandle for FileId: " + fileId + " on commit " + commitTime + " on HDFS path " + hoodieTable - .getMetaClient().getBasePath()); + .getMetaClient().getBasePath() + partitionPath, e); } writeStatus.getStat().setFullPath(currentLogFile.getPath().toString()); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAvroReader.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAvroReader.java new file mode 100644 index 000000000..ae9c4a406 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAvroReader.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.io; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.uber.hoodie.common.model.HoodieAvroPayload; +import com.uber.hoodie.common.model.HoodieKey; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.util.AvroUtils; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Spliterator; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import org.apache.avro.Schema; +import org.apache.hadoop.fs.FileSystem; + +/** + * This reads a bunch of HoodieRecords from avro log files and deduplicates and maintains the merged + * state in memory.
This is useful for compaction and record reader + */ +public class HoodieAvroReader implements Iterable> { + + private final Collection> records; + private AtomicLong totalLogFiles = new AtomicLong(0); + private AtomicLong totalLogRecords = new AtomicLong(0); + private long totalRecordsToUpdate; + + + public HoodieAvroReader(FileSystem fs, List logFilePaths, Schema readerSchema) { + Map> records = Maps.newHashMap(); + for (String path : logFilePaths) { + totalLogFiles.incrementAndGet(); + List> recordsFromFile = AvroUtils + .loadFromFile(fs, path, readerSchema); + totalLogRecords.addAndGet(recordsFromFile.size()); + for (HoodieRecord recordFromFile : recordsFromFile) { + String key = recordFromFile.getRecordKey(); + if (records.containsKey(key)) { + // Merge and store the merged record + HoodieAvroPayload combinedValue = records.get(key).getData() + .preCombine(recordFromFile.getData()); + records.put(key, new HoodieRecord<>(new HoodieKey(key, recordFromFile.getPartitionPath()), + combinedValue)); + } else { + // Put the record as is + records.put(key, recordFromFile); + } + } + } + this.records = records.values(); + this.totalRecordsToUpdate = records.size(); + } + + @Override + public Iterator> iterator() { + return records.iterator(); + } + + @Override + public void forEach(Consumer> consumer) { + records.forEach(consumer); + } + + @Override + public Spliterator> spliterator() { + return records.spliterator(); + } + + public long getTotalLogFiles() { + return totalLogFiles.get(); + } + + public long getTotalLogRecords() { + return totalLogRecords.get(); + } + + public long getTotalRecordsToUpdate() { + return totalRecordsToUpdate; + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleaner.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleaner.java index 62e0d414a..db63cce69 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleaner.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleaner.java @@ -22,8 +22,10 @@ import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.model.HoodieCleaningPolicy; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; +import com.uber.hoodie.common.table.log.HoodieLogFile; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieWriteConfig; @@ -107,8 +109,18 @@ public class HoodieCleaner> { } // Delete the remaining files while (commitItr.hasNext()) { + HoodieDataFile nextRecord = commitItr.next(); deletePaths.add(String.format("%s/%s/%s", config.getBasePath(), partitionPath, - commitItr.next().getFileName())); + nextRecord.getFileName())); + if (hoodieTable.getMetaClient().getTableType() + == HoodieTableType.MERGE_ON_READ) { + // If merge on read, then clean the log files for the commits as well + deletePaths.add(String + .format("%s/%s/%s", config.getBasePath(), partitionPath, + FSUtils.maskWithoutLogVersion(nextRecord.getCommitTime(), + nextRecord.getFileId(), + HoodieLogFile.DELTA_EXTENSION))); + } } } return deletePaths; @@ -182,6 +194,14 @@ public class HoodieCleaner> { deletePaths.add(String .format("%s/%s/%s", config.getBasePath(), partitionPath, FSUtils .maskWithoutTaskPartitionId(fileCommitTime, afile.getFileId()))); + if (hoodieTable.getMetaClient().getTableType() + 
== HoodieTableType.MERGE_ON_READ) { + // If merge on read, then clean the log files for the commits as well + deletePaths.add(String + .format("%s/%s/%s", config.getBasePath(), partitionPath, + FSUtils.maskWithoutLogVersion(fileCommitTime, afile.getFileId(), + HoodieLogFile.DELTA_EXTENSION))); + } } } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactionMetadata.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactionMetadata.java deleted file mode 100644 index 8cb817f18..000000000 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactionMetadata.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.uber.hoodie.io.compact; - -import com.uber.hoodie.common.model.HoodieCommitMetadata; - -/** - * Place holder for the compaction specific meta-data, uses all the details used in a normal HoodieCommitMetadata - */ -public class HoodieCompactionMetadata extends HoodieCommitMetadata { - -} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java index eb5db75ff..d7d541bfd 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java @@ -16,6 +16,7 @@ package com.uber.hoodie.io.compact; +import com.uber.hoodie.common.model.HoodieCompactionMetadata; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java index 052ab54f0..daee963cc 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java @@ -16,9 +16,14 @@ package com.uber.hoodie.io.compact; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.uber.hoodie.WriteStatus; +import com.uber.hoodie.common.model.CompactionWriteStat; import com.uber.hoodie.common.model.HoodieAvroPayload; +import com.uber.hoodie.common.model.HoodieCompactionMetadata; import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; @@ -28,18 +33,19 @@ import com.uber.hoodie.common.util.AvroUtils; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; import com.uber.hoodie.config.HoodieWriteConfig; -import 
com.uber.hoodie.exception.HoodieCommitException; +import com.uber.hoodie.exception.HoodieCompactionException; +import com.uber.hoodie.io.HoodieAvroReader; import com.uber.hoodie.table.HoodieCopyOnWriteTable; -import com.uber.hoodie.table.HoodieMergeOnReadTable; import com.uber.hoodie.table.HoodieTable; +import java.util.Collection; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import org.apache.avro.Schema; -import org.apache.commons.collections.IteratorUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; @@ -48,7 +54,6 @@ import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; import static java.util.stream.Collectors.*; @@ -60,118 +65,129 @@ import static java.util.stream.Collectors.*; * @see HoodieCompactor */ public class HoodieRealtimeTableCompactor implements HoodieCompactor { - private static Logger log = LogManager.getLogger(HoodieRealtimeTableCompactor.class); - @Override - public HoodieCompactionMetadata compact(JavaSparkContext jsc, HoodieWriteConfig config, - HoodieTable hoodieTable, - CompactionFilter compactionFilter) throws Exception { - // TODO - rollback any compactions in flight + private static Logger log = LogManager.getLogger(HoodieRealtimeTableCompactor.class); - HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); - String compactionCommit = startCompactionCommit(hoodieTable); - log.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommit); - List partitionPaths = - FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath()); + @Override + public HoodieCompactionMetadata compact(JavaSparkContext jsc, HoodieWriteConfig config, + HoodieTable hoodieTable, CompactionFilter compactionFilter) throws IOException { + Preconditions.checkArgument( + hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ, + "HoodieRealtimeTableCompactor can only compact table of type " + + HoodieTableType.MERGE_ON_READ + " and not " + hoodieTable.getMetaClient() + .getTableType().name()); - log.info("Compaction looking for files to compact in " + partitionPaths + " partitions"); - List operations = - jsc.parallelize(partitionPaths, partitionPaths.size()) - .flatMap((FlatMapFunction) partitionPath -> hoodieTable.getFileSystemView() - .groupLatestDataFileWithLogFiles(partitionPath).entrySet() - .stream() - .map(s -> new CompactionOperation(s.getKey(), partitionPath, s.getValue())) - .collect(toList()).iterator()).collect(); - log.info("Total of " + operations.size() + " compactions are retrieved"); + // TODO - rollback any compactions in flight - // Filter the compactions with the passed in filter. 
This lets us choose most effective compactions only - operations = compactionFilter.filter(operations); - if(operations.isEmpty()) { - log.warn("After filtering, Nothing to compact for " + metaClient.getBasePath()); - return null; - } + HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); + String compactionCommit = startCompactionCommit(hoodieTable); + log.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommit); + List partitionPaths = + FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath()); - log.info("After filtering, Compacting " + operations + " files"); - List> updateStatusMap = - jsc.parallelize(operations, operations.size()).map( - (Function>>) compactionOperation -> executeCompaction( - metaClient, config, compactionOperation, compactionCommit)).flatMap( - (FlatMapFunction>, WriteStatus>) listIterator -> { - List> collected = IteratorUtils.toList(listIterator); - return collected.stream().flatMap(List::stream).collect(toList()).iterator(); - }).mapToPair(new PairFunction() { - @Override - public Tuple2 call(WriteStatus writeStatus) - throws Exception { - return new Tuple2<>(writeStatus.getPartitionPath(), writeStatus.getStat()); - } + log.info("Compaction looking for files to compact in " + partitionPaths + " partitions"); + List operations = + jsc.parallelize(partitionPaths, partitionPaths.size()) + .flatMap((FlatMapFunction) partitionPath -> hoodieTable + .getFileSystemView() + .groupLatestDataFileWithLogFiles(partitionPath).entrySet() + .stream() + .map(s -> new CompactionOperation(s.getKey(), partitionPath, s.getValue())) + .collect(toList()).iterator()).collect(); + log.info("Total of " + operations.size() + " compactions are retrieved"); + + // Filter the compactions with the passed in filter. 
This lets us choose most effective compactions only + operations = compactionFilter.filter(operations); + if (operations.isEmpty()) { + log.warn("After filtering, Nothing to compact for " + metaClient.getBasePath()); + return null; + } + + log.info("After filtering, Compacting " + operations + " files"); + List updateStatusMap = + jsc.parallelize(operations, operations.size()) + .map(s -> executeCompaction(metaClient, config, s, compactionCommit)) + .flatMap(new FlatMapFunction, CompactionWriteStat>() { + @Override + public Iterator call( + List compactionWriteStats) + throws Exception { + return compactionWriteStats.iterator(); + } }).collect(); - HoodieCompactionMetadata metadata = new HoodieCompactionMetadata(); - for (Tuple2 stat : updateStatusMap) { - metadata.addWriteStat(stat._1(), stat._2()); - } - log.info("Compaction finished with result " + metadata); + HoodieCompactionMetadata metadata = new HoodieCompactionMetadata(); + for (CompactionWriteStat stat : updateStatusMap) { + metadata.addWriteStat(stat.getPartitionPath(), stat); + } + log.info("Compaction finished with result " + metadata); - //noinspection ConstantConditions - if (isCompactionSucceeded(metadata)) { - log.info("Compaction succeeded " + compactionCommit); - commitCompaction(compactionCommit, metaClient, metadata); - } else { - log.info("Compaction failed " + compactionCommit); - } - return metadata; + //noinspection ConstantConditions + if (isCompactionSucceeded(metadata)) { + log.info("Compaction succeeded " + compactionCommit); + commitCompaction(compactionCommit, metaClient, metadata); + } else { + log.info("Compaction failed " + compactionCommit); + } + return metadata; + } + + private boolean isCompactionSucceeded(HoodieCompactionMetadata result) { + //TODO figure out a success factor for a compaction + return true; + } + + private List executeCompaction(HoodieTableMetaClient metaClient, + HoodieWriteConfig config, CompactionOperation operation, String commitTime) + throws IOException { + FileSystem fs = FSUtils.getFs(); + Schema readerSchema = + HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); + + log.info("Compacting base " + operation.getDataFilePath() + " with delta files " + operation + .getDeltaFilePaths() + " for commit " + commitTime); + // TODO - FIX THIS + // Reads the entire avro file. Always only specific blocks should be read from the avro file (failure recover). + // Load all the delta commits since the last compaction commit and get all the blocks to be loaded and load it using CompositeAvroLogReader + // Since a DeltaCommit is not defined yet, reading all the records. revisit this soon. + + HoodieAvroReader avroReader = new HoodieAvroReader(fs, operation.getDeltaFilePaths(), + readerSchema); + if (!avroReader.iterator().hasNext()) { + return Lists.newArrayList(); } - private boolean isCompactionSucceeded(HoodieCompactionMetadata result) { - //TODO figure out a success factor for a compaction - return true; - } - - private Iterator> executeCompaction(HoodieTableMetaClient metaClient, - HoodieWriteConfig config, CompactionOperation operation, String commitTime) - throws IOException { - FileSystem fs = FSUtils.getFs(); - Schema schema = - HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); - - log.info("Compacting base " + operation.getDataFilePath() + " with delta files " + operation - .getDeltaFilePaths() + " for commit " + commitTime); - // TODO - FIX THIS - // 1. Reads the entire avro file. 
Always only specific blocks should be read from the avro file (failure recover). - // Load all the delta commits since the last compaction commit and get all the blocks to be loaded and load it using CompositeAvroLogReader - // Since a DeltaCommit is not defined yet, reading all the records. revisit this soon. - - // 2. naively loads all the delta records in memory to merge it, - // since we only need a iterator, we could implement a lazy iterator to load from one delta file at a time - List> readDeltaFilesInMemory = - AvroUtils.loadFromFiles(fs, operation.getDeltaFilePaths(), schema); - - if(readDeltaFilesInMemory.isEmpty()) { - return IteratorUtils.emptyIterator(); - } - - // Compacting is very similar to applying updates to existing file - HoodieCopyOnWriteTable table = - new HoodieCopyOnWriteTable<>(config, metaClient); - return table - .handleUpdate(commitTime, operation.getFileId(), readDeltaFilesInMemory.iterator()); - } - - public boolean commitCompaction(String commitTime, HoodieTableMetaClient metaClient, - HoodieCompactionMetadata metadata) { - log.info("Comitting " + commitTime); - HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - - try { - activeTimeline.saveAsComplete( - new HoodieInstant(true, HoodieTimeline.COMPACTION_ACTION, commitTime), - Optional.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); - } catch (IOException e) { - throw new HoodieCommitException( - "Failed to commit " + metaClient.getBasePath() + " at time " + commitTime, e); - } - return true; + // Compacting is very similar to applying updates to existing file + HoodieCopyOnWriteTable table = + new HoodieCopyOnWriteTable<>(config, metaClient); + Iterator> result = table + .handleUpdate(commitTime, operation.getFileId(), avroReader.iterator()); + Iterable> resultIterable = () -> result; + return StreamSupport.stream(resultIterable.spliterator(), false) + .flatMap(Collection::stream) + .map(WriteStatus::getStat) + .map(s -> CompactionWriteStat.newBuilder().withHoodieWriteStat(s) + .setTotalRecordsToUpdate(avroReader.getTotalRecordsToUpdate()) + .setTotalLogFiles(avroReader.getTotalLogFiles()) + .setTotalLogRecords(avroReader.getTotalLogRecords()) + .onPartition(operation.getPartitionPath()).build()) + .collect(toList()); + } + + public boolean commitCompaction(String commitTime, HoodieTableMetaClient metaClient, + HoodieCompactionMetadata metadata) { + log.info("Committing Compaction " + commitTime); + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + try { + activeTimeline.saveAsComplete( + new HoodieInstant(true, HoodieTimeline.COMPACTION_ACTION, commitTime), + Optional.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + } catch (IOException e) { + throw new HoodieCompactionException( + "Failed to commit " + metaClient.getBasePath() + " at time " + commitTime, e); } + return true; + } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index 269445b6c..20638a2c1 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -16,6 +16,7 @@ package com.uber.hoodie.table; +import com.uber.hoodie.common.model.HoodieCompactionMetadata; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; @@ -32,6 +33,7 
@@ import com.uber.hoodie.exception.HoodieUpsertException; import com.uber.hoodie.func.LazyInsertIterable; import com.uber.hoodie.io.HoodieUpdateHandle; +import java.util.Optional; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; @@ -55,6 +57,7 @@ import java.util.Random; import java.util.Set; import java.util.stream.Collectors; +import org.apache.spark.api.java.JavaSparkContext; import scala.Option; import scala.Tuple2; @@ -470,4 +473,10 @@ public class HoodieCopyOnWriteTable extends Hoodi Partitioner partitioner) { return handleUpsertPartition(commitTime, partition, recordItr, partitioner); } + + @Override + public Optional compact(JavaSparkContext jsc) { + logger.info("Nothing to compact in COW storage format"); + return Optional.empty(); + } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java index 4fbe608c7..7ee050c5d 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java @@ -17,11 +17,17 @@ package com.uber.hoodie.table; import com.uber.hoodie.WriteStatus; +import com.uber.hoodie.common.model.HoodieCompactionMetadata; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.exception.HoodieCompactionException; import com.uber.hoodie.io.HoodieAppendHandle; +import com.uber.hoodie.io.compact.CompactionFilter; +import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor; +import java.util.Optional; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -29,6 +35,7 @@ import java.io.IOException; import java.util.Collections; import java.util.Iterator; import java.util.List; +import org.apache.spark.api.java.JavaSparkContext; /** * Implementation of a more real-time read-optimized Hoodie Table where @@ -59,4 +66,33 @@ public class HoodieMergeOnReadTable extends Hoodi return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())) .iterator(); } + + @Override + public Optional compact(JavaSparkContext jsc) { + logger.info("Checking if compaction needs to be run on " + config.getBasePath()); + Optional lastCompaction = getActiveTimeline().getCompactionTimeline() + .filterCompletedInstants().lastInstant(); + String deltaCommitsSinceTs = "0"; + if (lastCompaction.isPresent()) { + deltaCommitsSinceTs = lastCompaction.get().getTimestamp(); + } + + int deltaCommitsSinceLastCompaction = getActiveTimeline().getDeltaCommitTimeline() + .findInstantsAfter(deltaCommitsSinceTs, Integer.MAX_VALUE).countInstants(); + if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) { + logger.info("Not running compaction as only " + deltaCommitsSinceLastCompaction + + " delta commits were found since last compaction " + deltaCommitsSinceTs + + ".
Waiting for " + config.getInlineCompactDeltaCommitMax()); + return Optional.empty(); + } + + logger.info("Compacting merge on read table " + config.getBasePath()); + HoodieRealtimeTableCompactor compactor = new HoodieRealtimeTableCompactor(); + try { + return Optional.of(compactor.compact(jsc, config, this, CompactionFilter.allowAll())); + } catch (IOException e) { + throw new HoodieCompactionException("Could not compact " + config.getBasePath(), e); + } + } + } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java index 7c8e53e76..e0ea9f309 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java @@ -18,7 +18,7 @@ package com.uber.hoodie.table; import com.google.common.collect.Sets; import com.uber.hoodie.avro.model.HoodieSavepointMetadata; -import com.uber.hoodie.avro.model.HoodieSavepointPartitionMetadata; +import com.uber.hoodie.common.model.HoodieCompactionMetadata; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; @@ -34,6 +34,7 @@ import com.uber.hoodie.exception.HoodieCommitException; import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieSavepointException; +import java.util.Optional; import org.apache.hadoop.fs.FileSystem; import org.apache.spark.Partitioner; @@ -41,9 +42,9 @@ import java.io.IOException; import java.io.Serializable; import java.util.Iterator; import java.util.List; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.spark.api.java.JavaSparkContext; /** * Abstract implementation of a HoodieTable @@ -195,8 +196,9 @@ public abstract class HoodieTable implements Seri return getActiveTimeline().getCommitTimeline(); case MERGE_ON_READ: // We need to include the parquet files written out in delta commits + // Include commit action to be able to start doing a MOR over a COW dataset - no migration required return getActiveTimeline().getTimelineOfActions( - Sets.newHashSet(HoodieActiveTimeline.COMPACTION_ACTION, + Sets.newHashSet(HoodieActiveTimeline.COMMIT_ACTION, HoodieActiveTimeline.COMPACTION_ACTION, HoodieActiveTimeline.DELTA_COMMIT_ACTION)); default: throw new HoodieException("Unsupported table type :"+ metaClient.getTableType()); @@ -293,4 +295,10 @@ public abstract class HoodieTable implements Seri throw new HoodieException("Unsupported table type :" + metaClient.getTableType()); } } + + /** + * Run Compaction on the table. 
+ * Compaction arranges the data so that it is optimized for data access + */ + public abstract Optional compact(JavaSparkContext jsc); } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java index 89ab04cc9..16f422d5d 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java @@ -189,7 +189,8 @@ public class TestMergeOnReadTable { return HoodieWriteConfig.newBuilder().withPath(basePath) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withCompactionConfig( - HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build()) + HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024) + .withInlineCompaction(false).build()) .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build()) .forTable("test-trip-table").withIndexConfig( diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java index 11b9f71c5..0709a1b06 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java @@ -20,6 +20,7 @@ import com.uber.hoodie.HoodieReadClient; import com.uber.hoodie.HoodieWriteClient; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.HoodieTestDataGenerator; +import com.uber.hoodie.common.model.HoodieCompactionMetadata; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieTableType; @@ -36,7 +37,6 @@ import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.index.HoodieBloomIndex; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.io.compact.CompactionFilter; -import com.uber.hoodie.io.compact.HoodieCompactionMetadata; import com.uber.hoodie.io.compact.HoodieCompactor; import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor; import com.uber.hoodie.table.HoodieTable; @@ -100,7 +100,8 @@ public class TestHoodieCompactor { return HoodieWriteConfig.newBuilder().withPath(basePath) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withCompactionConfig( - HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build()) + HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024) + .withInlineCompaction(false).build()) .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build()) .forTable("test-trip-table").withIndexConfig( HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()); @@ -196,7 +197,7 @@ public class TestHoodieCompactor { "After compaction there should be no log files visiable on a Realtime view", logFiles.isEmpty()); } - assertTrue(result.getPartitionToWriteStats().containsKey(partitionPath)); + assertTrue(result.getPartitionToCompactionWriteStats().containsKey(partitionPath)); } } diff --git a/hoodie-client/src/test/resources/log4j-surefire.properties b/hoodie-client/src/test/resources/log4j-surefire.properties index eab225528..8b9e97ca2 100644 --- a/hoodie-client/src/test/resources/log4j-surefire.properties +++ b/hoodie-client/src/test/resources/log4j-surefire.properties @@ -15,6 +15,7 @@ # log4j.rootLogger=WARN, A1 log4j.category.com.uber=INFO 
+log4j.category.com.uber.hoodie.common.utils=WARN log4j.category.org.apache.parquet.hadoop=WARN # A1 is set to be a ConsoleAppender. diff --git a/hoodie-common/pom.xml b/hoodie-common/pom.xml index e705d24ad..7104cc85c 100644 --- a/hoodie-common/pom.xml +++ b/hoodie-common/pom.xml @@ -118,15 +118,15 @@ kryo test - - - - - - - - - - + + org.apache.avro + avro-mapred + + + org.mortbay.jetty + * + + + diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionWriteStat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionWriteStat.java new file mode 100644 index 000000000..bdac4babb --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionWriteStat.java @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.model; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.uber.hoodie.common.util.FSUtils; +import java.io.Serializable; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.Path; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class CompactionWriteStat implements Serializable { + + private final HoodieWriteStat writeStat; + private String partitionPath; + private final long totalLogRecords; + private final long totalLogFiles; + private final long totalRecordsToBeUpdate; + + public CompactionWriteStat(HoodieWriteStat writeStat, String partitionPath, long totalLogFiles, long totalLogRecords, + long totalRecordsToUpdate) { + this.writeStat = writeStat; + this.partitionPath = partitionPath; + this.totalLogFiles = totalLogFiles; + this.totalLogRecords = totalLogRecords; + this.totalRecordsToBeUpdate = totalRecordsToUpdate; + } + + public long getTotalLogRecords() { + return totalLogRecords; + } + + public long getTotalLogFiles() { + return totalLogFiles; + } + + public long getTotalRecordsToBeUpdate() { + return totalRecordsToBeUpdate; + } + public HoodieWriteStat getHoodieWriteStat() { + return writeStat; + } + + public String getPartitionPath() { + return partitionPath; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + + private HoodieWriteStat writeStat; + private long totalLogRecords; + private long totalRecordsToUpdate; + private long totalLogFiles; + private String partitionPath; + + + public Builder withHoodieWriteStat(HoodieWriteStat writeStat) { + this.writeStat = writeStat; + return this; + } + + public Builder setTotalLogRecords(long records) { + this.totalLogRecords = records; + return this; + } + + public Builder setTotalLogFiles(long totalLogFiles) { + this.totalLogFiles = totalLogFiles; + return this; + } + + public Builder setTotalRecordsToUpdate(long records) { + this.totalRecordsToUpdate = records; + return this; + } + + public Builder onPartition(String path) { + 
this.partitionPath = path; + return this; + } + + public CompactionWriteStat build() { + return new CompactionWriteStat(writeStat, partitionPath, totalLogFiles, totalLogRecords, + totalRecordsToUpdate); + } + } +} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java index b58df02de..99e291a41 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java @@ -18,10 +18,12 @@ package com.uber.hoodie.common.model; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.DeserializationFeature; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.codehaus.jackson.annotate.JsonAutoDetect; import org.codehaus.jackson.annotate.JsonMethod; +import org.codehaus.jackson.map.DeserializationConfig.Feature; import org.codehaus.jackson.map.ObjectMapper; import java.io.IOException; @@ -38,7 +40,7 @@ import java.util.Map; @JsonIgnoreProperties(ignoreUnknown = true) public class HoodieCommitMetadata implements Serializable { private static volatile Logger log = LogManager.getLogger(HoodieCommitMetadata.class); - private HashMap> partitionToWriteStats; + protected HashMap> partitionToWriteStats; private HashMap extraMetadataMap; @@ -98,6 +100,7 @@ public class HoodieCommitMetadata implements Serializable { return new HoodieCommitMetadata(); } ObjectMapper mapper = new ObjectMapper(); + mapper.configure(Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); mapper.setVisibility(JsonMethod.FIELD, JsonAutoDetect.Visibility.ANY); return mapper.readValue(jsonStr, HoodieCommitMetadata.class); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCompactionMetadata.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCompactionMetadata.java new file mode 100644 index 000000000..dc37649a0 --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCompactionMetadata.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.common.model; + +import com.google.common.collect.Maps; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BinaryOperator; +import java.util.function.Supplier; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.annotate.JsonMethod; +import org.codehaus.jackson.map.DeserializationConfig.Feature; +import org.codehaus.jackson.map.ObjectMapper; + +/** + * Placeholder for the compaction-specific metadata; reuses all the details of a normal HoodieCommitMetadata + */ +public class HoodieCompactionMetadata extends HoodieCommitMetadata { + private static volatile Logger log = LogManager.getLogger(HoodieCompactionMetadata.class); + protected HashMap> partitionToCompactionWriteStats; + + public HoodieCompactionMetadata() { + partitionToCompactionWriteStats = new HashMap<>(); + } + + public void addWriteStat(String partitionPath, CompactionWriteStat stat) { + addWriteStat(partitionPath, stat.getHoodieWriteStat()); + if (!partitionToCompactionWriteStats.containsKey(partitionPath)) { + partitionToCompactionWriteStats.put(partitionPath, new ArrayList<>()); + } + partitionToCompactionWriteStats.get(partitionPath).add(stat); + } + + public List getCompactionWriteStats(String partitionPath) { + return partitionToCompactionWriteStats.get(partitionPath); + } + + public Map> getPartitionToCompactionWriteStats() { + return partitionToCompactionWriteStats; + } + + public String toJsonString() throws IOException { + if (partitionToCompactionWriteStats.containsKey(null)) { + log.info("partition path is null for " + partitionToCompactionWriteStats.get(null)); + partitionToCompactionWriteStats.remove(null); + } + ObjectMapper mapper = new ObjectMapper(); + mapper.setVisibility(JsonMethod.FIELD, JsonAutoDetect.Visibility.ANY); + return mapper.defaultPrettyPrintingWriter().writeValueAsString(this); + } + + public static HoodieCompactionMetadata fromJsonString(String jsonStr) throws IOException { + if (jsonStr == null || jsonStr.isEmpty()) { + // For empty commit file (no data or something bad happened).
+ return new HoodieCompactionMetadata(); + } + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); + mapper.setVisibility(JsonMethod.FIELD, JsonAutoDetect.Visibility.ANY); + return mapper.readValue(jsonStr, HoodieCompactionMetadata.class); + } + + public static HoodieCompactionMetadata fromBytes(byte[] bytes) throws IOException { + return fromJsonString(new String(bytes, Charset.forName("utf-8"))); + } + +} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/AvroLogAppender.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/AvroLogAppender.java index b46f1e39e..e0cdd9153 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/AvroLogAppender.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/AvroLogAppender.java @@ -25,6 +25,7 @@ import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.mapred.FsInput; import org.apache.hadoop.fs.AvroFSInput; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileContext; @@ -67,7 +68,7 @@ public class AvroLogAppender implements HoodieLogAppender { //TODO - check for log corruption and roll over if needed log.info(config.getLogFile() + " exists. Appending to existing file"); // this log path exists, we will append to it - fs = FileSystem.get(fs.getConf()); + // fs = FileSystem.get(fs.getConf()); try { this.output = fs.append(path, config.getBufferSize()); } catch (RemoteException e) { @@ -85,8 +86,9 @@ public class AvroLogAppender implements HoodieLogAppender { } } } + this.writer - .appendTo(new AvroFSInput(FileContext.getFileContext(fs.getConf()), path), output); + .appendTo(new FsInput(path, fs.getConf()), output); // we always want to flush to disk everytime a avro block is written this.writer.setFlushOnEveryBlock(true); } else { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java index f10e6c290..653539ee5 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java @@ -16,7 +16,10 @@ package com.uber.hoodie.common.table.view; +import static java.util.stream.Collectors.toList; + import com.google.common.collect.Maps; +import com.uber.hoodie.common.model.HoodieCompactionMetadata; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.table.HoodieTableMetaClient; @@ -27,6 +30,7 @@ import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieIOException; +import java.util.function.BinaryOperator; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -199,23 +203,18 @@ public class HoodieTableFileSystemView implements TableFileSystemView, Serializa // All the log files filtered from the above list, sorted by version numbers List allLogFiles = Arrays.stream(files).filter(s -> s.getPath().getName() 
.contains(metaClient.getTableConfig().getRTFileFormat().getFileExtension())) - .map(HoodieLogFile::new).collect(Collectors.collectingAndThen(Collectors.toList(), + .map(HoodieLogFile::new).collect(Collectors.collectingAndThen(toList(), l -> l.stream().sorted(HoodieLogFile.getLogVersionComparator()) - .collect(Collectors.toList()))); + .collect(toList()))); // Filter the delta files by the commit time of the latest base fine and collect as a list Optional lastTimestamp = metaClient.getActiveTimeline().lastInstant(); - if (!lastTimestamp.isPresent()) { - return Maps.newHashMap(); - } - - return getLatestVersionInPartition(partitionPath, lastTimestamp.get().getTimestamp()).map( + return lastTimestamp.map(hoodieInstant -> getLatestVersionInPartition(partitionPath, + hoodieInstant.getTimestamp()).map( hoodieDataFile -> Pair.of(hoodieDataFile, allLogFiles.stream().filter( s -> s.getFileId().equals(hoodieDataFile.getFileId()) && s.getBaseCommitTime() .equals(hoodieDataFile.getCommitTime())).collect(Collectors.toList()))).collect( - Collectors.toMap( - (Function>, HoodieDataFile>) Pair::getKey, - (Function>, List>) Pair::getRight)); + Collectors.toMap(Pair::getKey, Pair::getRight))).orElseGet(Maps::newHashMap); } @@ -248,9 +247,9 @@ public class HoodieTableFileSystemView implements TableFileSystemView, Serializa } private Collector> toSortedFileStatus() { - return Collectors.collectingAndThen(Collectors.toList(), + return Collectors.collectingAndThen(toList(), l -> l.stream().sorted(HoodieDataFile.getCommitTimeComparator()) - .collect(Collectors.toList())); + .collect(toList())); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java index 611c16b44..f5451058f 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java @@ -47,6 +47,7 @@ import org.apache.avro.io.DatumWriter; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.EncoderFactory; +import org.apache.avro.mapred.FsInput; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.specific.SpecificRecordBase; @@ -67,33 +68,39 @@ public class AvroUtils { public static List> loadFromFiles(FileSystem fs, List deltaFilePaths, Schema expectedSchema) { - List> loadedRecords = Lists.newArrayList(); deltaFilePaths.forEach(s -> { - Path path = new Path(s); - try { - SeekableInput input = - new AvroFSInput(FileContext.getFileContext(fs.getConf()), path); - GenericDatumReader reader = new GenericDatumReader<>(); - // Set the expected schema to be the current schema to account for schema evolution - reader.setExpected(expectedSchema); - - FileReader fileReader = DataFileReader.openReader(input, reader); - for (GenericRecord deltaRecord : fileReader) { - String key = deltaRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); - String partitionPath = - deltaRecord.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); - loadedRecords.add(new HoodieRecord<>(new HoodieKey(key, partitionPath), - new HoodieAvroPayload(Optional.of(deltaRecord)))); - } - fileReader.close(); // also closes underlying FsInput - } catch (IOException e) { - throw new HoodieIOException("Could not read avro records from path " + s, e); - } + List> records = loadFromFile(fs, s, expectedSchema); + loadedRecords.addAll(records); }); return 
loadedRecords; } + public static List> loadFromFile(FileSystem fs, + String deltaFilePath, Schema expectedSchema) { + List> loadedRecords = Lists.newArrayList(); + Path path = new Path(deltaFilePath); + try { + SeekableInput input = new FsInput(path, fs.getConf()); + GenericDatumReader reader = new GenericDatumReader<>(); + // Set the expected schema to be the current schema to account for schema evolution + reader.setExpected(expectedSchema); + + FileReader fileReader = DataFileReader.openReader(input, reader); + for (GenericRecord deltaRecord : fileReader) { + String key = deltaRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); + String partitionPath = + deltaRecord.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); + loadedRecords.add(new HoodieRecord<>(new HoodieKey(key, partitionPath), + new HoodieAvroPayload(Optional.of(deltaRecord)))); + } + fileReader.close(); // also closes underlying FsInput + } catch (IOException e) { + throw new HoodieIOException("Could not read avro records from path " + deltaFilePath, + e); + } + return loadedRecords; + } public static HoodieCleanMetadata convertCleanMetadata(String startCleanTime, Optional durationInMs, List cleanStats) { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java index ad0909563..3ab346713 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java @@ -210,6 +210,11 @@ public class FSUtils { return String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version); } + public static String maskWithoutLogVersion(String commitTime, String fileId, String logFileExtension) { + return String.format("%s_%s%s*", fileId, commitTime, logFileExtension); + } + + /** * Get the latest log file written from the list of log files passed in * diff --git a/hoodie-hive/pom.xml b/hoodie-hive/pom.xml index 616883ae1..5ddd3f549 100644 --- a/hoodie-hive/pom.xml +++ b/hoodie-hive/pom.xml @@ -94,6 +94,16 @@ hadoop-common tests + + org.apache.hadoop + hadoop-mapreduce-client-common + test + + + org.apache.hadoop + hadoop-mapreduce-client-core + test + org.mockito mockito-all diff --git a/pom.xml b/pom.xml index 43dd6f0c0..12358f670 100644 --- a/pom.xml +++ b/pom.xml @@ -314,7 +314,8 @@ org.apache.hadoop hadoop-client - ${hadoop.version} + ${hadoop.version}-cdh${cdh.version} + provided
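
Usage sketches for the APIs this patch introduces (illustrative only, not part of the diff):

A minimal sketch of wiring up the new inline-compaction knobs through the builders added in HoodieCompactionConfig; the base path and schema variables are placeholders, and the final build() calls follow the builder pattern used in the tests above:

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath)    // placeholder table path
        .withSchema(schema)    // placeholder avro schema
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withInlineCompaction(true)   // sets hoodie.compact.inline
            .inlineCompactionEvery(4)     // sets hoodie.compact.inline.max.delta.commits
            .build())
        .build();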
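A sketch of how a caller consumes the new HoodieAvroReader, mirroring executeCompaction in HoodieRealtimeTableCompactor; fs, logFilePaths and readerSchema are assumed to be in scope:

    HoodieAvroReader reader = new HoodieAvroReader(fs, logFilePaths, readerSchema);
    log.info("Merged " + reader.getTotalLogRecords() + " log records from "
        + reader.getTotalLogFiles() + " log files into "
        + reader.getTotalRecordsToUpdate() + " unique keys");
    for (HoodieRecord<HoodieAvroPayload> merged : reader) {
      // each element is the preCombine()-merged state for one record key
    }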
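Shown in isolation, how each write stat coming out of handleUpdate is folded into a CompactionWriteStat with the new builder; writeStat and the three counters are assumed to come from the update result and the HoodieAvroReader:

    CompactionWriteStat stat = CompactionWriteStat.newBuilder()
        .withHoodieWriteStat(writeStat)                // the underlying write stat
        .setTotalLogFiles(totalLogFiles)               // log files merged for this file group
        .setTotalLogRecords(totalLogRecords)           // raw records read from those logs
        .setTotalRecordsToUpdate(totalRecordsToUpdate) // unique keys after preCombine()
        .onPartition(partitionPath)
        .build();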
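Compaction metadata is persisted the same way as commit metadata, as JSON bytes on the active timeline; a sketch of the round trip through the new HoodieCompactionMetadata, where the partition path and stat are illustrative and the StandardCharsets import is assumed:

    HoodieCompactionMetadata metadata = new HoodieCompactionMetadata();
    metadata.addWriteStat("2017/03/31", compactionWriteStat); // records both the plain write stat and the compaction stat
    byte[] bytes = metadata.toJsonString().getBytes(StandardCharsets.UTF_8);
    HoodieCompactionMetadata restored = HoodieCompactionMetadata.fromBytes(bytes);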
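The new FSUtils.maskWithoutLogVersion produces a wildcard file name that matches every version of a file group's delta logs, which is what the MERGE_ON_READ branches in HoodieCleaner rely on. An illustration with made-up values; note that the format string "%s_%s%s*" places the file id before the commit time, and the value of HoodieLogFile.DELTA_EXTENSION is assumed to be ".log":

    // yields "fileId-1_20170331010203.log*"
    String mask = FSUtils.maskWithoutLogVersion("20170331010203", "fileId-1", HoodieLogFile.DELTA_EXTENSION);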