From cf7f7aabb97e4bbcfbfb84ec79f56458a731f582 Mon Sep 17 00:00:00 2001
From: vinothchandar
Date: Wed, 3 Jan 2018 04:32:21 -0800
Subject: [PATCH] Nicer handling of timeline archival for Cloud storage

 - When append() is not supported, always roll over to a new file (instead of failing)
 - Provide a way to configure the archive log folder (avoids small files inside .hoodie)
 - Datasets written via the Spark datasource archive to .hoodie/archived
 - HoodieClientExample now retains only 2-3 commits, to exercise the archival path during dev cycles
 - A few tweaks to code structure around CommitArchiveLog
---
 .../com/uber/hoodie/HoodieWriteClient.java         |  4 +-
 .../hoodie/io/HoodieCommitArchiveLog.java          | 27 +++-------
 .../src/test/java/HoodieClientExample.java         |  6 ++-
 .../hoodie/io/TestHoodieCommitArchiveLog.java      | 54 +++++++++----------
 .../common/table/HoodieTableConfig.java            | 11 ++++
 .../common/table/HoodieTableMetaClient.java        | 24 +++++++++
 .../table/log/HoodieLogFormatWriter.java           | 11 +++-
 .../timeline/HoodieArchivedTimeline.java           |  6 +--
 .../table/HoodieTableMetaClientTest.java           |  2 +-
 .../common/table/log/HoodieLogFormatTest.java      | 30 +++++++++++
 .../HoodieRealtimeRecordReaderTest.java            |  1 +
 .../scala/com/uber/hoodie/DefaultSource.scala      |  1 +
 12 files changed, 121 insertions(+), 56 deletions(-)

diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
index cfd20b4e1..6b0adf67f 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
@@ -91,7 +91,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
   private final HoodieWriteConfig config;
   private transient final HoodieMetrics metrics;
   private transient final HoodieIndex index;
-  private transient final HoodieCommitArchiveLog archiveLog;
   private transient Timer.Context writeContext = null;
 
   /**
@@ -116,7 +115,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
     this.config = clientConfig;
     this.index = HoodieIndex.createIndex(config, jsc);
     this.metrics = new HoodieMetrics(config, config.getTableName());
-    this.archiveLog = new HoodieCommitArchiveLog(clientConfig, fs);
 
     if (rollbackInFlight) {
       rollbackInflightCommits();
@@ -446,6 +444,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
     }
 
     // We cannot have unbounded commit files. Archive commits if we have to archive
+    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(config,
+        new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true));
    archiveLog.archiveIfRequired();
     if (config.isAutoClean()) {
       // Call clean to cleanup if there is anything to cleanup after the commit,
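
Note: archival is now constructed per commit instead of being cached as a transient
field on the serializable client, so each archival pass runs against a freshly loaded
timeline. A minimal sketch of the call pattern introduced above (all names are exactly
those from this patch; nothing new is assumed):

    // Inside the commit path: build the archive log against a fresh meta client.
    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(config,
        new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true));
    // No-op unless the active timeline has grown past the configured max commits.
    archiveLog.archiveIfRequired();
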
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java
index 7704517c6..bb295b64d 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java
@@ -40,7 +40,6 @@ import com.uber.hoodie.exception.HoodieIOException;
 import com.uber.hoodie.table.HoodieTable;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -60,15 +59,14 @@ public class HoodieCommitArchiveLog {
 
   private static Logger log = LogManager.getLogger(HoodieCommitArchiveLog.class);
   private final Path archiveFilePath;
-  private final FileSystem fs;
+  private final HoodieTableMetaClient metaClient;
   private final HoodieWriteConfig config;
   private HoodieLogFormat.Writer writer;
 
-  public HoodieCommitArchiveLog(HoodieWriteConfig config, FileSystem fs) {
-    this.fs = fs;
+  public HoodieCommitArchiveLog(HoodieWriteConfig config, HoodieTableMetaClient metaClient) {
     this.config = config;
-    this.archiveFilePath = HoodieArchivedTimeline
-        .getArchiveLogPath(config.getBasePath() + "/" + HoodieTableMetaClient.METAFOLDER_NAME);
+    this.metaClient = metaClient;
+    this.archiveFilePath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
   }
 
   private HoodieLogFormat.Writer openWriter() {
@@ -78,7 +76,7 @@ public class HoodieCommitArchiveLog {
           .onParentPath(archiveFilePath.getParent())
           .withFileId(archiveFilePath.getName())
           .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
-          .withFs(fs)
+          .withFs(metaClient.getFs())
           .overBaseCommit("").build();
     } else {
       return this.writer;
@@ -125,9 +123,7 @@ public class HoodieCommitArchiveLog {
     int maxCommitsToKeep = config.getMaxCommitsToKeep();
     int minCommitsToKeep = config.getMinCommitsToKeep();
 
-    HoodieTable table = HoodieTable
-        .getHoodieTable(new HoodieTableMetaClient(fs.getConf(), config.getBasePath(), true),
-            config);
+    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config);
 
     // GroupBy each action and limit each action timeline to maxCommitsToKeep
     HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline()
@@ -165,16 +161,13 @@ public class HoodieCommitArchiveLog {
 
   private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants) {
     log.info("Deleting instants " + archivedInstants);
-    HoodieTableMetaClient metaClient =
-        new HoodieTableMetaClient(fs.getConf(), config.getBasePath(), true);
-
     boolean success = true;
     for (HoodieInstant archivedInstant : archivedInstants) {
       Path commitFile = new Path(metaClient.getMetaPath(), archivedInstant.getFileName());
       try {
-        if (fs.exists(commitFile)) {
-          success &= fs.delete(commitFile, false);
+        if (metaClient.getFs().exists(commitFile)) {
+          success &= metaClient.getFs().delete(commitFile, false);
           log.info("Archived and deleted instant file " + commitFile);
         }
       } catch (IOException e) {
@@ -186,13 +179,9 @@ public class HoodieCommitArchiveLog {
   }
 
   public void archive(List<HoodieInstant> instants) throws HoodieCommitException {
-
     try {
-      HoodieTableMetaClient metaClient =
-          new HoodieTableMetaClient(fs.getConf(), config.getBasePath(), true);
       HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants();
-
       Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
       log.info("Wrapper schema " + wrapperSchema.toString());
       List<IndexedRecord> records = new ArrayList<>();
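
Note: with the meta client injected, the archive file location is derived entirely from
table config, via getArchivePath() (added to HoodieTableMetaClient later in this patch).
A sketch of the resolution; the Hadoop conf and base path here are placeholders:

    HoodieTableMetaClient metaClient =
        new HoodieTableMetaClient(hadoopConf, "/data/trips", true);
    // hoodie.archivelog.folder = ""          -> archives under /data/trips/.hoodie
    // hoodie.archivelog.folder = "archived"  -> archives under /data/trips/.hoodie/archived
    Path archiveFilePath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
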
diff --git a/hoodie-client/src/test/java/HoodieClientExample.java b/hoodie-client/src/test/java/HoodieClientExample.java
index f1fc056e1..14c1e6eac 100644
--- a/hoodie-client/src/test/java/HoodieClientExample.java
+++ b/hoodie-client/src/test/java/HoodieClientExample.java
@@ -24,9 +24,10 @@ import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.config.HoodieCompactionConfig;
 import com.uber.hoodie.config.HoodieIndexConfig;
 import com.uber.hoodie.config.HoodieWriteConfig;
-import com.uber.hoodie.index.HoodieIndex;
+import com.uber.hoodie.index.HoodieIndex.IndexType;
 import java.util.List;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -92,7 +93,8 @@ public class HoodieClientExample {
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .forTable(tableName).withIndexConfig(
-            HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
+            HoodieIndexConfig.newBuilder().withIndexType(IndexType.BLOOM).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3).build())
         .build();
     HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java
index 430a0a591..5d8362688 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java
@@ -63,7 +63,8 @@ public class TestHoodieCommitArchiveLog {
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .forTable("test-trip-table").build();
-    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, fs);
+    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg,
+        new HoodieTableMetaClient(fs.getConf(), cfg.getBasePath(), true));
     boolean result = archiveLog.archiveIfRequired();
     assertTrue(result);
   }
@@ -82,9 +83,9 @@ public class TestHoodieCommitArchiveLog {
     HoodieTestDataGenerator.createCommitFile(basePath, "104");
     HoodieTestDataGenerator.createCommitFile(basePath, "105");
 
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs.getConf(), basePath);
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath);
     HoodieTimeline timeline =
-        metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
 
     assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
@@ -96,18 +97,19 @@ public class TestHoodieCommitArchiveLog {
     HoodieTestUtils.createCleanFiles(basePath, "105");
 
     //reload the timeline and get all the commits before archive
-    timeline = metadata.getActiveTimeline().reload().getAllCommitsTimeline()
+    timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline()
         .filterCompletedInstants();
     List<HoodieInstant> originalCommits = timeline.getInstants().collect(Collectors.toList());
 
     assertEquals("Loaded 6 commits and 6 cleans and the count should match", 12, timeline.countInstants());
 
-    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, fs);
+    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg,
+        new HoodieTableMetaClient(fs.getConf(), basePath, true));
 
     assertTrue(archiveLog.archiveIfRequired());
 
     //reload the timeline and remove the remaining commits
-    timeline = metadata.getActiveTimeline().reload().getAllCommitsTimeline()
+    timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline()
         .filterCompletedInstants();
     originalCommits.removeAll(timeline.getInstants().collect(Collectors.toList()));
@@ -147,22 +149,20 @@ public class TestHoodieCommitArchiveLog {
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .forTable("test-trip-table").withCompactionConfig(
             HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build();
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs.getConf(), basePath);
-    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, fs);
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath);
+    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
     HoodieTestDataGenerator.createCommitFile(basePath, "100");
     HoodieTestDataGenerator.createCommitFile(basePath, "101");
     HoodieTestDataGenerator.createCommitFile(basePath, "102");
     HoodieTestDataGenerator.createCommitFile(basePath, "103");
-    HoodieTimeline timeline =
-        metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
-
+    HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline()
+        .filterCompletedInstants();
     assertEquals("Loaded 4 commits and the count should match", 4, timeline.countInstants());
     boolean result = archiveLog.archiveIfRequired();
     assertTrue(result);
-    timeline =
-        metadata.getActiveTimeline().reload().getCommitsTimeline()
-            .filterCompletedInstants();
+    timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline()
+        .filterCompletedInstants();
     assertEquals("Should not archive commits when maxCommitsToKeep is 5", 4,
         timeline.countInstants());
   }
@@ -173,8 +173,8 @@ public class TestHoodieCommitArchiveLog {
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .forTable("test-trip-table").withCompactionConfig(
             HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build();
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs.getConf(), basePath);
-    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, fs);
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath);
+    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
     HoodieTestDataGenerator.createCommitFile(basePath, "100");
     HoodieTestDataGenerator.createCommitFile(basePath, "101");
     HoodieTestDataGenerator.createCommitFile(basePath, "102");
@@ -182,14 +182,13 @@ public class TestHoodieCommitArchiveLog {
     HoodieTestDataGenerator.createCommitFile(basePath, "104");
     HoodieTestDataGenerator.createCommitFile(basePath, "105");
 
-    HoodieTimeline timeline =
-        metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+    HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline()
+        .filterCompletedInstants();
     assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
     boolean result = archiveLog.archiveIfRequired();
     assertTrue(result);
-    timeline =
-        metadata.getActiveTimeline().reload().getCommitsTimeline()
-            .filterCompletedInstants();
+    timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline()
+        .filterCompletedInstants();
     assertTrue("Archived commits should always be safe",
         timeline.containsOrBeforeTimelineStarts("100"));
     assertTrue("Archived commits should always be safe",
@@ -206,8 +205,8 @@ public class TestHoodieCommitArchiveLog {
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .forTable("test-trip-table").withCompactionConfig(
             HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build();
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs.getConf(), basePath);
-    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, fs);
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath);
+    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
     HoodieTestDataGenerator.createCommitFile(basePath, "100");
     HoodieTestDataGenerator.createCommitFile(basePath, "101");
     HoodieTestDataGenerator.createSavepointFile(basePath, "101");
@@ -216,14 +215,13 @@ public class TestHoodieCommitArchiveLog {
     HoodieTestDataGenerator.createCommitFile(basePath, "104");
     HoodieTestDataGenerator.createCommitFile(basePath, "105");
 
-    HoodieTimeline timeline =
-        metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+    HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline()
+        .filterCompletedInstants();
     assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
     boolean result = archiveLog.archiveIfRequired();
     assertTrue(result);
-    timeline =
-        metadata.getActiveTimeline().reload().getCommitsTimeline()
-            .filterCompletedInstants();
+    timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline()
+        .filterCompletedInstants();
     assertEquals(
         "Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)",
         5, timeline.countInstants());
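
Note: these tests hinge on archiveCommitsWith(minToKeep, maxToKeep): archival only runs
once the active timeline holds more than maxToKeep commits, trims it back to minToKeep,
and never reaches past the earliest savepoint. A compact sketch of the knob, using the
builder calls exactly as in the tests above:

    // Keep between 2 and 5 commits on the active timeline. With 6 commits and a
    // savepoint at "101", only commit "100" is eligible for archival.
    HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
        .archiveCommitsWith(2, 5)
        .build();
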
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableConfig.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableConfig.java
index 8cc6c18c6..577d7cf4e 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableConfig.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableConfig.java
@@ -52,11 +52,13 @@ public class HoodieTableConfig implements Serializable {
   public static final String HOODIE_RT_FILE_FORMAT_PROP_NAME = "hoodie.table.rt.file.format";
   public static final String HOODIE_PAYLOAD_CLASS_PROP_NAME = "hoodie.compaction.payload.class";
+  public static final String HOODIE_ARCHIVELOG_FOLDER_PROP_NAME = "hoodie.archivelog.folder";
 
   public static final HoodieTableType DEFAULT_TABLE_TYPE = HoodieTableType.COPY_ON_WRITE;
   public static final HoodieFileFormat DEFAULT_RO_FILE_FORMAT = HoodieFileFormat.PARQUET;
   public static final HoodieFileFormat DEFAULT_RT_FILE_FORMAT = HoodieFileFormat.HOODIE_LOG;
   public static final String DEFAULT_PAYLOAD_CLASS = HoodieAvroPayload.class.getName();
+  public static final String DEFAULT_ARCHIVELOG_FOLDER = "";
 
   private Properties props;
 
   public HoodieTableConfig(FileSystem fs, String metaPath) {
@@ -105,6 +107,9 @@ public class HoodieTableConfig implements Serializable {
         && !properties.containsKey(HOODIE_PAYLOAD_CLASS_PROP_NAME)) {
       properties.setProperty(HOODIE_PAYLOAD_CLASS_PROP_NAME, DEFAULT_PAYLOAD_CLASS);
     }
+    if (!properties.containsKey(HOODIE_ARCHIVELOG_FOLDER_PROP_NAME)) {
+      properties.setProperty(HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, DEFAULT_ARCHIVELOG_FOLDER);
+    }
     properties
         .store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis()));
   } finally {
@@ -161,4 +166,10 @@ public class HoodieTableConfig implements Serializable {
     return DEFAULT_RT_FILE_FORMAT;
   }
 
+  /**
+   * Get the relative path of the archive log folder under the metafolder, for this dataset
+   */
+  public String getArchivelogFolder() {
+    return props.getProperty(HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, DEFAULT_ARCHIVELOG_FOLDER);
+  }
 }
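
Note: the new property defaults to the empty string, which preserves the existing layout
(archive files directly under .hoodie) for datasets created before this change. Opting in
at dataset initialization is a one-liner; a sketch, with the table name and base path as
placeholders:

    Properties props = new Properties();
    props.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, "trips");
    props.put(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived");
    HoodieTableMetaClient.initializePathAsHoodieDataset(fs, "/data/trips", props);
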
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
index 758d7b5db..841cf47ab 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
@@ -131,6 +131,18 @@ public class HoodieTableMetaClient implements Serializable {
     return metaPath;
   }
 
+  /**
+   * @return path where the archived timeline is stored
+   */
+  public String getArchivePath() {
+    String archiveFolder = tableConfig.getArchivelogFolder();
+    if (archiveFolder.equals(HoodieTableConfig.DEFAULT_ARCHIVELOG_FOLDER)) {
+      return getMetaPath();
+    } else {
+      return getMetaPath() + "/" + archiveFolder;
+    }
+  }
+
   /**
    * @return Table Config
    */
@@ -208,6 +220,18 @@ public class HoodieTableMetaClient implements Serializable {
     if (!fs.exists(metaPathDir)) {
       fs.mkdirs(metaPathDir);
     }
+
+    // if anything other than the default archive log folder is specified, create that too
+    String archiveLogPropVal = props
+        .getProperty(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME,
+            HoodieTableConfig.DEFAULT_ARCHIVELOG_FOLDER);
+    if (!archiveLogPropVal.equals(HoodieTableConfig.DEFAULT_ARCHIVELOG_FOLDER)) {
+      Path archiveLogDir = new Path(metaPathDir, archiveLogPropVal);
+      if (!fs.exists(archiveLogDir)) {
+        fs.mkdirs(archiveLogDir);
+      }
+    }
+
     HoodieTableConfig.createHoodieProperties(fs, metaPathDir, props);
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath);
     log.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType()
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java
index 26a0845e2..f32629571 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java
@@ -40,7 +40,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
 
   private final static Logger log = LogManager.getLogger(HoodieLogFormatWriter.class);
 
-  private final HoodieLogFile logFile;
+  private HoodieLogFile logFile;
   private final FileSystem fs;
   private final long sizeThreshold;
   private final Integer bufferSize;
@@ -83,6 +83,15 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
           throw new HoodieException(e);
         }
       }
+    } catch (IOException ioe) {
+      if (ioe.getMessage().equalsIgnoreCase("Not supported")) {
+        log.info("Append not supported. Opening a new log file..");
+        this.logFile = logFile.rollOver(fs);
+        this.output = fs.create(this.logFile.getPath(), false, bufferSize, replication,
+            WriterBuilder.DEFAULT_SIZE_THRESHOLD, null);
+      } else {
+        throw ioe;
+      }
     } else {
       log.info(logFile + " does not exist. Create a new file");
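
Note: the writer change above is the heart of the cloud-storage fix. Filesystems without
append() support (LocalFileSystem, and object stores such as S3) surface an IOException
with the message "Not supported"; rather than failing, the writer now rolls over to the
next log file version. A sketch of the observable behavior, mirroring the new test added
later in this patch (the parent path, fs, and data block are placeholders):

    HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
        .onParentPath(testPath)
        .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
        .withFileId("commits.archive")
        .overBaseCommit("")
        .withFs(localFs) // an fs whose append() throws "Not supported"
        .build();
    // Instead of throwing, this creates the next log file version and writes there.
    writer.appendBlock(dataBlock);
    writer.close();
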
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieArchivedTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieArchivedTimeline.java
index bc04873af..793d9d996 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieArchivedTimeline.java
@@ -50,7 +50,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
 
   public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
     // Read back the commits to make sure
-    Path archiveLogPath = getArchiveLogPath(metaClient.getMetaPath());
+    Path archiveLogPath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
     try (SequenceFile.Reader reader = new SequenceFile.Reader(metaClient.getHadoopConf(),
         SequenceFile.Reader.file(archiveLogPath))) {
@@ -92,8 +92,8 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
   }
 
-  public static Path getArchiveLogPath(String metaPath) {
-    return new Path(metaPath, HOODIE_COMMIT_ARCHIVE_LOG_FILE);
+  public static Path getArchiveLogPath(String archiveFolder) {
+    return new Path(archiveFolder, HOODIE_COMMIT_ARCHIVE_LOG_FILE);
   }
 
   @Override
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/HoodieTableMetaClientTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/HoodieTableMetaClientTest.java
index d771b8236..fcfb9b7f6 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/HoodieTableMetaClientTest.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/HoodieTableMetaClientTest.java
@@ -107,7 +107,7 @@ public class HoodieTableMetaClientTest {
 
   @Test
   public void checkArchiveCommitTimeline() throws IOException {
-    Path archiveLogPath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getMetaPath());
+    Path archiveLogPath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
     SequenceFile.Writer writer = SequenceFile
         .createWriter(HoodieTestUtils.fs.getConf(), SequenceFile.Writer.file(archiveLogPath),
             SequenceFile.Writer.keyClass(Text.class),
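
Note: reads resolve through the same table config, so datasets using the default layout
and datasets with a configured archive folder both load transparently. A sketch; the
Hadoop conf and base path are placeholders:

    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true);
    // Reads the commits.archive file from .hoodie or .hoodie/<folder>, depending on
    // the hoodie.archivelog.folder value persisted in hoodie.properties.
    HoodieArchivedTimeline archivedTimeline = new HoodieArchivedTimeline(metaClient);
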
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
index 084cc1f12..1e8b02191 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
 
 import com.google.common.collect.Maps;
 import com.uber.hoodie.common.minicluster.MiniClusterUtil;
+import com.uber.hoodie.common.model.HoodieArchivedLogFile;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieTableType;
@@ -53,6 +54,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.After;
@@ -241,6 +243,34 @@ public class HoodieLogFormatTest {
     writer.close();
   }
 
+  @Test
+  public void testAppendNotSupported()
+      throws IOException, URISyntaxException, InterruptedException {
+    // Use an fs like LocalFileSystem, which does not support appends
+    Path localPartitionPath = new Path("file://" + partitionPath);
+    FileSystem localFs = FSUtils
+        .getFs(localPartitionPath.toString(), HoodieTestUtils.getDefaultHadoopConf());
+    Path testPath = new Path(localPartitionPath, "append_test");
+    localFs.mkdirs(testPath);
+
+    // Write some data & append twice.
+    List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
+    Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
+    metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100");
+    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records,
+        getSimpleSchema(), metadata);
+
+    for (int i = 0; i < 2; i++) {
+      HoodieLogFormat.newWriterBuilder().onParentPath(testPath)
+          .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits.archive")
+          .overBaseCommit("").withFs(localFs).build().appendBlock(dataBlock).close();
+    }
+
+    // Ensure there are two log file versions, with the same data.
+    FileStatus[] statuses = localFs.listStatus(testPath);
+    assertEquals(2, statuses.length);
+  }
+
   @SuppressWarnings("unchecked")
   @Test
   public void testBasicWriteAndScan()
diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
index 85e4eaca3..d9d24caa1 100644
--- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
+++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
@@ -72,6 +72,7 @@ public class HoodieRealtimeRecordReaderTest {
     jobConf = new JobConf();
     fs = FSUtils
         .getFs(basePath.getRoot().getAbsolutePath(), HoodieTestUtils.getDefaultHadoopConf());
+    HoodieTestUtils.fs = fs;
   }
 
   @Rule
diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala
index 26579630d..285ba87b1 100644
--- a/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala
+++ b/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala
@@ -189,6 +189,7 @@ class DefaultSource extends RelationProvider
       val properties = new Properties();
       properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tblName.get);
       properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, storageType);
+      properties.put(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived");
       HoodieTableMetaClient.initializePathAsHoodieDataset(fs, path.get, properties);
     }
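
Note: with the DefaultSource change above, any dataset initialized through the Spark
datasource pins hoodie.archivelog.folder to "archived", so archived commits accumulate
in their own directory rather than as small files next to active commit metadata in
.hoodie. A sketch of verifying this on a freshly written dataset (the Hadoop conf and
base path are placeholders):

    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, "/data/trips", true);
    String folder = metaClient.getTableConfig().getArchivelogFolder(); // "archived"
    String archivePath = metaClient.getArchivePath();                  // ".../.hoodie/archived"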