1
0

Nicer handling of timeline archival for Cloud storage

- When append() is not supported, roll over to a new file always (instead of failing)
- Provide a way to configure the archive log folder (avoids small files inside .hoodie)
- Datasets written via the Spark datasource archive to .hoodie/archived
- HoodieClientExample will now retain only 2-3 commits to exercise the archival path during dev cycles
- A few tweaks to code structure around CommitArchiveLog
This commit is contained in:
vinothchandar
2018-01-03 04:32:21 -08:00
committed by vinoth chandar
parent 0cd186c899
commit cf7f7aabb9
12 changed files with 121 additions and 56 deletions

View File

@@ -91,7 +91,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
private final HoodieWriteConfig config; private final HoodieWriteConfig config;
private transient final HoodieMetrics metrics; private transient final HoodieMetrics metrics;
private transient final HoodieIndex<T> index; private transient final HoodieIndex<T> index;
private transient final HoodieCommitArchiveLog archiveLog;
private transient Timer.Context writeContext = null; private transient Timer.Context writeContext = null;
/** /**
@@ -116,7 +115,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
this.config = clientConfig; this.config = clientConfig;
this.index = HoodieIndex.createIndex(config, jsc); this.index = HoodieIndex.createIndex(config, jsc);
this.metrics = new HoodieMetrics(config, config.getTableName()); this.metrics = new HoodieMetrics(config, config.getTableName());
this.archiveLog = new HoodieCommitArchiveLog(clientConfig, fs);
if (rollbackInFlight) { if (rollbackInFlight) {
rollbackInflightCommits(); rollbackInflightCommits();
@@ -446,6 +444,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
} }
// We cannot have unbounded commit files. Archive commits if we have to archive // We cannot have unbounded commit files. Archive commits if we have to archive
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(config,
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true));
archiveLog.archiveIfRequired(); archiveLog.archiveIfRequired();
if (config.isAutoClean()) { if (config.isAutoClean()) {
// Call clean to cleanup if there is anything to cleanup after the commit, // Call clean to cleanup if there is anything to cleanup after the commit,

View File

@@ -40,7 +40,6 @@ import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.table.HoodieTable; import com.uber.hoodie.table.HoodieTable;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord; import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
@@ -60,15 +59,14 @@ public class HoodieCommitArchiveLog {
private static Logger log = LogManager.getLogger(HoodieCommitArchiveLog.class); private static Logger log = LogManager.getLogger(HoodieCommitArchiveLog.class);
private final Path archiveFilePath; private final Path archiveFilePath;
private final FileSystem fs; private final HoodieTableMetaClient metaClient;
private final HoodieWriteConfig config; private final HoodieWriteConfig config;
private HoodieLogFormat.Writer writer; private HoodieLogFormat.Writer writer;
public HoodieCommitArchiveLog(HoodieWriteConfig config, FileSystem fs) { public HoodieCommitArchiveLog(HoodieWriteConfig config, HoodieTableMetaClient metaClient) {
this.fs = fs;
this.config = config; this.config = config;
this.archiveFilePath = HoodieArchivedTimeline this.metaClient = metaClient;
.getArchiveLogPath(config.getBasePath() + "/" + HoodieTableMetaClient.METAFOLDER_NAME); this.archiveFilePath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
} }
private HoodieLogFormat.Writer openWriter() { private HoodieLogFormat.Writer openWriter() {
@@ -78,7 +76,7 @@ public class HoodieCommitArchiveLog {
.onParentPath(archiveFilePath.getParent()) .onParentPath(archiveFilePath.getParent())
.withFileId(archiveFilePath.getName()) .withFileId(archiveFilePath.getName())
.withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION) .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
.withFs(fs) .withFs(metaClient.getFs())
.overBaseCommit("").build(); .overBaseCommit("").build();
} else { } else {
return this.writer; return this.writer;
@@ -125,9 +123,7 @@ public class HoodieCommitArchiveLog {
int maxCommitsToKeep = config.getMaxCommitsToKeep(); int maxCommitsToKeep = config.getMaxCommitsToKeep();
int minCommitsToKeep = config.getMinCommitsToKeep(); int minCommitsToKeep = config.getMinCommitsToKeep();
HoodieTable table = HoodieTable HoodieTable table = HoodieTable.getHoodieTable(metaClient, config);
.getHoodieTable(new HoodieTableMetaClient(fs.getConf(), config.getBasePath(), true),
config);
// GroupBy each action and limit each action timeline to maxCommitsToKeep // GroupBy each action and limit each action timeline to maxCommitsToKeep
HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline() HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline()
@@ -165,16 +161,13 @@ public class HoodieCommitArchiveLog {
private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants) { private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants) {
log.info("Deleting instants " + archivedInstants); log.info("Deleting instants " + archivedInstants);
HoodieTableMetaClient metaClient =
new HoodieTableMetaClient(fs.getConf(), config.getBasePath(), true);
boolean success = true; boolean success = true;
for (HoodieInstant archivedInstant : archivedInstants) { for (HoodieInstant archivedInstant : archivedInstants) {
Path commitFile = Path commitFile =
new Path(metaClient.getMetaPath(), archivedInstant.getFileName()); new Path(metaClient.getMetaPath(), archivedInstant.getFileName());
try { try {
if (fs.exists(commitFile)) { if (metaClient.getFs().exists(commitFile)) {
success &= fs.delete(commitFile, false); success &= metaClient.getFs().delete(commitFile, false);
log.info("Archived and deleted instant file " + commitFile); log.info("Archived and deleted instant file " + commitFile);
} }
} catch (IOException e) { } catch (IOException e) {
@@ -186,13 +179,9 @@ public class HoodieCommitArchiveLog {
} }
public void archive(List<HoodieInstant> instants) throws HoodieCommitException { public void archive(List<HoodieInstant> instants) throws HoodieCommitException {
try { try {
HoodieTableMetaClient metaClient =
new HoodieTableMetaClient(fs.getConf(), config.getBasePath(), true);
HoodieTimeline commitTimeline = HoodieTimeline commitTimeline =
metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants(); metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants();
Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema(); Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
log.info("Wrapper schema " + wrapperSchema.toString()); log.info("Wrapper schema " + wrapperSchema.toString());
List<IndexedRecord> records = new ArrayList<>(); List<IndexedRecord> records = new ArrayList<>();

View File

@@ -24,9 +24,10 @@ import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.index.HoodieIndex.IndexType;
import java.util.List; import java.util.List;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@@ -92,7 +93,8 @@ public class HoodieClientExample {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable(tableName).withIndexConfig( .forTable(tableName).withIndexConfig(
HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) HoodieIndexConfig.newBuilder().withIndexType(IndexType.BLOOM).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3).build())
.build(); .build();
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);

View File

@@ -63,7 +63,8 @@ public class TestHoodieCommitArchiveLog {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").build(); .forTable("test-trip-table").build();
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, fs); HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg,
new HoodieTableMetaClient(fs.getConf(), cfg.getBasePath(), true));
boolean result = archiveLog.archiveIfRequired(); boolean result = archiveLog.archiveIfRequired();
assertTrue(result); assertTrue(result);
} }
@@ -82,9 +83,9 @@ public class TestHoodieCommitArchiveLog {
HoodieTestDataGenerator.createCommitFile(basePath, "104"); HoodieTestDataGenerator.createCommitFile(basePath, "104");
HoodieTestDataGenerator.createCommitFile(basePath, "105"); HoodieTestDataGenerator.createCommitFile(basePath, "105");
HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs.getConf(), basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath);
HoodieTimeline timeline = HoodieTimeline timeline =
metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants()); assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
@@ -96,18 +97,19 @@ public class TestHoodieCommitArchiveLog {
HoodieTestUtils.createCleanFiles(basePath, "105"); HoodieTestUtils.createCleanFiles(basePath, "105");
//reload the timeline and get all the commits before archive //reload the timeline and get all the commits before archive
timeline = metadata.getActiveTimeline().reload().getAllCommitsTimeline() timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline()
.filterCompletedInstants(); .filterCompletedInstants();
List<HoodieInstant> originalCommits = timeline.getInstants().collect(Collectors.toList()); List<HoodieInstant> originalCommits = timeline.getInstants().collect(Collectors.toList());
assertEquals("Loaded 6 commits and the count should match", 12, timeline.countInstants()); assertEquals("Loaded 6 commits and the count should match", 12, timeline.countInstants());
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, fs); HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg,
new HoodieTableMetaClient(fs.getConf(), basePath, true));
assertTrue(archiveLog.archiveIfRequired()); assertTrue(archiveLog.archiveIfRequired());
//reload the timeline and remove the remaining commits //reload the timeline and remove the remaining commits
timeline = metadata.getActiveTimeline().reload().getAllCommitsTimeline() timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline()
.filterCompletedInstants(); .filterCompletedInstants();
originalCommits.removeAll(timeline.getInstants().collect(Collectors.toList())); originalCommits.removeAll(timeline.getInstants().collect(Collectors.toList()));
@@ -147,21 +149,19 @@ public class TestHoodieCommitArchiveLog {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").withCompactionConfig( .forTable("test-trip-table").withCompactionConfig(
HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build(); HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build();
HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs.getConf(), basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, fs); HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100"); HoodieTestDataGenerator.createCommitFile(basePath, "100");
HoodieTestDataGenerator.createCommitFile(basePath, "101"); HoodieTestDataGenerator.createCommitFile(basePath, "101");
HoodieTestDataGenerator.createCommitFile(basePath, "102"); HoodieTestDataGenerator.createCommitFile(basePath, "102");
HoodieTestDataGenerator.createCommitFile(basePath, "103"); HoodieTestDataGenerator.createCommitFile(basePath, "103");
HoodieTimeline timeline = HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline()
metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); .filterCompletedInstants();
assertEquals("Loaded 4 commits and the count should match", 4, timeline.countInstants()); assertEquals("Loaded 4 commits and the count should match", 4, timeline.countInstants());
boolean result = archiveLog.archiveIfRequired(); boolean result = archiveLog.archiveIfRequired();
assertTrue(result); assertTrue(result);
timeline = timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline()
metadata.getActiveTimeline().reload().getCommitsTimeline()
.filterCompletedInstants(); .filterCompletedInstants();
assertEquals("Should not archive commits when maxCommitsToKeep is 5", 4, assertEquals("Should not archive commits when maxCommitsToKeep is 5", 4,
timeline.countInstants()); timeline.countInstants());
@@ -173,8 +173,8 @@ public class TestHoodieCommitArchiveLog {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").withCompactionConfig( .forTable("test-trip-table").withCompactionConfig(
HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build(); HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build();
HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs.getConf(), basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, fs); HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100"); HoodieTestDataGenerator.createCommitFile(basePath, "100");
HoodieTestDataGenerator.createCommitFile(basePath, "101"); HoodieTestDataGenerator.createCommitFile(basePath, "101");
HoodieTestDataGenerator.createCommitFile(basePath, "102"); HoodieTestDataGenerator.createCommitFile(basePath, "102");
@@ -182,13 +182,12 @@ public class TestHoodieCommitArchiveLog {
HoodieTestDataGenerator.createCommitFile(basePath, "104"); HoodieTestDataGenerator.createCommitFile(basePath, "104");
HoodieTestDataGenerator.createCommitFile(basePath, "105"); HoodieTestDataGenerator.createCommitFile(basePath, "105");
HoodieTimeline timeline = HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline()
metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); .filterCompletedInstants();
assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants()); assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
boolean result = archiveLog.archiveIfRequired(); boolean result = archiveLog.archiveIfRequired();
assertTrue(result); assertTrue(result);
timeline = timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline()
metadata.getActiveTimeline().reload().getCommitsTimeline()
.filterCompletedInstants(); .filterCompletedInstants();
assertTrue("Archived commits should always be safe", assertTrue("Archived commits should always be safe",
timeline.containsOrBeforeTimelineStarts("100")); timeline.containsOrBeforeTimelineStarts("100"));
@@ -206,8 +205,8 @@ public class TestHoodieCommitArchiveLog {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").withCompactionConfig( .forTable("test-trip-table").withCompactionConfig(
HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build(); HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build();
HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs.getConf(), basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, fs); HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100"); HoodieTestDataGenerator.createCommitFile(basePath, "100");
HoodieTestDataGenerator.createCommitFile(basePath, "101"); HoodieTestDataGenerator.createCommitFile(basePath, "101");
HoodieTestDataGenerator.createSavepointFile(basePath, "101"); HoodieTestDataGenerator.createSavepointFile(basePath, "101");
@@ -216,13 +215,12 @@ public class TestHoodieCommitArchiveLog {
HoodieTestDataGenerator.createCommitFile(basePath, "104"); HoodieTestDataGenerator.createCommitFile(basePath, "104");
HoodieTestDataGenerator.createCommitFile(basePath, "105"); HoodieTestDataGenerator.createCommitFile(basePath, "105");
HoodieTimeline timeline = HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline()
metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); .filterCompletedInstants();
assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants()); assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
boolean result = archiveLog.archiveIfRequired(); boolean result = archiveLog.archiveIfRequired();
assertTrue(result); assertTrue(result);
timeline = timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline()
metadata.getActiveTimeline().reload().getCommitsTimeline()
.filterCompletedInstants(); .filterCompletedInstants();
assertEquals( assertEquals(
"Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)", "Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)",

View File

@@ -52,11 +52,13 @@ public class HoodieTableConfig implements Serializable {
public static final String HOODIE_RT_FILE_FORMAT_PROP_NAME = public static final String HOODIE_RT_FILE_FORMAT_PROP_NAME =
"hoodie.table.rt.file.format"; "hoodie.table.rt.file.format";
public static final String HOODIE_PAYLOAD_CLASS_PROP_NAME = "hoodie.compaction.payload.class"; public static final String HOODIE_PAYLOAD_CLASS_PROP_NAME = "hoodie.compaction.payload.class";
public static final String HOODIE_ARCHIVELOG_FOLDER_PROP_NAME = "hoodie.archivelog.folder";
public static final HoodieTableType DEFAULT_TABLE_TYPE = HoodieTableType.COPY_ON_WRITE; public static final HoodieTableType DEFAULT_TABLE_TYPE = HoodieTableType.COPY_ON_WRITE;
public static final HoodieFileFormat DEFAULT_RO_FILE_FORMAT = HoodieFileFormat.PARQUET; public static final HoodieFileFormat DEFAULT_RO_FILE_FORMAT = HoodieFileFormat.PARQUET;
public static final HoodieFileFormat DEFAULT_RT_FILE_FORMAT = HoodieFileFormat.HOODIE_LOG; public static final HoodieFileFormat DEFAULT_RT_FILE_FORMAT = HoodieFileFormat.HOODIE_LOG;
public static final String DEFAULT_PAYLOAD_CLASS = HoodieAvroPayload.class.getName(); public static final String DEFAULT_PAYLOAD_CLASS = HoodieAvroPayload.class.getName();
public static final String DEFAULT_ARCHIVELOG_FOLDER = "";
private Properties props; private Properties props;
public HoodieTableConfig(FileSystem fs, String metaPath) { public HoodieTableConfig(FileSystem fs, String metaPath) {
@@ -105,6 +107,9 @@ public class HoodieTableConfig implements Serializable {
&& !properties.containsKey(HOODIE_PAYLOAD_CLASS_PROP_NAME)) { && !properties.containsKey(HOODIE_PAYLOAD_CLASS_PROP_NAME)) {
properties.setProperty(HOODIE_PAYLOAD_CLASS_PROP_NAME, DEFAULT_PAYLOAD_CLASS); properties.setProperty(HOODIE_PAYLOAD_CLASS_PROP_NAME, DEFAULT_PAYLOAD_CLASS);
} }
if (!properties.containsKey(HOODIE_ARCHIVELOG_FOLDER_PROP_NAME)) {
properties.setProperty(HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, DEFAULT_ARCHIVELOG_FOLDER);
}
properties properties
.store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis())); .store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis()));
} finally { } finally {
@@ -161,4 +166,10 @@ public class HoodieTableConfig implements Serializable {
return DEFAULT_RT_FILE_FORMAT; return DEFAULT_RT_FILE_FORMAT;
} }
/**
* Get the relative path of archive log folder under metafolder, for this dataset
*/
public String getArchivelogFolder() {
return props.getProperty(HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, DEFAULT_ARCHIVELOG_FOLDER);
}
} }

