1
0

[HUDI-3733] Adding HoodieFailedWritesCleaningPolicy for restore with hudi-cli (#5158)

* Adding HoodieFailedWritesCleaningPolicy for restore with hudi-cli

Co-authored-by: Sagar Sumit <sagarsumit09@gmail.com>
This commit is contained in:
Sivabalan Narayanan
2022-03-31 00:30:49 -07:00
committed by GitHub
parent ce45f7f129
commit 3cdb590e15
2 changed files with 21 additions and 14 deletions

View File

@@ -93,6 +93,8 @@ public class SavepointsCommand implements CommandMarker {
@CliOption(key = {"savepoint"}, help = "Savepoint to rollback") final String instantTime, @CliOption(key = {"savepoint"}, help = "Savepoint to rollback") final String instantTime,
@CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath, @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath,
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master, @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
@CliOption(key = {"lazyFailedWritesCleanPolicy"}, help = "True if FailedWriteCleanPolicy is lazy",
unspecifiedDefaultValue = "false") final String lazyFailedWritesCleanPolicy,
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G", @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
help = "Spark executor memory") final String sparkMemory) help = "Spark executor memory") final String sparkMemory)
throws Exception { throws Exception {
@@ -110,7 +112,7 @@ public class SavepointsCommand implements CommandMarker {
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK_TO_SAVEPOINT.toString(), master, sparkMemory, sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK_TO_SAVEPOINT.toString(), master, sparkMemory,
instantTime, metaClient.getBasePath()); instantTime, metaClient.getBasePath(), lazyFailedWritesCleanPolicy);
Process process = sparkLauncher.launch(); Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process); InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor(); int exitCode = process.waitFor();

View File

