1
0

Implement reliable log file management for Merge-on-Read, which is fault tolerant and allows random block-level access on Avro files

This commit is contained in:
Prasanna Rajaperumal
2017-01-24 00:55:40 -08:00
parent ccd8cb2407
commit 48fbb0f425
22 changed files with 2626 additions and 23 deletions

View File

@@ -0,0 +1,228 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log;
import com.uber.hoodie.common.util.FSUtils;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
/**
* Configuration for a HoodieLog
*/
public class HoodieLogAppendConfig {
private final static Logger log = LogManager.getLogger(HoodieLogAppendConfig.class);
private static final long DEFAULT_SIZE_THRESHOLD = 32 * 1024 * 1024L;
private final int bufferSize;
private final short replication;
private final long blockSize;
private final HoodieLogFile logFile;
private boolean isAutoFlush;
private final Schema schema;
private final FileSystem fs;
private final long sizeThreshold;
private HoodieLogAppendConfig(FileSystem fs, HoodieLogFile logFile, Schema schema, Integer bufferSize,
Short replication, Long blockSize, boolean isAutoFlush, Long sizeThreshold) {
this.fs = fs;
this.logFile = logFile;
this.schema = schema;
this.bufferSize = bufferSize;
this.replication = replication;
this.blockSize = blockSize;
this.isAutoFlush = isAutoFlush;
this.sizeThreshold = sizeThreshold;
}
public int getBufferSize() {
return bufferSize;
}
public short getReplication() {
return replication;
}
public long getBlockSize() {
return blockSize;
}
public Schema getSchema() {
return schema;
}
public FileSystem getFs() {
return fs;
}
public HoodieLogFile getLogFile() {
return logFile;
}
public long getSizeThreshold() {
return sizeThreshold;
}
public boolean isAutoFlush() {
return isAutoFlush;
}
public static Builder newBuilder() {
return new Builder();
}
public HoodieLogAppendConfig withLogFile(HoodieLogFile newFile) {
return new HoodieLogAppendConfig(fs, newFile, schema, bufferSize, replication, blockSize,
isAutoFlush, sizeThreshold);
}
public static class Builder {
// Auto-flush. if set to true - then after every append, the avro block will be flushed
private boolean isAutoFlush = true;
// Buffer size in the Avro writer
private Integer bufferSize;
// Replication for the log file
private Short replication;
// Blocksize for the avro log file (useful if auto-flush is set to false)
private Long blockSize;
// Schema for the log file
private Schema schema;
// FileSystem
private FileSystem fs;
// Size threshold for the log file. Useful when used with a rolling log appender
private Long sizeThreshold;
// Log File extension. Could be .avro.delta or .avro.commits etc
private String logFileExtension;
// File ID
private String fileId;
// version number for this log file. If not specified, then the current version will be computed
private Integer fileVersion;
// Partition path for the log file
private Path partitionPath;
public Builder withBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
return this;
}
public Builder withReplication(short replication) {
this.replication = replication;
return this;
}
public Builder withBlockSize(long blockSize) {
this.blockSize = blockSize;
return this;
}
public Builder withSchema(Schema schema) {
this.schema = schema;
return this;
}
public Builder withFs(FileSystem fs) {
this.fs = fs;
return this;
}
public Builder withAutoFlush(boolean autoFlush) {
this.isAutoFlush = autoFlush;
return this;
}
public Builder withSizeThreshold(long sizeThreshold) {
this.sizeThreshold = sizeThreshold;
return this;
}
public Builder withLogFileExtension(String logFileExtension) {
this.logFileExtension = logFileExtension;
return this;
}
public Builder withFileId(String fileId) {
this.fileId = fileId;
return this;
}
public Builder withFileVersion(int version) {
this.fileVersion = version;
return this;
}
public Builder onPartitionPath(Path path) {
this.partitionPath = path;
return this;
}
public HoodieLogAppendConfig build() throws IOException {
log.info("Building HoodieLogAppendConfig");
if (schema == null) {
throw new IllegalArgumentException("Schema for log is not specified");
}
if (fs == null) {
fs = FSUtils.getFs();
}
if (fileId == null) {
throw new IllegalArgumentException("FileID is not specified");
}
if (logFileExtension == null) {
throw new IllegalArgumentException("File extension is not specified");
}
if (partitionPath == null) {
throw new IllegalArgumentException("Partition path is not specified");
}
if (fileVersion == null) {
log.info("Computing the next log version for " + fileId + " in " + partitionPath);
fileVersion =
FSUtils.getCurrentLogVersion(fs, partitionPath, fileId, logFileExtension);
log.info(
"Computed the next log version for " + fileId + " in " + partitionPath + " as "
+ fileVersion);
}
Path logPath = new Path(partitionPath,
FSUtils.makeLogFileName(fileId, logFileExtension, fileVersion));
log.info("LogConfig created on path " + logPath);
HoodieLogFile logFile = new HoodieLogFile(logPath);
if (bufferSize == null) {
bufferSize = FSUtils.getDefaultBufferSize(fs);
}
if (replication == null) {
replication = FSUtils.getDefaultReplication(fs, partitionPath);
}
if (blockSize == null) {
blockSize = FSUtils.getDefaultBlockSize(fs, partitionPath);
}
if (sizeThreshold == null) {
sizeThreshold = DEFAULT_SIZE_THRESHOLD;
}
return new HoodieLogAppendConfig(fs, logFile, schema, bufferSize, replication, blockSize,
isAutoFlush, sizeThreshold);
}
}
}

View File

@@ -0,0 +1,64 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log;
import com.uber.hoodie.common.table.log.avro.AvroLogAppender;
import com.uber.hoodie.common.table.log.avro.RollingAvroLogAppender;
import java.io.IOException;
import java.util.List;
/**
 * Interface for implementations supporting appending data to a log file.
 *
 * @param <R> the record type written to the log
 * @see AvroLogAppender
 * @see RollingAvroLogAppender
 */
public interface HoodieLogAppender<R> {
    /**
     * Appends a batch of records; the batch is written as one block/unit to the underlying log.
     *
     * @param records records to write as a single block
     * @throws IOException if the underlying log cannot be written
     * @throws InterruptedException if interrupted while appending (implementations may
     *         sleep/retry, e.g. during lease recovery)
     */
    void append(List<R> records) throws IOException, InterruptedException;
    /**
     * Syncs the log manually if auto-flush is not set in HoodieLogAppendConfig. If auto-flush
     * is set, the appender flushes automatically after each append call, making this a no-op.
     *
     * @throws IOException if the flush to the underlying stream fails
     */
    void sync() throws IOException;
    /**
     * Closes the appender and releases any resources it is holding on to.
     *
     * @throws IOException if closing the underlying stream fails
     */
    void close() throws IOException;
    /**
     * Gets the current size/offset in the log. This is usually used to mark the start of a
     * block in metadata and later passed to the log reader for random block access.
     *
     * @return the current byte offset in the log
     * @throws IOException if the offset cannot be determined
     */
    long getCurrentSize() throws IOException;
}

View File

@@ -0,0 +1,92 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log;
import com.uber.hoodie.common.util.FSUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.io.Serializable;
import java.util.Optional;
/**
 * Abstracts a single log file. Contains methods to extract metadata like the fileId, version
 * and extension from the log file path.
 *
 * Also contains logic to roll over the log file to its next version.
 */
public class HoodieLogFile {
    public static final String DELTA_EXTENSION = ".avro.delta";