View File

@@ -131,6 +131,18 @@ public class HoodieTableMetaClient implements Serializable {
return metaPath; return metaPath;
} }
/**
* @return path where archived timeline is stored
*/
public String getArchivePath() {
String archiveFolder = tableConfig.getArchivelogFolder();
if (archiveFolder.equals(HoodieTableConfig.DEFAULT_ARCHIVELOG_FOLDER)) {
return getMetaPath();
} else {
return getMetaPath() + "/" + archiveFolder;
}
}
/** /**
* @return Table Config * @return Table Config
*/ */
@@ -208,6 +220,18 @@ public class HoodieTableMetaClient implements Serializable {
if (!fs.exists(metaPathDir)) { if (!fs.exists(metaPathDir)) {
fs.mkdirs(metaPathDir); fs.mkdirs(metaPathDir);
} }
// if anything other than default archive log folder is specified, create that too
String archiveLogPropVal = props
.getProperty(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME,
HoodieTableConfig.DEFAULT_ARCHIVELOG_FOLDER);
if (!archiveLogPropVal.equals(HoodieTableConfig.DEFAULT_ARCHIVELOG_FOLDER)) {
Path archiveLogDir = new Path(metaPathDir, archiveLogPropVal);
if (!fs.exists(archiveLogDir)) {
fs.mkdirs(archiveLogDir);
}
}
HoodieTableConfig.createHoodieProperties(fs, metaPathDir, props); HoodieTableConfig.createHoodieProperties(fs, metaPathDir, props);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath);
log.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType() log.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType()

