1
0

[HUDI-3429] Support clustering scheduleAndExecute for hudi-cli and add clustering-cli Tests (#4817)

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
This commit is contained in:
YueZhang
2022-02-25 12:28:38 +08:00
committed by GitHub
parent aa1810d737
commit 3694485609
3 changed files with 253 additions and 5 deletions

View File

@@ -116,4 +116,40 @@ public class ClusteringCommand implements CommandMarker {
} }
return "Succeeded to run clustering for " + clusteringInstantTime; return "Succeeded to run clustering for " + clusteringInstantTime;
} }
/**
* Run clustering table service.
* <p>
* Example:
* > connect --path {path to hudi table}
* > clustering scheduleAndExecute --sparkMaster local --sparkMemory 2g
*/
@CliCommand(value = "clustering scheduleAndExecute", help = "Run Clustering. Make a cluster plan first and execute that plan immediately")
public String runClustering(
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = SparkUtil.DEFAULT_SPARK_MASTER, help = "Spark master") final String master,
@CliOption(key = "sparkMemory", help = "Spark executor memory", unspecifiedDefaultValue = "4g") final String sparkMemory,
@CliOption(key = "parallelism", help = "Parallelism for hoodie clustering", unspecifiedDefaultValue = "1") final String parallelism,
@CliOption(key = "retry", help = "Number of retries", unspecifiedDefaultValue = "1") final String retry,
@CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for "
+ "hoodie client for compacting", unspecifiedDefaultValue = "") final String propsFilePath,
@CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be "
+ "passed here in the form of an array", unspecifiedDefaultValue = "") final String[] configs) throws Exception {
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized);
String sparkPropertiesPath =
Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_SCHEDULE_AND_EXECUTE.toString(), master, sparkMemory,
client.getBasePath(), client.getTableConfig().getTableName(), parallelism, retry, propsFilePath);
UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
if (exitCode != 0) {
return "Failed to run clustering for scheduleAndExecute.";
}
return "Succeeded to run clustering for scheduleAndExecute";
}
} }

View File

