diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 9ff50fe61..e6abed677 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -43,7 +43,6 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; -import java.util.stream.Collectors; /** * Represents the Active Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the @@ -113,7 +112,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { // multiple casts will make this lambda serializable - // http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16 this.details = (Function> & Serializable) this::getInstantDetails; - LOG.info("Loaded instants " + getInstants().collect(Collectors.toList())); + LOG.info("Loaded instants upto : " + lastInstant()); } public HoodieActiveTimeline(HoodieTableMetaClient metaClient) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java index f9a60654c..de5cf3990 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java @@ -279,7 +279,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV long beginLsTs = System.currentTimeMillis(); FileStatus[] statuses = listPartition(partitionPath); long endLsTs = System.currentTimeMillis(); - LOG.info("#files found in partition (" + partitionPathStr + ") =" + statuses.length + ", Time taken =" + 
LOG.debug("#files found in partition (" + partitionPathStr + ") =" + statuses.length + ", Time taken =" + (endLsTs - beginLsTs)); List groups = addFilesToView(statuses); @@ -293,7 +293,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV LOG.debug("View already built for Partition :" + partitionPathStr + ", FOUND is "); } long endTs = System.currentTimeMillis(); - LOG.info("Time to load partition (" + partitionPathStr + ") =" + (endTs - beginTs)); + LOG.debug("Time to load partition (" + partitionPathStr + ") =" + (endTs - beginTs)); return true; }); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java index f0c095f59..0bbbf3e9e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java @@ -317,7 +317,7 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem @Override protected void storePartitionView(String partitionPath, List fileGroups) { - LOG.info("Adding file-groups for partition :" + partitionPath + ", #FileGroups=" + fileGroups.size()); + LOG.debug("Adding file-groups for partition :" + partitionPath + ", #FileGroups=" + fileGroups.size()); List newList = new ArrayList<>(fileGroups); partitionToFileGroupsMap.put(partitionPath, newList); } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java index d4cdfc09c..6eea71644 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java @@ -64,7 +64,10 @@ public class HiveSchemaUtil { } catch (IOException e) { throw new 
HoodieHiveSyncException("Failed to convert parquet schema to hive schema", e); } - LOG.info("Getting schema difference for " + tableSchema + "\r\n\r\n" + newTableSchema); + if (LOG.isDebugEnabled()) { + LOG.debug("Getting schema difference for " + tableSchema + "\r\n\r\n" + newTableSchema); + } + SchemaDifference.Builder schemaDiffBuilder = SchemaDifference.newBuilder(storageSchema, tableSchema); Set tableColumns = new HashSet<>(); @@ -109,7 +112,9 @@ public class HiveSchemaUtil { schemaDiffBuilder.addTableColumn(entry.getKey(), entry.getValue()); } } - LOG.info("Difference between schemas: " + schemaDiffBuilder.build().toString()); + if (LOG.isDebugEnabled()) { + LOG.debug("Difference between schemas: " + schemaDiffBuilder.build().toString()); + } return schemaDiffBuilder.build(); } diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java index 09ebeb5cc..6b2a605b8 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java @@ -136,8 +136,8 @@ public class RequestHandler { if (isLocalViewBehind(ctx)) { HoodieTimeline localTimeline = viewManager.getFileSystemView(basePath).getTimeline(); LOG.info("Syncing view as client passed last known instant " + lastKnownInstantFromClient - + " as last known instant but server has the folling timeline :" - + localTimeline.getInstants().collect(Collectors.toList())); + + " as last known instant but server has the following last instant on timeline :" + + localTimeline.lastInstant()); view.sync(); return true; } @@ -457,7 +457,7 @@ public class RequestHandler { metricsRegistry.add("TOTAL_CHECK_TIME", finalCheckTimeTaken); metricsRegistry.add("TOTAL_API_CALLS", 1); - LOG.info(String.format( + LOG.debug(String.format( "TimeTakenMillis[Total=%d, Refresh=%d, 
handle=%d, Check=%d], " + "Success=%s, Query=%s, Host=%s, synced=%s", timeTakenMillis, refreshCheckTimeTaken, handleTimeTaken, finalCheckTimeTaken, success, diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 01a374d7e..37586ac2c 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -694,7 +694,9 @@ public class DeltaSync implements Serializable { schemas.add(targetSchema); } - LOG.info("Registering Schema :" + schemas); + if (LOG.isDebugEnabled()) { + LOG.debug("Registering Schema: " + schemas); + } jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList()); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 7b0a2344d..a1066c7e9 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -63,6 +63,7 @@ import org.apache.spark.sql.SparkSession; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -437,6 +438,37 @@ public class HoodieDeltaStreamer implements Serializable { } }
+  /**
+   * Renders the given properties as a multi-line string for logging: a header line followed by
+   * one "key: value" line per property, ordered by key.
+   *
+   * <p>Unless debug logging is enabled, a value longer than 255 characters is cut down to its
+   * first 128 characters followed by "[...]".
+   *
+   * @param props properties to render
+   * @return the header followed by the sorted, possibly truncated, "key: value" lines
+   */
+  private static String toSortedTruncatedString(TypedProperties props) {
+    List<String> allKeys = new ArrayList<>();
+    for (Object k : props.keySet()) {
+      allKeys.add(k.toString());
+    }
+    Collections.sort(allKeys);
+    // Debug logging keeps values in full; look the level up once rather than per value.
+    final boolean keepFullValues = LOG.isDebugEnabled();
+    StringBuilder propsLog = new StringBuilder("Creating delta streamer with configs:\n");
+    for (String key : allKeys) {
+      // NOTE(review): the lookup uses the key's string form; confirm keys are always Strings.
+      String value = Option.ofNullable(props.get(key)).orElse("").toString();
+      // Truncate too long values.
+      if (!keepFullValues && value.length() > 255) {
+        value = value.substring(0, 128) + "[...]";
+      }
+      propsLog.append(key).append(": ").append(value).append("\n");
+    }
+    return propsLog.toString();
+  }
+
 public static final Config getConfig(String[] args) { Config cfg = new Config(); JCommander cmd = new JCommander(cfg, null, args); @@ -544,7 +576,8 @@ public class HoodieDeltaStreamer implements Serializable { "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed."); this.props = properties.get(); - LOG.info("Creating delta streamer with configs : " + props.toString()); + LOG.info(toSortedTruncatedString(props)); + this.schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor( UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc), props, jssc, cfg.transformerClassNames); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java index 9429c89d0..7e5ed05f2 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java @@ -62,7 +62,7 @@ public class SqlQueryBasedTransformer implements Transformer { LOG.info("Registering tmp table : " + tmpTable); rowDataset.registerTempTable(tmpTable); String sqlStr = transformerSQL.replaceAll(SRC_PATTERN, tmpTable); - LOG.info("SQL Query for transformation : (" + sqlStr + ")"); + LOG.debug("SQL Query for transformation : (" + sqlStr + ")"); return sparkSession.sql(sqlStr); } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java index 17450a0b5..92a3a0957 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java @@ -163,7 +163,12 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer { //insert updates for already existing records in kafka topics testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, HoodieTestDataGenerator.TRIP_SCHEMA))); testUtils.sendMessages(topicName2, Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA))); + + streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc); + streamer.getTableExecutionContexts().get(1).setProperties(properties); + streamer.getTableExecutionContexts().get(0).setProperties(properties1); streamer.sync(); + assertEquals(2, streamer.getSuccessTables().size()); assertTrue(streamer.getFailedTables().isEmpty());