    private final Path path;
    // Present only when constructed from a FileStatus listing
    private Optional<FileStatus> fileStatus;

    public HoodieLogFile(FileStatus fileStatus) {
        this(fileStatus.getPath());
        this.fileStatus = Optional.of(fileStatus);
    }

    public HoodieLogFile(Path logPath) {
        this.path = logPath;
        this.fileStatus = Optional.empty();
    }

    public String getFileId() {
        return FSUtils.getFileIdFromLogPath(path);
    }

    public int getLogVersion() {
        return FSUtils.getFileVersionFromLog(path);
    }

    public String getFileExtension() {
        return FSUtils.getFileExtensionFromLog(path);
    }

    public Path getPath() {
        return path;
    }

    public String getFileName() {
        return path.getName();
    }

    public Optional<FileStatus> getFileStatus() {
        return fileStatus;
    }

    /**
     * Creates the next-version log file in the same partition for this file's id.
     *
     * @param fs filesystem used to list existing versions
     * @throws IOException if existing log versions cannot be listed
     */
    public HoodieLogFile rollOver(FileSystem fs) throws IOException {
        String fileId = getFileId();
        // FIX: use this file's own extension rather than hard-coding DELTA_EXTENSION,
        // so non-delta logs (e.g. ".avro.commits", see HoodieLogAppendConfig.Builder)
        // roll over to the correct name. getFileExtension() returns the extension
        // without a leading dot (e.g. "avro.delta"), so re-add it here.
        String extension = "." + getFileExtension();
        int newVersion =
            FSUtils.computeNextLogVersion(fs, path.getParent(), fileId, extension);
        return new HoodieLogFile(new Path(path.getParent(),
            FSUtils.makeLogFileName(fileId, extension, newVersion)));
    }

    public boolean shouldRollOver(HoodieLogAppender currentWriter, HoodieLogAppendConfig config)
            throws IOException {
        return currentWriter.getCurrentSize() > config.getSizeThreshold();
    }

    @Override
    public String toString() {
        return "HoodieLogFile{" + path + '}';
    }
}

View File

@@ -0,0 +1,140 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log.avro;
import com.uber.hoodie.common.table.log.HoodieLogAppendConfig;
import com.uber.hoodie.common.table.log.HoodieLogAppender;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.AvroFSInput;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.List;
/**
 * AvroLogAppender appends a batch of IndexedRecords to an Avro data file.
 * If auto-flush is set, every call to append writes out (and flushes) a block.
 * An avro block corresponds to the records appended in a single commit.
 *
 * @see org.apache.avro.file.DataFileReader
 */
public class AvroLogAppender implements HoodieLogAppender<IndexedRecord> {
    private final static Logger log = LogManager.getLogger(AvroLogAppender.class);
    private final HoodieLogAppendConfig config;
    private FSDataOutputStream output;
    private DataFileWriter<IndexedRecord> writer;
    private final boolean autoFlush;

    /**
     * Opens the configured log file for appending, creating it if absent. If a previous
     * writer died holding the HDFS lease, attempts lease recovery before appending.
     *
     * @throws IOException if the file cannot be created/appended or lease recovery fails
     * @throws InterruptedException if interrupted while waiting for lease recovery
     */
    public AvroLogAppender(HoodieLogAppendConfig config) throws IOException, InterruptedException {
        FileSystem fs = config.getFs();
        this.config = config;
        this.autoFlush = config.isAutoFlush();
        GenericDatumWriter<IndexedRecord> datumWriter =
            new GenericDatumWriter<>(config.getSchema());
        this.writer = new DataFileWriter<>(datumWriter);
        Path path = config.getLogFile().getPath();
        if (fs.exists(path)) {
            //TODO - check for log corruption and roll over if needed
            log.info(config.getLogFile() + " exists. Appending to existing file");
            // this log path exists, we will append to it
            fs = FileSystem.get(fs.getConf());
            try {
                this.output = fs.append(path, config.getBufferSize());
            } catch (RemoteException e) {
                // this happens when either another task executor writing to this file died
                // or a data node is going down
                if (e.getClassName().equals(AlreadyBeingCreatedException.class.getName())
                        && fs instanceof DistributedFileSystem) {
                    log.warn("Trying to recover log on path " + path);
                    if (FSUtils.recoverDFSFileLease((DistributedFileSystem) fs, path)) {
                        log.warn("Recovered lease on path " + path);
                        // try again
                        this.output = fs.append(path, config.getBufferSize());
                    } else {
                        log.warn("Failed to recover lease on path " + path);
                        throw new HoodieException(e);
                    }
                } else {
                    // FIX: this case used to be silently swallowed, leaving 'output' null
                    // and failing later with an NPE in appendTo(); surface the error instead
                    throw e;
                }
            }
            this.writer
                .appendTo(new AvroFSInput(FileContext.getFileContext(fs.getConf()), path), output);
            // we always want to flush to disk every time an avro block is written
            this.writer.setFlushOnEveryBlock(true);
        } else {
            log.info(config.getLogFile() + " does not exist. Create a new file");
            this.output = fs.create(path, false, config.getBufferSize(), config.getReplication(),
                config.getBlockSize(), null);
            this.writer.create(config.getSchema(), output);
            this.writer.setFlushOnEveryBlock(true);
        }
    }

    /**
     * Appends the records as one avro block; flushes afterwards when auto-flush is on.
     */
    public void append(List<IndexedRecord> records) throws IOException {
        records.forEach(r -> {
            try {
                writer.append(r);
            } catch (IOException e) {
                // FIX: preserve the cause instead of dropping it
                throw new HoodieIOException(
                    "Could not append record " + r + " to " + config.getLogFile(), e);
            }
        });
        if (autoFlush) {
            sync();
        }
    }

    /** Flushes the avro writer and the underlying output stream (hflush to data nodes). */
    public void sync() throws IOException {
        if (output == null || writer == null) {
            return; // Presume closed
        }
        writer.flush();
        output.flush();
        output.hflush();
    }

    /** Syncs pending data, then closes and nulls out the writer and stream. */
    public void close() throws IOException {
        sync();
        writer.close();
        writer = null;
        output.close();
        output = null;
    }

    /**
     * @return the current byte position in the underlying output stream
     * @throws IllegalStateException if the appender has already been closed
     */
    public long getCurrentSize() throws IOException {
        if (writer == null) {
            throw new IllegalStateException(
                "LogWriter " + config.getLogFile() + " has been closed. Cannot getCurrentSize");
        }
        // writer.sync() returns only the offset for this block and not the global offset
        return output.getPos();
    }
}

View File

@@ -0,0 +1,82 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log.avro;
import com.google.common.collect.Lists;
import com.uber.hoodie.common.table.log.HoodieLogAppender;
import com.uber.hoodie.common.table.log.HoodieLogFile;
import com.uber.hoodie.exception.HoodieIOException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.AvroFSInput;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
/**
 * AvroLogReader allows reading blocks of records given an offset as written by AvroLogAppender.
 * Avro log files are never streamed entirely - because of fault tolerance.
 * If a block is corrupted, random access with an offset bypasses any corrupt blocks.
 * Metadata about the offset should be saved when writing blocks and passed to readBlock().
 *
 * @see AvroLogAppender
 */
public class AvroLogReader {
    private final DataFileReader<GenericRecord> reader;
    private final HoodieLogFile file;