@@ -76,7 +76,7 @@ public class SparkMain {
enum SparkCommand { enum SparkCommand {
BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, COMPACT_SCHEDULE_AND_EXECUTE, BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, COMPACT_SCHEDULE_AND_EXECUTE,
COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLUSTERING_SCHEDULE, COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLUSTERING_SCHEDULE,
CLUSTERING_RUN, CLEAN, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE CLUSTERING_RUN, CLUSTERING_SCHEDULE_AND_EXECUTE, CLEAN, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE
} }
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
@@ -190,7 +190,20 @@ public class SparkMain {
configs.addAll(Arrays.asList(args).subList(9, args.length)); configs.addAll(Arrays.asList(args).subList(9, args.length));
} }
returnCode = cluster(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), args[2], returnCode = cluster(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), args[2],
Integer.parseInt(args[7]), false, propsFilePath, configs); Integer.parseInt(args[7]), HoodieClusteringJob.EXECUTE, propsFilePath, configs);
break;
case CLUSTERING_SCHEDULE_AND_EXECUTE:
assert (args.length >= 8);
propsFilePath = null;
if (!StringUtils.isNullOrEmpty(args[7])) {
propsFilePath = args[7];
}
configs = new ArrayList<>();
if (args.length > 8) {
configs.addAll(Arrays.asList(args).subList(8, args.length));
}
returnCode = cluster(jsc, args[3], args[4], null, Integer.parseInt(args[5]), args[2],
Integer.parseInt(args[6]), HoodieClusteringJob.SCHEDULE_AND_EXECUTE, propsFilePath, configs);
break; break;
case CLUSTERING_SCHEDULE: case CLUSTERING_SCHEDULE:
assert (args.length >= 7); assert (args.length >= 7);
@@ -203,7 +216,7 @@ public class SparkMain {
configs.addAll(Arrays.asList(args).subList(7, args.length)); configs.addAll(Arrays.asList(args).subList(7, args.length));
} }
returnCode = cluster(jsc, args[3], args[4], args[5], 1, args[2], returnCode = cluster(jsc, args[3], args[4], args[5], 1, args[2],
0, true, propsFilePath, configs); 0, HoodieClusteringJob.SCHEDULE, propsFilePath, configs);
break; break;
case CLEAN: case CLEAN:
assert (args.length >= 5); assert (args.length >= 5);
@@ -351,13 +364,13 @@ public class SparkMain {
} }
private static int cluster(JavaSparkContext jsc, String basePath, String tableName, String clusteringInstant, private static int cluster(JavaSparkContext jsc, String basePath, String tableName, String clusteringInstant,
int parallelism, String sparkMemory, int retry, boolean schedule, String propsFilePath, List<String> configs) { int parallelism, String sparkMemory, int retry, String runningMode, String propsFilePath, List<String> configs) {
HoodieClusteringJob.Config cfg = new HoodieClusteringJob.Config(); HoodieClusteringJob.Config cfg = new HoodieClusteringJob.Config();
cfg.basePath = basePath; cfg.basePath = basePath;
cfg.tableName = tableName; cfg.tableName = tableName;
cfg.clusteringInstantTime = clusteringInstant; cfg.clusteringInstantTime = clusteringInstant;
cfg.parallelism = parallelism; cfg.parallelism = parallelism;
cfg.runSchedule = schedule; cfg.runningMode = runningMode;
cfg.propsFilePath = propsFilePath; cfg.propsFilePath = propsFilePath;
cfg.configs = configs; cfg.configs = configs;
jsc.getConf().set("spark.executor.memory", sparkMemory); jsc.getConf().set("spark.executor.memory", sparkMemory);

View File

@@ -0,0 +1,199 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.cli.integ;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.commands.TableCommand;
import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.shell.core.CommandResult;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Integration test class for {@link org.apache.hudi.cli.commands.ClusteringCommand}.
* <p/>
* A command use SparkLauncher need load jars under lib which generate during mvn package.
* Use integration test instead of unit test.
*/
public class ITTestClusteringCommand extends AbstractShellIntegrationTest {
private String tablePath;
private String tableName;
@BeforeEach
public void init() throws IOException {
tableName = "test_table_" + ITTestClusteringCommand.class.getName();
tablePath = Paths.get(basePath, tableName).toString();
HoodieCLI.conf = jsc.hadoopConfiguration();
// Create table and connect
new TableCommand().createTable(
tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
metaClient.setBasePath(tablePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
}
/**
* Test case for command 'clustering schedule'.
*/
@Test
public void testScheduleClustering() throws IOException {
// generate commits
generateCommits();
CommandResult cr = scheduleClustering();
assertAll("Command run failed",
() -> assertTrue(cr.isSuccess()),
() -> assertTrue(
cr.getResult().toString().startsWith("Succeeded to schedule clustering for")));
// there is 1 requested clustering
HoodieActiveTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
assertEquals(1, timeline.filterPendingReplaceTimeline().countInstants());
}
/**
* Test case for command 'clustering run'.
*/
@Test
public void testClustering() throws IOException {
// generate commits
generateCommits();
CommandResult cr1 = scheduleClustering();
assertTrue(cr1.isSuccess());
// get clustering instance
HoodieActiveTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
Option<String> instance =
timeline.filterPendingReplaceTimeline().firstInstant().map(HoodieInstant::getTimestamp);
assertTrue(instance.isPresent(), "Must have pending clustering.");
CommandResult cr2 = getShell().executeCommand(
String.format("clustering run --parallelism %s --clusteringInstant %s --sparkMaster %s",
2, instance, "local"));
assertAll("Command run failed",
() -> assertTrue(cr2.isSuccess()),
() -> assertTrue(
cr2.getResult().toString().startsWith("Succeeded to run clustering for ")));
// assert clustering complete
assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
.filterCompletedInstants().getInstants()
.map(HoodieInstant::getTimestamp).collect(Collectors.toList()).contains(instance),
"Pending clustering must be completed");
assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
.getCompletedReplaceTimeline().getInstants()
.map(HoodieInstant::getTimestamp).collect(Collectors.toList()).contains(instance),
"Pending clustering must be completed");
}
/**
* Test case for command 'clustering scheduleAndExecute'.
*/
@Test
public void testClusteringScheduleAndExecute() throws IOException {
// generate commits
generateCommits();
CommandResult cr2 = getShell().executeCommand(
String.format("clustering scheduleAndExecute --parallelism %s --sparkMaster %s", 2, "local"));
assertAll("Command run failed",
() -> assertTrue(cr2.isSuccess()),
() -> assertTrue(
cr2.getResult().toString().startsWith("Succeeded to run clustering for scheduleAndExecute")));
// assert clustering complete
assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
.getCompletedReplaceTimeline().getInstants()
.map(HoodieInstant::getTimestamp).count() > 0,
"Completed clustering couldn't be 0");
}
private CommandResult scheduleClustering() {
// generate requested clustering
return getShell().executeCommand(
String.format("clustering schedule --hoodieConfigs hoodie.clustering.inline.max.commits=1 --sparkMaster %s", "local"));
}
private void generateCommits() throws IOException {
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
// Create the write client to write some records in
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withDeleteParallelism(2).forTable(tableName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
SparkRDDWriteClient<HoodieAvroPayload> client = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), cfg);
insert(jsc, client, dataGen, "001");
insert(jsc, client, dataGen, "002");
}
private List<HoodieRecord> insert(JavaSparkContext jsc, SparkRDDWriteClient<HoodieAvroPayload> client,
HoodieTestDataGenerator dataGen, String newCommitTime) throws IOException {
// inserts
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
operateFunc(SparkRDDWriteClient::insert, client, writeRecords, newCommitTime);
return records;
}
private JavaRDD<WriteStatus> operateFunc(
HoodieClientTestBase.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
SparkRDDWriteClient<HoodieAvroPayload> client, JavaRDD<HoodieRecord> writeRecords, String commitTime)
throws IOException {
return writeFn.apply(client, writeRecords, commitTime);
}
}