1
0

[HUDI-1434] fix incorrect log file path in HoodieWriteStat (#2300)

* [HUDI-1434] fix incorrect log file path in HoodieWriteStat

* HoodieWriteHandle#close() returns a list of WriteStatus objs

* Handle rolled-over log files and return a WriteStatus per log file written

 - Combined data and delete block logging into a single call
 - Lazily initialize and manage write status based on returned AppendResult
 - Use FSUtils.getFileSize() to set final file size, consistent with other handles
 - Added tests around returned values in AppendResult
 - Added validation of the file sizes returned in write stat

Co-authored-by: Vinoth Chandar <vinoth@apache.org>
This commit is contained in:
Gary Li
2020-12-31 06:22:15 +08:00
committed by GitHub
parent ef28763f08
commit 605b617cfa
29 changed files with 591 additions and 298 deletions

View File

@@ -183,7 +183,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
return handleUpdateInternal(upsertHandle, instantTime, fileId);
}
protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle upsertHandle, String instantTime,
protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?,?,?,?> upsertHandle, String instantTime,
String fileId) throws IOException {
if (upsertHandle.getOldFilePath() == null) {
throw new HoodieUpsertException(
@@ -193,11 +193,12 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
}
// TODO(vc): This needs to be revisited
if (upsertHandle.getWriteStatus().getPartitionPath() == null) {
if (upsertHandle.getPartitionPath() == null) {
LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "
+ upsertHandle.getWriteStatus());
+ upsertHandle.writeStatuses());
}
return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator();
return Collections.singletonList(upsertHandle.writeStatuses()).iterator();
}
protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
@@ -213,10 +214,10 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
public Iterator<List<WriteStatus>> handleInsert(String instantTime, String partitionPath, String fileId,
Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) {
HoodieCreateHandle createHandle =
HoodieCreateHandle<?,?,?,?> createHandle =
new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordMap, taskContextSupplier);
createHandle.write();
return Collections.singletonList(Collections.singletonList(createHandle.close())).iterator();
return Collections.singletonList(createHandle.close()).iterator();
}
@Override

View File

@@ -291,7 +291,7 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
HoodieFileStatus srcFileStatus, KeyGeneratorInterface keyGenerator) {
Path sourceFilePath = FileStatusUtils.toPath(srcFileStatus.getPath());
HoodieBootstrapHandle bootstrapHandle = new HoodieBootstrapHandle(config, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
HoodieBootstrapHandle<?,?,?,?> bootstrapHandle = new HoodieBootstrapHandle(config, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
table, partitionPath, FSUtils.createNewFileIdPfx(), table.getTaskContextSupplier());
Schema avroSchema = null;
try {
@@ -329,7 +329,8 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
BootstrapWriteStatus writeStatus = (BootstrapWriteStatus)bootstrapHandle.getWriteStatus();
BootstrapWriteStatus writeStatus = (BootstrapWriteStatus) bootstrapHandle.writeStatuses().get(0);
BootstrapFileMapping bootstrapFileMapping = new BootstrapFileMapping(
config.getBootstrapSourceBasePath(), srcPartitionPath, partitionPath,
srcFileStatus, writeStatus.getFileId());

View File

@@ -299,7 +299,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
return handleUpdateInternal(upsertHandle, fileId);
}
protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle upsertHandle, String fileId)
protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?,?,?,?> upsertHandle, String fileId)
throws IOException {
if (upsertHandle.getOldFilePath() == null) {
throw new HoodieUpsertException(
@@ -309,11 +309,12 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
}
// TODO(vc): This needs to be revisited
if (upsertHandle.getWriteStatus().getPartitionPath() == null) {
if (upsertHandle.getPartitionPath() == null) {
LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "
+ upsertHandle.getWriteStatus());
+ upsertHandle.writeStatuses());
}
return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator();
return Collections.singletonList(upsertHandle.writeStatuses()).iterator();
}
protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {

View File

@@ -79,11 +79,10 @@ public abstract class AbstractSparkDeltaCommitActionExecutor<T extends HoodieRec
LOG.info("Small file corrections for updates for commit " + instantTime + " for file " + fileId);
return super.handleUpdate(partitionPath, fileId, recordItr);
} else {
HoodieAppendHandle appendHandle = new HoodieAppendHandle<>(config, instantTime, table,
HoodieAppendHandle<?,?,?,?> appendHandle = new HoodieAppendHandle<>(config, instantTime, table,
partitionPath, fileId, recordItr, taskContextSupplier);
appendHandle.doAppend();
appendHandle.close();
return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator();
return Collections.singletonList(appendHandle.close()).iterator();
}
}