    public AvroLogReader(HoodieLogFile file, FileSystem fs, Schema readerSchema)
            throws IOException {
        GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
        datumReader.setExpected(readerSchema);
        final AvroFSInput input =
            new AvroFSInput(FileContext.getFileContext(fs.getConf()), file.getPath());
        this.reader = (DataFileReader<GenericRecord>) DataFileReader.openReader(input, datumReader);
        this.file = file;
    }

    /**
     * Reads the single avro block starting at the given exact byte offset.
     *
     * @param startOffset exact start offset of a block, as recorded at write time
     * @return iterator over the records of that block (empty if the block is empty)
     * @throws HoodieIOException if the block cannot be read (e.g. corruption)
     */
    public Iterator<GenericRecord> readBlock(long startOffset) throws IOException {
        // We keep track of the exact offset for blocks, just seek to it directly
        reader.seek(startOffset);
        List<GenericRecord> records = Lists.newArrayList();
        try {
            // First check if we are past the sync marker and then check reader.hasNext;
            // hasNext will load a block in memory and this will fail if a block is corrupted.
            while (!reader.pastSync(startOffset) && reader.hasNext()) {
                records.add(reader.next());
            }
        } catch (IOException e) {
            // FIX: preserve the cause instead of dropping it
            throw new HoodieIOException("Failed to read avro records from " + file, e);
        }
        return records.iterator();
    }

    public HoodieLogFile getFile() {
        return file;
    }

    public void close() throws IOException {
        reader.close();
    }
}

View File

@@ -0,0 +1,155 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log.avro;
import com.uber.hoodie.common.table.log.HoodieLogFile;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.HoodieIOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * CompositeAvroLogReader reads all versions of the logs for a given fileId.
 * It gives an iterator which iterates through all the versions and the list of blocks
 * for each specific version.
 * Useful for merging records in a RecordReader and compacting all the delta versions.
 *
 * @see AvroLogReader
 */
public class CompositeAvroLogReader {
    // One AvroLogReader per log version found for the fileId
    private final Map<Integer, AvroLogReader> readers;

    public CompositeAvroLogReader(Path partitionPath, String fileId, FileSystem fs,
            Schema readerSchema, String logFileExtension) throws IOException {
        Stream<HoodieLogFile> allLogFiles =
            FSUtils.getAllLogFiles(fs, partitionPath, fileId, logFileExtension);
        this.readers = allLogFiles.map(hoodieLogFile -> {
            try {
                return new AvroLogReader(hoodieLogFile, fs, readerSchema);
            } catch (IOException e) {
                // FIX: preserve the cause instead of dropping it
                throw new HoodieIOException(
                    "Could not read avro records from path " + hoodieLogFile, e);
            }
            // FIX: lambda instead of an anonymous Function class for the key extractor
        }).collect(Collectors.toMap(
            avroLogReader -> avroLogReader.getFile().getLogVersion(),
            Function.identity()));
    }

    /**
     * Reads all the versions (in the order specified) and all the blocks starting with the
     * offsets specified.
     *
     * @param filesToOffsetMap map of log version -> list of block start offsets to read
     * @return iterator over all records in all requested blocks, in version order
     * @throws IOException if a block cannot be read
     */
    public Iterator<GenericRecord> readBlocks(SortedMap<Integer, List<Long>> filesToOffsetMap)
            throws IOException {
        return new Iterators(filesToOffsetMap, readers);
    }

    public void close() throws IOException {
        readers.values().forEach(s -> {
            try {
                s.close();
            } catch (IOException e) {
                throw new HoodieIOException("Unable to close " + s.getFile(), e);
            }
        });
    }

    /**
     * Iterator that walks version -> offsets -> records, lazily loading one block at a time.
     */
    public class Iterators implements Iterator<GenericRecord> {
        private final Map<Integer, AvroLogReader> readers;
        private final Map<Integer, List<Long>> versionsToOffsetMap;
        private Integer currentVersion;
        private Iterator<Integer> currentVersionIterator;
        private Iterator<Long> currentOffsetIterator;
        private Iterator<GenericRecord> currentRecordIterator;

        public Iterators(Map<Integer, List<Long>> versionToOffsetMap,
                Map<Integer, AvroLogReader> readers) {
            this.currentVersionIterator = versionToOffsetMap.keySet().iterator();
            this.readers = readers;
            this.versionsToOffsetMap = versionToOffsetMap;
        }

        // Returns the next non-empty block for the current version, or null if none left
        private Iterator<GenericRecord> findNextBlock() throws IOException {
            if (currentOffsetIterator != null) {
                while (currentOffsetIterator.hasNext()) {
                    // we have more offsets to process for this file
                    long currentOffset = currentOffsetIterator.next();
                    Iterator<GenericRecord> currentBlock =
                        readers.get(currentVersion).readBlock(currentOffset);
                    if (currentBlock.hasNext()) {
                        return currentBlock;
                    }
                }
            }
            return null;
        }

        // Returns the next non-empty block across versions, or null when fully exhausted
        private Iterator<GenericRecord> findNext() {
            try {
                Iterator<GenericRecord> nextBlock = findNextBlock();
                if (nextBlock != null) {
                    // we have more offsets to process for this version
                    return nextBlock;
                }
                // No more offsets to process for this version, move on to the next version
                while (currentVersionIterator.hasNext()) {
                    currentVersion = currentVersionIterator.next();
                    currentOffsetIterator = versionsToOffsetMap.get(currentVersion).iterator();
                    nextBlock = findNextBlock();
                    if (nextBlock != null) {
                        return nextBlock;
                    }
                }
            } catch (IOException e) {
                // FIX: preserve the cause instead of dropping it
                throw new HoodieIOException(
                    "Could not read avro records from " + readers.get(currentVersion).getFile(), e);
            }
            return null;
        }

        @Override
        public boolean hasNext() {
            if (currentRecordIterator == null || !currentRecordIterator.hasNext()) {
                currentRecordIterator = findNext();
            }
            return (currentRecordIterator != null && currentRecordIterator.hasNext());
        }

        @Override
        public GenericRecord next() {
            return currentRecordIterator.next();
        }
    }
}

View File

@@ -0,0 +1,87 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log.avro;
import com.google.common.base.Preconditions;
import com.uber.hoodie.common.table.log.HoodieLogAppendConfig;
import com.uber.hoodie.common.table.log.HoodieLogAppender;
import com.uber.hoodie.common.table.log.HoodieLogFile;
import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.util.List;
/**
 * Implementation of {@link HoodieLogAppender} that rolls over to the next log file
 * version once the configured size threshold is crossed.
 */
public class RollingAvroLogAppender implements HoodieLogAppender<IndexedRecord> {
    private static final Log LOG = LogFactory.getLog(RollingAvroLogAppender.class);

    // Delegate appender for the current log file; replaced on roll-over
    private AvroLogAppender logWriter;
    // Replaced (via withLogFile) whenever we roll over to a new file
    private HoodieLogAppendConfig config;

    public RollingAvroLogAppender(HoodieLogAppendConfig config)
            throws IOException, InterruptedException {
        this.config = config;
        this.logWriter = new AvroLogAppender(config);
        // The file may already be over the threshold (e.g. re-opening an existing log)
        rollOverIfNeeded();
    }

