From 11d2fd34281aa8a7365c46f033a65eab29898042 Mon Sep 17 00:00:00 2001 From: prazanna Date: Mon, 6 Feb 2017 14:32:32 -0800 Subject: [PATCH] Introduce RealtimeTableView and Implement HoodieRealtimeTableCompactor (#73) --- .../com/uber/hoodie/HoodieWriteClient.java | 8 +- .../hoodie/io/compact/CompactionFilter.java | 34 +++ .../io/compact/CompactionOperation.java | 79 +++++++ .../io/compact/HoodieCompactionMetadata.java | 26 +++ .../hoodie/io/compact/HoodieCompactor.java | 53 +++++ .../compact/HoodieRealtimeTableCompactor.java | 169 +++++++++++++++ .../hoodie/table/HoodieCopyOnWriteTable.java | 14 +- .../common/HoodieTestDataGenerator.java | 3 +- .../uber/hoodie/io/TestHoodieCompactor.java | 200 ++++++++++++++++++ .../common/model/HoodieAvroPayload.java | 52 +++++ .../hoodie/common/model/HoodieFileFormat.java | 2 +- .../hoodie/common/model/HoodieTableType.java | 2 +- .../common/table/HoodieTableConfig.java | 21 +- .../common/table/HoodieTableMetaClient.java | 8 + .../hoodie/common/table/HoodieTimeline.java | 12 +- .../table/log/HoodieLogAppendConfig.java | 14 +- .../common/table/log/HoodieLogFile.java | 22 +- .../log/avro/CompositeAvroLogReader.java | 4 +- .../table/timeline/HoodieActiveTimeline.java | 29 ++- .../common/table/timeline/HoodieInstant.java | 4 + .../view/AbstractTableFileSystemView.java | 81 ++++--- .../table/view/ReadOptimizedTableView.java | 16 +- .../common/table/view/RealtimeTableView.java | 80 +++++++ .../uber/hoodie/common/util/AvroUtils.java | 70 ++++++ .../com/uber/hoodie/common/util/FSUtils.java | 48 +++-- .../hoodie/common/util/HoodieAvroUtils.java | 6 +- .../hoodie/exception/HoodieException.java | 4 +- .../SchemaCompatabilityException.java | 31 +++ .../hoodie/common/model/HoodieTestUtils.java | 60 ++++++ .../table/log/avro/AvroLogAppenderTest.java | 17 +- 30 files changed, 1074 insertions(+), 95 deletions(-) create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/io/compact/CompactionFilter.java create mode 100644 
hoodie-client/src/main/java/com/uber/hoodie/io/compact/CompactionOperation.java create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactionMetadata.java create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java create mode 100644 hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java create mode 100644 hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java create mode 100644 hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RealtimeTableView.java create mode 100644 hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java create mode 100644 hoodie-common/src/main/java/com/uber/hoodie/exception/SchemaCompatabilityException.java diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 856533b8a..029ec6c3e 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -64,7 +64,6 @@ import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.text.ParseException; -import java.text.SimpleDateFormat; import java.util.Collections; import java.util.Date; import java.util.Iterator; @@ -94,8 +93,6 @@ public class HoodieWriteClient implements Seriali private transient final HoodieCommitArchiveLog archiveLog; private transient Timer.Context writeContext = null; - private final SimpleDateFormat FORMATTER = new SimpleDateFormat("yyyyMMddHHmmss"); - /** * @param jsc * @param clientConfig @@ -331,7 +328,8 @@ public class HoodieWriteClient implements Seriali clean(); if (writeContext != null) { long durationInMs = metrics.getDurationInMs(writeContext.stop()); - 
metrics.updateCommitMetrics(FORMATTER.parse(commitTime).getTime(), durationInMs, + metrics.updateCommitMetrics( + HoodieActiveTimeline.COMMIT_FORMATTER.parse(commitTime).getTime(), durationInMs, metadata); writeContext = null; } @@ -495,7 +493,7 @@ public class HoodieWriteClient implements Seriali * Provides a new commit time for a write operation (insert/update) */ public String startCommit() { - String commitTime = FORMATTER.format(new Date()); + String commitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()); startCommitWithTime(commitTime); return commitTime; } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/CompactionFilter.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/CompactionFilter.java new file mode 100644 index 000000000..81762144f --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/CompactionFilter.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.io.compact; + +import java.util.List; + +/** + * Implementations of CompactionFilter allows prioritizing and filtering certain type of + * compactions over other compactions. + * + * e.g. 
Filter in-efficient compaction like compacting a very large old parquet file with a small avro file + */ +public interface CompactionFilter { + List filter(List input); + + // Default implementation - do not filter anything + static CompactionFilter allowAll() { + return s -> s; + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/CompactionOperation.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/CompactionOperation.java new file mode 100644 index 000000000..785d4a53a --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/CompactionOperation.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.io.compact; + +import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.table.log.HoodieLogFile; + +import java.io.Serializable; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Encapsulates all the needed information about a compaction + * and make a decision whether this compaction is effective or not + * + * @see CompactionFilter + */ +public class CompactionOperation implements Serializable { + private String dataFileCommitTime; + private long dataFileSize; + private List deltaFilePaths; + private String dataFilePath; + private String fileId; + private String partitionPath; + + //Only for serialization/de-serialization + @Deprecated + public CompactionOperation() { + } + + public CompactionOperation(HoodieDataFile dataFile, String partitionPath, + List value) { + this.dataFilePath = dataFile.getPath(); + this.fileId = dataFile.getFileId(); + this.partitionPath = partitionPath; + this.dataFileCommitTime = dataFile.getCommitTime(); + this.dataFileSize = dataFile.getFileStatus().getLen(); + this.deltaFilePaths = value.stream().map(s -> s.getPath().toString()).collect( + Collectors.toList()); + } + + public String getDataFileCommitTime() { + return dataFileCommitTime; + } + + public long getDataFileSize() { + return dataFileSize; + } + + public List getDeltaFilePaths() { + return deltaFilePaths; + } + + public String getDataFilePath() { + return dataFilePath; + } + + public String getFileId() { + return fileId; + } + + public String getPartitionPath() { + return partitionPath; + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactionMetadata.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactionMetadata.java new file mode 100644 index 000000000..8cb817f18 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactionMetadata.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. 
(hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.io.compact; + +import com.uber.hoodie.common.model.HoodieCommitMetadata; + +/** + * Place holder for the compaction specific meta-data, uses all the details used in a normal HoodieCommitMetadata + */ +public class HoodieCompactionMetadata extends HoodieCommitMetadata { + +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java new file mode 100644 index 000000000..bb6e54bdf --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.io.compact; + +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; +import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.table.view.RealtimeTableView; +import com.uber.hoodie.config.HoodieWriteConfig; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.Date; +import java.util.Optional; + +/** + * A HoodieCompactor runs compaction on a hoodie table + */ +public interface HoodieCompactor extends Serializable { + /** + * Compact the delta files with the data files + * @throws Exception + */ + HoodieCompactionMetadata compact(JavaSparkContext jsc, final HoodieWriteConfig config, + HoodieTableMetaClient metaClient, RealtimeTableView fsView, + CompactionFilter compactionFilter) throws Exception; + + + // Helper methods + default String startCompactionCommit(HoodieTableMetaClient metaClient) { + String commitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()); + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + activeTimeline + .createInflight(new HoodieInstant(true, HoodieTimeline.COMPACTION_ACTION, commitTime)); + return commitTime; + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java new file mode 100644 index 000000000..8cf0c1a72 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.io.compact; + +import com.uber.hoodie.WriteStatus; +import com.uber.hoodie.common.model.HoodieAvroPayload; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieWriteStat; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; +import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.table.view.RealtimeTableView; +import com.uber.hoodie.common.util.AvroUtils; +import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.common.util.HoodieAvroUtils; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.exception.HoodieCommitException; +import com.uber.hoodie.table.HoodieCopyOnWriteTable; +import org.apache.avro.Schema; +import org.apache.commons.collections.IteratorUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; +import scala.Tuple2; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * HoodieRealtimeTableCompactor compacts a hoodie table with merge on read 
storage. + * Computes all possible compactions, passes it through a CompactionFilter and executes + * all the compactions and writes a new version of base files and make a normal commit + * + * @see HoodieCompactor + */ +public class HoodieRealtimeTableCompactor implements HoodieCompactor { + private static Logger log = LogManager.getLogger(HoodieRealtimeTableCompactor.class); + + @Override + public HoodieCompactionMetadata compact(JavaSparkContext jsc, HoodieWriteConfig config, + HoodieTableMetaClient metaClient, RealtimeTableView fsView, + CompactionFilter compactionFilter) throws Exception { + // TODO - rollback any compactions in flight + + String compactionCommit = startCompactionCommit(metaClient); + log.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommit); + List partitionPaths = + FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath()); + + log.info("Compaction looking for files to compact in " + partitionPaths + " partitions"); + List operations = + jsc.parallelize(partitionPaths, partitionPaths.size()) + .flatMap((FlatMapFunction) partitionPath -> { + FileSystem fileSystem = FSUtils.getFs(); + return fsView.groupLatestDataFileWithLogFiles(fileSystem, partitionPath) + .entrySet().stream() + .map(s -> new CompactionOperation(s.getKey(), partitionPath, s.getValue())) + .collect(Collectors.toList()); + }).collect(); + log.info("Total of " + operations.size() + " compactions are retrieved"); + + // Filter the compactions with the passed in filter. 
This lets us choose most effective compactions only + operations = compactionFilter.filter(operations); + if(operations.isEmpty()) { + log.warn("After filtering, Nothing to compact for " + metaClient.getBasePath()); + return null; + } + + log.info("After filtering, Compacting " + operations + " files"); + List> updateStatusMap = + jsc.parallelize(operations, operations.size()).map( + (Function>>) compactionOperation -> executeCompaction( + metaClient, config, compactionOperation, compactionCommit)).flatMap( + (FlatMapFunction>, WriteStatus>) listIterator -> { + List> collected = IteratorUtils.toList(listIterator); + return collected.stream().flatMap(List::stream).collect(Collectors.toList()); + }).mapToPair(new PairFunction() { + @Override + public Tuple2 call(WriteStatus writeStatus) + throws Exception { + return new Tuple2<>(writeStatus.getPartitionPath(), writeStatus.getStat()); + } + }).collect(); + + HoodieCompactionMetadata metadata = new HoodieCompactionMetadata(); + for (Tuple2 stat : updateStatusMap) { + metadata.addWriteStat(stat._1(), stat._2()); + } + log.info("Compaction finished with result " + metadata); + + //noinspection ConstantConditions + if (isCompactionSucceeded(metadata)) { + log.info("Compaction succeeded " + compactionCommit); + commitCompaction(compactionCommit, metaClient, metadata); + } else { + log.info("Compaction failed " + compactionCommit); + } + return metadata; + } + + private boolean isCompactionSucceeded(HoodieCompactionMetadata result) { + //TODO figure out a success factor for a compaction + return true; + } + + private Iterator> executeCompaction(HoodieTableMetaClient metaClient, + HoodieWriteConfig config, CompactionOperation operation, String commitTime) + throws IOException { + FileSystem fs = FSUtils.getFs(); + Schema schema = + HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); + + log.info("Compacting base " + operation.getDataFilePath() + " with delta files " + operation + 
.getDeltaFilePaths() + " for commit " + commitTime); + // TODO - FIX THIS + // 1. Reads the entire avro file. Always only specific blocks should be read from the avro file (failure recover). + // Load all the delta commits since the last compaction commit and get all the blocks to be loaded and load it using CompositeAvroLogReader + // Since a DeltaCommit is not defined yet, reading all the records. revisit this soon. + + // 2. naively loads all the delta records in memory to merge it, + // since we only need a iterator, we could implement a lazy iterator to load from one delta file at a time + List> readDeltaFilesInMemory = + AvroUtils.loadFromFiles(fs, operation.getDeltaFilePaths(), schema); + + HoodieCopyOnWriteTable table = + new HoodieCopyOnWriteTable<>(commitTime, config, metaClient); + return table.handleUpdate(operation.getFileId(), readDeltaFilesInMemory.iterator()); + } + + public boolean commitCompaction(String commitTime, HoodieTableMetaClient metaClient, + HoodieCompactionMetadata metadata) { + log.info("Comitting " + commitTime); + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + try { + activeTimeline.saveAsComplete( + new HoodieInstant(true, HoodieTimeline.COMPACTION_ACTION, commitTime), + Optional.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + } catch (IOException e) { + throw new HoodieCommitException( + "Failed to commit " + metaClient.getBasePath() + " at time " + commitTime, e); + } + return true; + } + +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index 20636fabf..d055d896e 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -30,7 +30,6 @@ import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; import 
com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.util.FSUtils; -import com.uber.hoodie.exception.HoodieInsertException; import com.uber.hoodie.exception.HoodieUpsertException; import com.uber.hoodie.func.LazyInsertIterable; import com.uber.hoodie.io.HoodieUpdateHandle; @@ -390,7 +389,7 @@ public class HoodieCopyOnWriteTable extends Hoodi @Override public Partitioner getInsertPartitioner(WorkloadProfile profile) { - return getUpsertPartitioner(profile); + return null; } @Override @@ -399,7 +398,9 @@ public class HoodieCopyOnWriteTable extends Hoodi } - public Iterator> handleUpdate(String fileLoc, Iterator> recordItr) throws Exception { + + public Iterator> handleUpdate(String fileLoc, Iterator> recordItr) + throws IOException { // these are updates HoodieUpdateHandle upsertHandle = new HoodieUpdateHandle<>(config, commitTime, metaClient, recordItr, fileLoc); @@ -462,11 +463,4 @@ public class HoodieCopyOnWriteTable extends Hoodi throw new HoodieUpsertException(msg, t); } } - - @Override - public Iterator> handleInsertPartition(Integer partition, - Iterator recordItr, - Partitioner partitioner) { - return handleUpsertPartition(partition, recordItr, partitioner); - } } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java index 800ee5e95..47bddde2b 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java @@ -19,6 +19,7 @@ package com.uber.hoodie.common; import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import 
com.uber.hoodie.common.util.FSUtils; @@ -66,7 +67,7 @@ public class HoodieTestDataGenerator { private List existingKeysList = new ArrayList<>(); - private static Schema avroSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA)); + public static Schema avroSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA)); private static Random rand = new Random(46474747); private String[] partitionPaths = {"2016/03/15", "2015/03/16", "2015/03/17"}; diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java new file mode 100644 index 000000000..b0583c6d5 --- /dev/null +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java @@ -0,0 +1,200 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.io; + +import com.uber.hoodie.HoodieReadClient; +import com.uber.hoodie.HoodieWriteClient; +import com.uber.hoodie.WriteStatus; +import com.uber.hoodie.common.HoodieTestDataGenerator; +import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieTableType; +import com.uber.hoodie.common.model.HoodieTestUtils; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.log.HoodieLogFile; +import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; +import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.table.view.RealtimeTableView; +import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.config.HoodieCompactionConfig; +import com.uber.hoodie.config.HoodieIndexConfig; +import com.uber.hoodie.config.HoodieStorageConfig; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.index.HoodieBloomIndex; +import com.uber.hoodie.index.HoodieIndex; +import com.uber.hoodie.io.compact.CompactionFilter; +import com.uber.hoodie.io.compact.HoodieCompactionMetadata; +import com.uber.hoodie.io.compact.HoodieCompactor; +import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor; +import org.apache.hadoop.fs.FileSystem; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SQLContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestHoodieCompactor { + private transient JavaSparkContext jsc = null; + private transient SQLContext 
sqlContext; + private String basePath = null; + private HoodieCompactor compactor; + private transient HoodieTestDataGenerator dataGen = null; + + @Before + public void init() throws IOException { + // Initialize a local spark env + SparkConf sparkConf = + new SparkConf().setAppName("TestHoodieCompactor").setMaster("local[4]"); + jsc = new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf)); + + // Create a temp folder as the base path + TemporaryFolder folder = new TemporaryFolder(); + folder.create(); + basePath = folder.getRoot().getAbsolutePath(); + HoodieTestUtils.initTableType(basePath, HoodieTableType.MERGE_ON_READ); + + dataGen = new HoodieTestDataGenerator(); + compactor = new HoodieRealtimeTableCompactor(); + } + + @After + public void clean() { + if (basePath != null) { + new File(basePath).delete(); + } + if (jsc != null) { + jsc.stop(); + } + } + + private HoodieWriteConfig getConfig() { + return getConfigBuilder().build(); + } + + private HoodieWriteConfig.Builder getConfigBuilder() { + return HoodieWriteConfig.newBuilder().withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) + .withCompactionConfig( + HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build()) + .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build()) + .forTable("test-trip-table").withIndexConfig( + HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()); + } + + @Test(expected = IllegalArgumentException.class) + public void testCompactionOnCopyOnWriteFail() throws Exception { + HoodieTestUtils.initTableType(basePath, HoodieTableType.COPY_ON_WRITE); + + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(FSUtils.getFs(), basePath); + RealtimeTableView fsView = new RealtimeTableView(FSUtils.getFs(), metaClient); + compactor.compact(jsc, getConfig(), metaClient, fsView, CompactionFilter.allowAll()); + } + + @Test + public void 
testCompactionEmpty() throws Exception { + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(FSUtils.getFs(), basePath); + RealtimeTableView fsView = new RealtimeTableView(FSUtils.getFs(), metaClient); + HoodieWriteConfig config = getConfig(); + HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); + String newCommitTime = writeClient.startCommit(); + List records = dataGen.generateInserts(newCommitTime, 100); + JavaRDD recordsRDD = jsc.parallelize(records, 1); + writeClient.insert(recordsRDD, newCommitTime).collect(); + + HoodieCompactionMetadata result = + compactor.compact(jsc, getConfig(), metaClient, fsView, CompactionFilter.allowAll()); + assertTrue("If there is nothing to compact, result wull be null", result == null); + } + + @Test + public void testLogFileCountsAfterCompaction() throws Exception { + FileSystem fs = FSUtils.getFs(); + // insert 100 records + HoodieWriteConfig config = getConfig(); + HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); + String newCommitTime = "100"; + List records = dataGen.generateInserts(newCommitTime, 100); + JavaRDD recordsRDD = jsc.parallelize(records, 1); + List statuses = writeClient.insert(recordsRDD, newCommitTime).collect(); + + // Update all the 100 records + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); + newCommitTime = "101"; + List updatedRecords = dataGen.generateUpdates(newCommitTime, records); + JavaRDD updatedRecordsRDD = jsc.parallelize(updatedRecords, 1); + HoodieIndex index = new HoodieBloomIndex<>(config, jsc); + updatedRecords = index.tagLocation(updatedRecordsRDD, metaClient).collect(); + + // Write them to corresponding avro logfiles + HoodieTestUtils + .writeRecordsToLogFiles(metaClient.getBasePath(), HoodieTestDataGenerator.avroSchema, + updatedRecords); + + // Verify that all data file has one log file + metaClient = new HoodieTableMetaClient(fs, basePath); + RealtimeTableView fsView = new RealtimeTableView(fs, metaClient); + 
for (String partitionPath : dataGen.getPartitionPaths()) { + Map> groupedLogFiles = + fsView.groupLatestDataFileWithLogFiles(fs, partitionPath); + for (List logFiles : groupedLogFiles.values()) { + assertEquals("There should be 1 log file written for every data file", 1, + logFiles.size()); + } + } + + // Do a compaction + metaClient = new HoodieTableMetaClient(fs, basePath); + fsView = new RealtimeTableView(fs, metaClient); + HoodieCompactionMetadata result = + compactor.compact(jsc, getConfig(), metaClient, fsView, CompactionFilter.allowAll()); + + // Verify that recently written compacted data file has no log file + metaClient = new HoodieTableMetaClient(fs, basePath); + fsView = new RealtimeTableView(fs, metaClient); + HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); + + assertTrue("Compaction commit should be > than last insert", timeline + .compareTimestamps(timeline.lastInstant().get().getTimestamp(), newCommitTime, + HoodieTimeline.GREATER)); + + for (String partitionPath : dataGen.getPartitionPaths()) { + Map> groupedLogFiles = + fsView.groupLatestDataFileWithLogFiles(fs, partitionPath); + for (List logFiles : groupedLogFiles.values()) { + assertTrue( + "After compaction there should be no log files visiable on a Realtime view", + logFiles.isEmpty()); + } + assertTrue(result.getPartitionToWriteStats().containsKey(partitionPath)); + } + } + + // TODO - after modifying HoodieReadClient to support realtime tables - add more tests to make sure the data read is the updated data (compaction correctness) + // TODO - add more test cases for compactions after a failed commit/compaction +} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java new file mode 100644 index 000000000..e552d7e13 --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2016 
Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.model; + +import com.uber.hoodie.common.util.HoodieAvroUtils; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; + +import java.io.IOException; + +/** + * This is a payload to wrap a existing Hoodie Avro Record. + * Useful to create a HoodieRecord over existing GenericRecords in a hoodie datasets (useful in compactions) + */ +public class HoodieAvroPayload implements HoodieRecordPayload { + private final GenericRecord record; + + public HoodieAvroPayload(GenericRecord record) { + this.record = record; + } + + @Override + public HoodieAvroPayload preCombine(HoodieAvroPayload another) { + return this; + } + + @Override + public IndexedRecord combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) + throws IOException { + return getInsertValue(schema); + } + + @Override + public IndexedRecord getInsertValue(Schema schema) throws IOException { + return HoodieAvroUtils.rewriteRecord(record, schema); + } +} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieFileFormat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieFileFormat.java index 73da01ca9..3b6c0de9f 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieFileFormat.java +++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieFileFormat.java @@ -17,7 +17,7 @@ package com.uber.hoodie.common.model; public enum HoodieFileFormat { - PARQUET(".parquet"); + PARQUET(".parquet"), AVRO(".avro"); private final String extension; diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieTableType.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieTableType.java index d37252045..fb91bc1ca 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieTableType.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieTableType.java @@ -31,5 +31,5 @@ package com.uber.hoodie.common.model; * SIMPLE_LSM - A simple 2 level LSM tree. */ public enum HoodieTableType { - COPY_ON_WRITE + COPY_ON_WRITE, MERGE_ON_READ } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableConfig.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableConfig.java index 33fd78152..dc1a86472 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableConfig.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableConfig.java @@ -47,8 +47,12 @@ public class HoodieTableConfig implements Serializable { public static final String HOODIE_TABLE_TYPE_PROP_NAME = "hoodie.table.type"; public static final String HOODIE_RO_FILE_FORMAT_PROP_NAME = "hoodie.table.ro.file.format"; + public static final String HOODIE_RT_FILE_FORMAT_PROP_NAME = + "hoodie.table.rt.file.format"; + public static final HoodieTableType DEFAULT_TABLE_TYPE = HoodieTableType.COPY_ON_WRITE; public static final HoodieFileFormat DEFAULT_RO_FILE_FORMAT = HoodieFileFormat.PARQUET; + public static final HoodieFileFormat DEFAULT_RT_FILE_FORMAT = HoodieFileFormat.AVRO; private Properties props; public HoodieTableConfig(FileSystem fs, String metaPath) { @@ -108,7 +112,7 @@ public class HoodieTableConfig implements Serializable { * @return */ public HoodieTableType 
getTableType() { - if (props.contains(HOODIE_TABLE_TYPE_PROP_NAME)) { + if (props.containsKey(HOODIE_TABLE_TYPE_PROP_NAME)) { return HoodieTableType.valueOf(props.getProperty(HOODIE_TABLE_TYPE_PROP_NAME)); } return DEFAULT_TABLE_TYPE; @@ -129,9 +133,22 @@ public class HoodieTableConfig implements Serializable { * @return HoodieFileFormat for the Read Optimized Storage format */ public HoodieFileFormat getROFileFormat() { - if (props.contains(HOODIE_RO_FILE_FORMAT_PROP_NAME)) { + if (props.containsKey(HOODIE_RO_FILE_FORMAT_PROP_NAME)) { return HoodieFileFormat.valueOf(props.getProperty(HOODIE_RO_FILE_FORMAT_PROP_NAME)); } return DEFAULT_RO_FILE_FORMAT; } + + /** + * Get the Realtime Storage Format + * + * @return HoodieFileFormat for the Realtime Storage format + */ + public HoodieFileFormat getRTFileFormat() { + if (props.containsKey(HOODIE_RT_FILE_FORMAT_PROP_NAME)) { + return HoodieFileFormat.valueOf(props.getProperty(HOODIE_RT_FILE_FORMAT_PROP_NAME)); + } + return DEFAULT_RT_FILE_FORMAT; + } + } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java index f80000bc9..d145d21fb 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java @@ -137,6 +137,14 @@ public class HoodieTableMetaClient implements Serializable { return tableConfig; } + /** + * Get the FS implementation for this table + * @return + */ + public FileSystem getFs() { + return fs; + } + /** * Get the active instants as a timeline * diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java index c83c19c9d..761acece8 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java +++
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java @@ -41,15 +41,17 @@ public interface HoodieTimeline extends Serializable { String COMMIT_ACTION = "commit"; String CLEAN_ACTION = "clean"; String SAVEPOINT_ACTION = "savepoint"; + String COMPACTION_ACTION = "compaction"; String INFLIGHT_EXTENSION = ".inflight"; String COMMIT_EXTENSION = "." + COMMIT_ACTION; String CLEAN_EXTENSION = "." + CLEAN_ACTION; String SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION; + String COMPACTION_EXTENSION = "." + COMPACTION_ACTION; //this is to preserve backwards compatibility on commit in-flight filenames String INFLIGHT_COMMIT_EXTENSION = INFLIGHT_EXTENSION; String INFLIGHT_CLEAN_EXTENSION = "." + CLEAN_ACTION + INFLIGHT_EXTENSION; String INFLIGHT_SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION + INFLIGHT_EXTENSION; - + String INFLIGHT_COMPACTION_EXTENSION = "." + COMPACTION_ACTION + INFLIGHT_EXTENSION; /** * Filter this timeline to just include the in-flights @@ -193,6 +195,14 @@ public interface HoodieTimeline extends Serializable { return commitTime + HoodieTimeline.SAVEPOINT_EXTENSION; } + static String makeInflightCompactionFileName(String commitTime) { + return commitTime + HoodieTimeline.INFLIGHT_COMPACTION_EXTENSION; + } + + static String makeCompactionFileName(String commitTime) { + return commitTime + HoodieTimeline.COMPACTION_EXTENSION; + } + static String getCommitFromCommitFile(String commitFileName) { return commitFileName.split("\\.")[0]; } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogAppendConfig.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogAppendConfig.java index 34d727ede..cc6642a91 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogAppendConfig.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogAppendConfig.java @@ -117,6 +117,8 @@ public class HoodieLogAppendConfig { private Integer fileVersion; // Partition 
path for the log file private Path partitionPath; + // The base commit time for which the log files are accumulated + private String baseCommitTime; public Builder withBufferSize(int bufferSize) { this.bufferSize = bufferSize; @@ -173,6 +175,11 @@ public class HoodieLogAppendConfig { return this; } + public Builder withBaseCommitTime(String commitTime) { + this.baseCommitTime = commitTime; + return this; + } + public HoodieLogAppendConfig build() throws IOException { log.info("Building HoodieLogAppendConfig"); if (schema == null) { @@ -185,6 +192,9 @@ public class HoodieLogAppendConfig { if (fileId == null) { throw new IllegalArgumentException("FileID is not specified"); } + if (baseCommitTime == null) { + throw new IllegalArgumentException("BaseCommitTime is not specified"); + } if (logFileExtension == null) { throw new IllegalArgumentException("File extension is not specified"); } @@ -194,14 +204,14 @@ public class HoodieLogAppendConfig { if (fileVersion == null) { log.info("Computing the next log version for " + fileId + " in " + partitionPath); fileVersion = - FSUtils.getCurrentLogVersion(fs, partitionPath, fileId, logFileExtension); + FSUtils.getCurrentLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime); log.info( "Computed the next log version for " + fileId + " in " + partitionPath + " as " + fileVersion); } Path logPath = new Path(partitionPath, - FSUtils.makeLogFileName(fileId, logFileExtension, fileVersion)); + FSUtils.makeLogFileName(fileId, logFileExtension, baseCommitTime, fileVersion)); log.info("LogConfig created on path " + logPath); HoodieLogFile logFile = new HoodieLogFile(logPath); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFile.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFile.java index 356cfbf17..8759c86a5 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFile.java +++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFile.java @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import java.io.IOException; -import java.io.Serializable; +import java.util.Comparator; import java.util.Optional; /** @@ -51,6 +51,10 @@ public class HoodieLogFile { return FSUtils.getFileIdFromLogPath(path); } + public String getBaseCommitTime() { + return FSUtils.getBaseCommitTimeFromLogPath(path); + } + public int getLogVersion() { return FSUtils.getFileVersionFromLog(path); } @@ -74,10 +78,12 @@ public class HoodieLogFile { public HoodieLogFile rollOver(FileSystem fs) throws IOException { String fileId = getFileId(); - int newVersion = - FSUtils.computeNextLogVersion(fs, path.getParent(), fileId, DELTA_EXTENSION); + String baseCommitTime = getBaseCommitTime(); + int newVersion = FSUtils + .computeNextLogVersion(fs, path.getParent(), fileId, + DELTA_EXTENSION, baseCommitTime); return new HoodieLogFile(new Path(path.getParent(), - FSUtils.makeLogFileName(fileId, DELTA_EXTENSION, newVersion))); + FSUtils.makeLogFileName(fileId, DELTA_EXTENSION, baseCommitTime, newVersion))); } public boolean shouldRollOver(HoodieLogAppender currentWriter, HoodieLogAppendConfig config) @@ -85,6 +91,14 @@ public class HoodieLogFile { return currentWriter.getCurrentSize() > config.getSizeThreshold(); } + public static Comparator getLogVersionComparator() { + return (o1, o2) -> { + // reverse the order + return new Integer(o2.getLogVersion()).compareTo(o1.getLogVersion()); + }; + } + + @Override public String toString() { return "HoodieLogFile{" + path + '}'; diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/CompositeAvroLogReader.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/CompositeAvroLogReader.java index 7ed6f497d..881fe30cf 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/CompositeAvroLogReader.java +++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/CompositeAvroLogReader.java @@ -43,10 +43,10 @@ import java.util.stream.Stream; public class CompositeAvroLogReader { private final Map readers; - public CompositeAvroLogReader(Path partitionPath, String fileId, FileSystem fs, + public CompositeAvroLogReader(Path partitionPath, String fileId, String baseCommitTime, FileSystem fs, Schema readerSchema, String logFileExtension) throws IOException { Stream allLogFiles = - FSUtils.getAllLogFiles(fs, partitionPath, fileId, logFileExtension); + FSUtils.getAllLogFiles(fs, partitionPath, fileId, logFileExtension, baseCommitTime); this.readers = allLogFiles.map(hoodieLogFile -> { try { return new AvroLogReader(hoodieLogFile, fs, readerSchema); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java index 17ba91116..d1654974a 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java @@ -31,9 +31,11 @@ import org.apache.log4j.Logger; import java.io.IOException; import java.io.Serializable; +import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Comparator; import java.util.Optional; +import java.util.Set; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -50,6 +52,8 @@ import java.util.stream.Stream; * This class can be serialized and de-serialized and on de-serialization the FileSystem is re-initialized. 
*/ public class HoodieActiveTimeline extends HoodieDefaultTimeline { + public static final SimpleDateFormat COMMIT_FORMATTER = new SimpleDateFormat("yyyyMMddHHmmss"); + private final transient static Logger log = LogManager.getLogger(HoodieActiveTimeline.class); private String metaPath; private transient FileSystem fs; @@ -81,8 +85,8 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { public HoodieActiveTimeline(FileSystem fs, String metaPath) { this(fs, metaPath, - new String[] {COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, SAVEPOINT_EXTENSION, - INFLIGHT_SAVEPOINT_EXTENSION, CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION}); + new String[] {COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, SAVEPOINT_EXTENSION, COMPACTION_EXTENSION, + INFLIGHT_SAVEPOINT_EXTENSION, CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION, COMPACTION_EXTENSION}); } /** @@ -113,6 +117,27 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { (Function> & Serializable) this::getInstantDetails); } + /** + * Get only the commits (inflight and completed) in the compaction timeline + * + * @return + */ + public HoodieTimeline getCompactionTimeline() { + return new HoodieDefaultTimeline(filterInstantsByAction(COMPACTION_ACTION), + (Function> & Serializable) this::getInstantDetails); + } + + /** + * Get a timeline of a specific set of actions. 
useful to create a merged timeline of multiple actions + * + * @param actions actions allowed in the timeline + * @return + */ + public HoodieTimeline getTimelineOfActions(Set actions) { + return new HoodieDefaultTimeline(instants.stream().filter(s -> actions.contains(s.getAction())), + (Function> & Serializable) this::getInstantDetails); + } + /** * Get only the cleaner action (inflight and completed) in the active timeline * diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java index 1d67bbeb9..1679f0b13 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java @@ -93,6 +93,10 @@ public class HoodieInstant implements Serializable { return isInflight ? HoodieTimeline.makeInflightSavePointFileName(timestamp) : HoodieTimeline.makeSavePointFileName(timestamp); + } else if (HoodieTimeline.COMPACTION_ACTION.equals(action)) { + return isInflight ? 
+ HoodieTimeline.makeInflightCompactionFileName(timestamp) : + HoodieTimeline.makeCompactionFileName(timestamp); } throw new IllegalArgumentException("Cannot get file name for unknown action " + action); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java index 032d27096..2c44044de 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java @@ -21,12 +21,15 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.exception.HoodieIOException; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import java.io.IOException; +import java.io.Serializable; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -47,22 +50,38 @@ import java.util.stream.Stream; * @see ReadOptimizedTableView * @since 0.3.0 */ -public abstract class AbstractTableFileSystemView implements TableFileSystemView { - protected final HoodieTableMetaClient metaClient; - protected final transient FileSystem fs; - protected final HoodieTimeline activeCommitTimeline; +public abstract class AbstractTableFileSystemView implements TableFileSystemView, Serializable { + protected HoodieTableMetaClient metaClient; + protected transient FileSystem fs; + // This is the commits that will be visible for all views extending this view + protected HoodieTimeline visibleActiveCommitTimeline; - public AbstractTableFileSystemView(FileSystem fs, 
HoodieTableMetaClient metaClient) { + public AbstractTableFileSystemView(FileSystem fs, HoodieTableMetaClient metaClient, + HoodieTimeline visibleActiveCommitTimeline) { this.metaClient = metaClient; this.fs = fs; - // Get the active timeline and filter only completed commits - this.activeCommitTimeline = - metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + this.visibleActiveCommitTimeline = visibleActiveCommitTimeline; + } + + /** + * This method is only used when this object is deserialized in a spark executor. + * + * @deprecated + */ + private void readObject(java.io.ObjectInputStream in) + throws IOException, ClassNotFoundException { + in.defaultReadObject(); + this.fs = FSUtils.getFs(); + } + + private void writeObject(java.io.ObjectOutputStream out) + throws IOException { + out.defaultWriteObject(); } public Stream getLatestDataFilesForFileId(final String partitionPath, String fileId) { - Optional lastInstant = activeCommitTimeline.lastInstant(); + Optional lastInstant = visibleActiveCommitTimeline.lastInstant(); if (lastInstant.isPresent()) { return getLatestVersionInPartition(partitionPath, lastInstant.get().getTimestamp()) .filter(hoodieDataFile -> hoodieDataFile.getFileId().equals(fileId)); @@ -73,22 +92,17 @@ public abstract class AbstractTableFileSystemView implements TableFileSystemView @Override public Stream getLatestVersionInPartition(String partitionPathStr, String maxCommitTime) { - try { - return getLatestVersionsBeforeOrOn(listDataFilesInPartition(partitionPathStr), - maxCommitTime); - } catch (IOException e) { - throw new HoodieIOException( - "Could not get latest versions in Partition " + partitionPathStr, e); - } + return getLatestVersionsBeforeOrOn(listDataFilesInPartition(partitionPathStr), + maxCommitTime); } @Override public Stream> getEveryVersionInPartition(String partitionPath) { try { - if (activeCommitTimeline.lastInstant().isPresent()) { + if (visibleActiveCommitTimeline.lastInstant().isPresent()) { 
return getFilesByFileId(listDataFilesInPartition(partitionPath), - activeCommitTimeline.lastInstant().get().getTimestamp()); + visibleActiveCommitTimeline.lastInstant().get().getTimestamp()); } return Stream.empty(); } catch (IOException e) { @@ -97,18 +111,26 @@ public abstract class AbstractTableFileSystemView implements TableFileSystemView } } - protected abstract FileStatus[] listDataFilesInPartition(String partitionPathStr) - throws IOException; + protected FileStatus[] listDataFilesInPartition(String partitionPathStr) { + Path partitionPath = new Path(metaClient.getBasePath(), partitionPathStr); + try { + return fs.listStatus(partitionPath, path -> path.getName() + .contains(metaClient.getTableConfig().getROFileFormat().getFileExtension())); + } catch (IOException e) { + throw new HoodieIOException( + "Failed to list data files in partition " + partitionPathStr, e); + } + } @Override public Stream getLatestVersionInRange(FileStatus[] fileStatuses, List commitsToReturn) { - if (activeCommitTimeline.empty() || commitsToReturn.isEmpty()) { + if (visibleActiveCommitTimeline.empty() || commitsToReturn.isEmpty()) { return Stream.empty(); } try { return getFilesByFileId(fileStatuses, - activeCommitTimeline.lastInstant().get().getTimestamp()) + visibleActiveCommitTimeline.lastInstant().get().getTimestamp()) .map((Function, Optional>) fss -> { for (HoodieDataFile fs : fss) { if (commitsToReturn.contains(fs.getCommitTime())) { @@ -127,14 +149,14 @@ public abstract class AbstractTableFileSystemView implements TableFileSystemView public Stream getLatestVersionsBeforeOrOn(FileStatus[] fileStatuses, String maxCommitToReturn) { try { - if (activeCommitTimeline.empty()) { + if (visibleActiveCommitTimeline.empty()) { return Stream.empty(); } return getFilesByFileId(fileStatuses, - activeCommitTimeline.lastInstant().get().getTimestamp()) + visibleActiveCommitTimeline.lastInstant().get().getTimestamp()) .map((Function, Optional>) fss -> { for (HoodieDataFile fs1 : fss) { - if 
(activeCommitTimeline + if (visibleActiveCommitTimeline .compareTimestamps(fs1.getCommitTime(), maxCommitToReturn, HoodieTimeline.LESSER_OR_EQUAL)) { return Optional.of(fs1); @@ -150,11 +172,11 @@ public abstract class AbstractTableFileSystemView implements TableFileSystemView @Override public Stream getLatestVersions(FileStatus[] fileStatuses) { try { - if (activeCommitTimeline.empty()) { + if (visibleActiveCommitTimeline.empty()) { return Stream.empty(); } return getFilesByFileId(fileStatuses, - activeCommitTimeline.lastInstant().get().getTimestamp()) + visibleActiveCommitTimeline.lastInstant().get().getTimestamp()) .map(statuses -> statuses.get(0)); } catch (IOException e) { throw new HoodieIOException("Could not filter files for latest version ", e); @@ -178,8 +200,9 @@ public abstract class AbstractTableFileSystemView implements TableFileSystemView String maxCommitTime) throws IOException { return Arrays.stream(files).flatMap(fileStatus -> { HoodieDataFile dataFile = new HoodieDataFile(fileStatus); - if (activeCommitTimeline.containsOrBeforeTimelineStarts(dataFile.getCommitTime()) - && activeCommitTimeline.compareTimestamps(dataFile.getCommitTime(), maxCommitTime, + if (visibleActiveCommitTimeline.containsOrBeforeTimelineStarts(dataFile.getCommitTime()) + && visibleActiveCommitTimeline + .compareTimestamps(dataFile.getCommitTime(), maxCommitTime, HoodieTimeline.LESSER_OR_EQUAL)) { return Stream.of(Pair.of(dataFile.getFileId(), dataFile)); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/ReadOptimizedTableView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/ReadOptimizedTableView.java index 53d440d8b..3aeaa4914 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/ReadOptimizedTableView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/ReadOptimizedTableView.java @@ -29,19 +29,9 @@ import java.io.IOException; */ public class ReadOptimizedTableView extends 
AbstractTableFileSystemView { public ReadOptimizedTableView(FileSystem fs, HoodieTableMetaClient metaClient) { - super(fs, metaClient); + // Get the active timeline and filter only completed commits + super(fs, metaClient, + metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants()); } - protected FileStatus[] listDataFilesInPartition(String partitionPathStr) { - Path partitionPath = new Path(metaClient.getBasePath(), partitionPathStr); - try { - return fs.listStatus(partitionPath, path -> path.getName() - .contains(metaClient.getTableConfig().getROFileFormat().getFileExtension())); - } catch (IOException e) { - throw new HoodieIOException( - "Failed to list data files in partition " + partitionPathStr, e); - } - } - - } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RealtimeTableView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RealtimeTableView.java new file mode 100644 index 000000000..c5b1feb4a --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RealtimeTableView.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.common.table.view; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.model.HoodieTableType; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.log.HoodieLogFile; +import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; +import com.uber.hoodie.common.table.timeline.HoodieInstant; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Realtime Table View which includes both ROStorageformat files and RTStorageFormat files + */ +public class RealtimeTableView extends AbstractTableFileSystemView { + public RealtimeTableView(FileSystem fs, HoodieTableMetaClient metaClient) { + // For realtime table view, visibleActiveCommitTimeline is a merged timeline of all commits and compactions + super(fs, metaClient, metaClient.getActiveTimeline().getTimelineOfActions( + Sets.newHashSet(HoodieActiveTimeline.COMMIT_ACTION, + HoodieActiveTimeline.COMPACTION_ACTION)).filterCompletedInstants()); + Preconditions.checkArgument(metaClient.getTableType() == HoodieTableType.MERGE_ON_READ, + "Realtime view can only be constructed on Hoodie tables with MERGE_ON_READ storage type"); + } + + public Map> groupLatestDataFileWithLogFiles(FileSystem fs, + String partitionPath) throws IOException { + // All the files in the partition + FileStatus[] files = fs.listStatus(new Path(metaClient.getBasePath(), partitionPath)); + // All the log files filtered from the above list, sorted by version numbers + List allLogFiles = 
Arrays.stream(files).filter(s -> s.getPath().getName() + .contains(metaClient.getTableConfig().getRTFileFormat().getFileExtension())) + .map(HoodieLogFile::new).collect(Collectors.collectingAndThen(Collectors.toList(), + l -> l.stream().sorted(HoodieLogFile.getLogVersionComparator()) + .collect(Collectors.toList()))); + + // Filter the delta files by the commit time of the latest base file and collect as a list + Optional lastTimestamp = metaClient.getActiveTimeline().lastInstant(); + if(!lastTimestamp.isPresent()) { + return Maps.newHashMap(); + } + + return getLatestVersionInPartition(partitionPath, lastTimestamp.get().getTimestamp()).map( + hoodieDataFile -> Pair.of(hoodieDataFile, allLogFiles.stream().filter( + s -> s.getFileId().equals(hoodieDataFile.getFileId()) && s.getBaseCommitTime() + .equals(hoodieDataFile.getCommitTime())).collect(Collectors.toList()))).collect( + Collectors.toMap( + (Function>, HoodieDataFile>) Pair::getKey, + (Function>, List>) Pair::getRight)); + } + +} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java new file mode 100644 index 000000000..707b191d5 --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package com.uber.hoodie.common.util; + +import com.google.common.collect.Lists; +import com.uber.hoodie.common.model.HoodieAvroPayload; +import com.uber.hoodie.common.model.HoodieKey; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.exception.HoodieIOException; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.FileReader; +import org.apache.avro.file.SeekableInput; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.hadoop.fs.AvroFSInput; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.List; + +public class AvroUtils { + + public static List> loadFromFiles(FileSystem fs, + List deltaFilePaths, Schema expectedSchema) { + + List> loadedRecords = Lists.newArrayList(); + deltaFilePaths.forEach(s -> { + Path path = new Path(s); + try { + SeekableInput input = + new AvroFSInput(FileContext.getFileContext(fs.getConf()), path); + GenericDatumReader reader = new GenericDatumReader<>(); + // Set the expected schema to be the current schema to account for schema evolution + reader.setExpected(expectedSchema); + + FileReader fileReader = DataFileReader.openReader(input, reader); + for (GenericRecord deltaRecord : fileReader) { + String key = deltaRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); + String partitionPath = + deltaRecord.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); + loadedRecords.add(new HoodieRecord<>(new HoodieKey(key, partitionPath), + new HoodieAvroPayload(deltaRecord))); + } + fileReader.close(); // also closes underlying FsInput + } catch (IOException e) { + throw new HoodieIOException("Could not read avro records from path " + s, e); + } + }); + return 
loadedRecords; + } +} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java index 5cbc6f2ea..cc8cb0ee8 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java @@ -17,6 +17,7 @@ package com.uber.hoodie.common.util; import com.google.common.base.Preconditions; +import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.log.HoodieLogFile; import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.InvalidHoodiePathException; @@ -45,8 +46,8 @@ import java.util.stream.Stream; public class FSUtils { private static final Logger LOG = LogManager.getLogger(FSUtils.class); - // Log files are of this pattern - b5068208-e1a4-11e6-bf01-fe55135034f3.avro.delta.1 - private static final Pattern LOG_FILE_PATTERN = Pattern.compile("(.*)\.(.*)\.(.*)\.([0-9]*)"); + // Log files are of this pattern - b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.avro.delta.1 + private static final Pattern LOG_FILE_PATTERN = Pattern.compile("(.*)_(.*)\.(.*)\.(.*)\.([0-9]*)"); private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10; public static FileSystem getFs() { @@ -140,7 +141,7 @@ public class FSUtils { if(!matcher.find()) { throw new InvalidHoodiePathException(logPath, "LogFile"); } - return matcher.group(2) + "." + matcher.group(3); + return matcher.group(3) + "." + matcher.group(4); } /** @@ -158,6 +159,21 @@ public class FSUtils { return matcher.group(1); } + /** + * Get the second part of the file name in the log file. That will be the base commit time. + * Log file names embed the commit time of the base data file they are appended against.
+ * + * @param path + * @return + */ + public static String getBaseCommitTimeFromLogPath(Path path) { + Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName()); + if(!matcher.find()) { + throw new InvalidHoodiePathException(path, "LogFile"); + } + return matcher.group(2); + } + /** * Get the last part of the file name in the log file and convert to int. * @@ -169,11 +185,12 @@ public class FSUtils { if(!matcher.find()) { throw new InvalidHoodiePathException(logPath, "LogFile"); } - return Integer.parseInt(matcher.group(4)); + return Integer.parseInt(matcher.group(5)); } - public static String makeLogFileName(String fileId, String logFileExtension, int version) { - return String.format("%s%s.%d", fileId, logFileExtension, version); + public static String makeLogFileName(String fileId, String logFileExtension, + String baseCommitTime, int version) { + return String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version); } /** @@ -198,10 +215,10 @@ public class FSUtils { * @return */ public static Stream getAllLogFiles(FileSystem fs, Path partitionPath, - final String fileId, final String logFileExtension) throws IOException { + final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException { return Arrays.stream(fs.listStatus(partitionPath, - path -> path.getName().startsWith(fileId) && path.getName() - .contains(logFileExtension))).map(HoodieLogFile::new); + path -> path.getName().startsWith(fileId) && path.getName().contains(logFileExtension))) + .map(HoodieLogFile::new).filter(s -> s.getBaseCommitTime().equals(baseCommitTime)); } /** @@ -215,9 +232,9 @@ public class FSUtils { * @throws IOException */ public static Optional getLatestLogVersion(FileSystem fs, Path partitionPath, - final String fileId, final String logFileExtension) throws IOException { + final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException { Optional latestLogFile = - 
getLatestLogFile(getAllLogFiles(fs, partitionPath, fileId, logFileExtension)); + getLatestLogFile(getAllLogFiles(fs, partitionPath, fileId, logFileExtension, baseCommitTime)); if (latestLogFile.isPresent()) { return Optional.of(latestLogFile.get().getLogVersion()); } @@ -225,9 +242,9 @@ public class FSUtils { } public static int getCurrentLogVersion(FileSystem fs, Path partitionPath, - final String fileId, final String logFileExtension) throws IOException { + final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException { Optional currentVersion = - getLatestLogVersion(fs, partitionPath, fileId, logFileExtension); + getLatestLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime); // handle potential overflow return (currentVersion.isPresent()) ? currentVersion.get() : 1; } @@ -242,9 +259,9 @@ public class FSUtils { * @throws IOException */ public static int computeNextLogVersion(FileSystem fs, Path partitionPath, final String fileId, - final String logFileExtension) throws IOException { + final String logFileExtension, final String baseCommitTime) throws IOException { Optional currentVersion = - getLatestLogVersion(fs, partitionPath, fileId, logFileExtension); + getLatestLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime); // handle potential overflow return (currentVersion.isPresent()) ? 
currentVersion.get() + 1 : 1; } @@ -287,4 +304,5 @@ public class FSUtils { return recovered; } + } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java index e59f77028..c3f0d0358 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java @@ -18,6 +18,7 @@ package com.uber.hoodie.common.util; import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.exception.SchemaCompatabilityException; import org.apache.avro.Schema; import org.apache.avro.generic.*; import org.apache.avro.io.BinaryEncoder; @@ -124,14 +125,13 @@ public class HoodieAvroUtils { /** * Given a avro record with a given schema, rewrites it into the new schema */ - public static GenericRecord rewriteRecord(GenericRecord record, Schema newSchema) - throws Exception { + public static GenericRecord rewriteRecord(GenericRecord record, Schema newSchema) { GenericRecord newRecord = new GenericData.Record(newSchema); for (Schema.Field f : record.getSchema().getFields()) { newRecord.put(f.name(), record.get(f.name())); } if (!new GenericData().validate(newSchema, newRecord)) { - throw new Exception( + throw new SchemaCompatabilityException( "Unable to validate the rewritten record " + record + " against schema " + newSchema); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/exception/HoodieException.java b/hoodie-common/src/main/java/com/uber/hoodie/exception/HoodieException.java index d8eb86c48..4c933826e 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/exception/HoodieException.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/exception/HoodieException.java @@ -16,6 +16,8 @@ package com.uber.hoodie.exception; +import java.io.Serializable; + /** *

* Exception thrown for Hoodie failures. The root of @@ -27,7 +29,7 @@ package com.uber.hoodie.exception; *

* */ -public class HoodieException extends RuntimeException { +public class HoodieException extends RuntimeException implements Serializable { public HoodieException() { super(); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/exception/SchemaCompatabilityException.java b/hoodie-common/src/main/java/com/uber/hoodie/exception/SchemaCompatabilityException.java new file mode 100644 index 000000000..773f4e53d --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/exception/SchemaCompatabilityException.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.exception; + +public class SchemaCompatabilityException extends HoodieException { + public SchemaCompatabilityException(String message) { + super(message); + } + + public SchemaCompatabilityException(String message, Throwable t) { + super(message, t); + } + + public SchemaCompatabilityException(Throwable t) { + super(t); + } +} diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java index 3a2aaa8dc..d6251273f 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java @@ -20,12 +20,25 @@ import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.google.common.collect.Maps; import com.uber.hoodie.common.table.HoodieTableConfig; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.log.HoodieLogAppendConfig; +import com.uber.hoodie.common.table.log.HoodieLogFile; +import com.uber.hoodie.common.table.log.avro.AvroLogAppender; +import com.uber.hoodie.common.table.log.avro.RollingAvroLogAppender; import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.common.util.HoodieAvroUtils; +import com.uber.hoodie.common.util.SchemaTestUtil; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.jute.Index; import org.junit.rules.TemporaryFolder; import java.io.ByteArrayInputStream; @@ -38,11 +51,16 @@ import java.io.Serializable; import java.text.SimpleDateFormat; import 
java.util.Date; import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; public class HoodieTestUtils { public static FileSystem fs = FSUtils.getFs(); @@ -51,8 +69,13 @@ public class HoodieTestUtils { public static final int DEFAULT_TASK_PARTITIONID = 1; public static HoodieTableMetaClient init(String basePath) throws IOException { + return initTableType(basePath, HoodieTableType.COPY_ON_WRITE); + } + + public static HoodieTableMetaClient initTableType(String basePath, HoodieTableType tableType) throws IOException { Properties properties = new Properties(); properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, RAW_TRIPS_TEST_NAME); + properties.setProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name()); return HoodieTableMetaClient.initializePathAsHoodieDataset(fs, basePath, properties); } @@ -143,4 +166,41 @@ public class HoodieTestUtils { input.close(); return deseralizedObject; } + + public static void writeRecordsToLogFiles(String basePath, Schema schema, List updatedRecords) { + Map> groupedUpdated = updatedRecords.stream() + .collect(Collectors.groupingBy(HoodieRecord::getCurrentLocation)); + + groupedUpdated.entrySet().forEach(s -> { + HoodieRecordLocation location = s.getKey(); + String partitionPath = s.getValue().get(0).getPartitionPath(); + + HoodieLogAppendConfig logConfig = null; + try { + logConfig = HoodieLogAppendConfig.newBuilder() + .onPartitionPath(new Path(basePath, partitionPath)) + .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION) + .withFileId(location.getFileId()) + .withBaseCommitTime(location.getCommitTime()) + .withSchema(schema).withFs(fs).build(); + + AvroLogAppender log = new AvroLogAppender(logConfig); + log.append(s.getValue().stream().map(r -> 
{ + try { + GenericRecord val = (GenericRecord) r.getData().getInsertValue(schema); + HoodieAvroUtils.addHoodieKeyToRecord(val, + r.getRecordKey(), + r.getPartitionPath(), + ""); + return val; + } catch (IOException e) { + return null; + } + }).collect(Collectors.toList())); + log.close(); + } catch (Exception e) { + fail(e.toString()); + } + }); + } } diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/avro/AvroLogAppenderTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/avro/AvroLogAppenderTest.java index 3764a2875..cd79e9775 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/avro/AvroLogAppenderTest.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/avro/AvroLogAppenderTest.java @@ -82,6 +82,7 @@ public class AvroLogAppenderTest { HoodieLogAppendConfig logConfig = HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath) .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .withBaseCommitTime("100") .withSchema(SchemaTestUtil.getSimpleSchema()).withFs(fs).build(); RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig); logAppender.append(SchemaTestUtil.generateTestRecords(0, 100)); @@ -119,6 +120,7 @@ public class AvroLogAppenderTest { HoodieLogAppendConfig logConfig = HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath) .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .withBaseCommitTime("100") .withSchema(SchemaTestUtil.getSimpleSchema()).withFs(fs).build(); RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig); logAppender.append(SchemaTestUtil.generateTestRecords(0, 100)); @@ -139,6 +141,7 @@ public class AvroLogAppenderTest { HoodieLogAppendConfig logConfig = HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath) .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .withBaseCommitTime("100") 
.withSchema(SchemaTestUtil.getSimpleSchema()).withFs(fs).build(); RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig); logAppender.append(SchemaTestUtil.generateTestRecords(0, 100)); @@ -166,6 +169,7 @@ public class AvroLogAppenderTest { HoodieLogAppendConfig logConfig = HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath) .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .withBaseCommitTime("100") .withSchema(SchemaTestUtil.getSimpleSchema()).withFs(fs).build(); RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig); long size1 = logAppender.getCurrentSize(); @@ -188,6 +192,7 @@ public class AvroLogAppenderTest { HoodieLogAppendConfig logConfig = HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath) .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .withBaseCommitTime("100") .withSchema(SchemaTestUtil.getSimpleSchema()).withFs(fs).build(); RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig); logAppender.append(SchemaTestUtil.generateTestRecords(0, 100)); @@ -233,6 +238,7 @@ public class AvroLogAppenderTest { HoodieLogAppendConfig logConfig = HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath) .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .withBaseCommitTime("100") .withSchema(SchemaTestUtil.getSimpleSchema()).withFs(fs).build(); RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig); long size1 = logAppender.getCurrentSize(); @@ -272,6 +278,7 @@ public class AvroLogAppenderTest { HoodieLogAppendConfig logConfig = HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath) .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .withBaseCommitTime("100") .withSchema(SchemaTestUtil.getSimpleSchema()).withSizeThreshold(500).withFs(fs) .build(); @@ -284,6 +291,7 @@ public class AvroLogAppenderTest { // Need to 
rebuild config to set the latest version as path logConfig = HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath) .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .withBaseCommitTime("100") .withSchema(SchemaTestUtil.getSimpleSchema()).withSizeThreshold(500).withFs(fs).build(); logAppender = new RollingAvroLogAppender(logConfig); long size2 = logAppender.getCurrentSize(); @@ -293,18 +301,21 @@ public class AvroLogAppenderTest { logConfig = HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath) .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .withBaseCommitTime("100") .withSchema(SchemaTestUtil.getSimpleSchema()).withSizeThreshold(500).withFs(fs).build(); List allLogFiles = FSUtils .getAllLogFiles(fs, partitionPath, logConfig.getLogFile().getFileId(), - HoodieLogFile.DELTA_EXTENSION).collect(Collectors.toList()); + HoodieLogFile.DELTA_EXTENSION, logConfig.getLogFile().getBaseCommitTime()) + .collect(Collectors.toList()); assertEquals("", 2, allLogFiles.size()); SortedMap> offsets = Maps.newTreeMap(); offsets.put(1, Lists.newArrayList(size1)); offsets.put(2, Lists.newArrayList(size2)); CompositeAvroLogReader reader = - new CompositeAvroLogReader(partitionPath, logConfig.getLogFile().getFileId(), fs, - logConfig.getSchema(), HoodieLogFile.DELTA_EXTENSION); + new CompositeAvroLogReader(partitionPath, logConfig.getLogFile().getFileId(), + logConfig.getLogFile().getBaseCommitTime(), fs, logConfig.getSchema(), + HoodieLogFile.DELTA_EXTENSION); Iterator results = reader.readBlocks(offsets); List totalBatch = IteratorUtils.toList(results); assertEquals("Stream collect should return all 200 records", 200, totalBatch.size());