1
0

[HUDI-650] Modify handleUpdate path to validate partitionPath (#1368)

This commit is contained in:
satishkotha
2020-03-20 08:37:22 -07:00
committed by GitHub
parent eeab532d79
commit 83fb9651f3
12 changed files with 170 additions and 61 deletions

View File

@@ -52,7 +52,8 @@ public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extend
List<WriteStatus> statuses = new ArrayList<>();
// lazily initialize the handle, for the first time
if (handle == null) {
handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable, getNextFileId(idPrefix));
handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable,
insertPayload.getPartitionPath(), getNextFileId(idPrefix));
}
if (handle.canWrite(insertPayload)) {
// write the payload, if the handle has capacity
@@ -62,7 +63,8 @@ public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extend
handle.close();
statuses.add(handle.getWriteStatus());
// Need to handle the rejected payload & open new handle
handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable, getNextFileId(idPrefix));
handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable,
insertPayload.getPartitionPath(), getNextFileId(idPrefix));
handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
}
}

View File

@@ -74,7 +74,6 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
// Buffer for holding records (to be deleted) in memory before they are flushed to disk
private List<HoodieKey> keysToDelete = new ArrayList<>();
private String partitionPath;
private Iterator<HoodieRecord<T>> recordItr;
// Total number of records written during an append
private long recordsWritten = 0;
@@ -101,21 +100,21 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
// Total number of new records inserted into the delta file
private long insertRecordsWritten = 0;
public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable, String fileId,
Iterator<HoodieRecord<T>> recordItr) {
super(config, commitTime, fileId, hoodieTable);
public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
super(config, commitTime, partitionPath, fileId, hoodieTable);
writeStatus.setStat(new HoodieDeltaWriteStat());
this.fileId = fileId;
this.recordItr = recordItr;
}
public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable, String fileId) {
this(config, commitTime, hoodieTable, fileId, null);
public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
String partitionPath, String fileId) {
this(config, commitTime, hoodieTable, partitionPath, fileId, null);
}
private void init(HoodieRecord record) {
if (doInit) {
this.partitionPath = record.getPartitionPath();
// extract some information from the first record
SliceView rtView = hoodieTable.getSliceView();
Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
@@ -295,6 +294,13 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
}
private void writeToBuffer(HoodieRecord<T> record) {
if (!partitionPath.equals(record.getPartitionPath())) {
HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
+ record.getPartitionPath() + " but trying to insert into partition: " + partitionPath);
writeStatus.markFailure(record, failureEx, record.getData().getMetadata());
return;
}
// update the new location of the record, so we know where to find it next
record.unseal();
record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));

View File

@@ -57,7 +57,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
public HoodieCreateHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
String partitionPath, String fileId) {
super(config, commitTime, fileId, hoodieTable);
super(config, commitTime, partitionPath, fileId, hoodieTable);
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);

View File

