[HUDI-2849] Improve SparkUI job description for write path (#4222)
This commit is contained in:
@@ -265,6 +265,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
|||||||
* @param metadata instance of {@link HoodieCommitMetadata}.
|
* @param metadata instance of {@link HoodieCommitMetadata}.
|
||||||
*/
|
*/
|
||||||
protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
|
protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
|
||||||
|
context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table");
|
||||||
table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime,
|
table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime,
|
||||||
table.isTableServiceAction(actionType)));
|
table.isTableServiceAction(actionType)));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -464,6 +464,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
|||||||
|
|
||||||
// List all partitions in the basePath of the containing dataset
|
// List all partitions in the basePath of the containing dataset
|
||||||
LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath());
|
LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath());
|
||||||
|
engineContext.setJobStatus(this.getClass().getSimpleName(), "Bootstrap: initializing metadata table by listing files and partitions");
|
||||||
List<DirectoryInfo> dirInfoList = listAllPartitions(dataMetaClient);
|
List<DirectoryInfo> dirInfoList = listAllPartitions(dataMetaClient);
|
||||||
|
|
||||||
// During bootstrap, the list of files to be committed can be huge. So creating a HoodieCommitMetadata out of these
|
// During bootstrap, the list of files to be committed can be huge. So creating a HoodieCommitMetadata out of these
|
||||||
|
|||||||
@@ -72,6 +72,7 @@ public class CleanPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> ext
|
|||||||
try {
|
try {
|
||||||
CleanPlanner<T, I, K, O> planner = new CleanPlanner<>(context, table, config);
|
CleanPlanner<T, I, K, O> planner = new CleanPlanner<>(context, table, config);
|
||||||
Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
|
Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
|
||||||
|
context.setJobStatus(this.getClass().getSimpleName(), "Obtaining list of partitions to be cleaned");
|
||||||
List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
|
List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
|
||||||
|
|
||||||
if (partitionsToClean.isEmpty()) {
|
if (partitionsToClean.isEmpty()) {
|
||||||
@@ -82,7 +83,7 @@ public class CleanPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> ext
|
|||||||
int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
|
int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
|
||||||
LOG.info("Using cleanerParallelism: " + cleanerParallelism);
|
LOG.info("Using cleanerParallelism: " + cleanerParallelism);
|
||||||
|
|
||||||
context.setJobStatus(this.getClass().getSimpleName(), "Generates list of file slices to be cleaned");
|
context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned");
|
||||||
|
|
||||||
Map<String, List<HoodieCleanFileInfo>> cleanOps = context
|
Map<String, List<HoodieCleanFileInfo>> cleanOps = context
|
||||||
.map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
|
.map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
|
||||||
|
|||||||
@@ -48,6 +48,7 @@ public abstract class AbstractWriteHelper<T extends HoodieRecordPayload, I, K, O
|
|||||||
I taggedRecords = dedupedRecords;
|
I taggedRecords = dedupedRecords;
|
||||||
if (performTagging) {
|
if (performTagging) {
|
||||||
// perform index loop up to get existing location of records
|
// perform index loop up to get existing location of records
|
||||||
|
context.setJobStatus(this.getClass().getSimpleName(), "Tagging");
|
||||||
taggedRecords = tag(dedupedRecords, context, table);
|
taggedRecords = tag(dedupedRecords, context, table);
|
||||||
}
|
}
|
||||||
Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());
|
Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());
|
||||||
|
|||||||
@@ -118,6 +118,7 @@ public class ScheduleCompactionActionExecutor<T extends HoodieRecordPayload, I,
|
|||||||
.collect(Collectors.toSet());
|
.collect(Collectors.toSet());
|
||||||
// exclude files in pending clustering from compaction.
|
// exclude files in pending clustering from compaction.
|
||||||
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
|
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
|
||||||
|
context.setJobStatus(this.getClass().getSimpleName(), "Compaction: generating compaction plan");
|
||||||
return compactor.generateCompactionPlan(context, table, config, instantTime, fgInPendingCompactionAndClustering);
|
return compactor.generateCompactionPlan(context, table, config, instantTime, fgInPendingCompactionAndClustering);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
|
throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
|
||||||
|
|||||||
@@ -85,6 +85,7 @@ public abstract class WriteMarkers implements Serializable {
|
|||||||
*/
|
*/
|
||||||
public void quietDeleteMarkerDir(HoodieEngineContext context, int parallelism) {
|
public void quietDeleteMarkerDir(HoodieEngineContext context, int parallelism) {
|
||||||
try {
|
try {
|
||||||
|
context.setJobStatus(this.getClass().getSimpleName(), "Deleting marker directory");
|
||||||
deleteMarkerDir(context, parallelism);
|
deleteMarkerDir(context, parallelism);
|
||||||
} catch (HoodieIOException ioe) {
|
} catch (HoodieIOException ioe) {
|
||||||
LOG.warn("Error deleting marker directory for instant " + instantTime, ioe);
|
LOG.warn("Error deleting marker directory for instant " + instantTime, ioe);
|
||||||
|
|||||||
@@ -120,6 +120,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
|
|||||||
@Override
|
@Override
|
||||||
public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
|
public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
|
||||||
String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
|
String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
|
||||||
|
context.setJobStatus(this.getClass().getSimpleName(), "Committing stats");
|
||||||
List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
|
List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
|
||||||
return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
|
return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -115,6 +115,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
|
|||||||
}
|
}
|
||||||
|
|
||||||
private JavaRDD<HoodieRecord<T>> clusteringHandleUpdate(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
|
private JavaRDD<HoodieRecord<T>> clusteringHandleUpdate(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
|
||||||
|
context.setJobStatus(this.getClass().getSimpleName(), "Handling updates which are under clustering");
|
||||||
Set<HoodieFileGroupId> fileGroupsInPendingClustering =
|
Set<HoodieFileGroupId> fileGroupsInPendingClustering =
|
||||||
table.getFileSystemView().getFileGroupsInPendingClustering().map(entry -> entry.getKey()).collect(Collectors.toSet());
|
table.getFileSystemView().getFileGroupsInPendingClustering().map(entry -> entry.getKey()).collect(Collectors.toSet());
|
||||||
UpdateStrategy updateStrategy = (UpdateStrategy) ReflectionUtils
|
UpdateStrategy updateStrategy = (UpdateStrategy) ReflectionUtils
|
||||||
@@ -166,6 +167,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
|
|||||||
|
|
||||||
// partition using the insert partitioner
|
// partition using the insert partitioner
|
||||||
final Partitioner partitioner = getPartitioner(profile);
|
final Partitioner partitioner = getPartitioner(profile);
|
||||||
|
context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data");
|
||||||
JavaRDD<HoodieRecord<T>> partitionedRecords = partition(inputRecordsRDDWithClusteringUpdate, partitioner);
|
JavaRDD<HoodieRecord<T>> partitionedRecords = partition(inputRecordsRDDWithClusteringUpdate, partitioner);
|
||||||
JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords.mapPartitionsWithIndex((partition, recordItr) -> {
|
JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords.mapPartitionsWithIndex((partition, recordItr) -> {
|
||||||
if (WriteOperationType.isChangingRecords(operationType)) {
|
if (WriteOperationType.isChangingRecords(operationType)) {
|
||||||
@@ -276,7 +278,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
|
|||||||
|
|
||||||
protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<JavaRDD<WriteStatus>> result, List<HoodieWriteStat> writeStats) {
|
protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<JavaRDD<WriteStatus>> result, List<HoodieWriteStat> writeStats) {
|
||||||
String actionType = getCommitActionType();
|
String actionType = getCommitActionType();
|
||||||
LOG.info("Committing " + instantTime + ", action Type " + actionType);
|
LOG.info("Committing " + instantTime + ", action Type " + actionType + ", operation Type " + operationType);
|
||||||
result.setCommitted(true);
|
result.setCommitted(true);
|
||||||
result.setWriteStats(writeStats);
|
result.setWriteStats(writeStats);
|
||||||
// Finalize write
|
// Finalize write
|
||||||
|
|||||||
@@ -441,6 +441,7 @@ public class DeltaSync implements Serializable {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
jssc.setJobGroup(this.getClass().getSimpleName(), "Checking if input is empty");
|
||||||
if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) {
|
if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) {
|
||||||
LOG.info("No new data, perform empty commit.");
|
LOG.info("No new data, perform empty commit.");
|
||||||
return Pair.of(schemaProvider, Pair.of(checkpointStr, jssc.emptyRDD()));
|
return Pair.of(schemaProvider, Pair.of(checkpointStr, jssc.emptyRDD()));
|
||||||
|
|||||||
Reference in New Issue
Block a user