1
0

DeduplicateRecords based on recordKey if global index is used

This commit is contained in:
Kaushik Devarajaiah
2018-03-12 19:06:52 -07:00
committed by vinoth chandar
parent 123da020e2
commit 291a88ba94
2 changed files with 50 additions and 4 deletions

View File

@@ -17,6 +17,7 @@
package com.uber.hoodie;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.uber.hoodie.avro.model.HoodieCleanMetadata;
@@ -111,10 +112,16 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
*/
public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig,
boolean rollbackInFlight) {
// Delegate to the constructor that accepts an explicit index, supplying the index that
// HoodieIndex.createIndex builds for this config and Spark context.
this(jsc, clientConfig, rollbackInFlight, HoodieIndex.createIndex(clientConfig, jsc));
}
@VisibleForTesting
HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig,
boolean rollbackInFlight, HoodieIndex index) {
this.fs = FSUtils.getFs(clientConfig.getBasePath(), jsc.hadoopConfiguration());
this.jsc = jsc;
this.config = clientConfig;
this.index = HoodieIndex.createIndex(config, jsc);
this.index = index;
this.metrics = new HoodieMetrics(config, config.getTableName());
if (rollbackInFlight) {
@@ -1051,10 +1058,16 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
/**
* Deduplicate Hoodie records, using the given deduplication function.
*/
private JavaRDD<HoodieRecord<T>> deduplicateRecords(JavaRDD<HoodieRecord<T>> records,
JavaRDD<HoodieRecord<T>> deduplicateRecords(JavaRDD<HoodieRecord<T>> records,
int parallelism) {
boolean isIndexingGlobal = index.isGlobal();
return records
.mapToPair(record -> new Tuple2<>(record.getKey(), record))
.mapToPair(record -> {
HoodieKey hoodieKey = record.getKey();
// If index used is global, then records are expected to differ in their partitionPath
Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
return new Tuple2<>(key, record);
})
.reduceByKey((rec1, rec2) -> {
@SuppressWarnings("unchecked")
T reducedData = (T) rec1.getData().preCombine(rec2.getData());

View File

@@ -20,6 +20,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.common.collect.Iterables;
import com.uber.hoodie.common.HoodieCleanStat;
@@ -48,13 +50,13 @@ import com.uber.hoodie.config.HoodieStorageConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieRollbackException;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.table.HoodieCopyOnWriteTable;
import com.uber.hoodie.table.HoodieTable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -65,6 +67,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.IOUtils;
@@ -218,6 +221,36 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable {
testUpsertsInternal(getConfig());
}
/**
 * Checks that deduplication collapses records sharing a recordKey when the index reports itself
 * as global, and keeps them apart (keyed on recordKey plus partitionPath) when it does not.
 */
@Test
public void testDeduplication() throws Exception {
  final String commitTime = "001";
  // Two records that share one recordKey but live in different partition paths.
  final String sharedRecordKey = UUID.randomUUID().toString();

  HoodieKey januaryKey = new HoodieKey(sharedRecordKey, "2018-01-01");
  HoodieRecord januaryRecord = new HoodieRecord(januaryKey,
      HoodieTestDataGenerator.generateRandomValue(januaryKey, commitTime));

  HoodieKey februaryKey = new HoodieKey(sharedRecordKey, "2018-02-01");
  HoodieRecord februaryRecord = new HoodieRecord(februaryKey,
      HoodieTestDataGenerator.generateRandomValue(februaryKey, commitTime));

  JavaRDD<HoodieRecord> input =
      jsc.parallelize(Arrays.asList(januaryRecord, februaryRecord), 1);

  // Global index: dedup should be done based on recordKey only, so one record survives.
  HoodieWriteClient globalIndexClient = getWriteClientWithDummyIndex(true);
  int survivorsWithGlobalIndex = globalIndexClient.deduplicateRecords(input, 1).collect().size();
  assertEquals(1, survivorsWithGlobalIndex);

  // Non-global index: dedup should be done based on both recordKey and partitionPath,
  // so both records survive.
  HoodieWriteClient nonGlobalIndexClient = getWriteClientWithDummyIndex(false);
  int survivorsWithNonGlobalIndex =
      nonGlobalIndexClient.deduplicateRecords(input, 1).collect().size();
  assertEquals(2, survivorsWithNonGlobalIndex);
}
/**
 * Builds a write client backed by a mocked {@link HoodieIndex} whose {@code isGlobal()} answer
 * is fixed to the supplied flag.
 *
 * @param isGlobal value the mocked index returns from {@code isGlobal()}
 * @return a write client constructed with the mocked index and rollbackInFlight disabled
 */
private HoodieWriteClient getWriteClientWithDummyIndex(final boolean isGlobal) throws Exception {
  HoodieIndex stubbedIndex = mock(HoodieIndex.class);
  when(stubbedIndex.isGlobal()).thenReturn(isGlobal);

  HoodieWriteConfig writeConfig = getConfigBuilder().build();
  return new HoodieWriteClient(jsc, writeConfig, false, stubbedIndex);
}
private void testUpsertsInternal(HoodieWriteConfig hoodieWriteConfig) throws Exception {
HoodieWriteClient client = new HoodieWriteClient(jsc, hoodieWriteConfig);