1
0

[HUDI-92] Provide reasonable names for Spark DAG stages in HUDI. (#1289)

This commit is contained in:
Prashant Wason
2020-07-19 10:29:25 -07:00
committed by GitHub
parent 1aae437257
commit b71f25f210
25 changed files with 79 additions and 10 deletions

View File

@@ -85,6 +85,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
if (plan.getOperations() != null) {
List<CompactionOperation> ops = plan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
jsc.setJobGroup(this.getClass().getSimpleName(), "Validate compaction operations");
return jsc.parallelize(ops, parallelism).map(op -> {
try {
return validateCompactionOperation(metaClient, compactionInstant, op, Option.of(fsView));
@@ -350,6 +351,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
} else {
LOG.info("The following compaction renaming operations needs to be performed to un-schedule");
if (!dryRun) {
jsc.setJobGroup(this.getClass().getSimpleName(), "Execute unschedule operations");
return jsc.parallelize(renameActions, parallelism).map(lfPair -> {
try {
LOG.info("RENAME " + lfPair.getLeft().getPath() + " => " + lfPair.getRight().getPath());
@@ -392,6 +394,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
"Number of Compaction Operations :" + plan.getOperations().size() + " for instant :" + compactionInstant);
List<CompactionOperation> ops = plan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
jsc.setJobGroup(this.getClass().getSimpleName(), "Generate compaction unscheduling operations");
return jsc.parallelize(ops, parallelism).flatMap(op -> {
try {
return getRenamingActionsForUnschedulingCompactionOperation(metaClient, compactionInstant, op,

View File

@@ -49,6 +49,7 @@ public class HoodieIndexUtils {
public static List<Pair<String, HoodieBaseFile>> getLatestBaseFilesForAllPartitions(final List<String> partitions,
final JavaSparkContext jsc,
final HoodieTable hoodieTable) {
jsc.setJobGroup(HoodieIndexUtils.class.getSimpleName(), "Load latest base files from all partitions");
return jsc.parallelize(partitions, Math.max(partitions.size(), 1))
.flatMap(partitionPath -> {
Option<HoodieInstant> latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline()

View File

@@ -199,6 +199,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
if (config.getBloomIndexPruneByRanges()) {
// also obtain file ranges, if range pruning is enabled
jsc.setJobDescription("Obtain key ranges for file slices (range pruning=on)");
return jsc.parallelize(partitionPathFileIDList, Math.max(partitionPathFileIDList.size(), 1)).mapToPair(pf -> {
try {
HoodieRangeInfoHandle<T> rangeInfoHandle = new HoodieRangeInfoHandle<T>(config, hoodieTable, pf);

View File

@@ -448,6 +448,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
}
// Now delete partially written files
jsc.setJobGroup(this.getClass().getSimpleName(), "Delete all partially written files");
jsc.parallelize(new ArrayList<>(groupByPartition.values()), config.getFinalizeWriteParallelism())
.map(partitionWithFileList -> {
final FileSystem fileSystem = metaClient.getFs();
@@ -489,6 +490,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
*/
private void waitForAllFiles(JavaSparkContext jsc, Map<String, List<Pair<String, String>>> groupByPartition, FileVisibility visibility) {
// This will either ensure all files to be deleted are present.
jsc.setJobGroup(this.getClass().getSimpleName(), "Wait for all files to appear/disappear");
boolean checkPassed =
jsc.parallelize(new ArrayList<>(groupByPartition.entrySet()), config.getFinalizeWriteParallelism())
.map(partitionWithFileList -> waitForCondition(partitionWithFileList.getKey(),

View File

@@ -81,6 +81,7 @@ public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata>
int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
LOG.info("Using cleanerParallelism: " + cleanerParallelism);
jsc.setJobGroup(this.getClass().getSimpleName(), "Generates list of file slices to be cleaned");
Map<String, List<String>> cleanOps = jsc
.parallelize(partitionsToClean, cleanerParallelism)
.map(partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)))
@@ -147,6 +148,8 @@ public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata>
(int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()),
config.getCleanerParallelism());
LOG.info("Using cleanerParallelism: " + cleanerParallelism);
jsc.setJobGroup(this.getClass().getSimpleName(), "Perform cleaning of partitions");
List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
.parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream()
.flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), y)))

View File

@@ -210,6 +210,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>();
if (partitionPaths != null && partitionPaths.size() > 0) {
jsc.setJobGroup(this.getClass().getSimpleName(), "Getting small files from partitions");
JavaRDD<String> partitionPathRdds = jsc.parallelize(partitionPaths, partitionPaths.size());
partitionSmallFilesMap = partitionPathRdds.mapToPair((PairFunction<String, String, List<SmallFile>>)
partitionPath -> new Tuple2<>(partitionPath, getSmallFiles(partitionPath))).collectAsMap();

View File

@@ -94,6 +94,7 @@ public class HoodieMergeOnReadTableCompactor implements HoodieCompactor {
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
LOG.info("Compactor compacting " + operations + " files");
jsc.setJobGroup(this.getClass().getSimpleName(), "Compacting file slices");
return jsc.parallelize(operations, operations.size())
.map(s -> compact(table, metaClient, config, s, compactionInstantTime)).flatMap(List::iterator);
}
@@ -192,6 +193,7 @@ public class HoodieMergeOnReadTableCompactor implements HoodieCompactor {
SliceView fileSystemView = hoodieTable.getSliceView();
LOG.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
jsc.setJobGroup(this.getClass().getSimpleName(), "Looking for files to compact");
List<HoodieCompactionOperation> operations = jsc.parallelize(partitionPaths, partitionPaths.size())
.flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> fileSystemView
.getLatestFileSlices(partitionPath)

View File

@@ -118,6 +118,7 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto
List<String> partitions = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning());
int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
jsc.setJobGroup(this.getClass().getSimpleName(), "Generate all rollback requests");
return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)).flatMap(partitionPath -> {
HoodieActiveTimeline activeTimeline = table.getMetaClient().reloadActiveTimeline();
List<RollbackRequest> partitionRollbackRequests = new ArrayList<>();

View File

@@ -85,6 +85,7 @@ public class RollbackHelper implements Serializable {
};
int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
jsc.setJobGroup(this.getClass().getSimpleName(), "Perform rollback actions");
return jsc.parallelize(rollbackRequests, sparkPartitions).mapToPair(rollbackRequest -> {
final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
switch (rollbackRequest.getRollbackAction()) {

View File

@@ -87,6 +87,7 @@ public class SavepointActionExecutor extends BaseActionExecutor<HoodieSavepointM
ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.GREATER_THAN_OR_EQUALS, lastCommitRetained),
"Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);
jsc.setJobGroup(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime);
Map<String, List<String>> latestFilesMap = jsc.parallelize(FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
table.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()))
.mapToPair(partitionPath -> {

View File

@@ -86,7 +86,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
@BeforeEach
public void setUp() throws Exception {
initSparkContexts("TestHoodieBloomIndex");
initSparkContexts();
initPath();
initFileSystem();
// We have some records to be tagged (two different partitions)

View File

@@ -71,7 +71,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
@BeforeEach
public void setUp() throws Exception {
initSparkContexts("TestHoodieGlobalBloomIndex");
initSparkContexts();
initPath();
// We have some records to be tagged (two different partitions)
String schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));

View File

@@ -60,7 +60,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
public void init() throws Exception {
initDFS();
initPath();
initSparkContexts("TestHoodieCommitArchiveLog");
initSparkContexts();
hadoopConf = dfs.getConf();
hadoopConf.addResource(dfs.getConf());
dfs.mkdirs(new Path(basePath));

View File

@@ -57,7 +57,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
@BeforeEach
public void setUp() throws Exception {
initSparkContexts("TestHoodieMergeHandle");
initSparkContexts();
initPath();
initFileSystem();
initTestDataGenerator();

View File

@@ -66,7 +66,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
@BeforeEach
public void setUp() throws Exception {
// Initialize a local spark env
initSparkContexts("TestHoodieCompactor");
initSparkContexts();
// Create a temp folder as the base path
initPath();

View File

@@ -41,6 +41,8 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,7 +58,8 @@ import java.util.concurrent.atomic.AtomicInteger;
public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class);
private String testMethodName;
protected transient JavaSparkContext jsc = null;
protected transient Configuration hadoopConf = null;
protected transient SQLContext sqlContext;
@@ -82,6 +85,15 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
protected transient MiniDFSCluster dfsCluster;
protected transient DistributedFileSystem dfs;
@BeforeEach
public void setTestMethodName(TestInfo testInfo) {
if (testInfo.getTestMethod().isPresent()) {
testMethodName = testInfo.getTestMethod().get().getName();
} else {
testMethodName = "Unknown";
}
}
/**
* Initializes resource group for the subclasses of {@link HoodieClientTestBase}.
*/
@@ -113,7 +125,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
*/
protected void initSparkContexts(String appName) {
// Initialize a local spark env
jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName));
jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName + "#" + testMethodName));
jsc.setLogLevel("ERROR");
hadoopConf = jsc.hadoopConfiguration();
@@ -122,11 +134,11 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
}
/**
* Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with a default name
* <b>TestHoodieClient</b>.
* Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext})
* with a default name matching the name of the class.
*/
protected void initSparkContexts() {
initSparkContexts("TestHoodieClient");
initSparkContexts(this.getClass().getSimpleName());
}
/**

View File

@@ -146,9 +146,30 @@ public class HoodieClientTestUtils {
new RandomAccessFile(path, "rw").setLength(length);
}
/**
* Returns a Spark config for this test.
*
* The following properties may be set to customize the Spark context:
* SPARK_EVLOG_DIR: Local directory where event logs should be saved. This
* allows viewing the logs with spark-history-server.
*
* @note When running the tests using maven, use the following syntax to set
* a property:
* mvn -DSPARK_XXX=yyy ...
*
* @param appName A name for the Spark application. Shown in the Spark web UI.
* @return A Spark config
*/
public static SparkConf getSparkConfForTest(String appName) {
SparkConf sparkConf = new SparkConf().setAppName(appName)
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[8]");
String evlogDir = System.getProperty("SPARK_EVLOG_DIR");
if (evlogDir != null) {
sparkConf.set("spark.eventLog.enabled", "true");
sparkConf.set("spark.eventLog.dir", evlogDir);
}
return HoodieReadClient.addHoodieSupport(sparkConf);
}