diff --git a/README.md b/README.md
index 66b9138ae..542ff2e51 100644
--- a/README.md
+++ b/README.md
@@ -92,6 +92,18 @@ spark-2.4.4-bin-hadoop2.7/bin/spark-shell \
   --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
 ```
 
+## Running Tests
+
+All tests can be run with Maven:
+```
+mvn test
+```
+
+To run tests with Spark event logging enabled, define the Spark event log directory. This allows visualizing the test DAGs and stages in the Spark History Server UI.
+```
+mvn test -DSPARK_EVLOG_DIR=/path/for/spark/event/log
+```
+
 ## Quickstart
 
 Please visit [https://hudi.apache.org/docs/quick-start-guide.html](https://hudi.apache.org/docs/quick-start-guide.html) to quickly explore Hudi's capabilities using spark-shell.
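As a usage sketch for the `SPARK_EVLOG_DIR` property described above: the event logs written during a test run can be browsed with a standard Spark History Server (not part of this patch); the directory below is an illustrative assumption, and 18080 is the history server's default UI port.

```sh
# Write Spark event logs for the test run to a local directory (any writable path works).
mvn test -DSPARK_EVLOG_DIR=/tmp/hudi-test-events

# Point a Spark History Server at the same directory, then open http://localhost:18080
# to inspect the DAGs and stages of the test jobs.
export SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=file:///tmp/hudi-test-events"
$SPARK_HOME/sbin/start-history-server.sh
```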
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/CompactionAdminClient.java b/hudi-client/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
index 627348cb8..48042e72e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
@@ -85,6 +85,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
     if (plan.getOperations() != null) {
       List ops = plan.getOperations().stream()
           .map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
+      jsc.setJobGroup(this.getClass().getSimpleName(), "Validate compaction operations");
       return jsc.parallelize(ops, parallelism).map(op -> {
         try {
           return validateCompactionOperation(metaClient, compactionInstant, op, Option.of(fsView));
@@ -350,6 +351,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
     } else {
       LOG.info("The following compaction renaming operations needs to be performed to un-schedule");
       if (!dryRun) {
+        jsc.setJobGroup(this.getClass().getSimpleName(), "Execute unschedule operations");
         return jsc.parallelize(renameActions, parallelism).map(lfPair -> {
           try {
             LOG.info("RENAME " + lfPair.getLeft().getPath() + " => " + lfPair.getRight().getPath());
@@ -392,6 +394,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
         "Number of Compaction Operations :" + plan.getOperations().size() + " for instant :" + compactionInstant);
     List ops = plan.getOperations().stream()
         .map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Generate compaction unscheduling operations");
     return jsc.parallelize(ops, parallelism).flatMap(op -> {
       try {
         return getRenamingActionsForUnschedulingCompactionOperation(metaClient, compactionInstant, op,
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 806dcf515..ebd30b3ba 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -49,6 +49,7 @@ public class HoodieIndexUtils {
   public static List> getLatestBaseFilesForAllPartitions(final List partitions,
                                                           final JavaSparkContext jsc,
                                                           final HoodieTable hoodieTable) {
+    jsc.setJobGroup(HoodieIndexUtils.class.getSimpleName(), "Load latest base files from all partitions");
     return jsc.parallelize(partitions, Math.max(partitions.size(), 1))
         .flatMap(partitionPath -> {
           Option latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline()
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
index ca960ef88..e03f38ac0 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
@@ -199,6 +199,7 @@ public class HoodieBloomIndex extends HoodieIndex
 
     if (config.getBloomIndexPruneByRanges()) {
       // also obtain file ranges, if range pruning is enabled
+      jsc.setJobDescription("Obtain key ranges for file slices (range pruning=on)");
       return jsc.parallelize(partitionPathFileIDList, Math.max(partitionPathFileIDList.size(), 1)).mapToPair(pf -> {
         try {
           HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 49d885889..b545eae3e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -448,6 +448,7 @@ public abstract class HoodieTable implements Seri
     }
 
     // Now delete partially written files
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Delete all partially written files");
     jsc.parallelize(new ArrayList<>(groupByPartition.values()), config.getFinalizeWriteParallelism())
         .map(partitionWithFileList -> {
           final FileSystem fileSystem = metaClient.getFs();
@@ -489,6 +490,7 @@ public abstract class HoodieTable implements Seri
    */
  private void waitForAllFiles(JavaSparkContext jsc, Map>> groupByPartition, FileVisibility visibility) {
     // This will either ensure all files to be deleted are present.
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Wait for all files to appear/disappear");
     boolean checkPassed =
         jsc.parallelize(new ArrayList<>(groupByPartition.entrySet()), config.getFinalizeWriteParallelism())
             .map(partitionWithFileList -> waitForCondition(partitionWithFileList.getKey(),
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index 57feebc51..c72a453ac 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -81,6 +81,7 @@ public class CleanActionExecutor extends BaseActionExecutor
     int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
     LOG.info("Using cleanerParallelism: " + cleanerParallelism);
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Generates list of file slices to be cleaned");
     Map> cleanOps = jsc
         .parallelize(partitionsToClean, cleanerParallelism)
         .map(partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)))
@@ -147,6 +148,8 @@ public class CleanActionExecutor extends BaseActionExecutor
         (int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()),
         config.getCleanerParallelism());
     LOG.info("Using cleanerParallelism: " + cleanerParallelism);
+
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Perform cleaning of partitions");
     List> partitionCleanStats = jsc
         .parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream()
             .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), y)))
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index a59871052..8857fc38e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -210,6 +210,7 @@ public class UpsertPartitioner> extends Partiti
     Map> partitionSmallFilesMap = new HashMap<>();
 
     if (partitionPaths != null && partitionPaths.size() > 0) {
+      jsc.setJobGroup(this.getClass().getSimpleName(), "Getting small files from partitions");
       JavaRDD partitionPathRdds = jsc.parallelize(partitionPaths, partitionPaths.size());
       partitionSmallFilesMap = partitionPathRdds.mapToPair((PairFunction>) partitionPath -> new Tuple2<>(partitionPath, getSmallFiles(partitionPath))).collectAsMap();
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
index a2ce958d6..80afcacfa 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
@@ -94,6 +94,7 @@ public class HoodieMergeOnReadTableCompactor implements HoodieCompactor {
         .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
     LOG.info("Compactor compacting " + operations + " files");
 
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Compacting file slices");
     return jsc.parallelize(operations, operations.size())
         .map(s -> compact(table, metaClient, config, s, compactionInstantTime)).flatMap(List::iterator);
   }
@@ -192,6 +193,7 @@ public class HoodieMergeOnReadTableCompactor implements HoodieCompactor {
     SliceView fileSystemView = hoodieTable.getSliceView();
     LOG.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
 
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Looking for files to compact");
     List operations = jsc.parallelize(partitionPaths, partitionPaths.size())
         .flatMap((FlatMapFunction) partitionPath -> fileSystemView
             .getLatestFileSlices(partitionPath)
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
index d30f0e2ff..481fb2d7b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
@@ -118,6 +118,7 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto
     List partitions = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
         table.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning());
     int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Generate all rollback requests");
     return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)).flatMap(partitionPath -> {
       HoodieActiveTimeline activeTimeline = table.getMetaClient().reloadActiveTimeline();
       List partitionRollbackRequests = new ArrayList<>();
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java
index 4a9c20a2d..f5b92e760 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java
@@ -85,6 +85,7 @@ public class RollbackHelper implements Serializable {
     };
 
     int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Perform rollback actions");
     return jsc.parallelize(rollbackRequests, sparkPartitions).mapToPair(rollbackRequest -> {
       final Map filesToDeletedStatus = new HashMap<>();
       switch (rollbackRequest.getRollbackAction()) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
index 5fee8a6d8..ac95118d9 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
@@ -87,6 +87,7 @@ public class SavepointActionExecutor extends BaseActionExecutor
       Map> latestFilesMap = jsc.parallelize(FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
           table.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()))
           .mapToPair(partitionPath -> {
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index 2e91a84f6..719f042e6 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -86,7 +86,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
 
   @BeforeEach
   public void setUp() throws Exception {
-    initSparkContexts("TestHoodieBloomIndex");
+    initSparkContexts();
     initPath();
     initFileSystem();
     // We have some records to be tagged (two different partitions)
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index 6aab654c5..bb227ae92 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -71,7 +71,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
 
   @BeforeEach
   public void setUp() throws Exception {
-    initSparkContexts("TestHoodieGlobalBloomIndex");
+    initSparkContexts();
     initPath();
     // We have some records to be tagged (two different partitions)
     String schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
index 9cd3b3f26..127cface3 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
@@ -60,7 +60,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
   public void init() throws Exception {
     initDFS();
     initPath();
-    initSparkContexts("TestHoodieCommitArchiveLog");
+    initSparkContexts();
     hadoopConf = dfs.getConf();
     hadoopConf.addResource(dfs.getConf());
     dfs.mkdirs(new Path(basePath));
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
index e06f9b360..18744ef5d 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
@@ -57,7 +57,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
 
   @BeforeEach
   public void setUp() throws Exception {
-    initSparkContexts("TestHoodieMergeHandle");
+    initSparkContexts();
     initPath();
     initFileSystem();
     initTestDataGenerator();
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index 7fa64a591..6819cc586 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -66,7 +66,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
   @BeforeEach
   public void setUp() throws Exception {
     // Initialize a local spark env
-    initSparkContexts("TestHoodieCompactor");
+    initSparkContexts();
 
     // Create a temp folder as the base path
     initPath();
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index 69e177601..a3b534cf5 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -41,6 +41,8 @@ import org.apache.hudi.table.HoodieTable;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,7 +58,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable {
 
   private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class);
-
+
+  private String testMethodName;
   protected transient JavaSparkContext jsc = null;
   protected transient Configuration hadoopConf = null;
   protected transient SQLContext sqlContext;
@@ -82,6 +85,15 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
   protected transient MiniDFSCluster dfsCluster;
   protected transient DistributedFileSystem dfs;
 
+  @BeforeEach
+  public void setTestMethodName(TestInfo testInfo) {
+    if (testInfo.getTestMethod().isPresent()) {
+      testMethodName = testInfo.getTestMethod().get().getName();
+    } else {
+      testMethodName = "Unknown";
+    }
+  }
+
   /**
    * Initializes resource group for the subclasses of {@link HoodieClientTestBase}.
   */
@@ -113,7 +125,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
    */
   protected void initSparkContexts(String appName) {
     // Initialize a local spark env
-    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName));
+    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName + "#" + testMethodName));
     jsc.setLogLevel("ERROR");
 
     hadoopConf = jsc.hadoopConfiguration();
@@ -122,11 +134,11 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
   }
 
   /**
-   * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with a default name
-   * TestHoodieClient.
+   * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext})
+   * with a default name matching the name of the class.
    */
   protected void initSparkContexts() {
-    initSparkContexts("TestHoodieClient");
+    initSparkContexts(this.getClass().getSimpleName());
   }
 
   /**
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index 1bb827510..5c57c25ea 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -146,9 +146,30 @@ public class HoodieClientTestUtils {
     new RandomAccessFile(path, "rw").setLength(length);
   }
 
+  /**
+   * Returns a Spark config for this test.
+   *
+   * The following properties may be set to customize the Spark context:
+   *   SPARK_EVLOG_DIR: Local directory where event logs should be saved. This
+   *                    allows viewing the logs with spark-history-server.
+   *
+   * @note When running the tests using Maven, use the following syntax to set
+   *       a property:
+   *         mvn -DSPARK_XXX=yyy ...
+   *
+   * @param appName A name for the Spark application. Shown in the Spark web UI.
+   * @return A Spark config
+   */
   public static SparkConf getSparkConfForTest(String appName) {
     SparkConf sparkConf = new SparkConf().setAppName(appName)
         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[8]");
+
+    String evlogDir = System.getProperty("SPARK_EVLOG_DIR");
+    if (evlogDir != null) {
+      sparkConf.set("spark.eventLog.enabled", "true");
+      sparkConf.set("spark.eventLog.dir", evlogDir);
+    }
+
     return HoodieReadClient.addHoodieSupport(sparkConf);
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
index 4befaec88..e0cfba067 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
@@ -166,6 +166,7 @@ public class HDFSParquetImporter implements Serializable {
       AvroReadSupport.setAvroReadSchema(jsc.hadoopConfiguration(), (new Schema.Parser().parse(schemaStr)));
       ParquetInputFormat.setReadSupportClass(job, (AvroReadSupport.class));
+      jsc.setJobGroup(this.getClass().getSimpleName(), "Build records for import");
       return jsc.newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class,
           job.getConfiguration())
           // To reduce large number of tasks.
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
index 651cbbf0e..916d01905 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
@@ -97,6 +97,7 @@ public class HoodieSnapshotCopier implements Serializable {
       fs.delete(new Path(outputDir), true);
     }
 
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Creating a snapshot");
     jsc.parallelize(partitions, partitions.size()).flatMap(partition -> {
       // Only take latest version files <= latestCommit.
       FileSystem fs1 = FSUtils.getFs(baseDir, serConf.newCopy());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index 7e21b4ebf..0743839ff 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -175,6 +175,7 @@ public class HoodieSnapshotExporter {
         ? defaultPartitioner
         : ReflectionUtils.loadClass(cfg.outputPartitioner);
 
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Exporting as non-HUDI dataset");
     final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg);
     Iterator exportingFilePaths = jsc
         .parallelize(partitions, partitions.size())
@@ -193,6 +194,7 @@ public class HoodieSnapshotExporter {
   private void exportAsHudi(JavaSparkContext jsc, Config cfg, List partitions, String latestCommitTimestamp) throws IOException {
     final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg);
     final SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration());
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Exporting as HUDI dataset");
     jsc.parallelize(partitions, partitions.size()).flatMap(partition -> {
       // Only take latest version files <= latestCommit.
       List> filePaths = new ArrayList<>();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieWithTimelineServer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieWithTimelineServer.java
index 06fdda309..06c1084c9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieWithTimelineServer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieWithTimelineServer.java
@@ -86,6 +86,7 @@ public class HoodieWithTimelineServer implements Serializable {
     System.out.println("Driver Hostname is :" + driverHost);
     List messages = new ArrayList<>();
     IntStream.range(0, cfg.numPartitions).forEach(i -> messages.add("Hello World"));
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Sending requests to driver host");
     List gotMessages = jsc.parallelize(messages).map(msg -> sendRequest(driverHost, cfg.serverPort)).collect();
     System.out.println("Got Messages :" + gotMessages);
     ValidationUtils.checkArgument(gotMessages.equals(messages), "Got expected reply from Server");
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
index a6692cc5b..6aaa6bd26 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
@@ -132,6 +132,7 @@ public class TimelineServerPerf implements Serializable {
 
   public List runLookups(JavaSparkContext jsc, List partitionPaths, SyncableFileSystemView fsView,
       int numIterations, int concurrency) {
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Lookup all performance stats");
     return jsc.parallelize(partitionPaths, cfg.numExecutors).flatMap(p -> {
       ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(100);
       final List result = new ArrayList<>();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
index c8aa3e89e..a2f7df90f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
@@ -56,6 +56,7 @@ public class AvroDFSSource extends AvroSource {
   }
 
   private JavaRDD fromFiles(String pathStr) {
+    sparkContext.setJobGroup(this.getClass().getSimpleName(), "Fetch Avro data from files");
     JavaPairRDD avroRDD = sparkContext.newAPIHadoopFile(pathStr, AvroKeyInputFormat.class,
         AvroKey.class, NullWritable.class, sparkContext.hadoopConfiguration());
     return avroRDD.keys().map(r -> ((GenericRecord) r.datum()));
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
index a8bea98e2..585319082 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
@@ -128,6 +128,7 @@ public class HiveIncrPullSource extends AvroSource {
       String pathStr = commitDeltaFiles.stream().map(f -> f.getPath().toString()).collect(Collectors.joining(","));
       JavaPairRDD avroRDD = sparkContext.newAPIHadoopFile(pathStr, AvroKeyInputFormat.class,
           AvroKey.class, NullWritable.class, sparkContext.hadoopConfiguration());
+      sparkContext.setJobGroup(this.getClass().getSimpleName(), "Fetch new data");
data"); return new InputBatch<>(Option.of(avroRDD.keys().map(r -> ((GenericRecord) r.datum()))), String.valueOf(commitToPull.get())); } catch (IOException ioe) {