diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 4b747d3a7..2f425acbc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -334,7 +334,7 @@ public abstract class BaseHoodieWriteClient ((HoodieTableMetadataWriter) w).update(metadata, instantTime, table.isTableServiceAction(actionType))); } @@ -1038,7 +1038,7 @@ public abstract class BaseHoodieWriteClient { try { ((HoodieTableMetadataWriter) w).dropMetadataPartitions(partitionTypes); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java index 40e8f85a3..d006b52b3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java @@ -85,7 +85,7 @@ public class CompactionAdminClient extends BaseHoodieClient { if (plan.getOperations() != null) { List ops = plan.getOperations().stream() .map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList()); - context.setJobStatus(this.getClass().getSimpleName(), "Validate compaction operations"); + context.setJobStatus(this.getClass().getSimpleName(), "Validate compaction operations: " + config.getTableName()); return context.map(ops, op -> { try { return validateCompactionOperation(metaClient, compactionInstant, op, Option.of(fsView)); @@ -351,7 +351,7 @@ public class CompactionAdminClient extends BaseHoodieClient { } else { LOG.info("The following compaction renaming operations needs to be performed to un-schedule"); if (!dryRun) { - 
context.setJobStatus(this.getClass().getSimpleName(), "Execute unschedule operations"); + context.setJobStatus(this.getClass().getSimpleName(), "Execute unschedule operations: " + config.getTableName()); return context.map(renameActions, lfPair -> { try { LOG.info("RENAME " + lfPair.getLeft().getPath() + " => " + lfPair.getRight().getPath()); @@ -394,7 +394,7 @@ public class CompactionAdminClient extends BaseHoodieClient { "Number of Compaction Operations :" + plan.getOperations().size() + " for instant :" + compactionInstant); List ops = plan.getOperations().stream() .map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList()); - context.setJobStatus(this.getClass().getSimpleName(), "Generate compaction unscheduling operations"); + context.setJobStatus(this.getClass().getSimpleName(), "Generate compaction unscheduling operations: " + config.getTableName()); return context.flatMap(ops, op -> { try { return getRenamingActionsForUnschedulingCompactionOperation(metaClient, compactionInstant, op, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index 190a5fe1c..41bcf001a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -519,7 +519,7 @@ public class HoodieTimelineArchiver { new Path(metaClient.getMetaPath(), archivedInstant.getFileName()) ).map(Path::toString).collect(Collectors.toList()); - context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants"); + context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants: " + config.getTableName()); Map resultDeleteInstantFiles = deleteFilesParallelize(metaClient, instantFiles, context, false); for (Map.Entry result : 
resultDeleteInstantFiles.entrySet()) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java index b714c5033..9b3dc8df0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java @@ -83,7 +83,7 @@ public class HoodieIndexUtils { public static List> getLatestBaseFilesForAllPartitions(final List partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) { - context.setJobStatus(HoodieIndexUtils.class.getSimpleName(), "Load latest base files from all partitions"); + context.setJobStatus(HoodieIndexUtils.class.getSimpleName(), "Load latest base files from all partitions: " + hoodieTable.getConfig().getTableName()); return context.flatMap(partitions, partitionPath -> { List> filteredFiles = getLatestBaseFilesForPartition(partitionPath, hoodieTable).stream() diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java index aeaf78672..6545c642c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java @@ -167,7 +167,7 @@ public class HoodieBloomIndex extends HoodieIndex { .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId())) .collect(toList()); - context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)"); + context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on): " + config.getTableName()); return context.map(partitionPathFileIDList, pf -> { try { HoodieRangeInfoHandle rangeInfoHandle = new 
HoodieRangeInfoHandle(config, hoodieTable, pf); @@ -209,7 +209,7 @@ public class HoodieBloomIndex extends HoodieIndex { protected List> loadColumnRangesFromMetaIndex( List partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) { // also obtain file ranges, if range pruning is enabled - context.setJobStatus(this.getClass().getName(), "Load meta index key ranges for file slices"); + context.setJobStatus(this.getClass().getName(), "Load meta index key ranges for file slices: " + config.getTableName()); final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp(); return context.flatMap(partitions, partitionName -> { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index d080d14a6..f5a96fb67 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -1047,7 +1047,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta private void initialCommit(String createInstantTime, List partitionTypes) { // List all partitions in the basePath of the containing dataset LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath()); - engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions"); + engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions: " + dataWriteConfig.getTableName()); Map> partitionToRecordsMap = new HashMap<>(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index f6f73f633..807865dae 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -566,7 +566,7 @@ public abstract class HoodieTable implem private void deleteInvalidFilesByPartitions(HoodieEngineContext context, Map>> invalidFilesByPartition) { // Now delete partially written files - context.setJobStatus(this.getClass().getSimpleName(), "Delete invalid files generated during the write operation"); + context.setJobStatus(this.getClass().getSimpleName(), "Delete invalid files generated during the write operation: " + config.getTableName()); context.map(new ArrayList<>(invalidFilesByPartition.values()), partitionWithFileList -> { final FileSystem fileSystem = metaClient.getFs(); LOG.info("Deleting invalid data files=" + partitionWithFileList); @@ -642,7 +642,7 @@ public abstract class HoodieTable implem } // Now delete partially written files - context.setJobStatus(this.getClass().getSimpleName(), "Delete all partially written files"); + context.setJobStatus(this.getClass().getSimpleName(), "Delete all partially written files: " + config.getTableName()); deleteInvalidFilesByPartitions(context, invalidPathsByPartition); // Now ensure the deleted files disappear @@ -665,7 +665,7 @@ public abstract class HoodieTable implem */ private void waitForAllFiles(HoodieEngineContext context, Map>> groupByPartition, FileVisibility visibility) { // This will either ensure all files to be deleted are present. 
- context.setJobStatus(this.getClass().getSimpleName(), "Wait for all files to appear/disappear"); + context.setJobStatus(this.getClass().getSimpleName(), "Wait for all files to appear/disappear: " + config.getTableName()); boolean checkPassed = context.map(new ArrayList<>(groupByPartition.entrySet()), partitionWithFileList -> waitForCondition(partitionWithFileList.getKey(), partitionWithFileList.getValue().stream(), visibility), config.getFinalizeWriteParallelism()) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java index 2bb277b05..30ed27b39 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java @@ -132,7 +132,7 @@ public class CleanActionExecutor extends config.getCleanerParallelism()); LOG.info("Using cleanerParallelism: " + cleanerParallelism); - context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of partitions"); + context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of partitions: " + config.getTableName()); Stream> filesToBeDeletedPerPartition = cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream() diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java index fb2df582b..d8e51bcd1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java @@ -96,7 +96,7 @@ public class CleanPlanActionExecutor ext try { CleanPlanner 
planner = new CleanPlanner<>(context, table, config); Option earliestInstant = planner.getEarliestCommitToRetain(); - context.setJobStatus(this.getClass().getSimpleName(), "Obtaining list of partitions to be cleaned"); + context.setJobStatus(this.getClass().getSimpleName(), "Obtaining list of partitions to be cleaned: " + config.getTableName()); List partitionsToClean = planner.getPartitionPathsToClean(earliestInstant); if (partitionsToClean.isEmpty()) { @@ -107,7 +107,7 @@ public class CleanPlanActionExecutor ext int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism()); LOG.info("Using cleanerParallelism: " + cleanerParallelism); - context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned"); + context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName()); Map>> cleanOpsWithPartitionMeta = context .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java index 6d5372b47..846afec7c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java @@ -49,7 +49,7 @@ public abstract class BaseWriteHelper I taggedRecords = dedupedRecords; if (table.getIndex().requiresTagging(operationType)) { // perform index loop up to get existing location of records - context.setJobStatus(this.getClass().getSimpleName(), "Tagging"); + context.setJobStatus(this.getClass().getSimpleName(), "Tagging: " + table.getConfig().getTableName()); taggedRecords = tag(dedupedRecords, context, table); } 
Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now()); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java index d548e07ea..75954872a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java @@ -133,7 +133,7 @@ public abstract class HoodieCompactor im .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList()); LOG.info("Compactor compacting " + operations + " files"); - context.setJobStatus(this.getClass().getSimpleName(), "Compacting file slices"); + context.setJobStatus(this.getClass().getSimpleName(), "Compacting file slices: " + config.getTableName()); TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier(); return context.parallelize(operations).map(operation -> compact( compactionHandler, metaClient, config, operation, compactionInstantTime, taskContextSupplier)) @@ -288,7 +288,7 @@ public abstract class HoodieCompactor im SliceView fileSystemView = hoodieTable.getSliceView(); LOG.info("Compaction looking for files to compact in " + partitionPaths + " partitions"); - context.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact"); + context.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact: " + config.getTableName()); List operations = context.flatMap(partitionPaths, partitionPath -> fileSystemView .getLatestFileSlices(partitionPath) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java index 24c0dbc80..5c184e77d 100644 --- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java @@ -88,7 +88,7 @@ public class RunCompactionActionExecutor extends context, compactionPlan, table, configCopy, instantTime, compactionHandler); compactor.maybePersist(statuses, config); - context.setJobStatus(this.getClass().getSimpleName(), "Preparing compaction metadata"); + context.setJobStatus(this.getClass().getSimpleName(), "Preparing compaction metadata: " + config.getTableName()); List updateStatusMap = statuses.map(WriteStatus::getStat).collectAsList(); HoodieCommitMetadata metadata = new HoodieCommitMetadata(true); for (HoodieWriteStat stat : updateStatusMap) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java index d3cc5660b..05fb7c0c9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java @@ -119,7 +119,7 @@ public class ScheduleCompactionActionExecutor performRollback(HoodieEngineContext context, HoodieInstant instantToRollback, List rollbackRequests) { int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1); - context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback actions"); + context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback actions: " + config.getTableName()); // If not for conversion to HoodieRollbackInternalRequests, code fails. 
Using avro model (HoodieRollbackRequest) within spark.parallelize // is failing with com.esotericsoftware.kryo.KryoException // stack trace: https://gist.github.com/nsivabalan/b6359e7d5038484f8043506c8bc9e1c8 @@ -88,7 +88,7 @@ public class BaseRollbackHelper implements Serializable { public List collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback, List rollbackRequests) { int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1); - context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade"); + context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade: " + config.getTableName()); // If not for conversion to HoodieRollbackInternalRequests, code fails. Using avro model (HoodieRollbackRequest) within spark.parallelize // is failing with com.esotericsoftware.kryo.KryoException // stack trace: https://gist.github.com/nsivabalan/b6359e7d5038484f8043506c8bc9e1c8 diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java index e3159abad..aa9e0b658 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java @@ -88,7 +88,7 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu FSUtils.getAllPartitionPaths(context, table.getMetaClient().getBasePath(), false, false); int numPartitions = Math.max(Math.min(partitionPaths.size(), config.getRollbackParallelism()), 1); - context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing Rollback Plan"); + context.setJobStatus(this.getClass().getSimpleName(), "Creating 
Listing Rollback Plan: " + config.getTableName()); HoodieTableType tableType = table.getMetaClient().getTableType(); String baseFileExtension = getBaseFileExtension(metaClient); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java index 134b23885..7f408c1b8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java @@ -84,7 +84,7 @@ public class SavepointActionExecutor ext ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.GREATER_THAN_OR_EQUALS, lastCommitRetained), "Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained); - context.setJobStatus(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime); + context.setJobStatus(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime + ": " + table.getConfig().getTableName()); List partitions = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath()); Map> latestFilesMap = context.mapToPair(partitions, partitionPath -> { // Scan all partitions files with this commit time diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java index 3dacf1e13..07428dd93 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java @@ -84,7 +84,7 @@ public abstract class WriteMarkers implements
Serializable { */ public void quietDeleteMarkerDir(HoodieEngineContext context, int parallelism) { try { - context.setJobStatus(this.getClass().getSimpleName(), "Deleting marker directory"); + context.setJobStatus(this.getClass().getSimpleName(), "Deleting marker directory: " + basePath); deleteMarkerDir(context, parallelism); } catch (Exception e) { LOG.warn("Error deleting marker directory for instant " + instantTime, e); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 271ba95d9..524758a67 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -356,7 +356,7 @@ public class HoodieFlinkWriteClient extends HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime) { - this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction"); + this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName()); List writeStats = metadata.getWriteStats(); final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime); try { @@ -508,7 +508,7 @@ public class HoodieFlinkWriteClient extends List partitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath()); if (partitionPaths != null && partitionPaths.size() > 0) { - context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions"); + context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions: " + config.getTableName()); partitionToExistingFileIds = 
partitionPaths.stream().parallel() .collect( Collectors.toMap( diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java index deaf934cf..fb19259b5 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java @@ -230,7 +230,7 @@ public class JavaUpsertPartitioner> implements } if (partitionPaths != null && partitionPaths.size() > 0) { - context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions"); + context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions: " + config.getTableName()); partitionSmallFilesMap = context.mapToPair(partitionPaths, partitionPath -> new ImmutablePair<>(partitionPath, getSmallFiles(partitionPath)), 0); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 7b0c8bbc8..3b512f0bd 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -117,7 +117,7 @@ public class SparkRDDWriteClient extends @Override public boolean commit(String instantTime, JavaRDD writeStatuses, Option> extraMetadata, String commitActionType, Map> partitionToReplacedFileIds) { - context.setJobStatus(this.getClass().getSimpleName(), "Committing stats"); + context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " + config.getTableName()); List writeStats = writeStatuses.map(WriteStatus::getStat).collect(); return commitStats(instantTime, writeStats, extraMetadata, 
commitActionType, partitionToReplacedFileIds); } @@ -303,7 +303,7 @@ public class SparkRDDWriteClient extends protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime) { - this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction"); + this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName()); List writeStats = metadata.getWriteStats(); final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime); try { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java index 9c2f37d56..c9fb895ad 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java @@ -126,7 +126,7 @@ public class SparkHoodieBloomIndexHelper extends BaseHoodieBloomIndexHelper { if (config.getBloomIndexPruneByRanges()) { // we will just try exploding the input and then count to determine comparisons // FIX(vc): Only do sampling here and extrapolate? 
- context.setJobStatus(this.getClass().getSimpleName(), "Compute all comparisons needed between records and files"); + context.setJobStatus(this.getClass().getSimpleName(), "Compute all comparisons needed between records and files: " + config.getTableName()); fileToComparisons = fileComparisonsRDD.mapToPair(t -> t).countByKey(); } else { fileToComparisons = new HashMap<>(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java index 504da8a72..4e488047d 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java @@ -334,7 +334,7 @@ public class SparkBootstrapCommitActionExecutor }) .collect(Collectors.toList()); - context.setJobStatus(this.getClass().getSimpleName(), "Bootstrap metadata table."); + context.setJobStatus(this.getClass().getSimpleName(), "Bootstrap metadata table: " + config.getTableName()); return context.parallelize(bootstrapPaths, config.getBootstrapParallelism()) .map(partitionFsPair -> getMetadataHandler(config, table, partitionFsPair.getRight().getRight()).runMetadataBootstrap(partitionFsPair.getLeft(), partitionFsPair.getRight().getLeft(), keyGenerator)); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index 205da82ac..f8e4b31ff 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -113,7 +113,7 @@ public abstract class BaseSparkCommitActionExecutor> clusteringHandleUpdate(HoodieData> inputRecords, Set fileGroupsInPendingClustering) { - context.setJobStatus(this.getClass().getSimpleName(), "Handling updates which are under clustering"); + context.setJobStatus(this.getClass().getSimpleName(), "Handling updates which are under clustering: " + config.getTableName()); UpdateStrategy>> updateStrategy = (UpdateStrategy>>) ReflectionUtils .loadClass(config.getClusteringUpdatesStrategyClass(), this.context, fileGroupsInPendingClustering); Pair>, Set> recordsAndPendingClusteringFileGroups = @@ -152,7 +152,7 @@ public abstract class BaseSparkCommitActionExecutor> inputRecordsWithClusteringUpdate = fileGroupsInPendingClustering.isEmpty() ? inputRecords : clusteringHandleUpdate(inputRecords, fileGroupsInPendingClustering); - context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data"); + context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data: " + config.getTableName()); HoodieData writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner); HoodieWriteMetadata> result = new HoodieWriteMetadata<>(); updateIndexAndCommitIfNeeded(writeStatuses, result); @@ -280,7 +280,7 @@ public abstract class BaseSparkCommitActionExecutor> extraMetadata, HoodieWriteMetadata> result) { - context.setJobStatus(this.getClass().getSimpleName(), "Commit write status collect"); + context.setJobStatus(this.getClass().getSimpleName(), "Commit write status collect: " + config.getTableName()); commit(extraMetadata, result, result.getWriteStatuses().map(WriteStatus::getStat).collectAsList()); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index c54c52625..c2f5a4306 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -266,7 +266,7 @@ public class UpsertPartitioner> extends SparkHo } if (partitionPaths != null && partitionPaths.size() > 0) { - context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions"); + context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions: " + config.getTableName()); JavaRDD partitionPathRdds = jsc.parallelize(partitionPaths, partitionPaths.size()); partitionSmallFilesMap = partitionPathRdds.mapToPair((PairFunction>) partitionPath -> new Tuple2<>(partitionPath, getSmallFiles(partitionPath))).collectAsMap(); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java index b5d7dc4b1..a2e81a337 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java @@ -174,7 +174,7 @@ public class HDFSParquetImporter implements Serializable { ParquetInputFormat.setReadSupportClass(job, (AvroReadSupport.class)); HoodieEngineContext context = new HoodieSparkEngineContext(jsc); - context.setJobStatus(this.getClass().getSimpleName(), "Build records for import"); + context.setJobStatus(this.getClass().getSimpleName(), "Build records for import: " + cfg.tableName); return jsc.newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class, job.getConfiguration()) // To reduce large number of tasks. 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java index a2717a356..402b380a0 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java @@ -107,7 +107,7 @@ public class HoodieSnapshotCopier implements Serializable { fs.delete(new Path(outputDir), true); } - context.setJobStatus(this.getClass().getSimpleName(), "Creating a snapshot"); + context.setJobStatus(this.getClass().getSimpleName(), "Creating a snapshot: " + baseDir); List> filesToCopy = context.flatMap(partitions, partition -> { // Only take latest version files <= latestCommit. diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java index 255393b23..753765fb6 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java @@ -177,7 +177,7 @@ public class HoodieSnapshotExporter { : ReflectionUtils.loadClass(cfg.outputPartitioner); HoodieEngineContext context = new HoodieSparkEngineContext(jsc); - context.setJobStatus(this.getClass().getSimpleName(), "Exporting as non-HUDI dataset"); + context.setJobStatus(this.getClass().getSimpleName(), "Exporting as non-HUDI dataset: " + cfg.targetOutputPath); final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg); Iterator exportingFilePaths = jsc .parallelize(partitions, partitions.size())