diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java
index 47f883284..3ff262d0e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java
@@ -53,7 +53,7 @@ public abstract class AsyncCompactService extends HoodieAsyncService {
   private final int maxConcurrentCompaction;
   private transient AbstractCompactor compactor;
-  private transient HoodieEngineContext context;
+  protected transient HoodieEngineContext context;
   private transient BlockingQueue pendingCompactions = new LinkedBlockingQueue<>();
   private transient ReentrantLock queueLock = new ReentrantLock();
   private transient Condition consumed = queueLock.newCondition();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
index e50228132..ba7db3e99 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
@@ -79,6 +79,7 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseable {
   @Override
   public void close() {
     stopEmbeddedServerView(true);
+    this.context.setJobStatus("", "");
   }
 
   private synchronized void stopEmbeddedServerView(boolean resetViewStorageConfig) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index 222e1ab2c..2c6be7ff0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -674,8 +674,7 @@ public abstract class AbstractHoodieWriteClient
                                                   table, String compactionCommitTime);
-
-
+
   /**
    * Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file
    *
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/ReplaceArchivalHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/ReplaceArchivalHelper.java
index 515f43e64..891f4a863 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/ReplaceArchivalHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/ReplaceArchivalHelper.java
@@ -68,7 +68,7 @@ public class ReplaceArchivalHelper implements Serializable {
   public static boolean deleteReplacedFileGroups(HoodieEngineContext context, HoodieTableMetaClient metaClient,
                                                  TableFileSystemView fileSystemView, HoodieInstant instant,
                                                  List replacedPartitions) {
-
+    context.setJobStatus(ReplaceArchivalHelper.class.getSimpleName(), "Delete replaced file groups");
     List f = context.map(replacedPartitions, partition -> {
       Stream fileSlices = fileSystemView.getReplacedFileGroupsBeforeOrOn(instant.getTimestamp(), partition)
           .flatMap(HoodieFileGroup::getAllRawFileSlices);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index b37eabbe6..644eb6273 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -403,6 +403,7 @@ public abstract class HoodieTable implements Serializable {
 
   private void deleteInvalidFilesByPartitions(HoodieEngineContext context, Map>> invalidFilesByPartition) {
     // Now delete partially written files
+    context.setJobStatus(this.getClass().getSimpleName(), "Delete invalid files generated during the write operation");
     context.map(new ArrayList<>(invalidFilesByPartition.values()), partitionWithFileList -> {
       final FileSystem fileSystem = metaClient.getFs();
       LOG.info("Deleting invalid data files=" + partitionWithFileList);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/MarkerFiles.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/MarkerFiles.java
index 8826204cd..24ab3db3d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/MarkerFiles.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/MarkerFiles.java
@@ -135,6 +135,7 @@ public class MarkerFiles implements Serializable {
     if (subDirectories.size() > 0) {
       parallelism = Math.min(subDirectories.size(), parallelism);
       SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf());
+      context.setJobStatus(this.getClass().getSimpleName(), "Obtaining marker files for all created, merged paths");
       dataFiles.addAll(context.flatMap(subDirectories, directory -> {
         Path path = new Path(directory);
         FileSystem fileSystem = path.getFileSystem(serializedConf.get());
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/async/SparkAsyncCompactService.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/async/SparkAsyncCompactService.java
index 152a901a7..f02687915 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/async/SparkAsyncCompactService.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/async/SparkAsyncCompactService.java
@@ -31,6 +31,6 @@ public class SparkAsyncCompactService extends AsyncCompactService {
 
   @Override
   protected AbstractCompactor createCompactor(AbstractHoodieWriteClient client) {
-    return new HoodieSparkCompactor(client);
+    return new HoodieSparkCompactor(client, this.context);
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java
index b81570de9..ab22b412a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.client;
 
+import org.apache.hudi.client.common.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -33,9 +34,12 @@ import java.io.IOException;
 
 public class HoodieSparkCompactor extends AbstractCompactor>, JavaRDD, JavaRDD> {
   private static final Logger LOG = LogManager.getLogger(HoodieSparkCompactor.class);
+  private transient HoodieEngineContext context;
 
-  public HoodieSparkCompactor(AbstractHoodieWriteClient>, JavaRDD, JavaRDD> compactionClient) {
+  public HoodieSparkCompactor(AbstractHoodieWriteClient>, JavaRDD, JavaRDD> compactionClient,
+                              HoodieEngineContext context) {
     super(compactionClient);
+    this.context = context;
   }
 
   @Override
@@ -43,6 +47,7 @@ public class HoodieSparkCompactor extends AbstractCompactor
     LOG.info("Compactor executing compaction " + instant);
     SparkRDDWriteClient writeClient = (SparkRDDWriteClient)compactionClient;
     JavaRDD res = writeClient.compact(instant.getTimestamp());
+    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status");
     long numWriteErrors = res.collect().stream().filter(WriteStatus::hasErrors).count();
     if (numWriteErrors != 0) {
       // We treat even a single error in compaction as fatal
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 10a55df9f..5010a5f76 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -277,6 +277,7 @@ public class SparkRDDWriteClient extends
   protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD writeStatuses,
                                     HoodieTable>, JavaRDD, JavaRDD> table,
                                     String compactionCommitTime) {
+    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
     List writeStats = writeStatuses.map(WriteStatus::getStat).collect();
     finalizeWrite(table, compactionCommitTime, writeStats);
     LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndex.java
index 7316043eb..987d9c08f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndex.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndex.java
@@ -125,7 +125,7 @@ public class SparkHoodieBloomIndex extends SparkHoodieIndex
     JavaRDD> fileComparisonsRDD =
         explodeRecordRDDWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairRDD);
     Map comparisonsPerFileGroup =
-        computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, fileComparisonsRDD);
+        computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, fileComparisonsRDD, context);
     int inputParallelism = partitionRecordKeyPairRDD.partitions().size();
     int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
     LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: ${"
@@ -139,11 +139,13 @@
    */
   private Map computeComparisonsPerFileGroup(final Map recordsPerPartition,
                                              final Map> partitionToFileInfo,
-                                             final JavaRDD> fileComparisonsRDD) {
+                                             final JavaRDD> fileComparisonsRDD,
+                                             final HoodieEngineContext context) {
     Map fileToComparisons;
     if (config.getBloomIndexPruneByRanges()) {
       // we will just try exploding the input and then count to determine comparisons
       // FIX(vc): Only do sampling here and extrapolate?
+      context.setJobStatus(this.getClass().getSimpleName(), "Compute all comparisons needed between records and files");
       fileToComparisons = fileComparisonsRDD.mapToPair(t -> t).countByKey();
     } else {
       fileToComparisons = new HashMap<>();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index ad62db925..73be8d412 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -101,6 +101,7 @@ public abstract class BaseSparkCommitActionExecutor
                        > extraMetadata, HoodieWriteMetadata> result) {
+    context.setJobStatus(this.getClass().getSimpleName(), "Commit write status collect");
     commit(extraMetadata, result, result.getWriteStatuses().map(WriteStatus::getStat).collect());
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkRunCompactionActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkRunCompactionActionExecutor.java
index ebc3de5b8..5851b08c6 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkRunCompactionActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkRunCompactionActionExecutor.java
@@ -76,6 +76,7 @@ public class SparkRunCompactionActionExecutor extends
     JavaRDD statuses = compactor.compact(context, compactionPlan, table, config, instantTime);
 
     statuses.persist(SparkMemoryUtils.getWriteStatusStorageLevel(config.getProps()));
+    context.setJobStatus(this.getClass().getSimpleName(), "Preparing compaction metadata");
     List updateStatusMap = statuses.map(WriteStatus::getStat).collect();
     HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
     for (HoodieWriteStat stat : updateStatusMap) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java
index 065b22d78..04fe5aca5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java
@@ -52,6 +52,7 @@ public class SparkMarkerBasedRollbackStrategy extends
       MarkerFiles markerFiles = new MarkerFiles(table, instantToRollback.getTimestamp());
       List markerFilePaths = markerFiles.allMarkerFilePaths();
       int parallelism = Math.max(Math.min(markerFilePaths.size(), config.getRollbackParallelism()), 1);
+      jsc.setJobGroup(this.getClass().getSimpleName(), "Rolling back using marker files");
       return jsc.parallelize(markerFilePaths, parallelism)
           .map(markerFilePath -> {
             String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java
index d1a415b17..ba985fe72 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java
@@ -37,6 +37,6 @@ public class SparkStreamingAsyncCompactService extends AsyncCompactService {
 
   @Override
   protected AbstractCompactor createCompactor(AbstractHoodieWriteClient client) {
-    return new HoodieSparkCompactor(client);
+    return new HoodieSparkCompactor(client, this.context);
   }
 }