View File

@@ -40,7 +40,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
private final static Logger log = LogManager.getLogger(HoodieLogFormatWriter.class); private final static Logger log = LogManager.getLogger(HoodieLogFormatWriter.class);
private final HoodieLogFile logFile; private HoodieLogFile logFile;
private final FileSystem fs; private final FileSystem fs;
private final long sizeThreshold; private final long sizeThreshold;
private final Integer bufferSize; private final Integer bufferSize;
@@ -83,6 +83,15 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
throw new HoodieException(e); throw new HoodieException(e);
} }
} }
} catch (IOException ioe) {
if (ioe.getMessage().equalsIgnoreCase("Not supported")) {
log.info("Append not supported. Opening a new log file..");
this.logFile = logFile.rollOver(fs);
this.output = fs.create(this.logFile.getPath(), false, bufferSize, replication,
WriterBuilder.DEFAULT_SIZE_THRESHOLD, null);
} else {
throw ioe;
}
} }
} else { } else {
log.info(logFile + " does not exist. Create a new file"); log.info(logFile + " does not exist. Create a new file");

View File

@@ -50,7 +50,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) { public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
// Read back the commits to make sure // Read back the commits to make sure
Path archiveLogPath = getArchiveLogPath(metaClient.getMetaPath()); Path archiveLogPath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
try (SequenceFile.Reader reader = try (SequenceFile.Reader reader =
new SequenceFile.Reader(metaClient.getHadoopConf(), new SequenceFile.Reader(metaClient.getHadoopConf(),
SequenceFile.Reader.file(archiveLogPath))) { SequenceFile.Reader.file(archiveLogPath))) {
@@ -92,8 +92,8 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
} }
public static Path getArchiveLogPath(String metaPath) { public static Path getArchiveLogPath(String archiveFolder) {
return new Path(metaPath, HOODIE_COMMIT_ARCHIVE_LOG_FILE); return new Path(archiveFolder, HOODIE_COMMIT_ARCHIVE_LOG_FILE);
} }
@Override @Override

View File

@@ -107,7 +107,7 @@ public class HoodieTableMetaClientTest {
@Test @Test
public void checkArchiveCommitTimeline() throws IOException { public void checkArchiveCommitTimeline() throws IOException {
Path archiveLogPath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getMetaPath()); Path archiveLogPath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
SequenceFile.Writer writer = SequenceFile SequenceFile.Writer writer = SequenceFile
.createWriter(HoodieTestUtils.fs.getConf(), SequenceFile.Writer.file(archiveLogPath), .createWriter(HoodieTestUtils.fs.getConf(), SequenceFile.Writer.file(archiveLogPath),
SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.keyClass(Text.class),

View File

@@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.uber.hoodie.common.minicluster.MiniClusterUtil; import com.uber.hoodie.common.minicluster.MiniClusterUtil;
import com.uber.hoodie.common.model.HoodieArchivedLogFile;
import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.model.HoodieTableType;
@@ -53,6 +54,7 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord; import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.junit.After; import org.junit.After;
@@ -241,6 +243,34 @@ public class HoodieLogFormatTest {
writer.close(); writer.close();
} }
@Test
public void testAppendNotSupported()
throws IOException, URISyntaxException, InterruptedException {
// Use some fs like LocalFileSystem, that does not support appends
Path localPartitionPath = new Path("file://" + partitionPath);
FileSystem localFs = FSUtils
.getFs(localPartitionPath.toString(), HoodieTestUtils.getDefaultHadoopConf());
Path testPath = new Path(localPartitionPath, "append_test");
localFs.mkdirs(testPath);
// Some data & append two times.
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100");
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records,
getSimpleSchema(), metadata);
for (int i = 0; i < 2; i++) {
HoodieLogFormat.newWriterBuilder().onParentPath(testPath)
.withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits.archive")
.overBaseCommit("").withFs(localFs).build().appendBlock(dataBlock).close();
}
// ensure there are two log file versions, with same data.
FileStatus[] statuses = localFs.listStatus(testPath);
assertEquals(2, statuses.length);
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Test @Test
public void testBasicWriteAndScan() public void testBasicWriteAndScan()

View File

@@ -72,6 +72,7 @@ public class HoodieRealtimeRecordReaderTest {
jobConf = new JobConf(); jobConf = new JobConf();
fs = FSUtils fs = FSUtils
.getFs(basePath.getRoot().getAbsolutePath(), HoodieTestUtils.getDefaultHadoopConf()); .getFs(basePath.getRoot().getAbsolutePath(), HoodieTestUtils.getDefaultHadoopConf());
HoodieTestUtils.fs = fs;
} }
@Rule @Rule

View File

@@ -189,6 +189,7 @@ class DefaultSource extends RelationProvider
val properties = new Properties(); val properties = new Properties();
properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tblName.get); properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tblName.get);
properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, storageType); properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, storageType);
properties.put(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived");
HoodieTableMetaClient.initializePathAsHoodieDataset(fs, path.get, properties); HoodieTableMetaClient.initializePathAsHoodieDataset(fs, path.get, properties);
} }