1
0

[checkstyle] Unify LOG form (#1092)

This commit is contained in:
lamber-ken
2019-12-10 19:23:38 +08:00
committed by leesf
parent 70a1040998
commit d447e2d751
100 changed files with 514 additions and 493 deletions

View File

@@ -66,7 +66,7 @@ import scala.Tuple2;
*/
public class HDFSParquetImporter implements Serializable {
private static volatile Logger log = LogManager.getLogger(HDFSParquetImporter.class);
private static final Logger LOG = LogManager.getLogger(HDFSParquetImporter.class);
private static final DateTimeFormatter PARTITION_FORMATTER = DateTimeFormatter.ofPattern("yyyy/MM/dd")
.withZone(ZoneId.systemDefault());
@@ -103,7 +103,7 @@ public class HDFSParquetImporter implements Serializable {
this.fs = FSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration());
this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
: UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
log.info("Starting data import with configs : " + props.toString());
LOG.info("Starting data import with configs : " + props.toString());
int ret = -1;
try {
// Verify that targetPath is not present.
@@ -114,7 +114,7 @@ public class HDFSParquetImporter implements Serializable {
ret = dataImport(jsc);
} while (ret != 0 && retry-- > 0);
} catch (Throwable t) {
log.error(t);
LOG.error(t);
}
return ret;
}
@@ -145,7 +145,7 @@ public class HDFSParquetImporter implements Serializable {
JavaRDD<WriteStatus> writeResponse = load(client, instantTime, hoodieRecords);
return UtilHelpers.handleErrors(jsc, instantTime, writeResponse);
} catch (Throwable t) {
log.error("Error occurred.", t);
LOG.error("Error occurred.", t);
}
return -1;
}
@@ -175,13 +175,13 @@ public class HDFSParquetImporter implements Serializable {
throw new HoodieIOException("row field is missing. :" + cfg.rowKey);
}
String partitionPath = partitionField.toString();
log.debug("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")");
LOG.debug("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")");
if (partitionField instanceof Number) {
try {
long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L);
partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
} catch (NumberFormatException nfe) {
log.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")");
LOG.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")");
}
}
return new HoodieRecord<>(new HoodieKey(rowField.toString(), partitionPath),

View File

@@ -61,7 +61,7 @@ import java.util.stream.Collectors;
*/
public class HiveIncrementalPuller {
private static Logger log = LogManager.getLogger(HiveIncrementalPuller.class);
private static final Logger LOG = LogManager.getLogger(HiveIncrementalPuller.class);
private static String driverName = "org.apache.hive.jdbc.HiveDriver";
public static class Config implements Serializable {
@@ -129,14 +129,14 @@ public class HiveIncrementalPuller {
try {
if (config.fromCommitTime == null) {
config.fromCommitTime = inferCommitTime(fs);
log.info("FromCommitTime inferred as " + config.fromCommitTime);
LOG.info("FromCommitTime inferred as " + config.fromCommitTime);
}
log.info("FromCommitTime - " + config.fromCommitTime);
LOG.info("FromCommitTime - " + config.fromCommitTime);
String sourceTableLocation = getTableLocation(config.sourceDb, config.sourceTable);
String lastCommitTime = getLastCommitTimePulled(fs, sourceTableLocation);
if (lastCommitTime == null) {
log.info("Nothing to pull. However we will continue to create a empty table");
LOG.info("Nothing to pull. However we will continue to create a empty table");
lastCommitTime = config.fromCommitTime;
}
@@ -155,9 +155,9 @@ public class HiveIncrementalPuller {
initHiveBeelineProperties(stmt);
executeIncrementalSQL(tempDbTable, tempDbTablePath, stmt);
log.info("Finished HoodieReader execution");
LOG.info("Finished HoodieReader execution");
} catch (SQLException e) {
log.error("Exception when executing SQL", e);
LOG.error("Exception when executing SQL", e);
throw new IOException("Could not scan " + config.sourceTable + " incrementally", e);
} finally {
try {
@@ -165,7 +165,7 @@ public class HiveIncrementalPuller {
stmt.close();
}
} catch (SQLException e) {
log.error("Could not close the resultset opened ", e);
LOG.error("Could not close the resultset opened ", e);
}
}
}
@@ -180,13 +180,13 @@ public class HiveIncrementalPuller {
incrementalPullSQLtemplate.add("storedAsClause", storedAsClause);
String incrementalSQL = new Scanner(new File(config.incrementalSQLFile)).useDelimiter("\\Z").next();
if (!incrementalSQL.contains(config.sourceDb + "." + config.sourceTable)) {
log.info("Incremental SQL does not have " + config.sourceDb + "." + config.sourceTable
LOG.info("Incremental SQL does not have " + config.sourceDb + "." + config.sourceTable
+ ", which means its pulling from a different table. Fencing this from " + "happening.");
throw new HoodieIncrementalPullSQLException(
"Incremental SQL does not have " + config.sourceDb + "." + config.sourceTable);
}
if (!incrementalSQL.contains("`_hoodie_commit_time` > '%targetBasePath'")) {
log.info("Incremental SQL : " + incrementalSQL
LOG.info("Incremental SQL : " + incrementalSQL
+ " does not contain `_hoodie_commit_time` > '%targetBasePath'. Please add "
+ "this clause for incremental to work properly.");
throw new HoodieIncrementalPullSQLException(
@@ -205,7 +205,7 @@ public class HiveIncrementalPuller {
}
private void initHiveBeelineProperties(Statement stmt) throws SQLException {
log.info("Setting up Hive JDBC Session with properties");
LOG.info("Setting up Hive JDBC Session with properties");
// set the queue
executeStatement("set mapred.job.queue.name=" + config.yarnQueueName, stmt);
// Set the inputformat to HoodieCombineHiveInputFormat
@@ -224,17 +224,17 @@ public class HiveIncrementalPuller {
}
private boolean deleteHDFSPath(FileSystem fs, String path) throws IOException {
log.info("Deleting path " + path);
LOG.info("Deleting path " + path);
return fs.delete(new Path(path), true);
}
private void executeStatement(String sql, Statement stmt) throws SQLException {
log.info("Executing: " + sql);
LOG.info("Executing: " + sql);
stmt.execute(sql);
}
private String inferCommitTime(FileSystem fs) throws SQLException, IOException {
log.info("FromCommitTime not specified. Trying to infer it from Hoodie dataset " + config.targetDb + "."
LOG.info("FromCommitTime not specified. Trying to infer it from Hoodie dataset " + config.targetDb + "."
+ config.targetTable);
String targetDataLocation = getTableLocation(config.targetDb, config.targetTable);
return scanForCommitTime(fs, targetDataLocation);
@@ -249,7 +249,7 @@ public class HiveIncrementalPuller {
resultSet = stmt.executeQuery("describe formatted `" + db + "." + table + "`");
while (resultSet.next()) {
if (resultSet.getString(1).trim().equals("Location:")) {
log.info("Inferred table location for " + db + "." + table + " as " + resultSet.getString(2));
LOG.info("Inferred table location for " + db + "." + table + " as " + resultSet.getString(2));
return resultSet.getString(2);
}
}
@@ -264,7 +264,7 @@ public class HiveIncrementalPuller {
resultSet.close();
}
} catch (SQLException e) {
log.error("Could not close the resultset opened ", e);
LOG.error("Could not close the resultset opened ", e);
}
}
return null;
@@ -290,7 +290,7 @@ public class HiveIncrementalPuller {
private boolean ensureTempPathExists(FileSystem fs, String lastCommitTime) throws IOException {
Path targetBaseDirPath = new Path(config.hoodieTmpDir, config.targetTable + "__" + config.sourceTable);
if (!fs.exists(targetBaseDirPath)) {
log.info("Creating " + targetBaseDirPath + " with permission drwxrwxrwx");
LOG.info("Creating " + targetBaseDirPath + " with permission drwxrwxrwx");
boolean result =
FileSystem.mkdirs(fs, targetBaseDirPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
if (!result) {
@@ -305,7 +305,7 @@ public class HiveIncrementalPuller {
throw new HoodieException("Could not delete existing " + targetPath);
}
}
log.info("Creating " + targetPath + " with permission drwxrwxrwx");
LOG.info("Creating " + targetPath + " with permission drwxrwxrwx");
return FileSystem.mkdirs(fs, targetBaseDirPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
}
@@ -315,20 +315,20 @@ public class HiveIncrementalPuller {
.findInstantsAfter(config.fromCommitTime, config.maxCommits).getInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
if (commitsToSync.isEmpty()) {
log.warn(
LOG.warn(
"Nothing to sync. All commits in "
+ config.sourceTable + " are " + metadata.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants().getInstants().collect(Collectors.toList())
+ " and from commit time is " + config.fromCommitTime);
return null;
}
log.info("Syncing commits " + commitsToSync);
LOG.info("Syncing commits " + commitsToSync);
return commitsToSync.get(commitsToSync.size() - 1);
}
private Connection getConnection() throws SQLException {
if (connection == null) {
log.info("Getting Hive Connection to " + config.hiveJDBCUrl);
LOG.info("Getting Hive Connection to " + config.hiveJDBCUrl);
this.connection = DriverManager.getConnection(config.hiveJDBCUrl, config.hiveUsername, config.hivePassword);
}

View File

@@ -38,7 +38,7 @@ import java.util.List;
public class HoodieCleaner {
private static volatile Logger log = LogManager.getLogger(HoodieCleaner.class);
private static final Logger LOG = LogManager.getLogger(HoodieCleaner.class);
/**
* Config for Cleaner.
@@ -66,7 +66,7 @@ public class HoodieCleaner {
this.fs = FSUtils.getFs(cfg.basePath, jssc.hadoopConfiguration());
this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
: UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
log.info("Creating Cleaner with configs : " + props.toString());
LOG.info("Creating Cleaner with configs : " + props.toString());
}
public void run() throws Exception {

View File

@@ -39,7 +39,7 @@ import java.util.List;
public class HoodieCompactor {
private static volatile Logger logger = LogManager.getLogger(HoodieCompactor.class);
private static final Logger LOG = LogManager.getLogger(HoodieCompactor.class);
private final Config cfg;
private transient FileSystem fs;
private TypedProperties props;
@@ -110,7 +110,7 @@ public class HoodieCompactor {
}
} while (ret != 0 && retry-- > 0);
} catch (Throwable t) {
logger.error(t);
LOG.error(t);
}
return ret;
}

View File

@@ -54,7 +54,7 @@ import scala.Tuple2;
*/
public class HoodieSnapshotCopier implements Serializable {
private static Logger logger = LogManager.getLogger(HoodieSnapshotCopier.class);
private static final Logger LOG = LogManager.getLogger(HoodieSnapshotCopier.class);
static class Config implements Serializable {
@@ -79,21 +79,21 @@ public class HoodieSnapshotCopier implements Serializable {
Option<HoodieInstant> latestCommit =
tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
if (!latestCommit.isPresent()) {
logger.warn("No commits present. Nothing to snapshot");
LOG.warn("No commits present. Nothing to snapshot");
return;
}
final String latestCommitTimestamp = latestCommit.get().getTimestamp();
logger.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.",
LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.",
latestCommitTimestamp));
List<String> partitions = FSUtils.getAllPartitionPaths(fs, baseDir, shouldAssumeDatePartitioning);
if (partitions.size() > 0) {
logger.info(String.format("The job needs to copy %d partitions.", partitions.size()));
LOG.info(String.format("The job needs to copy %d partitions.", partitions.size()));
// Make sure the output directory is empty
Path outputPath = new Path(outputDir);
if (fs.exists(outputPath)) {
logger.warn(String.format("The output path %s targetBasePath already exists, deleting", outputPath));
LOG.warn(String.format("The output path %s targetBasePath already exists, deleting", outputPath));
fs.delete(new Path(outputDir), true);
}
@@ -126,7 +126,7 @@ public class HoodieSnapshotCopier implements Serializable {
});
// Also copy the .commit files
logger.info(String.format("Copying .commit files which are no-late-than %s.", latestCommitTimestamp));
LOG.info(String.format("Copying .commit files which are no-late-than %s.", latestCommitTimestamp));
FileStatus[] commitFilesToCopy =
fs.listStatus(new Path(baseDir + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
if (commitFilePath.getName().equals(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
@@ -144,19 +144,19 @@ public class HoodieSnapshotCopier implements Serializable {
fs.mkdirs(targetFilePath.getParent());
}
if (fs.exists(targetFilePath)) {
logger.error(
LOG.error(
String.format("The target output commit file (%s targetBasePath) already exists.", targetFilePath));
}
FileUtil.copy(fs, commitStatus.getPath(), fs, targetFilePath, false, fs.getConf());
}
} else {
logger.info("The job has 0 partition to copy.");
LOG.info("The job has 0 partition to copy.");
}
// Create the _SUCCESS tag
Path successTagPath = new Path(outputDir + "/_SUCCESS");
if (!fs.exists(successTagPath)) {
logger.info(String.format("Creating _SUCCESS under targetBasePath: $s", outputDir));
LOG.info(String.format("Creating _SUCCESS under targetBasePath: $s", outputDir));
fs.createNewFile(successTagPath);
}
}
@@ -165,14 +165,14 @@ public class HoodieSnapshotCopier implements Serializable {
// Take input configs
final Config cfg = new Config();
new JCommander(cfg, args);
logger.info(String.format("Snapshot hoodie table from %s targetBasePath to %stargetBasePath", cfg.basePath,
LOG.info(String.format("Snapshot hoodie table from %s targetBasePath to %stargetBasePath", cfg.basePath,
cfg.outputPath));
// Create a spark job to do the snapshot copy
SparkConf sparkConf = new SparkConf().setAppName("Hoodie-snapshot-copier");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
logger.info("Initializing spark job.");
LOG.info("Initializing spark job.");
// Copy
HoodieSnapshotCopier copier = new HoodieSnapshotCopier();

View File

@@ -58,7 +58,7 @@ import java.util.Map;
* Bunch of helper methods.
*/
public class UtilHelpers {
private static Logger logger = LogManager.getLogger(UtilHelpers.class);
private static final Logger LOG = LogManager.getLogger(UtilHelpers.class);
public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
SparkSession sparkSession, SchemaProvider schemaProvider) throws IOException {
@@ -95,7 +95,7 @@ public class UtilHelpers {
try {
DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(cfgPath.getFileSystem(fs.getConf()), cfgPath);
if (!overriddenProps.isEmpty()) {
logger.info("Adding overridden properties to file properties.");
LOG.info("Adding overridden properties to file properties.");
conf.addProperties(new BufferedReader(new StringReader(String.join("\n", overriddenProps))));
}
return conf;
@@ -206,14 +206,14 @@ public class UtilHelpers {
writeResponse.foreach(writeStatus -> {
if (writeStatus.hasErrors()) {
errors.add(1);
logger.error(String.format("Error processing records :writeStatus:%s", writeStatus.getStat().toString()));
LOG.error(String.format("Error processing records :writeStatus:%s", writeStatus.getStat().toString()));
}
});
if (errors.value() == 0) {
logger.info(String.format("Dataset imported into hoodie dataset with %s instant time.", instantTime));
LOG.info(String.format("Dataset imported into hoodie dataset with %s instant time.", instantTime));
return 0;
}
logger.error(String.format("Import failed with %d errors.", errors.value()));
LOG.error(String.format("Import failed with %d errors.", errors.value()));
return -1;
}

View File

@@ -46,7 +46,7 @@ import java.util.Properties;
*/
public class UpgradePayloadFromUberToApache implements Serializable {
private static Logger logger = LogManager.getLogger(UpgradePayloadFromUberToApache.class);
private static final Logger LOG = LogManager.getLogger(UpgradePayloadFromUberToApache.class);
private final Config cfg;
@@ -59,13 +59,13 @@ public class UpgradePayloadFromUberToApache implements Serializable {
try (BufferedReader reader = new BufferedReader(new FileReader(cfg.inputPath))) {
basePath = reader.readLine();
} catch (IOException e) {
logger.error("Read from path: " + cfg.inputPath + " error.", e);
LOG.error("Read from path: " + cfg.inputPath + " error.", e);
}
while (basePath != null) {
basePath = basePath.trim();
if (!basePath.startsWith("#")) {
logger.info("Performing upgrade for " + basePath);
LOG.info("Performing upgrade for " + basePath);
String metaPath = String.format("%s/.hoodie", basePath);
HoodieTableMetaClient metaClient =
new HoodieTableMetaClient(FSUtils.prepareHadoopConf(new Configuration()), basePath, false);
@@ -74,20 +74,20 @@ public class UpgradePayloadFromUberToApache implements Serializable {
Map<String, String> propsMap = tableConfig.getProps();
if (propsMap.containsKey(HoodieCompactionConfig.PAYLOAD_CLASS_PROP)) {
String payloadClass = propsMap.get(HoodieCompactionConfig.PAYLOAD_CLASS_PROP);
logger.info("Found payload class=" + payloadClass);
LOG.info("Found payload class=" + payloadClass);
if (payloadClass.startsWith("com.uber.hoodie")) {
String newPayloadClass = payloadClass.replace("com.uber.hoodie", "org.apache.hudi");
logger.info("Replacing payload class (" + payloadClass + ") with (" + newPayloadClass + ")");
LOG.info("Replacing payload class (" + payloadClass + ") with (" + newPayloadClass + ")");
Map<String, String> newPropsMap = new HashMap<>(propsMap);
newPropsMap.put(HoodieCompactionConfig.PAYLOAD_CLASS_PROP, newPayloadClass);
Properties props = new Properties();
props.putAll(newPropsMap);
HoodieTableConfig.createHoodieProperties(metaClient.getFs(), new Path(metaPath), props);
logger.info("Finished upgrade for " + basePath);
LOG.info("Finished upgrade for " + basePath);
}
}
} else {
logger.info("Skipping as this table is COW table. BasePath=" + basePath);
LOG.info("Skipping as this table is COW table. BasePath=" + basePath);
}
}

View File

@@ -36,7 +36,7 @@ import java.util.function.Function;
*/
public abstract class AbstractDeltaStreamerService implements Serializable {
protected static volatile Logger log = LogManager.getLogger(AbstractDeltaStreamerService.class);
private static final Logger LOG = LogManager.getLogger(AbstractDeltaStreamerService.class);
// Flag to track if the service is started.
private boolean started;
@@ -71,7 +71,7 @@ public abstract class AbstractDeltaStreamerService implements Serializable {
try {
future.get();
} catch (ExecutionException ex) {
log.error("Service shutdown with error", ex);
LOG.error("Service shutdown with error", ex);
throw ex;
}
}
@@ -94,7 +94,7 @@ public abstract class AbstractDeltaStreamerService implements Serializable {
// Wait for some max time after requesting shutdown
executor.awaitTermination(24, TimeUnit.HOURS);
} catch (InterruptedException ie) {
log.error("Interrupted while waiting for shutdown", ie);
LOG.error("Interrupted while waiting for shutdown", ie);
}
}
}
@@ -128,18 +128,18 @@ public abstract class AbstractDeltaStreamerService implements Serializable {
* @param onShutdownCallback
*/
private void monitorThreads(Function<Boolean, Boolean> onShutdownCallback) {
log.info("Submitting monitor thread !!");
LOG.info("Submitting monitor thread !!");
Executors.newSingleThreadExecutor().submit(() -> {
boolean error = false;
try {
log.info("Monitoring thread(s) !!");
LOG.info("Monitoring thread(s) !!");
future.get();
} catch (ExecutionException ex) {
log.error("Monitor noticed one or more threads failed." + " Requesting graceful shutdown of other threads", ex);
LOG.error("Monitor noticed one or more threads failed." + " Requesting graceful shutdown of other threads", ex);
error = true;
shutdown(false);
} catch (InterruptedException ie) {
log.error("Got interrupted Monitoring threads", ie);
LOG.error("Got interrupted Monitoring threads", ie);
error = true;
shutdown(false);
} finally {

View File

@@ -37,7 +37,7 @@ import java.io.Serializable;
*/
public class Compactor implements Serializable {
protected static volatile Logger log = LogManager.getLogger(Compactor.class);
private static final Logger LOG = LogManager.getLogger(Compactor.class);
private transient HoodieWriteClient compactionClient;
private transient JavaSparkContext jssc;
@@ -48,12 +48,12 @@ public class Compactor implements Serializable {
}
public void compact(HoodieInstant instant) throws IOException {
log.info("Compactor executing compaction " + instant);
LOG.info("Compactor executing compaction " + instant);
JavaRDD<WriteStatus> res = compactionClient.compact(instant.getTimestamp());
long numWriteErrors = res.collect().stream().filter(r -> r.hasErrors()).count();
if (numWriteErrors != 0) {
// We treat even a single error in compaction as fatal
log.error("Compaction for instant (" + instant + ") failed with write errors. " + "Errors :" + numWriteErrors);
LOG.error("Compaction for instant (" + instant + ") failed with write errors. " + "Errors :" + numWriteErrors);
throw new HoodieException(
"Compaction for instant (" + instant + ") failed with write errors. " + "Errors :" + numWriteErrors);
}

View File

@@ -83,7 +83,7 @@ import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_REC
*/
public class DeltaSync implements Serializable {
protected static volatile Logger log = LogManager.getLogger(DeltaSync.class);
private static final Logger LOG = LogManager.getLogger(DeltaSync.class);
public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
public static String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key";
@@ -168,7 +168,7 @@ public class DeltaSync implements Serializable {
this.tableType = tableType;
this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
this.props = props;
log.info("Creating delta streamer with configs : " + props.toString());
LOG.info("Creating delta streamer with configs : " + props.toString());
this.schemaProvider = schemaProvider;
refreshTimeline();
@@ -266,7 +266,7 @@ public class DeltaSync implements Serializable {
if (!resumeCheckpointStr.isPresent() && cfg.checkpoint != null) {
resumeCheckpointStr = Option.of(cfg.checkpoint);
}
log.info("Checkpoint to resume from : " + resumeCheckpointStr);
LOG.info("Checkpoint to resume from : " + resumeCheckpointStr);
final Option<JavaRDD<GenericRecord>> avroRDDOptional;
final String checkpointStr;
@@ -300,13 +300,13 @@ public class DeltaSync implements Serializable {
}
if (Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) {
log.info("No new data, source checkpoint has not changed. Nothing to commit." + "Old checkpoint=("
LOG.info("No new data, source checkpoint has not changed. Nothing to commit." + "Old checkpoint=("
+ resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")");
return null;
}
if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) {
log.info("No new data, perform empty commit.");
LOG.info("No new data, perform empty commit.");
return Pair.of(schemaProvider, Pair.of(checkpointStr, jssc.emptyRDD()));
}
@@ -342,7 +342,7 @@ public class DeltaSync implements Serializable {
boolean isEmpty = records.isEmpty();
String commitTime = startCommit();
log.info("Starting commit : " + commitTime);
LOG.info("Starting commit : " + commitTime);
JavaRDD<WriteStatus> writeStatusRDD;
if (cfg.operation == Operation.INSERT) {
@@ -367,13 +367,13 @@ public class DeltaSync implements Serializable {
}
if (hasErrors) {
log.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total="
LOG.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total="
+ totalErrorRecords + "/" + totalRecords);
}
boolean success = writeClient.commit(commitTime, writeStatusRDD, Option.of(checkpointCommitMetadata));
if (success) {
log.info("Commit " + commitTime + " successful!");
LOG.info("Commit " + commitTime + " successful!");
// Schedule compaction if needed
if (cfg.isAsyncCompactionEnabled()) {
@@ -387,16 +387,16 @@ public class DeltaSync implements Serializable {
hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 0;
}
} else {
log.info("Commit " + commitTime + " failed!");
LOG.info("Commit " + commitTime + " failed!");
throw new HoodieException("Commit " + commitTime + " failed!");
}
} else {
log.error("Delta Sync found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
log.error("Printing out the top 100 errors");
LOG.error("Delta Sync found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
LOG.error("Printing out the top 100 errors");
writeStatusRDD.filter(ws -> ws.hasErrors()).take(100).forEach(ws -> {
log.error("Global error :", ws.getGlobalError());
LOG.error("Global error :", ws.getGlobalError());
if (ws.getErrors().size() > 0) {
ws.getErrors().entrySet().forEach(r -> log.trace("Error for key:" + r.getKey() + " is " + r.getValue()));
ws.getErrors().entrySet().forEach(r -> LOG.trace("Error for key:" + r.getKey() + " is " + r.getValue()));
}
});
// Rolling back instant
@@ -420,7 +420,7 @@ public class DeltaSync implements Serializable {
return writeClient.startCommit();
} catch (IllegalArgumentException ie) {
lastException = ie;
log.error("Got error trying to start a new commit. Retrying after sleeping for a sec", ie);
LOG.error("Got error trying to start a new commit. Retrying after sleeping for a sec", ie);
retryNum++;
try {
Thread.sleep(1000);
@@ -438,7 +438,7 @@ public class DeltaSync implements Serializable {
private void syncHive() throws ClassNotFoundException {
if (cfg.enableHiveSync) {
HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath);
log.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :"
LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :"
+ hiveSyncConfig.jdbcUrl + ", basePath :" + cfg.targetBasePath);
new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable();
@@ -451,7 +451,7 @@ public class DeltaSync implements Serializable {
* this constraint.
*/
public void setupWriteClient() {
log.info("Setting up Hoodie Write Client");
LOG.info("Setting up Hoodie Write Client");
if ((null != schemaProvider) && (null == writeClient)) {
registerAvroSchemas(schemaProvider);
HoodieWriteConfig hoodieCfg = getHoodieClientConfig(schemaProvider);
@@ -503,7 +503,7 @@ public class DeltaSync implements Serializable {
schemas.add(schemaProvider.getTargetSchema());
}
log.info("Registering Schema :" + schemas);
LOG.info("Registering Schema :" + schemas);
jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList());
}
}

View File

@@ -78,7 +78,7 @@ import java.util.stream.IntStream;
*/
public class HoodieDeltaStreamer implements Serializable {
private static volatile Logger log = LogManager.getLogger(HoodieDeltaStreamer.class);
private static final Logger LOG = LogManager.getLogger(HoodieDeltaStreamer.class);
public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
@@ -115,23 +115,23 @@ public class HoodieDeltaStreamer implements Serializable {
if (cfg.continuousMode) {
deltaSyncService.start(this::onDeltaSyncShutdown);
deltaSyncService.waitForShutdown();
log.info("Delta Sync shutting down");
LOG.info("Delta Sync shutting down");
} else {
log.info("Delta Streamer running only single round");
LOG.info("Delta Streamer running only single round");
try {
deltaSyncService.getDeltaSync().syncOnce();
} catch (Exception ex) {
log.error("Got error running delta sync once. Shutting down", ex);
LOG.error("Got error running delta sync once. Shutting down", ex);
throw ex;
} finally {
deltaSyncService.close();
log.info("Shut down deltastreamer");
LOG.info("Shut down deltastreamer");
}
}
}
private boolean onDeltaSyncShutdown(boolean error) {
log.info("DeltaSync shutdown. Closing write client. Error?" + error);
LOG.info("DeltaSync shutdown. Closing write client. Error?" + error);
deltaSyncService.close();
return true;
}
@@ -363,7 +363,7 @@ public class HoodieDeltaStreamer implements Serializable {
}
this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
log.info("Creating delta streamer with configs : " + props.toString());
LOG.info("Creating delta streamer with configs : " + props.toString());
this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc);
if (cfg.filterDupes) {
@@ -385,7 +385,7 @@ public class HoodieDeltaStreamer implements Serializable {
boolean error = false;
if (cfg.isAsyncCompactionEnabled()) {
// set Scheduler Pool.
log.info("Setting Spark Pool name for delta-sync to " + SchedulerConfGenerator.DELTASYNC_POOL_NAME);
LOG.info("Setting Spark Pool name for delta-sync to " + SchedulerConfGenerator.DELTASYNC_POOL_NAME);
jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.DELTASYNC_POOL_NAME);
}
try {
@@ -394,19 +394,19 @@ public class HoodieDeltaStreamer implements Serializable {
long start = System.currentTimeMillis();
Option<String> scheduledCompactionInstant = deltaSync.syncOnce();
if (scheduledCompactionInstant.isPresent()) {
log.info("Enqueuing new pending compaction instant (" + scheduledCompactionInstant + ")");
LOG.info("Enqueuing new pending compaction instant (" + scheduledCompactionInstant + ")");
asyncCompactService.enqueuePendingCompaction(new HoodieInstant(State.REQUESTED,
HoodieTimeline.COMPACTION_ACTION, scheduledCompactionInstant.get()));
asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions);
}
long toSleepMs = cfg.minSyncIntervalSeconds * 1000 - (System.currentTimeMillis() - start);
if (toSleepMs > 0) {
log.info("Last sync ran less than min sync interval: " + cfg.minSyncIntervalSeconds + " s, sleep: "
LOG.info("Last sync ran less than min sync interval: " + cfg.minSyncIntervalSeconds + " s, sleep: "
+ toSleepMs + " ms.");
Thread.sleep(toSleepMs);
}
} catch (Exception e) {
log.error("Shutting down delta-sync due to exception", e);
LOG.error("Shutting down delta-sync due to exception", e);
error = true;
throw new HoodieException(e.getMessage(), e);
}
@@ -422,9 +422,9 @@ public class HoodieDeltaStreamer implements Serializable {
* Shutdown compactor as DeltaSync is shutdown.
*/
private void shutdownCompactor(boolean error) {
log.info("Delta Sync shutdown. Error ?" + error);
LOG.info("Delta Sync shutdown. Error ?" + error);
if (asyncCompactService != null) {
log.warn("Gracefully shutting down compactor");
LOG.warn("Gracefully shutting down compactor");
asyncCompactService.shutdown(false);
}
}
@@ -537,7 +537,7 @@ public class HoodieDeltaStreamer implements Serializable {
* @throws InterruptedException
*/
private HoodieInstant fetchNextCompactionInstant() throws InterruptedException {
log.info("Compactor waiting for next instant for compaction upto 60 seconds");
LOG.info("Compactor waiting for next instant for compaction upto 60 seconds");
HoodieInstant instant = pendingCompactions.poll(60, TimeUnit.SECONDS);
if (instant != null) {
try {
@@ -560,7 +560,7 @@ public class HoodieDeltaStreamer implements Serializable {
IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
try {
// Set Compactor Pool Name for allowing users to prioritize compaction
log.info("Setting Spark Pool name for compaction to " + SchedulerConfGenerator.COMPACT_POOL_NAME);
LOG.info("Setting Spark Pool name for compaction to " + SchedulerConfGenerator.COMPACT_POOL_NAME);
jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.COMPACT_POOL_NAME);
while (!isShutdownRequested()) {
@@ -569,11 +569,11 @@ public class HoodieDeltaStreamer implements Serializable {
compactor.compact(instant);
}
}
log.info("Compactor shutting down properly!!");
LOG.info("Compactor shutting down properly!!");
} catch (InterruptedException ie) {
log.warn("Compactor executor thread got interrupted exception. Stopping", ie);
LOG.warn("Compactor executor thread got interrupted exception. Stopping", ie);
} catch (IOException e) {
log.error("Compactor executor failed", e);
LOG.error("Compactor executor failed", e);
throw new HoodieIOException(e.getMessage(), e);
}
return true;

View File

@@ -39,7 +39,7 @@ import java.util.UUID;
*/
public class SchedulerConfGenerator {
protected static volatile Logger log = LogManager.getLogger(SchedulerConfGenerator.class);
private static final Logger LOG = LogManager.getLogger(SchedulerConfGenerator.class);
public static final String DELTASYNC_POOL_NAME = "hoodiedeltasync";
public static final String COMPACT_POOL_NAME = "hoodiecompact";
@@ -76,7 +76,7 @@ public class SchedulerConfGenerator {
cfg.compactSchedulingWeight, cfg.deltaSyncSchedulingMinShare, cfg.compactSchedulingMinShare);
additionalSparkConfigs.put(SPARK_SCHEDULER_ALLOCATION_FILE_KEY, sparkSchedulingConfFile);
} else {
log.warn("Job Scheduling Configs will not be in effect as spark.scheduler.mode "
LOG.warn("Job Scheduling Configs will not be in effect as spark.scheduler.mode "
+ "is not set to FAIR at instatiation time. Continuing without scheduling configs");
}
return additionalSparkConfigs;
@@ -88,7 +88,7 @@ public class SchedulerConfGenerator {
BufferedWriter bw = new BufferedWriter(new FileWriter(tempConfigFile));
bw.write(generateConfig(deltaSyncWeight, compactionWeight, deltaSyncMinShare, compactionMinShare));
bw.close();
log.info("Configs written to file" + tempConfigFile.getAbsolutePath());
LOG.info("Configs written to file" + tempConfigFile.getAbsolutePath());
return tempConfigFile.getAbsolutePath();
}
}

View File

@@ -58,7 +58,7 @@ import java.util.stream.IntStream;
public class TimelineServerPerf implements Serializable {
private static volatile Logger logger = LogManager.getLogger(TimelineServerPerf.class);
private static final Logger LOG = LogManager.getLogger(TimelineServerPerf.class);
private final Config cfg;
private transient TimelineService timelineServer;
private final boolean useExternalTimelineServer;
@@ -73,10 +73,10 @@ public class TimelineServerPerf implements Serializable {
private void setHostAddrFromSparkConf(SparkConf sparkConf) {
String hostAddr = sparkConf.get("spark.driver.host", null);
if (hostAddr != null) {
logger.info("Overriding hostIp to (" + hostAddr + ") found in spark-conf. It was " + this.hostAddr);
LOG.info("Overriding hostIp to (" + hostAddr + ") found in spark-conf. It was " + this.hostAddr);
this.hostAddr = hostAddr;
} else {
logger.warn("Unable to find driver bind address from spark config");
LOG.warn("Unable to find driver bind address from spark config");
}
}

View File

@@ -40,7 +40,7 @@ import org.apache.spark.streaming.kafka.OffsetRange;
*/
public class AvroKafkaSource extends AvroSource {
private static Logger log = LogManager.getLogger(AvroKafkaSource.class);
private static final Logger LOG = LogManager.getLogger(AvroKafkaSource.class);
private final KafkaOffsetGen offsetGen;
@@ -57,7 +57,7 @@ public class AvroKafkaSource extends AvroSource {
if (totalNewMsgs <= 0) {
return new InputBatch<>(Option.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
} else {
log.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
}
JavaRDD<GenericRecord> newDataRDD = toRDD(offsetRanges);
return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));

View File

@@ -59,7 +59,7 @@ import java.util.stream.Collectors;
*/
public class HiveIncrPullSource extends AvroSource {
private static volatile Logger log = LogManager.getLogger(HiveIncrPullSource.class);
private static final Logger LOG = LogManager.getLogger(HiveIncrPullSource.class);
private final transient FileSystem fs;
@@ -86,7 +86,7 @@ public class HiveIncrPullSource extends AvroSource {
*/
private Option<String> findCommitToPull(Option<String> latestTargetCommit) throws IOException {
log.info("Looking for commits ");
LOG.info("Looking for commits ");
FileStatus[] commitTimePaths = fs.listStatus(new Path(incrPullRootPath));
List<String> commitTimes = new ArrayList<>(commitTimePaths.length);
@@ -95,7 +95,7 @@ public class HiveIncrPullSource extends AvroSource {
commitTimes.add(splits[splits.length - 1]);
}
Collections.sort(commitTimes);
log.info("Retrieved commit times " + commitTimes);
LOG.info("Retrieved commit times " + commitTimes);
if (!latestTargetCommit.isPresent()) {
// start from the beginning

View File

@@ -28,6 +28,8 @@ import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
@@ -37,6 +39,9 @@ import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
public class HoodieIncrSource extends RowSource {
private static final Logger LOG = LogManager.getLogger(HoodieIncrSource.class);
protected static class Config {
/**
@@ -104,7 +109,7 @@ public class HoodieIncrSource extends RowSource {
numInstantsPerFetch, beginInstant, readLatestOnMissingCkpt);
if (instantEndpts.getKey().equals(instantEndpts.getValue())) {
log.warn("Already caught up. Begin Checkpoint was :" + instantEndpts.getKey());
LOG.warn("Already caught up. Begin Checkpoint was :" + instantEndpts.getKey());
return Pair.of(Option.empty(), instantEndpts.getKey());
}

View File

@@ -38,7 +38,7 @@ import org.apache.spark.streaming.kafka.OffsetRange;
*/
public class JsonKafkaSource extends JsonSource {
private static Logger log = LogManager.getLogger(JsonKafkaSource.class);
private static final Logger LOG = LogManager.getLogger(JsonKafkaSource.class);
private final KafkaOffsetGen offsetGen;
@@ -55,7 +55,7 @@ public class JsonKafkaSource extends JsonSource {
if (totalNewMsgs <= 0) {
return new InputBatch<>(Option.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
}
log.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
JavaRDD<String> newDataRDD = toRDD(offsetRanges);
return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
}

View File

@@ -33,7 +33,7 @@ import java.io.Serializable;
* Represents a source from which we can tail data. Assumes a constructor that takes properties.
*/
public abstract class Source<T> implements Serializable {
protected static volatile Logger log = LogManager.getLogger(Source.class);
private static final Logger LOG = LogManager.getLogger(Source.class);
public enum SourceType {
JSON, AVRO, ROW, PARQUET

View File

@@ -51,7 +51,7 @@ import scala.util.Either;
*/
public class KafkaOffsetGen {
private static volatile Logger log = LogManager.getLogger(KafkaOffsetGen.class);
private static final Logger LOG = LogManager.getLogger(KafkaOffsetGen.class);
public static class CheckpointUtils {

View File

@@ -37,7 +37,7 @@ import java.util.UUID;
public class FlatteningTransformer implements Transformer {
private static final String TMP_TABLE = "HUDI_SRC_TMP_TABLE_";
private static volatile Logger log = LogManager.getLogger(SqlQueryBasedTransformer.class);
private static final Logger LOG = LogManager.getLogger(SqlQueryBasedTransformer.class);
/**
* Configs supported.
@@ -48,7 +48,7 @@ public class FlatteningTransformer implements Transformer {
// tmp table name doesn't like dashes
String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
log.info("Registering tmp table : " + tmpTable);
LOG.info("Registering tmp table : " + tmpTable);
rowDataset.registerTempTable(tmpTable);
return sparkSession.sql("select " + flattenSchema(rowDataset.schema(), null) + " from " + tmpTable);
}

View File

@@ -36,7 +36,7 @@ import java.util.UUID;
*/
public class SqlQueryBasedTransformer implements Transformer {
private static volatile Logger log = LogManager.getLogger(SqlQueryBasedTransformer.class);
private static final Logger LOG = LogManager.getLogger(SqlQueryBasedTransformer.class);
private static final String SRC_PATTERN = "<SRC>";
private static final String TMP_TABLE = "HOODIE_SRC_TMP_TABLE_";
@@ -59,10 +59,10 @@ public class SqlQueryBasedTransformer implements Transformer {
// tmp table name doesn't like dashes
String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
log.info("Registering tmp table : " + tmpTable);
LOG.info("Registering tmp table : " + tmpTable);
rowDataset.registerTempTable(tmpTable);
String sqlStr = transformerSQL.replaceAll(SRC_PATTERN, tmpTable);
log.info("SQL Query for transformation : (" + sqlStr + ")");
LOG.info("SQL Query for transformation : (" + sqlStr + ")");
return sparkSession.sql(sqlStr);
}
}

View File

@@ -89,7 +89,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
private static volatile Logger log = LogManager.getLogger(TestHoodieDeltaStreamer.class);
private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);
@BeforeClass
public static void initClass() throws Exception {
@@ -247,7 +247,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
static void assertAtleastNCompactionCommits(int minExpected, String datasetPath, FileSystem fs) {
HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath);
HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
log.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
int numCompactionCommits = (int) timeline.getInstants().count();
assertTrue("Got=" + numCompactionCommits + ", exp >=" + minExpected, minExpected <= numCompactionCommits);
}
@@ -255,7 +255,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
static void assertAtleastNDeltaCommits(int minExpected, String datasetPath, FileSystem fs) {
HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath);
HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
log.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
int numDeltaCommits = (int) timeline.getInstants().count();
assertTrue("Got=" + numDeltaCommits + ", exp >=" + minExpected, minExpected <= numDeltaCommits);
}
@@ -280,7 +280,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
Thread.sleep(3000);
ret = condition.apply(true);
} catch (Throwable error) {
log.warn("Got error :", error);
LOG.warn("Got error :", error);
ret = false;
}
}
@@ -311,7 +311,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
fail("Should error out when setting the key generator class property to an invalid value");
} catch (IOException e) {
// expected
log.error("Expected error during getting the key generator", e);
LOG.error("Expected error during getting the key generator", e);
assertTrue(e.getMessage().contains("Could not load key generator class"));
}
}
@@ -326,7 +326,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
fail("Should error out when pointed out at a dir thats not a dataset");
} catch (DatasetNotFoundException e) {
// expected
log.error("Expected error during dataset creation", e);
LOG.error("Expected error during dataset creation", e);
}
}
@@ -497,7 +497,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
fail("Should error out when schema provider is not provided");
} catch (HoodieException e) {
log.error("Expected error during reading data from source ", e);
LOG.error("Expected error during reading data from source ", e);
assertTrue(e.getMessage().contains("Please provide a valid schema provider class!"));
}
}

View File

@@ -29,6 +29,8 @@ import org.apache.hudi.utilities.sources.config.TestSourceConfig;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
@@ -40,6 +42,8 @@ import java.util.stream.Stream;
public abstract class AbstractBaseTestSource extends AvroSource {
private static final Logger LOG = LogManager.getLogger(AbstractBaseTestSource.class);
static final int DEFAULT_PARTITION_NUM = 0;
// Static instance, helps with reuse across a test.
@@ -56,7 +60,7 @@ public abstract class AbstractBaseTestSource extends AvroSource {
TestSourceConfig.DEFAULT_USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS);
String baseStoreDir = props.getString(TestSourceConfig.ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS,
File.createTempFile("test_data_gen", ".keys").getParent()) + "/" + partition;
log.info("useRocksForTestDataGenKeys=" + useRocksForTestDataGenKeys + ", BaseStoreDir=" + baseStoreDir);
LOG.info("useRocksForTestDataGenKeys=" + useRocksForTestDataGenKeys + ", BaseStoreDir=" + baseStoreDir);
dataGeneratorMap.put(partition, new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS,
useRocksForTestDataGenKeys ? new RocksDBBasedMap<>(baseStoreDir) : new HashMap<>()));
} catch (IOException e) {
@@ -85,11 +89,11 @@ public abstract class AbstractBaseTestSource extends AvroSource {
// generate `sourceLimit` number of upserts each time.
int numExistingKeys = dataGenerator.getNumExistingKeys();
log.info("NumExistingKeys=" + numExistingKeys);
LOG.info("NumExistingKeys=" + numExistingKeys);
int numUpdates = Math.min(numExistingKeys, sourceLimit / 2);
int numInserts = sourceLimit - numUpdates;
log.info("Before adjustments => numInserts=" + numInserts + ", numUpdates=" + numUpdates);
LOG.info("Before adjustments => numInserts=" + numInserts + ", numUpdates=" + numUpdates);
if (numInserts + numExistingKeys > maxUniqueKeys) {
// Limit inserts so that maxUniqueRecords is maintained
@@ -101,9 +105,9 @@ public abstract class AbstractBaseTestSource extends AvroSource {
numUpdates = Math.min(numExistingKeys, sourceLimit - numInserts);
}
log.info("NumInserts=" + numInserts + ", NumUpdates=" + numUpdates + ", maxUniqueRecords=" + maxUniqueKeys);
LOG.info("NumInserts=" + numInserts + ", NumUpdates=" + numUpdates + ", maxUniqueRecords=" + maxUniqueKeys);
long memoryUsage1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
log.info("Before DataGen. Memory Usage=" + memoryUsage1 + ", Total Memory=" + Runtime.getRuntime().totalMemory()
LOG.info("Before DataGen. Memory Usage=" + memoryUsage1 + ", Total Memory=" + Runtime.getRuntime().totalMemory()
+ ", Free Memory=" + Runtime.getRuntime().freeMemory());
Stream<GenericRecord> updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates)

View File

@@ -24,6 +24,8 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.config.TestSourceConfig;
import org.apache.avro.generic.GenericRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
@@ -37,6 +39,8 @@ import java.util.stream.IntStream;
*/
public class DistributedTestDataSource extends AbstractBaseTestSource {
private static final Logger LOG = LogManager.getLogger(DistributedTestDataSource.class);
private final int numTestSourcePartitions;
public DistributedTestDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
@@ -50,7 +54,7 @@ public class DistributedTestDataSource extends AbstractBaseTestSource {
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
int nextCommitNum = lastCkptStr.map(s -> Integer.parseInt(s) + 1).orElse(0);
String commitTime = String.format("%05d", nextCommitNum);
log.info("Source Limit is set to " + sourceLimit);
LOG.info("Source Limit is set to " + sourceLimit);
// No new data.
if (sourceLimit <= 0) {
@@ -69,7 +73,7 @@ public class DistributedTestDataSource extends AbstractBaseTestSource {
JavaRDD<GenericRecord> avroRDD =
sparkContext.parallelize(IntStream.range(0, numTestSourcePartitions).boxed().collect(Collectors.toList()),
numTestSourcePartitions).mapPartitionsWithIndex((p, idx) -> {
log.info("Initializing source with newProps=" + newProps);
LOG.info("Initializing source with newProps=" + newProps);
if (!dataGeneratorMap.containsKey(p)) {
initDataGen(newProps, p);
}

View File

@@ -37,7 +37,7 @@ import java.util.stream.Collectors;
*/
public class TestDataSource extends AbstractBaseTestSource {
private static volatile Logger log = LogManager.getLogger(TestDataSource.class);
private static final Logger LOG = LogManager.getLogger(TestDataSource.class);
public TestDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
@@ -50,7 +50,7 @@ public class TestDataSource extends AbstractBaseTestSource {
int nextCommitNum = lastCheckpointStr.map(s -> Integer.parseInt(s) + 1).orElse(0);
String commitTime = String.format("%05d", nextCommitNum);
log.info("Source Limit is set to " + sourceLimit);
LOG.info("Source Limit is set to " + sourceLimit);
// No new data.
if (sourceLimit <= 0) {