View File

@@ -128,7 +128,7 @@ public class ListingBasedRollbackHelper implements Serializable {
if (doDelete) {
Map<HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
// if update belongs to an existing log file
writer = writer.appendBlock(new HoodieCommandBlock(header));
writer.appendBlock(new HoodieCommandBlock(header));
}
} catch (IOException | InterruptedException io) {
throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);

View File

@@ -84,10 +84,10 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
}
Map<String, HoodieRecord> insertRecordMap = insertRecords.stream()
.collect(Collectors.toMap(r -> r.getRecordKey(), Function.identity()));
HoodieCreateHandle createHandle =
HoodieCreateHandle<?,?,?,?> createHandle =
new HoodieCreateHandle(config, "100", table, insertRecords.get(0).getPartitionPath(), "f1-0", insertRecordMap, supplier);
createHandle.write();
return createHandle.close();
return createHandle.close().get(0);
}).collect();
final Path commitFile = new Path(config.getBasePath() + "/.hoodie/" + HoodieTimeline.makeCommitFileName("100"));

View File

@@ -18,21 +18,18 @@
package org.apache.hudi.table;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -73,6 +70,12 @@ import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
import org.apache.hudi.testutils.HoodieWriteableTestTable;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -148,7 +151,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
insertAndGetFilePaths(records, client, cfg, newCommitTime);
insertRecords(records, client, cfg, newCommitTime);
/**
* Write 2 (updates)
@@ -156,7 +159,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
newCommitTime = "004";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 100);
updateAndGetFilePaths(records, client, cfg, newCommitTime);
updateRecords(records, client, cfg, newCommitTime);
String compactionCommitTime = client.scheduleCompaction(Option.empty()).get().toString();
client.compact(compactionCommitTime);
@@ -164,7 +167,6 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent());
@@ -196,7 +198,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
insertAndGetFilePaths(records, client, cfg, newCommitTime);
insertRecords(records, client, cfg, newCommitTime);
/**
* Write 2 (updates)
@@ -204,7 +206,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
newCommitTime = "004";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 100);
updateAndGetFilePaths(records, client, cfg, newCommitTime);
updateRecords(records, client, cfg, newCommitTime);
String compactionCommitTime = client.scheduleCompaction(Option.empty()).get().toString();
client.compact(compactionCommitTime);
@@ -253,7 +255,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 400);
insertAndGetFilePaths(records.subList(0, 200), client, cfg, newCommitTime);
insertRecords(records.subList(0, 200), client, cfg, newCommitTime);
/**
* Write 2 (more inserts to create new files)
@@ -261,7 +263,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
// we already set small file size to small number to force inserts to go into new file.
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
insertAndGetFilePaths(records.subList(200, 400), client, cfg, newCommitTime);
insertRecords(records.subList(200, 400), client, cfg, newCommitTime);
if (doUpdates) {
/**
@@ -270,7 +272,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
newCommitTime = "003";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 100);
updateAndGetFilePaths(records, client, cfg, newCommitTime);
updateRecords(records, client, cfg, newCommitTime);
}
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
@@ -320,7 +322,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
client.startCommitWithTime(commitTime1);
List<HoodieRecord> records001 = dataGen.generateInserts(commitTime1, 200);
insertAndGetFilePaths(records001, client, cfg, commitTime1);
insertRecords(records001, client, cfg, commitTime1);
// verify only one base file shows up with commit time 001
FileStatus[] snapshotROFiles = getROSnapshotFiles(partitionPath);
@@ -341,7 +343,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
String updateTime = "004";
client.startCommitWithTime(updateTime);
List<HoodieRecord> records004 = dataGen.generateUpdates(updateTime, 100);
updateAndGetFilePaths(records004, client, cfg, updateTime);
updateRecords(records004, client, cfg, updateTime);
// verify RO incremental reads - only one parquet file shows up because updates to into log files
incrementalROFiles = getROIncrementalFiles(partitionPath, false);
@@ -368,7 +370,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
String insertsTime = "006";
List<HoodieRecord> records006 = dataGen.generateInserts(insertsTime, 200);
client.startCommitWithTime(insertsTime);
insertAndGetFilePaths(records006, client, cfg, insertsTime);
insertRecords(records006, client, cfg, insertsTime);
// verify new write shows up in snapshot mode even though there is pending compaction
snapshotROFiles = getROSnapshotFiles(partitionPath);
@@ -1064,11 +1066,16 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
long numLogFiles = 0;
for (String partitionPath : dataGen.getPartitionPaths()) {
assertEquals(0, tableRTFileSystemView.getLatestFileSlices(partitionPath)
.filter(fileSlice -> fileSlice.getBaseFile().isPresent()).count());
assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));
numLogFiles += tableRTFileSystemView.getLatestFileSlices(partitionPath)
.filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count();
List<FileSlice> allSlices = tableRTFileSystemView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
assertEquals(0, allSlices.stream().filter(fileSlice -> fileSlice.getBaseFile().isPresent()).count());
assertTrue(allSlices.stream().anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));
long logFileCount = allSlices.stream().filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count();
if (logFileCount > 0) {
// check the log versions start from the base version
assertTrue(allSlices.stream().map(slice -> slice.getLogFiles().findFirst().get().getLogVersion())
.allMatch(version -> version.equals(HoodieLogFile.LOGFILE_BASE_VERSION)));
}
numLogFiles += logFileCount;
}
assertTrue(numLogFiles > 0);
@@ -1569,12 +1576,13 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
.withRollbackUsingMarkers(rollbackUsingMarkers);
}
private FileStatus[] insertAndGetFilePaths(List<HoodieRecord> records, SparkRDDWriteClient client,
HoodieWriteConfig cfg, String commitTime) throws IOException {
private void insertRecords(List<HoodieRecord> records, SparkRDDWriteClient client,
HoodieWriteConfig cfg, String commitTime) throws IOException {
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
List<WriteStatus> statuses = client.insert(writeRecords, commitTime).collect();
assertNoWriteErrors(statuses);
assertFileSizes(statuses);
metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
@@ -1596,11 +1604,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
dataFilesToRead = roView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent(),
"should list the parquet files we wrote in the delta commit");
return allFiles;
}
private FileStatus[] updateAndGetFilePaths(List<HoodieRecord> records, SparkRDDWriteClient client,
HoodieWriteConfig cfg, String commitTime) throws IOException {
private void updateRecords(List<HoodieRecord> records, SparkRDDWriteClient client,
HoodieWriteConfig cfg, String commitTime) throws IOException {
Map<HoodieKey, HoodieRecord> recordsMap = new HashMap<>();
for (HoodieRecord rec : records) {
if (!recordsMap.containsKey(rec.getKey())) {
@@ -1611,6 +1618,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
List<WriteStatus> statuses = client.upsert(jsc.parallelize(records, 1), commitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
assertFileSizes(statuses);
metaClient = HoodieTableMetaClient.reload(metaClient);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
assertTrue(deltaCommit.isPresent());
@@ -1619,8 +1628,13 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
return listAllBaseFilesInPath(hoodieTable);
}
private void assertFileSizes(List<WriteStatus> statuses) throws IOException {
for (WriteStatus status: statuses) {
assertEquals(FSUtils.getFileSize(metaClient.getFs(), new Path(metaClient.getBasePath(), status.getStat().getPath())),
status.getStat().getFileSizeInBytes());
}
}
private FileStatus[] getROSnapshotFiles(String partitionPath)