[HUDI-3441] Add support for "marker delete" in hudi-cli (#4922)
This commit is contained in:
@@ -0,0 +1,58 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.cli.commands;
|
||||||
|
|
||||||
|
import org.apache.hudi.cli.HoodieCLI;
|
||||||
|
import org.apache.hudi.cli.utils.InputStreamConsumer;
|
||||||
|
import org.apache.hudi.cli.utils.SparkUtil;
|
||||||
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
|
import org.apache.spark.launcher.SparkLauncher;
|
||||||
|
import org.springframework.shell.core.CommandMarker;
|
||||||
|
import org.springframework.shell.core.annotation.CliCommand;
|
||||||
|
import org.springframework.shell.core.annotation.CliOption;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* CLI command for marker options.
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class MarkersCommand implements CommandMarker {
|
||||||
|
|
||||||
|
@CliCommand(value = "marker delete", help = "Delete the marker")
|
||||||
|
public String deleteMarker(@CliOption(key = {"commit"}, help = "Delete a marker") final String instantTime,
|
||||||
|
@CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath,
|
||||||
|
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
|
||||||
|
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "1G",
|
||||||
|
help = "Spark executor memory") final String sparkMemory)
|
||||||
|
throws Exception {
|
||||||
|
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
|
||||||
|
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
|
||||||
|
sparkLauncher.addAppArgs(SparkMain.SparkCommand.DELETE_MARKER.toString(), master, sparkMemory, instantTime,
|
||||||
|
metaClient.getBasePath());
|
||||||
|
Process process = sparkLauncher.launch();
|
||||||
|
InputStreamConsumer.captureOutput(process);
|
||||||
|
int exitCode = process.waitFor();
|
||||||
|
// Refresh the current
|
||||||
|
HoodieCLI.refreshTableMetadata();
|
||||||
|
if (exitCode != 0) {
|
||||||
|
return String.format("Failed: Could not delete marker \"%s\".", instantTime);
|
||||||
|
}
|
||||||
|
return String.format("Marker \"%s\" deleted.", instantTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.cli.commands;
|
package org.apache.hudi.cli.commands;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hudi.DataSourceWriteOptions;
|
import org.apache.hudi.DataSourceWriteOptions;
|
||||||
import org.apache.hudi.cli.DeDupeType;
|
import org.apache.hudi.cli.DeDupeType;
|
||||||
import org.apache.hudi.cli.DedupeSparkJob;
|
import org.apache.hudi.cli.DedupeSparkJob;
|
||||||
@@ -25,6 +26,7 @@ import org.apache.hudi.cli.utils.SparkUtil;
|
|||||||
import org.apache.hudi.client.SparkRDDWriteClient;
|
import org.apache.hudi.client.SparkRDDWriteClient;
|
||||||
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
||||||
import org.apache.hudi.common.config.TypedProperties;
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
|
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.apache.hudi.common.table.HoodieTableVersion;
|
import org.apache.hudi.common.table.HoodieTableVersion;
|
||||||
@@ -38,7 +40,9 @@ import org.apache.hudi.config.HoodieWriteConfig;
|
|||||||
import org.apache.hudi.exception.HoodieSavepointException;
|
import org.apache.hudi.exception.HoodieSavepointException;
|
||||||
import org.apache.hudi.index.HoodieIndex;
|
import org.apache.hudi.index.HoodieIndex;
|
||||||
import org.apache.hudi.keygen.constant.KeyGeneratorType;
|
import org.apache.hudi.keygen.constant.KeyGeneratorType;
|
||||||
|
import org.apache.hudi.table.HoodieSparkTable;
|
||||||
import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
|
import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
|
||||||
|
import org.apache.hudi.table.marker.WriteMarkersFactory;
|
||||||
import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
|
import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
|
||||||
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
|
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
|
||||||
import org.apache.hudi.utilities.HDFSParquetImporter;
|
import org.apache.hudi.utilities.HDFSParquetImporter;
|
||||||
@@ -51,8 +55,6 @@ import org.apache.hudi.utilities.HoodieCompactor;
|
|||||||
import org.apache.hudi.utilities.UtilHelpers;
|
import org.apache.hudi.utilities.UtilHelpers;
|
||||||
import org.apache.hudi.utilities.deltastreamer.BootstrapExecutor;
|
import org.apache.hudi.utilities.deltastreamer.BootstrapExecutor;
|
||||||
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
|
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
import org.apache.spark.sql.SQLContext;
|
import org.apache.spark.sql.SQLContext;
|
||||||
@@ -76,7 +78,7 @@ public class SparkMain {
|
|||||||
/**
 * Sub-commands dispatched by {@code SparkMain#main(String[])}; the CLI passes the
 * command name as the first application argument of the launched Spark job.
 */
enum SparkCommand {
  BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, COMPACT_SCHEDULE_AND_EXECUTE,
  COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLUSTERING_SCHEDULE,
  CLUSTERING_RUN, CLUSTERING_SCHEDULE_AND_EXECUTE, CLEAN, DELETE_MARKER, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
@@ -234,6 +236,10 @@ public class SparkMain {
|
|||||||
assert (args.length == 7);
|
assert (args.length == 7);
|
||||||
returnCode = createSavepoint(jsc, args[3], args[4], args[5], args[6]);
|
returnCode = createSavepoint(jsc, args[3], args[4], args[5], args[6]);
|
||||||
break;
|
break;
|
||||||
|
case DELETE_MARKER:
|
||||||
|
assert (args.length == 5);
|
||||||
|
returnCode = deleteMarker(jsc, args[3], args[4]);
|
||||||
|
break;
|
||||||
case DELETE_SAVEPOINT:
|
case DELETE_SAVEPOINT:
|
||||||
assert (args.length == 5);
|
assert (args.length == 5);
|
||||||
returnCode = deleteSavepoint(jsc, args[3], args[4]);
|
returnCode = deleteSavepoint(jsc, args[3], args[4]);
|
||||||
@@ -277,6 +283,21 @@ public class SparkMain {
|
|||||||
new HoodieCleaner(cfg, jsc).run();
|
new HoodieCleaner(cfg, jsc).run();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected static int deleteMarker(JavaSparkContext jsc, String instantTime, String basePath) {
|
||||||
|
try {
|
||||||
|
SparkRDDWriteClient client = createHoodieClient(jsc, basePath);
|
||||||
|
HoodieWriteConfig config = client.getConfig();
|
||||||
|
HoodieEngineContext context = client.getEngineContext();
|
||||||
|
HoodieSparkTable table = HoodieSparkTable.create(config, context, true);
|
||||||
|
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
|
||||||
|
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
|
||||||
|
return 0;
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn(String.format("Failed: Could not clean marker instantTime: \"%s\".", instantTime), e);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static int dataLoad(JavaSparkContext jsc, String command, String srcPath, String targetPath, String tableName,
|
private static int dataLoad(JavaSparkContext jsc, String command, String srcPath, String targetPath, String tableName,
|
||||||
String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile,
|
String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile,
|
||||||
int retry, String propsFilePath, List<String> configs) {
|
int retry, String propsFilePath, List<String> configs) {
|
||||||
|
|||||||
@@ -0,0 +1,77 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.cli.integ;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hudi.cli.commands.TableCommand;
|
||||||
|
import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
|
||||||
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
|
import org.apache.hudi.common.model.IOType;
|
||||||
|
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
|
||||||
|
import org.apache.hudi.common.testutils.FileCreateUtils;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.shell.core.CommandResult;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Integration test class for {@link org.apache.hudi.cli.commands.MarkersCommand}.
|
||||||
|
* <p/>
|
||||||
|
* A command use SparkLauncher need load jars under lib which generate during mvn package.
|
||||||
|
* Use integration test instead of unit test.
|
||||||
|
*/
|
||||||
|
public class ITTestMarkersCommand extends AbstractShellIntegrationTest {
|
||||||
|
|
||||||
|
private String tablePath;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void init() throws IOException {
|
||||||
|
String tableName = "test_table";
|
||||||
|
tablePath = basePath + Path.SEPARATOR + tableName;
|
||||||
|
|
||||||
|
// Create table and connect
|
||||||
|
new TableCommand().createTable(
|
||||||
|
tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
|
||||||
|
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test case of command 'marker delete'.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testDeleteMarker() throws IOException {
|
||||||
|
// generate markers
|
||||||
|
String instantTime1 = "101";
|
||||||
|
|
||||||
|
FileCreateUtils.createMarkerFile(tablePath, "partA", instantTime1, "f0", IOType.APPEND);
|
||||||
|
FileCreateUtils.createMarkerFile(tablePath, "partA", instantTime1, "f1", IOType.APPEND);
|
||||||
|
|
||||||
|
assertEquals(2, FileCreateUtils.getTotalMarkerFileCount(tablePath, "partA", instantTime1, IOType.APPEND));
|
||||||
|
|
||||||
|
CommandResult cr = getShell().executeCommand(
|
||||||
|
String.format("marker delete --commit %s --sparkMaster %s", instantTime1, "local"));
|
||||||
|
assertTrue(cr.isSuccess());
|
||||||
|
|
||||||
|
assertEquals(0, FileCreateUtils.getTotalMarkerFileCount(tablePath, "partA", instantTime1, IOType.APPEND));
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user