[checkstyle] Add ConstantName java checkstyle rule (#1066)
* add SimplifyBooleanExpression java checkstyle rule * collapse empty tags in scalastyle file
This commit is contained in:
@@ -41,7 +41,7 @@ import java.util.Map;
|
||||
*/
|
||||
public class MercifulJsonConverter {
|
||||
|
||||
private static final Map<Schema.Type, JsonToAvroFieldProcessor> fieldTypeProcessors = getFieldTypeProcessors();
|
||||
private static final Map<Schema.Type, JsonToAvroFieldProcessor> FIELD_TYPE_PROCESSORS = getFieldTypeProcessors();
|
||||
|
||||
private final ObjectMapper mapper;
|
||||
|
||||
@@ -125,7 +125,7 @@ public class MercifulJsonConverter {
|
||||
throw new HoodieJsonToAvroConversionException(null, name, schema);
|
||||
}
|
||||
|
||||
JsonToAvroFieldProcessor processor = fieldTypeProcessors.get(schema.getType());
|
||||
JsonToAvroFieldProcessor processor = FIELD_TYPE_PROCESSORS.get(schema.getType());
|
||||
if (null != processor) {
|
||||
return processor.convertToAvro(value, name, schema);
|
||||
}
|
||||
|
||||
@@ -47,7 +47,7 @@ import java.util.stream.Collectors;
|
||||
*/
|
||||
public class HoodieTableConfig implements Serializable {
|
||||
|
||||
private static final transient Logger log = LogManager.getLogger(HoodieTableConfig.class);
|
||||
private static final transient Logger LOG = LogManager.getLogger(HoodieTableConfig.class);
|
||||
|
||||
public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties";
|
||||
public static final String HOODIE_TABLE_NAME_PROP_NAME = "hoodie.table.name";
|
||||
@@ -67,7 +67,7 @@ public class HoodieTableConfig implements Serializable {
|
||||
public HoodieTableConfig(FileSystem fs, String metaPath) {
|
||||
Properties props = new Properties();
|
||||
Path propertyPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
|
||||
log.info("Loading dataset properties from " + propertyPath);
|
||||
LOG.info("Loading dataset properties from " + propertyPath);
|
||||
try {
|
||||
try (FSDataInputStream inputStream = fs.open(propertyPath)) {
|
||||
props.load(inputStream);
|
||||
|
||||
@@ -64,7 +64,7 @@ import java.util.stream.Collectors;
|
||||
*/
|
||||
public class HoodieTableMetaClient implements Serializable {
|
||||
|
||||
private static final transient Logger log = LogManager.getLogger(HoodieTableMetaClient.class);
|
||||
private static final transient Logger LOG = LogManager.getLogger(HoodieTableMetaClient.class);
|
||||
public static String METAFOLDER_NAME = ".hoodie";
|
||||
public static String TEMPFOLDER_NAME = METAFOLDER_NAME + File.separator + ".temp";
|
||||
public static String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + File.separator + ".aux";
|
||||
@@ -92,7 +92,7 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
|
||||
public HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad,
|
||||
ConsistencyGuardConfig consistencyGuardConfig) throws DatasetNotFoundException {
|
||||
log.info("Loading HoodieTableMetaClient from " + basePath);
|
||||
LOG.info("Loading HoodieTableMetaClient from " + basePath);
|
||||
this.basePath = basePath;
|
||||
this.consistencyGuardConfig = consistencyGuardConfig;
|
||||
this.hadoopConf = new SerializableConfiguration(conf);
|
||||
@@ -103,10 +103,10 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
DatasetNotFoundException.checkValidDataset(fs, basePathDir, metaPathDir);
|
||||
this.tableConfig = new HoodieTableConfig(fs, metaPath);
|
||||
this.tableType = tableConfig.getTableType();
|
||||
log.info("Finished Loading Table of type " + tableType + " from " + basePath);
|
||||
LOG.info("Finished Loading Table of type " + tableType + " from " + basePath);
|
||||
this.loadActiveTimelineOnLoad = loadActiveTimelineOnLoad;
|
||||
if (loadActiveTimelineOnLoad) {
|
||||
log.info("Loading Active commit timeline for " + basePath);
|
||||
LOG.info("Loading Active commit timeline for " + basePath);
|
||||
getActiveTimeline();
|
||||
}
|
||||
}
|
||||
@@ -303,7 +303,7 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
*/
|
||||
public static HoodieTableMetaClient initDatasetAndGetMetaClient(Configuration hadoopConf, String basePath,
|
||||
Properties props) throws IOException {
|
||||
log.info("Initializing " + basePath + " as hoodie dataset " + basePath);
|
||||
LOG.info("Initializing " + basePath + " as hoodie dataset " + basePath);
|
||||
Path basePathDir = new Path(basePath);
|
||||
final FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
|
||||
if (!fs.exists(basePathDir)) {
|
||||
@@ -340,7 +340,7 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
// We should not use fs.getConf as this might be different from the original configuration
|
||||
// used to create the fs in unit tests
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
|
||||
log.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType() + " from " + basePath);
|
||||
LOG.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType() + " from " + basePath);
|
||||
return metaClient;
|
||||
}
|
||||
|
||||
|
||||
@@ -66,7 +66,7 @@ import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlo
|
||||
*/
|
||||
public abstract class AbstractHoodieLogRecordScanner {
|
||||
|
||||
private static final Logger log = LogManager.getLogger(AbstractHoodieLogRecordScanner.class);
|
||||
private static final Logger LOG = LogManager.getLogger(AbstractHoodieLogRecordScanner.class);
|
||||
|
||||
// Reader schema for the records
|
||||
protected final Schema readerSchema;
|
||||
@@ -131,7 +131,7 @@ public abstract class AbstractHoodieLogRecordScanner {
|
||||
Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
|
||||
while (logFormatReaderWrapper.hasNext()) {
|
||||
HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
|
||||
log.info("Scanning log file " + logFile);
|
||||
LOG.info("Scanning log file " + logFile);
|
||||
scannedLogFiles.add(logFile);
|
||||
totalLogFiles.set(scannedLogFiles.size());
|
||||
// Use the HoodieLogFileReader to iterate through the blocks in the log file
|
||||
@@ -145,7 +145,7 @@ public abstract class AbstractHoodieLogRecordScanner {
|
||||
}
|
||||
switch (r.getBlockType()) {
|
||||
case AVRO_DATA_BLOCK:
|
||||
log.info("Reading a data block from file " + logFile.getPath());
|
||||
LOG.info("Reading a data block from file " + logFile.getPath());
|
||||
if (isNewInstantBlock(r) && !readBlocksLazily) {
|
||||
// If this is an avro data block belonging to a different commit/instant,
|
||||
// then merge the last blocks and records into the main result
|
||||
@@ -155,7 +155,7 @@ public abstract class AbstractHoodieLogRecordScanner {
|
||||
currentInstantLogBlocks.push(r);
|
||||
break;
|
||||
case DELETE_BLOCK:
|
||||
log.info("Reading a delete block from file " + logFile.getPath());
|
||||
LOG.info("Reading a delete block from file " + logFile.getPath());
|
||||
if (isNewInstantBlock(r) && !readBlocksLazily) {
|
||||
// If this is a delete data block belonging to a different commit/instant,
|
||||
// then merge the last blocks and records into the main result
|
||||
@@ -177,7 +177,7 @@ public abstract class AbstractHoodieLogRecordScanner {
|
||||
// written per ingestion batch for a file but in reality we need to rollback (B1 & B2)
|
||||
// The following code ensures the same rollback block (R1) is used to rollback
|
||||
// both B1 & B2
|
||||
log.info("Reading a command block from file " + logFile.getPath());
|
||||
LOG.info("Reading a command block from file " + logFile.getPath());
|
||||
// This is a command block - take appropriate action based on the command
|
||||
HoodieCommandBlock commandBlock = (HoodieCommandBlock) r;
|
||||
String targetInstantForCommandBlock =
|
||||
@@ -196,34 +196,34 @@ public abstract class AbstractHoodieLogRecordScanner {
|
||||
HoodieLogBlock lastBlock = currentInstantLogBlocks.peek();
|
||||
// handle corrupt blocks separately since they may not have metadata
|
||||
if (lastBlock.getBlockType() == CORRUPT_BLOCK) {
|
||||
log.info("Rolling back the last corrupted log block read in " + logFile.getPath());
|
||||
LOG.info("Rolling back the last corrupted log block read in " + logFile.getPath());
|
||||
currentInstantLogBlocks.pop();
|
||||
numBlocksRolledBack++;
|
||||
} else if (lastBlock.getBlockType() != CORRUPT_BLOCK
|
||||
&& targetInstantForCommandBlock.contentEquals(lastBlock.getLogBlockHeader().get(INSTANT_TIME))) {
|
||||
// rollback last data block or delete block
|
||||
log.info("Rolling back the last log block read in " + logFile.getPath());
|
||||
LOG.info("Rolling back the last log block read in " + logFile.getPath());
|
||||
currentInstantLogBlocks.pop();
|
||||
numBlocksRolledBack++;
|
||||
} else if (!targetInstantForCommandBlock
|
||||
.contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME))) {
|
||||
// invalid or extra rollback block
|
||||
log.warn("TargetInstantTime " + targetInstantForCommandBlock
|
||||
LOG.warn("TargetInstantTime " + targetInstantForCommandBlock
|
||||
+ " invalid or extra rollback command block in " + logFile.getPath());
|
||||
break;
|
||||
} else {
|
||||
// this should not happen ideally
|
||||
log.warn("Unable to apply rollback command block in " + logFile.getPath());
|
||||
LOG.warn("Unable to apply rollback command block in " + logFile.getPath());
|
||||
}
|
||||
}
|
||||
log.info("Number of applied rollback blocks " + numBlocksRolledBack);
|
||||
LOG.info("Number of applied rollback blocks " + numBlocksRolledBack);
|
||||
break;
|
||||
default:
|
||||
throw new UnsupportedOperationException("Command type not yet supported.");
|
||||
}
|
||||
break;
|
||||
case CORRUPT_BLOCK:
|
||||
log.info("Found a corrupt block in " + logFile.getPath());
|
||||
LOG.info("Found a corrupt block in " + logFile.getPath());
|
||||
totalCorruptBlocks.incrementAndGet();
|
||||
// If there is a corrupt block - we will assume that this was the next data block
|
||||
currentInstantLogBlocks.push(r);
|
||||
@@ -234,13 +234,13 @@ public abstract class AbstractHoodieLogRecordScanner {
|
||||
}
|
||||
// merge the last read block when all the blocks are done reading
|
||||
if (!currentInstantLogBlocks.isEmpty()) {
|
||||
log.info("Merging the final data blocks");
|
||||
LOG.info("Merging the final data blocks");
|
||||
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size());
|
||||
}
|
||||
// Done
|
||||
progress = 1.0f;
|
||||
} catch (Exception e) {
|
||||
log.error("Got exception when reading log file", e);
|
||||
LOG.error("Got exception when reading log file", e);
|
||||
throw new HoodieIOException("IOException when reading log file ");
|
||||
} finally {
|
||||
try {
|
||||
@@ -249,7 +249,7 @@ public abstract class AbstractHoodieLogRecordScanner {
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
// Eat exception as we do not want to mask the original exception that can happen
|
||||
log.error("Unable to close log format reader", ioe);
|
||||
LOG.error("Unable to close log format reader", ioe);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -297,7 +297,7 @@ public abstract class AbstractHoodieLogRecordScanner {
|
||||
*/
|
||||
private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> lastBlocks, int numLogFilesSeen) throws Exception {
|
||||
while (!lastBlocks.isEmpty()) {
|
||||
log.info("Number of remaining logblocks to merge " + lastBlocks.size());
|
||||
LOG.info("Number of remaining logblocks to merge " + lastBlocks.size());
|
||||
// poll the element at the bottom of the stack since that's the order it was inserted
|
||||
HoodieLogBlock lastBlock = lastBlocks.pollLast();
|
||||
switch (lastBlock.getBlockType()) {
|
||||
@@ -308,7 +308,7 @@ public abstract class AbstractHoodieLogRecordScanner {
|
||||
Arrays.stream(((HoodieDeleteBlock) lastBlock).getKeysToDelete()).forEach(this::processNextDeletedKey);
|
||||
break;
|
||||
case CORRUPT_BLOCK:
|
||||
log.warn("Found a corrupt block which was not rolled back");
|
||||
LOG.warn("Found a corrupt block which was not rolled back");
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
|
||||
@@ -54,11 +54,11 @@ import java.util.Map;
|
||||
class HoodieLogFileReader implements HoodieLogFormat.Reader {
|
||||
|
||||
public static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024; // 16 MB
|
||||
private static final Logger log = LogManager.getLogger(HoodieLogFileReader.class);
|
||||
private static final Logger LOG = LogManager.getLogger(HoodieLogFileReader.class);
|
||||
|
||||
private final FSDataInputStream inputStream;
|
||||
private final HoodieLogFile logFile;
|
||||
private static final byte[] magicBuffer = new byte[6];
|
||||
private static final byte[] MAGIC_BUFFER = new byte[6];
|
||||
private final Schema readerSchema;
|
||||
private HoodieLogFormat.LogFormatVersion nextBlockVersion;
|
||||
private boolean readBlockLazily;
|
||||
@@ -112,7 +112,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
|
||||
try {
|
||||
close();
|
||||
} catch (Exception e) {
|
||||
log.warn("unable to close input stream for log file " + logFile, e);
|
||||
LOG.warn("unable to close input stream for log file " + logFile, e);
|
||||
// fail silently for any sort of exception
|
||||
}
|
||||
}
|
||||
@@ -210,12 +210,12 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
|
||||
}
|
||||
|
||||
private HoodieLogBlock createCorruptBlock() throws IOException {
|
||||
log.info("Log " + logFile + " has a corrupted block at " + inputStream.getPos());
|
||||
LOG.info("Log " + logFile + " has a corrupted block at " + inputStream.getPos());
|
||||
long currentPos = inputStream.getPos();
|
||||
long nextBlockOffset = scanForNextAvailableBlockOffset();
|
||||
// Rewind to the initial start and read corrupted bytes till the nextBlockOffset
|
||||
inputStream.seek(currentPos);
|
||||
log.info("Next available block in " + logFile + " starts at " + nextBlockOffset);
|
||||
LOG.info("Next available block in " + logFile + " starts at " + nextBlockOffset);
|
||||
int corruptedBlockSize = (int) (nextBlockOffset - currentPos);
|
||||
long contentPosition = inputStream.getPos();
|
||||
byte[] corruptedBytes = HoodieLogBlock.readOrSkipContent(inputStream, corruptedBlockSize, readBlockLazily);
|
||||
@@ -313,8 +313,8 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
|
||||
private boolean hasNextMagic() throws IOException {
|
||||
long pos = inputStream.getPos();
|
||||
// 1. Read magic header from the start of the block
|
||||
inputStream.readFully(magicBuffer, 0, 6);
|
||||
if (!Arrays.equals(magicBuffer, HoodieLogFormat.MAGIC)) {
|
||||
inputStream.readFully(MAGIC_BUFFER, 0, 6);
|
||||
if (!Arrays.equals(MAGIC_BUFFER, HoodieLogFormat.MAGIC)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
||||
@@ -51,7 +51,7 @@ public interface HoodieLogFormat {
|
||||
* The current version of the log format. Anytime the log format changes this version needs to be bumped and
|
||||
* corresponding changes need to be made to {@link HoodieLogFormatVersion}
|
||||
*/
|
||||
int currentVersion = 1;
|
||||
int CURRENT_VERSION = 1;
|
||||
|
||||
String UNKNOWN_WRITE_TOKEN = "1-0-1";
|
||||
|
||||
@@ -104,7 +104,7 @@ public interface HoodieLogFormat {
|
||||
*/
|
||||
class WriterBuilder {
|
||||
|
||||
private static final Logger log = LogManager.getLogger(WriterBuilder.class);
|
||||
private static final Logger LOG = LogManager.getLogger(WriterBuilder.class);
|
||||
// Default max log file size 512 MB
|
||||
public static final long DEFAULT_SIZE_THRESHOLD = 512 * 1024 * 1024L;
|
||||
|
||||
@@ -188,7 +188,7 @@ public interface HoodieLogFormat {
|
||||
}
|
||||
|
||||
public Writer build() throws IOException, InterruptedException {
|
||||
log.info("Building HoodieLogFormat Writer");
|
||||
LOG.info("Building HoodieLogFormat Writer");
|
||||
if (fs == null) {
|
||||
throw new IllegalArgumentException("fs is not specified");
|
||||
}
|
||||
@@ -210,7 +210,7 @@ public interface HoodieLogFormat {
|
||||
}
|
||||
|
||||
if (logVersion == null) {
|
||||
log.info("Computing the next log version for " + logFileId + " in " + parentPath);
|
||||
LOG.info("Computing the next log version for " + logFileId + " in " + parentPath);
|
||||
Option<Pair<Integer, String>> versionAndWriteToken =
|
||||
FSUtils.getLatestLogVersion(fs, parentPath, logFileId, fileExtension, commitTime);
|
||||
if (versionAndWriteToken.isPresent()) {
|
||||
@@ -222,7 +222,7 @@ public interface HoodieLogFormat {
|
||||
// Use rollover write token as write token to create new log file with tokens
|
||||
logWriteToken = rolloverLogWriteToken;
|
||||
}
|
||||
log.info("Computed the next log version for " + logFileId + " in " + parentPath + " as " + logVersion
|
||||
LOG.info("Computed the next log version for " + logFileId + " in " + parentPath + " as " + logVersion
|
||||
+ " with write-token " + logWriteToken);
|
||||
}
|
||||
|
||||
@@ -234,7 +234,7 @@ public interface HoodieLogFormat {
|
||||
|
||||
Path logPath = new Path(parentPath,
|
||||
FSUtils.makeLogFileName(logFileId, fileExtension, commitTime, logVersion, logWriteToken));
|
||||
log.info("HoodieLogFile on path " + logPath);
|
||||
LOG.info("HoodieLogFile on path " + logPath);
|
||||
HoodieLogFile logFile = new HoodieLogFile(logPath);
|
||||
|
||||
if (bufferSize == null) {
|
||||
|
||||
@@ -46,7 +46,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
|
||||
private final boolean reverseLogReader;
|
||||
private int bufferSize;
|
||||
|
||||
private static final Logger log = LogManager.getLogger(HoodieLogFormatReader.class);
|
||||
private static final Logger LOG = LogManager.getLogger(HoodieLogFormatReader.class);
|
||||
|
||||
HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles, Schema readerSchema, boolean readBlocksLazily,
|
||||
boolean reverseLogReader, int bufferSize) throws IOException {
|
||||
@@ -103,7 +103,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
|
||||
} catch (IOException io) {
|
||||
throw new HoodieIOException("unable to initialize read with log file ", io);
|
||||
}
|
||||
log.info("Moving to the next reader for logfile " + currentReader.getLogFile());
|
||||
LOG.info("Moving to the next reader for logfile " + currentReader.getLogFile());
|
||||
return this.currentReader.hasNext();
|
||||
}
|
||||
return false;
|
||||
|
||||
@@ -44,7 +44,7 @@ import java.io.IOException;
|
||||
*/
|
||||
public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
|
||||
|
||||
private static final Logger log = LogManager.getLogger(HoodieLogFormatWriter.class);
|
||||
private static final Logger LOG = LogManager.getLogger(HoodieLogFormatWriter.class);
|
||||
|
||||
private HoodieLogFile logFile;
|
||||
private final FileSystem fs;
|
||||
@@ -76,11 +76,11 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
|
||||
if (fs.exists(path)) {
|
||||
boolean isAppendSupported = StorageSchemes.isAppendSupported(fs.getScheme());
|
||||
if (isAppendSupported) {
|
||||
log.info(logFile + " exists. Appending to existing file");
|
||||
LOG.info(logFile + " exists. Appending to existing file");
|
||||
try {
|
||||
this.output = fs.append(path, bufferSize);
|
||||
} catch (RemoteException e) {
|
||||
log.warn("Remote Exception, attempting to handle or recover lease", e);
|
||||
LOG.warn("Remote Exception, attempting to handle or recover lease", e);
|
||||
handleAppendExceptionOrRecoverLease(path, e);
|
||||
} catch (IOException ioe) {
|
||||
if (ioe.getMessage().toLowerCase().contains("not supported")) {
|
||||
@@ -93,11 +93,11 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
|
||||
}
|
||||
if (!isAppendSupported) {
|
||||
this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
|
||||
log.info("Append not supported.. Rolling over to " + logFile);
|
||||
LOG.info("Append not supported.. Rolling over to " + logFile);
|
||||
createNewFile();
|
||||
}
|
||||
} else {
|
||||
log.info(logFile + " does not exist. Create a new file");
|
||||
LOG.info(logFile + " does not exist. Create a new file");
|
||||
// Block size does not matter as we will always manually autoflush
|
||||
createNewFile();
|
||||
}
|
||||
@@ -120,7 +120,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
|
||||
|
||||
// Find current version
|
||||
HoodieLogFormat.LogFormatVersion currentLogFormatVersion =
|
||||
new HoodieLogFormatVersion(HoodieLogFormat.currentVersion);
|
||||
new HoodieLogFormatVersion(HoodieLogFormat.CURRENT_VERSION);
|
||||
long currentSize = this.output.size();
|
||||
|
||||
// 1. Write the magic header for the start of the block
|
||||
@@ -179,7 +179,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
|
||||
if (getCurrentSize() > sizeThreshold) {
|
||||
// TODO - make an end marker which seals the old log file (no more appends possible to that
|
||||
// file).
|
||||
log.info("CurrentSize " + getCurrentSize() + " has reached threshold " + sizeThreshold
|
||||
LOG.info("CurrentSize " + getCurrentSize() + " has reached threshold " + sizeThreshold
|
||||
+ ". Rolling over to the next version");
|
||||
HoodieLogFile newLogFile = logFile.rollOver(fs, rolloverLogWriteToken);
|
||||
// close this writer and return the new writer
|
||||
@@ -230,12 +230,12 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
|
||||
// hours). During this time, if a fs.append() API is invoked for a file whose last block is eligible to be
|
||||
// appended to, then the NN will throw an exception saying that it couldn't find any active replica with the
|
||||
// last block. Find more information here : https://issues.apache.org/jira/browse/HDFS-6325
|
||||
log.warn("Failed to open an append stream to the log file. Opening a new log file..", e);
|
||||
LOG.warn("Failed to open an append stream to the log file. Opening a new log file..", e);
|
||||
// Rollover the current log file (since cannot get a stream handle) and create new one
|
||||
this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
|
||||
createNewFile();
|
||||
} else if (e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName())) {
|
||||
log.warn("Another task executor writing to the same log file(" + logFile + ". Rolling over");
|
||||
LOG.warn("Another task executor writing to the same log file(" + logFile + ". Rolling over");
|
||||
// Rollover the current log file (since cannot get a stream handle) and create new one
|
||||
this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
|
||||
createNewFile();
|
||||
@@ -244,13 +244,13 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
|
||||
// this happens when either another task executor writing to this file died or
|
||||
// data node is going down. Note that we can only try to recover lease for a DistributedFileSystem.
|
||||
// ViewFileSystem unfortunately does not support this operation
|
||||
log.warn("Trying to recover log on path " + path);
|
||||
LOG.warn("Trying to recover log on path " + path);
|
||||
if (FSUtils.recoverDFSFileLease((DistributedFileSystem) fs, path)) {
|
||||
log.warn("Recovered lease on path " + path);
|
||||
LOG.warn("Recovered lease on path " + path);
|
||||
// try again
|
||||
this.output = fs.append(path, bufferSize);
|
||||
} else {
|
||||
log.warn("Failed to recover lease on path " + path);
|
||||
LOG.warn("Failed to recover lease on path " + path);
|
||||
throw new HoodieException(e);
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -54,7 +54,7 @@ import java.util.Map;
|
||||
public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
|
||||
implements Iterable<HoodieRecord<? extends HoodieRecordPayload>> {
|
||||
|
||||
private static final Logger log = LogManager.getLogger(HoodieMergedLogRecordScanner.class);
|
||||
private static final Logger LOG = LogManager.getLogger(HoodieMergedLogRecordScanner.class);
|
||||
|
||||
// Final map of compacted/merged records
|
||||
private final ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records;
|
||||
@@ -81,12 +81,12 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
|
||||
scan();
|
||||
this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer();
|
||||
this.numMergedRecordsInLog = records.size();
|
||||
log.info("MaxMemoryInBytes allowed for compaction => " + maxMemorySizeInBytes);
|
||||
log.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => " + records.getInMemoryMapNumEntries());
|
||||
log.info(
|
||||
LOG.info("MaxMemoryInBytes allowed for compaction => " + maxMemorySizeInBytes);
|
||||
LOG.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => " + records.getInMemoryMapNumEntries());
|
||||
LOG.info(
|
||||
"Total size in bytes of MemoryBasedMap in ExternalSpillableMap => " + records.getCurrentInMemoryMapSize());
|
||||
log.info("Number of entries in DiskBasedMap in ExternalSpillableMap => " + records.getDiskBasedMapNumEntries());
|
||||
log.info("Size of file spilled to disk => " + records.getSizeOfFileOnDiskInBytes());
|
||||
LOG.info("Number of entries in DiskBasedMap in ExternalSpillableMap => " + records.getDiskBasedMapNumEntries());
|
||||
LOG.info("Size of file spilled to disk => " + records.getSizeOfFileOnDiskInBytes());
|
||||
} catch (IOException e) {
|
||||
throw new HoodieIOException("IOException when reading log file ");
|
||||
}
|
||||
|
||||
@@ -67,7 +67,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
INFLIGHT_CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_COMPACTION_EXTENSION,
|
||||
REQUESTED_COMPACTION_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION}));
|
||||
|
||||
private static final transient Logger log = LogManager.getLogger(HoodieActiveTimeline.class);
|
||||
private static final transient Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class);
|
||||
protected HoodieTableMetaClient metaClient;
|
||||
private static AtomicReference<String> lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE));
|
||||
|
||||
@@ -92,7 +92,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
try {
|
||||
this.setInstants(HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(metaClient.getFs(),
|
||||
new Path(metaClient.getMetaPath()), includedExtensions));
|
||||
log.info("Loaded instants " + getInstants());
|
||||
LOG.info("Loaded instants " + getInstants());
|
||||
} catch (IOException e) {
|
||||
throw new HoodieIOException("Failed to scan metadata", e);
|
||||
}
|
||||
@@ -210,30 +210,30 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
}
|
||||
|
||||
public void createInflight(HoodieInstant instant) {
|
||||
log.info("Creating a new in-flight instant " + instant);
|
||||
LOG.info("Creating a new in-flight instant " + instant);
|
||||
// Create the in-flight file
|
||||
createFileInMetaPath(instant.getFileName(), Option.empty());
|
||||
}
|
||||
|
||||
public void saveAsComplete(HoodieInstant instant, Option<byte[]> data) {
|
||||
log.info("Marking instant complete " + instant);
|
||||
LOG.info("Marking instant complete " + instant);
|
||||
Preconditions.checkArgument(instant.isInflight(),
|
||||
"Could not mark an already completed instant as complete again " + instant);
|
||||
transitionState(instant, HoodieTimeline.getCompletedInstant(instant), data);
|
||||
log.info("Completed " + instant);
|
||||
LOG.info("Completed " + instant);
|
||||
}
|
||||
|
||||
public void revertToInflight(HoodieInstant instant) {
|
||||
log.info("Reverting " + instant + " to inflight ");
|
||||
LOG.info("Reverting " + instant + " to inflight ");
|
||||
revertStateTransition(instant, HoodieTimeline.getInflightInstant(instant));
|
||||
log.info("Reverted " + instant + " to inflight");
|
||||
LOG.info("Reverted " + instant + " to inflight");
|
||||
}
|
||||
|
||||
public HoodieInstant revertToRequested(HoodieInstant instant) {
|
||||
log.warn("Reverting " + instant + " to requested ");
|
||||
LOG.warn("Reverting " + instant + " to requested ");
|
||||
HoodieInstant requestedInstant = HoodieTimeline.getRequestedInstant(instant);
|
||||
revertStateTransition(instant, HoodieTimeline.getRequestedInstant(instant));
|
||||
log.warn("Reverted " + instant + " to requested");
|
||||
LOG.warn("Reverted " + instant + " to requested");
|
||||
return requestedInstant;
|
||||
}
|
||||
|
||||
@@ -249,12 +249,12 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
}
|
||||
|
||||
private void deleteInstantFile(HoodieInstant instant) {
|
||||
log.info("Deleting instant " + instant);
|
||||
LOG.info("Deleting instant " + instant);
|
||||
Path inFlightCommitFilePath = new Path(metaClient.getMetaPath(), instant.getFileName());
|
||||
try {
|
||||
boolean result = metaClient.getFs().delete(inFlightCommitFilePath, false);
|
||||
if (result) {
|
||||
log.info("Removed in-flight " + instant);
|
||||
LOG.info("Removed in-flight " + instant);
|
||||
} else {
|
||||
throw new HoodieIOException("Could not delete in-flight instant " + instant);
|
||||
}
|
||||
@@ -391,7 +391,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
if (!success) {
|
||||
throw new HoodieIOException("Could not rename " + currFilePath + " to " + revertFilePath);
|
||||
}
|
||||
log.info("Renamed " + currFilePath + " to " + revertFilePath);
|
||||
LOG.info("Renamed " + currFilePath + " to " + revertFilePath);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new HoodieIOException("Could not complete revert " + curr, e);
|
||||
@@ -429,7 +429,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
// If the path does not exist, create it first
|
||||
if (!metaClient.getFs().exists(fullPath)) {
|
||||
if (metaClient.getFs().createNewFile(fullPath)) {
|
||||
log.info("Created a new file in meta path: " + fullPath);
|
||||
LOG.info("Created a new file in meta path: " + fullPath);
|
||||
} else {
|
||||
throw new HoodieIOException("Failed to create file " + fullPath);
|
||||
}
|
||||
|
||||
@@ -54,7 +54,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
|
||||
private HoodieTableMetaClient metaClient;
|
||||
private Map<String, byte[]> readCommits = new HashMap<>();
|
||||
|
||||
private static final transient Logger log = LogManager.getLogger(HoodieArchivedTimeline.class);
|
||||
private static final transient Logger LOG = LogManager.getLogger(HoodieArchivedTimeline.class);
|
||||
|
||||
public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
|
||||
// Read back the commits to make sure
|
||||
|
||||
@@ -47,7 +47,7 @@ import static java.util.Collections.reverse;
|
||||
*/
|
||||
public class HoodieDefaultTimeline implements HoodieTimeline {
|
||||
|
||||
private static final transient Logger log = LogManager.getLogger(HoodieDefaultTimeline.class);
|
||||
private static final transient Logger LOG = LogManager.getLogger(HoodieDefaultTimeline.class);
|
||||
|
||||
private static final String HASHING_ALGORITHM = "SHA-256";
|
||||
|
||||
|
||||
@@ -39,7 +39,7 @@ import java.util.stream.Collectors;
|
||||
*/
|
||||
public class FailSafeConsistencyGuard implements ConsistencyGuard {
|
||||
|
||||
private static final transient Logger log = LogManager.getLogger(FailSafeConsistencyGuard.class);
|
||||
private static final transient Logger LOG = LogManager.getLogger(FailSafeConsistencyGuard.class);
|
||||
|
||||
private final FileSystem fs;
|
||||
private final ConsistencyGuardConfig consistencyGuardConfig;
|
||||
@@ -86,7 +86,7 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
|
||||
|
||||
retryTillSuccess((retryNum) -> {
|
||||
try {
|
||||
log.info("Trying " + retryNum);
|
||||
LOG.info("Trying " + retryNum);
|
||||
FileStatus[] entries = fs.listStatus(dir);
|
||||
List<String> gotFiles = Arrays.stream(entries).map(e -> Path.getPathWithoutSchemeAndAuthority(e.getPath()))
|
||||
.map(p -> p.toString()).collect(Collectors.toList());
|
||||
@@ -95,7 +95,7 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
|
||||
|
||||
switch (event) {
|
||||
case DISAPPEAR:
|
||||
log.info("Following files are visible" + candidateFiles);
|
||||
LOG.info("Following files are visible" + candidateFiles);
|
||||
// If no candidate files gets removed, it means all of them have disappeared
|
||||
return !altered;
|
||||
case APPEAR:
|
||||
@@ -104,7 +104,7 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
|
||||
return candidateFiles.isEmpty();
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
log.warn("Got IOException waiting for file event. Have tried " + retryNum + " time(s)", ioe);
|
||||
LOG.warn("Got IOException waiting for file event. Have tried " + retryNum + " time(s)", ioe);
|
||||
}
|
||||
return false;
|
||||
}, "Timed out waiting for files to become visible");
|
||||
@@ -155,7 +155,7 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
|
||||
return;
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
log.warn("Got IOException waiting for file visibility. Retrying", ioe);
|
||||
LOG.warn("Got IOException waiting for file visibility. Retrying", ioe);
|
||||
}
|
||||
|
||||
sleepSafe(waitMs);
|
||||
@@ -176,7 +176,7 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
|
||||
private void retryTillSuccess(Function<Integer, Boolean> predicate, String timedOutMessage) throws TimeoutException {
|
||||
long waitMs = consistencyGuardConfig.getInitialConsistencyCheckIntervalMs();
|
||||
int attempt = 0;
|
||||
log.info("Max Attempts=" + consistencyGuardConfig.getMaxConsistencyChecks());
|
||||
LOG.info("Max Attempts=" + consistencyGuardConfig.getMaxConsistencyChecks());
|
||||
while (attempt < consistencyGuardConfig.getMaxConsistencyChecks()) {
|
||||
boolean success = predicate.apply(attempt);
|
||||
if (success) {
|
||||
|
||||
@@ -55,7 +55,7 @@ import java.util.stream.Stream;
|
||||
*/
|
||||
public class RocksDBDAO {
|
||||
|
||||
protected static final transient Logger log = LogManager.getLogger(RocksDBDAO.class);
|
||||
protected static final transient Logger LOG = LogManager.getLogger(RocksDBDAO.class);
|
||||
|
||||
private transient ConcurrentHashMap<String, ColumnFamilyHandle> managedHandlesMap;
|
||||
private transient ConcurrentHashMap<String, ColumnFamilyDescriptor> managedDescriptorMap;
|
||||
@@ -86,7 +86,7 @@ public class RocksDBDAO {
|
||||
*/
|
||||
private void init() throws HoodieException {
|
||||
try {
|
||||
log.info("DELETING RocksDB persisted at " + rocksDBBasePath);
|
||||
LOG.info("DELETING RocksDB persisted at " + rocksDBBasePath);
|
||||
FileIOUtils.deleteDirectory(new File(rocksDBBasePath));
|
||||
|
||||
managedHandlesMap = new ConcurrentHashMap<>();
|
||||
@@ -99,7 +99,7 @@ public class RocksDBDAO {
|
||||
dbOptions.setLogger(new org.rocksdb.Logger(dbOptions) {
|
||||
@Override
|
||||
protected void log(InfoLogLevel infoLogLevel, String logMsg) {
|
||||
log.info("From Rocks DB : " + logMsg);
|
||||
LOG.info("From Rocks DB : " + logMsg);
|
||||
}
|
||||
});
|
||||
final List<ColumnFamilyDescriptor> managedColumnFamilies = loadManagedColumnFamilies(dbOptions);
|
||||
@@ -121,7 +121,7 @@ public class RocksDBDAO {
|
||||
managedDescriptorMap.put(familyNameFromDescriptor, descriptor);
|
||||
}
|
||||
} catch (RocksDBException | IOException re) {
|
||||
log.error("Got exception opening rocks db instance ", re);
|
||||
LOG.error("Got exception opening rocks db instance ", re);
|
||||
throw new HoodieException(re);
|
||||
}
|
||||
}
|
||||
@@ -135,10 +135,10 @@ public class RocksDBDAO {
|
||||
List<byte[]> existing = RocksDB.listColumnFamilies(options, rocksDBBasePath);
|
||||
|
||||
if (existing.isEmpty()) {
|
||||
log.info("No column family found. Loading default");
|
||||
LOG.info("No column family found. Loading default");
|
||||
managedColumnFamilies.add(getColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
|
||||
} else {
|
||||
log.info("Loading column families :" + existing.stream().map(String::new).collect(Collectors.toList()));
|
||||
LOG.info("Loading column families :" + existing.stream().map(String::new).collect(Collectors.toList()));
|
||||
managedColumnFamilies
|
||||
.addAll(existing.stream().map(RocksDBDAO::getColumnFamilyDescriptor).collect(Collectors.toList()));
|
||||
}
|
||||
@@ -352,7 +352,7 @@ public class RocksDBDAO {
|
||||
}
|
||||
}
|
||||
|
||||
log.info("Prefix Search for (query=" + prefix + ") on " + columnFamilyName + ". Total Time Taken (msec)="
|
||||
LOG.info("Prefix Search for (query=" + prefix + ") on " + columnFamilyName + ". Total Time Taken (msec)="
|
||||
+ timer.endTimer() + ". Serialization Time taken(micro)=" + timeTakenMicro + ", num entries=" + results.size());
|
||||
return results.stream();
|
||||
}
|
||||
@@ -366,7 +366,7 @@ public class RocksDBDAO {
|
||||
*/
|
||||
public <T extends Serializable> void prefixDelete(String columnFamilyName, String prefix) {
|
||||
Preconditions.checkArgument(!closed);
|
||||
log.info("Prefix DELETE (query=" + prefix + ") on " + columnFamilyName);
|
||||
LOG.info("Prefix DELETE (query=" + prefix + ") on " + columnFamilyName);
|
||||
final RocksIterator it = getRocksDB().newIterator(managedHandlesMap.get(columnFamilyName));
|
||||
it.seek(prefix.getBytes());
|
||||
// Find first and last keys to be deleted
|
||||
@@ -389,7 +389,7 @@ public class RocksDBDAO {
|
||||
// Delete the last entry
|
||||
getRocksDB().delete(lastEntry.getBytes());
|
||||
} catch (RocksDBException e) {
|
||||
log.error("Got exception performing range delete");
|
||||
LOG.error("Got exception performing range delete");
|
||||
throw new HoodieException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,7 +40,7 @@ import java.lang.reflect.InvocationTargetException;
|
||||
public class SerializationUtils {
|
||||
|
||||
// Caching kryo serializer to avoid creating kryo instance for every serde operation
|
||||
private static final ThreadLocal<KryoSerializerInstance> serializerRef =
|
||||
private static final ThreadLocal<KryoSerializerInstance> SERIALIZER_REF =
|
||||
ThreadLocal.withInitial(() -> new KryoSerializerInstance());
|
||||
|
||||
// Serialize
|
||||
@@ -56,7 +56,7 @@ public class SerializationUtils {
|
||||
* @throws IOException if the serialization fails
|
||||
*/
|
||||
public static byte[] serialize(final Object obj) throws IOException {
|
||||
return serializerRef.get().serialize(obj);
|
||||
return SERIALIZER_REF.get().serialize(obj);
|
||||
}
|
||||
|
||||
// Deserialize
|
||||
@@ -83,7 +83,7 @@ public class SerializationUtils {
|
||||
if (objectData == null) {
|
||||
throw new IllegalArgumentException("The byte[] must not be null");
|
||||
}
|
||||
return (T) serializerRef.get().deserialize(objectData);
|
||||
return (T) SERIALIZER_REF.get().deserialize(objectData);
|
||||
}
|
||||
|
||||
private static class KryoSerializerInstance implements Serializable {
|
||||
|
||||
@@ -58,12 +58,12 @@ public class SpillableMapUtils {
|
||||
byte[] key = new byte[keySize];
|
||||
file.read(key, 0, keySize);
|
||||
byte[] value = new byte[valueSize];
|
||||
if (!(valueSize == valueLength)) {
|
||||
if (valueSize != valueLength) {
|
||||
throw new HoodieCorruptedDataException("unequal size of payload written to external file, data may be corrupted");
|
||||
}
|
||||
file.read(value, 0, valueSize);
|
||||
long crcOfReadValue = generateChecksum(value);
|
||||
if (!(crc == crcOfReadValue)) {
|
||||
if (crc != crcOfReadValue) {
|
||||
throw new HoodieCorruptedDataException(
|
||||
"checksum of payload written to external disk does not match, " + "data may be corrupted");
|
||||
}
|
||||
|
||||
@@ -54,7 +54,7 @@ import java.util.stream.Stream;
|
||||
*/
|
||||
public final class DiskBasedMap<T extends Serializable, R extends Serializable> implements Map<T, R>, Iterable<R> {
|
||||
|
||||
private static final Logger log = LogManager.getLogger(DiskBasedMap.class);
|
||||
private static final Logger LOG = LogManager.getLogger(DiskBasedMap.class);
|
||||
// Stores the key and corresponding value's latest metadata spilled to disk
|
||||
private final Map<T, ValueMetadata> valueMetadataMap;
|
||||
// Write only file
|
||||
@@ -111,7 +111,7 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
|
||||
writeOnlyFile.getParentFile().mkdir();
|
||||
}
|
||||
writeOnlyFile.createNewFile();
|
||||
log.info("Spilling to file location " + writeOnlyFile.getAbsolutePath() + " in host ("
|
||||
LOG.info("Spilling to file location " + writeOnlyFile.getAbsolutePath() + " in host ("
|
||||
+ InetAddress.getLocalHost().getHostAddress() + ") with hostname (" + InetAddress.getLocalHost().getHostName()
|
||||
+ ")");
|
||||
// Make sure file is deleted when JVM exits
|
||||
|
||||
@@ -56,7 +56,7 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
|
||||
|
||||
// Find the actual estimated payload size after inserting N records
|
||||
private static final int NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE = 100;
|
||||
private static final Logger log = LogManager.getLogger(ExternalSpillableMap.class);
|
||||
private static final Logger LOG = LogManager.getLogger(ExternalSpillableMap.class);
|
||||
// maximum space allowed in-memory for this map
|
||||
private final long maxInMemorySizeInBytes;
|
||||
// Map to store key-values in memory until it hits maxInMemorySizeInBytes
|
||||
@@ -177,7 +177,7 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
|
||||
// At first, use the sizeEstimate of a record being inserted into the spillable map.
|
||||
// Note, the converter may over estimate the size of a record in the JVM
|
||||
this.estimatedPayloadSize = keySizeEstimator.sizeEstimate(key) + valueSizeEstimator.sizeEstimate(value);
|
||||
log.info("Estimated Payload size => " + estimatedPayloadSize);
|
||||
LOG.info("Estimated Payload size => " + estimatedPayloadSize);
|
||||
} else if (shouldEstimatePayloadSize && inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0) {
|
||||
// Re-estimate the size of a record by calculating the size of the entire map containing
|
||||
// N entries and then dividing by the number of entries present (N). This helps to get a
|
||||
@@ -186,7 +186,7 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
|
||||
this.currentInMemoryMapSize = totalMapSize;
|
||||
this.estimatedPayloadSize = totalMapSize / inMemoryMap.size();
|
||||
shouldEstimatePayloadSize = false;
|
||||
log.info("New Estimated Payload size => " + this.estimatedPayloadSize);
|
||||
LOG.info("New Estimated Payload size => " + this.estimatedPayloadSize);
|
||||
}
|
||||
if (!inMemoryMap.containsKey(key)) {
|
||||
// TODO : Add support for adjusting payloadSize for updates to the same key
|
||||
|
||||
@@ -30,7 +30,7 @@ import java.util.function.Function;
|
||||
*/
|
||||
public class FunctionBasedQueueProducer<I> implements BoundedInMemoryQueueProducer<I> {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(FunctionBasedQueueProducer.class);
|
||||
private static final Logger LOG = LogManager.getLogger(FunctionBasedQueueProducer.class);
|
||||
|
||||
private final Function<BoundedInMemoryQueue<I, ?>, Boolean> producerFunction;
|
||||
|
||||
@@ -40,8 +40,8 @@ public class FunctionBasedQueueProducer<I> implements BoundedInMemoryQueueProduc
|
||||
|
||||
@Override
|
||||
public void produce(BoundedInMemoryQueue<I, ?> queue) {
|
||||
logger.info("starting function which will enqueue records");
|
||||
LOG.info("starting function which will enqueue records");
|
||||
producerFunction.apply(queue);
|
||||
logger.info("finished function which will enqueue records");
|
||||
LOG.info("finished function which will enqueue records");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,7 +30,7 @@ import java.util.Iterator;
|
||||
*/
|
||||
public class IteratorBasedQueueProducer<I> implements BoundedInMemoryQueueProducer<I> {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(IteratorBasedQueueProducer.class);
|
||||
private static final Logger LOG = LogManager.getLogger(IteratorBasedQueueProducer.class);
|
||||
|
||||
// input iterator for producing items in the buffer.
|
||||
private final Iterator<I> inputIterator;
|
||||
@@ -41,10 +41,10 @@ public class IteratorBasedQueueProducer<I> implements BoundedInMemoryQueueProduc
|
||||
|
||||
@Override
|
||||
public void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception {
|
||||
logger.info("starting to buffer records");
|
||||
LOG.info("starting to buffer records");
|
||||
while (inputIterator.hasNext()) {
|
||||
queue.insertRecord(inputIterator.next());
|
||||
}
|
||||
logger.info("finished buffering records");
|
||||
LOG.info("finished buffering records");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,7 +38,7 @@ import java.io.IOException;
|
||||
*/
|
||||
public class HdfsTestService {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(HdfsTestService.class);
|
||||
private static final Logger LOG = LogManager.getLogger(HdfsTestService.class);
|
||||
|
||||
/**
|
||||
* Configuration settings.
|
||||
@@ -72,7 +72,7 @@ public class HdfsTestService {
|
||||
// If clean, then remove the work dir so we can start fresh.
|
||||
String localDFSLocation = getDFSLocation(workDir);
|
||||
if (format) {
|
||||
logger.info("Cleaning HDFS cluster data at: " + localDFSLocation + " and starting fresh.");
|
||||
LOG.info("Cleaning HDFS cluster data at: " + localDFSLocation + " and starting fresh.");
|
||||
File file = new File(localDFSLocation);
|
||||
FileIOUtils.deleteDirectory(file);
|
||||
}
|
||||
@@ -83,12 +83,12 @@ public class HdfsTestService {
|
||||
datanodePort, datanodeIpcPort, datanodeHttpPort);
|
||||
miniDfsCluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(1).format(format).checkDataNodeAddrConfig(true)
|
||||
.checkDataNodeHostConfig(true).build();
|
||||
logger.info("HDFS Minicluster service started.");
|
||||
LOG.info("HDFS Minicluster service started.");
|
||||
return miniDfsCluster;
|
||||
}
|
||||
|
||||
public void stop() throws IOException {
|
||||
logger.info("HDFS Minicluster service being shut down.");
|
||||
LOG.info("HDFS Minicluster service being shut down.");
|
||||
miniDfsCluster.shutdown();
|
||||
miniDfsCluster = null;
|
||||
hadoopConf = null;
|
||||
@@ -132,7 +132,7 @@ public class HdfsTestService {
|
||||
private static Configuration configureDFSCluster(Configuration config, String localDFSLocation, String bindIP,
|
||||
int namenodeRpcPort, int namenodeHttpPort, int datanodePort, int datanodeIpcPort, int datanodeHttpPort) {
|
||||
|
||||
logger.info("HDFS force binding to ip: " + bindIP);
|
||||
LOG.info("HDFS force binding to ip: " + bindIP);
|
||||
config.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + bindIP + ":" + namenodeRpcPort);
|
||||
config.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, bindIP + ":" + datanodePort);
|
||||
config.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, bindIP + ":" + datanodeIpcPort);
|
||||
|
||||
@@ -53,7 +53,7 @@ import java.net.Socket;
|
||||
*/
|
||||
public class ZookeeperTestService {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(ZookeeperTestService.class);
|
||||
private static final Logger LOG = LogManager.getLogger(ZookeeperTestService.class);
|
||||
|
||||
private static final int TICK_TIME = 2000;
|
||||
private static final int CONNECTION_TIMEOUT = 30000;
|
||||
@@ -103,7 +103,7 @@ public class ZookeeperTestService {
|
||||
|
||||
// NOTE: Changed from the original, where InetSocketAddress was
|
||||
// originally created to bind to the wildcard IP, we now configure it.
|
||||
logger.info("Zookeeper force binding to: " + this.bindIP);
|
||||
LOG.info("Zookeeper force binding to: " + this.bindIP);
|
||||
standaloneServerFactory.configure(new InetSocketAddress(bindIP, clientPort), 1000);
|
||||
|
||||
// Start up this ZK server
|
||||
@@ -120,7 +120,7 @@ public class ZookeeperTestService {
|
||||
}
|
||||
|
||||
started = true;
|
||||
logger.info("Zookeeper Minicluster service started on client port: " + clientPort);
|
||||
LOG.info("Zookeeper Minicluster service started on client port: " + clientPort);
|
||||
return zooKeeperServer;
|
||||
}
|
||||
|
||||
@@ -139,7 +139,7 @@ public class ZookeeperTestService {
|
||||
standaloneServerFactory = null;
|
||||
zooKeeperServer = null;
|
||||
|
||||
logger.info("Zookeeper Minicluster service shut down.");
|
||||
LOG.info("Zookeeper Minicluster service shut down.");
|
||||
}
|
||||
|
||||
private void recreateDir(File dir, boolean clean) throws IOException {
|
||||
@@ -221,7 +221,7 @@ public class ZookeeperTestService {
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// ignore as this is expected
|
||||
logger.info("server " + hostname + ":" + port + " not up " + e);
|
||||
LOG.info("server " + hostname + ":" + port + " not up " + e);
|
||||
}
|
||||
|
||||
if (System.currentTimeMillis() > start + timeout) {
|
||||
|
||||
@@ -496,7 +496,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
||||
// Write out a length that does not confirm with the content
|
||||
outputStream.writeLong(1000);
|
||||
outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
|
||||
outputStream.writeInt(HoodieLogFormat.currentVersion);
|
||||
outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION);
|
||||
// Write out a length that does not confirm with the content
|
||||
outputStream.writeLong(500);
|
||||
// Write out some bytes
|
||||
@@ -524,7 +524,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
||||
// Write out a length that does not confirm with the content
|
||||
outputStream.writeLong(1000);
|
||||
outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
|
||||
outputStream.writeInt(HoodieLogFormat.currentVersion);
|
||||
outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION);
|
||||
// Write out a length that does not confirm with the content
|
||||
outputStream.writeLong(500);
|
||||
// Write out some bytes
|
||||
@@ -694,7 +694,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
||||
// Write out a length that does not confirm with the content
|
||||
outputStream.writeLong(1000);
|
||||
|
||||
outputStream.writeInt(HoodieLogFormat.currentVersion);
|
||||
outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION);
|
||||
outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
|
||||
|
||||
// Write out some header
|
||||
@@ -1066,7 +1066,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
||||
outputStream.write(HoodieLogFormat.MAGIC);
|
||||
outputStream.writeLong(1000);
|
||||
outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
|
||||
outputStream.writeInt(HoodieLogFormat.currentVersion);
|
||||
outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION);
|
||||
// Write out a length that does not confirm with the content
|
||||
outputStream.writeLong(100);
|
||||
outputStream.flush();
|
||||
@@ -1079,7 +1079,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
||||
outputStream.write(HoodieLogFormat.MAGIC);
|
||||
outputStream.writeLong(1000);
|
||||
outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
|
||||
outputStream.writeInt(HoodieLogFormat.currentVersion);
|
||||
outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION);
|
||||
// Write out a length that does not confirm with the content
|
||||
outputStream.writeLong(100);
|
||||
outputStream.flush();
|
||||
@@ -1099,7 +1099,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
||||
outputStream.write(HoodieLogFormat.MAGIC);
|
||||
outputStream.writeLong(1000);
|
||||
outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
|
||||
outputStream.writeInt(HoodieLogFormat.currentVersion);
|
||||
outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION);
|
||||
// Write out a length that does not confirm with the content
|
||||
outputStream.writeLong(100);
|
||||
outputStream.flush();
|
||||
|
||||
@@ -71,7 +71,7 @@ import static org.junit.Assert.assertTrue;
|
||||
@SuppressWarnings("ResultOfMethodCallIgnored")
|
||||
public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
|
||||
|
||||
private static final transient Logger log = LogManager.getLogger(TestHoodieTableFileSystemView.class);
|
||||
private static final transient Logger LOG = LogManager.getLogger(TestHoodieTableFileSystemView.class);
|
||||
|
||||
private static String TEST_WRITE_TOKEN = "1-0-1";
|
||||
|
||||
@@ -498,7 +498,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
|
||||
roView.getAllDataFiles(partitionPath);
|
||||
|
||||
fileSliceList = rtView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
|
||||
log.info("FILESLICE LIST=" + fileSliceList);
|
||||
LOG.info("FILESLICE LIST=" + fileSliceList);
|
||||
dataFiles = fileSliceList.stream().map(FileSlice::getDataFile).filter(Option::isPresent).map(Option::get)
|
||||
.collect(Collectors.toList());
|
||||
assertEquals("Expect only one data-files in latest view as there is only one file-group", 1, dataFiles.size());
|
||||
|
||||
@@ -77,7 +77,7 @@ import static org.apache.hudi.common.table.HoodieTimeline.COMPACTION_ACTION;
|
||||
*/
|
||||
public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
|
||||
private static final transient Logger log = LogManager.getLogger(TestIncrementalFSViewSync.class);
|
||||
private static final transient Logger LOG = LogManager.getLogger(TestIncrementalFSViewSync.class);
|
||||
|
||||
private static String TEST_WRITE_TOKEN = "1-0-1";
|
||||
|
||||
@@ -318,7 +318,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
Assert.assertEquals(newCleanerInstants.size(), cleanedInstants.size());
|
||||
long initialFileSlices = partitions.stream().mapToLong(p -> view.getAllFileSlices(p).count()).findAny().getAsLong();
|
||||
long exp = initialFileSlices;
|
||||
log.info("Initial File Slices :" + exp);
|
||||
LOG.info("Initial File Slices :" + exp);
|
||||
for (int idx = 0; idx < newCleanerInstants.size(); idx++) {
|
||||
String instant = cleanedInstants.get(idx);
|
||||
try {
|
||||
@@ -335,8 +335,8 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
Assert.assertEquals(State.COMPLETED, view.getLastInstant().get().getState());
|
||||
Assert.assertEquals(HoodieTimeline.CLEAN_ACTION, view.getLastInstant().get().getAction());
|
||||
partitions.forEach(p -> {
|
||||
log.info("PARTTITION : " + p);
|
||||
log.info("\tFileSlices :" + view.getAllFileSlices(p).collect(Collectors.toList()));
|
||||
LOG.info("PARTTITION : " + p);
|
||||
LOG.info("\tFileSlices :" + view.getAllFileSlices(p).collect(Collectors.toList()));
|
||||
});
|
||||
|
||||
partitions.forEach(p -> Assert.assertEquals(fileIdsPerPartition.size(), view.getLatestFileSlices(p).count()));
|
||||
@@ -377,7 +377,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
isDeltaCommit ? initialFileSlices : initialFileSlices - ((idx + 1) * fileIdsPerPartition.size());
|
||||
view.sync();
|
||||
Assert.assertTrue(view.getLastInstant().isPresent());
|
||||
log.info("Last Instant is :" + view.getLastInstant().get());
|
||||
LOG.info("Last Instant is :" + view.getLastInstant().get());
|
||||
if (isRestore) {
|
||||
Assert.assertEquals(newRestoreInstants.get(idx), view.getLastInstant().get().getTimestamp());
|
||||
Assert.assertEquals(isRestore ? HoodieTimeline.RESTORE_ACTION : HoodieTimeline.ROLLBACK_ACTION,
|
||||
@@ -615,7 +615,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
int multiple = begin;
|
||||
for (int idx = 0; idx < instants.size(); idx++) {
|
||||
String instant = instants.get(idx);
|
||||
log.info("Adding instant=" + instant);
|
||||
LOG.info("Adding instant=" + instant);
|
||||
HoodieInstant lastInstant = lastInstants.get(idx);
|
||||
// Add a non-empty ingestion to COW table
|
||||
List<String> filePaths =
|
||||
|
||||
@@ -59,9 +59,9 @@ public class TestCompactionUtils extends HoodieCommonTestHarness {
|
||||
|
||||
private static String TEST_WRITE_TOKEN = "1-0-1";
|
||||
|
||||
private static final Map<String, Double> metrics =
|
||||
private static final Map<String, Double> METRICS =
|
||||
new ImmutableMap.Builder<String, Double>().put("key1", 1.0).put("key2", 3.0).build();
|
||||
private Function<Pair<String, FileSlice>, Map<String, Double>> metricsCaptureFn = (partitionFileSlice) -> metrics;
|
||||
private Function<Pair<String, FileSlice>, Map<String, Double>> metricsCaptureFn = (partitionFileSlice) -> METRICS;
|
||||
|
||||
@Before
|
||||
public void init() throws IOException {
|
||||
@@ -252,7 +252,7 @@ public class TestCompactionUtils extends HoodieCommonTestHarness {
|
||||
version == COMPACTION_METADATA_VERSION_1 ? paths.get(idx) : new Path(paths.get(idx)).getName(),
|
||||
op.getDeltaFilePaths().get(idx));
|
||||
});
|
||||
Assert.assertEquals("Metrics set", metrics, op.getMetrics());
|
||||
Assert.assertEquals("Metrics set", METRICS, op.getMetrics());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user