[HUDI-1437] support more accurate spark JobGroup for better performance tracking (#2322)
@@ -53,7 +53,7 @@ public abstract class AsyncCompactService extends HoodieAsyncService {
   private final int maxConcurrentCompaction;
   private transient AbstractCompactor compactor;
-  private transient HoodieEngineContext context;
+  protected transient HoodieEngineContext context;
   private transient BlockingQueue<HoodieInstant> pendingCompactions = new LinkedBlockingQueue<>();
   private transient ReentrantLock queueLock = new ReentrantLock();
   private transient Condition consumed = queueLock.newCondition();

@@ -79,6 +79,7 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
   @Override
   public void close() {
     stopEmbeddedServerView(true);
+    this.context.setJobStatus("", "");
   }

   private synchronized void stopEmbeddedServerView(boolean resetViewStorageConfig) {
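For orientation: `setJobStatus(activeModule, activityDescription)` is the engine-agnostic hook this commit threads through the write path, and the empty-string call in `close()` above clears the label so later jobs on the same thread do not inherit it. On the Spark engine the hook presumably delegates to `JavaSparkContext.setJobGroup`; below is a minimal sketch of that delegation, with the class and field names assumed rather than taken from this diff.

```java
import org.apache.spark.api.java.JavaSparkContext;

// Sketch only — the real Spark-backed HoodieEngineContext may differ.
class SparkEngineContextSketch {
  private final JavaSparkContext javaSparkContext;

  SparkEngineContextSketch(JavaSparkContext javaSparkContext) {
    this.javaSparkContext = javaSparkContext;
  }

  // Tags every job subsequently submitted from the current thread with the
  // calling module as the Spark job group and a human-readable description,
  // which is what the Spark UI then displays for those jobs.
  void setJobStatus(String activeModule, String activityDescription) {
    javaSparkContext.setJobGroup(activeModule, activityDescription);
  }
}
```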
@@ -674,8 +674,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
    */
  protected abstract void completeCompaction(HoodieCommitMetadata metadata, O writeStatuses,
                                             HoodieTable<T, I, K, O> table, String compactionCommitTime);

-
  /**
   * Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file
   *
@@ -68,7 +68,7 @@ public class ReplaceArchivalHelper implements Serializable {
   public static boolean deleteReplacedFileGroups(HoodieEngineContext context, HoodieTableMetaClient metaClient,
                                                  TableFileSystemView fileSystemView,
                                                  HoodieInstant instant, List<String> replacedPartitions) {
-
+    context.setJobStatus(ReplaceArchivalHelper.class.getSimpleName(), "Delete replaced file groups");
     List<Boolean> f = context.map(replacedPartitions, partition -> {
       Stream<FileSlice> fileSlices = fileSystemView.getReplacedFileGroupsBeforeOrOn(instant.getTimestamp(), partition)
           .flatMap(HoodieFileGroup::getAllRawFileSlices);
@@ -403,6 +403,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem

   private void deleteInvalidFilesByPartitions(HoodieEngineContext context, Map<String, List<Pair<String, String>>> invalidFilesByPartition) {
     // Now delete partially written files
+    context.setJobStatus(this.getClass().getSimpleName(), "Delete invalid files generated during the write operation");
     context.map(new ArrayList<>(invalidFilesByPartition.values()), partitionWithFileList -> {
       final FileSystem fileSystem = metaClient.getFs();
       LOG.info("Deleting invalid data files=" + partitionWithFileList);
@@ -135,6 +135,7 @@ public class MarkerFiles implements Serializable {
     if (subDirectories.size() > 0) {
       parallelism = Math.min(subDirectories.size(), parallelism);
       SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf());
+      context.setJobStatus(this.getClass().getSimpleName(), "Obtaining marker files for all created, merged paths");
       dataFiles.addAll(context.flatMap(subDirectories, directory -> {
         Path path = new Path(directory);
         FileSystem fileSystem = path.getFileSystem(serializedConf.get());
@@ -31,6 +31,6 @@ public class SparkAsyncCompactService extends AsyncCompactService {

   @Override
   protected AbstractCompactor createCompactor(AbstractHoodieWriteClient client) {
-    return new HoodieSparkCompactor(client);
+    return new HoodieSparkCompactor(client, this.context);
   }
 }
@@ -18,6 +18,7 @@

 package org.apache.hudi.client;

+import org.apache.hudi.client.common.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -33,9 +34,12 @@ import java.io.IOException;
 public class HoodieSparkCompactor<T extends HoodieRecordPayload> extends AbstractCompactor<T,
     JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
   private static final Logger LOG = LogManager.getLogger(HoodieSparkCompactor.class);
+  private transient HoodieEngineContext context;

-  public HoodieSparkCompactor(AbstractHoodieWriteClient<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> compactionClient) {
+  public HoodieSparkCompactor(AbstractHoodieWriteClient<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> compactionClient,
+                              HoodieEngineContext context) {
     super(compactionClient);
+    this.context = context;
   }

   @Override
@@ -43,6 +47,7 @@ public class HoodieSparkCompactor<T extends HoodieRecordPayload> extends Abstrac
     LOG.info("Compactor executing compaction " + instant);
     SparkRDDWriteClient<T> writeClient = (SparkRDDWriteClient<T>)compactionClient;
     JavaRDD<WriteStatus> res = writeClient.compact(instant.getTimestamp());
+    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status");
     long numWriteErrors = res.collect().stream().filter(WriteStatus::hasErrors).count();
     if (numWriteErrors != 0) {
       // We treat even a single error in compaction as fatal
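With the engine context injected through the new constructor, the job triggered by `res.collect()` above is labeled instead of showing up as an anonymous `collect` in the Spark UI. A hedged sketch of the wiring from a caller's side, assuming the service fields and a single-argument `compact(HoodieInstant)` method on the compactor:

```java
// Sketch: inside an async compaction service (writeClient and context are
// assumed fields, not shown in this diff). The service's engine context is
// handed to the compactor, so the jobs it triggers appear under the
// "HoodieSparkCompactor" job group.
void runPendingCompaction(HoodieInstant pendingInstant) throws IOException {
  AbstractCompactor compactor = new HoodieSparkCompactor(writeClient, this.context);
  compactor.compact(pendingInstant);
}
```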
@@ -277,6 +277,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
   protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD<WriteStatus> writeStatuses,
                                     HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
                                     String compactionCommitTime) {
+    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
     List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
     finalizeWrite(table, compactionCommitTime, writeStats);
     LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
@@ -125,7 +125,7 @@ public class SparkHoodieBloomIndex<T extends HoodieRecordPayload> extends SparkH
     JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
         explodeRecordRDDWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairRDD);
     Map<String, Long> comparisonsPerFileGroup =
-        computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, fileComparisonsRDD);
+        computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, fileComparisonsRDD, context);
     int inputParallelism = partitionRecordKeyPairRDD.partitions().size();
     int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
     LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: ${"
@@ -139,11 +139,13 @@ public class SparkHoodieBloomIndex<T extends HoodieRecordPayload> extends SparkH
    */
   private Map<String, Long> computeComparisonsPerFileGroup(final Map<String, Long> recordsPerPartition,
                                                            final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
-                                                           final JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD) {
+                                                           final JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD,
+                                                           final HoodieEngineContext context) {
     Map<String, Long> fileToComparisons;
     if (config.getBloomIndexPruneByRanges()) {
       // we will just try exploding the input and then count to determine comparisons
       // FIX(vc): Only do sampling here and extrapolate?
+      context.setJobStatus(this.getClass().getSimpleName(), "Compute all comparisons needed between records and files");
       fileToComparisons = fileComparisonsRDD.mapToPair(t -> t).countByKey();
     } else {
       fileToComparisons = new HashMap<>();
@@ -101,6 +101,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa

     WorkloadProfile profile = null;
     if (isWorkloadProfileNeeded()) {
+      context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile");
       profile = new WorkloadProfile(buildProfile(inputRecordsRDD));
       LOG.info("Workload profile :" + profile);
       saveWorkloadProfileMetadataToInflight(profile, instantTime);
@@ -206,6 +207,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa

   @Override
   protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
+    context.setJobStatus(this.getClass().getSimpleName(), "Commit write status collect");
     commit(extraMetadata, result, result.getWriteStatuses().map(WriteStatus::getStat).collect());
   }

@@ -76,6 +76,7 @@ public class SparkRunCompactionActionExecutor<T extends HoodieRecordPayload> ext
     JavaRDD<WriteStatus> statuses = compactor.compact(context, compactionPlan, table, config, instantTime);

     statuses.persist(SparkMemoryUtils.getWriteStatusStorageLevel(config.getProps()));
+    context.setJobStatus(this.getClass().getSimpleName(), "Preparing compaction metadata");
     List<HoodieWriteStat> updateStatusMap = statuses.map(WriteStatus::getStat).collect();
     HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
     for (HoodieWriteStat stat : updateStatusMap) {
@@ -52,6 +52,7 @@ public class SparkMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> ext
     MarkerFiles markerFiles = new MarkerFiles(table, instantToRollback.getTimestamp());
     List<String> markerFilePaths = markerFiles.allMarkerFilePaths();
     int parallelism = Math.max(Math.min(markerFilePaths.size(), config.getRollbackParallelism()), 1);
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Rolling back using marker files");
     return jsc.parallelize(markerFilePaths, parallelism)
         .map(markerFilePath -> {
           String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
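Unlike the other call sites, the rollback strategy holds a `JavaSparkContext` directly, so it calls `setJobGroup` itself rather than going through the engine context. Spark scopes the group per thread: it applies to every job submitted from that thread until overwritten, and the same group id can be used to cancel in-flight jobs. A small standalone illustration follows (the group id and description mirror the diff; the rest is assumed demo scaffolding):

```java
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class JobGroupDemo {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("job-group-demo").setMaster("local[2]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      // From here on, every job submitted from this thread carries this
      // group id and description in the Spark UI and event logs.
      jsc.setJobGroup("SparkMarkerBasedRollbackStrategy", "Rolling back using marker files");
      long count = jsc.parallelize(Arrays.asList("a.marker", "b.marker"), 2).count();
      System.out.println("processed " + count + " marker paths (demo)");
      // The same id can be used to cancel all running jobs in the group:
      // jsc.cancelJobGroup("SparkMarkerBasedRollbackStrategy");
    }
  }
}
```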
@@ -37,6 +37,6 @@ public class SparkStreamingAsyncCompactService extends AsyncCompactService {

   @Override
   protected AbstractCompactor createCompactor(AbstractHoodieWriteClient client) {
-    return new HoodieSparkCompactor(client);
+    return new HoodieSparkCompactor(client, this.context);
   }
 }