    // Closes the current writer and opens the next log file version when the
    // size threshold has been crossed; otherwise does nothing.
    private void rollOverIfNeeded() throws IOException, InterruptedException {
        HoodieLogFile currentLogFile = config.getLogFile();
        if (!currentLogFile.shouldRollOver(this, config)) {
            return;
        }
        if (logWriter != null) {
            // Close the old writer before opening a new one
            logWriter.close();
        }
        HoodieLogFile nextRollLogPath = currentLogFile.rollOver(config.getFs());
        LOG.info("Rolling over log from " + currentLogFile + " to " + nextRollLogPath);
        this.config = config.withLogFile(nextRollLogPath);
        this.logWriter = new AvroLogAppender(this.config);
    }

    public long getCurrentSize() throws IOException {
        Preconditions.checkArgument(logWriter != null);
        return logWriter.getCurrentSize();
    }

    public void append(List<IndexedRecord> records) throws IOException, InterruptedException {
        LOG.info("Appending " + records.size() + " records to " + config.getLogFile());
        rollOverIfNeeded();
        Preconditions.checkArgument(logWriter != null);
        logWriter.append(records);
    }

    public void sync() throws IOException {
        Preconditions.checkArgument(logWriter != null);
        logWriter.sync();
    }

    public void close() throws IOException {
        Preconditions.checkArgument(logWriter != null);
        logWriter.close();
    }

    public HoodieLogAppendConfig getConfig() {
        return config;
    }
}

View File

@@ -17,21 +17,27 @@
package com.uber.hoodie.common.util;
import com.google.common.base.Preconditions;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.log.HoodieLogFile;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.InvalidHoodiePathException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
/**
* Utility functions related to accessing the file storage
@@ -39,6 +45,9 @@ import java.util.List;
public class FSUtils {
private static final Logger LOG = LogManager.getLogger(FSUtils.class);
// Log files are of this pattern - b5068208-e1a4-11e6-bf01-fe55135034f3.avro.delta.1
private static final Pattern LOG_FILE_PATTERN = Pattern.compile("(.*)\\.(.*)\\.(.*)\\.([0-9]*)");
private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10;
public static FileSystem getFs() {
Configuration conf = new Configuration();
@@ -119,4 +128,163 @@ public class FSUtils {
public static String getInstantTime(String name) {
return name.replace(getFileExtension(name), "");
}
/**
 * Gets the file extension from a log file path, e.g. "avro.delta"
 * (note: without the leading dot).
 *
 * @param logPath path of the log file
 * @return the two middle dot-separated components of the name
 * @throws InvalidHoodiePathException if the name does not match LOG_FILE_PATTERN
 */
public static String getFileExtensionFromLog(Path logPath) {
    String fileName = logPath.getName();
    Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
    if (!matcher.find()) {
        throw new InvalidHoodiePathException(logPath, "LogFile");
    }
    // groups 2 and 3 are e.g. "avro" and "delta"
    return matcher.group(2) + "." + matcher.group(3);
}
/**
 * Gets the first part of the log file name, which is the fileId.
 * Log files do not have a commitTime in the file name.
 *
 * @param path path of the log file
 * @return the fileId prefix of the name
 * @throws InvalidHoodiePathException if the name does not match LOG_FILE_PATTERN
 */
public static String getFileIdFromLogPath(Path path) {
    String fileName = path.getName();
    Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
    if (!matcher.find()) {
        throw new InvalidHoodiePathException(path, "LogFile");
    }
    return matcher.group(1);
}
/**
 * Gets the trailing numeric component of the log file name as the version.
 *
 * @param logPath path of the log file
 * @return the integer version suffix
 * @throws InvalidHoodiePathException if the name does not match LOG_FILE_PATTERN
 */
public static int getFileVersionFromLog(Path logPath) {
    String fileName = logPath.getName();
    Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
    if (!matcher.find()) {
        throw new InvalidHoodiePathException(logPath, "LogFile");
    }
    String version = matcher.group(4);
    return Integer.parseInt(version);
}
/**
 * Builds a log file name, e.g. ("id", ".avro.delta", 3) -> "id.avro.delta.3".
 * Note: the extension is expected to carry its leading dot.
 */
public static String makeLogFileName(String fileId, String logFileExtension, int version) {
    return fileId + logFileExtension + "." + version;
}
/**
 * Gets the latest log file (highest version) from the stream of log files passed in.
 *
 * @param logFiles stream of log files for one fileId
 * @return the log file with the highest version, or empty if the stream is empty
 */
public static Optional<HoodieLogFile> getLatestLogFile(Stream<HoodieLogFile> logFiles) {
    // FIX: max() is a single O(n) pass instead of sorting the whole stream (O(n log n))
    // just to take the first element
    return logFiles.max(Comparator.comparing(HoodieLogFile::getLogVersion));
}
/**
 * Gets all the log files for the passed-in fileId in the partition path.
 *
 * @param fs filesystem to list from
 * @param partitionPath partition directory containing the log files
 * @param fileId fileId prefix to match
 * @param logFileExtension extension the file names must contain
 * @return stream of matching log files
 * @throws IOException if the partition cannot be listed
 */
public static Stream<HoodieLogFile> getAllLogFiles(FileSystem fs, Path partitionPath,
        final String fileId, final String logFileExtension) throws IOException {
    FileStatus[] statuses = fs.listStatus(partitionPath,
        path -> path.getName().startsWith(fileId)
            && path.getName().contains(logFileExtension));
    return Arrays.stream(statuses).map(HoodieLogFile::new);
}
/**
 * Gets the latest log version for the fileId in the partition path.
 *
 * @param fs filesystem to list from
 * @param partitionPath partition directory containing the log files
 * @param fileId fileId prefix to match
 * @param logFileExtension extension the file names must contain
 * @return the highest existing version, or empty if no log file exists yet
 * @throws IOException if the partition cannot be listed
 */
public static Optional<Integer> getLatestLogVersion(FileSystem fs, Path partitionPath,
        final String fileId, final String logFileExtension) throws IOException {
    return getLatestLogFile(getAllLogFiles(fs, partitionPath, fileId, logFileExtension))
        .map(HoodieLogFile::getLogVersion);
}
/**
 * Gets the current (latest existing) log version for the fileId, defaulting to 1
 * when no log file exists yet.
 */
public static int getCurrentLogVersion(FileSystem fs, Path partitionPath,
        final String fileId, final String logFileExtension) throws IOException {
    return getLatestLogVersion(fs, partitionPath, fileId, logFileExtension).orElse(1);
}
/**
 * Computes the next log version for the specified fileId in the partition path:
 * latest existing version plus one, or 1 when no log file exists yet.
 *
 * @param fs filesystem to list from
 * @param partitionPath partition directory containing the log files
 * @param fileId fileId prefix to match
 * @param logFileExtension extension the file names must contain
 * @return the next version to write
 * @throws IOException if the partition cannot be listed
 */