@@ -28,6 +28,7 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -35,6 +36,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieBootstrapConfig; import org.apache.hudi.config.HoodieBootstrapConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieSavepointException; import org.apache.hudi.exception.HoodieSavepointException;
@@ -102,8 +104,8 @@ public class SparkMain {
returnCode = deduplicatePartitionPath(jsc, args[3], args[4], args[5], Boolean.parseBoolean(args[6]), args[7]); returnCode = deduplicatePartitionPath(jsc, args[3], args[4], args[5], Boolean.parseBoolean(args[6]), args[7]);
break; break;
case ROLLBACK_TO_SAVEPOINT: case ROLLBACK_TO_SAVEPOINT:
assert (args.length == 5); assert (args.length == 6);
returnCode = rollbackToSavepoint(jsc, args[3], args[4]); returnCode = rollbackToSavepoint(jsc, args[3], args[4], Boolean.parseBoolean(args[5]));
break; break;
case IMPORT: case IMPORT:
case UPSERT: case UPSERT:
@@ -285,7 +287,7 @@ public class SparkMain {
protected static int deleteMarker(JavaSparkContext jsc, String instantTime, String basePath) { protected static int deleteMarker(JavaSparkContext jsc, String instantTime, String basePath) {
try { try {
SparkRDDWriteClient client = createHoodieClient(jsc, basePath); SparkRDDWriteClient client = createHoodieClient(jsc, basePath, false);
HoodieWriteConfig config = client.getConfig(); HoodieWriteConfig config = client.getConfig();
HoodieEngineContext context = client.getEngineContext(); HoodieEngineContext context = client.getEngineContext();
HoodieSparkTable table = HoodieSparkTable.create(config, context, true); HoodieSparkTable table = HoodieSparkTable.create(config, context, true);
@@ -455,7 +457,7 @@ public class SparkMain {
private static int createSavepoint(JavaSparkContext jsc, String commitTime, String user, private static int createSavepoint(JavaSparkContext jsc, String commitTime, String user,
String comments, String basePath) throws Exception { String comments, String basePath) throws Exception {
SparkRDDWriteClient client = createHoodieClient(jsc, basePath); SparkRDDWriteClient client = createHoodieClient(jsc, basePath, false);
try { try {
client.savepoint(commitTime, user, comments); client.savepoint(commitTime, user, comments);
LOG.info(String.format("The commit \"%s\" has been savepointed.", commitTime)); LOG.info(String.format("The commit \"%s\" has been savepointed.", commitTime));
@@ -466,8 +468,8 @@ public class SparkMain {
} }
} }
private static int rollbackToSavepoint(JavaSparkContext jsc, String savepointTime, String basePath) throws Exception { private static int rollbackToSavepoint(JavaSparkContext jsc, String savepointTime, String basePath, boolean lazyCleanPolicy) throws Exception {
SparkRDDWriteClient client = createHoodieClient(jsc, basePath); SparkRDDWriteClient client = createHoodieClient(jsc, basePath, lazyCleanPolicy);
try { try {
client.restoreToSavepoint(savepointTime); client.restoreToSavepoint(savepointTime);
LOG.info(String.format("The commit \"%s\" rolled back.", savepointTime)); LOG.info(String.format("The commit \"%s\" rolled back.", savepointTime));
@@ -479,7 +481,7 @@ public class SparkMain {
} }
private static int deleteSavepoint(JavaSparkContext jsc, String savepointTime, String basePath) throws Exception { private static int deleteSavepoint(JavaSparkContext jsc, String savepointTime, String basePath) throws Exception {
SparkRDDWriteClient client = createHoodieClient(jsc, basePath); SparkRDDWriteClient client = createHoodieClient(jsc, basePath, false);
try { try {
client.deleteSavepoint(savepointTime); client.deleteSavepoint(savepointTime);
LOG.info(String.format("Savepoint \"%s\" deleted.", savepointTime)); LOG.info(String.format("Savepoint \"%s\" deleted.", savepointTime));
@@ -500,7 +502,8 @@ public class SparkMain {
* @throws Exception * @throws Exception
*/ */
protected static int upgradeOrDowngradeTable(JavaSparkContext jsc, String basePath, String toVersion) { protected static int upgradeOrDowngradeTable(JavaSparkContext jsc, String basePath, String toVersion) {
HoodieWriteConfig config = getWriteConfig(basePath, Boolean.parseBoolean(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.defaultValue())); HoodieWriteConfig config = getWriteConfig(basePath, Boolean.parseBoolean(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.defaultValue()),
false);
HoodieTableMetaClient metaClient = HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(config.getBasePath()) HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(config.getBasePath())
.setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(config.getConsistencyGuardConfig()) .setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
@@ -517,18 +520,20 @@ public class SparkMain {
} }
} }
private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, Boolean rollbackUsingMarkers) throws Exception { private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, Boolean rollbackUsingMarkers, boolean lazyCleanPolicy) throws Exception {
HoodieWriteConfig config = getWriteConfig(basePath, rollbackUsingMarkers); HoodieWriteConfig config = getWriteConfig(basePath, rollbackUsingMarkers, lazyCleanPolicy);
return new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), config); return new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), config);
} }
private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception { private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, boolean lazyCleanPolicy) throws Exception {
return createHoodieClient(jsc, basePath, Boolean.parseBoolean(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.defaultValue())); return createHoodieClient(jsc, basePath, Boolean.parseBoolean(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.defaultValue()), lazyCleanPolicy);
} }
private static HoodieWriteConfig getWriteConfig(String basePath, Boolean rollbackUsingMarkers) { private static HoodieWriteConfig getWriteConfig(String basePath, Boolean rollbackUsingMarkers, boolean lazyCleanPolicy) {
return HoodieWriteConfig.newBuilder().withPath(basePath) return HoodieWriteConfig.newBuilder().withPath(basePath)
.withRollbackUsingMarkers(rollbackUsingMarkers) .withRollbackUsingMarkers(rollbackUsingMarkers)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(lazyCleanPolicy ? HoodieFailedWritesCleaningPolicy.LAZY :
HoodieFailedWritesCleaningPolicy.EAGER).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(); .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
} }
} }