diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CommitsCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CommitsCommand.java index fe6a77369..a3c201c78 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CommitsCommand.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CommitsCommand.java @@ -82,7 +82,8 @@ public class CommitsCommand implements CommandMarker { Collections.reverse(commits); for (int i = 0; i < commits.size(); i++) { HoodieInstant commit = commits.get(i); - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get()); + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(), + HoodieCommitMetadata.class); rows.add(new Comparable[]{commit.getTimestamp(), commitMetadata.fetchTotalBytesWritten(), commitMetadata.fetchTotalFilesInsert(), @@ -160,7 +161,8 @@ public class CommitsCommand implements CommandMarker { if (!timeline.containsInstant(commitInstant)) { return "Commit " + commitTime + " not found in Commits " + timeline; } - HoodieCommitMetadata meta = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitInstant).get()); + HoodieCommitMetadata meta = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitInstant).get(), + HoodieCommitMetadata.class); List rows = new ArrayList<>(); for (Map.Entry> entry : meta.getPartitionToWriteStats().entrySet()) { String path = entry.getKey(); @@ -221,7 +223,8 @@ public class CommitsCommand implements CommandMarker { if (!timeline.containsInstant(commitInstant)) { return "Commit " + commitTime + " not found in Commits " + timeline; } - HoodieCommitMetadata meta = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitInstant).get()); + HoodieCommitMetadata meta = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitInstant).get(), + HoodieCommitMetadata.class); List rows = new ArrayList<>(); for (Map.Entry> entry : meta.getPartitionToWriteStats().entrySet()) { String path = entry.getKey(); diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/StatsCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/StatsCommand.java index eeb6bff5c..212e2d277 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/StatsCommand.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/StatsCommand.java @@ -75,7 +75,8 @@ public class StatsCommand implements CommandMarker { DecimalFormat df = new DecimalFormat("#.00"); for (HoodieInstant commitTime : timeline.getInstants().collect(Collectors.toList())) { String waf = "0"; - HoodieCommitMetadata commit = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitTime).get()); + HoodieCommitMetadata commit = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitTime).get(), + HoodieCommitMetadata.class); if (commit.fetchTotalUpdateRecordsWritten() > 0) { waf = df.format((float) commit.fetchTotalRecordsWritten() / commit.fetchTotalUpdateRecordsWritten()); } diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/utils/CommitUtil.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/utils/CommitUtil.java index 55c2471a9..7be09d171 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/utils/CommitUtil.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/utils/CommitUtil.java @@ -30,7 +30,8 @@ public class CommitUtil { HoodieTimeline timeline = target.getActiveTimeline().reload().getCommitTimeline().filterCompletedInstants(); for (String commit : commitsToCatchup) { HoodieCommitMetadata c = HoodieCommitMetadata.fromBytes( - timeline.getInstantDetails(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commit)).get()); + timeline.getInstantDetails(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commit)).get(), + HoodieCommitMetadata.class); totalNew += c.fetchTotalRecordsWritten() - c.fetchTotalUpdateRecordsWritten(); } return totalNew; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 84bceac82..846db2b40 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -32,6 +32,8 @@ import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.model.HoodieRollingStat; +import com.uber.hoodie.common.model.HoodieRollingStatMetadata; import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; @@ -501,9 +503,8 @@ public class HoodieWriteClient implements Seriali writeStatus.getPartitionPath(), writeStatus.getStat())).collect(); HoodieCommitMetadata metadata = new HoodieCommitMetadata(); - for (Tuple2 stat : stats) { - metadata.addWriteStat(stat._1(), stat._2()); - } + updateMetadataAndRollingStats(actionType, metadata, stats); + // Finalize write final Timer.Context finalizeCtx = metrics.getFinalizeCtx(); @@ -1256,4 +1257,42 @@ public class HoodieWriteClient implements Seriali }); return compactionInstantTimeOpt; } + + private void updateMetadataAndRollingStats(String actionType, HoodieCommitMetadata metadata, List> stats) { + // TODO : make sure we cannot rollback / archive last commit file + try { + // Create a Hoodie table which encapsulated the commits and files visible + HoodieTable table = HoodieTable.getHoodieTable( + new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); + // 0. All of the rolling stat management is only done by the DELTA commit for MOR and COMMIT for COW other wise + // there may be race conditions + HoodieRollingStatMetadata rollingStatMetadata = new HoodieRollingStatMetadata(actionType); + // 1. Look up the previous compaction/commit and get the HoodieCommitMetadata from there. + // 2. Now, first read the existing rolling stats and merge with the result of current metadata. + + // Need to do this on every commit (delta or commit) to support COW and MOR. + for (Tuple2 stat : stats) { + metadata.addWriteStat(stat._1(), stat._2()); + HoodieRollingStat hoodieRollingStat = new HoodieRollingStat(stat._2().getFileId(), + stat._2().getNumWrites() - (stat._2().getNumUpdateWrites() - stat._2.getNumDeletes()), + stat._2().getNumUpdateWrites(), stat._2.getNumDeletes(), stat._2().getTotalWriteBytes()); + rollingStatMetadata.addRollingStat(stat._1, hoodieRollingStat); + } + // The last rolling stat should be present in the completed timeline + Optional lastInstant = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants() + .lastInstant(); + if (lastInstant.isPresent()) { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata + .fromBytes(table.getActiveTimeline().getInstantDetails(lastInstant + .get()).get(), HoodieCommitMetadata.class); + rollingStatMetadata = rollingStatMetadata + .merge(HoodieCommitMetadata.fromBytes(commitMetadata.getExtraMetadata() + .get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(), HoodieRollingStatMetadata.class)); + } + metadata.addMetadata(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, rollingStatMetadata.toJsonString()); + } catch (IOException io) { + throw new HoodieCommitException("Unable to save rolling stats"); + } + } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java index a3d0429ea..3e4833964 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java @@ -45,7 +45,11 @@ public class InMemoryHashIndex extends HoodieInde public InMemoryHashIndex(HoodieWriteConfig config) { super(config); - recordLocationMap = new ConcurrentHashMap<>(); + synchronized (InMemoryHashIndex.class) { + if (recordLocationMap == null) { + recordLocationMap = new ConcurrentHashMap<>(); + } + } } @Override diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java index fa7d857d6..9f4f0924d 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java @@ -75,6 +75,8 @@ public class HoodieAppendHandle extends HoodieIOH private long recordsWritten = 0; // Total number of records deleted during an append private long recordsDeleted = 0; + // Total number of records updated during an append + private long updatedRecordsWritten = 0; // Average record size for a HoodieRecord. This size is updated at the end of every log block flushed to disk private long averageRecordSize = 0; private HoodieLogFile currentLogFile; @@ -89,6 +91,8 @@ public class HoodieAppendHandle extends HoodieIOH private int maxBlockSize = config.getLogFileDataBlockMaxSize(); // Header metadata for a log block private Map header = Maps.newHashMap(); + // Total number of new records inserted into the delta file + private long insertRecordsWritten = 0; public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable, String fileId, Iterator> recordItr) { @@ -111,6 +115,7 @@ public class HoodieAppendHandle extends HoodieIOH // extract some information from the first record Optional fileSlice = fileSystemView.getLatestFileSlices(partitionPath) .filter(fileSlice1 -> fileSlice1.getFileId().equals(fileId)).findFirst(); + // Set the base commit time as the current commitTime for new inserts into log files String baseInstantTime = commitTime; if (fileSlice.isPresent()) { baseInstantTime = fileSlice.get().getBaseInstantTime(); @@ -156,6 +161,12 @@ public class HoodieAppendHandle extends HoodieIOH hoodieRecord.getPartitionPath(), fileId); HoodieAvroUtils .addCommitMetadataToRecord((GenericRecord) avroRecord.get(), commitTime, seqId); + // If currentLocation is present, then this is an update + if (hoodieRecord.getCurrentLocation() != null) { + updatedRecordsWritten++; + } else { + insertRecordsWritten++; + } recordsWritten++; } else { recordsDeleted++; @@ -238,6 +249,8 @@ public class HoodieAppendHandle extends HoodieIOH writeStatus.getStat().setPrevCommit(commitTime); writeStatus.getStat().setFileId(this.fileId); writeStatus.getStat().setNumWrites(recordsWritten); + writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten); + writeStatus.getStat().setNumInserts(insertRecordsWritten); writeStatus.getStat().setNumDeletes(recordsDeleted); writeStatus.getStat().setTotalWriteBytes(estimatedNumberOfBytesWritten); writeStatus.getStat().setTotalWriteErrors(writeStatus.getFailedRecords().size()); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java index bf5febd1a..a65008b4c 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java @@ -31,6 +31,7 @@ import com.uber.hoodie.avro.model.HoodieSavepointMetadata; import com.uber.hoodie.common.model.ActionType; import com.uber.hoodie.common.model.HoodieArchivedLogFile; import com.uber.hoodie.common.model.HoodieCommitMetadata; +import com.uber.hoodie.common.model.HoodieRollingStatMetadata; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.log.HoodieLogFormat; @@ -55,6 +56,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -273,7 +275,7 @@ public class HoodieCommitArchiveLog { } case COMMIT_ACTION: { HoodieCommitMetadata commitMetadata = HoodieCommitMetadata - .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get()); + .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class); archivedMetaWrapper.setHoodieCommitMetadata(commitMetadataConverter(commitMetadata)); archivedMetaWrapper.setActionType(ActionType.commit.name()); break; @@ -294,7 +296,7 @@ public class HoodieCommitArchiveLog { } case HoodieTimeline.DELTA_COMMIT_ACTION: { HoodieCommitMetadata commitMetadata = HoodieCommitMetadata - .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get()); + .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class); archivedMetaWrapper.setHoodieCommitMetadata(commitMetadataConverter(commitMetadata)); archivedMetaWrapper.setActionType(ActionType.commit.name()); break; @@ -312,6 +314,8 @@ public class HoodieCommitArchiveLog { mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); com.uber.hoodie.avro.model.HoodieCommitMetadata avroMetaData = mapper .convertValue(hoodieCommitMetadata, com.uber.hoodie.avro.model.HoodieCommitMetadata.class); + // Do not archive Rolling Stats, cannot set to null since AVRO will throw null pointer + avroMetaData.getExtraMetadata().put(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, StringUtils.EMPTY); return avroMetaData; } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index fa180dba4..1f9ea2f05 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -28,6 +28,7 @@ import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.model.HoodieRollingStatMetadata; import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; @@ -643,12 +644,18 @@ public class HoodieCopyOnWriteTable extends Hoodi */ private HashMap bucketInfoMap; + /** + * Rolling stats for files + */ + protected HoodieRollingStatMetadata rollingStatMetadata; + protected long averageRecordSize; + UpsertPartitioner(WorkloadProfile profile) { updateLocationToBucket = new HashMap<>(); partitionPathToInsertBuckets = new HashMap<>(); bucketInfoMap = new HashMap<>(); globalStat = profile.getGlobalStat(); - + rollingStatMetadata = getRollingStats(); assignUpdates(profile); assignInserts(profile); @@ -792,7 +799,7 @@ public class HoodieCopyOnWriteTable extends Hoodi * Obtains the average record size based on records written during last commit. Used for * estimating how many records pack into one file. */ - private long averageBytesPerRecord() { + protected long averageBytesPerRecord() { long avgSize = 0L; HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getCommitTimeline() .filterCompletedInstants(); @@ -800,7 +807,7 @@ public class HoodieCopyOnWriteTable extends Hoodi if (!commitTimeline.empty()) { HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); HoodieCommitMetadata commitMetadata = HoodieCommitMetadata - .fromBytes(commitTimeline.getInstantDetails(latestCommitTime).get()); + .fromBytes(commitTimeline.getInstantDetails(latestCommitTime).get(), HoodieCommitMetadata.class); avgSize = (long) Math.ceil( (1.0 * commitMetadata.fetchTotalBytesWritten()) / commitMetadata .fetchTotalRecordsWritten()); @@ -852,4 +859,8 @@ public class HoodieCopyOnWriteTable extends Hoodi } } } + + protected HoodieRollingStatMetadata getRollingStats() { + return null; + } } \ No newline at end of file diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java index 078d23ad0..b7d7a5080 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java @@ -16,6 +16,7 @@ package com.uber.hoodie.table; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.uber.hoodie.WriteStatus; @@ -27,10 +28,11 @@ import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.model.HoodieRollingStat; +import com.uber.hoodie.common.model.HoodieRollingStatMetadata; import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.log.HoodieLogFormat; -import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer; import com.uber.hoodie.common.table.log.block.HoodieCommandBlock; import com.uber.hoodie.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum; import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HeaderMetadataType; @@ -40,6 +42,7 @@ import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieCompactionException; +import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieRollbackException; import com.uber.hoodie.exception.HoodieUpsertException; import com.uber.hoodie.func.MergeOnReadLazyInsertIterable; @@ -103,7 +106,7 @@ public class HoodieMergeOnReadTable extends Iterator> recordItr) throws IOException { logger.info("Merging updates for commit " + commitTime + " for file " + fileId); - if (mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) { + if (!index.canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) { logger.info( "Small file corrections for updates for commit " + commitTime + " for file " + fileId); return super.handleUpdate(commitTime, fileId, recordItr); @@ -188,8 +191,6 @@ public class HoodieMergeOnReadTable extends .filter(i -> !i.isInflight()).forEach(this.getActiveTimeline()::revertToInflight); logger.info("Unpublished " + commits); Long startTime = System.currentTimeMillis(); - // TODO (NA) : remove this once HoodieIndex is a member of HoodieTable - HoodieIndex hoodieIndex = HoodieIndex.createIndex(config, jsc); List allRollbackStats = jsc.parallelize(FSUtils .getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning())) @@ -224,90 +225,53 @@ public class HoodieMergeOnReadTable extends throw new UncheckedIOException("Failed to rollback for commit " + commit, io); } case HoodieTimeline.DELTA_COMMIT_ACTION: + // -------------------------------------------------------------------------------------------------- + // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal + // -------------------------------------------------------------------------------------------------- + // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In + // this scenario we would want to delete these log files. + // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario, + // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks. + // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is + // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime and + // and hence will end up deleting these log files. This is done so there are no orphan log files + // lying around. + // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions + // taken in this scenario is a combination of (A.2) and (A.3) + // --------------------------------------------------------------------------------------------------- + // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal + // --------------------------------------------------------------------------------------------------- + // (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no entries. + // In this scenario, we delete all the parquet files written for the failed commit. + // (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In + // this scenario, perform (A.1) and for updates written to log files, write rollback blocks. + // (B.3) Rollback triggered for first commit - Same as (B.1) + // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files + // as well if the base parquet file gets deleted. try { HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( metaClient.getCommitTimeline().getInstantDetails( new HoodieInstant(true, instant.getAction(), instant.getTimestamp())) - .get()); + .get(), HoodieCommitMetadata.class); // read commit file and (either append delete blocks or delete file) final Map filesToDeletedStatus = new HashMap<>(); Map filesToNumBlocksRollback = new HashMap<>(); // In case all data was inserts and the commit failed, delete the file belonging to that commit + // We do not know fileIds for inserts (first inserts are either log files or parquet files), + // delete all files for the corresponding failed commit, if present (same as COW) super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter); - final Set deletedFiles = filesToDeletedStatus.entrySet().stream() .map(entry -> { Path filePath = entry.getKey().getPath(); return FSUtils.getFileIdFromFilePath(filePath); }).collect(Collectors.toSet()); + // append rollback blocks for updates if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { - // This needs to be done since GlobalIndex at the moment does not store the latest commit time - Map fileIdToLatestCommitTimeMap = - hoodieIndex.isGlobal() ? this.getRTFileSystemView().getLatestFileSlices(partitionPath) - .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime)) : null; - commitMetadata.getPartitionToWriteStats().get(partitionPath).stream() - .filter(wStat -> { - if (wStat != null - && wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT - && wStat.getPrevCommit() != null && !deletedFiles.contains(wStat.getFileId())) { - return true; - } - return false; - }) - .forEach(wStat -> { - Writer writer = null; - String baseCommitTime = wStat.getPrevCommit(); - if (hoodieIndex.isGlobal()) { - baseCommitTime = fileIdToLatestCommitTimeMap.get(wStat.getFileId()); - } - try { - // TODO : wStat.getPrevCommit() might not give the right commit time in the following - // scenario if a compaction was scheduled, the new commitTime will be used to write the - // new log file. In this case, the commit time for the log file is the - // getBaseCommitTime() - writer = HoodieLogFormat.newWriterBuilder().onParentPath( - new Path(this.getMetaClient().getBasePath(), partitionPath)) - .withFileId(wStat.getFileId()).overBaseCommit(baseCommitTime) - .withFs(this.metaClient.getFs()) - .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); - Long numRollbackBlocks = 0L; - // generate metadata - Map header = - Maps.newHashMap(); - header.put(HeaderMetadataType.INSTANT_TIME, - metaClient.getActiveTimeline().lastInstant().get().getTimestamp()); - header.put(HeaderMetadataType.TARGET_INSTANT_TIME, - commit); - header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE, String - .valueOf( - HoodieCommandBlockTypeEnum - .ROLLBACK_PREVIOUS_BLOCK - .ordinal())); - // if update belongs to an existing log file - writer = writer.appendBlock(new HoodieCommandBlock(header)); - numRollbackBlocks++; - filesToNumBlocksRollback.put(this.getMetaClient().getFs() - .getFileStatus(writer.getLogFile().getPath()), numRollbackBlocks); - } catch (IOException | InterruptedException io) { - throw new HoodieRollbackException( - "Failed to rollback for commit " + commit, io); - } finally { - try { - if (writer != null) { - writer.close(); - } - } catch (IOException io) { - throw new UncheckedIOException(io); - } - } - }); - hoodieRollbackStats = HoodieRollbackStat.newBuilder() - .withPartitionPath(partitionPath) - .withDeletedFileResults(filesToDeletedStatus) - .withRollbackBlockAppendResults(filesToNumBlocksRollback).build(); + hoodieRollbackStats = rollback(index, partitionPath, commit, commitMetadata, filesToDeletedStatus, + filesToNumBlocksRollback, deletedFiles); } break; } catch (IOException io) { @@ -335,6 +299,25 @@ public class HoodieMergeOnReadTable extends return Optional.empty(); } + @Override + protected HoodieRollingStatMetadata getRollingStats() { + try { + Optional lastInstant = this.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants() + .lastInstant(); + if (lastInstant.isPresent()) { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + this.getActiveTimeline().getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class); + HoodieRollingStatMetadata rollingStatMetadata = HoodieCommitMetadata + .fromBytes(commitMetadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY) + .getBytes(), HoodieRollingStatMetadata.class); + return rollingStatMetadata; + } + return null; + } catch (IOException e) { + throw new HoodieException(); + } + } + /** * UpsertPartitioner for MergeOnRead table type, this allows auto correction of small parquet * files to larger ones without the need for an index in the logFile. @@ -345,7 +328,6 @@ public class HoodieMergeOnReadTable extends super(profile); } - @Override protected List getSmallFiles(String partitionPath) { // smallFiles only for partitionPath @@ -354,32 +336,59 @@ public class HoodieMergeOnReadTable extends // Init here since this class (and member variables) might not have been initialized HoodieTimeline commitTimeline = getCompletedCommitTimeline(); + // Find out all eligible small file slices if (!commitTimeline.empty()) { HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); // find smallest file in partition and append to it - - // TODO - check if index.isglobal then small files are log files too - Optional smallFileSlice = getRTFileSystemView() - // Use the merged file-slice for small file selection - .getLatestMergedFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).filter( - fileSlice -> fileSlice.getLogFiles().count() < 1 - && fileSlice.getDataFile().get().getFileSize() < config - .getParquetSmallFileLimit()).sorted((FileSlice left, FileSlice right) -> - left.getDataFile().get().getFileSize() < right.getDataFile().get().getFileSize() - ? -1 : 1).findFirst(); - - if (smallFileSlice.isPresent()) { - String filename = smallFileSlice.get().getDataFile().get().getFileName(); + List allSmallFileSlices = new ArrayList<>(); + // If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to + // it. Doing this overtime for a partition, we ensure that we handle small file issues + if (!index.canIndexLogFiles()) { + // TODO : choose last N small files since there can be multiple small files written to a single partition + // by different spark partitions in a single batch + Optional smallFileSlice = getRTFileSystemView() + .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).filter( + fileSlice -> fileSlice.getLogFiles().count() < 1 + && fileSlice.getDataFile().get().getFileSize() < config + .getParquetSmallFileLimit()).sorted((FileSlice left, FileSlice right) -> + left.getDataFile().get().getFileSize() < right.getDataFile().get().getFileSize() + ? -1 : 1).findFirst(); + if (smallFileSlice.isPresent()) { + allSmallFileSlices.add(smallFileSlice.get()); + } + } else { + // If we can index log files, we can add more inserts to log files. + List allFileSlices = getRTFileSystemView() + .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()) + .collect(Collectors.toList()); + for (FileSlice fileSlice : allFileSlices) { + if (isSmallFile(partitionPath, fileSlice)) { + allSmallFileSlices.add(fileSlice); + } + } + } + // Create SmallFiles from the eligible file slices + for (FileSlice smallFileSlice : allSmallFileSlices) { SmallFile sf = new SmallFile(); - sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), - FSUtils.getFileId(filename)); - sf.sizeBytes = smallFileSlice.get().getDataFile().get().getFileSize(); - smallFileLocations.add(sf); - // Update the global small files list - smallFiles.add(sf); + if (smallFileSlice.getDataFile().isPresent()) { + // TODO : Move logic of file name, file id, base commit time handling inside file slice + String filename = smallFileSlice.getDataFile().get().getFileName(); + sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)); + sf.sizeBytes = getTotalFileSize(partitionPath, smallFileSlice); + smallFileLocations.add(sf); + // Update the global small files list + smallFiles.add(sf); + } else { + HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get(); + sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()), + FSUtils.getFileIdFromLogPath(logFile.getPath())); + sf.sizeBytes = getTotalFileSize(partitionPath, smallFileSlice); + smallFileLocations.add(sf); + // Update the global small files list + smallFiles.add(sf); + } } } - return smallFileLocations; } @@ -388,6 +397,116 @@ public class HoodieMergeOnReadTable extends .map(smallFile -> ((SmallFile) smallFile).location.getFileId()) .collect(Collectors.toList()); } + + private long getTotalFileSize(String partitionPath, FileSlice fileSlice) { + if (rollingStatMetadata != null) { + Map partitionRollingStats = + rollingStatMetadata.getPartitionToRollingStats().get(partitionPath); + if (partitionRollingStats != null) { + HoodieRollingStat rollingStatForFile = partitionRollingStats.get(fileSlice.getFileId()); + if (rollingStatForFile != null) { + long inserts = rollingStatForFile.getInserts(); + long totalSize = averageRecordSize * inserts; + return totalSize; + } + } + } + // In case Rolling Stats is not present, fall back to sizing log files based on heuristics + if (!fileSlice.getDataFile().isPresent()) { + return convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList())); + } else { + return fileSlice.getDataFile().get().getFileSize() + convertLogFilesSizeToExpectedParquetSize(fileSlice + .getLogFiles().collect(Collectors.toList())); + } + } + + private boolean isSmallFile(String partitionPath, FileSlice fileSlice) { + long totalSize = getTotalFileSize(partitionPath, fileSlice); + if (totalSize < config.getParquetMaxFileSize()) { + return true; + } + return false; + } + + // TODO (NA) : Make this static part of utility + @VisibleForTesting + public long convertLogFilesSizeToExpectedParquetSize(List hoodieLogFiles) { + long totalSizeOfLogFiles = hoodieLogFiles.stream().map(hoodieLogFile -> hoodieLogFile.getFileSize().get()) + .reduce((a, b) -> (a + b)).orElse(0L); + // Here we assume that if there is no base parquet file, all log files contain only inserts. + // We can then just get the parquet equivalent size of these log files, compare that with + // {@link config.getParquetMaxFileSize()} and decide if there is scope to insert more rows + long logFilesEquivalentParquetFileSize = (long) (totalSizeOfLogFiles * config + .getLogFileToParquetCompressionRatio()); + return logFilesEquivalentParquetFileSize; + } + } + + private Map generateHeader(String commit) { + // generate metadata + Map header = Maps.newHashMap(); + header.put(HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp()); + header.put(HeaderMetadataType.TARGET_INSTANT_TIME, commit); + header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK + .ordinal())); + return header; + } + + private HoodieRollbackStat rollback(HoodieIndex hoodieIndex, String partitionPath, String commit, + HoodieCommitMetadata commitMetadata, final Map filesToDeletedStatus, + Map filesToNumBlocksRollback, Set deletedFiles) { + // The following needs to be done since GlobalIndex at the moment does not store the latest commit time. + // Also, wStat.getPrevCommit() might not give the right commit time in the following + // scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be + // used to write the new log files. In this case, the commit time for the log file is the compaction requested time. + Map fileIdToBaseCommitTimeForLogMap = + hoodieIndex.isGlobal() ? this.getRTFileSystemView().getLatestFileSlices(partitionPath) + .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime)) : null; + commitMetadata.getPartitionToWriteStats().get(partitionPath).stream() + .filter(wStat -> { + // Filter out stats without prevCommit since they are all inserts + if (wStat != null && wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT && wStat.getPrevCommit() != null + && !deletedFiles.contains(wStat.getFileId())) { + return true; + } + return false; + }).forEach(wStat -> { + HoodieLogFormat.Writer writer = null; + String baseCommitTime = wStat.getPrevCommit(); + if (hoodieIndex.isGlobal()) { + baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()); + } + try { + writer = HoodieLogFormat.newWriterBuilder().onParentPath( + new Path(this.getMetaClient().getBasePath(), partitionPath)) + .withFileId(wStat.getFileId()).overBaseCommit(baseCommitTime) + .withFs(this.metaClient.getFs()) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); + Long numRollbackBlocks = 0L; + // generate metadata + Map header = generateHeader(commit); + // if update belongs to an existing log file + writer = writer.appendBlock(new HoodieCommandBlock(header)); + numRollbackBlocks++; + filesToNumBlocksRollback.put(this.getMetaClient().getFs() + .getFileStatus(writer.getLogFile().getPath()), numRollbackBlocks); + } catch (IOException | InterruptedException io) { + throw new HoodieRollbackException( + "Failed to rollback for commit " + commit, io); + } finally { + try { + if (writer != null) { + writer.close(); + } + } catch (IOException io) { + throw new UncheckedIOException(io); + } + } + }); + return HoodieRollbackStat.newBuilder() + .withPartitionPath(partitionPath) + .withDeletedFileResults(filesToDeletedStatus) + .withRollbackBlockAppendResults(filesToNumBlocksRollback).build(); } } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java index 0a669683c..6b5440b3d 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java @@ -251,7 +251,7 @@ public class TestCleaner extends TestHoodieClientBase { HashMap> fileIdToVersions = new HashMap<>(); for (HoodieInstant entry : timeline.getInstants().collect(Collectors.toList())) { HoodieCommitMetadata commitMetadata = HoodieCommitMetadata - .fromBytes(timeline.getInstantDetails(entry).get()); + .fromBytes(timeline.getInstantDetails(entry).get(), HoodieCommitMetadata.class); for (HoodieWriteStat wstat : commitMetadata.getWriteStats(partitionPath)) { if (!fileIdToVersions.containsKey(wstat.getFileId())) { diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java index 9fbd0871a..09b8795ab 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java @@ -28,6 +28,8 @@ import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRollingStat; +import com.uber.hoodie.common.model.HoodieRollingStatMetadata; import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; @@ -46,6 +48,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -54,6 +57,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaRDD; +import org.junit.Assert; import org.junit.Test; import scala.Option; @@ -563,7 +567,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { HoodieInstant commitInstant = new HoodieInstant(false, actionType, commitTime); HoodieTimeline commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants(); HoodieCommitMetadata commitMetadata = HoodieCommitMetadata - .fromBytes(commitTimeline.getInstantDetails(commitInstant).get()); + .fromBytes(commitTimeline.getInstantDetails(commitInstant).get(), HoodieCommitMetadata.class); String basePath = table.getMetaClient().getBasePath(); Collection commitPathNames = commitMetadata.getFileIdAndFullPaths(basePath).values(); @@ -571,7 +575,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { String filename = HoodieTestUtils.getCommitFilePath(basePath, commitTime); FileInputStream inputStream = new FileInputStream(filename); String everything = IOUtils.toString(inputStream); - HoodieCommitMetadata metadata = HoodieCommitMetadata.fromJsonString(everything.toString()); + HoodieCommitMetadata metadata = HoodieCommitMetadata.fromJsonString(everything.toString(), + HoodieCommitMetadata.class); HashMap paths = metadata.getFileIdAndFullPaths(basePath); inputStream.close(); @@ -581,6 +586,79 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { } } + /** + * Test to ensure commit metadata points to valid files + */ + @Test + public void testRollingStatsInMetadata() throws Exception { + + HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build(); + HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + + String commitTime = "000"; + client.startCommitWithTime(commitTime); + + List records = dataGen.generateInserts(commitTime, 200); + JavaRDD writeRecords = jsc.parallelize(records, 1); + + JavaRDD result = client.bulkInsert(writeRecords, commitTime); + + assertTrue("Commit should succeed", client.commit(commitTime, result)); + assertTrue("After explicit commit, commit file should be created", + HoodieTestUtils.doesCommitExist(basePath, commitTime)); + + // Read from commit file + String filename = HoodieTestUtils.getCommitFilePath(basePath, commitTime); + FileInputStream inputStream = new FileInputStream(filename); + String everything = IOUtils.toString(inputStream); + HoodieCommitMetadata metadata = HoodieCommitMetadata.fromJsonString(everything.toString(), + HoodieCommitMetadata.class); + HoodieRollingStatMetadata rollingStatMetadata = HoodieCommitMetadata.fromJsonString(metadata.getExtraMetadata() + .get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY), HoodieRollingStatMetadata.class); + int inserts = 0; + for (Map.Entry> pstat : rollingStatMetadata.getPartitionToRollingStats() + .entrySet()) { + for (Map.Entry stat : pstat.getValue().entrySet()) { + inserts += stat.getValue().getInserts(); + } + } + Assert.assertEquals(inserts, 200); + + // Update + Inserts such that they just expand file1 + commitTime = "001"; + client.startCommitWithTime(commitTime); + + records = dataGen.generateUpdates(commitTime, records); + writeRecords = jsc.parallelize(records, 1); + result = client.upsert(writeRecords, commitTime); + + assertTrue("Commit should succeed", client.commit(commitTime, result)); + assertTrue("After explicit commit, commit file should be created", + HoodieTestUtils.doesCommitExist(basePath, commitTime)); + + // Read from commit file + filename = HoodieTestUtils.getCommitFilePath(basePath, commitTime); + inputStream = new FileInputStream(filename); + everything = IOUtils.toString(inputStream); + metadata = HoodieCommitMetadata.fromJsonString(everything.toString(), HoodieCommitMetadata.class); + rollingStatMetadata = HoodieCommitMetadata.fromJsonString(metadata.getExtraMetadata() + .get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY), HoodieRollingStatMetadata.class); + inserts = 0; + int upserts = 0; + for (Map.Entry> pstat : rollingStatMetadata.getPartitionToRollingStats() + .entrySet()) { + for (Map.Entry stat : pstat.getValue().entrySet()) { + inserts += stat.getValue().getInserts(); + upserts += stat.getValue().getUpserts(); + } + } + Assert.assertEquals(inserts, 200); + Assert.assertEquals(upserts, 200); + + } + /** * Build Hoodie Write Config for small data file sizes */ diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java index a85bd3c23..cc1dad8e1 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java @@ -109,7 +109,8 @@ public class HoodieClientTestUtils { List commitsToReturn) throws IOException { HashMap fileIdToFullPath = new HashMap<>(); for (HoodieInstant commit : commitsToReturn) { - HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit).get()); + HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit).get(), + HoodieCommitMetadata.class); fileIdToFullPath.putAll(metadata.getFileIdAndFullPaths(basePath)); } return fileIdToFullPath; diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java index f5c22c442..aa2744043 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java @@ -31,9 +31,12 @@ import com.uber.hoodie.common.HoodieTestDataGenerator; import com.uber.hoodie.common.TestRawTripPayload.MetadataMergeWriteStatus; import com.uber.hoodie.common.minicluster.HdfsTestService; import com.uber.hoodie.common.model.FileSlice; +import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRollingStat; +import com.uber.hoodie.common.model.HoodieRollingStatMetadata; import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.table.HoodieTableMetaClient; @@ -896,6 +899,213 @@ public class TestMergeOnReadTable { } } + /** + * Test to ensure rolling stats are correctly written to metadata file + */ + @Test + public void testRollingStatsInMetadata() throws Exception { + + HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build(); + HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + + String commitTime = "000"; + client.startCommitWithTime(commitTime); + + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + List records = dataGen.generateInserts(commitTime, 200); + JavaRDD writeRecords = jsc.parallelize(records, 1); + + JavaRDD statuses = client.insert(writeRecords, commitTime); + assertTrue("Commit should succeed", client.commit(commitTime, statuses)); + + // Read from commit file + table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(table.getActiveTimeline().getInstantDetails(table + .getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), HoodieCommitMetadata.class); + HoodieRollingStatMetadata rollingStatMetadata = HoodieCommitMetadata.fromBytes(metadata.getExtraMetadata() + .get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(), HoodieRollingStatMetadata.class); + int inserts = 0; + for (Map.Entry> pstat : + rollingStatMetadata.getPartitionToRollingStats().entrySet()) { + for (Map.Entry stat : pstat.getValue().entrySet()) { + inserts += stat.getValue().getInserts(); + } + } + Assert.assertEquals(inserts, 200); + + commitTime = "001"; + client.startCommitWithTime(commitTime); + records = dataGen.generateUpdates(commitTime, records); + writeRecords = jsc.parallelize(records, 1); + statuses = client.upsert(writeRecords, commitTime); + assertTrue("Commit should succeed", client.commit(commitTime, statuses)); + + // Read from commit file + table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + metadata = HoodieCommitMetadata.fromBytes(table.getActiveTimeline().getInstantDetails(table + .getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), HoodieCommitMetadata.class); + rollingStatMetadata = HoodieCommitMetadata.fromBytes(metadata.getExtraMetadata() + .get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(), HoodieRollingStatMetadata.class); + inserts = 0; + int upserts = 0; + for (Map.Entry> pstat : rollingStatMetadata.getPartitionToRollingStats() + .entrySet()) { + for (Map.Entry stat : pstat.getValue().entrySet()) { + inserts += stat.getValue().getInserts(); + upserts += stat.getValue().getUpserts(); + } + } + + Assert.assertEquals(inserts, 200); + Assert.assertEquals(upserts, 200); + + client.rollback(commitTime); + + // Read from commit file + table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + metadata = HoodieCommitMetadata.fromBytes(table.getActiveTimeline().getInstantDetails(table + .getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), HoodieCommitMetadata.class); + rollingStatMetadata = HoodieCommitMetadata.fromBytes(metadata.getExtraMetadata() + .get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(), HoodieRollingStatMetadata.class); + inserts = 0; + upserts = 0; + for (Map.Entry> pstat : rollingStatMetadata.getPartitionToRollingStats() + .entrySet()) { + for (Map.Entry stat : pstat.getValue().entrySet()) { + inserts += stat.getValue().getInserts(); + upserts += stat.getValue().getUpserts(); + } + } + Assert.assertEquals(inserts, 200); + Assert.assertEquals(upserts, 0); + } + + /** + * Test to ensure rolling stats are correctly written to the metadata file, identifies small files and corrects them + */ + @Test + public void testRollingStatsWithSmallFileHandling() throws Exception { + + HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build(); + HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + Map fileIdToInsertsMap = new HashMap<>(); + Map fileIdToUpsertsMap = new HashMap<>(); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + + String commitTime = "000"; + client.startCommitWithTime(commitTime); + + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + List records = dataGen.generateInserts(commitTime, 200); + JavaRDD writeRecords = jsc.parallelize(records, 1); + + JavaRDD statuses = client.insert(writeRecords, commitTime); + assertTrue("Commit should succeed", client.commit(commitTime, statuses)); + + // Read from commit file + table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(table.getActiveTimeline().getInstantDetails(table + .getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), HoodieCommitMetadata.class); + HoodieRollingStatMetadata rollingStatMetadata = HoodieCommitMetadata.fromBytes(metadata.getExtraMetadata() + .get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(), HoodieRollingStatMetadata.class); + int inserts = 0; + for (Map.Entry> pstat : + rollingStatMetadata.getPartitionToRollingStats().entrySet()) { + for (Map.Entry stat : pstat.getValue().entrySet()) { + inserts += stat.getValue().getInserts(); + fileIdToInsertsMap.put(stat.getKey(), stat.getValue().getInserts()); + fileIdToUpsertsMap.put(stat.getKey(), stat.getValue().getUpserts()); + } + } + Assert.assertEquals(inserts, 200); + + commitTime = "001"; + client.startCommitWithTime(commitTime); + // generate updates + inserts. inserts should be handled into small files + records = dataGen.generateUpdates(commitTime, records); + records.addAll(dataGen.generateInserts(commitTime, 200)); + writeRecords = jsc.parallelize(records, 1); + statuses = client.upsert(writeRecords, commitTime); + assertTrue("Commit should succeed", client.commit(commitTime, statuses)); + + // Read from commit file + table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + metadata = HoodieCommitMetadata.fromBytes(table.getActiveTimeline().getInstantDetails(table + .getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), HoodieCommitMetadata.class); + rollingStatMetadata = HoodieCommitMetadata.fromBytes(metadata.getExtraMetadata() + .get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(), HoodieRollingStatMetadata.class); + inserts = 0; + int upserts = 0; + for (Map.Entry> pstat : rollingStatMetadata.getPartitionToRollingStats() + .entrySet()) { + for (Map.Entry stat : pstat.getValue().entrySet()) { + // No new file id should be created, all the data should be written to small files already there + assertTrue(fileIdToInsertsMap.containsKey(stat.getKey())); + assertTrue(fileIdToUpsertsMap.containsKey(stat.getKey())); + inserts += stat.getValue().getInserts(); + upserts += stat.getValue().getUpserts(); + } + } + + Assert.assertEquals(inserts, 400); + Assert.assertEquals(upserts, 200); + + // Test small file handling after compaction + commitTime = "002"; + client.scheduleCompactionAtInstant(commitTime, Optional.of(metadata.getExtraMetadata())); + statuses = client.compact(commitTime); + client.commitCompaction(commitTime, statuses, Optional.empty()); + + // Read from commit file + table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + metadata = HoodieCommitMetadata.fromBytes(table.getActiveTimeline().getInstantDetails(table + .getActiveTimeline().getCommitsTimeline().lastInstant().get()).get(), HoodieCommitMetadata.class); + HoodieRollingStatMetadata rollingStatMetadata1 = HoodieCommitMetadata.fromBytes(metadata.getExtraMetadata() + .get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(), HoodieRollingStatMetadata.class); + + // Ensure that the rolling stats from the extra metadata of delta commits is copied over to the compaction commit + for (Map.Entry> entry : rollingStatMetadata.getPartitionToRollingStats() + .entrySet()) { + Assert.assertTrue(rollingStatMetadata1.getPartitionToRollingStats().containsKey(entry.getKey())); + Assert.assertEquals(rollingStatMetadata1.getPartitionToRollingStats().get(entry.getKey()).size(), entry + .getValue().size()); + } + + // Write inserts + updates + commitTime = "003"; + client.startCommitWithTime(commitTime); + // generate updates + inserts. inserts should be handled into small files + records = dataGen.generateUpdates(commitTime, records); + records.addAll(dataGen.generateInserts(commitTime, 200)); + writeRecords = jsc.parallelize(records, 1); + statuses = client.upsert(writeRecords, commitTime); + assertTrue("Commit should succeed", client.commit(commitTime, statuses)); + + // Read from commit file + table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + metadata = HoodieCommitMetadata.fromBytes(table.getActiveTimeline().getInstantDetails(table + .getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), HoodieCommitMetadata.class); + rollingStatMetadata = HoodieCommitMetadata.fromBytes(metadata.getExtraMetadata() + .get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(), HoodieRollingStatMetadata.class); + inserts = 0; + upserts = 0; + for (Map.Entry> pstat : rollingStatMetadata.getPartitionToRollingStats() + .entrySet()) { + for (Map.Entry stat : pstat.getValue().entrySet()) { + // No new file id should be created, all the data should be written to small files already there + assertTrue(fileIdToInsertsMap.containsKey(stat.getKey())); + inserts += stat.getValue().getInserts(); + upserts += stat.getValue().getUpserts(); + } + } + + Assert.assertEquals(inserts, 600); + Assert.assertEquals(upserts, 600); + + } private HoodieWriteConfig getConfig(Boolean autoCommit) { return getConfigBuilder(autoCommit).build(); diff --git a/hoodie-common/src/main/avro/HoodieCommitMetadata.avsc b/hoodie-common/src/main/avro/HoodieCommitMetadata.avsc index 0429cc7d3..3f4473200 100644 --- a/hoodie-common/src/main/avro/HoodieCommitMetadata.avsc +++ b/hoodie-common/src/main/avro/HoodieCommitMetadata.avsc @@ -66,6 +66,18 @@ "name":"numInserts", "type":["null","long"], "default" : null + }, + { + "name":"totalLogBlocks", + "type":["null","long"] + }, + { + "name":"totalCorruptLogBlock", + "type":["null","long"] + }, + { + "name":"totalRollbackBlocks", + "type":["null","long"] } ] } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java index fe97a4e34..c75c19d77 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java @@ -119,12 +119,12 @@ public class HoodieCommitMetadata implements Serializable { return getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this); } - public static HoodieCommitMetadata fromJsonString(String jsonStr) throws IOException { + public static T fromJsonString(String jsonStr, Class clazz) throws Exception { if (jsonStr == null || jsonStr.isEmpty()) { // For empty commit file (no data or somethings bad happen). - return new HoodieCommitMetadata(); + return clazz.newInstance(); } - return getObjectMapper().readValue(jsonStr, HoodieCommitMetadata.class); + return getObjectMapper().readValue(jsonStr, clazz); } // Here the functions are named "fetch" instead of "get", to get avoid of the json conversion. @@ -319,11 +319,15 @@ public class HoodieCommitMetadata implements Serializable { return result; } - public static HoodieCommitMetadata fromBytes(byte[] bytes) throws IOException { - return fromJsonString(new String(bytes, Charset.forName("utf-8"))); + public static T fromBytes(byte[] bytes, Class clazz) throws IOException { + try { + return fromJsonString(new String(bytes, Charset.forName("utf-8")), clazz); + } catch (Exception e) { + throw new IOException("unable to read commit metadata", e); + } } - private static ObjectMapper getObjectMapper() { + protected static ObjectMapper getObjectMapper() { ObjectMapper mapper = new ObjectMapper(); mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRollingStat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRollingStat.java new file mode 100644 index 000000000..4d51a799c --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRollingStat.java @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.model; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import java.io.Serializable; +import javax.annotation.Nullable; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class HoodieRollingStat implements Serializable { + + private String fileId; + private long inserts; + private long upserts; + private long deletes; + // TODO + @Nullable + private long totalInputWriteBytesToDisk; + @Nullable + private long totalInputWriteBytesOnDisk; + + public HoodieRollingStat() { + // called by jackson json lib + } + + public HoodieRollingStat(String fileId, long inserts, long upserts, long deletes, long totalInputWriteBytesOnDisk) { + this.fileId = fileId; + this.inserts = inserts; + this.upserts = upserts; + this.deletes = deletes; + this.totalInputWriteBytesOnDisk = totalInputWriteBytesOnDisk; + } + + public String getFileId() { + return fileId; + } + + public void setFileId(String fileId) { + this.fileId = fileId; + } + + public long getInserts() { + return inserts; + } + + public void setInserts(long inserts) { + this.inserts = inserts; + } + + public long getUpserts() { + return upserts; + } + + public void setUpserts(long upserts) { + this.upserts = upserts; + } + + public long getDeletes() { + return deletes; + } + + public void setDeletes(long deletes) { + this.deletes = deletes; + } + + public long addInserts(long inserts) { + this.inserts += inserts; + return this.inserts; + } + + public long addUpserts(long upserts) { + this.upserts += upserts; + return this.upserts; + } + + public long addDeletes(long deletes) { + this.deletes += deletes; + return this.deletes; + } + + public long getTotalInputWriteBytesOnDisk() { + return totalInputWriteBytesOnDisk; + } +} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRollingStatMetadata.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRollingStatMetadata.java new file mode 100644 index 000000000..ba4b1cfe9 --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRollingStatMetadata.java @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.model; + +import java.io.IOException; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +/** + * This class holds statistics about files belonging to a dataset + */ +public class HoodieRollingStatMetadata implements Serializable { + + private static volatile Logger log = LogManager.getLogger(HoodieRollingStatMetadata.class); + protected Map> partitionToRollingStats; + private String actionType = "DUMMY_ACTION"; + public static final String ROLLING_STAT_METADATA_KEY = "ROLLING_STAT"; + + public void addRollingStat(String partitionPath, HoodieRollingStat stat) { + if (!partitionToRollingStats.containsKey(partitionPath)) { + partitionToRollingStats.put(partitionPath, new RollingStatsHashMap<>()); + } + partitionToRollingStats.get(partitionPath).put(stat.getFileId(), stat); + } + + public HoodieRollingStatMetadata() { + partitionToRollingStats = new HashMap<>(); + } + + public HoodieRollingStatMetadata(String actionType) { + this(); + this.actionType = actionType; + } + + class RollingStatsHashMap extends HashMap { + + @Override + public V put(K key, V value) { + V v = this.get(key); + if (v == null) { + super.put(key, value); + } else if (v instanceof HoodieRollingStat) { + long inserts = ((HoodieRollingStat) v).getInserts(); + long upserts = ((HoodieRollingStat) v).getUpserts(); + long deletes = ((HoodieRollingStat) v).getDeletes(); + ((HoodieRollingStat) value).addInserts(inserts); + ((HoodieRollingStat) value).addUpserts(upserts); + ((HoodieRollingStat) value).addDeletes(deletes); + super.put(key, value); + } + return value; + } + } + + public static HoodieRollingStatMetadata fromBytes(byte[] bytes) throws IOException { + return HoodieCommitMetadata.fromBytes(bytes, HoodieRollingStatMetadata.class); + } + + public String toJsonString() throws IOException { + if (partitionToRollingStats.containsKey(null)) { + log.info("partition path is null for " + partitionToRollingStats.get(null)); + partitionToRollingStats.remove(null); + } + return HoodieCommitMetadata.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this); + } + + public HoodieRollingStatMetadata merge(HoodieRollingStatMetadata rollingStatMetadata) { + for (Map.Entry> stat : rollingStatMetadata.partitionToRollingStats + .entrySet()) { + for (Map.Entry innerStat : stat.getValue().entrySet()) { + this.addRollingStat(stat.getKey(), innerStat.getValue()); + } + } + return this; + } + + public Map> getPartitionToRollingStats() { + return partitionToRollingStats; + } + + public String getActionType() { + return actionType; + } +} diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/TestHoodieCommitMetadata.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/TestHoodieCommitMetadata.java index 5f92e83bf..506416e21 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/TestHoodieCommitMetadata.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/TestHoodieCommitMetadata.java @@ -34,7 +34,8 @@ public class TestHoodieCommitMetadata { Assert.assertTrue(commitMetadata.getTotalLogFilesCompacted() > 0); String serializedCommitMetadata = commitMetadata.toJsonString(); - HoodieCommitMetadata metadata = HoodieCommitMetadata.fromJsonString(serializedCommitMetadata); + HoodieCommitMetadata metadata = HoodieCommitMetadata.fromJsonString(serializedCommitMetadata, + HoodieCommitMetadata.class); // Make sure timing metrics are not written to instant file Assert.assertTrue(metadata.getTotalScanTime() == 0); Assert.assertTrue(metadata.getTotalLogFilesCompacted() > 0); diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java index c3d202b5b..55d74d8f1 100644 --- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java @@ -307,7 +307,7 @@ public class HoodieHiveClient { HoodieInstant lastCommit = activeTimeline.lastInstant().orElseThrow( () -> new InvalidDatasetException(syncConfig.basePath)); HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - activeTimeline.getInstantDetails(lastCommit).get()); + activeTimeline.getInstantDetails(lastCommit).get(), HoodieCommitMetadata.class); String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values() .stream().findAny().orElseThrow(() -> new IllegalArgumentException( "Could not find any data file written for commit " + lastCommit @@ -339,7 +339,7 @@ public class HoodieHiveClient { HoodieInstant lastDeltaInstant = lastDeltaCommit.get(); // read from the log file wrote commitMetadata = HoodieCommitMetadata.fromBytes( - activeTimeline.getInstantDetails(lastDeltaInstant).get()); + activeTimeline.getInstantDetails(lastDeltaInstant).get(), HoodieCommitMetadata.class); filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values() .stream().filter(s -> s.contains(HoodieLogFile.DELTA_EXTENSION)) .findAny().orElseThrow(() -> new IllegalArgumentException( @@ -372,7 +372,7 @@ public class HoodieHiveClient { // Read from the compacted file wrote HoodieCommitMetadata compactionMetadata = HoodieCommitMetadata.fromBytes( - activeTimeline.getInstantDetails(lastCompactionCommit).get()); + activeTimeline.getInstantDetails(lastCompactionCommit).get(), HoodieCommitMetadata.class); String filePath = compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values() .stream().findAny().orElseThrow(() -> new IllegalArgumentException( "Could not find any data file written for compaction " + lastCompactionCommit @@ -539,7 +539,7 @@ public class HoodieHiveClient { Integer.MAX_VALUE); return timelineToSync.getInstants().map(s -> { try { - return HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(s).get()); + return HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(s).get(), HoodieCommitMetadata.class); } catch (IOException e) { throw new HoodieIOException( "Failed to get partitions written since " + lastCommitTimeSynced, e); diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/IncrementalRelation.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/IncrementalRelation.scala index cace267a9..b0014d3b2 100644 --- a/hoodie-spark/src/main/scala/com/uber/hoodie/IncrementalRelation.scala +++ b/hoodie-spark/src/main/scala/com/uber/hoodie/IncrementalRelation.scala @@ -73,7 +73,7 @@ class IncrementalRelation(val sqlContext: SQLContext, // use schema from a file produced in the latest instant val latestSchema = { val latestMeta = HoodieCommitMetadata - .fromBytes(commitTimeline.getInstantDetails(commitsToReturn.last).get) + .fromBytes(commitTimeline.getInstantDetails(commitsToReturn.last).get, classOf[HoodieCommitMetadata]) val metaFilePath = latestMeta.getFileIdAndFullPaths(basePath).values().iterator().next() AvroConversionUtils.convertAvroSchemaToStructType(ParquetUtils.readAvroSchema( sqlContext.sparkContext.hadoopConfiguration, new Path(metaFilePath))) @@ -84,7 +84,8 @@ class IncrementalRelation(val sqlContext: SQLContext, override def buildScan(): RDD[Row] = { val fileIdToFullPath = mutable.HashMap[String, String]() for (commit <- commitsToReturn) { - val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit).get) + val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit) + .get, classOf[HoodieCommitMetadata]) fileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap } val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path")) diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java index f4619465c..7f58a721e 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -187,7 +187,7 @@ public class HoodieDeltaStreamer implements Serializable { Optional lastCommit = commitTimelineOpt.get().lastInstant(); if (lastCommit.isPresent()) { HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get()); + commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class); if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) { resumeCheckpointStr = Optional.of(commitMetadata.getMetadata(CHECKPOINT_KEY)); } else {