public static int computeNextLogVersion(FileSystem fs, Path partitionPath, final String fileId,
        final String logFileExtension) throws IOException {
    return getLatestLogVersion(fs, partitionPath, fileId, logFileExtension)
        .map(version -> version + 1)
        .orElse(1);
}
/** Returns the filesystem's io.file.buffer.size setting, defaulting to 4096 bytes. */
public static int getDefaultBufferSize(final FileSystem fs) {
    final Configuration conf = fs.getConf();
    return conf.getInt("io.file.buffer.size", 4096);
}
/** Returns the filesystem's default replication factor for the given path. */
public static Short getDefaultReplication(FileSystem fs, Path path) {
    final short replication = fs.getDefaultReplication(path);
    return replication;
}
/** Returns the filesystem's default block size for the given path. */
public static Long getDefaultBlockSize(FileSystem fs, Path path) {
    final long blockSize = fs.getDefaultBlockSize(path);
    return blockSize;
}
/**
* When a file was opened and the task died without closing the stream, another task executor cannot open because the existing lease will be active.
* We will try to recover the lease, from HDFS. If a data node went down, it takes about 10 minutes for the lease to be rocovered.
* But if the client dies, this should be instant.
*
* @param dfs
* @param p
* @return
* @throws IOException
*/
public static boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p)
    throws IOException, InterruptedException {
  LOG.info("Recover lease on dfs file " + p);
  // Ask the namenode to recover the lease; recoverLease() returns true once the
  // lease has been released and the file closed. Poll up to
  // MAX_ATTEMPTS_RECOVER_LEASE times.
  boolean recovered = false;
  for (int nbAttempt = 0; nbAttempt < MAX_ATTEMPTS_RECOVER_LEASE; nbAttempt++) {
    LOG.info("Attempt " + nbAttempt + " to recover lease on dfs file " + p);
    recovered = dfs.recoverLease(p);
    if (recovered) {
      break;
    }
    // Sleep for 1 second before trying again (recovery typically completes in
    // 2-3 seconds under default settings) — but not after the final attempt,
    // where sleeping would only delay the failure result.
    if (nbAttempt < MAX_ATTEMPTS_RECOVER_LEASE - 1) {
      Thread.sleep(1000);
    }
  }
  return recovered;
}
}

View File

@@ -0,0 +1,25 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.exception;
import org.apache.hadoop.fs.Path;
/**
 * Exception thrown when a filesystem path does not match the layout/naming expected
 * for the given hoodie file type.
 */
public class InvalidHoodiePathException extends HoodieException {
  /**
   * @param path the offending path
   * @param type a short label for the kind of path that was expected
   */
  public InvalidHoodiePathException(Path path, String type) {
    super("Invalid path " + path + " of type " + type);
  }
}

View File

@@ -0,0 +1,166 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.minicluster;
import com.google.common.base.Preconditions;
import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
/**
* An HDFS minicluster service implementation.
*/
public class HdfsTestService {

  private static final Logger logger = LoggerFactory.getLogger(HdfsTestService.class);

  /**
   * Configuration settings
   */
  private Configuration hadoopConf;
  private String workDir;
  private String bindIP = "127.0.0.1";
  private int namenodeRpcPort = 8020;
  private int namenodeHttpPort = 50070;
  private int datanodePort = 50010;
  private int datanodeIpcPort = 50020;
  private int datanodeHttpPort = 50075;

  /**
   * Embedded HDFS cluster
   */
  private MiniDFSCluster miniDfsCluster;

  public HdfsTestService() {
    hadoopConf = new Configuration();
    workDir = Files.createTempDir().getAbsolutePath();
  }

  public Configuration getHadoopConf() {
    return hadoopConf;
  }

  /**
   * Starts a single-datanode MiniDFSCluster bound to the configured loopback ports.
   *
   * @param format if true, wipes any previous cluster data so the filesystem starts fresh
   * @return the running cluster
   * @throws IOException if the cluster cannot be started
   */
  public MiniDFSCluster start(boolean format) throws IOException {
    Preconditions
        .checkState(workDir != null, "The work dir must be set before starting cluster.");

    if (hadoopConf == null) {
      hadoopConf = new Configuration();
    }

    // If formatting, remove the work dir so we can start fresh.
    String localDFSLocation = getDFSLocation(workDir);
    if (format) {
      logger.info(
          "Cleaning HDFS cluster data at: " + localDFSLocation + " and starting fresh.");
      File file = new File(localDFSLocation);
      FileUtils.deleteDirectory(file);
    }

    // Configure and start the HDFS cluster.
    hadoopConf = configureDFSCluster(hadoopConf, localDFSLocation, bindIP, namenodeRpcPort,
        namenodeHttpPort, datanodePort, datanodeIpcPort, datanodeHttpPort);
    miniDfsCluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(1).format(format)
        .checkDataNodeAddrConfig(true).checkDataNodeHostConfig(true).build();
    logger.info("HDFS Minicluster service started.");
    return miniDfsCluster;
  }

  /**
   * Stops the cluster if one is running. Safe to call before {@link #start(boolean)}
   * or more than once (previously this threw NullPointerException in those cases).
   */
  public void stop() throws IOException {
    if (miniDfsCluster != null) {
      miniDfsCluster.shutdown();
    }
    logger.info("HDFS Minicluster service shut down.");
    miniDfsCluster = null;
    hadoopConf = null;
  }

  /**
   * Get the location on the local FS where we store the HDFS data.
   *
   * @param baseFsLocation The base location on the local filesystem we have write access to
   *                       create dirs.
   * @return The location for HDFS data.
   */
  private static String getDFSLocation(String baseFsLocation) {
    return baseFsLocation + Path.SEPARATOR + "dfs";
  }

  /**
   * Returns true if we should format the DFS Cluster. We'll format if clean is
   * true, or if the dfsFsLocation does not exist.
   * NOTE(review): currently unused by start(), which formats purely on its flag;
   * kept for callers that want the existence-sensitive behavior.
   *
   * @param localDFSLocation The location on the local FS to hold the HDFS metadata and block
   *                         data
   * @param clean            Specifies if we want to start a clean cluster
   * @return Returns true if we should format a DFSCluster, otherwise false
   */
  private static boolean shouldFormatDFSCluster(String localDFSLocation, boolean clean) {
    boolean format = true;
    File f = new File(localDFSLocation);
    if (f.exists() && f.isDirectory() && !clean) {
      format = false;
    }
    return format;
  }

  /**
   * Configure the DFS Cluster before launching it.
   *
   * @param config           The already created Hadoop configuration we'll further configure
   *                         for HDFS
   * @param localDFSLocation The location on the local filesystem where cluster data is stored
   * @param bindIP           An IP address we want to force the datanode and namenode to bind
   *                         to.
   * @param namenodeRpcPort
   * @param namenodeHttpPort
   * @param datanodePort
   * @param datanodeIpcPort
   * @param datanodeHttpPort
   * @return The updated Configuration object.
   */
  private static Configuration configureDFSCluster(Configuration config, String localDFSLocation,
      String bindIP, int namenodeRpcPort, int namenodeHttpPort, int datanodePort,
      int datanodeIpcPort, int datanodeHttpPort) {

    logger.info("HDFS force binding to ip: " + bindIP);
    config.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + bindIP + ":" + namenodeRpcPort);
    config.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, bindIP + ":" + datanodePort);
    config.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, bindIP + ":" + datanodeIpcPort);
    config.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, bindIP + ":" + datanodeHttpPort);
    // When a datanode registers with the namenode, the Namenode do a hostname
    // check of the datanode which will fail on OpenShift due to reverse DNS
    // issues with the internal IP addresses. This config disables that check,
    // and will allow a datanode to connect regardless.
    config.setBoolean("dfs.namenode.datanode.registration.ip-hostname-check", false);
    config.set("hdfs.minidfs.basedir", localDFSLocation);
    // allow current user to impersonate others
    String user = System.getProperty("user.name");
    config.set("hadoop.proxyuser." + user + ".groups", "*");
    config.set("hadoop.proxyuser." + user + ".hosts", "*");
    return config;
  }
}

View File