@@ -70,9 +70,9 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
private boolean useWriterSchema;
public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String fileId) {
super(config, commitTime, fileId, hoodieTable);
String partitionPath = init(fileId, recordItr);
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId) {
super(config, commitTime, partitionPath, fileId, hoodieTable);
init(fileId, recordItr);
init(fileId, partitionPath, hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get());
}
@@ -80,12 +80,12 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
* Called by compactor code path.
*/
public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
Map<String, HoodieRecord<T>> keyToNewRecords, String fileId, HoodieBaseFile dataFileToBeMerged) {
super(config, commitTime, fileId, hoodieTable);
Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
HoodieBaseFile dataFileToBeMerged) {
super(config, commitTime, partitionPath, fileId, hoodieTable);
this.keyToNewRecords = keyToNewRecords;
this.useWriterSchema = true;
init(fileId, keyToNewRecords.get(keyToNewRecords.keySet().stream().findFirst().get()).getPartitionPath(),
dataFileToBeMerged);
init(fileId, this.partitionPath, dataFileToBeMerged);
}
public static Schema createHoodieWriteSchema(Schema originalSchema) {
@@ -143,7 +143,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
/**
* Load the new incoming records in a map and return partitionPath.
*/
private String init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
private void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
try {
// Load the new records in a map
long memoryForMerge = config.getMaxMemoryPerPartitionMerge();
@@ -153,10 +153,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
} catch (IOException io) {
throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
}
String partitionPath = null;
while (newRecordsItr.hasNext()) {
HoodieRecord<T> record = newRecordsItr.next();
partitionPath = record.getPartitionPath();
// update the new location of the record, so we know where to find it next
record.unseal();
record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
@@ -170,7 +168,6 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
+ ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize() + "Number of entries in DiskBasedMap => "
+ ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries() + "Size of file spilled to disk => "
+ ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes());
return partitionPath;
}
private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
@@ -182,6 +179,12 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
private boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
Option recordMetadata = hoodieRecord.getData().getMetadata();
if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
+ hoodieRecord.getPartitionPath() + " but trying to insert into partition: " + partitionPath);
writeStatus.markFailure(hoodieRecord, failureEx, recordMetadata);
return false;
}
try {
if (indexedRecord.isPresent()) {
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema

View File

@@ -52,11 +52,14 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
protected final Schema writerSchema;
protected HoodieTimer timer;
protected final WriteStatus writeStatus;
protected final String partitionPath;
protected final String fileId;
protected final String writeToken;
public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String fileId, HoodieTable<T> hoodieTable) {
public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath,
String fileId, HoodieTable<T> hoodieTable) {
super(config, instantTime, hoodieTable);
this.partitionPath = partitionPath;
this.fileId = fileId;
this.writeToken = makeSparkWriteToken();
this.originalSchema = new Schema.Parser().parse(config.getSchema());

View File

@@ -170,7 +170,8 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
throw new HoodieNotSupportedException("Compaction is not supported from a CopyOnWrite table");
}
public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId, Iterator<HoodieRecord<T>> recordItr)
public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String partitionPath, String fileId,
Iterator<HoodieRecord<T>> recordItr)
throws IOException {
// This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
if (!recordItr.hasNext()) {
@@ -178,14 +179,14 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
}
// these are updates
HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileId, recordItr);
HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, partitionPath, fileId, recordItr);
return handleUpdateInternal(upsertHandle, commitTime, fileId);
}
public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId,
public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String partitionPath, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
// these are updates
HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileId, keyToNewRecords, oldDataFile);
HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, partitionPath, fileId, keyToNewRecords, oldDataFile);
return handleUpdateInternal(upsertHandle, commitTime, fileId);
}
@@ -220,13 +221,14 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator();
}
protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileId, Iterator<HoodieRecord<T>> recordItr) {
return new HoodieMergeHandle<>(config, commitTime, this, recordItr, fileId);
protected HoodieMergeHandle getUpdateHandle(String commitTime, String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
return new HoodieMergeHandle<>(config, commitTime, this, recordItr, partitionPath, fileId);
}
protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileId,
protected HoodieMergeHandle getUpdateHandle(String commitTime, String partitionPath, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
return new HoodieMergeHandle<>(config, commitTime, this, keyToNewRecords, fileId, dataFileToBeMerged);
return new HoodieMergeHandle<>(config, commitTime, this, keyToNewRecords,
partitionPath, fileId, dataFileToBeMerged);
}
public Iterator<List<WriteStatus>> handleInsert(String commitTime, String idPfx, Iterator<HoodieRecord<T>> recordItr)
@@ -258,7 +260,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
if (btype.equals(BucketType.INSERT)) {
return handleInsert(commitTime, binfo.fileIdPrefix, recordItr);
} else if (btype.equals(BucketType.UPDATE)) {
return handleUpdate(commitTime, binfo.fileIdPrefix, recordItr);
return handleUpdate(commitTime, binfo.partitionPath, binfo.fileIdPrefix, recordItr);
} else {
throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
}
@@ -523,12 +525,14 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
BucketType bucketType;
String fileIdPrefix;
String partitionPath;
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("BucketInfo {");
sb.append("bucketType=").append(bucketType).append(", ");
sb.append("fileIdPrefix=").append(fileIdPrefix);
sb.append("fileIdPrefix=").append(fileIdPrefix).append(", ");
sb.append("partitionPath=").append(partitionPath);
sb.append('}');
return sb.toString();
}
@@ -585,18 +589,22 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
private void assignUpdates(WorkloadProfile profile) {
// each update location gets a partition
WorkloadStat gStat = profile.getGlobalStat();
for (Map.Entry<String, Pair<String, Long>> updateLocEntry : gStat.getUpdateLocationToCount().entrySet()) {
addUpdateBucket(updateLocEntry.getKey());
Set<Map.Entry<String, WorkloadStat>> partitionStatEntries = profile.getPartitionPathStatMap().entrySet();
for (Map.Entry<String, WorkloadStat> partitionStat : partitionStatEntries) {
for (Map.Entry<String, Pair<String, Long>> updateLocEntry :
partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey());
}
}
}
private int addUpdateBucket(String fileIdHint) {
private int addUpdateBucket(String partitionPath, String fileIdHint) {
int bucket = totalBuckets;
updateLocationToBucket.put(fileIdHint, bucket);
BucketInfo bucketInfo = new BucketInfo();
bucketInfo.bucketType = BucketType.UPDATE;
bucketInfo.fileIdPrefix = fileIdHint;
bucketInfo.partitionPath = partitionPath;
bucketInfoMap.put(totalBuckets, bucketInfo);
totalBuckets++;
return bucket;
@@ -631,7 +639,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
bucket = updateLocationToBucket.get(smallFile.location.getFileId());
LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket);
} else {
bucket = addUpdateBucket(smallFile.location.getFileId());
bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId());
LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
}
bucketNumbers.add(bucket);
@@ -655,6 +663,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
recordsPerBucket.add(totalUnassignedInserts / insertBuckets);
BucketInfo bucketInfo = new BucketInfo();
bucketInfo.bucketType = BucketType.INSERT;
bucketInfo.partitionPath = partitionPath;
bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
bucketInfoMap.put(totalBuckets, bucketInfo);
totalBuckets++;

