From e637d9ed26fea1a336f2fd6139cde0dd192c429d Mon Sep 17 00:00:00 2001 From: hejinbiao123 <38057507+hejinbiao123@users.noreply.github.com> Date: Tue, 31 Dec 2019 13:49:34 +0800 Subject: [PATCH] [HUDI-455] Redo hudi-client log statements using SLF4J (#1145) * [HUDI-455] Redo hudi-client log statements using SLF4J --- hudi-client/pom.xml | 5 ++ .../org/apache/hudi/AbstractHoodieClient.java | 6 +- .../apache/hudi/CompactionAdminClient.java | 17 +++--- .../org/apache/hudi/HoodieCleanClient.java | 16 +++--- .../org/apache/hudi/HoodieReadClient.java | 6 +- .../org/apache/hudi/HoodieWriteClient.java | 56 +++++++++---------- .../embedded/EmbeddedTimelineService.java | 10 ++-- .../DefaultHBaseQPSResourceAllocator.java | 10 ++-- .../apache/hudi/index/hbase/HBaseIndex.java | 42 +++++++------- .../apache/hudi/io/HoodieAppendHandle.java | 20 +++---- .../org/apache/hudi/io/HoodieCleanHelper.java | 16 +++--- .../hudi/io/HoodieCommitArchiveLog.java | 22 ++++---- .../apache/hudi/io/HoodieCreateHandle.java | 17 +++--- .../apache/hudi/io/HoodieKeyLookupHandle.java | 19 +++---- .../org/apache/hudi/io/HoodieMergeHandle.java | 39 +++++++------ .../org/apache/hudi/io/HoodieWriteHandle.java | 10 ++-- .../compact/HoodieRealtimeTableCompactor.java | 28 +++++----- .../apache/hudi/metrics/HoodieMetrics.java | 17 +++--- .../hudi/metrics/JmxMetricsReporter.java | 6 +- .../java/org/apache/hudi/metrics/Metrics.java | 6 +- .../hudi/metrics/MetricsGraphiteReporter.java | 6 +- .../hudi/metrics/MetricsReporterFactory.java | 8 +-- .../hudi/table/HoodieCopyOnWriteTable.java | 52 ++++++++--------- .../hudi/table/HoodieMergeOnReadTable.java | 27 +++++---- .../org/apache/hudi/table/HoodieTable.java | 12 ++-- .../apache/hudi/table/RollbackExecutor.java | 14 ++--- 26 files changed, 242 insertions(+), 245 deletions(-) diff --git a/hudi-client/pom.xml b/hudi-client/pom.xml index d350777cf..66538e0d9 100644 --- a/hudi-client/pom.xml +++ b/hudi-client/pom.xml @@ -85,6 +85,11 @@ log4j log4j + + org.slf4j + slf4j-api + ${slf4j.version} + diff --git a/hudi-client/src/main/java/org/apache/hudi/AbstractHoodieClient.java b/hudi-client/src/main/java/org/apache/hudi/AbstractHoodieClient.java index dd108be02..8457b908f 100644 --- a/hudi-client/src/main/java/org/apache/hudi/AbstractHoodieClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/AbstractHoodieClient.java @@ -26,9 +26,9 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hadoop.fs.FileSystem; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; @@ -39,7 +39,7 @@ import java.io.Serializable; */ public abstract class AbstractHoodieClient implements Serializable, AutoCloseable { - private static final Logger LOG = LogManager.getLogger(AbstractHoodieClient.class); + private static final Logger LOG = LoggerFactory.getLogger(AbstractHoodieClient.class); protected final transient FileSystem fs; protected final transient JavaSparkContext jsc; diff --git a/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java b/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java index 549c990a4..6927f7df7 100644 --- a/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java @@ -45,9 +45,9 @@ import org.apache.hudi.func.OperationResult; import com.google.common.base.Preconditions; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.FileNotFoundException; import java.io.IOException; @@ -65,7 +65,7 @@ import static org.apache.hudi.common.table.HoodieTimeline.COMPACTION_ACTION; */ public class CompactionAdminClient extends AbstractHoodieClient { - private static final Logger LOG = LogManager.getLogger(CompactionAdminClient.class); + private static final Logger LOG = LoggerFactory.getLogger(CompactionAdminClient.class); public CompactionAdminClient(JavaSparkContext jsc, String basePath) { super(jsc, HoodieWriteConfig.newBuilder().withPath(basePath).build()); @@ -359,13 +359,14 @@ public class CompactionAdminClient extends AbstractHoodieClient { if (!dryRun) { return jsc.parallelize(renameActions, parallelism).map(lfPair -> { try { - LOG.info("RENAME " + lfPair.getLeft().getPath() + " => " + lfPair.getRight().getPath()); + LOG.info("RENAME {} => {}", lfPair.getLeft().getPath(), lfPair.getRight().getPath()); renameLogFile(metaClient, lfPair.getLeft(), lfPair.getRight()); return new RenameOpResult(lfPair, true, Option.empty()); } catch (IOException e) { LOG.error("Error renaming log file", e); - LOG.error("\n\n\n***NOTE Compaction is in inconsistent state. Try running \"compaction repair " - + lfPair.getLeft().getBaseCommitTime() + "\" to recover from failure ***\n\n\n"); + LOG.error("\n\n\n***NOTE Compaction is in inconsistent state. " + + "Try running \"compaction repair {} \" to recover from failure ***\n\n\n", + lfPair.getLeft().getBaseCommitTime()); return new RenameOpResult(lfPair, false, Option.of(e)); } }).collect(); @@ -396,7 +397,7 @@ public class CompactionAdminClient extends AbstractHoodieClient { HoodieCompactionPlan plan = getCompactionPlan(metaClient, compactionInstant); if (plan.getOperations() != null) { LOG.info( - "Number of Compaction Operations :" + plan.getOperations().size() + " for instant :" + compactionInstant); + "Number of Compaction Operations :{} for instant :{}", plan.getOperations().size(), compactionInstant); List ops = plan.getOperations().stream() .map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList()); return jsc.parallelize(ops, parallelism).flatMap(op -> { @@ -410,7 +411,7 @@ public class CompactionAdminClient extends AbstractHoodieClient { } }).collect(); } - LOG.warn("No operations for compaction instant : " + compactionInstant); + LOG.warn("No operations for compaction instant : {}", compactionInstant); return new ArrayList<>(); } diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java index 9411782bc..68503c67e 100644 --- a/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java @@ -39,16 +39,16 @@ import org.apache.hudi.table.HoodieTable; import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.List; public class HoodieCleanClient extends AbstractHoodieClient { - private static final Logger LOG = LogManager.getLogger(HoodieCleanClient.class); + private static final Logger LOG = LoggerFactory.getLogger(HoodieCleanClient.class); private final transient HoodieMetrics metrics; public HoodieCleanClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, HoodieMetrics metrics) { @@ -85,7 +85,7 @@ public class HoodieCleanClient extends AbstractHo // If there are inflight(failed) or previously requested clean operation, first perform them table.getCleanTimeline().filterInflightsAndRequested().getInstants().forEach(hoodieInstant -> { - LOG.info("There were previously unfinished cleaner operations. Finishing Instant=" + hoodieInstant); + LOG.info("There were previously unfinished cleaner operations. Finishing Instant={}", hoodieInstant); runClean(table, hoodieInstant); }); @@ -122,7 +122,7 @@ public class HoodieCleanClient extends AbstractHo // Save to both aux and timeline folder try { table.getActiveTimeline().saveToCleanRequested(cleanInstant, AvroUtils.serializeCleanerPlan(cleanerPlan)); - LOG.info("Requesting Cleaning with instant time " + cleanInstant); + LOG.info("Requesting Cleaning with instant time {}", cleanInstant); } catch (IOException e) { LOG.error("Got exception when saving cleaner requested file", e); throw new HoodieIOException(e.getMessage(), e); @@ -173,20 +173,20 @@ public class HoodieCleanClient extends AbstractHo Option durationInMs = Option.empty(); if (context != null) { durationInMs = Option.of(metrics.getDurationInMs(context.stop())); - LOG.info("cleanerElaspsedTime (Minutes): " + durationInMs.get() / (1000 * 60)); + LOG.info("cleanerElaspsedTime (Minutes): {}", durationInMs.get() / (1000 * 60)); } HoodieTableMetaClient metaClient = createMetaClient(true); // Create the metadata and save it HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata(metaClient, cleanInstant.getTimestamp(), durationInMs, cleanStats); - LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files. Earliest Retained :" + metadata.getEarliestCommitToRetain()); + LOG.info("Cleaned {} files. Earliest Retained : {}", metadata.getTotalFilesDeleted(), metadata.getEarliestCommitToRetain()); metrics.updateCleanMetrics(durationInMs.orElseGet(() -> -1L), metadata.getTotalFilesDeleted()); table.getActiveTimeline().transitionCleanInflightToComplete( new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, cleanInstant.getTimestamp()), AvroUtils.serializeCleanMetadata(metadata)); - LOG.info("Marked clean started on " + cleanInstant.getTimestamp() + " as complete"); + LOG.info("Marked clean started on {} as complete", cleanInstant.getTimestamp()); return metadata; } catch (IOException e) { throw new HoodieIOException("Failed to clean up after commit", e); diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieReadClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieReadClient.java index 3c4290c89..f309f4057 100644 --- a/hudi-client/src/main/java/org/apache/hudi/HoodieReadClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/HoodieReadClient.java @@ -35,8 +35,6 @@ import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -51,6 +49,8 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -58,7 +58,7 @@ import scala.Tuple2; */ public class HoodieReadClient extends AbstractHoodieClient { - private static final Logger LOG = LogManager.getLogger(HoodieReadClient.class); + private static final Logger LOG = LoggerFactory.getLogger(HoodieReadClient.class); /** * TODO: We need to persist the index type into hoodie.properties and be able to access the index just with a simple diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java index 9d0332d62..697dac45e 100644 --- a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java @@ -67,8 +67,6 @@ import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.Partitioner; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; @@ -86,6 +84,8 @@ import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -96,7 +96,7 @@ import scala.Tuple2; */ public class HoodieWriteClient extends AbstractHoodieClient { - private static final Logger LOG = LogManager.getLogger(HoodieWriteClient.class); + private static final Logger LOG = LoggerFactory.getLogger(HoodieWriteClient.class); private static final String UPDATE_STR = "update"; private static final String LOOKUP_STR = "lookup"; private final boolean rollbackPending; @@ -399,13 +399,13 @@ public class HoodieWriteClient extends AbstractHo private void commitOnAutoCommit(String commitTime, JavaRDD resultRDD, String actionType) { if (config.shouldAutoCommit()) { - LOG.info("Auto commit enabled: Committing " + commitTime); + LOG.info("Auto commit enabled: Committing {}", commitTime); boolean commitResult = commit(commitTime, resultRDD, Option.empty(), actionType); if (!commitResult) { throw new HoodieCommitException("Failed to commit " + commitTime); } } else { - LOG.info("Auto commit disabled for " + commitTime); + LOG.info("Auto commit disabled for {}", commitTime); } } @@ -454,13 +454,13 @@ public class HoodieWriteClient extends AbstractHo if (preppedRecords.getStorageLevel() == StorageLevel.NONE()) { preppedRecords.persist(StorageLevel.MEMORY_AND_DISK_SER()); } else { - LOG.info("RDD PreppedRecords was persisted at: " + preppedRecords.getStorageLevel()); + LOG.info("RDD PreppedRecords was persisted at: {}", preppedRecords.getStorageLevel()); } WorkloadProfile profile = null; if (hoodieTable.isWorkloadProfileNeeded()) { profile = new WorkloadProfile(preppedRecords); - LOG.info("Workload profile :" + profile); + LOG.info("Workload profile : {}", profile); saveWorkloadProfileMetadataToInflight(profile, hoodieTable, commitTime); } @@ -526,7 +526,7 @@ public class HoodieWriteClient extends AbstractHo private boolean commit(String commitTime, JavaRDD writeStatuses, Option> extraMetadata, String actionType) { - LOG.info("Commiting " + commitTime); + LOG.info("Commiting {}", commitTime); // Create a Hoodie table which encapsulated the commits and files visible HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); @@ -573,7 +573,7 @@ public class HoodieWriteClient extends AbstractHo metadata, actionType); writeContext = null; } - LOG.info("Committed " + commitTime); + LOG.info("Committed {}", commitTime); } catch (IOException e) { throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + commitTime, e); @@ -607,7 +607,7 @@ public class HoodieWriteClient extends AbstractHo } String latestCommit = table.getCompletedCommitsTimeline().lastInstant().get().getTimestamp(); - LOG.info("Savepointing latest commit " + latestCommit); + LOG.info("Savepointing latest commit {}", latestCommit); return savepoint(latestCommit, user, comment); } @@ -658,7 +658,7 @@ public class HoodieWriteClient extends AbstractHo config.shouldAssumeDatePartitioning())) .mapToPair((PairFunction>) partitionPath -> { // Scan all partitions files with this commit time - LOG.info("Collecting latest files in partition path " + partitionPath); + LOG.info("Collecting latest files in partition path {}", partitionPath); ReadOptimizedView view = table.getROFileSystemView(); List latestFiles = view.getLatestDataFilesBeforeOrOn(partitionPath, commitTime) .map(HoodieDataFile::getFileName).collect(Collectors.toList()); @@ -672,7 +672,7 @@ public class HoodieWriteClient extends AbstractHo table.getActiveTimeline() .saveAsComplete(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, commitTime), AvroUtils.serializeSavepointMetadata(metadata)); - LOG.info("Savepoint " + commitTime + " created"); + LOG.info("Savepoint {} created", commitTime); return true; } catch (IOException e) { throw new HoodieSavepointException("Failed to savepoint " + commitTime, e); @@ -696,13 +696,13 @@ public class HoodieWriteClient extends AbstractHo HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime); boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint); if (!isSavepointPresent) { - LOG.warn("No savepoint present " + savepointTime); + LOG.warn("No savepoint present {}", savepointTime); return; } activeTimeline.revertToInflight(savePoint); activeTimeline.deleteInflight(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, savepointTime)); - LOG.info("Savepoint " + savepointTime + " deleted"); + LOG.info("Savepoint {} deleted", savepointTime); } /** @@ -730,7 +730,7 @@ public class HoodieWriteClient extends AbstractHo } else { throw new IllegalArgumentException("Compaction is not in requested state " + compactionTime); } - LOG.info("Compaction " + compactionTime + " deleted"); + LOG.info("Compaction {} deleted", compactionTime); } /** @@ -758,7 +758,7 @@ public class HoodieWriteClient extends AbstractHo List commitsToRollback = commitTimeline.findInstantsAfter(savepointTime, Integer.MAX_VALUE).getInstants() .map(HoodieInstant::getTimestamp).collect(Collectors.toList()); - LOG.info("Rolling back commits " + commitsToRollback); + LOG.info("Rolling back commits {}", commitsToRollback); restoreToInstant(savepointTime); @@ -818,7 +818,7 @@ public class HoodieWriteClient extends AbstractHo // delete these files when it does not see a corresponding instant file under .hoodie List statsForCompaction = doRollbackAndGetStats(instant); instantsToStats.put(instant.getTimestamp(), statsForCompaction); - LOG.info("Deleted compaction instant " + instant); + LOG.info("Deleted compaction instant {}", instant); break; default: throw new IllegalArgumentException("invalid action name " + instant.getAction()); @@ -859,7 +859,7 @@ public class HoodieWriteClient extends AbstractHo if (commitTimeline.empty() && inflightAndRequestedCommitTimeline.empty()) { // nothing to rollback - LOG.info("No commits to rollback " + commitToRollback); + LOG.info("No commits to rollback {}", commitToRollback); } // Make sure only the last n commits are being rolled back @@ -881,13 +881,13 @@ public class HoodieWriteClient extends AbstractHo List stats = table.rollback(jsc, instantToRollback, true); - LOG.info("Deleted inflight commits " + commitToRollback); + LOG.info("Deleted inflight commits {}", commitToRollback); // cleanup index entries if (!index.rollbackCommit(commitToRollback)) { throw new HoodieRollbackException("Rollback index changes failed, for time :" + commitToRollback); } - LOG.info("Index rolled back for commits " + commitToRollback); + LOG.info("Index rolled back for commits {}", commitToRollback); return stats; } @@ -908,7 +908,7 @@ public class HoodieWriteClient extends AbstractHo table.getActiveTimeline().saveAsComplete( new HoodieInstant(true, HoodieTimeline.ROLLBACK_ACTION, startRollbackTime), AvroUtils.serializeRollbackMetadata(rollbackMetadata)); - LOG.info("Commits " + commitsToRollback + " rollback is complete"); + LOG.info("Commits {} rollback is complete", commitsToRollback); if (!table.getActiveTimeline().getCleanerTimeline().empty()) { LOG.info("Cleaning up older rollback meta files"); @@ -936,7 +936,7 @@ public class HoodieWriteClient extends AbstractHo AvroUtils.convertRestoreMetadata(startRestoreTime, durationInMs, commitsToRollback, commitToStats); table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRestoreTime), AvroUtils.serializeRestoreMetadata(restoreMetadata)); - LOG.info("Commits " + commitsToRollback + " rollback is complete. Restored dataset to " + restoreToInstant); + LOG.info("Commits {} rollback is complete. Restored dataset to {}", commitsToRollback, restoreToInstant); if (!table.getActiveTimeline().getCleanerTimeline().empty()) { LOG.info("Cleaning up older restore meta files"); @@ -1028,7 +1028,7 @@ public class HoodieWriteClient extends AbstractHo } private void startCommit(String instantTime) { - LOG.info("Generate a new instant time " + instantTime); + LOG.info("Generate a new instant time {}", instantTime); HoodieTableMetaClient metaClient = createMetaClient(true); // if there are pending compactions, their instantTime must not be greater than that of this instant time metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending -> { @@ -1048,7 +1048,7 @@ public class HoodieWriteClient extends AbstractHo */ public Option scheduleCompaction(Option> extraMetadata) throws IOException { String instantTime = HoodieActiveTimeline.createNewInstantTime(); - LOG.info("Generate a new instant time " + instantTime); + LOG.info("Generate a new instant time {}", instantTime); boolean notEmpty = scheduleCompactionAtInstant(instantTime, extraMetadata); return notEmpty ? Option.of(instantTime) : Option.empty(); } @@ -1292,9 +1292,9 @@ public class HoodieWriteClient extends AbstractHo + config.getBasePath() + " at time " + compactionCommitTime, e); } } - LOG.info("Compacted successfully on commit " + compactionCommitTime); + LOG.info("Compacted successfully on commit {}", compactionCommitTime); } else { - LOG.info("Compaction did not run for commit " + compactionCommitTime); + LOG.info("Compaction did not run for commit {}", compactionCommitTime); } } @@ -1305,7 +1305,7 @@ public class HoodieWriteClient extends AbstractHo if (finalizeCtx != null) { Option durationInMs = Option.of(metrics.getDurationInMs(finalizeCtx.stop())); durationInMs.ifPresent(duration -> { - LOG.info("Finalize write elapsed time (milliseconds): " + duration); + LOG.info("Finalize write elapsed time (milliseconds): {}", duration); metrics.updateFinalizeWriteMetrics(duration, stats.size()); }); } @@ -1347,7 +1347,7 @@ public class HoodieWriteClient extends AbstractHo }); }); - LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata); + LOG.info("Committing Compaction {}. Finished with result {}", compactionCommitTime, metadata); HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); try { diff --git a/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java b/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java index 5afee3fd4..a95861729 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java @@ -26,9 +26,9 @@ import org.apache.hudi.common.util.NetworkUtils; import org.apache.hudi.timeline.service.TimelineService; import org.apache.hadoop.conf.Configuration; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.SparkConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; @@ -37,7 +37,7 @@ import java.io.IOException; */ public class EmbeddedTimelineService { - private static final Logger LOG = LogManager.getLogger(EmbeddedTimelineService.class); + private static final Logger LOG = LoggerFactory.getLogger(EmbeddedTimelineService.class); private int serverPort; private String hostAddr; @@ -72,13 +72,13 @@ public class EmbeddedTimelineService { public void startServer() throws IOException { server = new TimelineService(0, viewManager, hadoopConf.newCopy()); serverPort = server.startService(); - LOG.info("Started embedded timeline server at " + hostAddr + ":" + serverPort); + LOG.info("Started embedded timeline server at {} : {}", hostAddr, serverPort); } private void setHostAddrFromSparkConf(SparkConf sparkConf) { String hostAddr = sparkConf.get("spark.driver.host", null); if (hostAddr != null) { - LOG.info("Overriding hostIp to (" + hostAddr + ") found in spark-conf. It was " + this.hostAddr); + LOG.info("Overriding hostIp to ({}) found in spark-conf. It was {}", hostAddr, this.hostAddr); this.hostAddr = hostAddr; } else { LOG.warn("Unable to find driver bind address from spark config"); diff --git a/hudi-client/src/main/java/org/apache/hudi/index/hbase/DefaultHBaseQPSResourceAllocator.java b/hudi-client/src/main/java/org/apache/hudi/index/hbase/DefaultHBaseQPSResourceAllocator.java index e3a49041e..4c8f9c441 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/hbase/DefaultHBaseQPSResourceAllocator.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/hbase/DefaultHBaseQPSResourceAllocator.java @@ -20,12 +20,12 @@ package org.apache.hudi.index.hbase; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DefaultHBaseQPSResourceAllocator implements HBaseIndexQPSResourceAllocator { private HoodieWriteConfig hoodieWriteConfig; - private static final Logger LOG = LogManager.getLogger(DefaultHBaseQPSResourceAllocator.class); + private static final Logger LOG = LoggerFactory.getLogger(DefaultHBaseQPSResourceAllocator.class); public DefaultHBaseQPSResourceAllocator(HoodieWriteConfig hoodieWriteConfig) { this.hoodieWriteConfig = hoodieWriteConfig; @@ -46,7 +46,7 @@ public class DefaultHBaseQPSResourceAllocator implements HBaseIndexQPSResourceAl @Override public void releaseQPSResources() { // Do nothing, as there are no resources locked in default implementation - LOG.info(String.format("Release QPS resources called for %s with default implementation, do nothing", - this.hoodieWriteConfig.getHbaseTableName())); + LOG.info("Release QPS resources called for {} with default implementation, do nothing", + this.hoodieWriteConfig.getHbaseTableName()); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java index 677408b52..1bb46c14f 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java @@ -49,8 +49,6 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -64,6 +62,8 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -82,7 +82,7 @@ public class HBaseIndex extends HoodieIndex { private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes("partition_path"); private static final int SLEEP_TIME_MILLISECONDS = 100; - private static final Logger LOG = LogManager.getLogger(HBaseIndex.class); + private static final Logger LOG = LoggerFactory.getLogger(HBaseIndex.class); private static Connection hbaseConnection = null; private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null; private float qpsFraction; @@ -114,7 +114,7 @@ public class HBaseIndex extends HoodieIndex { @VisibleForTesting public HBaseIndexQPSResourceAllocator createQPSResourceAllocator(HoodieWriteConfig config) { try { - LOG.info("createQPSResourceAllocator :" + config.getHBaseQPSResourceAllocatorClass()); + LOG.info("createQPSResourceAllocator : {}", config.getHBaseQPSResourceAllocatorClass()); final HBaseIndexQPSResourceAllocator resourceAllocator = (HBaseIndexQPSResourceAllocator) ReflectionUtils .loadClass(config.getHBaseQPSResourceAllocatorClass(), config); return resourceAllocator; @@ -323,7 +323,7 @@ public class HBaseIndex extends HoodieIndex { doPutsAndDeletes(hTable, puts, deletes); } catch (Exception e) { Exception we = new Exception("Error updating index for " + writeStatus, e); - LOG.error(we); + LOG.error("Error updating index for {}", writeStatus, e); writeStatus.setGlobalError(we); } writeStatusList.add(writeStatus); @@ -373,7 +373,7 @@ public class HBaseIndex extends HoodieIndex { HoodieTable hoodieTable) { final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config); setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc); - LOG.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize); + LOG.info("multiPutBatchSize: before HBase puts {}", multiPutBatchSize); JavaRDD writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true); // caching the index updated status RDD writeStatusJavaRDD = writeStatusJavaRDD.persist(config.getWriteStatusStorageLevel()); @@ -401,15 +401,15 @@ public class HBaseIndex extends HoodieIndex { this.numRegionServersForTable = getNumRegionServersAliveForTable(); final float desiredQPSFraction = hBaseIndexQPSResourceAllocator.calculateQPSFractionForPutsTime(numPuts, this.numRegionServersForTable); - LOG.info("Desired QPSFraction :" + desiredQPSFraction); - LOG.info("Number HBase puts :" + numPuts); - LOG.info("Hbase Puts Parallelism :" + hbasePutsParallelism); + LOG.info("Desired QPSFraction : {}", desiredQPSFraction); + LOG.info("Number HBase puts : {}", numPuts); + LOG.info("HBase Puts Parallelism : {}", hbasePutsParallelism); final float availableQpsFraction = hBaseIndexQPSResourceAllocator.acquireQPSResources(desiredQPSFraction, numPuts); LOG.info("Allocated QPS Fraction :" + availableQpsFraction); multiPutBatchSize = putBatchSizeCalculator.getBatchSize(numRegionServersForTable, maxQpsPerRegionServer, hbasePutsParallelism, maxExecutors, SLEEP_TIME_MILLISECONDS, availableQpsFraction); - LOG.info("multiPutBatchSize :" + multiPutBatchSize); + LOG.info("multiPutBatchSize : {}", multiPutBatchSize); } } @@ -423,7 +423,7 @@ public class HBaseIndex extends HoodieIndex { public static class HbasePutBatchSizeCalculator implements Serializable { private static final int MILLI_SECONDS_IN_A_SECOND = 1000; - private static final Logger LOG = LogManager.getLogger(HbasePutBatchSizeCalculator.class); + private static final Logger LOG = LoggerFactory.getLogger(HbasePutBatchSizeCalculator.class); /** * Calculate putBatch size so that sum of requests across multiple jobs in a second does not exceed @@ -465,15 +465,15 @@ public class HBaseIndex extends HoodieIndex { int maxParallelPuts = Math.max(1, Math.min(numTasks, maxExecutors)); int maxReqsSentPerTaskPerSec = MILLI_SECONDS_IN_A_SECOND / sleepTimeMs; int multiPutBatchSize = Math.max(1, maxReqPerSec / (maxParallelPuts * maxReqsSentPerTaskPerSec)); - LOG.info("HbaseIndexThrottling: qpsFraction :" + qpsFraction); - LOG.info("HbaseIndexThrottling: numRSAlive :" + numRSAlive); - LOG.info("HbaseIndexThrottling: maxReqPerSec :" + maxReqPerSec); - LOG.info("HbaseIndexThrottling: numTasks :" + numTasks); - LOG.info("HbaseIndexThrottling: maxExecutors :" + maxExecutors); - LOG.info("HbaseIndexThrottling: maxParallelPuts :" + maxParallelPuts); - LOG.info("HbaseIndexThrottling: maxReqsSentPerTaskPerSec :" + maxReqsSentPerTaskPerSec); - LOG.info("HbaseIndexThrottling: numRegionServersForTable :" + numRegionServersForTable); - LOG.info("HbaseIndexThrottling: multiPutBatchSize :" + multiPutBatchSize); + LOG.info("HBaseIndexThrottling: qpsFraction : {}", qpsFraction); + LOG.info("HBaseIndexThrottling: numRSAlive : {}", numRSAlive); + LOG.info("HBaseIndexThrottling: maxReqPerSec : {}", maxReqPerSec); + LOG.info("HBaseIndexThrottling: numTasks : {}", numTasks); + LOG.info("HBaseIndexThrottling: maxExecutors : {}", maxExecutors); + LOG.info("HBaseIndexThrottling: maxParallelPuts : {}", maxParallelPuts); + LOG.info("HBaseIndexThrottling: maxReqsSentPerTaskPerSec : {}", maxReqsSentPerTaskPerSec); + LOG.info("HBaseIndexThrottling: numRegionServersForTable : {}", numRegionServersForTable); + LOG.info("HBaseIndexThrottling: multiPutBatchSize : {}", multiPutBatchSize); return multiPutBatchSize; } } @@ -488,7 +488,7 @@ public class HBaseIndex extends HoodieIndex { .toIntExact(regionLocator.getAllRegionLocations().stream().map(e -> e.getServerName()).distinct().count()); return numRegionServersForTable; } catch (IOException e) { - LOG.error(e); + LOG.error("Error while connecting HBase:", e); throw new RuntimeException(e); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index a4400d03e..58eafac09 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -47,10 +47,10 @@ import com.google.common.collect.Maps; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.TaskContext; import org.apache.spark.util.SizeEstimator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; @@ -64,7 +64,7 @@ import java.util.concurrent.atomic.AtomicLong; */ public class HoodieAppendHandle extends HoodieWriteHandle { - private static final Logger LOG = LogManager.getLogger(HoodieAppendHandle.class); + private static final Logger LOG = LoggerFactory.getLogger(HoodieAppendHandle.class); // This acts as the sequenceID for records written private static AtomicLong recordIndex = new AtomicLong(1); private final String fileId; @@ -123,7 +123,7 @@ public class HoodieAppendHandle extends HoodieWri } else { // This means there is no base data file, start appending to a new log file fileSlice = Option.of(new FileSlice(partitionPath, baseInstantTime, this.fileId)); - LOG.info("New InsertHandle for partition :" + partitionPath); + LOG.info("New InsertHandle for partition : {}", partitionPath); } writeStatus.getStat().setPrevCommit(baseInstantTime); writeStatus.setFileId(fileId); @@ -137,7 +137,7 @@ public class HoodieAppendHandle extends HoodieWri ((HoodieDeltaWriteStat) writeStatus.getStat()).setLogVersion(currentLogFile.getLogVersion()); ((HoodieDeltaWriteStat) writeStatus.getStat()).setLogOffset(writer.getCurrentSize()); } catch (Exception e) { - LOG.error("Error in update task at commit " + instantTime, e); + LOG.error("Error in update task at commit {}", instantTime, e); writeStatus.setGlobalError(e); throw new HoodieUpsertException("Failed to initialize HoodieAppendHandle for FileId: " + fileId + " on commit " + instantTime + " on HDFS path " + hoodieTable.getMetaClient().getBasePath() + partitionPath, e); @@ -179,7 +179,7 @@ public class HoodieAppendHandle extends HoodieWri hoodieRecord.deflate(); return avroRecord; } catch (Exception e) { - LOG.error("Error writing record " + hoodieRecord, e); + LOG.error("Error writing record {}", hoodieRecord, e); writeStatus.markFailure(hoodieRecord, e, recordMetadata); } return Option.empty(); @@ -232,7 +232,7 @@ public class HoodieAppendHandle extends HoodieWri // Not throwing exception from here, since we don't want to fail the entire job // for a single record writeStatus.markFailure(record, t, recordMetadata); - LOG.error("Error writing record " + record, t); + LOG.error("Error writing record {}", record, t); } } @@ -259,8 +259,8 @@ public class HoodieAppendHandle extends HoodieWri runtimeStats.setTotalUpsertTime(timer.endTimer()); stat.setRuntimeStats(runtimeStats); - LOG.info(String.format("AppendHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(), - stat.getFileId(), runtimeStats.getTotalUpsertTime())); + LOG.info("AppendHandle for partitionPath {} fileID {}, took {} ms.", stat.getPartitionPath(), + stat.getFileId(), runtimeStats.getTotalUpsertTime()); return writeStatus; } catch (IOException e) { @@ -308,7 +308,7 @@ public class HoodieAppendHandle extends HoodieWri if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize)) { // Recompute averageRecordSize before writing a new block and update existing value with // avg of new and old - LOG.info("AvgRecordSize => " + averageRecordSize); + LOG.info("AvgRecordSize => {}", averageRecordSize); averageRecordSize = (averageRecordSize + SizeEstimator.estimate(record)) / 2; doAppend(header); estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords; diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java index 5f5aa15fe..e8a06a6b8 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java @@ -39,8 +39,8 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieTable; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; @@ -61,7 +61,7 @@ import java.util.stream.Collectors; */ public class HoodieCleanHelper> implements Serializable { - private static final Logger LOG = LogManager.getLogger(HoodieCleanHelper.class); + private static final Logger LOG = LoggerFactory.getLogger(HoodieCleanHelper.class); private final SyncableFileSystemView fileSystemView; private final HoodieTimeline commitTimeline; @@ -100,8 +100,7 @@ public class HoodieCleanHelper> implements Seri if ((cleanMetadata.getEarliestCommitToRetain() != null) && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) { LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed " - + "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain() - + ". New Instant to retain : " + newInstantToRetain); + + "since last cleaned at {}. New Instant to retain : {}", cleanMetadata.getEarliestCommitToRetain(), newInstantToRetain); return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(instant -> { return HoodieTimeline.compareTimestamps(instant.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(), HoodieTimeline.GREATER_OR_EQUAL) && HoodieTimeline.compareTimestamps(instant.getTimestamp(), @@ -129,8 +128,7 @@ public class HoodieCleanHelper> implements Seri * single file (i.e run it with versionsRetained = 1) */ private List getFilesToCleanKeepingLatestVersions(String partitionPath) throws IOException { - LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained() - + " file versions. "); + LOG.info("Cleaning {}, retaining latest {} file versions. ", partitionPath, config.getCleanerFileVersionsRetained()); List fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList()); List deletePaths = new ArrayList<>(); // Collect all the datafiles savepointed by all the savepoints @@ -189,7 +187,7 @@ public class HoodieCleanHelper> implements Seri */ private List getFilesToCleanKeepingLatestCommits(String partitionPath) throws IOException { int commitsRetained = config.getCleanerCommitsRetained(); - LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. "); + LOG.info("Cleaning {}, retaining latest {} commits. ", partitionPath, commitsRetained); List deletePaths = new ArrayList<>(); // Collect all the datafiles savepointed by all the savepoints @@ -276,7 +274,7 @@ public class HoodieCleanHelper> implements Seri } else { throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name()); } - LOG.info(deletePaths.size() + " patterns used to delete in partition path:" + partitionPath); + LOG.info("{} patterns used to delete in partition path: {}", deletePaths.size(), partitionPath); return deletePaths; } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java index e918b709c..c4ce91a56 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java @@ -54,9 +54,9 @@ import com.google.common.collect.Sets; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; @@ -71,7 +71,7 @@ import java.util.stream.Stream; */ public class HoodieCommitArchiveLog { - private static final Logger LOG = LogManager.getLogger(HoodieCommitArchiveLog.class); + private static final Logger LOG = LoggerFactory.getLogger(HoodieCommitArchiveLog.class); private final Path archiveFilePath; private final HoodieTableMetaClient metaClient; @@ -118,9 +118,9 @@ public class HoodieCommitArchiveLog { boolean success = true; if (!instantsToArchive.isEmpty()) { this.writer = openWriter(); - LOG.info("Archiving instants " + instantsToArchive); + LOG.info("Archiving instants {}", instantsToArchive); archive(instantsToArchive); - LOG.info("Deleting archived instants " + instantsToArchive); + LOG.info("Deleting archived instants {}", instantsToArchive); success = deleteArchivedInstants(instantsToArchive); } else { LOG.info("No Instants to archive"); @@ -189,14 +189,14 @@ public class HoodieCommitArchiveLog { } private boolean deleteArchivedInstants(List archivedInstants) throws IOException { - LOG.info("Deleting instants " + archivedInstants); + LOG.info("Deleting instants {}", archivedInstants); boolean success = true; for (HoodieInstant archivedInstant : archivedInstants) { Path commitFile = new Path(metaClient.getMetaPath(), archivedInstant.getFileName()); try { if (metaClient.getFs().exists(commitFile)) { success &= metaClient.getFs().delete(commitFile, false); - LOG.info("Archived and deleted instant file " + commitFile); + LOG.info("Archived and deleted instant file {}", commitFile); } } catch (IOException e) { throw new HoodieIOException("Failed to delete archived instant " + archivedInstant, e); @@ -208,7 +208,7 @@ public class HoodieCommitArchiveLog { return i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION) || (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION))); }).max(Comparator.comparing(HoodieInstant::getTimestamp))); - LOG.info("Latest Committed Instant=" + latestCommitted); + LOG.info("Latest Committed Instant={}", latestCommitted); if (latestCommitted.isPresent()) { success &= deleteAllInstantsOlderorEqualsInAuxMetaFolder(latestCommitted.get()); } @@ -236,7 +236,7 @@ public class HoodieCommitArchiveLog { Path metaFile = new Path(metaClient.getMetaAuxiliaryPath(), deleteInstant.getFileName()); if (metaClient.getFs().exists(metaFile)) { success &= metaClient.getFs().delete(metaFile, false); - LOG.info("Deleted instant file in auxiliary metapath : " + metaFile); + LOG.info("Deleted instant file in auxiliary metapath : {}", metaFile); } } return success; @@ -246,7 +246,7 @@ public class HoodieCommitArchiveLog { try { HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants(); Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema(); - LOG.info("Wrapper schema " + wrapperSchema.toString()); + LOG.info("Wrapper schema {}", wrapperSchema.toString()); List records = new ArrayList<>(); for (HoodieInstant hoodieInstant : instants) { try { @@ -255,7 +255,7 @@ public class HoodieCommitArchiveLog { writeToFile(wrapperSchema, records); } } catch (Exception e) { - LOG.error("Failed to archive commits, .commit file: " + hoodieInstant.getFileName(), e); + LOG.error("Failed to archive commits, commit file: {}", hoodieInstant.getFileName(), e); if (this.config.isFailOnTimelineArchivingEnabled()) { throw e; } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 095e0a0e3..bece8810d 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -36,16 +36,16 @@ import org.apache.hudi.table.HoodieTable; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.TaskContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Iterator; public class HoodieCreateHandle extends HoodieWriteHandle { - private static final Logger LOG = LogManager.getLogger(HoodieCreateHandle.class); + private static final Logger LOG = LoggerFactory.getLogger(HoodieCreateHandle.class); private final HoodieStorageWriter storageWriter; private final Path path; @@ -73,7 +73,7 @@ public class HoodieCreateHandle extends HoodieWri } catch (IOException e) { throw new HoodieInsertException("Failed to initialize HoodieStorageWriter for path " + path, e); } - LOG.info("New CreateHandle for partition :" + partitionPath + " with fileId " + fileId); + LOG.info("New CreateHandle for partition : {} with fileId {}", partitionPath, fileId); } /** @@ -120,7 +120,7 @@ public class HoodieCreateHandle extends HoodieWri // Not throwing exception from here, since we don't want to fail the entire job // for a single record writeStatus.markFailure(record, t, recordMetadata); - LOG.error("Error writing record " + record, t); + LOG.error("Error writing record {}", record, t); } } @@ -152,8 +152,7 @@ public class HoodieCreateHandle extends HoodieWri */ @Override public WriteStatus close() { - LOG - .info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten); + LOG.info("Closing the file {} as we are done with all the records {}", writeStatus.getFileId(), recordsWritten); try { storageWriter.close(); @@ -175,8 +174,8 @@ public class HoodieCreateHandle extends HoodieWri stat.setRuntimeStats(runtimeStats); writeStatus.setStat(stat); - LOG.info(String.format("CreateHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(), - stat.getFileId(), runtimeStats.getTotalCreateTime())); + LOG.info("CreateHandle for partitionPath {} fileID {}, took {} ms.", stat.getPartitionPath(), + stat.getFileId(), runtimeStats.getTotalCreateTime()); return writeStatus; } catch (IOException e) { diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java index 9f3bdbbdd..45472bcea 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java @@ -31,8 +31,8 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashSet; @@ -44,7 +44,7 @@ import java.util.Set; */ public class HoodieKeyLookupHandle extends HoodieReadHandle { - private static final Logger LOG = LogManager.getLogger(HoodieKeyLookupHandle.class); + private static final Logger LOG = LoggerFactory.getLogger(HoodieKeyLookupHandle.class); private final HoodieTableType tableType; @@ -63,7 +63,7 @@ public class HoodieKeyLookupHandle extends Hoodie HoodieTimer timer = new HoodieTimer().startTimer(); this.bloomFilter = ParquetUtils.readBloomFilterFromParquetMetadata(hoodieTable.getHadoopConf(), new Path(getLatestDataFile().getPath())); - LOG.info(String.format("Read bloom filter from %s in %d ms", partitionPathFilePair, timer.endTimer())); + LOG.info("Read bloom filter from {} in {} ms", partitionPathFilePair, timer.endTimer()); } /** @@ -82,7 +82,7 @@ public class HoodieKeyLookupHandle extends Hoodie LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath, timer.endTimer(), candidateRecordKeys.size(), foundRecordKeys.size())); if (LOG.isDebugEnabled()) { - LOG.debug("Keys matching for file " + filePath + " => " + foundRecordKeys); + LOG.debug("Keys matching for file {} => {}", filePath, foundRecordKeys); } } } catch (Exception e) { @@ -98,7 +98,7 @@ public class HoodieKeyLookupHandle extends Hoodie // check record key against bloom filter of current file & add to possible keys if needed if (bloomFilter.mightContain(recordKey)) { if (LOG.isDebugEnabled()) { - LOG.debug("Record key " + recordKey + " matches bloom filter in " + partitionPathFilePair); + LOG.debug("Record key {} matches bloom filter in {}", recordKey, partitionPathFilePair); } candidateRecordKeys.add(recordKey); } @@ -110,15 +110,14 @@ public class HoodieKeyLookupHandle extends Hoodie */ public KeyLookupResult getLookupResult() { if (LOG.isDebugEnabled()) { - LOG.debug("#The candidate row keys for " + partitionPathFilePair + " => " + candidateRecordKeys); + LOG.debug("#The candidate row keys for {} => {}", partitionPathFilePair, candidateRecordKeys); } HoodieDataFile dataFile = getLatestDataFile(); List matchingKeys = checkCandidatesAgainstFile(hoodieTable.getHadoopConf(), candidateRecordKeys, new Path(dataFile.getPath())); - LOG.info( - String.format("Total records (%d), bloom filter candidates (%d)/fp(%d), actual matches (%d)", totalKeysChecked, - candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(), matchingKeys.size())); + LOG.info("Total records ({}), bloom filter candidates ({})/fp({}), actual matches ({})", totalKeysChecked, + candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(), matchingKeys.size()); return new KeyLookupResult(partitionPathFilePair.getRight(), partitionPathFilePair.getLeft(), dataFile.getCommitTime(), matchingKeys); } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 518b88334..0c801e782 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -43,9 +43,9 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.TaskContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashSet; @@ -56,7 +56,7 @@ import java.util.Set; @SuppressWarnings("Duplicates") public class HoodieMergeHandle extends HoodieWriteHandle { - private static final Logger LOG = LogManager.getLogger(HoodieMergeHandle.class); + private static final Logger LOG = LoggerFactory.getLogger(HoodieMergeHandle.class); private Map> keyToNewRecords; private Set writtenRecordKeys; @@ -137,7 +137,7 @@ public class HoodieMergeHandle extends HoodieWrit if (exception.isPresent() && exception.get() instanceof Throwable) { // Not throwing exception from here, since we don't want to fail the entire job for a single record writeStatus.markFailure(record, exception.get(), recordMetadata); - LOG.error("Error writing record " + record, exception.get()); + LOG.error("Error writing record {}", record, exception.get()); } else { write(record, avroRecord); } @@ -155,7 +155,7 @@ public class HoodieMergeHandle extends HoodieWrit * Extract old file path, initialize StorageWriter and WriteStatus. */ private void init(String fileId, String partitionPath, HoodieDataFile dataFileToBeMerged) { - LOG.info("partitionPath:" + partitionPath + ", fileId to be merged:" + fileId); + LOG.info("partitionPath: {}, fileId to be merged: {}", partitionPath, fileId); this.writtenRecordKeys = new HashSet<>(); writeStatus.setStat(new HoodieWriteStat()); try { @@ -171,8 +171,7 @@ public class HoodieMergeHandle extends HoodieWrit + FSUtils.makeDataFileName(instantTime, writeToken, fileId)).toString(); newFilePath = new Path(config.getBasePath(), relativePath); - LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(), - newFilePath.toString())); + LOG.info("Merging new data into oldPath {}, as newPath {}", oldFilePath.toString(), newFilePath.toString()); // file name is same for all records, in this bunch writeStatus.setFileId(fileId); writeStatus.setPartitionPath(partitionPath); @@ -187,7 +186,7 @@ public class HoodieMergeHandle extends HoodieWrit storageWriter = HoodieStorageWriterFactory.getStorageWriter(instantTime, newFilePath, hoodieTable, config, writerSchema); } catch (IOException io) { - LOG.error("Error in update task at commit " + instantTime, io); + LOG.error("Error in update task at commit {}", instantTime, io); writeStatus.setGlobalError(io); throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle for FileId: " + fileId + " on commit " + instantTime + " on path " + hoodieTable.getMetaClient().getBasePath(), io); @@ -201,7 +200,7 @@ public class HoodieMergeHandle extends HoodieWrit try { // Load the new records in a map long memoryForMerge = config.getMaxMemoryPerPartitionMerge(); - LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge); + LOG.info("MaxMemoryPerPartitionMerge => {}", memoryForMerge); this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(originalSchema)); } catch (IOException io) { @@ -218,12 +217,10 @@ public class HoodieMergeHandle extends HoodieWrit // NOTE: Once Records are added to map (spillable-map), DO NOT change it as they won't persist keyToNewRecords.put(record.getRecordKey(), record); } - LOG.info("Number of entries in MemoryBasedMap => " - + ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries() - + "Total size in bytes of MemoryBasedMap => " - + ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize() + "Number of entries in DiskBasedMap => " - + ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries() + "Size of file spilled to disk => " - + ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes()); + LOG.info("Number of entries in MemoryBasedMap => {}. Total size in bytes of MemoryBasedMap => {}. " + + "Number of entries in DiskBasedMap => {}. Size of file spilled to disk => {}", + ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries(), ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize(), + ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries(), ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes()); return partitionPath; } @@ -253,7 +250,7 @@ public class HoodieMergeHandle extends HoodieWrit hoodieRecord.deflate(); return true; } catch (Exception e) { - LOG.error("Error writing record " + hoodieRecord, e); + LOG.error("Error writing record {}", hoodieRecord, e); writeStatus.markFailure(hoodieRecord, e, recordMetadata); } return false; @@ -295,12 +292,12 @@ public class HoodieMergeHandle extends HoodieWrit try { storageWriter.writeAvro(key, oldRecord); } catch (ClassCastException e) { - LOG.error("Schema mismatch when rewriting old record " + oldRecord + " from file " + getOldFilePath() - + " to file " + newFilePath + " with writerSchema " + writerSchema.toString(true)); + LOG.error("Schema mismatch when rewriting old record {} from file {} to file {} with writerSchema {}", + oldRecord, getOldFilePath(), newFilePath, writerSchema.toString(true)); throw new HoodieUpsertException(errMsg, e); } catch (IOException e) { - LOG.error("Failed to merge old record into new file for key " + key + " from old file " + getOldFilePath() - + " to new file " + newFilePath, e); + LOG.error("Failed to merge old record into new file for key {} from old file {} to new file {}", + key, getOldFilePath(), newFilePath, e); throw new HoodieUpsertException(errMsg, e); } recordsWritten++; @@ -345,6 +342,8 @@ public class HoodieMergeHandle extends HoodieWrit LOG.info(String.format("MergeHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(), stat.getFileId(), runtimeStats.getTotalUpsertTime())); + LOG.info("MergeHandle for partitionPath {} fileID {}, took {} ms.", stat.getPartitionPath(), + stat.getFileId(), runtimeStats.getTotalUpsertTime()); return writeStatus; } catch (IOException e) { diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index 7a1939a47..50256bcf3 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -36,9 +36,9 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.TaskContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; @@ -47,7 +47,7 @@ import java.io.IOException; */ public abstract class HoodieWriteHandle extends HoodieIOHandle { - private static final Logger LOG = LogManager.getLogger(HoodieWriteHandle.class); + private static final Logger LOG = LoggerFactory.getLogger(HoodieWriteHandle.class); protected final Schema originalSchema; protected final Schema writerSchema; protected HoodieTimer timer; @@ -97,7 +97,7 @@ public abstract class HoodieWriteHandle extends H protected void createMarkerFile(String partitionPath) { Path markerPath = makeNewMarkerPath(partitionPath); try { - LOG.info("Creating Marker Path=" + markerPath); + LOG.info("Creating Marker Path={}", markerPath); fs.create(markerPath, false).close(); } catch (IOException e) { throw new HoodieException("Failed to create marker file " + markerPath, e); @@ -147,7 +147,7 @@ public abstract class HoodieWriteHandle extends H if (exception.isPresent() && exception.get() instanceof Throwable) { // Not throwing exception from here, since we don't want to fail the entire job for a single record writeStatus.markFailure(record, exception.get(), recordMetadata); - LOG.error("Error writing record " + record, exception.get()); + LOG.error("Error writing record {}", record, exception.get()); } else { write(record, avroRecord); } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java b/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java index 6f976014a..1d02d4ea3 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java @@ -47,13 +47,13 @@ import com.google.common.collect.Sets; import org.apache.avro.Schema; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.util.AccumulatorV2; import org.apache.spark.util.LongAccumulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collection; @@ -74,7 +74,7 @@ import static java.util.stream.Collectors.toList; */ public class HoodieRealtimeTableCompactor implements HoodieCompactor { - private static final Logger LOG = LogManager.getLogger(HoodieRealtimeTableCompactor.class); + private static final Logger LOG = LoggerFactory.getLogger(HoodieRealtimeTableCompactor.class); // Accumulator to keep track of total log files for a dataset private AccumulatorV2 totalLogFiles; // Accumulator to keep track of total log file slices for a dataset @@ -92,7 +92,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc); List operations = compactionPlan.getOperations().stream() .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList()); - LOG.info("Compactor compacting " + operations + " files"); + LOG.info("Compactor compacting {} files", operations); return jsc.parallelize(operations, operations.size()) .map(s -> compact(table, metaClient, config, s, compactionInstantTime)).flatMap(List::iterator); @@ -103,8 +103,8 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { FileSystem fs = metaClient.getFs(); Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); - LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames() - + " for commit " + commitTime); + LOG.info("Compacting base {} with delta files {} for commit {}", + operation.getDataFileName(), operation.getDeltaFileNames(), commitTime); // TODO - FIX THIS // Reads the entire avro file. Always only specific blocks should be read from the avro file // (failure recover). @@ -115,7 +115,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { .getActiveTimeline().getTimelineOfActions(Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION)) .filterCompletedInstants().lastInstant().get().getTimestamp(); - LOG.info("MaxMemoryPerCompaction => " + config.getMaxMemoryPerCompaction()); + LOG.info("MaxMemoryPerCompaction => {}", config.getMaxMemoryPerCompaction()); List logFiles = operation.getDeltaFileNames().stream().map( p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString()) @@ -176,7 +176,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { // TODO : check if maxMemory is not greater than JVM or spark.executor memory // TODO - rollback any compactions in flight HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); - LOG.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime); + LOG.info("Compacting {} with commit {}", metaClient.getBasePath(), compactionCommitTime); List partitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), config.shouldAssumeDatePartitioning()); @@ -189,7 +189,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { } RealtimeView fileSystemView = hoodieTable.getRTFileSystemView(); - LOG.info("Compaction looking for files to compact in " + partitionPaths + " partitions"); + LOG.info("Compaction looking for files to compact in {} partitions", partitionPaths); List operations = jsc.parallelize(partitionPaths, partitionPaths.size()) .flatMap((FlatMapFunction) partitionPath -> fileSystemView .getLatestFileSlices(partitionPath) @@ -206,10 +206,10 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { config.getCompactionStrategy().captureMetrics(config, dataFile, partitionPath, logFiles)); }).filter(c -> !c.getDeltaFileNames().isEmpty()).collect(toList()).iterator()) .collect().stream().map(CompactionUtils::buildHoodieCompactionOperation).collect(toList()); - LOG.info("Total of " + operations.size() + " compactions are retrieved"); - LOG.info("Total number of latest files slices " + totalFileSlices.value()); - LOG.info("Total number of log files " + totalLogFiles.value()); - LOG.info("Total number of file slices " + totalFileSlices.value()); + LOG.info("Total of {} compactions are retrieved", operations.size()); + LOG.info("Total number of latest files slices {}", totalFileSlices.value()); + LOG.info("Total number of log files {}", totalLogFiles.value()); + LOG.info("Total number of file slices {}", totalFileSlices.value()); // Filter the compactions with the passed in filter. This lets us choose most effective // compactions only HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations, @@ -221,7 +221,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { + "Please fix your strategy implementation. FileIdsWithPendingCompactions :" + fgIdsInPendingCompactions + ", Selected workload :" + compactionPlan); if (compactionPlan.getOperations().isEmpty()) { - LOG.warn("After filtering, Nothing to compact for " + metaClient.getBasePath()); + LOG.warn("After filtering, Nothing to compact for {}", metaClient.getBasePath()); } return compactionPlan; } diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java index 0cfd5c62b..a2922fd38 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java @@ -24,15 +24,15 @@ import org.apache.hudi.config.HoodieWriteConfig; import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Wrapper for metrics-related operations. */ public class HoodieMetrics { - private static final Logger LOG = LogManager.getLogger(HoodieMetrics.class); + private static final Logger LOG = LoggerFactory.getLogger(HoodieMetrics.class); // Some timers public String rollbackTimerName = null; public String cleanTimerName = null; @@ -155,8 +155,7 @@ public class HoodieMetrics { public void updateRollbackMetrics(long durationInMs, long numFilesDeleted) { if (config.isMetricsOn()) { - LOG.info( - String.format("Sending rollback metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted)); + LOG.info("Sending rollback metrics (duration={}, numFilesDeleted={})", durationInMs, numFilesDeleted); Metrics.registerGauge(getMetricsName("rollback", "duration"), durationInMs); Metrics.registerGauge(getMetricsName("rollback", "numFilesDeleted"), numFilesDeleted); } @@ -164,8 +163,7 @@ public class HoodieMetrics { public void updateCleanMetrics(long durationInMs, int numFilesDeleted) { if (config.isMetricsOn()) { - LOG.info( - String.format("Sending clean metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted)); + LOG.info("Sending clean metrics (duration={}, numFilesDeleted={})", durationInMs, numFilesDeleted); Metrics.registerGauge(getMetricsName("clean", "duration"), durationInMs); Metrics.registerGauge(getMetricsName("clean", "numFilesDeleted"), numFilesDeleted); } @@ -173,8 +171,7 @@ public class HoodieMetrics { public void updateFinalizeWriteMetrics(long durationInMs, long numFilesFinalized) { if (config.isMetricsOn()) { - LOG.info(String.format("Sending finalize write metrics (duration=%d, numFilesFinalized=%d)", durationInMs, - numFilesFinalized)); + LOG.info("Sending finalize write metrics (duration={}, numFilesFinalized={})", durationInMs, numFilesFinalized); Metrics.registerGauge(getMetricsName("finalize", "duration"), durationInMs); Metrics.registerGauge(getMetricsName("finalize", "numFilesFinalized"), numFilesFinalized); } @@ -182,7 +179,7 @@ public class HoodieMetrics { public void updateIndexMetrics(final String action, final long durationInMs) { if (config.isMetricsOn()) { - LOG.info(String.format("Sending index metrics (%s.duration, %d)", action, durationInMs)); + LOG.info("Sending index metrics ({}.duration, {})", action, durationInMs); Metrics.registerGauge(getMetricsName("index", String.format("%s.duration", action)), durationInMs); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java b/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java index 98a6b30fc..d8c4ae6bd 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java @@ -22,8 +22,8 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import com.google.common.base.Preconditions; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.management.remote.JMXConnectorServer; import javax.management.remote.JMXConnectorServerFactory; @@ -38,7 +38,7 @@ import java.rmi.registry.LocateRegistry; */ public class JmxMetricsReporter extends MetricsReporter { - private static final Logger LOG = LogManager.getLogger(JmxMetricsReporter.class); + private static final Logger LOG = LoggerFactory.getLogger(JmxMetricsReporter.class); private final JMXConnectorServer connector; private String host; private int port; diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java index 4b194416a..4d006840a 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java @@ -24,8 +24,8 @@ import org.apache.hudi.exception.HoodieException; import com.codahale.metrics.Gauge; import com.codahale.metrics.MetricRegistry; import com.google.common.io.Closeables; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Closeable; @@ -33,7 +33,7 @@ import java.io.Closeable; * This is the main class of the metrics system. */ public class Metrics { - private static final Logger LOG = LogManager.getLogger(Metrics.class); + private static final Logger LOG = LoggerFactory.getLogger(Metrics.class); private static volatile boolean initialized = false; private static Metrics metrics = null; diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsGraphiteReporter.java b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsGraphiteReporter.java index aac6c708f..bb33a978a 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsGraphiteReporter.java +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsGraphiteReporter.java @@ -24,8 +24,8 @@ import com.codahale.metrics.MetricFilter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.graphite.Graphite; import com.codahale.metrics.graphite.GraphiteReporter; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Closeable; import java.net.InetSocketAddress; @@ -36,7 +36,7 @@ import java.util.concurrent.TimeUnit; */ public class MetricsGraphiteReporter extends MetricsReporter { - private static final Logger LOG = LogManager.getLogger(MetricsGraphiteReporter.class); + private static final Logger LOG = LoggerFactory.getLogger(MetricsGraphiteReporter.class); private final MetricRegistry registry; private final GraphiteReporter graphiteReporter; private final HoodieWriteConfig config; diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java index b9d433d94..a80c1efbe 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java @@ -21,15 +21,15 @@ package org.apache.hudi.metrics; import org.apache.hudi.config.HoodieWriteConfig; import com.codahale.metrics.MetricRegistry; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Factory class for creating MetricsReporter. */ public class MetricsReporterFactory { - private static final Logger LOG = LogManager.getLogger(MetricsReporterFactory.class); + private static final Logger LOG = LoggerFactory.getLogger(MetricsReporterFactory.class); public static MetricsReporter createReporter(HoodieWriteConfig config, MetricRegistry registry) { MetricsReporterType type = config.getMetricsReporterType(); @@ -45,7 +45,7 @@ public class MetricsReporterFactory { reporter = new JmxMetricsReporter(config); break; default: - LOG.error("Reporter type[" + type + "] is not supported."); + LOG.error("Reporter type[{}] is not supported.", type); break; } return reporter; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java index f9447d34d..24553f5ec 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java @@ -58,8 +58,6 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.avro.AvroReadSupport; import org.apache.parquet.hadoop.ParquetReader; @@ -81,6 +79,8 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -92,7 +92,7 @@ import scala.Tuple2; */ public class HoodieCopyOnWriteTable extends HoodieTable { - private static final Logger LOG = LogManager.getLogger(HoodieCopyOnWriteTable.class); + private static final Logger LOG = LoggerFactory.getLogger(HoodieCopyOnWriteTable.class); public HoodieCopyOnWriteTable(HoodieWriteConfig config, JavaSparkContext jsc) { super(config, jsc); @@ -130,7 +130,7 @@ public class HoodieCopyOnWriteTable extends Hoodi try { boolean deleteResult = fs.delete(deletePath, false); if (deleteResult) { - LOG.debug("Cleaned file at path :" + deletePath); + LOG.debug("Cleaned file at path : {}", deletePath); } return deleteResult; } catch (FileNotFoundException fio) { @@ -172,7 +172,7 @@ public class HoodieCopyOnWriteTable extends Hoodi throws IOException { // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records if (!recordItr.hasNext()) { - LOG.info("Empty partition with fileId => " + fileId); + LOG.info("Empty partition with fileId => {}", fileId); return Collections.singletonList((List) Collections.EMPTY_LIST).iterator(); } // these are updates @@ -212,8 +212,8 @@ public class HoodieCopyOnWriteTable extends Hoodi // TODO(vc): This needs to be revisited if (upsertHandle.getWriteStatus().getPartitionPath() == null) { - LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", " - + upsertHandle.getWriteStatus()); + LOG.info("Upsert Handle has partition path as null {}, {}", upsertHandle.getOldFilePath(), + upsertHandle.getWriteStatus()); } return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator(); } @@ -291,8 +291,7 @@ public class HoodieCopyOnWriteTable extends Hoodi LOG.info("Nothing to clean here. It is already clean"); return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build(); } - LOG.info( - "Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy()); + LOG.info("Total Partitions to clean : {}, with policy {}", partitionsToClean.size(), config.getCleanerPolicy()); int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism()); LOG.info("Using cleanerParallelism: " + cleanerParallelism); @@ -318,7 +317,7 @@ public class HoodieCopyOnWriteTable extends Hoodi int cleanerParallelism = Math.min( (int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(x -> x.size()).count()), config.getCleanerParallelism()); - LOG.info("Using cleanerParallelism: " + cleanerParallelism); + LOG.info("Using cleanerParallelism: {}", cleanerParallelism); List> partitionCleanStats = jsc .parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream() .flatMap(x -> x.getValue().stream().map(y -> new Tuple2(x.getKey(), y))) @@ -355,7 +354,7 @@ public class HoodieCopyOnWriteTable extends Hoodi HoodieActiveTimeline activeTimeline = this.getActiveTimeline(); if (instant.isCompleted()) { - LOG.info("Unpublishing instant " + instant); + LOG.info("Unpublishing instant {}", instant); instant = activeTimeline.revertToInflight(instant); } @@ -365,7 +364,7 @@ public class HoodieCopyOnWriteTable extends Hoodi String commit = instant.getTimestamp(); // delete all the data files for this commit - LOG.info("Clean out all parquet files generated for commit: " + commit); + LOG.info("Clean out all parquet files generated for commit: {}", commit); List rollbackRequests = generateRollbackRequests(instant); //TODO: We need to persist this as rollback workload and use it in case of partial failures @@ -373,7 +372,7 @@ public class HoodieCopyOnWriteTable extends Hoodi } // Delete Inflight instant if enabled deleteInflightAndRequestedInstant(deleteInstants, activeTimeline, instant); - LOG.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime)); + LOG.info("Time(in ms) taken to finish rollback {}", (System.currentTimeMillis() - startTime)); return stats; } @@ -400,7 +399,7 @@ public class HoodieCopyOnWriteTable extends Hoodi // Remove the rolled back inflight commits if (deleteInstant) { - LOG.info("Deleting instant=" + instantToBeDeleted); + LOG.info("Deleting instant={}", instantToBeDeleted); activeTimeline.deletePending(instantToBeDeleted); if (instantToBeDeleted.isInflight() && !metaClient.getTimelineLayoutVersion().isNullVersion()) { // Delete corresponding requested instant @@ -408,9 +407,9 @@ public class HoodieCopyOnWriteTable extends Hoodi instantToBeDeleted.getTimestamp()); activeTimeline.deletePending(instantToBeDeleted); } - LOG.info("Deleted pending commit " + instantToBeDeleted); + LOG.info("Deleted pending commit {}", instantToBeDeleted); } else { - LOG.warn("Rollback finished without deleting inflight instant file. Instant=" + instantToBeDeleted); + LOG.warn("Rollback finished without deleting inflight instant file. Instant={}", instantToBeDeleted); } } @@ -579,9 +578,10 @@ public class HoodieCopyOnWriteTable extends Hoodi assignUpdates(profile); assignInserts(profile); - LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n" - + "Partition to insert buckets => " + partitionPathToInsertBuckets + ", \n" - + "UpdateLocations mapped to buckets =>" + updateLocationToBucket); + LOG.info("Total Buckets :{}, buckets info => {}, \n" + + "Partition to insert buckets => {}, \n" + + "UpdateLocations mapped to buckets =>{}", + totalBuckets, bucketInfoMap, partitionPathToInsertBuckets, updateLocationToBucket); } private void assignUpdates(WorkloadProfile profile) { @@ -609,13 +609,13 @@ public class HoodieCopyOnWriteTable extends Hoodi long averageRecordSize = averageBytesPerRecord(metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(), config.getCopyOnWriteRecordSizeEstimate()); - LOG.info("AvgRecordSize => " + averageRecordSize); + LOG.info("AvgRecordSize => {}", averageRecordSize); for (String partitionPath : partitionPaths) { WorkloadStat pStat = profile.getWorkloadStat(partitionPath); if (pStat.getNumInserts() > 0) { List smallFiles = getSmallFiles(partitionPath); - LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles); + LOG.info("For partitionPath : {} Small Files => {}", partitionPath, smallFiles); long totalUnassignedInserts = pStat.getNumInserts(); List bucketNumbers = new ArrayList<>(); @@ -630,10 +630,10 @@ public class HoodieCopyOnWriteTable extends Hoodi int bucket; if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) { bucket = updateLocationToBucket.get(smallFile.location.getFileId()); - LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket); + LOG.info("Assigning {} inserts to existing update bucket {}", recordsToAppend, bucket); } else { bucket = addUpdateBucket(smallFile.location.getFileId()); - LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket); + LOG.info("Assigning {} inserts to new update bucket {}", recordsToAppend, bucket); } bucketNumbers.add(bucket); recordsPerBucket.add(recordsToAppend); @@ -649,8 +649,8 @@ public class HoodieCopyOnWriteTable extends Hoodi } int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket); - LOG.info("After small file assignment: unassignedInserts => " + totalUnassignedInserts - + ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket); + LOG.info("After small file assignment: unassignedInserts => {}, totalInsertBuckets => {}, " + + "recordsPerBucket => {}", totalUnassignedInserts, insertBuckets, insertRecordsPerBucket); for (int b = 0; b < insertBuckets; b++) { bucketNumbers.add(totalBuckets); recordsPerBucket.add(totalUnassignedInserts / insertBuckets); @@ -670,7 +670,7 @@ public class HoodieCopyOnWriteTable extends Hoodi bkt.weight = (1.0 * recordsPerBucket.get(i)) / pStat.getNumInserts(); insertBuckets.add(bkt); } - LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets); + LOG.info("Total insert buckets for partition path {} => {}", partitionPath, insertBuckets); partitionPathToInsertBuckets.put(partitionPath, insertBuckets); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java index f2095fb50..c020cc942 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java @@ -44,11 +44,11 @@ import org.apache.hudi.io.compact.HoodieRealtimeTableCompactor; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.UncheckedIOException; @@ -77,7 +77,7 @@ import java.util.stream.Collectors; */ public class HoodieMergeOnReadTable extends HoodieCopyOnWriteTable { - private static final Logger LOG = LogManager.getLogger(HoodieMergeOnReadTable.class); + private static final Logger LOG = LoggerFactory.getLogger(HoodieMergeOnReadTable.class); // UpsertPartitioner for MergeOnRead table type private MergeOnReadUpsertPartitioner mergeOnReadUpsertPartitioner; @@ -98,10 +98,10 @@ public class HoodieMergeOnReadTable extends Hoodi @Override public Iterator> handleUpdate(String commitTime, String fileId, Iterator> recordItr) throws IOException { - LOG.info("Merging updates for commit " + commitTime + " for file " + fileId); + LOG.info("Merging updates for commit {} for file {}", commitTime, fileId); if (!index.canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) { - LOG.info("Small file corrections for updates for commit " + commitTime + " for file " + fileId); + LOG.info("Small file corrections for updates for commit {} for file {}", commitTime, fileId); return super.handleUpdate(commitTime, fileId, recordItr); } else { HoodieAppendHandle appendHandle = new HoodieAppendHandle<>(config, commitTime, this, fileId, recordItr); @@ -124,7 +124,7 @@ public class HoodieMergeOnReadTable extends Hoodi @Override public HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String instantTime) { - LOG.info("Checking if compaction needs to be run on " + config.getBasePath()); + LOG.info("Checking if compaction needs to be run on {}", config.getBasePath()); Option lastCompaction = getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant(); String deltaCommitsSinceTs = "0"; @@ -135,13 +135,12 @@ public class HoodieMergeOnReadTable extends Hoodi int deltaCommitsSinceLastCompaction = getActiveTimeline().getDeltaCommitTimeline() .findInstantsAfter(deltaCommitsSinceTs, Integer.MAX_VALUE).countInstants(); if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) { - LOG.info("Not running compaction as only " + deltaCommitsSinceLastCompaction - + " delta commits was found since last compaction " + deltaCommitsSinceTs + ". Waiting for " - + config.getInlineCompactDeltaCommitMax()); + LOG.info("Not running compaction as only {} delta commits was found since last compaction {}. Waiting for {}", + deltaCommitsSinceLastCompaction, deltaCommitsSinceTs, config.getInlineCompactDeltaCommitMax()); return new HoodieCompactionPlan(); } - LOG.info("Compacting merge on read table " + config.getBasePath()); + LOG.info("Compacting merge on read table {}", config.getBasePath()); HoodieRealtimeTableCompactor compactor = new HoodieRealtimeTableCompactor(); try { return compactor.generateCompactionPlan(jsc, this, config, instantTime, @@ -171,11 +170,11 @@ public class HoodieMergeOnReadTable extends Hoodi Long startTime = System.currentTimeMillis(); String commit = instant.getTimestamp(); - LOG.error("Rolling back instant " + instant); + LOG.error("Rolling back instant {}", instant); // Atomically un-publish all non-inflight commits if (instant.isCompleted()) { - LOG.error("Un-publishing instant " + instant + ", deleteInstants=" + deleteInstants); + LOG.error("Un-publishing instant {}, deleteInstants={}", instant, deleteInstants); instant = this.getActiveTimeline().revertToInflight(instant); } @@ -191,7 +190,7 @@ public class HoodieMergeOnReadTable extends Hoodi // For Requested State (like failure during index lookup), there is nothing to do rollback other than // deleting the timeline file if (!instant.isRequested()) { - LOG.info("Unpublished " + commit); + LOG.info("Unpublished {}", commit); List rollbackRequests = generateRollbackRequests(jsc, instant); // TODO: We need to persist this as rollback workload and use it in case of partial failures allRollbackStats = new RollbackExecutor(metaClient, config).performRollback(jsc, instant, rollbackRequests); @@ -200,7 +199,7 @@ public class HoodieMergeOnReadTable extends Hoodi // Delete Inflight instants if enabled deleteInflightAndRequestedInstant(deleteInstants, this.getActiveTimeline(), instant); - LOG.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime)); + LOG.info("Time(in ms) taken to finish rollback {}", (System.currentTimeMillis() - startTime)); return allRollbackStats; } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java index 63811c1e4..2f2105c2d 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -52,11 +52,11 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; @@ -73,7 +73,7 @@ import java.util.stream.Stream; */ public abstract class HoodieTable implements Serializable { - private static final Logger LOG = LogManager.getLogger(HoodieTable.class); + private static final Logger LOG = LoggerFactory.getLogger(HoodieTable.class); protected final HoodieWriteConfig config; protected final HoodieTableMetaClient metaClient; @@ -324,7 +324,7 @@ public abstract class HoodieTable implements Seri Path markerDir = new Path(metaClient.getMarkerFolderPath(instantTs)); if (fs.exists(markerDir)) { // For append only case, we do not write to marker dir. Hence, the above check - LOG.info("Removing marker directory=" + markerDir); + LOG.info("Removing marker directory={}", markerDir); fs.delete(markerDir, true); } } catch (IOException ioe) { @@ -363,7 +363,7 @@ public abstract class HoodieTable implements Seri invalidDataPaths.removeAll(validDataPaths); if (!invalidDataPaths.isEmpty()) { LOG.info( - "Removing duplicate data files created due to spark retries before committing. Paths=" + invalidDataPaths); + "Removing duplicate data files created due to spark retries before committing. Paths={}", invalidDataPaths); } Map>> groupByPartition = invalidDataPaths.stream() @@ -381,7 +381,7 @@ public abstract class HoodieTable implements Seri jsc.parallelize(new ArrayList<>(groupByPartition.values()), config.getFinalizeWriteParallelism()) .map(partitionWithFileList -> { final FileSystem fileSystem = metaClient.getFs(); - LOG.info("Deleting invalid data files=" + partitionWithFileList); + LOG.info("Deleting invalid data files={}", partitionWithFileList); if (partitionWithFileList.isEmpty()) { return true; } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java index 68aa6c6d5..a87b4bed4 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java @@ -36,8 +36,6 @@ import com.google.common.collect.Maps; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.PathFilter; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; @@ -48,6 +46,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -55,7 +55,7 @@ import scala.Tuple2; */ public class RollbackExecutor implements Serializable { - private static final Logger LOG = LogManager.getLogger(RollbackExecutor.class); + private static final Logger LOG = LoggerFactory.getLogger(RollbackExecutor.class); private final HoodieTableMetaClient metaClient; private final HoodieWriteConfig config; @@ -181,13 +181,13 @@ public class RollbackExecutor implements Serializable { */ private Map deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, Map results, String partitionPath, PathFilter filter) throws IOException { - LOG.info("Cleaning path " + partitionPath); + LOG.info("Cleaning path {}", partitionPath); FileSystem fs = metaClient.getFs(); FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter); for (FileStatus file : toBeDeleted) { boolean success = fs.delete(file.getPath(), false); results.put(file, success); - LOG.info("Delete file " + file.getPath() + "\t" + success); + LOG.info("Delete file {} \t {}", file.getPath(), success); } return results; } @@ -197,7 +197,7 @@ public class RollbackExecutor implements Serializable { */ private Map deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, Map results, String commit, String partitionPath) throws IOException { - LOG.info("Cleaning path " + partitionPath); + LOG.info("Cleaning path {}", partitionPath); FileSystem fs = metaClient.getFs(); PathFilter filter = (path) -> { if (path.toString().contains(".parquet")) { @@ -210,7 +210,7 @@ public class RollbackExecutor implements Serializable { for (FileStatus file : toBeDeleted) { boolean success = fs.delete(file.getPath(), false); results.put(file, success); - LOG.info("Delete file " + file.getPath() + "\t" + success); + LOG.info("Delete file {} \t {}", file.getPath(), success); } return results; }