@@ -0,0 +1,53 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.minicluster;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.zookeeper.server.ZooKeeperServer;
import java.io.IOException;
/**
 * Shared HDFS + ZooKeeper minicluster for tests. setUp() lazily starts both services;
 * shutdown() stops them and clears the handles so a later setUp() can start fresh
 * (previously the stale references prevented any restart after shutdown).
 */
public class MiniClusterUtil {

  private static MiniDFSCluster dfsCluster;
  private static ZooKeeperServer zkServer;
  public static Configuration configuration;
  public static FileSystem fileSystem;

  public static void setUp() throws IOException, InterruptedException {
    if (dfsCluster == null) {
      HdfsTestService service = new HdfsTestService();
      dfsCluster = service.start(true);
      configuration = service.getHadoopConf();
    }
    if (zkServer == null) {
      ZookeeperTestService zkService = new ZookeeperTestService(configuration);
      zkServer = zkService.start();
    }
    fileSystem = FileSystem.get(configuration);
  }

  public static void shutdown() {
    if (dfsCluster != null) {
      dfsCluster.shutdown();
      // Reset so a subsequent setUp() starts a new cluster instead of reusing a dead handle.
      dfsCluster = null;
    }
    if (zkServer != null) {
      zkServer.shutdown();
      zkServer = null;
    }
  }
}

View File

@@ -0,0 +1,241 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.minicluster;
import com.google.common.base.Preconditions;
import com.google.common.io.Files;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.net.InetSocketAddress;
import java.net.Socket;
/**
* A Zookeeper minicluster service implementation.
* <p/>
* This class was ripped from MiniZooKeeperCluster from the HBase tests. Changes
* made include:
* <p/>
* 1. It will now only launch 1 zookeeper server.
* <p/>
* 2. It will only attempt to bind to the port specified, and will fail if it
* can't.
* <p/>
* 3. The startup method now takes a bindAddress, which allows us to configure
* which IP the ZK server binds to. This was not configurable in the original
* class.
* <p/>
* 4. The ZK cluster will re-use a data dir on the local filesystem if it
* already exists instead of blowing it away.
*/
public class ZookeeperTestService {

  private static final Logger logger = LoggerFactory.getLogger(ZookeeperTestService.class);

  // Default ZK tick time (ms) used when none is configured, and the max time (ms) to
  // wait for the standalone server to come up or go down.
  private static final int TICK_TIME = 2000;
  private static final int CONNECTION_TIMEOUT = 30000;

  /**
   * Configuration settings
   */
  private Configuration hadoopConf;
  private String workDir;
  private Integer clientPort = 2828;
  private String bindIP = "127.0.0.1";
  private Boolean clean = false;
  private int tickTime = 0;

  /**
   * Embedded ZooKeeper cluster
   */
  private NIOServerCnxnFactory standaloneServerFactory;
  private ZooKeeperServer zooKeeperServer;
  private boolean started = false;

  public ZookeeperTestService(Configuration config) {
    // Data dir lives in a fresh temp dir; config is kept only for callers to read back.
    this.workDir = Files.createTempDir().getAbsolutePath();
    this.hadoopConf = config;
  }

  public Configuration getHadoopConf() {
    return hadoopConf;
  }

  /**
   * Starts a single standalone ZooKeeper server bound to bindIP:clientPort and blocks
   * until it answers the "stat" four-letter command (or CONNECTION_TIMEOUT elapses).
   *
   * @return the running server
   * @throws IOException if the server fails to start or does not come up in time
   */
  public ZooKeeperServer start() throws IOException, InterruptedException {
    Preconditions.checkState(workDir != null,
        "The localBaseFsLocation must be set before starting cluster.");

    setupTestEnv();
    // Stop any previously started instance before (re)starting.
    stop();

    File dir = new File(workDir, "zookeeper").getAbsoluteFile();
    recreateDir(dir, clean);
    int tickTimeToUse;
    if (this.tickTime > 0) {
      tickTimeToUse = this.tickTime;
    } else {
      tickTimeToUse = TICK_TIME;
    }
    this.zooKeeperServer = new ZooKeeperServer(dir, dir, tickTimeToUse);
    standaloneServerFactory = new NIOServerCnxnFactory();

    // NOTE: Changed from the original, where InetSocketAddress was
    // originally created to bind to the wildcard IP, we now configure it.
    logger.info("Zookeeper force binding to: " + this.bindIP);
    standaloneServerFactory.configure(new InetSocketAddress(bindIP, clientPort), 1000);

    // Start up this ZK server
    standaloneServerFactory.startup(zooKeeperServer);

    String serverHostname;
    if (bindIP.equals("0.0.0.0")) {
      serverHostname = "localhost";
    } else {
      serverHostname = bindIP;
    }
    if (!waitForServerUp(serverHostname, clientPort, CONNECTION_TIMEOUT)) {
      throw new IOException("Waiting for startup of standalone server");
    }

    started = true;
    logger.info("Zookeeper Minicluster service started on client port: " + clientPort);
    return zooKeeperServer;
  }

  /**
   * Stops the server if started, waiting until the port stops answering. No-op otherwise.
   */
  public void stop() throws IOException {
    if (!started) {
      return;
    }

    standaloneServerFactory.shutdown();
    if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
      throw new IOException("Waiting for shutdown of standalone server");
    }

    // clear everything
    started = false;
    standaloneServerFactory = null;
    zooKeeperServer = null;

    logger.info("Zookeeper Minicluster service shut down.");
  }

  // Ensures the ZK data dir exists; wipes it first when clean is requested,
  // otherwise reuses an existing dir as-is.
  private void recreateDir(File dir, boolean clean) throws IOException {
    if (dir.exists() && clean) {
      FileUtil.fullyDelete(dir);
    } else if (dir.exists() && !clean) {
      // the directory's exist, and we don't want to clean, so exit
      return;
    }

    try {
      dir.mkdirs();
    } catch (SecurityException e) {
      throw new IOException("creating dir: " + dir, e);
    }
  }

  // / XXX: From o.a.zk.t.ClientBase
  private static void setupTestEnv() {
    // during the tests we run with 100K prealloc in the logs.
    // on windows systems prealloc of 64M was seen to take ~15seconds
    // resulting in test failure (client timeout on first session).
    // set env and directly in order to handle static init/gc issues
    System.setProperty("zookeeper.preAllocSize", "100");
    FileTxnLog.setPreallocSize(100 * 1024);
  }

  // XXX: From o.a.zk.t.ClientBase
  // Polls the "stat" command every 250ms; returns true once the connection is refused
  // (server down), false if the server still answers after the timeout.
  private static boolean waitForServerDown(int port, long timeout) {
    long start = System.currentTimeMillis();
    while (true) {
      try {
        Socket sock = new Socket("localhost", port);
        try {
          OutputStream outstream = sock.getOutputStream();
          outstream.write("stat".getBytes());
          outstream.flush();
        } finally {
          sock.close();
        }
      } catch (IOException e) {
        return true;
      }

      if (System.currentTimeMillis() > start + timeout) {
        break;
      }
      try {
        Thread.sleep(250);
      } catch (InterruptedException e) {
        // ignore
      }
    }
    return false;
  }

  // XXX: From o.a.zk.t.ClientBase
  // Polls the "stat" command every 250ms; returns true once the server answers with
  // its version banner, false if it never does within the timeout.
  private static boolean waitForServerUp(String hostname, int port, long timeout) {
    long start = System.currentTimeMillis();
    while (true) {
      try {
        Socket sock = new Socket(hostname, port);
        BufferedReader reader = null;
        try {
          OutputStream outstream = sock.getOutputStream();
          outstream.write("stat".getBytes());
          outstream.flush();

          Reader isr = new InputStreamReader(sock.getInputStream());
          reader = new BufferedReader(isr);
          String line = reader.readLine();
          if (line != null && line.startsWith("Zookeeper version:")) {
            return true;
          }
        } finally {
          sock.close();
          if (reader != null) {
            reader.close();
          }
        }
      } catch (IOException e) {
        // ignore as this is expected
        logger.info("server " + hostname + ":" + port + " not up " + e);
      }

      if (System.currentTimeMillis() > start + timeout) {
        break;
      }
      try {
        Thread.sleep(250);
      } catch (InterruptedException e) {
        // ignore
      }
    }
    return false;
  }
}