View File

@@ -98,15 +98,17 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
}
@Override
public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId, Iterator<HoodieRecord<T>> recordItr)
public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String partitionPath,
String fileId, Iterator<HoodieRecord<T>> recordItr)
throws IOException {
LOG.info("Merging updates for commit " + commitTime + " for file " + fileId);
if (!index.canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
LOG.info("Small file corrections for updates for commit " + commitTime + " for file " + fileId);
return super.handleUpdate(commitTime, fileId, recordItr);
return super.handleUpdate(commitTime, partitionPath, fileId, recordItr);
} else {
HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config, commitTime, this, fileId, recordItr);
HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config, commitTime, this,
partitionPath, fileId, recordItr);
appendHandle.doAppend();
appendHandle.close();
return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator();

View File

@@ -95,6 +95,10 @@ public class WorkloadProfile<T extends HoodieRecordPayload> implements Serializa
return partitionPathStatMap.keySet();
}
public HashMap<String, WorkloadStat> getPartitionPathStatMap() {
return partitionPathStatMap;
}
public WorkloadStat getWorkloadStat(String partitionPath) {
return partitionPathStatMap.get(partitionPath);
}

View File

@@ -136,7 +136,8 @@ public class HoodieMergeOnReadTableCompactor implements HoodieCompactor {
// If the dataFile is present, there is a base parquet file present, perform updates else perform inserts into a
// new base parquet file.
if (oldDataFileOpt.isPresent()) {
result = hoodieCopyOnWriteTable.handleUpdate(commitTime, operation.getFileId(), scanner.getRecords(),
result = hoodieCopyOnWriteTable.handleUpdate(commitTime, operation.getPartitionPath(),
operation.getFileId(), scanner.getRecords(),
oldDataFileOpt.get());
} else {
result = hoodieCopyOnWriteTable.handleInsert(commitTime, operation.getPartitionPath(), operation.getFileId(),