diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java index 9703a2414..c5db22f1f 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java @@ -210,9 +210,9 @@ public class HoodieReadClient implements Serializable { HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit).get()); // get files from each commit, and replace any previous versions - fileIdToFullPath.putAll(metadata.getFileIdAndFullPaths()); + String basePath = hoodieTable.getMetaClient().getBasePath(); + fileIdToFullPath.putAll(metadata.getFileIdAndFullPaths(basePath)); } - return sqlContextOpt.get().read() .parquet(fileIdToFullPath.values().toArray(new String[fileIdToFullPath.size()])) .filter(String.format("%s >'%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, lastCommitTimestamp)); @@ -234,11 +234,12 @@ public class HoodieReadClient implements Serializable { } try { - HoodieCommitMetadata commitMetdata = + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commitInstant).get()); - Collection paths = commitMetdata.getFileIdAndFullPaths().values(); + String basePath = hoodieTable.getMetaClient().getBasePath(); + HashMap paths = commitMetadata.getFileIdAndFullPaths(basePath); return sqlContextOpt.get().read() - .parquet(paths.toArray(new String[paths.size()])) + .parquet(paths.values().toArray(new String[paths.size()])) .filter(String.format("%s ='%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitTime)); } catch (Exception e) { throw new HoodieException("Error reading commit " + commitTime, e); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java index e1c86f67c..8ad69ae02 100644 --- 
a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java @@ -80,6 +80,8 @@ public class HoodieAppendHandle extends HoodieIOH fileSystemView.getLatestDataFilesForFileId(record.getPartitionPath(), fileId) .findFirst().get().getFileName(); String baseCommitTime = FSUtils.getCommitTime(latestValidFilePath); + Path path = new Path(record.getPartitionPath(), + FSUtils.makeDataFileName(commitTime, TaskContext.getPartitionId(), fileId)); writeStatus.getStat().setPrevCommit(baseCommitTime); writeStatus.setFileId(fileId); writeStatus.setPartitionPath(record.getPartitionPath()); @@ -103,7 +105,7 @@ public class HoodieAppendHandle extends HoodieIOH + " on commit " + commitTime + " on HDFS path " + hoodieTable .getMetaClient().getBasePath() + partitionPath, e); } - writeStatus.getStat().setFullPath(currentLogFile.getPath().toString()); + writeStatus.getStat().setPath(path.toString()); } // update the new location of the record, so we know where to find it next record.setNewLocation(new HoodieRecordLocation(commitTime, fileId)); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieInsertHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieInsertHandle.java index 944d38930..8c2eba61d 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieInsertHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieInsertHandle.java @@ -123,12 +123,14 @@ public class HoodieInsertHandle extends HoodieIOH try { storageWriter.close(); + String relativePath = path.toString().replace(new Path(config.getBasePath()) + "/", ""); + HoodieWriteStat stat = new HoodieWriteStat(); stat.setNumWrites(recordsWritten); stat.setNumDeletes(recordsDeleted); stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT); stat.setFileId(status.getFileId()); - stat.setFullPath(path.toString()); + stat.setPath(relativePath); stat.setTotalWriteBytes(FSUtils.getFileSize(fs, path)); 
stat.setTotalWriteErrors(status.getFailedRecords().size()); status.setStat(stat); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java index 9f7a1a825..415c6a2eb 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java @@ -93,9 +93,9 @@ public class HoodieUpdateHandle extends HoodieIO oldFilePath = new Path( config.getBasePath() + "/" + record.getPartitionPath() + "/" + latestValidFilePath); - newFilePath = new Path( - config.getBasePath() + "/" + record.getPartitionPath() + "/" + FSUtils - .makeDataFileName(commitTime, TaskContext.getPartitionId(), fileId)); + String relativePath = new Path( record.getPartitionPath() + "/" + FSUtils + .makeDataFileName(commitTime, TaskContext.getPartitionId(), fileId)).toString(); + newFilePath = new Path(config.getBasePath(), relativePath); // handle cases of partial failures, for update task if (fs.exists(newFilePath)) { @@ -108,7 +108,7 @@ public class HoodieUpdateHandle extends HoodieIO writeStatus.setFileId(fileId); writeStatus.setPartitionPath(record.getPartitionPath()); writeStatus.getStat().setFileId(fileId); - writeStatus.getStat().setFullPath(newFilePath.toString()); + writeStatus.getStat().setPath(relativePath); } keyToNewRecords.put(record.getRecordKey(), record); // update the new location of the record, so we know where to find it next diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java index fa478c4d7..1f3b9b8ff 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java @@ -114,6 +114,7 @@ public class HoodieRealtimeTableCompactor 
implements HoodieCompactor { for (CompactionWriteStat stat : updateStatusMap) { metadata.addWriteStat(stat.getPartitionPath(), stat); } + log.info("Compaction finished with result " + metadata); //noinspection ConstantConditions diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java index d8e0d4cb7..d6b34fda3 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java @@ -45,6 +45,7 @@ import com.uber.hoodie.table.HoodieTable; import java.util.Map; import org.apache.avro.generic.GenericRecord; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.SparkConf; @@ -60,6 +61,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -101,7 +103,6 @@ public class TestHoodieClient implements Serializable { folder.create(); basePath = folder.getRoot().getAbsolutePath(); HoodieTestUtils.init(basePath); - dataGen = new HoodieTestDataGenerator(); } @@ -616,7 +617,7 @@ public class TestHoodieClient implements Serializable { if (!fileIdToVersions.containsKey(wstat.getFileId())) { fileIdToVersions.put(wstat.getFileId(), new TreeSet<>()); } - fileIdToVersions.get(wstat.getFileId()).add(FSUtils.getCommitTime(new Path(wstat.getFullPath()).getName())); + fileIdToVersions.get(wstat.getFileId()).add(FSUtils.getCommitTime(new Path(wstat.getPath()).getName())); } } @@ -1136,7 +1137,6 @@ public class TestHoodieClient implements Serializable { List hoodieCleanStatsFour = table.clean(jsc); assertEquals("Must not clean any files" , 0, getCleanStat(hoodieCleanStatsFour, partitionPaths[0]).getSuccessDeleteFiles().size()); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, 
partitionPaths[0], "002", file3P0C2)); - } @Test public void testKeepLatestCommits() throws IOException { @@ -1298,6 +1298,47 @@ public class TestHoodieClient implements Serializable { stageOneShuffleReadTaskRecordsCountMap.values().stream().filter(a -> a > 10 && a < 100).count() == 3); } + @Test + public void testCommitWritesRelativePaths() throws Exception { + + HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build(); + HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + FileSystem fs = FSUtils.getFs(); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg); + + String commitTime = "000"; + List records = dataGen.generateInserts(commitTime, 200); + JavaRDD writeRecords = jsc.parallelize(records, 1); + + JavaRDD result = client.bulkInsert(writeRecords, commitTime); + + assertTrue("Commit should succeed", client.commit(commitTime, result)); + assertTrue("After explicit commit, commit file should be created", + HoodieTestUtils.doesCommitExist(basePath, commitTime)); + + // Get parquet file paths from commit metadata + String actionType = table.getCompactedCommitActionType(); + HoodieInstant commitInstant = + new HoodieInstant(false, actionType, commitTime); + HoodieTimeline commitTimeline = table.getCompletedCompactionCommitTimeline(); + HoodieCommitMetadata commitMetadata = + HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commitInstant).get()); + String basePath = table.getMetaClient().getBasePath(); + Collection commitPathNames = commitMetadata.getFileIdAndFullPaths(basePath).values(); + + // Read from commit file + String filename = HoodieTestUtils.getCommitFilePath(basePath, commitTime); + FileInputStream inputStream = new FileInputStream(filename); + String everything = IOUtils.toString(inputStream); + HoodieCommitMetadata metadata = HoodieCommitMetadata.fromJsonString(everything.toString()); + HashMap paths = 
metadata.getFileIdAndFullPaths(basePath); + inputStream.close(); + + // Compare values in both to make sure they are equal. + for (String pathName: paths.values()) { + assertTrue(commitPathNames.contains(pathName)); + } + } + private HoodieCleanStat getCleanStat(List hoodieCleanStatsTwo, String partitionPath) { return hoodieCleanStatsTwo.stream() diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java index 753b7c30c..77baed481 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java @@ -129,8 +129,9 @@ public class TestHoodieCompactor { HoodieCompactionMetadata result = compactor.compact(jsc, getConfig(), table); + String basePath = table.getMetaClient().getBasePath(); assertTrue("If there is nothing to compact, result will be empty", - result.getFileIdAndFullPaths().isEmpty()); + result.getFileIdAndFullPaths(basePath).isEmpty()); } @Test diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java index 99e291a41..9e2713e78 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java @@ -19,6 +19,7 @@ package com.uber.hoodie.common.model; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.databind.DeserializationFeature; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.codehaus.jackson.annotate.JsonAutoDetect; @@ -72,17 +73,25 @@ public class HoodieCommitMetadata implements Serializable { return extraMetadataMap.get(metaKey); } - public HashMap getFileIdAndFullPaths() { + public HashMap getFileIdAndRelativePaths() {
HashMap filePaths = new HashMap<>(); // list all partitions paths for (Map.Entry> entry: getPartitionToWriteStats().entrySet()) { for (HoodieWriteStat stat: entry.getValue()) { - filePaths.put(stat.getFileId(), stat.getFullPath()); + filePaths.put(stat.getFileId(), stat.getPath()); } } return filePaths; } + public HashMap getFileIdAndFullPaths(String basePath) { + HashMap fullPaths = new HashMap<>(); + HashMap relativePaths = getFileIdAndRelativePaths(); + for (Map.Entry entry: relativePaths.entrySet()) { + Path fullPath = new Path(basePath, entry.getValue()); + fullPaths.put(entry.getKey(), fullPath.toString()); + } return fullPaths; + } public String toJsonString() throws IOException { if(partitionToWriteStats.containsKey(null)) { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java index 0b2417e0b..a56338cc4 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java @@ -35,9 +35,9 @@ public class HoodieWriteStat implements Serializable { private String fileId; /** - * Full path to the file on underlying file system + * Relative path to the file from the base path */ - private String fullPath; + private String path; /** * The previous version of the file. (null if this is the first version. 
i.e insert) @@ -79,9 +79,7 @@ public class HoodieWriteStat implements Serializable { this.fileId = fileId; } - public void setFullPath(String fullFilePath) { - this.fullPath = fullFilePath; - } + public void setPath(String path) { this.path = path; } public void setPrevCommit(String prevCommit) { this.prevCommit = prevCommit; @@ -131,15 +129,14 @@ public class HoodieWriteStat implements Serializable { return fileId; } - public String getFullPath() { - return fullPath; - } + public String getPath() { return path; } + @Override public String toString() { return new StringBuilder() .append("HoodieWriteStat {") - .append("fullPath='" + fullPath + '\'') + .append("path=" + path) .append(", prevCommit='" + prevCommit + '\'') .append(", numWrites=" + numWrites) .append(", numDeletes=" + numDeletes) @@ -157,7 +154,7 @@ public class HoodieWriteStat implements Serializable { return false; HoodieWriteStat that = (HoodieWriteStat) o; - if (!fullPath.equals(that.fullPath)) + if (!path.equals(that.path)) return false; return prevCommit.equals(that.prevCommit); @@ -165,7 +162,7 @@ public class HoodieWriteStat implements Serializable { @Override public int hashCode() { - int result = fullPath.hashCode(); + int result = path.hashCode(); result = 31 * result + prevCommit.hashCode(); return result; } diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java index 194b1b407..7380388d0 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java @@ -116,6 +116,10 @@ public class HoodieTestUtils { return basePath + "/" + partitionPath + "/" + FSUtils.makeDataFileName(commitTime, DEFAULT_TASK_PARTITIONID, fileID); } + public static final String getCommitFilePath(String basePath, String commitTime) throws IOException { + return basePath + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitTime + HoodieTimeline.COMMIT_EXTENSION; + } + public static final boolean doesDataFileExist(String basePath, String partitionPath, String commitTime, String fileID) throws IOException { return new File(getDataFilePath(basePath, partitionPath, commitTime, fileID)).exists(); } diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java index deb76369e..4e53505a7 100644 --- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java @@ -313,7 +313,7 @@ public class HoodieHiveClient { .orElseThrow(() -> new InvalidDatasetException(syncConfig.basePath)); HoodieCommitMetadata commitMetadata = HoodieCommitMetadata .fromBytes(activeTimeline.getInstantDetails(lastCommit).get()); - String filePath = commitMetadata.getFileIdAndFullPaths().values().stream().findAny() + String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny() .orElseThrow(() -> new IllegalArgumentException( "Could not find any data file written for commit " + lastCommit + ", could not get schema for dataset " + metaClient.getBasePath())); @@ -340,7 +340,7 @@ public class HoodieHiveClient { // read from the log file wrote commitMetadata = HoodieCommitMetadata .fromBytes(activeTimeline.getInstantDetails(lastDeltaCommit).get()); - filePath = commitMetadata.getFileIdAndFullPaths().values().stream().filter(s -> s.contains( + filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().filter(s -> s.contains( HoodieLogFile.DELTA_EXTENSION)).findAny() .orElseThrow(() -> new IllegalArgumentException( "Could not find any data file written for commit " + lastDeltaCommit @@ -377,7 +377,7 @@ public class HoodieHiveClient { // Read from the compacted file wrote HoodieCompactionMetadata compactionMetadata = 
HoodieCompactionMetadata .fromBytes(activeTimeline.getInstantDetails(lastCompactionCommit).get()); - String filePath = compactionMetadata.getFileIdAndFullPaths().values().stream().findAny() + String filePath = compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny() .orElseThrow(() -> new IllegalArgumentException( "Could not find any data file written for compaction " + lastCompactionCommit + ", could not get schema for dataset " + metaClient.getBasePath())); diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java index 08ae5aefe..1ea25f662 100644 --- a/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java +++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java @@ -217,12 +217,12 @@ public class TestUtil { for (Entry> wEntry : partitionWriteStats.entrySet()) { String partitionPath = wEntry.getKey(); for (HoodieWriteStat wStat : wEntry.getValue()) { - Path path = new Path(wStat.getFullPath()); + Path path = new Path(wStat.getPath()); HoodieDataFile dataFile = new HoodieDataFile(fileSystem.getFileStatus(path)); HoodieLogFile logFile = generateLogData(path, isLogSchemaSimple); HoodieDeltaWriteStat writeStat = new HoodieDeltaWriteStat(); writeStat.setFileId(dataFile.getFileId()); - writeStat.setFullPath(logFile.getPath().toString()); + writeStat.setPath(logFile.getPath().toString()); commitMetadata.addWriteStat(partitionPath, writeStat); } } @@ -258,7 +258,7 @@ public class TestUtil { generateParquetData(filePath, isParquetSchemaSimple); HoodieWriteStat writeStat = new HoodieWriteStat(); writeStat.setFileId(fileId); - writeStat.setFullPath(filePath.toString()); + writeStat.setPath(filePath.toString()); writeStats.add(writeStat); } return writeStats;