From 19c22b231ed69197fb8c932dab464d3f93ecc9ad Mon Sep 17 00:00:00 2001
From: Nishith Agarwal
Date: Tue, 20 Jun 2017 23:50:23 -0700
Subject: [PATCH] 1. Use HoodieLogFormat to archive commits and other actions
 2. Introduce Avro schemas for commits and compactions, and an Avro wrapper
 schema

---
 .../com/uber/hoodie/HoodieWriteClient.java    |   23 +-
 .../hoodie/io/HoodieCommitArchiveLog.java     |  248 +-
 .../common/HoodieTestDataGenerator.java       |   16 +-
 .../hoodie/io/TestHoodieCommitArchiveLog.java |  142 +-
 hoodie-common/pom.xml                         |   10 +
 .../main/avro/HoodieArchivedMetaEntry.avsc    |   55 +
 .../src/main/avro/HoodieCommitMetadata.avsc   |   61 +
 .../main/avro/HoodieCompactionMetadata.avsc   |   42 +
 .../hoodie/common/file/HoodieAppendLog.java   | 3685 -----------------
 .../uber/hoodie/common/model/ActionType.java  |   21 +
 .../common/model/HoodieArchivedLogFile.java   |   40 +
 .../common/model/HoodieCommitMetadata.java    |    8 +-
 .../hoodie/common/model/HoodieLogFile.java    |    5 +-
 .../table/timeline/HoodieActiveTimeline.java  |    9 +
 .../timeline/HoodieArchivedTimeline.java      |    2 +-
 .../hoodie/common/model/HoodieTestUtils.java  |   30 +
 .../java/com/uber/hoodie/hive/TestUtil.java   |    3 +-
 17 files changed, 546 insertions(+), 3854 deletions(-)
 create mode 100644 hoodie-common/src/main/avro/HoodieArchivedMetaEntry.avsc
 create mode 100644 hoodie-common/src/main/avro/HoodieCommitMetadata.avsc
 create mode 100644 hoodie-common/src/main/avro/HoodieCompactionMetadata.avsc
 delete mode 100644 hoodie-common/src/main/java/com/uber/hoodie/common/file/HoodieAppendLog.java
 create mode 100644 hoodie-common/src/main/java/com/uber/hoodie/common/model/ActionType.java
 create mode 100644 hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieArchivedLogFile.java
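Note for reviewers: after this change, an archived file can be read back with the
HoodieLogFormat reader, exactly as the new test below does. A minimal sketch (the
archive file name and basePath handling are illustrative; see
testArchiveDatasetWithArchival further down for the grounded version):

    import com.uber.hoodie.avro.model.HoodieArchivedMetaEntry;
    import com.uber.hoodie.common.model.HoodieLogFile;
    import com.uber.hoodie.common.table.log.HoodieLogFormat;
    import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
    import com.uber.hoodie.common.util.FSUtils;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.generic.IndexedRecord;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ArchiveReadSketch {
      public static void main(String[] args) throws Exception {
        String basePath = args[0]; // dataset base path, illustrative
        FileSystem fs = FSUtils.getFs();
        HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(fs,
            new HoodieLogFile(new Path(basePath + "/.hoodie/.commits_.archive.1")),
            HoodieArchivedMetaEntry.getClassSchema());
        while (reader.hasNext()) {
          // Each data block carries a batch of archived instants as wrapper records
          HoodieAvroDataBlock block = (HoodieAvroDataBlock) reader.next();
          for (IndexedRecord record : block.getRecords()) {
            GenericRecord rec = (GenericRecord) record;
            System.out.println(rec.get("commitTime") + " => " + rec.get("actionType"));
          }
        }
      }
    }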
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
index 1dd3db1d5..85cba85ac 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
@@ -53,17 +53,6 @@ import com.uber.hoodie.io.HoodieCommitArchiveLog;
 import com.uber.hoodie.metrics.HoodieMetrics;
 import com.uber.hoodie.table.HoodieTable;
 import com.uber.hoodie.table.WorkloadProfile;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
-import java.text.ParseException;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -81,6 +70,18 @@ import org.apache.spark.util.LongAccumulator;
 import scala.Option;
 import scala.Tuple2;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
 /**
  * Hoodie Write Client helps you build datasets on HDFS [insert()] and then
  * perform efficient mutations on a HDFS dataset [upsert()]
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java
index d48a2f8e7..5a10f91af 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java
@@ -16,33 +16,44 @@
 package com.uber.hoodie.io;
 
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Sets;
+import com.uber.hoodie.avro.model.HoodieArchivedMetaEntry;
+import com.uber.hoodie.avro.model.HoodieCleanMetadata;
+import com.uber.hoodie.avro.model.HoodieRollbackMetadata;
+import com.uber.hoodie.avro.model.HoodieSavepointMetadata;
+import com.uber.hoodie.common.model.ActionType;
+import com.uber.hoodie.common.model.HoodieArchivedLogFile;
 import com.uber.hoodie.common.model.HoodieCommitMetadata;
+import com.uber.hoodie.common.model.HoodieCompactionMetadata;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
+import com.uber.hoodie.common.table.log.HoodieLogFormat;
+import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
 import com.uber.hoodie.common.table.timeline.HoodieArchivedTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
+import com.uber.hoodie.common.util.AvroUtils;
 import com.uber.hoodie.config.HoodieWriteConfig;
-import com.uber.hoodie.common.file.HoodieAppendLog;
 import com.uber.hoodie.exception.HoodieCommitException;
+import com.uber.hoodie.exception.HoodieException;
 import com.uber.hoodie.exception.HoodieIOException;
 import com.uber.hoodie.table.HoodieTable;
-import java.util.Optional;
-import java.util.function.Function;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-
 /**
- * Log to hold older historical commits, to bound the growth of .commit files
+ * Archiver to bound the growth of .commit files
  */
 public class HoodieCommitArchiveLog {
   private static Logger log = LogManager.getLogger(HoodieCommitArchiveLog.class);
@@ -50,6 +61,7 @@ public class HoodieCommitArchiveLog {
   private final Path archiveFilePath;
   private final FileSystem fs;
   private final HoodieWriteConfig config;
+  private HoodieLogFormat.Writer writer;
 
   public HoodieCommitArchiveLog(HoodieWriteConfig config, FileSystem fs) {
     this.fs = fs;
@@ -58,132 +70,200 @@ public class HoodieCommitArchiveLog {
         .getArchiveLogPath(config.getBasePath() + "/" + HoodieTableMetaClient.METAFOLDER_NAME);
   }
 
+  private HoodieLogFormat.Writer openWriter() {
+    try {
+      if (this.writer == null) {
+        return HoodieLogFormat.newWriterBuilder()
+            .onParentPath(archiveFilePath.getParent())
+            .withFileId(archiveFilePath.getName())
+            .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
+            .withFs(fs)
+            .overBaseCommit("").build();
+      } else {
+        return this.writer;
+      }
+    } catch (InterruptedException | IOException e) {
+      throw new HoodieException("Unable to initialize HoodieLogFormat writer", e);
+    }
+  }
+
+  private void close() {
+    try {
+      if (this.writer != null) {
+        this.writer.close();
+      }
+    } catch (IOException e) {
+      throw new HoodieException("Unable to close HoodieLogFormat writer", e);
+    }
+  }
+
   /**
    * Check if commits need to be archived. If yes, archive commits.
    */
   public boolean archiveIfRequired() {
-    List<HoodieInstant> commitsToArchive = getCommitsToArchive().collect(Collectors.toList());
-    boolean success = true;
-    if (commitsToArchive.iterator().hasNext()) {
-      log.info("Archiving commits " + commitsToArchive);
-      archive(commitsToArchive);
-      success = deleteInstants(commitsToArchive);
-    } else {
-      log.info("No Commits to archive");
+    try {
+      List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList());
+      boolean success = true;
+      if (instantsToArchive.iterator().hasNext()) {
+        this.writer = openWriter();
+        log.info("Archiving instants " + instantsToArchive);
+        archive(instantsToArchive);
+        success = deleteArchivedInstants(instantsToArchive);
+      } else {
+        log.info("No Instants to archive");
+      }
+      return success;
+    } finally {
+      close();
     }
-    return success & deleteOtherInstants();
   }
 
-  private boolean deleteOtherInstants() {
-    // Delete clean and rollback files
-    List<HoodieInstant> toDelete = getInstantsToDelete().collect(Collectors.toList());
-    if (!toDelete.isEmpty()) {
-      log.info("Deleting actions " + toDelete);
-      return deleteInstants(toDelete);
-    }
-    return true;
-  }
-
-  private Stream<HoodieInstant> getInstantsToDelete() {
+  private Stream<HoodieInstant> getInstantsToArchive() {
+    // TODO : rename to max/minInstantsToKeep
     int maxCommitsToKeep = config.getMaxCommitsToKeep();
     int minCommitsToKeep = config.getMinCommitsToKeep();
     HoodieTable table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);
-    HoodieTimeline cleanTimeline = table.getActiveTimeline().getTimelineOfActions(Sets.newHashSet(HoodieTimeline.CLEAN_ACTION));
-    if (!cleanTimeline.empty() && cleanTimeline.countInstants() > maxCommitsToKeep) {
-      // Actually do the commits
-      return cleanTimeline.getInstants().limit(cleanTimeline.countInstants() - minCommitsToKeep);
-    }
-    return Stream.empty();
-  }
 
-  private Stream<HoodieInstant> getCommitsToArchive() {
+    // GroupBy each action and limit each action timeline to maxCommitsToKeep
+    HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline().getTimelineOfActions(Sets.newHashSet(HoodieTimeline.CLEAN_ACTION,
+        HoodieTimeline.ROLLBACK_ACTION));
+    Stream<HoodieInstant> instants = cleanAndRollbackTimeline.getInstants()
+        .collect(Collectors.groupingBy(s -> s.getAction()))
+        .entrySet()
+        .stream()
+        .map(i -> {
+          if (i.getValue().size() > maxCommitsToKeep) {
+            return i.getValue().subList(0, i.getValue().size() - minCommitsToKeep);
+          } else {
+            return new ArrayList<HoodieInstant>();
+          }
+        })
+        .flatMap(i -> i.stream());
 
-    int maxCommitsToKeep = config.getMaxCommitsToKeep();
-    int minCommitsToKeep = config.getMinCommitsToKeep();
-
-    HoodieTable table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);
+    // TODO (na) : Add a way to return actions associated with a timeline and then
+    // merge/unify with logic above to avoid Stream.concats
     HoodieTimeline commitTimeline = table.getCompletedCommitTimeline();
     // We cannot have any holes in the commit timeline. We cannot archive any commits which are made after the first savepoint present.
     Optional<HoodieInstant> firstSavepoint = table.getCompletedSavepointTimeline().firstInstant();
     if (!commitTimeline.empty() && commitTimeline.countInstants() > maxCommitsToKeep) {
       // Actually do the commits
-      return commitTimeline.getInstants().filter(s -> {
+      instants = Stream.concat(instants, commitTimeline.getInstants().filter(s -> {
         // if no savepoint present, then don't filter
         return !(firstSavepoint.isPresent() && HoodieTimeline
             .compareTimestamps(firstSavepoint.get().getTimestamp(), s.getTimestamp(),
                 HoodieTimeline.LESSER_OR_EQUAL));
-      }).limit(commitTimeline.countInstants() - minCommitsToKeep);
+      }).limit(commitTimeline.countInstants() - minCommitsToKeep));
     }
-    return Stream.empty();
+
+    return instants;
   }
 
-  private boolean deleteInstants(List<HoodieInstant> commitsToArchive) {
-    log.info("Deleting instant " + commitsToArchive);
+  private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants) {
+    log.info("Deleting instants " + archivedInstants);
     HoodieTableMetaClient metaClient =
-        new HoodieTableMetaClient(fs, config.getBasePath(), true);
+        new HoodieTableMetaClient(fs, config.getBasePath(), true);
     boolean success = true;
-    for (HoodieInstant commitToArchive : commitsToArchive) {
+    for (HoodieInstant archivedInstant : archivedInstants) {
       Path commitFile =
-          new Path(metaClient.getMetaPath(), commitToArchive.getFileName());
+          new Path(metaClient.getMetaPath(), archivedInstant.getFileName());
       try {
         if (fs.exists(commitFile)) {
           success &= fs.delete(commitFile, false);
           log.info("Archived and deleted instant file " + commitFile);
         }
       } catch (IOException e) {
-        throw new HoodieIOException("Failed to delete archived instant " + commitToArchive,
+        throw new HoodieIOException("Failed to delete archived instant " + archivedInstant,
             e);
       }
     }
     return success;
   }
 
-  private HoodieAppendLog.Writer openWriter() throws IOException {
-    log.info("Opening archive file at path: " + archiveFilePath);
-    return HoodieAppendLog
-        .createWriter(fs.getConf(), HoodieAppendLog.Writer.file(archiveFilePath),
-            HoodieAppendLog.Writer.keyClass(Text.class),
-            HoodieAppendLog.Writer.appendIfExists(true),
-            HoodieAppendLog.Writer.valueClass(Text.class), HoodieAppendLog.Writer
-                .compression(HoodieAppendLog.CompressionType.RECORD, new BZip2Codec()));
-  }
+  public void archive(List<HoodieInstant> instants) throws HoodieCommitException {
 
-  private void archive(List<HoodieInstant> commits) throws HoodieCommitException {
-    HoodieTableMetaClient metaClient =
-        new HoodieTableMetaClient(fs, config.getBasePath(), true);
-    HoodieTimeline commitTimeline =
-        metaClient.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants();
-
-    HoodieAppendLog.Writer writer = null;
     try {
-      writer = openWriter();
-      for (HoodieInstant commitTime : commits) {
-        Text k = new Text(commitTime.getTimestamp());
-        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-            .fromBytes(commitTimeline.getInstantDetails(commitTime).get());
-        Text v = new Text(commitMetadata.toJsonString());
-        writer.append(k, v);
-        log.info("Wrote " + k);
-      }
-    } catch (IOException e) {
-      throw new HoodieCommitException("Could not archive commits " + commits, e);
-    } finally {
-      if (writer != null) {
-        try {
-          writer.hsync();
-          writer.close();
-        } catch (IOException e) {
-          throw new HoodieCommitException(
-              "Could not close the archive commits writer " + commits, e);
-        }
+      HoodieTableMetaClient metaClient =
+          new HoodieTableMetaClient(fs, config.getBasePath(), true);
+      HoodieTimeline commitTimeline =
+          metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants();
+
+      Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
+      log.info("Wrapper schema " + wrapperSchema.toString());
+      List<IndexedRecord> records = new ArrayList<>();
+      for (HoodieInstant hoodieInstant : instants) {
+        records.add(convertToAvroRecord(commitTimeline, hoodieInstant));
       }
+      HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, wrapperSchema);
+      this.writer = writer.appendBlock(block);
+    } catch (Exception e) {
+      throw new HoodieCommitException("Failed to archive commits", e);
     }
   }
 
   public Path getArchiveFilePath() {
     return archiveFilePath;
   }
+
+  private IndexedRecord convertToAvroRecord(HoodieTimeline commitTimeline, HoodieInstant hoodieInstant)
+      throws IOException {
+    HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry();
+    archivedMetaWrapper.setCommitTime(hoodieInstant.getTimestamp());
+    switch (hoodieInstant.getAction()) {
+      case HoodieTimeline.CLEAN_ACTION: {
+        archivedMetaWrapper.setHoodieCleanMetadata(AvroUtils.deserializeAvroMetadata(
+            commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieCleanMetadata.class));
+        archivedMetaWrapper.setActionType(ActionType.clean.name());
+        break;
+      }
+      case HoodieTimeline.COMMIT_ACTION: {
+        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+            .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get());
+        archivedMetaWrapper.setHoodieCommitMetadata(commitMetadataConverter(commitMetadata));
+        archivedMetaWrapper.setActionType(ActionType.commit.name());
+        break;
+      }
+      case HoodieTimeline.COMPACTION_ACTION: {
+        com.uber.hoodie.common.model.HoodieCompactionMetadata compactionMetadata =
+            com.uber.hoodie.common.model.HoodieCompactionMetadata
+                .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get());
+        archivedMetaWrapper.setHoodieCompactionMetadata(compactionMetadataConverter(compactionMetadata));
+        archivedMetaWrapper.setActionType(ActionType.compaction.name());
+        break;
+      }
+      case HoodieTimeline.ROLLBACK_ACTION: {
+        archivedMetaWrapper.setHoodieRollbackMetadata(AvroUtils.deserializeAvroMetadata(
+            commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieRollbackMetadata.class));
+        archivedMetaWrapper.setActionType(ActionType.rollback.name());
+        break;
+      }
+      case HoodieTimeline.SAVEPOINT_ACTION: {
+        archivedMetaWrapper.setHoodieSavePointMetadata(AvroUtils.deserializeAvroMetadata(
+            commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieSavepointMetadata.class));
+        archivedMetaWrapper.setActionType(ActionType.savepoint.name());
+        break;
+      }
+      case HoodieTimeline.DELTA_COMMIT_ACTION: {
+        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+            .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get());
+        archivedMetaWrapper.setHoodieCommitMetadata(commitMetadataConverter(commitMetadata));
+        archivedMetaWrapper.setActionType(ActionType.commit.name());
+        break;
+      }
+    }
+    return archivedMetaWrapper;
+  }
+
+  private com.uber.hoodie.avro.model.HoodieCommitMetadata commitMetadataConverter(
+      HoodieCommitMetadata hoodieCommitMetadata) {
+    ObjectMapper mapper = new ObjectMapper();
+    // Need this to ignore other public get() methods
+    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    com.uber.hoodie.avro.model.HoodieCommitMetadata avroMetaData =
+        mapper.convertValue(hoodieCommitMetadata, com.uber.hoodie.avro.model.HoodieCommitMetadata.class);
+    return avroMetaData;
+  }
+
+  private com.uber.hoodie.avro.model.HoodieCompactionMetadata compactionMetadataConverter(
+      HoodieCompactionMetadata hoodieCompactionMetadata) {
+    ObjectMapper mapper = new ObjectMapper();
+    // Need this to ignore other public get() methods
+    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    com.uber.hoodie.avro.model.HoodieCompactionMetadata avroMetaData = mapper.convertValue(hoodieCompactionMetadata,
+        com.uber.hoodie.avro.model.HoodieCompactionMetadata.class);
+    return avroMetaData;
+  }
 }
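Note: the selection logic in getInstantsToArchive() keeps the newest minCommitsToKeep
instants per action once an action's count exceeds maxCommitsToKeep. A self-contained
sketch of that arithmetic, using the same values the new test configures via
archiveCommitsWith(2, 4):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class ArchiveSelectionSketch {
      public static void main(String[] args) {
        int maxCommitsToKeep = 4;
        int minCommitsToKeep = 2;
        // Six completed instants for one action, oldest first
        List<String> instants = Arrays.asList("100", "101", "102", "103", "104", "105");
        // Mirrors the subList(0, size - minCommitsToKeep) call in getInstantsToArchive()
        List<String> toArchive = instants.size() > maxCommitsToKeep
            ? new ArrayList<>(instants.subList(0, instants.size() - minCommitsToKeep))
            : new ArrayList<>();
        System.out.println(toArchive); // prints [100, 101, 102, 103]
      }
    }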
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java
index 858b781f5..fe9c9fd49 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java
@@ -16,16 +16,17 @@
 package com.uber.hoodie.common;
 
+import com.uber.hoodie.avro.model.HoodieCleanMetadata;
+import com.uber.hoodie.common.model.HoodieCleaningPolicy;
 import com.uber.hoodie.common.model.HoodieCommitMetadata;
 import com.uber.hoodie.common.model.HoodieKey;
 import com.uber.hoodie.common.model.HoodiePartitionMetadata;
 import com.uber.hoodie.common.model.HoodieRecord;
-import com.uber.hoodie.common.model.HoodieRecordLocation;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
+import com.uber.hoodie.common.util.AvroUtils;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.HoodieAvroUtils;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -35,7 +36,12 @@ import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Random;
+import java.util.UUID;
 
 /**
  * Class to be used in tests to keep generating test inserts and updates against a corpus.
@@ -209,9 +215,7 @@ public class HoodieTestDataGenerator {
     }
   }
 
-
-
-  public String[] getPartitionPaths() {
+  public String[] getPartitionPaths() {
     return partitionPaths;
   }
 }
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java
index 64ea24b03..2934f9795 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java
@@ -16,28 +16,31 @@
 package com.uber.hoodie.io;
 
-import com.google.common.collect.Lists;
+import com.uber.hoodie.avro.model.HoodieArchivedMetaEntry;
+import com.uber.hoodie.common.HoodieTestDataGenerator;
+import com.uber.hoodie.common.model.HoodieArchivedLogFile;
+import com.uber.hoodie.common.model.HoodieLogFile;
+import com.uber.hoodie.common.model.HoodieTestUtils;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
+import com.uber.hoodie.common.table.log.HoodieLogFormat;
+import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
-import com.uber.hoodie.config.HoodieWriteConfig;
-import com.uber.hoodie.common.HoodieTestDataGenerator;
-import com.uber.hoodie.common.model.HoodieCommitMetadata;
-import com.uber.hoodie.common.model.HoodieTestUtils;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.config.HoodieCompactionConfig;
+import com.uber.hoodie.config.HoodieWriteConfig;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.fs.Path;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.SortedMap;
-import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
@@ -66,6 +69,75 @@ public class TestHoodieCommitArchiveLog {
     assertTrue(result);
   }
 
+  @Test
+  public void testArchiveDatasetWithArchival() throws IOException {
+    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 4).build())
+        .forTable("test-trip-table").build();
+    HoodieTestUtils.init(basePath);
+    HoodieTestDataGenerator.createCommitFile(basePath, "100");
+    HoodieTestDataGenerator.createCommitFile(basePath, "101");
+    HoodieTestDataGenerator.createCommitFile(basePath, "102");
+    HoodieTestDataGenerator.createCommitFile(basePath, "103");
+    HoodieTestDataGenerator.createCommitFile(basePath, "104");
+    HoodieTestDataGenerator.createCommitFile(basePath, "105");
+
+    HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath);
+    HoodieTimeline timeline =
+        metadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants();
+
+    assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
+
+    HoodieTestUtils.createCleanFiles(basePath, "100");
+    HoodieTestUtils.createCleanFiles(basePath, "101");
+    HoodieTestUtils.createCleanFiles(basePath, "102");
+    HoodieTestUtils.createCleanFiles(basePath, "103");
+    HoodieTestUtils.createCleanFiles(basePath, "104");
+    HoodieTestUtils.createCleanFiles(basePath, "105");
+
+    // reload the timeline and get all the commits before archive
+    timeline = metadata.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
+    List<HoodieInstant> originalCommits = timeline.getInstants().collect(Collectors.toList());
+
+    assertEquals("Loaded 12 instants (commits and cleans) and the count should match", 12,
+        timeline.countInstants());
+
+    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, fs);
+
+    assertTrue(archiveLog.archiveIfRequired());
+
+    // reload the timeline and remove the remaining commits
+    timeline = metadata.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
+    originalCommits.removeAll(timeline.getInstants().collect(Collectors.toList()));
+
+    // read the file
+    HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(FSUtils.getFs(),
+        new HoodieLogFile(new Path(basePath + "/.hoodie/.commits_.archive.1")),
+        HoodieArchivedMetaEntry.getClassSchema());
+
+    int archivedRecordsCount = 0;
+    List<IndexedRecord> readRecords = new ArrayList<>();
+    // read the avro blocks and validate the number of records written in each avro block
+    while (reader.hasNext()) {
+      HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
+      List<IndexedRecord> records = blk.getRecords();
+      readRecords.addAll(records);
+      assertEquals("Archived and read records for each block are same", 8, records.size());
+      archivedRecordsCount += records.size();
+    }
+    assertEquals("Total archived records and total read records are the same count", 8,
+        archivedRecordsCount);
+
+    // make sure the archived commits are the same as the (originalCommits - commitsLeft)
+    List<String> readCommits = readRecords.stream().map(r -> (GenericRecord) r).map(r -> {
+      return r.get("commitTime").toString();
+    }).collect(Collectors.toList());
+    Collections.sort(readCommits);
+
+    assertEquals(
+        "Read commits map should match the originalCommits - commitsLoadedFromArchival",
+        originalCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()),
+        readCommits);
+  }
+
   @Test
   public void testArchiveDatasetWithNoArchival() throws IOException {
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
@@ -91,58 +163,6 @@ public class TestHoodieCommitArchiveLog {
         timeline.countInstants());
   }
 
-  @Test
-  public void testArchiveDatasetWithArchival() throws IOException {
-    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
-        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
-        .forTable("test-trip-table").withCompactionConfig(
-            HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build();
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath);
-    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, fs);
-    HoodieTestDataGenerator.createCommitFile(basePath, "100");
-    HoodieTestDataGenerator.createCommitFile(basePath, "101");
-    HoodieTestDataGenerator.createCommitFile(basePath, "102");
-    HoodieTestDataGenerator.createCommitFile(basePath, "103");
-    HoodieTestDataGenerator.createCommitFile(basePath, "104");
-    HoodieTestDataGenerator.createCommitFile(basePath, "105");
-
-    HoodieTimeline timeline =
-        metadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants();
-    List<HoodieInstant> originalCommits = timeline.getInstants().collect(Collectors.toList());
-
-    assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
-    boolean result = archiveLog.archiveIfRequired();
-    assertTrue(result);
-    timeline =
-        metadata.getActiveTimeline().reload().getCommitsAndCompactionsTimeline().filterCompletedInstants();
-    assertEquals(
-        "Should archive commits when maxCommitsToKeep is 5 and now the commits length should be minCommitsToKeep which is 2",
-        2, timeline.countInstants());
-    assertEquals("Archive should not archive the last 2 commits",
-        Lists.newArrayList("104", "105"),
-        timeline.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList()));
-
-    // Remove all the commits from the original commits, make it ready to be checked against the read map
-    timeline.getInstants().forEach(originalCommits::remove);
-
-    // Read back the commits to make sure
-    SequenceFile.Reader reader = new SequenceFile.Reader(fs.getConf(),
-        SequenceFile.Reader.file(archiveLog.getArchiveFilePath()));
-    Text key = new Text();
-    Text val = new Text();
-    SortedMap<String, HoodieCommitMetadata> readCommits = new TreeMap<>();
-    while (reader.next(key, val)) {
-      HoodieCommitMetadata meta = HoodieCommitMetadata.fromJsonString(val.toString());
-      readCommits.put(key.toString(), meta);
-    }
-
-    assertEquals(
-        "Read commits map should match the originalCommits - commitsLoadedAfterArchival",
-        originalCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()),
-        new ArrayList<>(readCommits.keySet()));
-    reader.close();
-  }
-
   @Test
   public void testArchiveCommitSafety() throws IOException {
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
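Note for reviewers: reading archived entries back is the inverse of
convertToAvroRecord(). A minimal sketch of routing an entry to the metadata it
wraps; the unwrap() helper is hypothetical, and the getters are assumed to be the
Avro-generated counterparts of the setters used in HoodieCommitArchiveLog:

    import com.uber.hoodie.avro.model.HoodieArchivedMetaEntry;
    import com.uber.hoodie.common.model.ActionType;

    public class ArchiveUnwrapSketch {
      // Hypothetical helper, not part of this patch: pick the wrapped metadata
      // object based on the actionType written by convertToAvroRecord().
      static Object unwrap(HoodieArchivedMetaEntry entry) {
        switch (ActionType.valueOf(entry.getActionType().toString())) {
          case commit:
            return entry.getHoodieCommitMetadata();
          case clean:
            return entry.getHoodieCleanMetadata();
          case compaction:
            return entry.getHoodieCompactionMetadata();
          case rollback:
            return entry.getHoodieRollbackMetadata();
          case savepoint:
            return entry.getHoodieSavePointMetadata();
          default:
            throw new IllegalArgumentException("Unexpected action " + entry.getActionType());
        }
      }
    }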
diff --git a/hoodie-common/pom.xml b/hoodie-common/pom.xml
index 517f3f910..578711b30 100644
--- a/hoodie-common/pom.xml
+++ b/hoodie-common/pom.xml
@@ -50,6 +50,16 @@
         <groupId>org.apache.avro</groupId>
         <artifactId>avro-maven-plugin</artifactId>
+        <configuration>
+          <imports>
+            <import>${basedir}/src/main/avro/HoodieCommitMetadata.avsc</import>
+            <import>${basedir}/src/main/avro/HoodieSavePointMetadata.avsc</import>
+            <import>${basedir}/src/main/avro/HoodieCompactionMetadata.avsc</import>
+            <import>${basedir}/src/main/avro/HoodieCleanMetadata.avsc</import>
+            <import>${basedir}/src/main/avro/HoodieRollbackMetadata.avsc</import>
+          </imports>
+        </configuration>
+
       </plugin>
diff --git a/hoodie-common/src/main/avro/HoodieArchivedMetaEntry.avsc b/hoodie-common/src/main/avro/HoodieArchivedMetaEntry.avsc
new file mode 100644
index 000000000..86376518d
--- /dev/null
+++ b/hoodie-common/src/main/avro/HoodieArchivedMetaEntry.avsc
@@ -0,0 +1,55 @@
+{
+  "type":"record",
+  "name":"HoodieArchivedMetaEntry",
+  "namespace":"com.uber.hoodie.avro.model",
+  "fields":[
+    {
+      "name":"hoodieCommitMetadata",
+      "type":[
+        "null",
+        "HoodieCommitMetadata"
+      ],
+      "default": null
+    },
+    {
+      "name":"hoodieCleanMetadata",
+      "type":[
+        "null",
+        "HoodieCleanMetadata"
+      ],
+      "default": null
+    },
+    {
+      "name":"hoodieCompactionMetadata",
+      "type":[
+        "null",
+        "HoodieCompactionMetadata"
+      ],
+      "default": null
+    },
+    {
+      "name":"hoodieRollbackMetadata",
+      "type":[
+        "null",
+        "HoodieRollbackMetadata"
+      ],
+      "default": null
+    },
+    {
+      "name":"hoodieSavePointMetadata",
+      "type":[
+        "null",
+        "HoodieSavepointMetadata"
+      ],
+      "default": null
+    },
+    {
+      "name":"commitTime",
+      "type":["null","string"]
+    },
+    {
+      "name":"actionType",
+      "type":["null","string"]
+    }
+  ]
+}
\ No newline at end of file
diff --git a/hoodie-common/src/main/avro/HoodieCommitMetadata.avsc b/hoodie-common/src/main/avro/HoodieCommitMetadata.avsc
new file mode 100644
index 000000000..b8eced141
--- /dev/null
+++ b/hoodie-common/src/main/avro/HoodieCommitMetadata.avsc
@@ -0,0 +1,61 @@
+{
+  "namespace":"com.uber.hoodie.avro.model",
+  "type":"record",
+  "name":"HoodieCommitMetadata",
+  "fields":[
+    {
+      "name":"partitionToWriteStats",
+      "type":["null", {
+        "type":"map",
+        "values":{
+          "type":"array",
+          "items":{
+            "name":"HoodieWriteStat",
+            "type":"record",
+            "fields":[
+              {
+                "name":"fileId",
+                "type":["null","string"]
+              },
+              {
+                "name":"path",
+                "type":["null","string"]
+              },
+              {
+                "name":"prevCommit",
+                "type":["null","string"]
+              },
+              {
+                "name":"numWrites",
+                "type":["null","long"]
+              },
+              {
+                "name":"numDeletes",
+                "type":["null","long"]
+              },
+              {
+                "name":"numUpdateWrites",
+                "type":["null","long"]
+              },
+              {
+                "name":"totalWriteBytes",
+                "type":["null","long"]
+              },
+              {
+                "name":"totalWriteErrors",
+                "type":["null","long"]
+              }
+            ]
+          }
+        }
+      }]
+    },
+    {
+      "name":"extraMetadata",
+      "type":["null", {
+        "type":"map",
+        "values":"string"
+      }]
+    }
+  ]
+}
\ No newline at end of file
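Note: HoodieArchivedMetaEntry references the other metadata records by name, so the
schemas have to be parsed together; the import list added to hoodie-common/pom.xml
above does that for code generation. The same ordering applies when parsing the
.avsc files by hand. A small sketch (file paths are illustrative):

    import java.io.File;
    import java.util.Arrays;
    import org.apache.avro.Schema;

    public class SchemaImportSketch {
      public static void main(String[] args) throws Exception {
        // One Schema.Parser accumulates named types across calls, so the
        // dependencies must be parsed before HoodieArchivedMetaEntry.avsc.
        Schema.Parser parser = new Schema.Parser();
        for (String avsc : Arrays.asList("HoodieCommitMetadata.avsc", "HoodieSavePointMetadata.avsc",
            "HoodieCompactionMetadata.avsc", "HoodieCleanMetadata.avsc", "HoodieRollbackMetadata.avsc")) {
          parser.parse(new File("hoodie-common/src/main/avro/" + avsc));
        }
        Schema wrapper = parser.parse(new File("hoodie-common/src/main/avro/HoodieArchivedMetaEntry.avsc"));
        System.out.println(wrapper.getFields().size() + " fields in wrapper schema");
      }
    }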
"type":"array", + "items":{ + "name":"HoodieWriteStat", + "type":"record", + "fields":[ + { + "name":"fileId", + "type":["null","string"] + }, + { + "name":"path", + "type":["null","string"] + }, + { + "name":"prevCommit", + "type":["null","string"] + }, + { + "name":"numWrites", + "type":["null","long"] + }, + { + "name":"numDeletes", + "type":["null","long"] + }, + { + "name":"numUpdateWrites", + "type":["null","long"] + }, + { + "name":"totalWriteBytes", + "type":["null","long"] + }, + { + "name":"totalWriteErrors", + "type":["null","long"] + } + ] + } + } + }] + }, + { + "name":"extraMetadata", + "type":["null", { + "type":"map", + "values":"string" + }] + } + ] +} \ No newline at end of file diff --git a/hoodie-common/src/main/avro/HoodieCompactionMetadata.avsc b/hoodie-common/src/main/avro/HoodieCompactionMetadata.avsc new file mode 100644 index 000000000..92e774d78 --- /dev/null +++ b/hoodie-common/src/main/avro/HoodieCompactionMetadata.avsc @@ -0,0 +1,42 @@ +{ + "namespace":"com.uber.hoodie.avro.model", + "type":"record", + "name":"HoodieCompactionMetadata", + "fields":[ + { + "name":"partitionToCompactionWriteStats", + "type": ["null", { + "type":"map", + "values":{ + "type":"array", + "items":{ + "name":"HoodieCompactionWriteStat", + "type":"record", + "fields":[ + { + "name":"partitionPath", + "type":["null","string"] + }, + { + "name":"totalLogRecords", + "type":["null","long"] + }, + { + "name":"totalLogFiles", + "type":["null","long"] + }, + { + "name":"totalRecordsToBeUpdate", + "type":["null","long"] + }, + { + "name":"hoodieWriteStat", + "type":["null","HoodieWriteStat"] + } + ] + } + } + }] + } + ] +} \ No newline at end of file diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/file/HoodieAppendLog.java b/hoodie-common/src/main/java/com/uber/hoodie/common/file/HoodieAppendLog.java deleted file mode 100644 index 5effd47fa..000000000 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/file/HoodieAppendLog.java +++ /dev/null @@ -1,3685 +0,0 @@ -/* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.uber.hoodie.common.file; - - -import java.io.*; -import java.util.*; -import java.rmi.server.UID; -import java.security.MessageDigest; - -import org.apache.commons.logging.*; -import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.RawComparator; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.UTF8; -import org.apache.hadoop.io.VersionMismatchException; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.io.WritableName; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.util.Options; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.Options.CreateOpts; -import org.apache.hadoop.io.compress.CodecPool; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.CompressionInputStream; -import org.apache.hadoop.io.compress.CompressionOutputStream; -import org.apache.hadoop.io.compress.Compressor; -import org.apache.hadoop.io.compress.Decompressor; -import org.apache.hadoop.io.compress.DefaultCodec; -import org.apache.hadoop.io.compress.GzipCodec; -import org.apache.hadoop.io.compress.zlib.ZlibFactory; -import org.apache.hadoop.io.serializer.Deserializer; -import org.apache.hadoop.io.serializer.Serializer; -import org.apache.hadoop.io.serializer.SerializationFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.*; -import org.apache.hadoop.util.Progressable; -import org.apache.hadoop.util.Progress; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.NativeCodeLoader; -import org.apache.hadoop.util.MergeSort; -import org.apache.hadoop.util.PriorityQueue; -import org.apache.hadoop.util.Time; - -@InterfaceAudience.Public -@InterfaceStability.Stable -public class HoodieAppendLog { - private static final Log LOG = LogFactory.getLog(HoodieAppendLog.class); - - private HoodieAppendLog() { - } // no public ctor - - private static final byte BLOCK_COMPRESS_VERSION = (byte) 4; - private static final byte CUSTOM_COMPRESS_VERSION = (byte) 5; - private static final byte VERSION_WITH_METADATA = (byte) 6; - private static byte[] VERSION = new byte[]{ - (byte) 'S', (byte) 'E', (byte) 'Q', VERSION_WITH_METADATA - }; - - private static final int SYNC_ESCAPE = -1; // "length" of sync entries - private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash - private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE; // escape + hash - - /** The number of bytes between sync points.*/ - public static final int SYNC_INTERVAL = 100 * SYNC_SIZE; - - /** - * The compression type used to compress key/value pairs in the - * {@link HoodieAppendLog}. - * - * @see HoodieAppendLog.Writer - */ - public static enum CompressionType { - /** Do not compress records. */ - NONE, - /** Compress values only, each separately. */ - RECORD, - /** Compress sequences of records together in blocks. */ - BLOCK - } - - /** - * Get the compression type for the reduce outputs - * @param job the job config to look in - * @return the kind of compression to use - */ - static public CompressionType getDefaultCompressionType(Configuration job) { - String name = job.get("io.seqfile.compression.type"); - return name == null ? 
CompressionType.RECORD : - CompressionType.valueOf(name); - } - - /** - * Set the default compression type for sequence files. - * @param job the configuration to modify - * @param val the new compression type (none, block, record) - */ - static public void setDefaultCompressionType(Configuration job, - CompressionType val) { - job.set("io.seqfile.compression.type", val.toString()); - } - - /** - * Create a new Writer with the given options. - * @param conf the configuration to use - * @param opts the options to create the file with - * @return a new Writer - * @throws IOException - */ - public static Writer createWriter(Configuration conf, Writer.Option... opts - ) throws IOException { - Writer.CompressionOption compressionOption = - Options.getOption(Writer.CompressionOption.class, opts); - CompressionType kind; - if (compressionOption != null) { - kind = compressionOption.getValue(); - } else { - kind = getDefaultCompressionType(conf); - opts = Options.prependOptions(opts, Writer.compression(kind)); - } - switch (kind) { - default: - case NONE: - return new Writer(conf, opts); - case RECORD: - return new RecordCompressWriter(conf, opts); - case BLOCK: - return new BlockCompressWriter(conf, opts); - } - } - - /** - * Construct the preferred type of SequenceFile Writer. - * @param fs The configured filesystem. - * @param conf The configuration. - * @param name The name of the file. - * @param keyClass The 'key' type. - * @param valClass The 'value' type. - * @return Returns the handle to the constructed SequenceFile Writer. - * @throws IOException - * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} - * instead. - */ - @Deprecated - public static Writer - createWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass) throws IOException { - return createWriter(conf, Writer.filesystem(fs), - Writer.file(name), Writer.keyClass(keyClass), - Writer.valueClass(valClass)); - } - - /** - * Construct the preferred type of SequenceFile Writer. - * @param fs The configured filesystem. - * @param conf The configuration. - * @param name The name of the file. - * @param keyClass The 'key' type. - * @param valClass The 'value' type. - * @param compressionType The compression type. - * @return Returns the handle to the constructed SequenceFile Writer. - * @throws IOException - * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} - * instead. - */ - @Deprecated - public static Writer - createWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, - CompressionType compressionType) throws IOException { - return createWriter(conf, Writer.filesystem(fs), - Writer.file(name), Writer.keyClass(keyClass), - Writer.valueClass(valClass), - Writer.compression(compressionType)); - } - - /** - * Construct the preferred type of SequenceFile Writer. - * @param fs The configured filesystem. - * @param conf The configuration. - * @param name The name of the file. - * @param keyClass The 'key' type. - * @param valClass The 'value' type. - * @param compressionType The compression type. - * @param progress The Progressable object to track progress. - * @return Returns the handle to the constructed SequenceFile Writer. - * @throws IOException - * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} - * instead. 
- */ - @Deprecated - public static Writer - createWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, CompressionType compressionType, - Progressable progress) throws IOException { - return createWriter(conf, Writer.file(name), - Writer.filesystem(fs), - Writer.keyClass(keyClass), - Writer.valueClass(valClass), - Writer.compression(compressionType), - Writer.progressable(progress)); - } - - /** - * Construct the preferred type of SequenceFile Writer. - * @param fs The configured filesystem. - * @param conf The configuration. - * @param name The name of the file. - * @param keyClass The 'key' type. - * @param valClass The 'value' type. - * @param compressionType The compression type. - * @param codec The compression codec. - * @return Returns the handle to the constructed SequenceFile Writer. - * @throws IOException - * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} - * instead. - */ - @Deprecated - public static Writer - createWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, CompressionType compressionType, - CompressionCodec codec) throws IOException { - return createWriter(conf, Writer.file(name), - Writer.filesystem(fs), - Writer.keyClass(keyClass), - Writer.valueClass(valClass), - Writer.compression(compressionType, codec)); - } - - /** - * Construct the preferred type of SequenceFile Writer. - * @param fs The configured filesystem. - * @param conf The configuration. - * @param name The name of the file. - * @param keyClass The 'key' type. - * @param valClass The 'value' type. - * @param compressionType The compression type. - * @param codec The compression codec. - * @param progress The Progressable object to track progress. - * @param metadata The metadata of the file. - * @return Returns the handle to the constructed SequenceFile Writer. - * @throws IOException - * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} - * instead. - */ - @Deprecated - public static Writer - createWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, - CompressionType compressionType, CompressionCodec codec, - Progressable progress, Metadata metadata) throws IOException { - return createWriter(conf, Writer.file(name), - Writer.filesystem(fs), - Writer.keyClass(keyClass), - Writer.valueClass(valClass), - Writer.compression(compressionType, codec), - Writer.progressable(progress), - Writer.metadata(metadata)); - } - - /** - * Construct the preferred type of SequenceFile Writer. - * @param fs The configured filesystem. - * @param conf The configuration. - * @param name The name of the file. - * @param keyClass The 'key' type. - * @param valClass The 'value' type. - * @param bufferSize buffer size for the underlaying outputstream. - * @param replication replication factor for the file. - * @param blockSize block size for the file. - * @param compressionType The compression type. - * @param codec The compression codec. - * @param progress The Progressable object to track progress. - * @param metadata The metadata of the file. - * @return Returns the handle to the constructed SequenceFile Writer. - * @throws IOException - * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} - * instead. 
- */ - @Deprecated - public static Writer - createWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, int bufferSize, - short replication, long blockSize, - CompressionType compressionType, CompressionCodec codec, - Progressable progress, Metadata metadata) throws IOException { - return createWriter(conf, Writer.file(name), - Writer.filesystem(fs), - Writer.keyClass(keyClass), - Writer.valueClass(valClass), - Writer.bufferSize(bufferSize), - Writer.replication(replication), - Writer.blockSize(blockSize), - Writer.compression(compressionType, codec), - Writer.progressable(progress), - Writer.metadata(metadata)); - } - - /** - * Construct the preferred type of SequenceFile Writer. - * @param fs The configured filesystem. - * @param conf The configuration. - * @param name The name of the file. - * @param keyClass The 'key' type. - * @param valClass The 'value' type. - * @param bufferSize buffer size for the underlaying outputstream. - * @param replication replication factor for the file. - * @param blockSize block size for the file. - * @param createParent create parent directory if non-existent - * @param compressionType The compression type. - * @param codec The compression codec. - * @param metadata The metadata of the file. - * @return Returns the handle to the constructed SequenceFile Writer. - * @throws IOException - */ - @Deprecated - public static Writer - createWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, int bufferSize, - short replication, long blockSize, boolean createParent, - CompressionType compressionType, CompressionCodec codec, - Metadata metadata) throws IOException { - return createWriter(FileContext.getFileContext(fs.getUri(), conf), - conf, name, keyClass, valClass, compressionType, codec, - metadata, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), - CreateOpts.bufferSize(bufferSize), - createParent ? CreateOpts.createParent() - : CreateOpts.donotCreateParent(), - CreateOpts.repFac(replication), - CreateOpts.blockSize(blockSize) - ); - } - - /** - * Construct the preferred type of SequenceFile Writer. - * @param fc The context for the specified file. - * @param conf The configuration. - * @param name The name of the file. - * @param keyClass The 'key' type. - * @param valClass The 'value' type. - * @param compressionType The compression type. - * @param codec The compression codec. - * @param metadata The metadata of the file. - * @param createFlag gives the semantics of create: overwrite, append etc. - * @param opts file creation options; see {@link CreateOpts}. - * @return Returns the handle to the constructed SequenceFile Writer. - * @throws IOException - */ - public static Writer - createWriter(FileContext fc, Configuration conf, Path name, - Class keyClass, Class valClass, - CompressionType compressionType, CompressionCodec codec, - Metadata metadata, - final EnumSet createFlag, CreateOpts... opts) - throws IOException { - return createWriter(conf, fc.create(name, createFlag, opts), - keyClass, valClass, compressionType, codec, metadata).ownStream(); - } - - /** - * Construct the preferred type of SequenceFile Writer. - * @param fs The configured filesystem. - * @param conf The configuration. - * @param name The name of the file. - * @param keyClass The 'key' type. - * @param valClass The 'value' type. - * @param compressionType The compression type. - * @param codec The compression codec. - * @param progress The Progressable object to track progress. 
- * @return Returns the handle to the constructed SequenceFile Writer. - * @throws IOException - * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} - * instead. - */ - @Deprecated - public static Writer - createWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, - CompressionType compressionType, CompressionCodec codec, - Progressable progress) throws IOException { - return createWriter(conf, Writer.file(name), - Writer.filesystem(fs), - Writer.keyClass(keyClass), - Writer.valueClass(valClass), - Writer.compression(compressionType, codec), - Writer.progressable(progress)); - } - - /** - * Construct the preferred type of 'raw' SequenceFile Writer. - * @param conf The configuration. - * @param out The stream on top which the writer is to be constructed. - * @param keyClass The 'key' type. - * @param valClass The 'value' type. - * @param compressionType The compression type. - * @param codec The compression codec. - * @param metadata The metadata of the file. - * @return Returns the handle to the constructed SequenceFile Writer. - * @throws IOException - * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} - * instead. - */ - @Deprecated - public static Writer - createWriter(Configuration conf, FSDataOutputStream out, - Class keyClass, Class valClass, - CompressionType compressionType, - CompressionCodec codec, Metadata metadata) throws IOException { - return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass), - Writer.valueClass(valClass), - Writer.compression(compressionType, codec), - Writer.metadata(metadata)); - } - - /** - * Construct the preferred type of 'raw' SequenceFile Writer. - * @param conf The configuration. - * @param out The stream on top which the writer is to be constructed. - * @param keyClass The 'key' type. - * @param valClass The 'value' type. - * @param compressionType The compression type. - * @param codec The compression codec. - * @return Returns the handle to the constructed SequenceFile Writer. - * @throws IOException - * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} - * instead. - */ - @Deprecated - public static Writer - createWriter(Configuration conf, FSDataOutputStream out, - Class keyClass, Class valClass, CompressionType compressionType, - CompressionCodec codec) throws IOException { - return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass), - Writer.valueClass(valClass), - Writer.compression(compressionType, codec)); - } - - - /** The interface to 'raw' values of SequenceFiles. */ - public static interface ValueBytes { - - /** Writes the uncompressed bytes to the outStream. - * @param outStream : Stream to write uncompressed bytes into. - * @throws IOException - */ - public void writeUncompressedBytes(DataOutputStream outStream) - throws IOException; - - /** Write compressed bytes to outStream. - * Note: that it will NOT compress the bytes if they are not compressed. - * @param outStream : Stream to write compressed bytes into. - */ - public void writeCompressedBytes(DataOutputStream outStream) - throws IllegalArgumentException, IOException; - - /** - * Size of stored data. 
- */ - public int getSize(); - } - - private static class UncompressedBytes implements ValueBytes { - private int dataSize; - private byte[] data; - - private UncompressedBytes() { - data = null; - dataSize = 0; - } - - private void reset(DataInputStream in, int length) throws IOException { - if (data == null) { - data = new byte[length]; - } else if (length > data.length) { - data = new byte[Math.max(length, data.length * 2)]; - } - dataSize = -1; - in.readFully(data, 0, length); - dataSize = length; - } - - @Override - public int getSize() { - return dataSize; - } - - @Override - public void writeUncompressedBytes(DataOutputStream outStream) - throws IOException { - outStream.write(data, 0, dataSize); - } - - @Override - public void writeCompressedBytes(DataOutputStream outStream) - throws IllegalArgumentException, IOException { - throw - new IllegalArgumentException("UncompressedBytes cannot be compressed!"); - } - - } // UncompressedBytes - - private static class CompressedBytes implements ValueBytes { - private int dataSize; - private byte[] data; - DataInputBuffer rawData = null; - CompressionCodec codec = null; - CompressionInputStream decompressedStream = null; - - private CompressedBytes(CompressionCodec codec) { - data = null; - dataSize = 0; - this.codec = codec; - } - - private void reset(DataInputStream in, int length) throws IOException { - if (data == null) { - data = new byte[length]; - } else if (length > data.length) { - data = new byte[Math.max(length, data.length * 2)]; - } - dataSize = -1; - in.readFully(data, 0, length); - dataSize = length; - } - - @Override - public int getSize() { - return dataSize; - } - - @Override - public void writeUncompressedBytes(DataOutputStream outStream) - throws IOException { - if (decompressedStream == null) { - rawData = new DataInputBuffer(); - decompressedStream = codec.createInputStream(rawData); - } else { - decompressedStream.resetState(); - } - rawData.reset(data, 0, dataSize); - - byte[] buffer = new byte[8192]; - int bytesRead = 0; - while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) { - outStream.write(buffer, 0, bytesRead); - } - } - - @Override - public void writeCompressedBytes(DataOutputStream outStream) - throws IllegalArgumentException, IOException { - outStream.write(data, 0, dataSize); - } - - } // CompressedBytes - - /** - * The class encapsulating with the metadata of a file. - * The metadata of a file is a list of attribute name/value - * pairs of Text type. 
- * - */ - public static class Metadata implements Writable { - - private TreeMap theMetadata; - - public Metadata() { - this(new TreeMap()); - } - - public Metadata(TreeMap arg) { - if (arg == null) { - this.theMetadata = new TreeMap(); - } else { - this.theMetadata = arg; - } - } - - public Text get(Text name) { - return this.theMetadata.get(name); - } - - public void set(Text name, Text value) { - this.theMetadata.put(name, value); - } - - public TreeMap getMetadata() { - return new TreeMap(this.theMetadata); - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeInt(this.theMetadata.size()); - Iterator> iter = - this.theMetadata.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry en = iter.next(); - en.getKey().write(out); - en.getValue().write(out); - } - } - - @Override - public void readFields(DataInput in) throws IOException { - int sz = in.readInt(); - if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object"); - this.theMetadata = new TreeMap(); - for (int i = 0; i < sz; i++) { - Text key = new Text(); - Text val = new Text(); - key.readFields(in); - val.readFields(in); - this.theMetadata.put(key, val); - } - } - - @Override - public boolean equals(Object other) { - if (other == null) { - return false; - } - if (other.getClass() != this.getClass()) { - return false; - } else { - return equals((Metadata) other); - } - } - - public boolean equals(Metadata other) { - if (other == null) return false; - if (this.theMetadata.size() != other.theMetadata.size()) { - return false; - } - Iterator> iter1 = - this.theMetadata.entrySet().iterator(); - Iterator> iter2 = - other.theMetadata.entrySet().iterator(); - while (iter1.hasNext() && iter2.hasNext()) { - Map.Entry en1 = iter1.next(); - Map.Entry en2 = iter2.next(); - if (!en1.getKey().equals(en2.getKey())) { - return false; - } - if (!en1.getValue().equals(en2.getValue())) { - return false; - } - } - if (iter1.hasNext() || iter2.hasNext()) { - return false; - } - return true; - } - - @Override - public int hashCode() { - assert false : "hashCode not designed"; - return 42; // any arbitrary constant will do - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("size: ").append(this.theMetadata.size()).append("\n"); - Iterator> iter = - this.theMetadata.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry en = iter.next(); - sb.append("\t").append(en.getKey().toString()).append("\t").append(en.getValue().toString()); - sb.append("\n"); - } - return sb.toString(); - } - } - - /** Write key/value pairs to a sequence-format file. */ - public static class Writer implements java.io.Closeable, Syncable { - private Configuration conf; - FSDataOutputStream out; - boolean ownOutputStream = true; - DataOutputBuffer buffer = new DataOutputBuffer(); - - Class keyClass; - Class valClass; - - private final CompressionType compress; - CompressionCodec codec = null; - CompressionOutputStream deflateFilter = null; - DataOutputStream deflateOut = null; - Metadata metadata = null; - Compressor compressor = null; - - private boolean appendMode = false; - - protected Serializer keySerializer; - protected Serializer uncompressedValSerializer; - protected Serializer compressedValSerializer; - - // Insert a globally unique 16-byte value every few entries, so that one - // can seek into the middle of a file and then synchronize with record - // starts and ends by scanning for this value. 
- long lastSyncPos; // position of last sync - byte[] sync; // 16 random bytes - - { - try { - MessageDigest digester = MessageDigest.getInstance("MD5"); - long time = Time.now(); - digester.update((new UID() + "@" + time).getBytes()); - sync = digester.digest(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public static interface Option { - } - - static class FileOption extends Options.PathOption - implements Option { - FileOption(Path path) { - super(path); - } - } - - /** - * @deprecated only used for backwards-compatibility in the createWriter methods - * that take FileSystem. - */ - @Deprecated - private static class FileSystemOption implements Option { - private final FileSystem value; - - protected FileSystemOption(FileSystem value) { - this.value = value; - } - - public FileSystem getValue() { - return value; - } - } - - static class StreamOption extends Options.FSDataOutputStreamOption - implements Option { - StreamOption(FSDataOutputStream stream) { - super(stream); - } - } - - static class BufferSizeOption extends Options.IntegerOption - implements Option { - BufferSizeOption(int value) { - super(value); - } - } - - static class BlockSizeOption extends Options.LongOption implements Option { - BlockSizeOption(long value) { - super(value); - } - } - - static class ReplicationOption extends Options.IntegerOption - implements Option { - ReplicationOption(int value) { - super(value); - } - } - - static class AppendIfExistsOption extends Options.BooleanOption implements - Option { - AppendIfExistsOption(boolean value) { - super(value); - } - } - - static class KeyClassOption extends Options.ClassOption implements Option { - KeyClassOption(Class value) { - super(value); - } - } - - static class ValueClassOption extends Options.ClassOption - implements Option { - ValueClassOption(Class value) { - super(value); - } - } - - static class MetadataOption implements Option { - private final Metadata value; - - MetadataOption(Metadata value) { - this.value = value; - } - - Metadata getValue() { - return value; - } - } - - static class ProgressableOption extends Options.ProgressableOption - implements Option { - ProgressableOption(Progressable value) { - super(value); - } - } - - private static class CompressionOption implements Option { - private final CompressionType value; - private final CompressionCodec codec; - - CompressionOption(CompressionType value) { - this(value, null); - } - - CompressionOption(CompressionType value, CompressionCodec codec) { - this.value = value; - this.codec = (CompressionType.NONE != value && null == codec) - ? new DefaultCodec() - : codec; - } - - CompressionType getValue() { - return value; - } - - CompressionCodec getCodec() { - return codec; - } - } - - public static Option file(Path value) { - return new FileOption(value); - } - - /** - * @deprecated only used for backwards-compatibility in the createWriter methods - * that take FileSystem. 
- */ - @Deprecated - private static Option filesystem(FileSystem fs) { - return new HoodieAppendLog.Writer.FileSystemOption(fs); - } - - public static Option bufferSize(int value) { - return new BufferSizeOption(value); - } - - public static Option stream(FSDataOutputStream value) { - return new StreamOption(value); - } - - public static Option replication(short value) { - return new ReplicationOption(value); - } - - public static Option appendIfExists(boolean value) { - return new AppendIfExistsOption(value); - } - - public static Option blockSize(long value) { - return new BlockSizeOption(value); - } - - public static Option progressable(Progressable value) { - return new ProgressableOption(value); - } - - public static Option keyClass(Class value) { - return new KeyClassOption(value); - } - - public static Option valueClass(Class value) { - return new ValueClassOption(value); - } - - public static Option metadata(Metadata value) { - return new MetadataOption(value); - } - - public static Option compression(CompressionType value) { - return new CompressionOption(value); - } - - public static Option compression(CompressionType value, - CompressionCodec codec) { - return new CompressionOption(value, codec); - } - - /** - * Construct a uncompressed writer from a set of options. - * @param conf the configuration to use - * @param opts the options used when creating the writer - * @throws IOException if it fails - */ - Writer(Configuration conf, - Option... opts) throws IOException { - BlockSizeOption blockSizeOption = - Options.getOption(BlockSizeOption.class, opts); - BufferSizeOption bufferSizeOption = - Options.getOption(BufferSizeOption.class, opts); - ReplicationOption replicationOption = - Options.getOption(ReplicationOption.class, opts); - ProgressableOption progressOption = - Options.getOption(ProgressableOption.class, opts); - FileOption fileOption = Options.getOption(FileOption.class, opts); - AppendIfExistsOption appendIfExistsOption = Options.getOption( - AppendIfExistsOption.class, opts); - FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts); - StreamOption streamOption = Options.getOption(StreamOption.class, opts); - KeyClassOption keyClassOption = - Options.getOption(KeyClassOption.class, opts); - ValueClassOption valueClassOption = - Options.getOption(ValueClassOption.class, opts); - MetadataOption metadataOption = - Options.getOption(MetadataOption.class, opts); - CompressionOption compressionTypeOption = - Options.getOption(CompressionOption.class, opts); - // check consistency of options - if ((fileOption == null) == (streamOption == null)) { - throw new IllegalArgumentException("file or stream must be specified"); - } - if (fileOption == null && (blockSizeOption != null || - bufferSizeOption != null || - replicationOption != null || - progressOption != null)) { - throw new IllegalArgumentException("file modifier options not " + - "compatible with stream"); - } - - FSDataOutputStream out; - boolean ownStream = fileOption != null; - if (ownStream) { - Path p = fileOption.getValue(); - FileSystem fs; - if (fsOption != null) { - fs = fsOption.getValue(); - } else { - fs = p.getFileSystem(conf); - } - int bufferSize = bufferSizeOption == null ? getBufferSize(conf) : - bufferSizeOption.getValue(); - short replication = replicationOption == null ? - fs.getDefaultReplication(p) : - (short) replicationOption.getValue(); - long blockSize = blockSizeOption == null ? 
fs.getDefaultBlockSize(p) : - blockSizeOption.getValue(); - Progressable progress = progressOption == null ? null : - progressOption.getValue(); - - if (appendIfExistsOption != null && appendIfExistsOption.getValue() - && fs.exists(p)) { - - // Read the file and verify header details - HoodieAppendLog.Reader reader = new HoodieAppendLog.Reader(conf, - HoodieAppendLog.Reader.file(p), new Reader.OnlyHeaderOption()); - try { - - if (keyClassOption.getValue() != reader.getKeyClass() - || valueClassOption.getValue() != reader.getValueClass()) { - throw new IllegalArgumentException( - "Key/value class provided does not match the file"); - } - - if (reader.getVersion() != VERSION[3]) { - throw new VersionMismatchException(VERSION[3], - reader.getVersion()); - } - - if (metadataOption != null) { - LOG.info("MetaData Option is ignored during append"); - } - metadataOption = (MetadataOption) HoodieAppendLog.Writer - .metadata(reader.getMetadata()); - - CompressionOption readerCompressionOption = new CompressionOption( - reader.getCompressionType(), reader.getCompressionCodec()); - - if (readerCompressionOption.value != compressionTypeOption.value - || !readerCompressionOption.codec.getClass().getName() - .equals(compressionTypeOption.codec.getClass().getName())) { - throw new IllegalArgumentException( - "Compression option provided does not match the file"); - } - - sync = reader.getSync(); - - } finally { - reader.close(); - } - - out = fs.append(p, bufferSize, progress); - this.appendMode = true; - } else { - out = fs - .create(p, true, bufferSize, replication, blockSize, progress); - } - } else { - out = streamOption.getValue(); - } - Class keyClass = keyClassOption == null ? - Object.class : keyClassOption.getValue(); - Class valueClass = valueClassOption == null ? - Object.class : valueClassOption.getValue(); - Metadata metadata = metadataOption == null ? - new Metadata() : metadataOption.getValue(); - this.compress = compressionTypeOption.getValue(); - final CompressionCodec codec = compressionTypeOption.getCodec(); - if (codec != null && - (codec instanceof GzipCodec) && - !NativeCodeLoader.isNativeCodeLoaded() && - !ZlibFactory.isNativeZlibLoaded(conf)) { - throw new IllegalArgumentException("SequenceFile doesn't work with " + - "GzipCodec without native-hadoop " + - "code!"); - } - init(conf, out, ownStream, keyClass, valueClass, codec, metadata); - } - - /** Create the named file. - * @deprecated Use - * {@link HoodieAppendLog#createWriter(Configuration, Writer.Option...)} - * instead. - */ - @Deprecated - public Writer(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass) throws IOException { - this.compress = CompressionType.NONE; - init(conf, fs.create(name), true, keyClass, valClass, null, - new Metadata()); - } - - /** Create the named file with write-progress reporter. - * @deprecated Use - * {@link HoodieAppendLog#createWriter(Configuration, Writer.Option...)} - * instead. - */ - @Deprecated - public Writer(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, - Progressable progress, Metadata metadata) throws IOException { - this.compress = CompressionType.NONE; - init(conf, fs.create(name, progress), true, keyClass, valClass, - null, metadata); - } - - /** Create the named file with write-progress reporter. - * @deprecated Use - * {@link HoodieAppendLog#createWriter(Configuration, Writer.Option...)} - * instead. 
- */ - @Deprecated - public Writer(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, - int bufferSize, short replication, long blockSize, - Progressable progress, Metadata metadata) throws IOException { - this.compress = CompressionType.NONE; - init(conf, - fs.create(name, true, bufferSize, replication, blockSize, progress), - true, keyClass, valClass, null, metadata); - } - - boolean isCompressed() { - return compress != CompressionType.NONE; - } - - boolean isBlockCompressed() { - return compress == CompressionType.BLOCK; - } - - Writer ownStream() { - this.ownOutputStream = true; - return this; - } - - /** Write and flush the file header. */ - private void writeFileHeader() - throws IOException { - out.write(VERSION); - Text.writeString(out, keyClass.getName()); - Text.writeString(out, valClass.getName()); - - out.writeBoolean(this.isCompressed()); - out.writeBoolean(this.isBlockCompressed()); - - if (this.isCompressed()) { - Text.writeString(out, (codec.getClass()).getName()); - } - this.metadata.write(out); - out.write(sync); // write the sync bytes - out.flush(); // flush header - } - - /** Initialize. */ - @SuppressWarnings("unchecked") - void init(Configuration conf, FSDataOutputStream out, boolean ownStream, - Class keyClass, Class valClass, - CompressionCodec codec, Metadata metadata) - throws IOException { - this.conf = conf; - this.out = out; - this.ownOutputStream = ownStream; - this.keyClass = keyClass; - this.valClass = valClass; - this.codec = codec; - this.metadata = metadata; - SerializationFactory serializationFactory = new SerializationFactory(conf); - this.keySerializer = serializationFactory.getSerializer(keyClass); - if (this.keySerializer == null) { - throw new IOException( - "Could not find a serializer for the Key class: '" - + keyClass.getCanonicalName() + "'. " - + "Please ensure that the configuration '" + - CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " - + "properly configured, if you're using" - + "custom serialization."); - } - this.keySerializer.open(buffer); - this.uncompressedValSerializer = serializationFactory.getSerializer(valClass); - if (this.uncompressedValSerializer == null) { - throw new IOException( - "Could not find a serializer for the Value class: '" - + valClass.getCanonicalName() + "'. " - + "Please ensure that the configuration '" + - CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " - + "properly configured, if you're using" - + "custom serialization."); - } - this.uncompressedValSerializer.open(buffer); - if (this.codec != null) { - ReflectionUtils.setConf(this.codec, this.conf); - this.compressor = CodecPool.getCompressor(this.codec); - this.deflateFilter = this.codec.createOutputStream(buffer, compressor); - this.deflateOut = - new DataOutputStream(new BufferedOutputStream(deflateFilter)); - this.compressedValSerializer = serializationFactory.getSerializer(valClass); - if (this.compressedValSerializer == null) { - throw new IOException( - "Could not find a serializer for the Value class: '" - + valClass.getCanonicalName() + "'. " - + "Please ensure that the configuration '" + - CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " - + "properly configured, if you're using" - + "custom serialization."); - } - this.compressedValSerializer.open(deflateOut); - } - - if (appendMode) { - sync(); - } else { - writeFileHeader(); - } - } - - /** Returns the class of keys in this file. */ - public Class getKeyClass() { - return keyClass; - } - - /** Returns the class of values in this file. 
*/ - public Class getValueClass() { - return valClass; - } - - /** Returns the compression codec of data in this file. */ - public CompressionCodec getCompressionCodec() { - return codec; - } - - /** create a sync point */ - public void sync() throws IOException { - if (sync != null && lastSyncPos != out.getPos()) { - out.writeInt(SYNC_ESCAPE); // mark the start of the sync - out.write(sync); // write sync - lastSyncPos = out.getPos(); // update lastSyncPos - } - } - - /** - * flush all currently written data to the file system - * @deprecated Use {@link #hsync()} or {@link #hflush()} instead - */ - @Deprecated - public void syncFs() throws IOException { - if (out != null) { - out.sync(); // flush contents to file system - } - } - - @Override - public void hsync() throws IOException { - if (out != null) { - out.hsync(); - } - } - - @Override - public void hflush() throws IOException { - if (out != null) { - out.hflush(); - } - } - - /** Returns the configuration of this file. */ - Configuration getConf() { - return conf; - } - - /** Close the file. */ - @Override - public synchronized void close() throws IOException { - keySerializer.close(); - uncompressedValSerializer.close(); - if (compressedValSerializer != null) { - compressedValSerializer.close(); - } - - CodecPool.returnCompressor(compressor); - compressor = null; - - if (out != null) { - - // Close the underlying stream iff we own it... - if (ownOutputStream) { - out.close(); - } else { - out.flush(); - } - out = null; - } - } - - synchronized void checkAndWriteSync() throws IOException { - if (sync != null && - out.getPos() >= lastSyncPos + SYNC_INTERVAL) { // time to emit sync - sync(); - } - } - - /** Append a key/value pair. */ - public void append(Writable key, Writable val) - throws IOException { - append((Object) key, (Object) val); - } - - /** Append a key/value pair. */ - @SuppressWarnings("unchecked") - public synchronized void append(Object key, Object val) - throws IOException { - if (key.getClass() != keyClass) - throw new IOException("wrong key class: " + key.getClass().getName() - + " is not " + keyClass); - if (val.getClass() != valClass) - throw new IOException("wrong value class: " + val.getClass().getName() - + " is not " + valClass); - - buffer.reset(); - - // Append the 'key' - keySerializer.serialize(key); - int keyLength = buffer.getLength(); - if (keyLength < 0) - throw new IOException("negative length keys not allowed: " + key); - - // Append the 'value' - if (compress == CompressionType.RECORD) { - deflateFilter.resetState(); - compressedValSerializer.serialize(val); - deflateOut.flush(); - deflateFilter.finish(); - } else { - uncompressedValSerializer.serialize(val); - } - - // Write the record out - checkAndWriteSync(); // sync - out.writeInt(buffer.getLength()); // total record length - out.writeInt(keyLength); // key portion length - out.write(buffer.getData(), 0, buffer.getLength()); // data - } - - public synchronized void appendRaw(byte[] keyData, int keyOffset, - int keyLength, ValueBytes val) throws IOException { - if (keyLength < 0) - throw new IOException("negative length keys not allowed: " + keyLength); - - int valLength = val.getSize(); - - checkAndWriteSync(); - - out.writeInt(keyLength + valLength); // total record length - out.writeInt(keyLength); // key portion length - out.write(keyData, keyOffset, keyLength); // key - val.writeUncompressedBytes(out); // value - } - - /** Returns the current length of the output file. - * - *
<p>
This always returns a synchronized position. In other words, - * immediately after calling {@link HoodieAppendLog.Reader#seek(long)} with a position - * returned by this method, {@link HoodieAppendLog.Reader#next(Writable)} may be called. However - * the key may be earlier in the file than key last written when this - * method was called (e.g., with block-compression, it may be the first key - * in the block that was being written when this method was called). - */ - public synchronized long getLength() throws IOException { - return out.getPos(); - } - - } // class Writer - - /** Write key/compressed-value pairs to a sequence-format file. */ - static class RecordCompressWriter extends Writer { - - RecordCompressWriter(Configuration conf, - Option... options) throws IOException { - super(conf, options); - } - - /** Append a key/value pair. */ - @Override - @SuppressWarnings("unchecked") - public synchronized void append(Object key, Object val) - throws IOException { - if (key.getClass() != keyClass) - throw new IOException("wrong key class: " + key.getClass().getName() - + " is not " + keyClass); - if (val.getClass() != valClass) - throw new IOException("wrong value class: " + val.getClass().getName() - + " is not " + valClass); - - buffer.reset(); - - // Append the 'key' - keySerializer.serialize(key); - int keyLength = buffer.getLength(); - if (keyLength < 0) - throw new IOException("negative length keys not allowed: " + key); - - // Compress 'value' and append it - deflateFilter.resetState(); - compressedValSerializer.serialize(val); - deflateOut.flush(); - deflateFilter.finish(); - - // Write the record out - checkAndWriteSync(); // sync - out.writeInt(buffer.getLength()); // total record length - out.writeInt(keyLength); // key portion length - out.write(buffer.getData(), 0, buffer.getLength()); // data - } - - /** Append a key/value pair. */ - @Override - public synchronized void appendRaw(byte[] keyData, int keyOffset, - int keyLength, ValueBytes val) throws IOException { - - if (keyLength < 0) - throw new IOException("negative length keys not allowed: " + keyLength); - - int valLength = val.getSize(); - - checkAndWriteSync(); // sync - out.writeInt(keyLength + valLength); // total record length - out.writeInt(keyLength); // key portion length - out.write(keyData, keyOffset, keyLength); // 'key' data - val.writeCompressedBytes(out); // 'value' data - } - - } // RecordCompressionWriter - - /** Write compressed key/value blocks to a sequence-format file. */ - static class BlockCompressWriter extends Writer { - - private int noBufferedRecords = 0; - - private DataOutputBuffer keyLenBuffer = new DataOutputBuffer(); - private DataOutputBuffer keyBuffer = new DataOutputBuffer(); - - private DataOutputBuffer valLenBuffer = new DataOutputBuffer(); - private DataOutputBuffer valBuffer = new DataOutputBuffer(); - - private final int compressionBlockSize; - - BlockCompressWriter(Configuration conf, - Option... 
options) throws IOException { - super(conf, options); - compressionBlockSize = - conf.getInt("io.seqfile.compress.blocksize", 1000000); - keySerializer.close(); - keySerializer.open(keyBuffer); - uncompressedValSerializer.close(); - uncompressedValSerializer.open(valBuffer); - } - - /** Workhorse to check and write out compressed data/lengths */ - private synchronized void writeBuffer(DataOutputBuffer uncompressedDataBuffer) - throws IOException { - deflateFilter.resetState(); - buffer.reset(); - deflateOut.write(uncompressedDataBuffer.getData(), 0, - uncompressedDataBuffer.getLength()); - deflateOut.flush(); - deflateFilter.finish(); - - WritableUtils.writeVInt(out, buffer.getLength()); - out.write(buffer.getData(), 0, buffer.getLength()); - } - - /** Compress and flush contents to dfs */ - @Override - public synchronized void sync() throws IOException { - if (noBufferedRecords > 0) { - super.sync(); - - // No. of records - WritableUtils.writeVInt(out, noBufferedRecords); - - // Write 'keys' and lengths - writeBuffer(keyLenBuffer); - writeBuffer(keyBuffer); - - // Write 'values' and lengths - writeBuffer(valLenBuffer); - writeBuffer(valBuffer); - - // Flush the file-stream - out.flush(); - - // Reset internal states - keyLenBuffer.reset(); - keyBuffer.reset(); - valLenBuffer.reset(); - valBuffer.reset(); - noBufferedRecords = 0; - } - - } - - /** Close the file. */ - @Override - public synchronized void close() throws IOException { - if (out != null) { - sync(); - } - super.close(); - } - - /** Append a key/value pair. */ - @Override - @SuppressWarnings("unchecked") - public synchronized void append(Object key, Object val) - throws IOException { - if (key.getClass() != keyClass) - throw new IOException("wrong key class: " + key + " is not " + keyClass); - if (val.getClass() != valClass) - throw new IOException("wrong value class: " + val + " is not " + valClass); - - // Save key/value into respective buffers - int oldKeyLength = keyBuffer.getLength(); - keySerializer.serialize(key); - int keyLength = keyBuffer.getLength() - oldKeyLength; - if (keyLength < 0) - throw new IOException("negative length keys not allowed: " + key); - WritableUtils.writeVInt(keyLenBuffer, keyLength); - - int oldValLength = valBuffer.getLength(); - uncompressedValSerializer.serialize(val); - int valLength = valBuffer.getLength() - oldValLength; - WritableUtils.writeVInt(valLenBuffer, valLength); - - // Added another key/value pair - ++noBufferedRecords; - - // Compress and flush? - int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); - if (currentBlockSize >= compressionBlockSize) { - sync(); - } - } - - /** Append a key/value pair. */ - @Override - public synchronized void appendRaw(byte[] keyData, int keyOffset, - int keyLength, ValueBytes val) throws IOException { - - if (keyLength < 0) - throw new IOException("negative length keys not allowed"); - - int valLength = val.getSize(); - - // Save key/value data in relevant buffers - WritableUtils.writeVInt(keyLenBuffer, keyLength); - keyBuffer.write(keyData, keyOffset, keyLength); - WritableUtils.writeVInt(valLenBuffer, valLength); - val.writeUncompressedBytes(valBuffer); - - // Added another key/value pair - ++noBufferedRecords; - - // Compress and flush? 
- int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); - if (currentBlockSize >= compressionBlockSize) { - sync(); - } - } - - } // BlockCompressionWriter - - /** Get the configured buffer size */ - private static int getBufferSize(Configuration conf) { - return conf.getInt("io.file.buffer.size", 4096); - } - - /** Reads key/value pairs from a sequence-format file. */ - public static class Reader implements java.io.Closeable { - private String filename; - private FSDataInputStream in; - private DataOutputBuffer outBuf = new DataOutputBuffer(); - - private byte version; - - private String keyClassName; - private String valClassName; - private Class keyClass; - private Class valClass; - - private CompressionCodec codec = null; - private Metadata metadata = null; - - private byte[] sync = new byte[SYNC_HASH_SIZE]; - private byte[] syncCheck = new byte[SYNC_HASH_SIZE]; - private boolean syncSeen; - - private long headerEnd; - private long end; - private int keyLength; - private int recordLength; - - private boolean decompress; - private boolean blockCompressed; - - private Configuration conf; - - private int noBufferedRecords = 0; - private boolean lazyDecompress = true; - private boolean valuesDecompressed = true; - - private int noBufferedKeys = 0; - private int noBufferedValues = 0; - - private DataInputBuffer keyLenBuffer = null; - private CompressionInputStream keyLenInFilter = null; - private DataInputStream keyLenIn = null; - private Decompressor keyLenDecompressor = null; - private DataInputBuffer keyBuffer = null; - private CompressionInputStream keyInFilter = null; - private DataInputStream keyIn = null; - private Decompressor keyDecompressor = null; - - private DataInputBuffer valLenBuffer = null; - private CompressionInputStream valLenInFilter = null; - private DataInputStream valLenIn = null; - private Decompressor valLenDecompressor = null; - private DataInputBuffer valBuffer = null; - private CompressionInputStream valInFilter = null; - private DataInputStream valIn = null; - private Decompressor valDecompressor = null; - - private Deserializer keyDeserializer; - private Deserializer valDeserializer; - - /** - * A tag interface for all of the Reader options - */ - public static interface Option { - } - - /** - * Create an option to specify the path name of the sequence file. - * @param value the path to read - * @return a new option - */ - public static Option file(Path value) { - return new FileOption(value); - } - - /** - * Create an option to specify the stream with the sequence file. - * @param value the stream to read. - * @return a new option - */ - public static Option stream(FSDataInputStream value) { - return new InputStreamOption(value); - } - - /** - * Create an option to specify the starting byte to read. - * @param value the number of bytes to skip over - * @return a new option - */ - public static Option start(long value) { - return new StartOption(value); - } - - /** - * Create an option to specify the number of bytes to read. - * @param value the number of bytes to read - * @return a new option - */ - public static Option length(long value) { - return new LengthOption(value); - } - - /** - * Create an option with the buffer size for reading the given pathname. 
- * @param value the number of bytes to buffer - * @return a new option - */ - public static Option bufferSize(int value) { - return new BufferSizeOption(value); - } - - private static class FileOption extends Options.PathOption - implements Option { - private FileOption(Path value) { - super(value); - } - } - - private static class InputStreamOption - extends Options.FSDataInputStreamOption - implements Option { - private InputStreamOption(FSDataInputStream value) { - super(value); - } - } - - private static class StartOption extends Options.LongOption - implements Option { - private StartOption(long value) { - super(value); - } - } - - private static class LengthOption extends Options.LongOption - implements Option { - private LengthOption(long value) { - super(value); - } - } - - private static class BufferSizeOption extends Options.IntegerOption - implements Option { - private BufferSizeOption(int value) { - super(value); - } - } - - // only used directly - private static class OnlyHeaderOption extends Options.BooleanOption - implements Option { - private OnlyHeaderOption() { - super(true); - } - } - - public Reader(Configuration conf, Option... opts) throws IOException { - // Look up the options, these are null if not set - FileOption fileOpt = Options.getOption(FileOption.class, opts); - InputStreamOption streamOpt = - Options.getOption(InputStreamOption.class, opts); - StartOption startOpt = Options.getOption(StartOption.class, opts); - LengthOption lenOpt = Options.getOption(LengthOption.class, opts); - BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class, opts); - OnlyHeaderOption headerOnly = - Options.getOption(OnlyHeaderOption.class, opts); - // check for consistency - if ((fileOpt == null) == (streamOpt == null)) { - throw new - IllegalArgumentException("File or stream option must be specified"); - } - if (fileOpt == null && bufOpt != null) { - throw new IllegalArgumentException("buffer size can only be set when" + - " a file is specified."); - } - // figure out the real values - Path filename = null; - FSDataInputStream file; - final long len; - if (fileOpt != null) { - filename = fileOpt.getValue(); - FileSystem fs = filename.getFileSystem(conf); - int bufSize = bufOpt == null ? getBufferSize(conf) : bufOpt.getValue(); - len = null == lenOpt - ? fs.getFileStatus(filename).getLen() - : lenOpt.getValue(); - file = openFile(fs, filename, bufSize, len); - } else { - len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue(); - file = streamOpt.getValue(); - } - long start = startOpt == null ? 0 : startOpt.getValue(); - // really set up - initialize(filename, file, start, len, conf, headerOnly != null); - } - - /** - * Construct a reader by opening a file from the given file system. - * @param fs The file system used to open the file. - * @param file The file being read. - * @param conf Configuration - * @throws IOException - * @deprecated Use Reader(Configuration, Option...) instead. - */ - @Deprecated - public Reader(FileSystem fs, Path file, - Configuration conf) throws IOException { - this(conf, file(file.makeQualified(fs))); - } - - /** - * Construct a reader by the given input stream. - * @param in An input stream. - * @param buffersize unused - * @param start The starting position. - * @param length The length being read. - * @param conf Configuration - * @throws IOException - * @deprecated Use Reader(Configuration, Reader.Option...) instead. 
- */ - @Deprecated - public Reader(FSDataInputStream in, int buffersize, - long start, long length, Configuration conf) throws IOException { - this(conf, stream(in), start(start), length(length)); - } - - /** Common work of the constructors. */ - private void initialize(Path filename, FSDataInputStream in, - long start, long length, Configuration conf, - boolean tempReader) throws IOException { - if (in == null) { - throw new IllegalArgumentException("in == null"); - } - this.filename = filename == null ? "" : filename.toString(); - this.in = in; - this.conf = conf; - boolean succeeded = false; - try { - seek(start); - this.end = this.in.getPos() + length; - // if it wrapped around, use the max - if (end < length) { - end = Long.MAX_VALUE; - } - init(tempReader); - succeeded = true; - } finally { - if (!succeeded) { - IOUtils.cleanup(LOG, this.in); - } - } - } - - /** - * Override this method to specialize the type of - * {@link FSDataInputStream} returned. - * @param fs The file system used to open the file. - * @param file The file being read. - * @param bufferSize The buffer size used to read the file. - * @param length The length being read if it is gteq 0. Otherwise, - * the length is not available. - * @return The opened stream. - * @throws IOException - */ - protected FSDataInputStream openFile(FileSystem fs, Path file, - int bufferSize, long length) throws IOException { - return fs.open(file, bufferSize); - } - - /** - * Initialize the {@link Reader} - * @param tempReader true if we are constructing a temporary - * reader {@link HoodieAppendLog.Sorter}, - * and hence do not initialize every component; - * false otherwise. - * @throws IOException - */ - private void init(boolean tempReader) throws IOException { - byte[] versionBlock = new byte[VERSION.length]; - in.readFully(versionBlock); - - if ((versionBlock[0] != VERSION[0]) || - (versionBlock[1] != VERSION[1]) || - (versionBlock[2] != VERSION[2])) - throw new IOException(this + " not a SequenceFile"); - - // Set 'version' - version = versionBlock[3]; - if (version > VERSION[3]) - throw new VersionMismatchException(VERSION[3], version); - - if (version < BLOCK_COMPRESS_VERSION) { - UTF8 className = new UTF8(); - - className.readFields(in); - keyClassName = className.toStringChecked(); // key class name - - className.readFields(in); - valClassName = className.toStringChecked(); // val class name - } else { - keyClassName = Text.readString(in); - valClassName = Text.readString(in); - } - - if (version > 2) { // if version > 2 - this.decompress = in.readBoolean(); // is compressed? - } else { - decompress = false; - } - - if (version >= BLOCK_COMPRESS_VERSION) { // if version >= 4 - this.blockCompressed = in.readBoolean(); // is block-compressed? 
- } else { - blockCompressed = false; - } - - // if version >= 5 - // setup the compression codec - if (decompress) { - if (version >= CUSTOM_COMPRESS_VERSION) { - String codecClassname = Text.readString(in); - try { - Class codecClass - = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class); - this.codec = ReflectionUtils.newInstance(codecClass, conf); - } catch (ClassNotFoundException cnfe) { - throw new IllegalArgumentException("Unknown codec: " + - codecClassname, cnfe); - } - } else { - codec = new DefaultCodec(); - ((Configurable) codec).setConf(conf); - } - } - - this.metadata = new Metadata(); - if (version >= VERSION_WITH_METADATA) { // if version >= 6 - this.metadata.readFields(in); - } - - if (version > 1) { // if version > 1 - in.readFully(sync); // read sync bytes - headerEnd = in.getPos(); // record end of header - } - - // Initialize... *not* if this we are constructing a temporary Reader - if (!tempReader) { - valBuffer = new DataInputBuffer(); - if (decompress) { - valDecompressor = CodecPool.getDecompressor(codec); - valInFilter = codec.createInputStream(valBuffer, valDecompressor); - valIn = new DataInputStream(valInFilter); - } else { - valIn = valBuffer; - } - - if (blockCompressed) { - keyLenBuffer = new DataInputBuffer(); - keyBuffer = new DataInputBuffer(); - valLenBuffer = new DataInputBuffer(); - - keyLenDecompressor = CodecPool.getDecompressor(codec); - keyLenInFilter = codec.createInputStream(keyLenBuffer, - keyLenDecompressor); - keyLenIn = new DataInputStream(keyLenInFilter); - - keyDecompressor = CodecPool.getDecompressor(codec); - keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor); - keyIn = new DataInputStream(keyInFilter); - - valLenDecompressor = CodecPool.getDecompressor(codec); - valLenInFilter = codec.createInputStream(valLenBuffer, - valLenDecompressor); - valLenIn = new DataInputStream(valLenInFilter); - } - - SerializationFactory serializationFactory = - new SerializationFactory(conf); - this.keyDeserializer = - getDeserializer(serializationFactory, getKeyClass()); - if (this.keyDeserializer == null) { - throw new IOException( - "Could not find a deserializer for the Key class: '" - + getKeyClass().getCanonicalName() + "'. " - + "Please ensure that the configuration '" + - CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " - + "properly configured, if you're using " - + "custom serialization."); - } - if (!blockCompressed) { - this.keyDeserializer.open(valBuffer); - } else { - this.keyDeserializer.open(keyIn); - } - this.valDeserializer = - getDeserializer(serializationFactory, getValueClass()); - if (this.valDeserializer == null) { - throw new IOException( - "Could not find a deserializer for the Value class: '" - + getValueClass().getCanonicalName() + "'. " - + "Please ensure that the configuration '" + - CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " - + "properly configured, if you're using " - + "custom serialization."); - } - this.valDeserializer.open(valIn); - } - } - - @SuppressWarnings("unchecked") - private Deserializer getDeserializer(SerializationFactory sf, Class c) { - return sf.getDeserializer(c); - } - - /** Close the file. 
*/ - @Override - public synchronized void close() throws IOException { - // Return the decompressors to the pool - CodecPool.returnDecompressor(keyLenDecompressor); - CodecPool.returnDecompressor(keyDecompressor); - CodecPool.returnDecompressor(valLenDecompressor); - CodecPool.returnDecompressor(valDecompressor); - keyLenDecompressor = keyDecompressor = null; - valLenDecompressor = valDecompressor = null; - - if (keyDeserializer != null) { - keyDeserializer.close(); - } - if (valDeserializer != null) { - valDeserializer.close(); - } - - // Close the input-stream - in.close(); - } - - /** Returns the name of the key class. */ - public String getKeyClassName() { - return keyClassName; - } - - /** Returns the class of keys in this file. */ - public synchronized Class getKeyClass() { - if (null == keyClass) { - try { - keyClass = WritableName.getClass(getKeyClassName(), conf); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - return keyClass; - } - - /** Returns the name of the value class. */ - public String getValueClassName() { - return valClassName; - } - - /** Returns the class of values in this file. */ - public synchronized Class getValueClass() { - if (null == valClass) { - try { - valClass = WritableName.getClass(getValueClassName(), conf); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - return valClass; - } - - /** Returns true if values are compressed. */ - public boolean isCompressed() { - return decompress; - } - - /** Returns true if records are block-compressed. */ - public boolean isBlockCompressed() { - return blockCompressed; - } - - /** Returns the compression codec of data in this file. */ - public CompressionCodec getCompressionCodec() { - return codec; - } - - private byte[] getSync() { - return sync; - } - - private byte getVersion() { - return version; - } - - /** - * Get the compression type for this file. - * @return the compression type - */ - public CompressionType getCompressionType() { - if (decompress) { - return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD; - } else { - return CompressionType.NONE; - } - } - - /** Returns the metadata object of the file */ - public Metadata getMetadata() { - return this.metadata; - } - - /** Returns the configuration used for this file. 
*/ - Configuration getConf() { - return conf; - } - - /** Read a compressed buffer */ - private synchronized void readBuffer(DataInputBuffer buffer, - CompressionInputStream filter) throws IOException { - // Read data into a temporary buffer - DataOutputBuffer dataBuffer = new DataOutputBuffer(); - - try { - int dataBufferLength = WritableUtils.readVInt(in); - dataBuffer.write(in, dataBufferLength); - - // Set up 'buffer' connected to the input-stream - buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength()); - } finally { - dataBuffer.close(); - } - - // Reset the codec - filter.resetState(); - } - - /** Read the next 'compressed' block */ - private synchronized void readBlock() throws IOException { - // Check if we need to throw away a whole block of - // 'values' due to 'lazy decompression' - if (lazyDecompress && !valuesDecompressed) { - in.seek(WritableUtils.readVInt(in) + in.getPos()); - in.seek(WritableUtils.readVInt(in) + in.getPos()); - } - - // Reset internal states - noBufferedKeys = 0; - noBufferedValues = 0; - noBufferedRecords = 0; - valuesDecompressed = false; - - //Process sync - if (sync != null) { - in.readInt(); - in.readFully(syncCheck); // read syncCheck - if (!Arrays.equals(sync, syncCheck)) // check it - throw new IOException("File is corrupt!"); - } - syncSeen = true; - - // Read number of records in this block - noBufferedRecords = WritableUtils.readVInt(in); - - // Read key lengths and keys - readBuffer(keyLenBuffer, keyLenInFilter); - readBuffer(keyBuffer, keyInFilter); - noBufferedKeys = noBufferedRecords; - - // Read value lengths and values - if (!lazyDecompress) { - readBuffer(valLenBuffer, valLenInFilter); - readBuffer(valBuffer, valInFilter); - noBufferedValues = noBufferedRecords; - valuesDecompressed = true; - } - } - - /** - * Position valLenIn/valIn to the 'value' - * corresponding to the 'current' key - */ - private synchronized void seekToCurrentValue() throws IOException { - if (!blockCompressed) { - if (decompress) { - valInFilter.resetState(); - } - valBuffer.reset(); - } else { - // Check if this is the first value in the 'block' to be read - if (lazyDecompress && !valuesDecompressed) { - // Read the value lengths and values - readBuffer(valLenBuffer, valLenInFilter); - readBuffer(valBuffer, valInFilter); - noBufferedValues = noBufferedRecords; - valuesDecompressed = true; - } - - // Calculate the no. of bytes to skip - // Note: 'current' key has already been read! - int skipValBytes = 0; - int currentKey = noBufferedKeys + 1; - for (int i = noBufferedValues; i > currentKey; --i) { - skipValBytes += WritableUtils.readVInt(valLenIn); - --noBufferedValues; - } - - // Skip to the 'val' corresponding to 'current' key - if (skipValBytes > 0) { - if (valIn.skipBytes(skipValBytes) != skipValBytes) { - throw new IOException("Failed to seek to " + currentKey + - "(th) value!"); - } - } - } - } - - /** - * Get the 'value' corresponding to the last read 'key'. - * @param val : The 'value' to be read. 
- * @throws IOException - */ - public synchronized void getCurrentValue(Writable val) - throws IOException { - if (val instanceof Configurable) { - ((Configurable) val).setConf(this.conf); - } - - // Position stream to 'current' value - seekToCurrentValue(); - - if (!blockCompressed) { - val.readFields(valIn); - - if (valIn.read() > 0) { - LOG.info("available bytes: " + valIn.available()); - throw new IOException(val + " read " + (valBuffer.getPosition() - keyLength) - + " bytes, should read " + - (valBuffer.getLength() - keyLength)); - } - } else { - // Get the value - int valLength = WritableUtils.readVInt(valLenIn); - val.readFields(valIn); - - // Read another compressed 'value' - --noBufferedValues; - - // Sanity check - if ((valLength < 0) && LOG.isDebugEnabled()) { - LOG.debug(val + " is a zero-length value"); - } - } - - } - - /** - * Get the 'value' corresponding to the last read 'key'. - * @param val : The 'value' to be read. - * @throws IOException - */ - public synchronized Object getCurrentValue(Object val) - throws IOException { - if (val instanceof Configurable) { - ((Configurable) val).setConf(this.conf); - } - - // Position stream to 'current' value - seekToCurrentValue(); - - if (!blockCompressed) { - val = deserializeValue(val); - - if (valIn.read() > 0) { - LOG.info("available bytes: " + valIn.available()); - throw new IOException(val + " read " + (valBuffer.getPosition() - keyLength) - + " bytes, should read " + - (valBuffer.getLength() - keyLength)); - } - } else { - // Get the value - int valLength = WritableUtils.readVInt(valLenIn); - val = deserializeValue(val); - - // Read another compressed 'value' - --noBufferedValues; - - // Sanity check - if ((valLength < 0) && LOG.isDebugEnabled()) { - LOG.debug(val + " is a zero-length value"); - } - } - return val; - - } - - @SuppressWarnings("unchecked") - private Object deserializeValue(Object val) throws IOException { - return valDeserializer.deserialize(val); - } - - /** Read the next key in the file into key, skipping its - * value. True if another entry exists, and false at end of file. */ - public synchronized boolean next(Writable key) throws IOException { - if (key.getClass() != getKeyClass()) - throw new IOException("wrong key class: " + key.getClass().getName() - + " is not " + keyClass); - - if (!blockCompressed) { - outBuf.reset(); - - keyLength = next(outBuf); - if (keyLength < 0) - return false; - - valBuffer.reset(outBuf.getData(), outBuf.getLength()); - - key.readFields(valBuffer); - valBuffer.mark(0); - if (valBuffer.getPosition() != keyLength) - throw new IOException(key + " read " + valBuffer.getPosition() - + " bytes, should read " + keyLength); - } else { - //Reset syncSeen - syncSeen = false; - - if (noBufferedKeys == 0) { - try { - readBlock(); - } catch (EOFException eof) { - return false; - } - } - - int keyLength = WritableUtils.readVInt(keyLenIn); - - // Sanity check - if (keyLength < 0) { - return false; - } - - //Read another compressed 'key' - key.readFields(keyIn); - --noBufferedKeys; - } - - return true; - } - - /** Read the next key/value pair in the file into key and - * val. 
Returns true if such a pair exists and false when at - * end of file */ - public synchronized boolean next(Writable key, Writable val) - throws IOException { - if (val.getClass() != getValueClass()) - throw new IOException("wrong value class: " + val + " is not " + valClass); - - boolean more = next(key); - - if (more) { - getCurrentValue(val); - } - - return more; - } - - /** - * Read and return the next record length, potentially skipping over - * a sync block. - * @return the length of the next record or -1 if there is no next record - * @throws IOException - */ - private synchronized int readRecordLength() throws IOException { - if (in.getPos() >= end) { - return -1; - } - int length = in.readInt(); - if (version > 1 && sync != null && - length == SYNC_ESCAPE) { // process a sync entry - in.readFully(syncCheck); // read syncCheck - if (!Arrays.equals(sync, syncCheck)) // check it - throw new IOException("File is corrupt!"); - syncSeen = true; - if (in.getPos() >= end) { - return -1; - } - length = in.readInt(); // re-read length - } else { - syncSeen = false; - } - - return length; - } - - /** Read the next key/value pair in the file into buffer. - * Returns the length of the key read, or -1 if at end of file. The length - * of the value may be computed by calling buffer.getLength() before and - * after calls to this method. */ - /** @deprecated Call {@link #nextRaw(DataOutputBuffer, HoodieAppendLog.ValueBytes)}. */ - @Deprecated - synchronized int next(DataOutputBuffer buffer) throws IOException { - // Unsupported for block-compressed sequence files - if (blockCompressed) { - throw new IOException("Unsupported call for block-compressed" + - " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)"); - } - try { - int length = readRecordLength(); - if (length == -1) { - return -1; - } - int keyLength = in.readInt(); - buffer.write(in, length); - return keyLength; - } catch (ChecksumException e) { // checksum failure - handleChecksumException(e); - return next(buffer); - } - } - - public ValueBytes createValueBytes() { - ValueBytes val = null; - if (!decompress || blockCompressed) { - val = new UncompressedBytes(); - } else { - val = new CompressedBytes(codec); - } - return val; - } - - /** - * Read 'raw' records. 
- * @param key - The buffer into which the key is read - * @param val - The 'raw' value - * @return Returns the total record length or -1 for end of file - * @throws IOException - */ - public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) - throws IOException { - if (!blockCompressed) { - int length = readRecordLength(); - if (length == -1) { - return -1; - } - int keyLength = in.readInt(); - int valLength = length - keyLength; - key.write(in, keyLength); - if (decompress) { - CompressedBytes value = (CompressedBytes) val; - value.reset(in, valLength); - } else { - UncompressedBytes value = (UncompressedBytes) val; - value.reset(in, valLength); - } - - return length; - } else { - //Reset syncSeen - syncSeen = false; - - // Read 'key' - if (noBufferedKeys == 0) { - if (in.getPos() >= end) - return -1; - - try { - readBlock(); - } catch (EOFException eof) { - return -1; - } - } - int keyLength = WritableUtils.readVInt(keyLenIn); - if (keyLength < 0) { - throw new IOException("zero length key found!"); - } - key.write(keyIn, keyLength); - --noBufferedKeys; - - // Read raw 'value' - seekToCurrentValue(); - int valLength = WritableUtils.readVInt(valLenIn); - UncompressedBytes rawValue = (UncompressedBytes) val; - rawValue.reset(valIn, valLength); - --noBufferedValues; - - return (keyLength + valLength); - } - - } - - /** - * Read 'raw' keys. - * @param key - The buffer into which the key is read - * @return Returns the key length or -1 for end of file - * @throws IOException - */ - public synchronized int nextRawKey(DataOutputBuffer key) - throws IOException { - if (!blockCompressed) { - recordLength = readRecordLength(); - if (recordLength == -1) { - return -1; - } - keyLength = in.readInt(); - key.write(in, keyLength); - return keyLength; - } else { - //Reset syncSeen - syncSeen = false; - - // Read 'key' - if (noBufferedKeys == 0) { - if (in.getPos() >= end) - return -1; - - try { - readBlock(); - } catch (EOFException eof) { - return -1; - } - } - int keyLength = WritableUtils.readVInt(keyLenIn); - if (keyLength < 0) { - throw new IOException("zero length key found!"); - } - key.write(keyIn, keyLength); - --noBufferedKeys; - - return keyLength; - } - - } - - /** Read the next key in the file, skipping its - * value. Return null at end of file. */ - public synchronized Object next(Object key) throws IOException { - if (key != null && key.getClass() != getKeyClass()) { - throw new IOException("wrong key class: " + key.getClass().getName() - + " is not " + keyClass); - } - - if (!blockCompressed) { - outBuf.reset(); - - keyLength = next(outBuf); - if (keyLength < 0) - return null; - - valBuffer.reset(outBuf.getData(), outBuf.getLength()); - - key = deserializeKey(key); - valBuffer.mark(0); - if (valBuffer.getPosition() != keyLength) - throw new IOException(key + " read " + valBuffer.getPosition() - + " bytes, should read " + keyLength); - } else { - //Reset syncSeen - syncSeen = false; - - if (noBufferedKeys == 0) { - try { - readBlock(); - } catch (EOFException eof) { - return null; - } - } - - int keyLength = WritableUtils.readVInt(keyLenIn); - - // Sanity check - if (keyLength < 0) { - return null; - } - - //Read another compressed 'key' - key = deserializeKey(key); - --noBufferedKeys; - } - - return key; - } - - @SuppressWarnings("unchecked") - private Object deserializeKey(Object key) throws IOException { - return keyDeserializer.deserialize(key); - } - - /** - * Read 'raw' values. 
- * @param val - The 'raw' value - * @return Returns the value length - * @throws IOException - */ - public synchronized int nextRawValue(ValueBytes val) - throws IOException { - - // Position stream to current value - seekToCurrentValue(); - - if (!blockCompressed) { - int valLength = recordLength - keyLength; - if (decompress) { - CompressedBytes value = (CompressedBytes) val; - value.reset(in, valLength); - } else { - UncompressedBytes value = (UncompressedBytes) val; - value.reset(in, valLength); - } - - return valLength; - } else { - int valLength = WritableUtils.readVInt(valLenIn); - UncompressedBytes rawValue = (UncompressedBytes) val; - rawValue.reset(valIn, valLength); - --noBufferedValues; - return valLength; - } - - } - - private void handleChecksumException(ChecksumException e) - throws IOException { - if (this.conf.getBoolean("io.skip.checksum.errors", false)) { - LOG.warn("Bad checksum at " + getPosition() + ". Skipping entries."); - sync(getPosition() + this.conf.getInt("io.bytes.per.checksum", 512)); - } else { - throw e; - } - } - - /** disables sync. often invoked for tmp files */ - synchronized void ignoreSync() { - sync = null; - } - - /** Set the current byte position in the input file. - * - *
<p>
The position passed must be a position returned by {@link - * HoodieAppendLog.Writer#getLength()} when writing this file. To seek to an arbitrary - * position, use {@link HoodieAppendLog.Reader#sync(long)}. - */ - public synchronized void seek(long position) throws IOException { - in.seek(position); - if (blockCompressed) { // trigger block read - noBufferedKeys = 0; - valuesDecompressed = true; - } - } - - /** Seek to the next sync mark past a given position.*/ - public synchronized void sync(long position) throws IOException { - if (position + SYNC_SIZE >= end) { - seek(end); - return; - } - - if (position < headerEnd) { - // seek directly to first record - in.seek(headerEnd); - // note the sync marker "seen" in the header - syncSeen = true; - return; - } - - try { - seek(position + 4); // skip escape - in.readFully(syncCheck); - int syncLen = sync.length; - for (int i = 0; in.getPos() < end; i++) { - int j = 0; - for (; j < syncLen; j++) { - if (sync[j] != syncCheck[(i + j) % syncLen]) - break; - } - if (j == syncLen) { - in.seek(in.getPos() - SYNC_SIZE); // position before sync - return; - } - syncCheck[i % syncLen] = in.readByte(); - } - } catch (ChecksumException e) { // checksum failure - handleChecksumException(e); - } - } - - /** Returns true iff the previous call to next passed a sync mark.*/ - public synchronized boolean syncSeen() { - return syncSeen; - } - - /** Return the current byte position in the input file. */ - public synchronized long getPosition() throws IOException { - return in.getPos(); - } - - /** Returns the name of the file. */ - @Override - public String toString() { - return filename; - } - - } - - /** Sorts key/value pairs in a sequence-format file. - * - *
<p>
For best performance, applications should make sure that the {@link - * Writable#readFields(DataInput)} implementation of their keys is - * very efficient. In particular, it should avoid allocating memory. - */ - public static class Sorter { - - private RawComparator comparator; - - private MergeSort mergeSort; //the implementation of merge sort - - private Path[] inFiles; // when merging or sorting - - private Path outFile; - - private int memory; // bytes - private int factor; // merged per pass - - private FileSystem fs = null; - - private Class keyClass; - private Class valClass; - - private Configuration conf; - private Metadata metadata; - - private Progressable progressable = null; - - /** Sort and merge files containing the named classes. */ - public Sorter(FileSystem fs, Class keyClass, - Class valClass, Configuration conf) { - this(fs, WritableComparator.get(keyClass, conf), keyClass, valClass, conf); - } - - /** Sort and merge using an arbitrary {@link RawComparator}. */ - public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, - Class valClass, Configuration conf) { - this(fs, comparator, keyClass, valClass, conf, new Metadata()); - } - - /** Sort and merge using an arbitrary {@link RawComparator}. */ - public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, - Class valClass, Configuration conf, Metadata metadata) { - this.fs = fs; - this.comparator = comparator; - this.keyClass = keyClass; - this.valClass = valClass; - this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024; - this.factor = conf.getInt("io.sort.factor", 100); - this.conf = conf; - this.metadata = metadata; - } - - /** Set the number of streams to merge at once.*/ - public void setFactor(int factor) { - this.factor = factor; - } - - /** Get the number of streams to merge at once.*/ - public int getFactor() { - return factor; - } - - /** Set the total amount of buffer memory, in bytes.*/ - public void setMemory(int memory) { - this.memory = memory; - } - - /** Get the total amount of buffer memory, in bytes.*/ - public int getMemory() { - return memory; - } - - /** Set the progressable object in order to report progress. */ - public void setProgressable(Progressable progressable) { - this.progressable = progressable; - } - - /** - * Perform a file sort from a set of input files into an output file. - * @param inFiles the files to be sorted - * @param outFile the sorted output file - * @param deleteInput should the input files be deleted as they are read? - */ - public void sort(Path[] inFiles, Path outFile, - boolean deleteInput) throws IOException { - if (fs.exists(outFile)) { - throw new IOException("already exists: " + outFile); - } - - this.inFiles = inFiles; - this.outFile = outFile; - - int segments = sortPass(deleteInput); - if (segments > 1) { - mergePass(outFile.getParent()); - } - } - - /** - * Perform a file sort from a set of input files and return an iterator. - * @param inFiles the files to be sorted - * @param tempDir the directory where temp files are created during sort - * @param deleteInput should the input files be deleted as they are read? 
- * @return iterator the RawKeyValueIterator - */ - public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, - boolean deleteInput) throws IOException { - Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2"); - if (fs.exists(outFile)) { - throw new IOException("already exists: " + outFile); - } - this.inFiles = inFiles; - //outFile will basically be used as prefix for temp files in the cases - //where sort outputs multiple sorted segments. For the single segment - //case, the outputFile itself will contain the sorted data for that - //segment - this.outFile = outFile; - - int segments = sortPass(deleteInput); - if (segments > 1) - return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), - tempDir); - else if (segments == 1) - return merge(new Path[]{outFile}, true, tempDir); - else return null; - } - - /** - * The backwards compatible interface to sort. - * @param inFile the input file to sort - * @param outFile the sorted output file - */ - public void sort(Path inFile, Path outFile) throws IOException { - sort(new Path[]{inFile}, outFile, false); - } - - private int sortPass(boolean deleteInput) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("running sort pass"); - } - SortPass sortPass = new SortPass(); // make the SortPass - sortPass.setProgressable(progressable); - mergeSort = new MergeSort(sortPass.new SeqFileComparator()); - try { - return sortPass.run(deleteInput); // run it - } finally { - sortPass.close(); // close it - } - } - - private class SortPass { - private int memoryLimit = memory / 4; - private int recordLimit = 1000000; - - private DataOutputBuffer rawKeys = new DataOutputBuffer(); - private byte[] rawBuffer; - - private int[] keyOffsets = new int[1024]; - private int[] pointers = new int[keyOffsets.length]; - private int[] pointersCopy = new int[keyOffsets.length]; - private int[] keyLengths = new int[keyOffsets.length]; - private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length]; - - private ArrayList segmentLengths = new ArrayList(); - - private Reader in = null; - private FSDataOutputStream out = null; - private FSDataOutputStream indexOut = null; - private Path outName; - - private Progressable progressable = null; - - public int run(boolean deleteInput) throws IOException { - int segments = 0; - int currentFile = 0; - boolean atEof = (currentFile >= inFiles.length); - CompressionType compressionType; - CompressionCodec codec = null; - segmentLengths.clear(); - if (atEof) { - return 0; - } - - // Initialize - in = new Reader(fs, inFiles[currentFile], conf); - compressionType = in.getCompressionType(); - codec = in.getCompressionCodec(); - - for (int i = 0; i < rawValues.length; ++i) { - rawValues[i] = null; - } - - while (!atEof) { - int count = 0; - int bytesProcessed = 0; - rawKeys.reset(); - while (!atEof && - bytesProcessed < memoryLimit && count < recordLimit) { - - // Read a record into buffer - // Note: Attempt to re-use 'rawValue' as far as possible - int keyOffset = rawKeys.getLength(); - ValueBytes rawValue = - (count == keyOffsets.length || rawValues[count] == null) ? 
- in.createValueBytes() : - rawValues[count]; - int recordLength = in.nextRaw(rawKeys, rawValue); - if (recordLength == -1) { - in.close(); - if (deleteInput) { - fs.delete(inFiles[currentFile], true); - } - currentFile += 1; - atEof = currentFile >= inFiles.length; - if (!atEof) { - in = new Reader(fs, inFiles[currentFile], conf); - } else { - in = null; - } - continue; - } - - int keyLength = rawKeys.getLength() - keyOffset; - - if (count == keyOffsets.length) - grow(); - - keyOffsets[count] = keyOffset; // update pointers - pointers[count] = count; - keyLengths[count] = keyLength; - rawValues[count] = rawValue; - - bytesProcessed += recordLength; - count++; - } - - // buffer is full -- sort & flush it - if (LOG.isDebugEnabled()) { - LOG.debug("flushing segment " + segments); - } - rawBuffer = rawKeys.getData(); - sort(count); - // indicate we're making progress - if (progressable != null) { - progressable.progress(); - } - flush(count, bytesProcessed, compressionType, codec, - segments == 0 && atEof); - segments++; - } - return segments; - } - - public void close() throws IOException { - if (in != null) { - in.close(); - } - if (out != null) { - out.close(); - } - if (indexOut != null) { - indexOut.close(); - } - } - - private void grow() { - int newLength = keyOffsets.length * 3 / 2; - keyOffsets = grow(keyOffsets, newLength); - pointers = grow(pointers, newLength); - pointersCopy = new int[newLength]; - keyLengths = grow(keyLengths, newLength); - rawValues = grow(rawValues, newLength); - } - - private int[] grow(int[] old, int newLength) { - int[] result = new int[newLength]; - System.arraycopy(old, 0, result, 0, old.length); - return result; - } - - private ValueBytes[] grow(ValueBytes[] old, int newLength) { - ValueBytes[] result = new ValueBytes[newLength]; - System.arraycopy(old, 0, result, 0, old.length); - for (int i = old.length; i < newLength; ++i) { - result[i] = null; - } - return result; - } - - private void flush(int count, int bytesProcessed, - CompressionType compressionType, - CompressionCodec codec, - boolean done) throws IOException { - if (out == null) { - outName = done ? outFile : outFile.suffix(".0"); - out = fs.create(outName); - if (!done) { - indexOut = fs.create(outName.suffix(".index")); - } - } - - long segmentStart = out.getPos(); - Writer writer = createWriter(conf, Writer.stream(out), - Writer.keyClass(keyClass), Writer.valueClass(valClass), - Writer.compression(compressionType, codec), - Writer.metadata(done ? 
metadata : new Metadata())); - - if (!done) { - writer.sync = null; // disable sync on temp files - } - - for (int i = 0; i < count; i++) { // write in sorted order - int p = pointers[i]; - writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]); - } - writer.close(); - - if (!done) { - // Save the segment length - WritableUtils.writeVLong(indexOut, segmentStart); - WritableUtils.writeVLong(indexOut, (out.getPos() - segmentStart)); - indexOut.flush(); - } - } - - private void sort(int count) { - System.arraycopy(pointers, 0, pointersCopy, 0, count); - mergeSort.mergeSort(pointersCopy, pointers, 0, count); - } - - class SeqFileComparator implements Comparator { - @Override - public int compare(IntWritable I, IntWritable J) { - return comparator.compare(rawBuffer, keyOffsets[I.get()], - keyLengths[I.get()], rawBuffer, - keyOffsets[J.get()], keyLengths[J.get()]); - } - } - - /** set the progressable object in order to report progress */ - public void setProgressable(Progressable progressable) { - this.progressable = progressable; - } - - } // SequenceFile.Sorter.SortPass - - /** The interface to iterate over raw keys/values of SequenceFiles. */ - public static interface RawKeyValueIterator { - /** Gets the current raw key - * @return DataOutputBuffer - * @throws IOException - */ - DataOutputBuffer getKey() throws IOException; - - /** Gets the current raw value - * @return ValueBytes - * @throws IOException - */ - ValueBytes getValue() throws IOException; - - /** Sets up the current key and value (for getKey and getValue) - * @return true if there exists a key/value, false otherwise - * @throws IOException - */ - boolean next() throws IOException; - - /** closes the iterator so that the underlying streams can be closed - * @throws IOException - */ - void close() throws IOException; - - /** Gets the Progress object; this has a float (0.0 - 1.0) - * indicating the bytes processed by the iterator so far - */ - Progress getProgress(); - } - - /** - * Merges the list of segments of type SegmentDescriptor - * @param segments the list of SegmentDescriptors - * @param tmpDir the directory to write temporary files into - * @return RawKeyValueIterator - * @throws IOException - */ - public RawKeyValueIterator merge(List segments, - Path tmpDir) - throws IOException { - // pass in object to report progress, if present - MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable); - return mQueue.merge(); - } - - /** - * Merges the contents of files passed in Path[] using a max factor value - * that is already set - * @param inNames the array of path names - * @param deleteInputs true if the input files should be deleted when - * unnecessary - * @param tmpDir the directory to write temporary files into - * @return RawKeyValueIteratorMergeQueue - * @throws IOException - */ - public RawKeyValueIterator merge(Path[] inNames, boolean deleteInputs, - Path tmpDir) - throws IOException { - return merge(inNames, deleteInputs, - (inNames.length < factor) ? 
inNames.length : factor, - tmpDir); - } - - /** - * Merges the contents of files passed in Path[] - * @param inNames the array of path names - * @param deleteInputs true if the input files should be deleted when - * unnecessary - * @param factor the factor that will be used as the maximum merge fan-in - * @param tmpDir the directory to write temporary files into - * @return RawKeyValueIteratorMergeQueue - * @throws IOException - */ - public RawKeyValueIterator merge(Path[] inNames, boolean deleteInputs, - int factor, Path tmpDir) - throws IOException { - //get the segments from inNames - ArrayList a = new ArrayList(); - for (int i = 0; i < inNames.length; i++) { - SegmentDescriptor s = new SegmentDescriptor(0, - fs.getFileStatus(inNames[i]).getLen(), inNames[i]); - s.preserveInput(!deleteInputs); - s.doSync(); - a.add(s); - } - this.factor = factor; - MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable); - return mQueue.merge(); - } - - /** - * Merges the contents of files passed in Path[] - * @param inNames the array of path names - * @param tempDir the directory for creating temp files during merge - * @param deleteInputs true if the input files should be deleted when - * unnecessary - * @return RawKeyValueIteratorMergeQueue - * @throws IOException - */ - public RawKeyValueIterator merge(Path[] inNames, Path tempDir, - boolean deleteInputs) - throws IOException { - //outFile will basically be used as prefix for temp files for the - //intermediate merge outputs - this.outFile = new Path(tempDir + Path.SEPARATOR + "merged"); - //get the segments from inNames - ArrayList a = new ArrayList(); - for (int i = 0; i < inNames.length; i++) { - SegmentDescriptor s = new SegmentDescriptor(0, - fs.getFileStatus(inNames[i]).getLen(), inNames[i]); - s.preserveInput(!deleteInputs); - s.doSync(); - a.add(s); - } - factor = (inNames.length < factor) ? inNames.length : factor; - // pass in object to report progress, if present - MergeQueue mQueue = new MergeQueue(a, tempDir, progressable); - return mQueue.merge(); - } - - /** - * Clones the attributes (like compression of the input file and creates a - * corresponding Writer - * @param inputFile the path of the input file whose attributes should be - * cloned - * @param outputFile the path of the output file - * @param prog the Progressable to report status during the file write - * @return Writer - * @throws IOException - */ - public Writer cloneFileAttributes(Path inputFile, Path outputFile, - Progressable prog) throws IOException { - Reader reader = new Reader(conf, - Reader.file(inputFile), - new Reader.OnlyHeaderOption()); - CompressionType compress = reader.getCompressionType(); - CompressionCodec codec = reader.getCompressionCodec(); - reader.close(); - - Writer writer = createWriter(conf, - Writer.file(outputFile), - Writer.keyClass(keyClass), - Writer.valueClass(valClass), - Writer.compression(compress, codec), - Writer.progressable(prog)); - return writer; - } - - /** - * Writes records from RawKeyValueIterator into a file represented by the - * passed writer - * @param records the RawKeyValueIterator - * @param writer the Writer created earlier - * @throws IOException - */ - public void writeFile(RawKeyValueIterator records, Writer writer) - throws IOException { - while (records.next()) { - writer.appendRaw(records.getKey().getData(), 0, - records.getKey().getLength(), records.getValue()); - } - writer.sync(); - } - - /** Merge the provided files. 
- * @param inFiles the array of input path names - * @param outFile the final output file - * @throws IOException - */ - public void merge(Path[] inFiles, Path outFile) throws IOException { - if (fs.exists(outFile)) { - throw new IOException("already exists: " + outFile); - } - RawKeyValueIterator r = merge(inFiles, false, outFile.getParent()); - Writer writer = cloneFileAttributes(inFiles[0], outFile, null); - - writeFile(r, writer); - - writer.close(); - } - - /** sort calls this to generate the final merged output */ - private int mergePass(Path tmpDir) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("running merge pass"); - } - Writer writer = cloneFileAttributes( - outFile.suffix(".0"), outFile, null); - RawKeyValueIterator r = merge(outFile.suffix(".0"), - outFile.suffix(".0.index"), tmpDir); - writeFile(r, writer); - - writer.close(); - return 0; - } - - /** Used by mergePass to merge the output of the sort - * @param inName the name of the input file containing sorted segments - * @param indexIn the offsets of the sorted segments - * @param tmpDir the relative directory to store intermediate results in - * @return RawKeyValueIterator - * @throws IOException - */ - private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) - throws IOException { - //get the segments from indexIn - //we create a SegmentContainer so that we can track segments belonging to - //inName and delete inName as soon as we see that we have looked at all - //the contained segments during the merge process & hence don't need - //them anymore - SegmentContainer container = new SegmentContainer(inName, indexIn); - MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable); - return mQueue.merge(); - } - - /** This class implements the core of the merge logic */ - private class MergeQueue extends PriorityQueue - implements RawKeyValueIterator { - private boolean compress; - private boolean blockCompress; - private DataOutputBuffer rawKey = new DataOutputBuffer(); - private ValueBytes rawValue; - private long totalBytesProcessed; - private float progPerByte; - private Progress mergeProgress = new Progress(); - private Path tmpDir; - private Progressable progress = null; //handle to the progress reporting object - private SegmentDescriptor minSegment; - - //a TreeMap used to store the segments sorted by size (segment offset and - //segment path name is used to break ties between segments of same sizes) - private Map sortedSegmentSizes = - new TreeMap(); - - @SuppressWarnings("unchecked") - public void put(SegmentDescriptor stream) throws IOException { - if (size() == 0) { - compress = stream.in.isCompressed(); - blockCompress = stream.in.isBlockCompressed(); - } else if (compress != stream.in.isCompressed() || - blockCompress != stream.in.isBlockCompressed()) { - throw new IOException("All merged files must be compressed or not."); - } - super.put(stream); - } - - /** - * A queue of file segments to merge - * @param segments the file segments to merge - * @param tmpDir a relative local directory to save intermediate files in - * @param progress the reference to the Progressable object - */ - public MergeQueue(List segments, - Path tmpDir, Progressable progress) { - int size = segments.size(); - for (int i = 0; i < size; i++) { - sortedSegmentSizes.put(segments.get(i), null); - } - this.tmpDir = tmpDir; - this.progress = progress; - } - - @Override - protected boolean lessThan(Object a, Object b) { - // indicate we're making progress - if (progress != null) { - 
progress.progress(); - } - SegmentDescriptor msa = (SegmentDescriptor) a; - SegmentDescriptor msb = (SegmentDescriptor) b; - return comparator.compare(msa.getKey().getData(), 0, - msa.getKey().getLength(), msb.getKey().getData(), 0, - msb.getKey().getLength()) < 0; - } - - @Override - public void close() throws IOException { - SegmentDescriptor ms; // close inputs - while ((ms = (SegmentDescriptor) pop()) != null) { - ms.cleanup(); - } - minSegment = null; - } - - @Override - public DataOutputBuffer getKey() throws IOException { - return rawKey; - } - - @Override - public ValueBytes getValue() throws IOException { - return rawValue; - } - - @Override - public boolean next() throws IOException { - if (size() == 0) - return false; - if (minSegment != null) { - //minSegment is non-null for all invocations of next except the first - //one. For the first invocation, the priority queue is ready for use - //but for the subsequent invocations, first adjust the queue - adjustPriorityQueue(minSegment); - if (size() == 0) { - minSegment = null; - return false; - } - } - minSegment = (SegmentDescriptor) top(); - long startPos = minSegment.in.getPosition(); // Current position in stream - //save the raw key reference - rawKey = minSegment.getKey(); - //load the raw value. Re-use the existing rawValue buffer - if (rawValue == null) { - rawValue = minSegment.in.createValueBytes(); - } - minSegment.nextRawValue(rawValue); - long endPos = minSegment.in.getPosition(); // End position after reading value - updateProgress(endPos - startPos); - return true; - } - - @Override - public Progress getProgress() { - return mergeProgress; - } - - private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException { - long startPos = ms.in.getPosition(); // Current position in stream - boolean hasNext = ms.nextRawKey(); - long endPos = ms.in.getPosition(); // End position after reading key - updateProgress(endPos - startPos); - if (hasNext) { - adjustTop(); - } else { - pop(); - ms.cleanup(); - } - } - - private void updateProgress(long bytesProcessed) { - totalBytesProcessed += bytesProcessed; - if (progPerByte > 0) { - mergeProgress.set(totalBytesProcessed * progPerByte); - } - } - - /** This is the single level merge that is called multiple times - * depending on the factor size and the number of segments - * @return RawKeyValueIterator - * @throws IOException - */ - public RawKeyValueIterator merge() throws IOException { - //create the MergeStreams from the sorted map created in the constructor - //and dump the final output to a file - int numSegments = sortedSegmentSizes.size(); - int origFactor = factor; - int passNo = 1; - LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir"); - do { - //get the factor for this pass of merge - factor = getPassFactor(passNo, numSegments); - List segmentsToMerge = - new ArrayList(); - int segmentsConsidered = 0; - int numSegmentsToConsider = factor; - while (true) { - //extract the smallest 'factor' number of segment pointers from the - //TreeMap. 
Call cleanup on the empty segments (no key/value data) - SegmentDescriptor[] mStream = - getSegmentDescriptors(numSegmentsToConsider); - for (int i = 0; i < mStream.length; i++) { - if (mStream[i].nextRawKey()) { - segmentsToMerge.add(mStream[i]); - segmentsConsidered++; - // Count the fact that we read some bytes in calling nextRawKey() - updateProgress(mStream[i].in.getPosition()); - } else { - mStream[i].cleanup(); - numSegments--; //we ignore this segment for the merge - } - } - //if we have the desired number of segments - //or looked at all available segments, we break - if (segmentsConsidered == factor || - sortedSegmentSizes.size() == 0) { - break; - } - - numSegmentsToConsider = factor - segmentsConsidered; - } - //feed the streams to the priority queue - initialize(segmentsToMerge.size()); - clear(); - for (int i = 0; i < segmentsToMerge.size(); i++) { - put(segmentsToMerge.get(i)); - } - //if we have lesser number of segments remaining, then just return the - //iterator, else do another single level merge - if (numSegments <= factor) { - //calculate the length of the remaining segments. Required for - //calculating the merge progress - long totalBytes = 0; - for (int i = 0; i < segmentsToMerge.size(); i++) { - totalBytes += segmentsToMerge.get(i).segmentLength; - } - if (totalBytes != 0) //being paranoid - progPerByte = 1.0f / (float) totalBytes; - //reset factor to what it originally was - factor = origFactor; - return this; - } else { - //we want to spread the creation of temp files on multiple disks if - //available under the space constraints - long approxOutputSize = 0; - for (SegmentDescriptor s : segmentsToMerge) { - approxOutputSize += s.segmentLength + - ChecksumFileSystem.getApproxChkSumLength( - s.segmentLength); - } - Path tmpFilename = - new Path(tmpDir, "intermediate").suffix("." + passNo); - - Path outputFile = lDirAlloc.getLocalPathForWrite( - tmpFilename.toString(), - approxOutputSize, conf); - if (LOG.isDebugEnabled()) { - LOG.debug("writing intermediate results to " + outputFile); - } - Writer writer = cloneFileAttributes( - fs.makeQualified(segmentsToMerge.get(0).segmentPathName), - fs.makeQualified(outputFile), null); - writer.sync = null; //disable sync for temp files - writeFile(this, writer); - writer.close(); - - //we finished one single level merge; now clean up the priority - //queue - this.close(); - - SegmentDescriptor tempSegment = - new SegmentDescriptor(0, - fs.getFileStatus(outputFile).getLen(), outputFile); - //put the segment back in the TreeMap - sortedSegmentSizes.put(tempSegment, null); - numSegments = sortedSegmentSizes.size(); - passNo++; - } - //we are worried about only the first pass merge factor. So reset the - //factor to what it originally was - factor = origFactor; - } while (true); - } - - //Hadoop-591 - public int getPassFactor(int passNo, int numSegments) { - if (passNo > 1 || numSegments <= factor || factor == 1) - return factor; - int mod = (numSegments - 1) % (factor - 1); - if (mod == 0) - return factor; - return mod + 1; - } - - /** Return (& remove) the requested number of segment descriptors from the - * sorted map. 
- */ - public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) { - if (numDescriptors > sortedSegmentSizes.size()) - numDescriptors = sortedSegmentSizes.size(); - SegmentDescriptor[] SegmentDescriptors = - new SegmentDescriptor[numDescriptors]; - Iterator iter = sortedSegmentSizes.keySet().iterator(); - int i = 0; - while (i < numDescriptors) { - SegmentDescriptors[i++] = (SegmentDescriptor) iter.next(); - iter.remove(); - } - return SegmentDescriptors; - } - } // SequenceFile.Sorter.MergeQueue - - /** This class defines a merge segment. This class can be subclassed to - * provide a customized cleanup method implementation. In this - * implementation, cleanup closes the file handle and deletes the file - */ - public class SegmentDescriptor implements Comparable { - - long segmentOffset; //the start of the segment in the file - long segmentLength; //the length of the segment - Path segmentPathName; //the path name of the file containing the segment - boolean ignoreSync = true; //set to true for temp files - private Reader in = null; - private DataOutputBuffer rawKey = null; //this will hold the current key - private boolean preserveInput = false; //delete input segment files? - - /** Constructs a segment - * @param segmentOffset the offset of the segment in the file - * @param segmentLength the length of the segment - * @param segmentPathName the path name of the file containing the segment - */ - public SegmentDescriptor(long segmentOffset, long segmentLength, - Path segmentPathName) { - this.segmentOffset = segmentOffset; - this.segmentLength = segmentLength; - this.segmentPathName = segmentPathName; - } - - /** Do the sync checks */ - public void doSync() { - ignoreSync = false; - } - - /** Whether to delete the files when no longer needed */ - public void preserveInput(boolean preserve) { - preserveInput = preserve; - } - - public boolean shouldPreserveInput() { - return preserveInput; - } - - @Override - public int compareTo(Object o) { - SegmentDescriptor that = (SegmentDescriptor) o; - if (this.segmentLength != that.segmentLength) { - return (this.segmentLength < that.segmentLength ? -1 : 1); - } - if (this.segmentOffset != that.segmentOffset) { - return (this.segmentOffset < that.segmentOffset ? -1 : 1); - } - return (this.segmentPathName.toString()). 
- compareTo(that.segmentPathName.toString()); - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof SegmentDescriptor)) { - return false; - } - SegmentDescriptor that = (SegmentDescriptor) o; - if (this.segmentLength == that.segmentLength && - this.segmentOffset == that.segmentOffset && - this.segmentPathName.toString().equals( - that.segmentPathName.toString())) { - return true; - } - return false; - } - - @Override - public int hashCode() { - return 37 * 17 + (int) (segmentOffset ^ (segmentOffset >>> 32)); - } - - /** Fills up the rawKey object with the key returned by the Reader - * @return true if there is a key returned; false, otherwise - * @throws IOException - */ - public boolean nextRawKey() throws IOException { - if (in == null) { - int bufferSize = getBufferSize(conf); - Reader reader = new Reader(conf, - Reader.file(segmentPathName), - Reader.bufferSize(bufferSize), - Reader.start(segmentOffset), - Reader.length(segmentLength)); - - //sometimes we ignore syncs especially for temp merge files - if (ignoreSync) reader.ignoreSync(); - - if (reader.getKeyClass() != keyClass) - throw new IOException("wrong key class: " + reader.getKeyClass() + - " is not " + keyClass); - if (reader.getValueClass() != valClass) - throw new IOException("wrong value class: " + reader.getValueClass() + - " is not " + valClass); - this.in = reader; - rawKey = new DataOutputBuffer(); - } - rawKey.reset(); - int keyLength = - in.nextRawKey(rawKey); - return (keyLength >= 0); - } - - /** Fills up the passed rawValue with the value corresponding to the key - * read earlier - * @param rawValue - * @return the length of the value - * @throws IOException - */ - public int nextRawValue(ValueBytes rawValue) throws IOException { - int valLength = in.nextRawValue(rawValue); - return valLength; - } - - /** Returns the stored rawKey */ - public DataOutputBuffer getKey() { - return rawKey; - } - - /** closes the underlying reader */ - private void close() throws IOException { - this.in.close(); - this.in = null; - } - - /** The default cleanup. Subclasses can override this with a custom - * cleanup - */ - public void cleanup() throws IOException { - close(); - if (!preserveInput) { - fs.delete(segmentPathName, true); - } - } - } // SequenceFile.Sorter.SegmentDescriptor - - /** This class provisions multiple segments contained within a single - * file - */ - private class LinkedSegmentsDescriptor extends SegmentDescriptor { - - SegmentContainer parentContainer = null; - - /** Constructs a segment - * @param segmentOffset the offset of the segment in the file - * @param segmentLength the length of the segment - * @param segmentPathName the path name of the file containing the segment - * @param parent the parent SegmentContainer that holds the segment - */ - public LinkedSegmentsDescriptor(long segmentOffset, long segmentLength, - Path segmentPathName, SegmentContainer parent) { - super(segmentOffset, segmentLength, segmentPathName); - this.parentContainer = parent; - } - - /** The default cleanup. Subclasses can override this with a custom - * cleanup - */ - @Override - public void cleanup() throws IOException { - super.close(); - if (super.shouldPreserveInput()) return; - parentContainer.cleanup(); - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof LinkedSegmentsDescriptor)) { - return false; - } - return super.equals(o); - } - } //SequenceFile.Sorter.LinkedSegmentsDescriptor - - /** The class that defines a container for segments to be merged. 
Primarily - * required to delete temp files as soon as all the contained segments - * have been looked at */ - private class SegmentContainer { - private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups - private int numSegmentsContained; //# of segments contained - private Path inName; //input file from where segments are created - - //the list of segments read from the file - private ArrayList segments = - new ArrayList(); - - /** This constructor is there primarily to serve the sort routine that - * generates a single output file with an associated index file */ - public SegmentContainer(Path inName, Path indexIn) throws IOException { - //get the segments from indexIn - FSDataInputStream fsIndexIn = fs.open(indexIn); - long end = fs.getFileStatus(indexIn).getLen(); - while (fsIndexIn.getPos() < end) { - long segmentOffset = WritableUtils.readVLong(fsIndexIn); - long segmentLength = WritableUtils.readVLong(fsIndexIn); - Path segmentName = inName; - segments.add(new LinkedSegmentsDescriptor(segmentOffset, - segmentLength, segmentName, this)); - } - fsIndexIn.close(); - fs.delete(indexIn, true); - numSegmentsContained = segments.size(); - this.inName = inName; - } - - public List getSegmentList() { - return segments; - } - - public void cleanup() throws IOException { - numSegmentsCleanedUp++; - if (numSegmentsCleanedUp == numSegmentsContained) { - fs.delete(inName, true); - } - } - } //SequenceFile.Sorter.SegmentContainer - - } // SequenceFile.Sorter - -} // SequenceFile diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/ActionType.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/ActionType.java new file mode 100644 index 000000000..5e8a8c2a4 --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/ActionType.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.model; + +public enum ActionType { + commit, savepoint, compaction, clean, rollback; +} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieArchivedLogFile.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieArchivedLogFile.java new file mode 100644 index 000000000..100cd8382 --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieArchivedLogFile.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.uber.hoodie.common.model;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+public class HoodieArchivedLogFile extends HoodieLogFile {
+
+  public static final String ARCHIVE_EXTENSION = ".archive";
+
+  public HoodieArchivedLogFile(FileStatus fileStatus) {
+    super(fileStatus);
+  }
+
+  public HoodieArchivedLogFile(Path logPath) {
+    super(logPath);
+  }
+
+  @Override
+  public String toString() {
+    return "HoodieArchivedLogFile {" + super.getPath() + '}';
+  }
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java
index 28d884754..19787ebed 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java
@@ -41,9 +41,9 @@ import java.util.Map;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class HoodieCommitMetadata implements Serializable {
   private static volatile Logger log = LogManager.getLogger(HoodieCommitMetadata.class);
-  protected HashMap<String, List<HoodieWriteStat>> partitionToWriteStats;
+  protected Map<String, List<HoodieWriteStat>> partitionToWriteStats;
 
-  private HashMap<String, String> extraMetadataMap;
+  private Map<String, String> extraMetadataMap;
 
   public HoodieCommitMetadata() {
     extraMetadataMap = new HashMap<>();
@@ -65,7 +65,9 @@ public class HoodieCommitMetadata implements Serializable {
     return partitionToWriteStats.get(partitionPath);
   }
 
-  public HashMap<String, List<HoodieWriteStat>> getPartitionToWriteStats() {
+  public Map<String, String> getExtraMetadata() { return extraMetadataMap; }
+
+  public Map<String, List<HoodieWriteStat>> getPartitionToWriteStats() {
     return partitionToWriteStats;
   }
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java
index c59512bfb..fd4f7d806 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java
@@ -83,11 +83,12 @@ public class HoodieLogFile {
   public HoodieLogFile rollOver(FileSystem fs) throws IOException {
     String fileId = getFileId();
     String baseCommitTime = getBaseCommitTime();
+    String extension = "." + FSUtils.getFileExtensionFromLog(path);
     int newVersion = FSUtils
         .computeNextLogVersion(fs, path.getParent(), fileId,
-            DELTA_EXTENSION, baseCommitTime);
+            extension, baseCommitTime);
     return new HoodieLogFile(new Path(path.getParent(),
-        FSUtils.makeLogFileName(fileId, DELTA_EXTENSION, baseCommitTime, newVersion)));
+        FSUtils.makeLogFileName(fileId, extension, baseCommitTime, newVersion)));
   }
 
   public static Comparator<HoodieLogFile> getLogVersionComparator() {
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java
index b662fe9ab..2a5b46300 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java
@@ -129,6 +129,15 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, COMPACTION_ACTION, DELTA_COMMIT_ACTION));
   }
 
+  /**
+   * Get all instants (commits, delta commits, compactions, clean, savepoint, rollback) that result in actions, in the active timeline
+   *
+   * @return timeline of all action instants
+   */
+  public HoodieTimeline getAllCommitsTimeline() {
+    return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, COMPACTION_ACTION, DELTA_COMMIT_ACTION, CLEAN_ACTION, SAVEPOINT_ACTION, ROLLBACK_ACTION));
+  }
+
   /**
    * Get only pure commits (inflight and completed) in the active timeline
    *
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieArchivedTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieArchivedTimeline.java
index 2835fe0e1..458cf6eb8 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieArchivedTimeline.java
@@ -45,7 +45,7 @@ import java.util.stream.Collectors;
  * This class can be serialized and de-serialized and on de-serialization the FileSystem is re-initialized.
  */
 public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
-  private static final String HOODIE_COMMIT_ARCHIVE_LOG_FILE = "commits.archived";
+  private static final String HOODIE_COMMIT_ARCHIVE_LOG_FILE = "commits";
   private transient FileSystem fs;
   private String metaPath;
   private Map readCommits = new HashMap<>();
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
index 640f68029..f1380e07e 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
@@ -22,17 +22,21 @@ import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.uber.hoodie.avro.model.HoodieCleanMetadata;
+import com.uber.hoodie.common.HoodieCleanStat;
 import com.uber.hoodie.common.table.HoodieTableConfig;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.log.HoodieLogFormat;
 import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer;
 import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
+import com.uber.hoodie.common.util.AvroUtils;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.HoodieAvroUtils;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -46,11 +50,15 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.Random;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -63,6 +71,8 @@ public class HoodieTestUtils {
   public static final String TEST_EXTENSION = ".test";
   public static final String RAW_TRIPS_TEST_NAME = "raw_trips";
   public static final int DEFAULT_TASK_PARTITIONID = 1;
+  public static final String[] DEFAULT_PARTITION_PATHS = {"2016/03/15", "2015/03/16", "2015/03/17"};
+  private static Random rand = new Random(46474747);
 
   public static void resetFS() {
     HoodieTestUtils.fs = FSUtils.getFs();
@@ -139,6 +149,26 @@ public class HoodieTestUtils {
     return instant + TEST_EXTENSION + HoodieTimeline.INFLIGHT_EXTENSION;
   }
 
+  public static void createCleanFiles(String basePath, String commitTime) throws IOException {
+    Path commitFile =
+        new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCleanerFileName(commitTime));
+    FileSystem fs = FSUtils.getFs();
+    FSDataOutputStream os = fs.create(commitFile, true);
+    try {
+      HoodieCleanStat cleanStats = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
+          DEFAULT_PARTITION_PATHS[rand.nextInt(DEFAULT_PARTITION_PATHS.length)],
+          new ArrayList<>(), new ArrayList<>(),
+          new ArrayList<>(), commitTime);
+      // Create the clean metadata
+      HoodieCleanMetadata cleanMetadata =
+          AvroUtils.convertCleanMetadata(commitTime, Optional.of(0L), Arrays.asList(cleanStats));
+      // Write empty clean metadata
+      os.write(AvroUtils.serializeCleanMetadata(cleanMetadata).get());
+    } finally {
+      os.close();
+    }
+  }
+
   public static String makeTestFileName(String instant) {
     return instant + TEST_EXTENSION;
   }
diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java
index 8c2036270..3c23c7ef0 100644
--- a/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java
+++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java
@@ -47,6 +47,7 @@ import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
@@ -213,7 +214,7 @@ public class TestUtil {
   }
 
   private static HoodieCommitMetadata createLogFiles(
-      HashMap<String, List<HoodieWriteStat>> partitionWriteStats, boolean isLogSchemaSimple)
+      Map<String, List<HoodieWriteStat>> partitionWriteStats, boolean isLogSchemaSimple)
       throws InterruptedException, IOException, URISyntaxException {
     HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
     for (Entry<String, List<HoodieWriteStat>> wEntry : partitionWriteStats.entrySet()) {