diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 17a94e2bc..5a1b56080 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -260,7 +260,8 @@ public class HoodieWriteClient implements Seriali } private JavaRDD> combineOnCondition(boolean condition, - JavaRDD> records, int parallelism) { + JavaRDD> records, + int parallelism) { if(condition) { return deduplicateRecords(records, parallelism); } @@ -268,9 +269,9 @@ public class HoodieWriteClient implements Seriali } private JavaRDD upsertRecordsInternal(JavaRDD> preppedRecords, - String commitTime, - HoodieTable hoodieTable, - final boolean isUpsert) { + String commitTime, + HoodieTable hoodieTable, + final boolean isUpsert) { // Cache the tagged records, so we don't end up computing both preppedRecords.persist(StorageLevel.MEMORY_AND_DISK_SER()); @@ -318,10 +319,10 @@ public class HoodieWriteClient implements Seriali private JavaRDD> partition(JavaRDD> dedupedRecords, Partitioner partitioner) { return dedupedRecords - .mapToPair((PairFunction, Tuple2>, HoodieRecord>) record -> + .mapToPair(record -> new Tuple2<>(new Tuple2<>(record.getKey(), Option.apply(record.getCurrentLocation())), record)) .partitionBy(partitioner) - .map((Function>, HoodieRecord>, HoodieRecord>) tuple -> tuple._2()); + .map(tuple -> tuple._2()); } /** @@ -347,7 +348,7 @@ public class HoodieWriteClient implements Seriali List> stats = writeStatuses .mapToPair((PairFunction) writeStatus -> - new Tuple2(writeStatus.getPartitionPath(), writeStatus.getStat())) + new Tuple2<>(writeStatus.getPartitionPath(), writeStatus.getStat())) .collect(); HoodieCommitMetadata metadata = new HoodieCommitMetadata(); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java 
b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java index bbcc065de..e9026e97d 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java @@ -30,19 +30,27 @@ import java.util.Properties; */ @Immutable public class HoodieIndexConfig extends DefaultHoodieConfig { + public static final String INDEX_TYPE_PROP = "hoodie.index.type"; public static final String DEFAULT_INDEX_TYPE = HoodieIndex.IndexType.BLOOM.name(); + + // ***** Bloom Index configs ***** public static final String BLOOM_FILTER_NUM_ENTRIES = "hoodie.index.bloom.num_entries"; public static final String DEFAULT_BLOOM_FILTER_NUM_ENTRIES = "60000"; public static final String BLOOM_FILTER_FPP = "hoodie.index.bloom.fpp"; public static final String DEFAULT_BLOOM_FILTER_FPP = "0.000000001"; - public final static String HBASE_ZKQUORUM_PROP = "hoodie.index.hbase.zkquorum"; - public final static String HBASE_ZKPORT_PROP = "hoodie.index.hbase.zkport"; - public final static String HBASE_TABLENAME_PROP = "hoodie.index.hbase.table"; public static final String BLOOM_INDEX_PARALLELISM_PROP = "hoodie.bloom.index.parallelism"; // Disable explicit bloom index parallelism setting by default - hoodie auto computes public static final String DEFAULT_BLOOM_INDEX_PARALLELISM = "0"; + // ***** HBase Index Configs ***** + public final static String HBASE_ZKQUORUM_PROP = "hoodie.index.hbase.zkquorum"; + public final static String HBASE_ZKPORT_PROP = "hoodie.index.hbase.zkport"; + public final static String HBASE_TABLENAME_PROP = "hoodie.index.hbase.table"; + + // ***** Bucketed Index Configs ***** + public final static String BUCKETED_INDEX_NUM_BUCKETS_PROP = "hoodie.index.bucketed.numbuckets"; + private HoodieIndexConfig(Properties props) { super(props); } @@ -104,6 +112,11 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { return this; } + public Builder numBucketsPerPartition(int 
numBuckets) { + props.setProperty(BUCKETED_INDEX_NUM_BUCKETS_PROP, String.valueOf(numBuckets)); + return this; + } + public HoodieIndexConfig build() { HoodieIndexConfig config = new HoodieIndexConfig(props); setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP), diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index e38637130..6133952d3 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -203,6 +203,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Integer.parseInt(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_PARALLELISM_PROP)); } + public int getNumBucketsPerPartition() { + return Integer.parseInt(props.getProperty(HoodieIndexConfig.BUCKETED_INDEX_NUM_BUCKETS_PROP)); + } + /** * storage properties **/ diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/LazyInsertIterable.java b/hoodie-client/src/main/java/com/uber/hoodie/func/LazyInsertIterable.java index aa1b27891..aa11e7efe 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/func/LazyInsertIterable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/func/LazyInsertIterable.java @@ -16,14 +16,13 @@ package com.uber.hoodie.func; -import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.io.HoodieIOHandle; -import com.uber.hoodie.io.HoodieInsertHandle; +import com.uber.hoodie.io.HoodieCreateHandle; import com.uber.hoodie.table.HoodieTable; import org.apache.spark.TaskContext; @@ -43,7 +42,7 @@ public class LazyInsertIterable extends LazyItera private final String commitTime; private final HoodieTable hoodieTable; 
private Set partitionsCleaned; - private HoodieInsertHandle handle; + private HoodieCreateHandle handle; public LazyInsertIterable(Iterator> sortedRecordItr, HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable) { @@ -79,7 +78,7 @@ public class LazyInsertIterable extends LazyItera // lazily initialize the handle, for the first time if (handle == null) { handle = - new HoodieInsertHandle(hoodieConfig, commitTime, hoodieTable, + new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, record.getPartitionPath()); } @@ -91,7 +90,7 @@ public class LazyInsertIterable extends LazyItera statuses.add(handle.close()); // Need to handle the rejected record & open new handle handle = - new HoodieInsertHandle(hoodieConfig, commitTime, hoodieTable, + new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, record.getPartitionPath()); handle.write(record); // we should be able to write 1 record. break; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/BucketedIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/BucketedIndex.java new file mode 100644 index 000000000..6152b0407 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/BucketedIndex.java @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + +package com.uber.hoodie.index; + +import com.google.common.base.Optional; + +import com.uber.hoodie.WriteStatus; +import com.uber.hoodie.common.model.HoodieKey; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRecordLocation; +import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.exception.HoodieIndexException; +import com.uber.hoodie.table.HoodieTable; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +import scala.Tuple2; + +/** + * A `stateless` index implementation that uses a deterministic mapping function to + * determine the fileID for a given record. + * + * Pros: + * - Fast + * + * Cons : + * - Need to tune the number of buckets per partition path manually (FIXME: Need to autotune this) + * - Could increase write amplification on copy-on-write storage since inserts always rewrite files + * - Not global.
+ *
+ */
+public class BucketedIndex extends HoodieIndex {
+
+    private static Logger logger = LogManager.getLogger(BucketedIndex.class);
+
+    public BucketedIndex(HoodieWriteConfig config, JavaSparkContext jsc) {
+        super(config, jsc);
+    }
+
+    /**
+     * Maps a record key to its bucket, rendered as a decimal string.
+     *
+     * String.hashCode() can be negative and `%` keeps the sign of the dividend, so
+     * Math.floorMod is used to keep the result within [0, numBucketsPerPartition).
+     */
+    private String getBucket(String recordKey) {
+        return String.valueOf(Math.floorMod(recordKey.hashCode(), config.getNumBucketsPerPartition()));
+    }
+
+    /**
+     * Pairs every key with the bucket computed from its record key. The value is always
+     * present because the bucket is computed, not looked up.
+     */
+    @Override
+    public JavaPairRDD> fetchRecordLocation(JavaRDD hoodieKeys, HoodieTable table) {
+        return hoodieKeys.mapToPair(hk -> new Tuple2<>(hk, Optional.of(getBucket(hk.getRecordKey()))));
+    }
+
+    /**
+     * Sets the current location of every incoming record to one derived from its bucket.
+     */
+    @Override
+    public JavaRDD> tagLocation(JavaRDD> recordRDD, HoodieTable hoodieTable) throws HoodieIndexException {
+        return recordRDD.map(record -> {
+            String bucket = getBucket(record.getRecordKey());
+            //HACK(vc) a non-existent commit is provided here.
+            record.setCurrentLocation(new HoodieRecordLocation("000", bucket));
+            return record;
+        });
+    }
+
+    /**
+     * Returns the write statuses unchanged.
+     */
+    @Override
+    public JavaRDD updateLocation(JavaRDD writeStatusRDD, HoodieTable hoodieTable) throws HoodieIndexException {
+        return writeStatusRDD;
+    }
+
+    @Override
+    public boolean rollbackCommit(String commitTime) {
+        // nothing to rollback in the index.
+ return true; + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/HBaseIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/HBaseIndex.java index 8ef567b97..83ad5652e 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/HBaseIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/HBaseIndex.java @@ -67,7 +67,7 @@ public class HBaseIndex extends HoodieIndex { @Override public JavaPairRDD> fetchRecordLocation( - JavaRDD hoodieKeys, HoodieTable hoodieTable) { + JavaRDD hoodieKeys, HoodieTable table) { throw new UnsupportedOperationException("HBase index does not implement check exist yet"); } @@ -234,7 +234,8 @@ public class HBaseIndex extends HoodieIndex { @Override public boolean rollbackCommit(String commitTime) { - // TODO (weiy) + // Can't really rollback here. HBase only can let you go from recordKey to fileID, + // not the other way around return true; } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java index e7153adb6..ecf2036b7 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java @@ -85,13 +85,13 @@ public class HoodieBloomIndex extends HoodieIndex } public JavaPairRDD> fetchRecordLocation( - JavaRDD hoodieKeys, final HoodieTable hoodieTable) { + JavaRDD hoodieKeys, final HoodieTable table) { JavaPairRDD partitionRecordKeyPairRDD = hoodieKeys.mapToPair(key -> new Tuple2<>(key.getPartitionPath(), key.getRecordKey())); // Lookup indexes for all the partition/recordkey pair JavaPairRDD rowKeyFilenamePairRDD = - lookupIndex(partitionRecordKeyPairRDD, hoodieTable); + lookupIndex(partitionRecordKeyPairRDD, table); JavaPairRDD rowKeyHoodieKeyPairRDD = hoodieKeys.mapToPair(key -> new Tuple2<>(key.getRecordKey(), key)); @@ -103,7 +103,7 @@ public class HoodieBloomIndex extends HoodieIndex String 
fileName = keyPathTuple._2._2.get(); String partitionPath = keyPathTuple._2._1.getPartitionPath(); recordLocationPath = Optional.of(new Path( - new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath), + new Path(table.getMetaClient().getBasePath(), partitionPath), fileName).toUri().getPath()); } else { recordLocationPath = Optional.absent(); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java index dcae31ff7..0c1bc52bd 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java @@ -36,7 +36,6 @@ import java.io.Serializable; /** * Base class for different types of indexes to determine the mapping from uuid * - * TODO(vc): need methods for recovery and rollback */ public abstract class HoodieIndex implements Serializable { protected transient JavaSparkContext jsc = null; @@ -44,7 +43,8 @@ public abstract class HoodieIndex implements Seri public enum IndexType { HBASE, INMEMORY, - BLOOM + BLOOM, + BUCKETED } protected final HoodieWriteConfig config; @@ -60,11 +60,11 @@ public abstract class HoodieIndex implements Seri * value is present, it is the path component (without scheme) of the URI underlying file * * @param hoodieKeys - * @param metaClient + * @param table * @return */ public abstract JavaPairRDD> fetchRecordLocation( - JavaRDD hoodieKeys, final HoodieTable metaClient); + JavaRDD hoodieKeys, final HoodieTable table); /** * Looks up the index and tags each incoming record with a location of a file that contains the @@ -95,6 +95,8 @@ public abstract class HoodieIndex implements Seri return new InMemoryHashIndex<>(config, jsc); case BLOOM: return new HoodieBloomIndex<>(config, jsc); + case BUCKETED: + return new BucketedIndex<>(config, jsc); } throw new HoodieIndexException("Index type unspecified, set " + config.getIndexType()); } diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java index 81b96ff8a..0b271b77f 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java @@ -55,7 +55,7 @@ public class InMemoryHashIndex extends HoodieInde @Override public JavaPairRDD> fetchRecordLocation( - JavaRDD hoodieKeys, final HoodieTable hoodieTable) { + JavaRDD hoodieKeys, final HoodieTable table) { throw new UnsupportedOperationException("InMemory index does not implement check exist yet"); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java index c145ebce4..d3307d081 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java @@ -46,8 +46,13 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; +/** + * IO Operation to append data onto an existing file. 
+ * + * @param + */ public class HoodieAppendHandle extends HoodieIOHandle { - private static Logger logger = LogManager.getLogger(HoodieUpdateHandle.class); + private static Logger logger = LogManager.getLogger(HoodieAppendHandle.class); private static AtomicLong recordIndex = new AtomicLong(1); private final WriteStatus writeStatus; @@ -59,8 +64,11 @@ public class HoodieAppendHandle extends HoodieIOH private HoodieLogFile currentLogFile; private Writer writer; - public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, - HoodieTable hoodieTable, String fileId, Iterator> recordItr) { + public HoodieAppendHandle(HoodieWriteConfig config, + String commitTime, + HoodieTable hoodieTable, + String fileId, + Iterator> recordItr) { super(config, commitTime, hoodieTable); WriteStatus writeStatus = new WriteStatus(); writeStatus.setStat(new HoodieDeltaWriteStat()); @@ -76,6 +84,7 @@ public class HoodieAppendHandle extends HoodieIOH // extract some information from the first record if (partitionPath == null) { partitionPath = record.getPartitionPath(); + // HACK(vc) This also assumes a base file. It will break, if appending without one.
String latestValidFilePath = fileSystemView.getLatestDataFilesForFileId(record.getPartitionPath(), fileId) .findFirst().get().getFileName(); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieInsertHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java similarity index 96% rename from hoodie-client/src/main/java/com/uber/hoodie/io/HoodieInsertHandle.java rename to hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java index 881fada1f..a5ecad7cb 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieInsertHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java @@ -38,8 +38,8 @@ import java.io.IOException; import java.util.Optional; import java.util.UUID; -public class HoodieInsertHandle extends HoodieIOHandle { - private static Logger logger = LogManager.getLogger(HoodieInsertHandle.class); +public class HoodieCreateHandle extends HoodieIOHandle { + private static Logger logger = LogManager.getLogger(HoodieCreateHandle.class); private final WriteStatus status; private final HoodieStorageWriter storageWriter; @@ -47,7 +47,7 @@ public class HoodieInsertHandle extends HoodieIOH private long recordsWritten = 0; private long recordsDeleted = 0; - public HoodieInsertHandle(HoodieWriteConfig config, String commitTime, + public HoodieCreateHandle(HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable, String partitionPath) { super(config, commitTime, hoodieTable); this.status = new WriteStatus(); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java similarity index 95% rename from hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java rename to hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java index 415c6a2eb..f7976c4fd 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java +++ 
b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java @@ -41,8 +41,8 @@ import java.util.Iterator; import java.util.Optional; @SuppressWarnings("Duplicates") -public class HoodieUpdateHandle extends HoodieIOHandle { - private static Logger logger = LogManager.getLogger(HoodieUpdateHandle.class); +public class HoodieMergeHandle extends HoodieIOHandle { + private static Logger logger = LogManager.getLogger(HoodieMergeHandle.class); private WriteStatus writeStatus; private HashMap> keyToNewRecords; @@ -52,13 +52,12 @@ public class HoodieUpdateHandle extends HoodieIO private long recordsWritten = 0; private long recordsDeleted = 0; private long updatedRecordsWritten = 0; - private String fileId; - public HoodieUpdateHandle(HoodieWriteConfig config, - String commitTime, - HoodieTable hoodieTable, - Iterator> recordItr, - String fileId) { + public HoodieMergeHandle(HoodieWriteConfig config, + String commitTime, + HoodieTable hoodieTable, + Iterator> recordItr, + String fileId) { super(config, commitTime, hoodieTable); init(fileId, recordItr); } @@ -70,7 +69,6 @@ public class HoodieUpdateHandle extends HoodieIO WriteStatus writeStatus = new WriteStatus(); writeStatus.setStat(new HoodieWriteStat()); this.writeStatus = writeStatus; - this.fileId = fileId; this.keyToNewRecords = new HashMap<>(); try { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index 35a76a070..4b836bd54 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -34,7 +34,20 @@ import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.HoodieUpsertException; import com.uber.hoodie.func.LazyInsertIterable; import com.uber.hoodie.io.HoodieCleanHelper; -import com.uber.hoodie.io.HoodieUpdateHandle; +import 
com.uber.hoodie.io.HoodieMergeHandle; + +import java.util.Optional; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.spark.Partitioner; + import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -70,7 +83,7 @@ import scala.Option; * INSERTS - Produce new files, block aligned to desired size (or) * Merge with the smallest existing file, to expand it * - * UPDATES - Produce a new version of the file containing the invalidated records + * UPDATES - Produce a new version of the file, just replacing the updated records with new values * */ public class HoodieCopyOnWriteTable extends HoodieTable { @@ -405,7 +418,7 @@ public class HoodieCopyOnWriteTable extends Hoodi public Iterator> handleUpdate(String commitTime, String fileLoc, Iterator> recordItr) throws IOException { // these are updates - HoodieUpdateHandle upsertHandle = getUpdateHandle(commitTime, fileLoc, recordItr); + HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileLoc, recordItr); if (upsertHandle.getOldFilePath() == null) { throw new HoodieUpsertException("Error in finding the old file path at commit " + commitTime +" at fileLoc: " + fileLoc); @@ -439,8 +452,8 @@ public class HoodieCopyOnWriteTable extends Hoodi return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator(); } - protected HoodieUpdateHandle getUpdateHandle(String commitTime, String fileLoc, Iterator> recordItr) { - return new HoodieUpdateHandle<>(config, commitTime, this, recordItr, fileLoc); + protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileLoc, Iterator> recordItr) { 
+ return new HoodieMergeHandle<>(config, commitTime, this, recordItr, fileLoc); } public Iterator> handleInsert(String commitTime, Iterator> recordItr) throws Exception { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/WorkloadProfile.java b/hoodie-client/src/main/java/com/uber/hoodie/table/WorkloadProfile.java index 7109f3065..1d1332ae8 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/WorkloadProfile.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/WorkloadProfile.java @@ -63,13 +63,10 @@ public class WorkloadProfile implements Serializa private void buildProfile() { - Map>, Long> partitionLocationCounts = - taggedRecords.mapToPair(new PairFunction, Tuple2>, HoodieRecord>() { - @Override - public Tuple2>, HoodieRecord> call(HoodieRecord record) throws Exception { - return new Tuple2<>(new Tuple2<>(record.getPartitionPath(), Option.apply(record.getCurrentLocation())), record); - } - }).countByKey(); + Map>, Long> partitionLocationCounts = taggedRecords + .mapToPair(record -> + new Tuple2<>(new Tuple2<>(record.getPartitionPath(), Option.apply(record.getCurrentLocation())), record)) + .countByKey(); for (Map.Entry>, Long> e: partitionLocationCounts.entrySet()) { String partitionPath = e.getKey()._1(); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java index 666f940a4..7c0d1e267 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java @@ -32,7 +32,7 @@ import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.ParquetUtils; import com.uber.hoodie.config.HoodieCompactionConfig; -import com.uber.hoodie.io.HoodieInsertHandle; +import com.uber.hoodie.io.HoodieCreateHandle; import com.uber.hoodie.config.HoodieStorageConfig; import org.apache.avro.generic.GenericRecord; import 
org.apache.commons.io.IOUtils; @@ -92,7 +92,7 @@ public class TestCopyOnWriteTable { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(FSUtils.getFs(), basePath); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config); - HoodieInsertHandle io = new HoodieInsertHandle(config, commitTime, table, partitionPath); + HoodieCreateHandle io = new HoodieCreateHandle(config, commitTime, table, partitionPath); Path newPath = io.makeNewPath(record.getPartitionPath(), unitNumber, fileName); assertTrue(newPath.toString().equals(this.basePath + "/" + partitionPath + "/" + FSUtils .makeDataFileName(commitTime, unitNumber, fileName)));