[HUDI-1707] Reduces log level for overly verbose messages from info to debug level. (#2714)
* Reduces log level for overly verbose messages from info to debug level.
* Sorts config output.
* Code review: small restructuring + rebasing to master
  - Fixing flaky multi delta streamer test
  - Using isDebugEnabled() checks
  - Some changes to shorten log messages without moving them to DEBUG

Co-authored-by: volodymyr.burenin <volodymyr.burenin@cloudkitchens.com>
Co-authored-by: Vinoth Chandar <vinoth@apache.org>
This commit is contained in:
committed by
GitHub
parent
511ac4881d
commit
8a48d16e41
@@ -43,7 +43,6 @@ import java.util.Objects;
|
|||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Represents the Active Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the
|
* Represents the Active Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the
|
||||||
@@ -113,7 +112,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
|||||||
// multiple casts will make this lambda serializable -
|
// multiple casts will make this lambda serializable -
|
||||||
// http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16
|
// http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16
|
||||||
this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails;
|
this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails;
|
||||||
LOG.info("Loaded instants " + getInstants().collect(Collectors.toList()));
|
LOG.info("Loaded instants upto : " + lastInstant());
|
||||||
}
|
}
|
||||||
|
|
||||||
public HoodieActiveTimeline(HoodieTableMetaClient metaClient) {
|
public HoodieActiveTimeline(HoodieTableMetaClient metaClient) {
|
||||||
|
|||||||
@@ -279,7 +279,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
|
|||||||
long beginLsTs = System.currentTimeMillis();
|
long beginLsTs = System.currentTimeMillis();
|
||||||
FileStatus[] statuses = listPartition(partitionPath);
|
FileStatus[] statuses = listPartition(partitionPath);
|
||||||
long endLsTs = System.currentTimeMillis();
|
long endLsTs = System.currentTimeMillis();
|
||||||
LOG.info("#files found in partition (" + partitionPathStr + ") =" + statuses.length + ", Time taken ="
|
LOG.debug("#files found in partition (" + partitionPathStr + ") =" + statuses.length + ", Time taken ="
|
||||||
+ (endLsTs - beginLsTs));
|
+ (endLsTs - beginLsTs));
|
||||||
List<HoodieFileGroup> groups = addFilesToView(statuses);
|
List<HoodieFileGroup> groups = addFilesToView(statuses);
|
||||||
|
|
||||||
@@ -293,7 +293,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
|
|||||||
LOG.debug("View already built for Partition :" + partitionPathStr + ", FOUND is ");
|
LOG.debug("View already built for Partition :" + partitionPathStr + ", FOUND is ");
|
||||||
}
|
}
|
||||||
long endTs = System.currentTimeMillis();
|
long endTs = System.currentTimeMillis();
|
||||||
LOG.info("Time to load partition (" + partitionPathStr + ") =" + (endTs - beginTs));
|
LOG.debug("Time to load partition (" + partitionPathStr + ") =" + (endTs - beginTs));
|
||||||
return true;
|
return true;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -317,7 +317,7 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void storePartitionView(String partitionPath, List<HoodieFileGroup> fileGroups) {
|
protected void storePartitionView(String partitionPath, List<HoodieFileGroup> fileGroups) {
|
||||||
LOG.info("Adding file-groups for partition :" + partitionPath + ", #FileGroups=" + fileGroups.size());
|
LOG.debug("Adding file-groups for partition :" + partitionPath + ", #FileGroups=" + fileGroups.size());
|
||||||
List<HoodieFileGroup> newList = new ArrayList<>(fileGroups);
|
List<HoodieFileGroup> newList = new ArrayList<>(fileGroups);
|
||||||
partitionToFileGroupsMap.put(partitionPath, newList);
|
partitionToFileGroupsMap.put(partitionPath, newList);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -64,7 +64,10 @@ public class HiveSchemaUtil {
|
|||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new HoodieHiveSyncException("Failed to convert parquet schema to hive schema", e);
|
throw new HoodieHiveSyncException("Failed to convert parquet schema to hive schema", e);
|
||||||
}
|
}
|
||||||
LOG.info("Getting schema difference for " + tableSchema + "\r\n\r\n" + newTableSchema);
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Getting schema difference for " + tableSchema + "\r\n\r\n" + newTableSchema);
|
||||||
|
}
|
||||||
|
|
||||||
SchemaDifference.Builder schemaDiffBuilder = SchemaDifference.newBuilder(storageSchema, tableSchema);
|
SchemaDifference.Builder schemaDiffBuilder = SchemaDifference.newBuilder(storageSchema, tableSchema);
|
||||||
Set<String> tableColumns = new HashSet<>();
|
Set<String> tableColumns = new HashSet<>();
|
||||||
|
|
||||||
@@ -109,7 +112,9 @@ public class HiveSchemaUtil {
|
|||||||
schemaDiffBuilder.addTableColumn(entry.getKey(), entry.getValue());
|
schemaDiffBuilder.addTableColumn(entry.getKey(), entry.getValue());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.info("Difference between schemas: " + schemaDiffBuilder.build().toString());
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Difference between schemas: " + schemaDiffBuilder.build().toString());
|
||||||
|
}
|
||||||
|
|
||||||
return schemaDiffBuilder.build();
|
return schemaDiffBuilder.build();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -136,8 +136,8 @@ public class RequestHandler {
|
|||||||
if (isLocalViewBehind(ctx)) {
|
if (isLocalViewBehind(ctx)) {
|
||||||
HoodieTimeline localTimeline = viewManager.getFileSystemView(basePath).getTimeline();
|
HoodieTimeline localTimeline = viewManager.getFileSystemView(basePath).getTimeline();
|
||||||
LOG.info("Syncing view as client passed last known instant " + lastKnownInstantFromClient
|
LOG.info("Syncing view as client passed last known instant " + lastKnownInstantFromClient
|
||||||
+ " as last known instant but server has the folling timeline :"
|
+ " as last known instant but server has the following last instant on timeline :"
|
||||||
+ localTimeline.getInstants().collect(Collectors.toList()));
|
+ localTimeline.lastInstant());
|
||||||
view.sync();
|
view.sync();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@@ -457,7 +457,7 @@ public class RequestHandler {
|
|||||||
metricsRegistry.add("TOTAL_CHECK_TIME", finalCheckTimeTaken);
|
metricsRegistry.add("TOTAL_CHECK_TIME", finalCheckTimeTaken);
|
||||||
metricsRegistry.add("TOTAL_API_CALLS", 1);
|
metricsRegistry.add("TOTAL_API_CALLS", 1);
|
||||||
|
|
||||||
LOG.info(String.format(
|
LOG.debug(String.format(
|
||||||
"TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], "
|
"TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], "
|
||||||
+ "Success=%s, Query=%s, Host=%s, synced=%s",
|
+ "Success=%s, Query=%s, Host=%s, synced=%s",
|
||||||
timeTakenMillis, refreshCheckTimeTaken, handleTimeTaken, finalCheckTimeTaken, success,
|
timeTakenMillis, refreshCheckTimeTaken, handleTimeTaken, finalCheckTimeTaken, success,
|
||||||
|
|||||||
@@ -694,7 +694,9 @@ public class DeltaSync implements Serializable {
|
|||||||
schemas.add(targetSchema);
|
schemas.add(targetSchema);
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("Registering Schema :" + schemas);
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Registering Schema: " + schemas);
|
||||||
|
}
|
||||||
jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList());
|
jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -63,6 +63,7 @@ import org.apache.spark.sql.SparkSession;
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
@@ -437,6 +438,24 @@ public class HoodieDeltaStreamer implements Serializable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static String toSortedTruncatedString(TypedProperties props) {
|
||||||
|
List<String> allKeys = new ArrayList<>();
|
||||||
|
for (Object k : props.keySet()) {
|
||||||
|
allKeys.add(k.toString());
|
||||||
|
}
|
||||||
|
Collections.sort(allKeys);
|
||||||
|
StringBuilder propsLog = new StringBuilder("Creating delta streamer with configs:\n");
|
||||||
|
for (String key : allKeys) {
|
||||||
|
String value = Option.ofNullable(props.get(key)).orElse("").toString();
|
||||||
|
// Truncate too long values.
|
||||||
|
if (value.length() > 255 && !LOG.isDebugEnabled()) {
|
||||||
|
value = value.substring(0, 128) + "[...]";
|
||||||
|
}
|
||||||
|
propsLog.append(key).append(": ").append(value).append("\n");
|
||||||
|
}
|
||||||
|
return propsLog.toString();
|
||||||
|
}
|
||||||
|
|
||||||
public static final Config getConfig(String[] args) {
|
public static final Config getConfig(String[] args) {
|
||||||
Config cfg = new Config();
|
Config cfg = new Config();
|
||||||
JCommander cmd = new JCommander(cfg, null, args);
|
JCommander cmd = new JCommander(cfg, null, args);
|
||||||
@@ -544,7 +563,8 @@ public class HoodieDeltaStreamer implements Serializable {
|
|||||||
"'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.");
|
"'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.");
|
||||||
|
|
||||||
this.props = properties.get();
|
this.props = properties.get();
|
||||||
LOG.info("Creating delta streamer with configs : " + props.toString());
|
LOG.info(toSortedTruncatedString(props));
|
||||||
|
|
||||||
this.schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
|
this.schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
|
||||||
UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc), props, jssc, cfg.transformerClassNames);
|
UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc), props, jssc, cfg.transformerClassNames);
|
||||||
|
|
||||||
|
|||||||
@@ -62,7 +62,7 @@ public class SqlQueryBasedTransformer implements Transformer {
|
|||||||
LOG.info("Registering tmp table : " + tmpTable);
|
LOG.info("Registering tmp table : " + tmpTable);
|
||||||
rowDataset.registerTempTable(tmpTable);
|
rowDataset.registerTempTable(tmpTable);
|
||||||
String sqlStr = transformerSQL.replaceAll(SRC_PATTERN, tmpTable);
|
String sqlStr = transformerSQL.replaceAll(SRC_PATTERN, tmpTable);
|
||||||
LOG.info("SQL Query for transformation : (" + sqlStr + ")");
|
LOG.debug("SQL Query for transformation : (" + sqlStr + ")");
|
||||||
return sparkSession.sql(sqlStr);
|
return sparkSession.sql(sqlStr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -163,7 +163,12 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
|
|||||||
//insert updates for already existing records in kafka topics
|
//insert updates for already existing records in kafka topics
|
||||||
testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
|
testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
|
||||||
testUtils.sendMessages(topicName2, Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
|
testUtils.sendMessages(topicName2, Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
|
||||||
|
|
||||||
|
streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
|
||||||
|
streamer.getTableExecutionContexts().get(1).setProperties(properties);
|
||||||
|
streamer.getTableExecutionContexts().get(0).setProperties(properties1);
|
||||||
streamer.sync();
|
streamer.sync();
|
||||||
|
|
||||||
assertEquals(2, streamer.getSuccessTables().size());
|
assertEquals(2, streamer.getSuccessTables().size());
|
||||||
assertTrue(streamer.getFailedTables().isEmpty());
|
assertTrue(streamer.getFailedTables().isEmpty());
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user