View File

@@ -0,0 +1,315 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log.avro;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.uber.hoodie.common.minicluster.MiniClusterUtil;
import com.uber.hoodie.common.table.log.HoodieLogAppendConfig;
import com.uber.hoodie.common.table.log.HoodieLogFile;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.SchemaTestUtil;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.collections.IteratorUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
 * Tests for RollingAvroLogAppender / AvroLogReader against a HDFS minicluster
 * (append is not supported by LocalFileSystem).
 */
public class AvroLogAppenderTest {
  private FileSystem fs;
  private Path partitionPath;

  @BeforeClass
  public static void setUpClass() throws IOException, InterruptedException {
    // Append is not supported in LocalFileSystem. HDFS needs to be setup.
    MiniClusterUtil.setUp();
  }

  @AfterClass
  public static void tearDownClass() {
    MiniClusterUtil.shutdown();
  }

  @Before
  public void setUp() throws IOException, InterruptedException {
    this.fs = MiniClusterUtil.fileSystem;
    TemporaryFolder folder = new TemporaryFolder();
    folder.create();
    assertTrue(fs.mkdirs(new Path(folder.getRoot().getPath())));
    this.partitionPath = new Path(folder.getRoot().getPath());
  }

  @After
  public void tearDown() throws IOException {
    fs.delete(partitionPath, true);
  }

