Fix for updating duplicate records in same/different files in same partition
This commit is contained in:
committed by
vinoth chandar
parent
fa73a911cc
commit
4d1fba24c9
@@ -394,6 +394,13 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
|
|||||||
if (v1._2().isPresent()) {
|
if (v1._2().isPresent()) {
|
||||||
String filename = v1._2().get();
|
String filename = v1._2().get();
|
||||||
if (filename != null && !filename.isEmpty()) {
|
if (filename != null && !filename.isEmpty()) {
|
||||||
|
// When you have a record in multiple files in the same partition, then rowKeyRecordPairRDD will have 2
|
||||||
|
// entries with the same exact in memory copy of the HoodieRecord and the 2 separate filenames that the
|
||||||
|
// record is found in. This will result in setting currentLocation 2 times and it will fail the second time.
|
||||||
|
// This check will create a new in memory copy of the hoodie record.
|
||||||
|
if (record.getCurrentLocation() != null) {
|
||||||
|
record = new HoodieRecord<T>(record.getKey(), record.getData());
|
||||||
|
}
|
||||||
record.setCurrentLocation(new HoodieRecordLocation(FSUtils.getCommitTime(filename),
|
record.setCurrentLocation(new HoodieRecordLocation(FSUtils.getCommitTime(filename),
|
||||||
FSUtils.getFileId(filename)));
|
FSUtils.getFileId(filename)));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -35,9 +35,11 @@ import com.uber.hoodie.io.storage.HoodieStorageWriter;
|
|||||||
import com.uber.hoodie.io.storage.HoodieStorageWriterFactory;
|
import com.uber.hoodie.io.storage.HoodieStorageWriterFactory;
|
||||||
import com.uber.hoodie.table.HoodieTable;
|
import com.uber.hoodie.table.HoodieTable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.Set;
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import org.apache.avro.generic.IndexedRecord;
|
import org.apache.avro.generic.IndexedRecord;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
@@ -52,6 +54,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
|
|||||||
|
|
||||||
private WriteStatus writeStatus;
|
private WriteStatus writeStatus;
|
||||||
private Map<String, HoodieRecord<T>> keyToNewRecords;
|
private Map<String, HoodieRecord<T>> keyToNewRecords;
|
||||||
|
private Set<String> writtenRecordKeys;
|
||||||
private HoodieStorageWriter<IndexedRecord> storageWriter;
|
private HoodieStorageWriter<IndexedRecord> storageWriter;
|
||||||
private TableFileSystemView.ReadOptimizedView fileSystemView;
|
private TableFileSystemView.ReadOptimizedView fileSystemView;
|
||||||
private Path newFilePath;
|
private Path newFilePath;
|
||||||
@@ -81,6 +84,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
|
|||||||
* Extract old file path, initialize StorageWriter and WriteStatus
|
* Extract old file path, initialize StorageWriter and WriteStatus
|
||||||
*/
|
*/
|
||||||
private void init(String fileId, String partitionPath) {
|
private void init(String fileId, String partitionPath) {
|
||||||
|
this.writtenRecordKeys = new HashSet<>();
|
||||||
|
|
||||||
WriteStatus writeStatus = ReflectionUtils.loadClass(config.getWriteStatusClassName());
|
WriteStatus writeStatus = ReflectionUtils.loadClass(config.getWriteStatusClassName());
|
||||||
writeStatus.setStat(new HoodieWriteStat());
|
writeStatus.setStat(new HoodieWriteStat());
|
||||||
this.writeStatus = writeStatus;
|
this.writeStatus = writeStatus;
|
||||||
@@ -205,7 +210,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
|
|||||||
*/
|
*/
|
||||||
copyOldRecord = false;
|
copyOldRecord = false;
|
||||||
}
|
}
|
||||||
keyToNewRecords.remove(key);
|
writtenRecordKeys.add(key);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new HoodieUpsertException(
|
throw new HoodieUpsertException(
|
||||||
"Failed to combine/merge new record with old value in storage, for new record {"
|
"Failed to combine/merge new record with old value in storage, for new record {"
|
||||||
@@ -239,10 +244,13 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
|
|||||||
Iterator<String> pendingRecordsItr = keyToNewRecords.keySet().iterator();
|
Iterator<String> pendingRecordsItr = keyToNewRecords.keySet().iterator();
|
||||||
while (pendingRecordsItr.hasNext()) {
|
while (pendingRecordsItr.hasNext()) {
|
||||||
String key = pendingRecordsItr.next();
|
String key = pendingRecordsItr.next();
|
||||||
|
if (!writtenRecordKeys.contains(key)) {
|
||||||
HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
|
HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
|
||||||
writeUpdateRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(schema));
|
writeUpdateRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(schema));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
keyToNewRecords.clear();
|
keyToNewRecords.clear();
|
||||||
|
writtenRecordKeys.clear();
|
||||||
|
|
||||||
if (storageWriter != null) {
|
if (storageWriter != null) {
|
||||||
storageWriter.close();
|
storageWriter.close();
|
||||||
|
|||||||
@@ -0,0 +1,276 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.uber.hoodie.io;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
|
||||||
|
import com.uber.hoodie.HoodieWriteClient;
|
||||||
|
import com.uber.hoodie.WriteStatus;
|
||||||
|
import com.uber.hoodie.common.HoodieClientTestUtils;
|
||||||
|
import com.uber.hoodie.common.HoodieTestDataGenerator;
|
||||||
|
import com.uber.hoodie.common.model.HoodieRecord;
|
||||||
|
import com.uber.hoodie.common.model.HoodieTestUtils;
|
||||||
|
import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
||||||
|
import com.uber.hoodie.common.table.HoodieTimeline;
|
||||||
|
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
|
||||||
|
import com.uber.hoodie.common.util.FSUtils;
|
||||||
|
import com.uber.hoodie.config.HoodieCompactionConfig;
|
||||||
|
import com.uber.hoodie.config.HoodieIndexConfig;
|
||||||
|
import com.uber.hoodie.config.HoodieStorageConfig;
|
||||||
|
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||||
|
import com.uber.hoodie.index.HoodieIndex;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.apache.spark.sql.Row;
|
||||||
|
import org.apache.spark.sql.SQLContext;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public class TestHoodieMergeHandleDuplicateRecords {
|
||||||
|
|
||||||
|
protected transient JavaSparkContext jsc = null;
|
||||||
|
protected transient SQLContext sqlContext;
|
||||||
|
protected transient FileSystem fs;
|
||||||
|
protected String basePath = null;
|
||||||
|
protected transient HoodieTestDataGenerator dataGen = null;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void init() throws IOException {
|
||||||
|
// Initialize a local spark env
|
||||||
|
jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieMergeHandleDuplicateRecords"));
|
||||||
|
|
||||||
|
//SQLContext stuff
|
||||||
|
sqlContext = new SQLContext(jsc);
|
||||||
|
|
||||||
|
// Create a temp folder as the base path
|
||||||
|
TemporaryFolder folder = new TemporaryFolder();
|
||||||
|
folder.create();
|
||||||
|
basePath = folder.getRoot().getAbsolutePath();
|
||||||
|
fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
|
||||||
|
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
|
||||||
|
dataGen = new HoodieTestDataGenerator();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void clean() {
|
||||||
|
if (basePath != null) {
|
||||||
|
new File(basePath).delete();
|
||||||
|
}
|
||||||
|
if (jsc != null) {
|
||||||
|
jsc.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpsertsForMultipleRecordsInSameFile() throws Exception {
|
||||||
|
|
||||||
|
// Create records in a single partition
|
||||||
|
String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
|
||||||
|
dataGen = new HoodieTestDataGenerator(new String[]{partitionPath});
|
||||||
|
|
||||||
|
// Build a write config with bulk insert parallelism set
|
||||||
|
HoodieWriteConfig cfg = getConfigBuilder().build();
|
||||||
|
|
||||||
|
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
|
||||||
|
FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Write 1 (only inserts)
|
||||||
|
*/
|
||||||
|
String newCommitTime = "001";
|
||||||
|
client.startCommitWithTime(newCommitTime);
|
||||||
|
|
||||||
|
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 4);
|
||||||
|
|
||||||
|
HoodieRecord record1 = records.get(0);
|
||||||
|
HoodieRecord record2 = records.get(1);
|
||||||
|
|
||||||
|
for (int i = 0; i < 20; i++) {
|
||||||
|
HoodieRecord dup = dataGen.generateUpdateRecord(record1.getKey(), newCommitTime);
|
||||||
|
records.add(dup);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < 20; i++) {
|
||||||
|
HoodieRecord dup = dataGen.generateUpdateRecord(record2.getKey(), newCommitTime);
|
||||||
|
records.add(dup);
|
||||||
|
}
|
||||||
|
|
||||||
|
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
|
||||||
|
|
||||||
|
List<WriteStatus> statuses = client.bulkInsert(writeRecords, newCommitTime).collect();
|
||||||
|
assertNoWriteErrors(statuses);
|
||||||
|
|
||||||
|
// verify that there is a commit
|
||||||
|
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
|
||||||
|
HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
|
||||||
|
|
||||||
|
assertEquals("Expecting a single commit.", 1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants());
|
||||||
|
assertEquals("Latest commit should be 001", newCommitTime, timeline.lastInstant().get().getTimestamp());
|
||||||
|
assertEquals("Must contain 44 records",
|
||||||
|
records.size(),
|
||||||
|
HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Write 2 (insert)
|
||||||
|
*/
|
||||||
|
newCommitTime = "002";
|
||||||
|
client.startCommitWithTime(newCommitTime);
|
||||||
|
|
||||||
|
// Do 1 more bulk insert with the same dup record1
|
||||||
|
List<HoodieRecord> newRecords = new ArrayList<>();
|
||||||
|
HoodieRecord sameAsRecord1 = dataGen.generateUpdateRecord(record1.getKey(), newCommitTime);
|
||||||
|
newRecords.add(sameAsRecord1);
|
||||||
|
|
||||||
|
writeRecords = jsc.parallelize(newRecords, 1);
|
||||||
|
|
||||||
|
statuses = client.bulkInsert(writeRecords, newCommitTime).collect();
|
||||||
|
assertNoWriteErrors(statuses);
|
||||||
|
|
||||||
|
// verify that there are 2 commits
|
||||||
|
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
|
||||||
|
timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
|
||||||
|
|
||||||
|
assertEquals("Expecting two commits.", 2, timeline.findInstantsAfter("000", Integer.MAX_VALUE)
|
||||||
|
.countInstants());
|
||||||
|
assertEquals("Latest commit should be 002", newCommitTime, timeline.lastInstant().get().getTimestamp());
|
||||||
|
Dataset<Row> dataSet = getRecords();
|
||||||
|
assertEquals("Must contain 45 records", 45, dataSet.count());
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Write 3 (insert)
|
||||||
|
*/
|
||||||
|
newCommitTime = "003";
|
||||||
|
client.startCommitWithTime(newCommitTime);
|
||||||
|
|
||||||
|
newRecords = dataGen.generateInserts(newCommitTime, 2);
|
||||||
|
|
||||||
|
writeRecords = jsc.parallelize(newRecords, 1);
|
||||||
|
|
||||||
|
statuses = client.bulkInsert(writeRecords, newCommitTime).collect();
|
||||||
|
assertNoWriteErrors(statuses);
|
||||||
|
|
||||||
|
// verify that there are now 3 commits
|
||||||
|
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
|
||||||
|
timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
|
||||||
|
|
||||||
|
assertEquals("Expecting three commits.", 3, timeline.findInstantsAfter("000", Integer.MAX_VALUE)
|
||||||
|
.countInstants());
|
||||||
|
assertEquals("Latest commit should be 003", newCommitTime, timeline.lastInstant().get().getTimestamp());
|
||||||
|
dataSet = getRecords();
|
||||||
|
assertEquals("Must contain 47 records", 47, dataSet.count());
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Write 4 (updates)
|
||||||
|
*/
|
||||||
|
newCommitTime = "004";
|
||||||
|
client.startCommitWithTime(newCommitTime);
|
||||||
|
|
||||||
|
List<HoodieRecord> updateRecords = new ArrayList<>();
|
||||||
|
|
||||||
|
// This exists in 001 and 002 and should be updated in both
|
||||||
|
sameAsRecord1 = dataGen.generateUpdateRecord(record1.getKey(), newCommitTime);
|
||||||
|
updateRecords.add(sameAsRecord1);
|
||||||
|
|
||||||
|
// This exists in 001 and should be updated
|
||||||
|
HoodieRecord sameAsRecord2 = dataGen.generateUpdateRecord(record2.getKey(), newCommitTime);
|
||||||
|
updateRecords.add(sameAsRecord2);
|
||||||
|
|
||||||
|
JavaRDD<HoodieRecord> updateRecordsRDD = jsc.parallelize(updateRecords, 1);
|
||||||
|
|
||||||
|
statuses = client.upsert(updateRecordsRDD, newCommitTime).collect();
|
||||||
|
// Verify there are no errors
|
||||||
|
assertNoWriteErrors(statuses);
|
||||||
|
|
||||||
|
// verify there are now 4 commits
|
||||||
|
timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
|
||||||
|
assertEquals("Expecting four commits.", 4, timeline.findInstantsAfter("000", Integer.MAX_VALUE)
|
||||||
|
.countInstants());
|
||||||
|
assertEquals("Latest commit should be 004", timeline.lastInstant().get().getTimestamp(), newCommitTime);
|
||||||
|
|
||||||
|
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
|
||||||
|
|
||||||
|
// Check the entire dataset has 47 records still
|
||||||
|
dataSet = getRecords();
|
||||||
|
assertEquals("Must contain 47 records", 47, dataSet.count());
|
||||||
|
|
||||||
|
Row[] rows = (Row[]) dataSet.collect();
|
||||||
|
int record1Count = 0;
|
||||||
|
int record2Count = 0;
|
||||||
|
for (Row row : rows) {
|
||||||
|
if (row.getAs("_hoodie_record_key").equals(record1.getKey().getRecordKey())) {
|
||||||
|
record1Count++;
|
||||||
|
|
||||||
|
// assert each duplicate record is updated
|
||||||
|
assertEquals(row.getAs("rider"), "rider-004");
|
||||||
|
assertEquals(row.getAs("driver"), "driver-004");
|
||||||
|
} else if (row.getAs("_hoodie_record_key").equals(record2.getKey().getRecordKey())) {
|
||||||
|
record2Count++;
|
||||||
|
|
||||||
|
// assert each duplicate record is updated
|
||||||
|
assertEquals(row.getAs("rider"), "rider-004");
|
||||||
|
assertEquals(row.getAs("driver"), "driver-004");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(22, record1Count);
|
||||||
|
assertEquals(21, record2Count);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Dataset<Row> getRecords() {
|
||||||
|
// Read all records found under every partition path of the dataset
|
||||||
|
String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
|
||||||
|
for (int i = 0; i < fullPartitionPaths.length; i++) {
|
||||||
|
fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
Dataset<Row> dataSet = HoodieClientTestUtils.read(basePath, sqlContext, fs, fullPartitionPaths);
|
||||||
|
|
||||||
|
return dataSet;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert no failures in writing hoodie files
|
||||||
|
*
|
||||||
|
* @param statuses List of Write Status
|
||||||
|
*/
|
||||||
|
void assertNoWriteErrors(List<WriteStatus> statuses) {
|
||||||
|
// Verify there are no errors
|
||||||
|
for (WriteStatus status : statuses) {
|
||||||
|
assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
HoodieWriteConfig.Builder getConfigBuilder() {
|
||||||
|
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
|
||||||
|
.withParallelism(2, 2)
|
||||||
|
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
|
||||||
|
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
|
||||||
|
.forTable("test-trip-table")
|
||||||
|
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
|
||||||
|
.withBulkInsertParallelism(2);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user