
[HUDI-4097] add table info to jobStatus (#5529)

Co-authored-by: wqwl611 <wqwl611@gmail.com>
Authored by wqwl611 on 2022-05-14 09:01:15 +08:00; committed by GitHub
parent 5c4813f101
commit 52e63b39d6
27 changed files with 41 additions and 41 deletions
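Every touched call site follows the same pattern: wherever a stage description is registered on the engine context via setJobStatus, the table name from the write config (or, for the standalone utilities, the relevant path) is appended so the job status shown in the Spark/Flink UI identifies which table the stage belongs to. Below is a minimal, self-contained Java sketch of that pattern; EngineContext and WriteConfig are simplified, hypothetical stand-ins for Hudi's HoodieEngineContext and HoodieWriteConfig, not the real classes.

// Hypothetical stand-in for HoodieEngineContext: records which module is active
// and a human-readable description of the current activity.
class EngineContext {
  void setJobStatus(String activeModule, String activityDescription) {
    // A real engine context would surface this in the cluster UI
    // (e.g. as a Spark job group/description); here we just print it.
    System.out.println("[" + activeModule + "] " + activityDescription);
  }
}

// Hypothetical stand-in for HoodieWriteConfig, exposing only the table name.
class WriteConfig {
  private final String tableName;
  WriteConfig(String tableName) { this.tableName = tableName; }
  String getTableName() { return tableName; }
}

public class JobStatusTableInfoExample {
  public static void main(String[] args) {
    EngineContext context = new EngineContext();
    WriteConfig config = new WriteConfig("trips_cow");

    // Before this commit: the description says what is running, but not for which table.
    context.setJobStatus(JobStatusTableInfoExample.class.getSimpleName(), "Committing to metadata table");

    // After this commit: the table name is appended, mirroring the diffs below.
    context.setJobStatus(JobStatusTableInfoExample.class.getSimpleName(),
        "Committing to metadata table: " + config.getTableName());
  }
}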


@@ -334,7 +334,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    * @param metadata instance of {@link HoodieCommitMetadata}.
    */
   protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
-    context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table");
+    context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
     table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime,
         table.isTableServiceAction(actionType)));
   }
@@ -1038,7 +1038,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     HoodieInstant ownerInstant = new HoodieInstant(true, HoodieTimeline.INDEXING_ACTION, dropInstant);
     this.txnManager.beginTransaction(Option.of(ownerInstant), Option.empty());
     try {
-      context.setJobStatus(this.getClass().getSimpleName(), "Dropping partitions from metadata table");
+      context.setJobStatus(this.getClass().getSimpleName(), "Dropping partitions from metadata table: " + config.getTableName());
       table.getMetadataWriter(dropInstant).ifPresent(w -> {
         try {
           ((HoodieTableMetadataWriter) w).dropMetadataPartitions(partitionTypes);


@@ -85,7 +85,7 @@ public class CompactionAdminClient extends BaseHoodieClient {
     if (plan.getOperations() != null) {
       List<CompactionOperation> ops = plan.getOperations().stream()
           .map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
-      context.setJobStatus(this.getClass().getSimpleName(), "Validate compaction operations");
+      context.setJobStatus(this.getClass().getSimpleName(), "Validate compaction operations: " + config.getTableName());
       return context.map(ops, op -> {
         try {
           return validateCompactionOperation(metaClient, compactionInstant, op, Option.of(fsView));
@@ -351,7 +351,7 @@ public class CompactionAdminClient extends BaseHoodieClient {
     } else {
       LOG.info("The following compaction renaming operations needs to be performed to un-schedule");
       if (!dryRun) {
-        context.setJobStatus(this.getClass().getSimpleName(), "Execute unschedule operations");
+        context.setJobStatus(this.getClass().getSimpleName(), "Execute unschedule operations: " + config.getTableName());
         return context.map(renameActions, lfPair -> {
           try {
             LOG.info("RENAME " + lfPair.getLeft().getPath() + " => " + lfPair.getRight().getPath());
@@ -394,7 +394,7 @@ public class CompactionAdminClient extends BaseHoodieClient {
"Number of Compaction Operations :" + plan.getOperations().size() + " for instant :" + compactionInstant); "Number of Compaction Operations :" + plan.getOperations().size() + " for instant :" + compactionInstant);
List<CompactionOperation> ops = plan.getOperations().stream() List<CompactionOperation> ops = plan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList()); .map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
context.setJobStatus(this.getClass().getSimpleName(), "Generate compaction unscheduling operations"); context.setJobStatus(this.getClass().getSimpleName(), "Generate compaction unscheduling operations: " + config.getTableName());
return context.flatMap(ops, op -> { return context.flatMap(ops, op -> {
try { try {
return getRenamingActionsForUnschedulingCompactionOperation(metaClient, compactionInstant, op, return getRenamingActionsForUnschedulingCompactionOperation(metaClient, compactionInstant, op,


@@ -519,7 +519,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
         new Path(metaClient.getMetaPath(), archivedInstant.getFileName())
     ).map(Path::toString).collect(Collectors.toList());
-    context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants");
+    context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants: " + config.getTableName());
     Map<String, Boolean> resultDeleteInstantFiles = deleteFilesParallelize(metaClient, instantFiles, context, false);
     for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) {


@@ -83,7 +83,7 @@ public class HoodieIndexUtils {
   public static List<Pair<String, HoodieBaseFile>> getLatestBaseFilesForAllPartitions(final List<String> partitions,
                                                                                        final HoodieEngineContext context,
                                                                                        final HoodieTable hoodieTable) {
-    context.setJobStatus(HoodieIndexUtils.class.getSimpleName(), "Load latest base files from all partitions");
+    context.setJobStatus(HoodieIndexUtils.class.getSimpleName(), "Load latest base files from all partitions: " + hoodieTable.getConfig().getTableName());
     return context.flatMap(partitions, partitionPath -> {
       List<Pair<String, HoodieBaseFile>> filteredFiles =
           getLatestBaseFilesForPartition(partitionPath, hoodieTable).stream()


@@ -167,7 +167,7 @@ public class HoodieBloomIndex extends HoodieIndex<Object, Object> {
         .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
         .collect(toList());
-    context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
+    context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on): " + config.getTableName());
     return context.map(partitionPathFileIDList, pf -> {
       try {
         HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
@@ -209,7 +209,7 @@ public class HoodieBloomIndex extends HoodieIndex<Object, Object> {
   protected List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromMetaIndex(
       List<String> partitions, final HoodieEngineContext context, final HoodieTable<?, ?, ?, ?> hoodieTable) {
     // also obtain file ranges, if range pruning is enabled
-    context.setJobStatus(this.getClass().getName(), "Load meta index key ranges for file slices");
+    context.setJobStatus(this.getClass().getName(), "Load meta index key ranges for file slices: " + config.getTableName());
     final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
     return context.flatMap(partitions, partitionName -> {


@@ -1047,7 +1047,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
   private void initialCommit(String createInstantTime, List<MetadataPartitionType> partitionTypes) {
     // List all partitions in the basePath of the containing dataset
     LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath());
-    engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions");
+    engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions: " + dataWriteConfig.getTableName());
     Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();


@@ -566,7 +566,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
   private void deleteInvalidFilesByPartitions(HoodieEngineContext context, Map<String, List<Pair<String, String>>> invalidFilesByPartition) {
     // Now delete partially written files
-    context.setJobStatus(this.getClass().getSimpleName(), "Delete invalid files generated during the write operation");
+    context.setJobStatus(this.getClass().getSimpleName(), "Delete invalid files generated during the write operation: " + config.getTableName());
     context.map(new ArrayList<>(invalidFilesByPartition.values()), partitionWithFileList -> {
       final FileSystem fileSystem = metaClient.getFs();
       LOG.info("Deleting invalid data files=" + partitionWithFileList);
@@ -642,7 +642,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
     }
     // Now delete partially written files
-    context.setJobStatus(this.getClass().getSimpleName(), "Delete all partially written files");
+    context.setJobStatus(this.getClass().getSimpleName(), "Delete all partially written files: " + config.getTableName());
     deleteInvalidFilesByPartitions(context, invalidPathsByPartition);
     // Now ensure the deleted files disappear
@@ -665,7 +665,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
    */
   private void waitForAllFiles(HoodieEngineContext context, Map<String, List<Pair<String, String>>> groupByPartition, FileVisibility visibility) {
     // This will either ensure all files to be deleted are present.
-    context.setJobStatus(this.getClass().getSimpleName(), "Wait for all files to appear/disappear");
+    context.setJobStatus(this.getClass().getSimpleName(), "Wait for all files to appear/disappear: " + config.getTableName());
     boolean checkPassed =
         context.map(new ArrayList<>(groupByPartition.entrySet()), partitionWithFileList -> waitForCondition(partitionWithFileList.getKey(),
             partitionWithFileList.getValue().stream(), visibility), config.getFinalizeWriteParallelism())


@@ -132,7 +132,7 @@ public class CleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends
         config.getCleanerParallelism());
     LOG.info("Using cleanerParallelism: " + cleanerParallelism);
-    context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of partitions");
+    context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of partitions: " + config.getTableName());
     Stream<Pair<String, CleanFileInfo>> filesToBeDeletedPerPartition =
         cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()


@@ -96,7 +96,7 @@ public class CleanPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> ext
     try {
       CleanPlanner<T, I, K, O> planner = new CleanPlanner<>(context, table, config);
       Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
-      context.setJobStatus(this.getClass().getSimpleName(), "Obtaining list of partitions to be cleaned");
+      context.setJobStatus(this.getClass().getSimpleName(), "Obtaining list of partitions to be cleaned: " + config.getTableName());
       List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
       if (partitionsToClean.isEmpty()) {
@@ -107,7 +107,7 @@ public class CleanPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> ext
       int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
       LOG.info("Using cleanerParallelism: " + cleanerParallelism);
-      context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned");
+      context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());
       Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
           .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)


@@ -49,7 +49,7 @@ public abstract class BaseWriteHelper<T extends HoodieRecordPayload, I, K, O, R>
     I taggedRecords = dedupedRecords;
     if (table.getIndex().requiresTagging(operationType)) {
       // perform index loop up to get existing location of records
-      context.setJobStatus(this.getClass().getSimpleName(), "Tagging");
+      context.setJobStatus(this.getClass().getSimpleName(), "Tagging: " + table.getConfig().getTableName());
       taggedRecords = tag(dedupedRecords, context, table);
     }
     Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());


@@ -133,7 +133,7 @@ public abstract class HoodieCompactor<T extends HoodieRecordPayload, I, K, O> im
         .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
     LOG.info("Compactor compacting " + operations + " files");
-    context.setJobStatus(this.getClass().getSimpleName(), "Compacting file slices");
+    context.setJobStatus(this.getClass().getSimpleName(), "Compacting file slices: " + config.getTableName());
     TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier();
     return context.parallelize(operations).map(operation -> compact(
         compactionHandler, metaClient, config, operation, compactionInstantTime, taskContextSupplier))
@@ -288,7 +288,7 @@ public abstract class HoodieCompactor<T extends HoodieRecordPayload, I, K, O> im
     SliceView fileSystemView = hoodieTable.getSliceView();
     LOG.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
-    context.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact");
+    context.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact: " + config.getTableName());
     List<HoodieCompactionOperation> operations = context.flatMap(partitionPaths, partitionPath -> fileSystemView
         .getLatestFileSlices(partitionPath)


@@ -88,7 +88,7 @@ public class RunCompactionActionExecutor<T extends HoodieRecordPayload> extends
         context, compactionPlan, table, configCopy, instantTime, compactionHandler);
     compactor.maybePersist(statuses, config);
-    context.setJobStatus(this.getClass().getSimpleName(), "Preparing compaction metadata");
+    context.setJobStatus(this.getClass().getSimpleName(), "Preparing compaction metadata: " + config.getTableName());
     List<HoodieWriteStat> updateStatusMap = statuses.map(WriteStatus::getStat).collectAsList();
     HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
     for (HoodieWriteStat stat : updateStatusMap) {


@@ -119,7 +119,7 @@ public class ScheduleCompactionActionExecutor<T extends HoodieRecordPayload, I,
           .collect(Collectors.toSet());
       // exclude files in pending clustering from compaction.
       fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
-      context.setJobStatus(this.getClass().getSimpleName(), "Compaction: generating compaction plan");
+      context.setJobStatus(this.getClass().getSimpleName(), "Compaction: generating compaction plan: " + config.getTableName());
       return compactor.generateCompactionPlan(context, table, config, instantTime, fgInPendingCompactionAndClustering);
     } catch (IOException e) {
       throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);


@@ -72,7 +72,7 @@ public class BaseRollbackHelper implements Serializable {
   public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, HoodieInstant instantToRollback,
                                                   List<HoodieRollbackRequest> rollbackRequests) {
     int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
-    context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback actions");
+    context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback actions: " + config.getTableName());
     // If not for conversion to HoodieRollbackInternalRequests, code fails. Using avro model (HoodieRollbackRequest) within spark.parallelize
     // is failing with com.esotericsoftware.kryo.KryoException
     // stack trace: https://gist.github.com/nsivabalan/b6359e7d5038484f8043506c8bc9e1c8
@@ -88,7 +88,7 @@ public class BaseRollbackHelper implements Serializable {
   public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback,
                                                        List<HoodieRollbackRequest> rollbackRequests) {
     int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
-    context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade");
+    context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade: " + config.getTableName());
     // If not for conversion to HoodieRollbackInternalRequests, code fails. Using avro model (HoodieRollbackRequest) within spark.parallelize
     // is failing with com.esotericsoftware.kryo.KryoException
     // stack trace: https://gist.github.com/nsivabalan/b6359e7d5038484f8043506c8bc9e1c8


@@ -88,7 +88,7 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu
         FSUtils.getAllPartitionPaths(context, table.getMetaClient().getBasePath(), false, false);
     int numPartitions = Math.max(Math.min(partitionPaths.size(), config.getRollbackParallelism()), 1);
-    context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing Rollback Plan");
+    context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing Rollback Plan: " + config.getTableName());
     HoodieTableType tableType = table.getMetaClient().getTableType();
     String baseFileExtension = getBaseFileExtension(metaClient);


@@ -84,7 +84,7 @@ public class SavepointActionExecutor<T extends HoodieRecordPayload, I, K, O> ext
     ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.GREATER_THAN_OR_EQUALS, lastCommitRetained),
         "Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);
-    context.setJobStatus(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime);
+    context.setJobStatus(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime + " " + table.getConfig().getTableName());
     List<String> partitions = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
     Map<String, List<String>> latestFilesMap = context.mapToPair(partitions, partitionPath -> {
       // Scan all partitions files with this commit time


@@ -84,7 +84,7 @@ public abstract class WriteMarkers implements Serializable {
    */
   public void quietDeleteMarkerDir(HoodieEngineContext context, int parallelism) {
     try {
-      context.setJobStatus(this.getClass().getSimpleName(), "Deleting marker directory");
+      context.setJobStatus(this.getClass().getSimpleName(), "Deleting marker directory: " + basePath);
       deleteMarkerDir(context, parallelism);
     } catch (Exception e) {
       LOG.warn("Error deleting marker directory for instant " + instantTime, e);


@@ -356,7 +356,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
                                     HoodieCommitMetadata metadata,
                                     HoodieTable table,
                                     String compactionCommitTime) {
-    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
+    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
     List<HoodieWriteStat> writeStats = metadata.getWriteStats();
     final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime);
     try {
@@ -508,7 +508,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     List<String> partitionPaths =
         FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
     if (partitionPaths != null && partitionPaths.size() > 0) {
-      context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions");
+      context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions: " + config.getTableName());
       partitionToExistingFileIds = partitionPaths.stream().parallel()
           .collect(
               Collectors.toMap(


@@ -230,7 +230,7 @@ public class JavaUpsertPartitioner<T extends HoodieRecordPayload<T>> implements
     }
     if (partitionPaths != null && partitionPaths.size() > 0) {
-      context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions");
+      context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions: " + config.getTableName());
       partitionSmallFilesMap = context.mapToPair(partitionPaths,
           partitionPath -> new ImmutablePair<>(partitionPath, getSmallFiles(partitionPath)), 0);
     }


@@ -117,7 +117,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
   @Override
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
                         String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
-    context.setJobStatus(this.getClass().getSimpleName(), "Committing stats");
+    context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " + config.getTableName());
     List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
     return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
   }
@@ -303,7 +303,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
   protected void completeCompaction(HoodieCommitMetadata metadata,
                                     HoodieTable table,
                                     String compactionCommitTime) {
-    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
+    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
     List<HoodieWriteStat> writeStats = metadata.getWriteStats();
     final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime);
     try {


@@ -126,7 +126,7 @@ public class SparkHoodieBloomIndexHelper extends BaseHoodieBloomIndexHelper {
     if (config.getBloomIndexPruneByRanges()) {
       // we will just try exploding the input and then count to determine comparisons
       // FIX(vc): Only do sampling here and extrapolate?
-      context.setJobStatus(this.getClass().getSimpleName(), "Compute all comparisons needed between records and files");
+      context.setJobStatus(this.getClass().getSimpleName(), "Compute all comparisons needed between records and files: " + config.getTableName());
       fileToComparisons = fileComparisonsRDD.mapToPair(t -> t).countByKey();
     } else {
       fileToComparisons = new HashMap<>();


@@ -334,7 +334,7 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
         })
         .collect(Collectors.toList());
-    context.setJobStatus(this.getClass().getSimpleName(), "Bootstrap metadata table.");
+    context.setJobStatus(this.getClass().getSimpleName(), "Bootstrap metadata table: " + config.getTableName());
     return context.parallelize(bootstrapPaths, config.getBootstrapParallelism())
         .map(partitionFsPair -> getMetadataHandler(config, table, partitionFsPair.getRight().getRight()).runMetadataBootstrap(partitionFsPair.getLeft(),
             partitionFsPair.getRight().getLeft(), keyGenerator));


@@ -113,7 +113,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
   }
   private HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords, Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
-    context.setJobStatus(this.getClass().getSimpleName(), "Handling updates which are under clustering");
+    context.setJobStatus(this.getClass().getSimpleName(), "Handling updates which are under clustering: " + config.getTableName());
     UpdateStrategy<T, HoodieData<HoodieRecord<T>>> updateStrategy = (UpdateStrategy<T, HoodieData<HoodieRecord<T>>>) ReflectionUtils
         .loadClass(config.getClusteringUpdatesStrategyClass(), this.context, fileGroupsInPendingClustering);
     Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups =
@@ -152,7 +152,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
     WorkloadProfile workloadProfile = null;
     if (isWorkloadProfileNeeded()) {
-      context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile");
+      context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile: " + config.getTableName());
       workloadProfile = new WorkloadProfile(buildProfile(inputRecords), operationType, table.getIndex().canIndexLogFiles());
       LOG.info("Input workload profile :" + workloadProfile);
     }
@@ -168,7 +168,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
         table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
     HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = fileGroupsInPendingClustering.isEmpty() ? inputRecords : clusteringHandleUpdate(inputRecords, fileGroupsInPendingClustering);
-    context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data");
+    context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data: " + config.getTableName());
     HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
     updateIndexAndCommitIfNeeded(writeStatuses, result);
@@ -280,7 +280,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
   @Override
   protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
-    context.setJobStatus(this.getClass().getSimpleName(), "Commit write status collect");
+    context.setJobStatus(this.getClass().getSimpleName(), "Commit write status collect: " + config.getTableName());
     commit(extraMetadata, result, result.getWriteStatuses().map(WriteStatus::getStat).collectAsList());
   }


@@ -266,7 +266,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends SparkHo
     }
     if (partitionPaths != null && partitionPaths.size() > 0) {
-      context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions");
+      context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions: " + config.getTableName());
       JavaRDD<String> partitionPathRdds = jsc.parallelize(partitionPaths, partitionPaths.size());
       partitionSmallFilesMap = partitionPathRdds.mapToPair((PairFunction<String, String, List<SmallFile>>)
           partitionPath -> new Tuple2<>(partitionPath, getSmallFiles(partitionPath))).collectAsMap();


@@ -174,7 +174,7 @@ public class HDFSParquetImporter implements Serializable {
     ParquetInputFormat.setReadSupportClass(job, (AvroReadSupport.class));
     HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
-    context.setJobStatus(this.getClass().getSimpleName(), "Build records for import");
+    context.setJobStatus(this.getClass().getSimpleName(), "Build records for import: " + cfg.tableName);
     return jsc.newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class,
         job.getConfiguration())
         // To reduce large number of tasks.


@@ -107,7 +107,7 @@ public class HoodieSnapshotCopier implements Serializable {
       fs.delete(new Path(outputDir), true);
     }
-    context.setJobStatus(this.getClass().getSimpleName(), "Creating a snapshot");
+    context.setJobStatus(this.getClass().getSimpleName(), "Creating a snapshot: " + baseDir);
     List<Tuple2<String, String>> filesToCopy = context.flatMap(partitions, partition -> {
       // Only take latest version files <= latestCommit.


@@ -177,7 +177,7 @@ public class HoodieSnapshotExporter {
         : ReflectionUtils.loadClass(cfg.outputPartitioner);
     HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
-    context.setJobStatus(this.getClass().getSimpleName(), "Exporting as non-HUDI dataset");
+    context.setJobStatus(this.getClass().getSimpleName(), "Exporting as non-HUDI dataset: " + cfg.targetOutputPath);
     final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg);
     Iterator<String> exportingFilePaths = jsc
         .parallelize(partitions, partitions.size())