1
0

[HUDI-4207] HoodieFlinkWriteClient.getOrCreateWriteHandle throws an e… (#5788)

Adding more logs to assist in debugging with HoodieFlinkWriteClient.getOrCreateWriteHandle throwing exception
This commit is contained in:
HunterXHunter
2022-06-13 22:36:06 +08:00
committed by GitHub
parent 4774c4248f
commit 264b15df87

View File

@@ -61,6 +61,7 @@ import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.NoSuchElementException;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@@ -117,16 +118,14 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) { TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
this(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier, this(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier,
hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get(), keyGeneratorOpt); getLatestBaseFile(hoodieTable, partitionPath, fileId), keyGeneratorOpt);
} }
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
TaskContextSupplier taskContextSupplier, HoodieBaseFile baseFile, Option<BaseKeyGenerator> keyGeneratorOpt) { TaskContextSupplier taskContextSupplier, HoodieBaseFile baseFile, Option<BaseKeyGenerator> keyGeneratorOpt) {
super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier); super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
init(fileId, recordItr); init(recordItr, baseFile, keyGeneratorOpt);
init(fileId, partitionPath, baseFile);
validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
} }
/** /**
@@ -139,8 +138,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
this.keyToNewRecords = keyToNewRecords; this.keyToNewRecords = keyToNewRecords;
this.useWriterSchemaForCompaction = true; this.useWriterSchemaForCompaction = true;
this.preserveMetadata = config.isPreserveHoodieCommitMetadataForCompaction(); this.preserveMetadata = config.isPreserveHoodieCommitMetadataForCompaction();
init(fileId, this.partitionPath, dataFileToBeMerged); init(null, dataFileToBeMerged, keyGeneratorOpt);
validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
} }
private void validateAndSetAndKeyGenProps(Option<BaseKeyGenerator> keyGeneratorOpt, boolean populateMetaFields) { private void validateAndSetAndKeyGenProps(Option<BaseKeyGenerator> keyGeneratorOpt, boolean populateMetaFields) {
@@ -148,6 +146,22 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
this.keyGeneratorOpt = keyGeneratorOpt; this.keyGeneratorOpt = keyGeneratorOpt;
} }
private void init(Iterator<HoodieRecord<T>> recordItr, HoodieBaseFile baseFile, Option<BaseKeyGenerator> keyGeneratorOpt) {
if (recordItr != null) {
init(this.fileId, recordItr);
}
init(this.fileId, this.partitionPath, baseFile);
validateAndSetAndKeyGenProps(keyGeneratorOpt, this.config.populateMetaFields());
}
public static HoodieBaseFile getLatestBaseFile(HoodieTable<?, ?, ?, ?> hoodieTable, String partitionPath, String fileId) {
Option<HoodieBaseFile> baseFileOp = hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
if (!baseFileOp.isPresent()) {
throw new NoSuchElementException(String.format("FileID %s of partition path %s does not exist.", fileId, partitionPath));
}
return baseFileOp.get();
}
@Override @Override
public Schema getWriterSchemaWithMetaFields() { public Schema getWriterSchemaWithMetaFields() {
return writeSchemaWithMetaFields; return writeSchemaWithMetaFields;