diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 5c2bee1de..358e30716 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -265,6 +265,7 @@ public abstract class AbstractHoodieWriteClient ((HoodieTableMetadataWriter) w).update(metadata, instantTime, table.isTableServiceAction(actionType))); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index eb4b24a34..8a59e2c11 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -464,6 +464,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta // List all partitions in the basePath of the containing dataset LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath()); + engineContext.setJobStatus(this.getClass().getSimpleName(), "Bootstrap: initializing metadata table by listing files and partitions"); List dirInfoList = listAllPartitions(dataMetaClient); // During bootstrap, the list of files to be committed can be huge. So creating a HoodieCommitMetadata out of these diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java index 9b95bd718..a64cb8845 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java @@ -72,6 +72,7 @@ public class CleanPlanActionExecutor ext try { CleanPlanner planner = new CleanPlanner<>(context, table, config); Option earliestInstant = planner.getEarliestCommitToRetain(); + context.setJobStatus(this.getClass().getSimpleName(), "Obtaining list of partitions to be cleaned"); List partitionsToClean = planner.getPartitionPathsToClean(earliestInstant); if (partitionsToClean.isEmpty()) { @@ -82,7 +83,7 @@ public class CleanPlanActionExecutor ext int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism()); LOG.info("Using cleanerParallelism: " + cleanerParallelism); - context.setJobStatus(this.getClass().getSimpleName(), "Generates list of file slices to be cleaned"); + context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned"); Map> cleanOps = context .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractWriteHelper.java index 0d9cdc0aa..47cb34fa3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractWriteHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractWriteHelper.java @@ -48,6 +48,7 @@ public abstract class AbstractWriteHelper extends @Override public boolean commit(String instantTime, JavaRDD writeStatuses, Option> extraMetadata, String commitActionType, Map> partitionToReplacedFileIds) { + context.setJobStatus(this.getClass().getSimpleName(), "Committing stats"); List writeStats = writeStatuses.map(WriteStatus::getStat).collect(); return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index 6a01c4aa2..a44172f37 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -115,6 +115,7 @@ public abstract class BaseSparkCommitActionExecutor> clusteringHandleUpdate(JavaRDD> inputRecordsRDD) { + context.setJobStatus(this.getClass().getSimpleName(), "Handling updates which are under clustering"); Set fileGroupsInPendingClustering = table.getFileSystemView().getFileGroupsInPendingClustering().map(entry -> entry.getKey()).collect(Collectors.toSet()); UpdateStrategy updateStrategy = (UpdateStrategy) ReflectionUtils @@ -166,6 +167,7 @@ public abstract class BaseSparkCommitActionExecutor> partitionedRecords = partition(inputRecordsRDDWithClusteringUpdate, partitioner); JavaRDD writeStatusRDD = partitionedRecords.mapPartitionsWithIndex((partition, recordItr) -> { if (WriteOperationType.isChangingRecords(operationType)) { @@ -276,7 +278,7 @@ public abstract class BaseSparkCommitActionExecutor> extraMetadata, HoodieWriteMetadata> result, List writeStats) { String actionType = getCommitActionType(); - LOG.info("Committing " + instantTime + ", action Type " + actionType); + LOG.info("Committing " + instantTime + ", action Type " + actionType + ", operation Type " + operationType); result.setCommitted(true); result.setWriteStats(writeStats); // Finalize write diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index eb553c94e..96a7c25de 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -441,6 +441,7 @@ public class DeltaSync implements Serializable { return null; } + jssc.setJobGroup(this.getClass().getSimpleName(), "Checking if input is empty"); if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) { LOG.info("No new data, perform empty commit."); return Pair.of(schemaProvider, Pair.of(checkpointStr, jssc.emptyRDD()));