  @Test
  public void testBasicAppend() throws IOException, URISyntaxException, InterruptedException {
    HoodieLogAppendConfig logConfig =
        HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath)
            .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
            .withSchema(SchemaTestUtil.getSimpleSchema()).withFs(fs).build();
    RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig);
    logAppender.append(SchemaTestUtil.generateTestRecords(0, 100));
    long size1 = logAppender.getCurrentSize();
    assertTrue("Size should be > 0 after appending records", size1 > 0);
    assertEquals("Reported size should match the log file length on DFS", size1,
        fs.getFileStatus(logConfig.getLogFile().getPath()).getLen());
    logAppender.close();

    // Close and Open again and append 100 more records
    logAppender = new RollingAvroLogAppender(logConfig);
    logAppender.append(SchemaTestUtil.generateTestRecords(100, 100));
    long size2 = logAppender.getCurrentSize();
    assertTrue("Size should grow after the second append", size2 > size1);
    assertEquals("Reported size should match the log file length on DFS", size2,
        fs.getFileStatus(logConfig.getLogFile().getPath()).getLen());
    logAppender.close();

    // Close and Open again and append 100 more records
    logAppender = new RollingAvroLogAppender(logConfig);
    logAppender.append(SchemaTestUtil.generateTestRecords(200, 100));
    long size3 = logAppender.getCurrentSize();
    assertTrue("Size should grow after the third append", size3 > size2);
    assertEquals("Reported size should match the log file length on DFS", size3,
        fs.getFileStatus(logConfig.getLogFile().getPath()).getLen());
    logAppender.close();

    // Cannot get the current size after closing the log
    try {
      logAppender.getCurrentSize();
      fail("getCurrentSize should fail after the logAppender is closed");
    } catch (IllegalStateException e) {
      // pass
    }
  }

  @Test
  public void testLeaseRecovery() throws IOException, URISyntaxException, InterruptedException {
    HoodieLogAppendConfig logConfig =
        HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath)
            .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
            .withSchema(SchemaTestUtil.getSimpleSchema()).withFs(fs).build();
    RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig);
    logAppender.append(SchemaTestUtil.generateTestRecords(0, 100));
    // do not close this log appender, so the open lease must be recovered below
    // logAppender.close();

    // Try opening again and append 100 more records
    logAppender = new RollingAvroLogAppender(logConfig);
    logAppender.append(SchemaTestUtil.generateTestRecords(100, 100));
    assertEquals("Reported size should match the log file length after lease recovery",
        logAppender.getCurrentSize(),
        fs.getFileStatus(logConfig.getLogFile().getPath()).getLen());
    logAppender.close();
  }

  @Test
  public void testAppendOnCorruptedBlock()
      throws IOException, URISyntaxException, InterruptedException {
    HoodieLogAppendConfig logConfig =
        HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath)
            .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
            .withSchema(SchemaTestUtil.getSimpleSchema()).withFs(fs).build();
    RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig);
    logAppender.append(SchemaTestUtil.generateTestRecords(0, 100));
    logAppender.close();

    // Append some arbitrary byte[] to the end of the log (mimics a partially written commit)
    assertTrue(fs.exists(logConfig.getLogFile().getPath()));
    fs = FileSystem.get(fs.getConf());
    FSDataOutputStream outputStream =
        fs.append(logConfig.getLogFile().getPath(), logConfig.getBufferSize());
    outputStream.write("something-random".getBytes());
    outputStream.flush();
    outputStream.close();

    // Appending through the appender should still succeed past the corrupted tail
    logAppender = new RollingAvroLogAppender(logConfig);
    logAppender.append(SchemaTestUtil.generateTestRecords(100, 100));
    logAppender.close();
  }

  @SuppressWarnings("unchecked")
  @Test
  public void testBasicWriteAndRead()
      throws IOException, URISyntaxException, InterruptedException {
    HoodieLogAppendConfig logConfig =
        HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath)
            .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
            .withSchema(SchemaTestUtil.getSimpleSchema()).withFs(fs).build();
    RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig);
    // Offset of the block we are about to write
    long size1 = logAppender.getCurrentSize();
    List<IndexedRecord> inputRecords = SchemaTestUtil.generateTestRecords(0, 100);
    logAppender.append(inputRecords);
    logAppender.close();

    AvroLogReader logReader =
        new AvroLogReader(logConfig.getLogFile(), fs, logConfig.getSchema());
    List<GenericRecord> result = IteratorUtils.toList(logReader.readBlock(size1));
    assertEquals("Random access should return 100 records", 100, result.size());
    assertEquals("both lists should be the same. (ordering guaranteed)", inputRecords, result);
  }

  @SuppressWarnings("unchecked")
  @Test
  public void testBasicAppendAndRead()
      throws IOException, URISyntaxException, InterruptedException {
    HoodieLogAppendConfig logConfig =
        HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath)
            .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
            .withSchema(SchemaTestUtil.getSimpleSchema()).withFs(fs).build();
    RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig);
    logAppender.append(SchemaTestUtil.generateTestRecords(0, 100));
    long size1 = logAppender.getCurrentSize();
    logAppender.close();

    // Close and Open again and append 100 more records
    logAppender = new RollingAvroLogAppender(logConfig);
    List<IndexedRecord> secondBatchInput = SchemaTestUtil.generateTestRecords(100, 100);
    logAppender.append(secondBatchInput);
    long size2 = logAppender.getCurrentSize();
    logAppender.close();

    // Close and Open again and append 100 more records
    logAppender = new RollingAvroLogAppender(logConfig);
    List<IndexedRecord> lastBatchInput = SchemaTestUtil.generateTestRecords(200, 100);
    logAppender.append(lastBatchInput);
    long size3 = logAppender.getCurrentSize();
    logAppender.close();

    AvroLogReader logReader =
        new AvroLogReader(logConfig.getLogFile(), fs, logConfig.getSchema());
    // Try to grab the middle block here
    List<GenericRecord> secondBatch = IteratorUtils.toList(logReader.readBlock(size1));
    assertEquals("Stream collect should return 100 records", 100, secondBatch.size());
    assertEquals("Collected list should match the input list (ordering guaranteed)",
        secondBatchInput, secondBatch);

    // Try to grab the last block here
    List<GenericRecord> lastBatch = IteratorUtils.toList(logReader.readBlock(size2));
    // BUGFIX: previously asserted secondBatch.size() again instead of lastBatch.size()
    assertEquals("Stream collect should return 100 records", 100, lastBatch.size());
    assertEquals("Collected list should match the input list (ordering guaranteed)",
        lastBatchInput, lastBatch);

    // Reading at the end-of-file offset should yield nothing
    List<GenericRecord> imaginaryBatch = IteratorUtils.toList(logReader.readBlock(size3));
    assertEquals("Stream collect should return 0 records", 0, imaginaryBatch.size());
  }

  @Test
  public void testAppendAndReadOnCorruptedLog()
      throws IOException, URISyntaxException, InterruptedException {
    HoodieLogAppendConfig logConfig =
        HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath)
            .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
            .withSchema(SchemaTestUtil.getSimpleSchema()).withFs(fs).build();
    RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig);
    long size1 = logAppender.getCurrentSize();
    logAppender.append(SchemaTestUtil.generateTestRecords(0, 100));
    logAppender.close();

    // Append some arbitrary byte[] to the end of the log (mimics a partially written commit)
    assertTrue(fs.exists(logConfig.getLogFile().getPath()));
    fs = FileSystem.get(fs.getConf());
    FSDataOutputStream outputStream =
        fs.append(logConfig.getLogFile().getPath(), logConfig.getBufferSize());
    outputStream.write("something-random".getBytes());
    outputStream.flush();
    outputStream.close();

    logAppender = new RollingAvroLogAppender(logConfig);
    long size2 = logAppender.getCurrentSize();
    logAppender.append(SchemaTestUtil.generateTestRecords(100, 100));
    logAppender.close();

    AvroLogReader logReader =
        new AvroLogReader(logConfig.getLogFile(), fs, logConfig.getSchema());
    // Try to grab the first block here
    List<GenericRecord> secondBatch = IteratorUtils.toList(logReader.readBlock(size1));
    assertEquals("Stream collect should return 100 records", 100, secondBatch.size());

    // Try to grab the last block here
    List<GenericRecord> lastBatch = IteratorUtils.toList(logReader.readBlock(size2));
    assertEquals("Stream collect should return 100 records", 100, lastBatch.size());
  }

  @Test
  public void testCompositeAvroLogReader()
      throws IOException, URISyntaxException, InterruptedException {
    // Set a small threshold so that every block is a new version
    HoodieLogAppendConfig logConfig =
        HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath)
            .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
            .withSchema(SchemaTestUtil.getSimpleSchema()).withSizeThreshold(500).withFs(fs)
            .build();
    RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig);
    long size1 = logAppender.getCurrentSize();
    List<IndexedRecord> input1 = SchemaTestUtil.generateTestRecords(0, 100);
    logAppender.append(input1);
    logAppender.close();

    // Need to rebuild config to set the latest version as path
    logConfig = HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath)
        .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
        .withSchema(SchemaTestUtil.getSimpleSchema()).withSizeThreshold(500).withFs(fs).build();
    logAppender = new RollingAvroLogAppender(logConfig);
    long size2 = logAppender.getCurrentSize();
    List<IndexedRecord> input2 = SchemaTestUtil.generateTestRecords(100, 100);
    logAppender.append(input2);
    logAppender.close();

    logConfig = HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath)
        .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
        .withSchema(SchemaTestUtil.getSimpleSchema()).withSizeThreshold(500).withFs(fs).build();
    List<HoodieLogFile> allLogFiles = FSUtils
        .getAllLogFiles(fs, partitionPath, logConfig.getLogFile().getFileId(),
            HoodieLogFile.DELTA_EXTENSION).collect(Collectors.toList());
    assertEquals("Expected one log file per version", 2, allLogFiles.size());

    SortedMap<Integer, List<Long>> offsets = Maps.newTreeMap();
    offsets.put(1, Lists.newArrayList(size1));
    offsets.put(2, Lists.newArrayList(size2));
    CompositeAvroLogReader reader =
        new CompositeAvroLogReader(partitionPath, logConfig.getLogFile().getFileId(), fs,
            logConfig.getSchema(), HoodieLogFile.DELTA_EXTENSION);
    Iterator<GenericRecord> results = reader.readBlocks(offsets);
    List<GenericRecord> totalBatch = IteratorUtils.toList(results);
    assertEquals("Stream collect should return all 200 records", 200, totalBatch.size());
    input1.addAll(input2);
    assertEquals("CompositeAvroLogReader should return 200 records from 2 versions", input1,
        totalBatch);
  }
}

View File

@@ -0,0 +1,62 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.util;
import com.uber.hoodie.exception.HoodieIOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.DecoderFactory;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * Test helper that loads the bundled simple Avro schema and decodes JSON-encoded
 * sample records from the /sample.data classpath resource.
 */
public class SchemaTestUtil {

  /**
   * Parses the simple test schema bundled as the /simple-test.avro classpath resource.
   */
  public static Schema getSimpleSchema() throws IOException {
    return new Schema.Parser()
        .parse(SchemaTestUtil.class.getResourceAsStream("/simple-test.avro"));
  }

  /**
   * Generates {@code limit} test records from the sample data, starting at record
   * offset {@code from}.
   */
  public static List<IndexedRecord> generateTestRecords(int from, int limit)
      throws IOException, URISyntaxException {
    return toRecords(getSimpleSchema(), getSimpleSchema(), from, limit);
  }

  private static List<IndexedRecord> toRecords(Schema writerSchema, Schema readerSchema, int from,
      int limit) throws IOException, URISyntaxException {
    GenericDatumReader<IndexedRecord> reader =
        new GenericDatumReader<>(writerSchema, readerSchema);
    // Each line of /sample.data is one JSON-encoded record; decode the [from, from+limit) window.
    try (Stream<String> stream = Files
        .lines(Paths.get(SchemaTestUtil.class.getResource("/sample.data").toURI()))) {
      return stream.skip(from).limit(limit).map(s -> {
        try {
          return reader.read(null, DecoderFactory.get().jsonDecoder(readerSchema, s));
        } catch (IOException e) {
          // BUGFIX: error message previously referenced a non-existent "simple_data.json"
          throw new HoodieIOException("Could not read data from sample.data", e);
        }
      }).collect(Collectors.toList());
    } catch (IOException e) {
      throw new HoodieIOException("Could not read data from sample.data", e);
    }
  }
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,10 @@
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": "long"},
{"name": "favorite_color", "type": "string"}
]
}