diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java index cb36cfdce..e317d5a4f 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java @@ -143,4 +143,25 @@ public class HoodieTableHeaderFields { public static final String HEADER_TOTAL_ROLLBACK_BLOCKS = "Total Rollback Blocks"; public static final String HEADER_TOTAL_LOG_RECORDS = "Total Log Records"; public static final String HEADER_TOTAL_UPDATED_RECORDS_COMPACTED = "Total Updated Records Compacted"; + + /** + * Fields of Compaction. + */ + public static final String HEADER_INSTANT_BLANK_TIME = "Instant Time"; + public static final String HEADER_FILE_PATH = "File Path"; + public static final String HEADER_COMPACTION_INSTANT_TIME = "Compaction " + HEADER_INSTANT_BLANK_TIME; + public static final String HEADER_STATE = "State"; + public static final String HEADER_TOTAL_FILES_TO_BE_COMPACTED = "Total FileIds to be Compacted"; + public static final String HEADER_EXTRA_METADATA = "Extra Metadata"; + public static final String HEADER_DATA_FILE_PATH = "Data " + HEADER_FILE_PATH; + public static final String HEADER_TOTAL_DELTA_FILES = "Total " + HEADER_DELTA_FILES; + public static final String HEADER_METRICS = "getMetrics"; + public static final String HEADER_BASE_INSTANT_TIME = "Base " + HEADER_INSTANT_BLANK_TIME; + public static final String HEADER_BASE_DATA_FILE = "Base Data File"; + public static final String HEADER_VALID = "Valid"; + public static final String HEADER_ERROR = "Error"; + public static final String HEADER_SOURCE_FILE_PATH = "Source " + HEADER_FILE_PATH; + public static final String HEADER_DESTINATION_FILE_PATH = "Destination " + HEADER_FILE_PATH; + public static final String HEADER_RENAME_EXECUTED = "Rename Executed?"; + public static final String HEADER_RENAME_SUCCEEDED = "Rename Succeeded?"; } diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java index 67445ea87..b6a366bbb 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java @@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.cli.HoodieCLI; import org.apache.hudi.cli.HoodiePrintHelper; +import org.apache.hudi.cli.HoodieTableHeaderFields; import org.apache.hudi.cli.TableHeader; import org.apache.hudi.cli.commands.SparkMain.SparkCommand; import org.apache.hudi.cli.utils.CommitUtil; @@ -97,8 +98,7 @@ public class CompactionCommand implements CommandMarker { @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField, @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending, @CliOption(key = {"headeronly"}, help = "Print Header Only", - unspecifiedDefaultValue = "false") final boolean headerOnly) - throws IOException { + unspecifiedDefaultValue = "false") final boolean headerOnly) { HoodieTableMetaClient client = checkAndGetMetaClient(); HoodieActiveTimeline activeTimeline = client.getActiveTimeline(); return printAllCompactions(activeTimeline, @@ -139,8 +139,7 @@ public class CompactionCommand implements CommandMarker { @CliOption(key = {"sortBy"}, 
help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField, @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending, @CliOption(key = {"headeronly"}, help = "Print Header Only", - unspecifiedDefaultValue = "false") final boolean headerOnly) - throws Exception { + unspecifiedDefaultValue = "false") final boolean headerOnly) { if (StringUtils.isNullOrEmpty(startTs)) { startTs = CommitUtil.getTimeDaysAgo(10); } @@ -150,7 +149,7 @@ public class CompactionCommand implements CommandMarker { HoodieTableMetaClient client = checkAndGetMetaClient(); HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline(); - archivedTimeline.loadInstantDetailsInMemory(startTs, endTs); + archivedTimeline.loadCompactionDetailsInMemory(startTs, endTs); try { return printAllCompactions(archivedTimeline, compactionPlanReader(this::readCompactionPlanForArchivedTimeline, archivedTimeline), @@ -175,25 +174,25 @@ public class CompactionCommand implements CommandMarker { HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline(); HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime); - String startTs = CommitUtil.addHours(compactionInstantTime, -1); - String endTs = CommitUtil.addHours(compactionInstantTime, 1); try { - archivedTimeline.loadInstantDetailsInMemory(startTs, endTs); - HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan( - archivedTimeline.getInstantDetails(instant).get()); + archivedTimeline.loadCompactionDetailsInMemory(compactionInstantTime); + HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeAvroRecordMetadata( + archivedTimeline.getInstantDetails(instant).get(), HoodieCompactionPlan.getClassSchema()); return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly); } finally { - archivedTimeline.clearInstantDetailsFromMemory(startTs, endTs); + archivedTimeline.clearInstantDetailsFromMemory(compactionInstantTime); } } @CliCommand(value = "compaction schedule", help = "Schedule Compaction") public String scheduleCompact(@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "1G", help = "Spark executor memory") final String sparkMemory, - @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting", - unspecifiedDefaultValue = "") final String propsFilePath, - @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array", - unspecifiedDefaultValue = "") final String[] configs) throws Exception { + @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting", + unspecifiedDefaultValue = "") final String propsFilePath, + @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array", + unspecifiedDefaultValue = "") final String[] configs, + @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", help = "Spark Master") String master) + throws Exception { HoodieTableMetaClient client = checkAndGetMetaClient(); boolean initialized = HoodieCLI.initConf(); HoodieCLI.initFS(initialized); @@ -204,8 +203,9 @@ public class CompactionCommand implements CommandMarker { String sparkPropertiesPath = 
Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); - sparkLauncher.addAppArgs(SparkCommand.COMPACT_SCHEDULE.toString(), client.getBasePath(), - client.getTableConfig().getTableName(), compactionInstantTime, sparkMemory, propsFilePath); + String cmd = SparkCommand.COMPACT_SCHEDULE.toString(); + sparkLauncher.addAppArgs(cmd, master, sparkMemory, client.getBasePath(), + client.getTableConfig().getTableName(), compactionInstantTime, propsFilePath); UtilHelpers.validateAndAddProperties(configs, sparkLauncher); Process process = sparkLauncher.launch(); InputStreamConsumer.captureOutput(process); @@ -222,6 +222,8 @@ public class CompactionCommand implements CommandMarker { help = "Parallelism for hoodie compaction") final String parallelism, @CliOption(key = "schemaFilePath", mandatory = true, help = "Path for Avro schema file") final String schemaFilePath, + @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", + help = "Spark Master") String master, @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G", help = "Spark executor memory") final String sparkMemory, @CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number of retries") final String retry, @@ -249,9 +251,9 @@ public class CompactionCommand implements CommandMarker { String sparkPropertiesPath = Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); - sparkLauncher.addAppArgs(SparkCommand.COMPACT_RUN.toString(), client.getBasePath(), + sparkLauncher.addAppArgs(SparkCommand.COMPACT_RUN.toString(), master, sparkMemory, client.getBasePath(), client.getTableConfig().getTableName(), compactionInstantTime, parallelism, schemaFilePath, - sparkMemory, retry, propsFilePath); + retry, propsFilePath); UtilHelpers.validateAndAddProperties(configs, sparkLauncher); Process process = sparkLauncher.launch(); InputStreamConsumer.captureOutput(process); @@ -279,15 +281,15 @@ public class CompactionCommand implements CommandMarker { .filter(pair -> pair.getRight() != null) .collect(Collectors.toList()); - Set committedInstants = timeline.getCommitTimeline().filterCompletedInstants() - .getInstants().collect(Collectors.toSet()); + Set committedInstants = timeline.getCommitTimeline().filterCompletedInstants() + .getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()); List rows = new ArrayList<>(); for (Pair compactionPlan : compactionPlans) { HoodieCompactionPlan plan = compactionPlan.getRight(); HoodieInstant instant = compactionPlan.getLeft(); final HoodieInstant.State state; - if (committedInstants.contains(instant)) { + if (committedInstants.contains(instant.getTimestamp())) { state = HoodieInstant.State.COMPLETED; } else { state = instant.getState(); @@ -304,10 +306,12 @@ public class CompactionCommand implements CommandMarker { } Map> fieldNameToConverterMap = new HashMap<>(); - TableHeader header = new TableHeader().addTableHeaderField("Compaction Instant Time").addTableHeaderField("State") - .addTableHeaderField("Total FileIds to be Compacted"); + TableHeader header = new TableHeader() + .addTableHeaderField(HoodieTableHeaderFields.HEADER_COMPACTION_INSTANT_TIME) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_STATE) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_TO_BE_COMPACTED); if (includeExtraMetadata) { 
- header = header.addTableHeaderField("Extra Metadata"); + header = header.addTableHeaderField(HoodieTableHeaderFields.HEADER_EXTRA_METADATA); } return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows); } @@ -326,14 +330,17 @@ public class CompactionCommand implements CommandMarker { private HoodieCompactionPlan readCompactionPlanForArchivedTimeline(HoodieArchivedTimeline archivedTimeline, HoodieInstant instant) { - if (!HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) { - return null; - } else { + // filter inflight compaction + if (HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction()) + && HoodieInstant.State.INFLIGHT.equals(instant.getState())) { try { - return TimelineMetadataUtils.deserializeCompactionPlan(archivedTimeline.getInstantDetails(instant).get()); - } catch (IOException e) { - throw new HoodieIOException(e.getMessage(), e); + return TimelineMetadataUtils.deserializeAvroRecordMetadata(archivedTimeline.getInstantDetails(instant).get(), + HoodieCompactionPlan.getClassSchema()); + } catch (Exception e) { + throw new HoodieException(e.getMessage(), e); } + } else { + return null; } } @@ -362,7 +369,7 @@ public class CompactionCommand implements CommandMarker { } } - private String printCompaction(HoodieCompactionPlan compactionPlan, + protected String printCompaction(HoodieCompactionPlan compactionPlan, String sortByField, boolean descending, int limit, @@ -376,9 +383,13 @@ public class CompactionCommand implements CommandMarker { } Map> fieldNameToConverterMap = new HashMap<>(); - TableHeader header = new TableHeader().addTableHeaderField("Partition Path").addTableHeaderField("File Id") - .addTableHeaderField("Base Instant").addTableHeaderField("Data File Path") - .addTableHeaderField("Total Delta Files").addTableHeaderField("getMetrics"); + TableHeader header = new TableHeader() + .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_BASE_INSTANT) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_DATA_FILE_PATH) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_DELTA_FILES) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_METRICS); return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows); } @@ -404,7 +415,7 @@ public class CompactionCommand implements CommandMarker { public String validateCompaction( @CliOption(key = "instant", mandatory = true, help = "Compaction Instant") String compactionInstant, @CliOption(key = {"parallelism"}, unspecifiedDefaultValue = "3", help = "Parallelism") String parallelism, - @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master ") String master, + @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", help = "Spark Master") String master, @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory, @CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") Integer limit, @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") String sortByField, @@ -444,9 +455,13 @@ public class CompactionCommand implements CommandMarker { }); Map> fieldNameToConverterMap = new HashMap<>(); - TableHeader header = new TableHeader().addTableHeaderField("File Id").addTableHeaderField("Base Instant Time") - .addTableHeaderField("Base Data 
File").addTableHeaderField("Num Delta Files").addTableHeaderField("Valid") - .addTableHeaderField("Error"); + TableHeader header = new TableHeader() + .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_BASE_INSTANT_TIME) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_BASE_DATA_FILE) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_DELTA_FILES) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_VALID) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_ERROR); output = message + HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows); @@ -463,7 +478,7 @@ public class CompactionCommand implements CommandMarker { public String unscheduleCompaction( @CliOption(key = "instant", mandatory = true, help = "Compaction Instant") String compactionInstant, @CliOption(key = {"parallelism"}, unspecifiedDefaultValue = "3", help = "Parallelism") String parallelism, - @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master ") String master, + @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", help = "Spark Master") String master, @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory, @CliOption(key = {"skipValidation"}, help = "skip validation", unspecifiedDefaultValue = "false") boolean skipV, @CliOption(key = {"dryRun"}, help = "Dry Run Mode", unspecifiedDefaultValue = "false") boolean dryRun, @@ -508,7 +523,8 @@ public class CompactionCommand implements CommandMarker { @CliCommand(value = "compaction unscheduleFileId", help = "UnSchedule Compaction for a fileId") public String unscheduleCompactFile( @CliOption(key = "fileId", mandatory = true, help = "File Id") final String fileId, - @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master ") String master, + @CliOption(key = "partitionPath", mandatory = true, help = "partition path") final String partitionPath, + @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", help = "Spark Master") String master, @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory, @CliOption(key = {"skipValidation"}, help = "skip validation", unspecifiedDefaultValue = "false") boolean skipV, @CliOption(key = {"dryRun"}, help = "Dry Run Mode", unspecifiedDefaultValue = "false") boolean dryRun, @@ -529,7 +545,7 @@ public class CompactionCommand implements CommandMarker { .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_FILE.toString(), master, sparkMemory, client.getBasePath(), - fileId, outputPathStr, "1", Boolean.valueOf(skipV).toString(), + fileId, partitionPath, outputPathStr, "1", Boolean.valueOf(skipV).toString(), Boolean.valueOf(dryRun).toString()); Process process = sparkLauncher.launch(); InputStreamConsumer.captureOutput(process); @@ -554,7 +570,7 @@ public class CompactionCommand implements CommandMarker { public String repairCompaction( @CliOption(key = "instant", mandatory = true, help = "Compaction Instant") String compactionInstant, @CliOption(key = {"parallelism"}, unspecifiedDefaultValue = "3", help = "Parallelism") String parallelism, - @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master ") String master, + @CliOption(key 
= "sparkMaster", unspecifiedDefaultValue = "local", help = "Spark Master") String master, @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory, @CliOption(key = {"dryRun"}, help = "Dry Run Mode", unspecifiedDefaultValue = "false") boolean dryRun, @CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") Integer limit, @@ -616,9 +632,13 @@ public class CompactionCommand implements CommandMarker { }); Map> fieldNameToConverterMap = new HashMap<>(); - TableHeader header = new TableHeader().addTableHeaderField("File Id").addTableHeaderField("Source File Path") - .addTableHeaderField("Destination File Path").addTableHeaderField("Rename Executed?") - .addTableHeaderField("Rename Succeeded?").addTableHeaderField("Error"); + TableHeader header = new TableHeader() + .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_SOURCE_FILE_PATH) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_DESTINATION_FILE_PATH) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_RENAME_EXECUTED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_RENAME_SUCCEEDED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_ERROR); return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows); } else { diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java index 7833ee750..afb22fd7e 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java @@ -79,9 +79,7 @@ public class SparkMain { SparkCommand cmd = SparkCommand.valueOf(command); - JavaSparkContext jsc = sparkMasterContained(cmd) - ? 
SparkUtil.initJavaSparkConf("hoodie-cli-" + command, Option.of(args[1]), Option.of(args[2])) - : SparkUtil.initJavaSparkConf("hoodie-cli-" + command); + JavaSparkContext jsc = SparkUtil.initJavaSparkConf("hoodie-cli-" + command, Option.of(args[1]), Option.of(args[2])); int returnCode = 0; try { switch (cmd) { @@ -112,29 +110,29 @@ public class SparkMain { Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]), propsFilePath, configs); break; case COMPACT_RUN: - assert (args.length >= 9); + assert (args.length >= 10); propsFilePath = null; - if (!StringUtils.isNullOrEmpty(args[8])) { - propsFilePath = args[8]; + if (!StringUtils.isNullOrEmpty(args[9])) { + propsFilePath = args[9]; } configs = new ArrayList<>(); - if (args.length > 9) { - configs.addAll(Arrays.asList(args).subList(9, args.length)); + if (args.length > 10) { + configs.addAll(Arrays.asList(args).subList(10, args.length)); } - returnCode = compact(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6], - Integer.parseInt(args[7]), false, propsFilePath, configs); + returnCode = compact(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), args[7], + Integer.parseInt(args[8]), false, propsFilePath, configs); break; case COMPACT_SCHEDULE: - assert (args.length >= 6); + assert (args.length >= 7); propsFilePath = null; - if (!StringUtils.isNullOrEmpty(args[5])) { - propsFilePath = args[5]; + if (!StringUtils.isNullOrEmpty(args[6])) { + propsFilePath = args[6]; } configs = new ArrayList<>(); - if (args.length > 6) { - configs.addAll(Arrays.asList(args).subList(6, args.length)); + if (args.length > 7) { + configs.addAll(Arrays.asList(args).subList(7, args.length)); } - returnCode = compact(jsc, args[1], args[2], args[3], 1, "", args[4], 0, true, propsFilePath, configs); + returnCode = compact(jsc, args[3], args[4], args[5], 1, "", 0, true, propsFilePath, configs); break; case COMPACT_VALIDATE: assert (args.length == 7); @@ -148,9 +146,9 @@ returnCode = 0; break; case COMPACT_UNSCHEDULE_FILE: - assert (args.length == 9); - doCompactUnscheduleFile(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), - Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8])); + assert (args.length == 10); + doCompactUnscheduleFile(jsc, args[3], args[4], args[5], args[6], Integer.parseInt(args[7]), + Boolean.parseBoolean(args[8]), Boolean.parseBoolean(args[9])); returnCode = 0; break; case COMPACT_UNSCHEDULE_PLAN: @@ -209,14 +207,6 @@ System.exit(returnCode); } - private static boolean sparkMasterContained(SparkCommand command) { - List masterContained = Arrays.asList(SparkCommand.COMPACT_VALIDATE, SparkCommand.COMPACT_REPAIR, - SparkCommand.COMPACT_UNSCHEDULE_PLAN, SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN, - SparkCommand.IMPORT, SparkCommand.UPSERT, SparkCommand.DEDUPLICATE, SparkCommand.SAVEPOINT, - SparkCommand.DELETE_SAVEPOINT, SparkCommand.ROLLBACK_TO_SAVEPOINT, SparkCommand.ROLLBACK, SparkCommand.BOOTSTRAP); - return masterContained.contains(command); - } - protected static void clean(JavaSparkContext jsc, String basePath, String propsFilePath, List configs) { HoodieCleaner.Config cfg = new HoodieCleaner.Config(); @@ -280,13 +270,14 @@ public class SparkMain { new HoodieCompactionAdminTool(cfg).run(jsc); } - private static void doCompactUnscheduleFile(JavaSparkContext jsc, String basePath, String fileId, String outputPath, - int parallelism, boolean skipValidation, boolean dryRun) + private static void doCompactUnscheduleFile(JavaSparkContext jsc, String basePath, String fileId, String partitionPath, + 
String outputPath, int parallelism, boolean skipValidation, boolean dryRun) throws Exception { HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config(); cfg.basePath = basePath; cfg.operation = Operation.UNSCHEDULE_FILE; cfg.outputPath = outputPath; + cfg.partitionPath = partitionPath; cfg.fileId = fileId; cfg.parallelism = parallelism; cfg.dryRun = dryRun; @@ -295,7 +286,7 @@ public class SparkMain { } private static int compact(JavaSparkContext jsc, String basePath, String tableName, String compactionInstant, - int parallelism, String schemaFile, String sparkMemory, int retry, boolean schedule, String propsFilePath, + int parallelism, String schemaFile, int retry, boolean schedule, String propsFilePath, List configs) { HoodieCompactor.Config cfg = new HoodieCompactor.Config(); cfg.basePath = basePath; @@ -308,7 +299,6 @@ public class SparkMain { cfg.runSchedule = schedule; cfg.propsFilePath = propsFilePath; cfg.configs = configs; - jsc.getConf().set("spark.executor.memory", sparkMemory); return new HoodieCompactor(jsc, cfg).compact(retry); } diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java index 84b357622..5ad4c4c13 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java @@ -194,7 +194,7 @@ public class TestCommitsCommand extends AbstractShellIntegrationTest { // archived 101 and 102 instants, remove 103 and 104 instant data.remove("103"); data.remove("104"); - String expected = generateExpectData(3, data); + String expected = generateExpectData(1, data); expected = removeNonWordAndStripSpace(expected); String got = removeNonWordAndStripSpace(cr.getResult().toString()); assertEquals(expected, got); diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java new file mode 100644 index 000000000..4dd69dc17 --- /dev/null +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.cli.commands; + +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.cli.HoodieCLI; +import org.apache.hudi.cli.HoodiePrintHelper; +import org.apache.hudi.cli.TableHeader; +import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest; +import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.testutils.CompactionTestUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.HoodieTimelineArchiveLog; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.shell.core.CommandResult; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Test Cases for {@link CompactionCommand}. + */ +public class TestCompactionCommand extends AbstractShellIntegrationTest { + + private String tableName; + private String tablePath; + + @BeforeEach + public void init() { + tableName = "test_table"; + tablePath = basePath + tableName; + } + + @Test + public void testVerifyTableType() throws IOException { + // create COW table. + new TableCommand().createTable( + tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), + "", TimelineLayoutVersion.VERSION_1, HoodieAvroPayload.class.getName()); + + // expect HoodieException for COPY_ON_WRITE table. + assertThrows(HoodieException.class, + () -> new CompactionCommand().compactionsAll(false, -1, "", false, false)); + } + + /** + * Test case for command 'compactions show all'. + */ + @Test + public void testCompactionsAll() throws IOException { + // create MOR table. 
+ new TableCommand().createTable( + tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(), + "", TimelineLayoutVersion.VERSION_1, HoodieAvroPayload.class.getName()); + + CompactionTestUtils.setupAndValidateCompactionOperations(HoodieCLI.getTableMetaClient(), false, 3, 4, 3, 3); + + HoodieCLI.getTableMetaClient().reloadActiveTimeline(); + + CommandResult cr = getShell().executeCommand("compactions show all"); + System.out.println(cr.getResult().toString()); + + TableHeader header = new TableHeader().addTableHeaderField("Compaction Instant Time").addTableHeaderField("State") + .addTableHeaderField("Total FileIds to be Compacted"); + Map fileIds = new HashMap(); + fileIds.put("001", 3); + fileIds.put("003", 4); + fileIds.put("005", 3); + fileIds.put("007", 3); + List rows = new ArrayList<>(); + Arrays.asList("001", "003", "005", "007").stream().sorted(Comparator.reverseOrder()).forEach(instant -> { + rows.add(new Comparable[] {instant, "REQUESTED", fileIds.get(instant)}); + }); + String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows); + assertEquals(expected, cr.getResult().toString()); + } + + /** + * Test case for command 'compaction show'. + */ + @Test + public void testCompactionShow() throws IOException { + // create MOR table. + new TableCommand().createTable( + tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(), + "", TimelineLayoutVersion.VERSION_1, HoodieAvroPayload.class.getName()); + + CompactionTestUtils.setupAndValidateCompactionOperations(HoodieCLI.getTableMetaClient(), false, 3, 4, 3, 3); + + HoodieCLI.getTableMetaClient().reloadActiveTimeline(); + + CommandResult cr = getShell().executeCommand("compaction show --instant 001"); + System.out.println(cr.getResult().toString()); + } + + private void generateCompactionInstances() throws IOException { + // create MOR table. + new TableCommand().createTable( + tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(), + "", TimelineLayoutVersion.VERSION_1, HoodieAvroPayload.class.getName()); + + CompactionTestUtils.setupAndValidateCompactionOperations(HoodieCLI.getTableMetaClient(), true, 1, 2, 3, 4); + + HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().reloadActiveTimeline(); + // Create six commits + Arrays.asList("001", "003", "005", "007").forEach(timestamp -> { + activeTimeline.transitionCompactionInflightToComplete( + new HoodieInstant(HoodieInstant.State.INFLIGHT, COMPACTION_ACTION, timestamp), Option.empty()); + }); + + metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); + } + + private void generateArchive() throws IOException { + // Generate archive + HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) + .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build()) + .forTable("test-trip-table").build(); + // archive + metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); + HoodieSparkTable table = HoodieSparkTable.create(cfg, context, metaClient); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); + archiveLog.archiveIfRequired(context); + } + + /** + * Test case for command 'compactions showarchived'. 
+ */ + @Test + public void testCompactionsShowArchived() throws IOException { + generateCompactionInstances(); + + generateArchive(); + + CommandResult cr = getShell().executeCommand("compactions showarchived --startTs 001 --endTs 005"); + + // generate result + Map fileMap = new HashMap<>(); + fileMap.put("001", 1); + fileMap.put("003", 2); + fileMap.put("005", 3); + List rows = Arrays.asList("005", "003", "001").stream().map(i -> + new Comparable[] {i, HoodieInstant.State.COMPLETED, fileMap.get(i)}).collect(Collectors.toList()); + Map> fieldNameToConverterMap = new HashMap<>(); + TableHeader header = new TableHeader().addTableHeaderField("Compaction Instant Time").addTableHeaderField("State") + .addTableHeaderField("Total FileIds to be Compacted"); + String expected = HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false, -1, false, rows); + + expected = removeNonWordAndStripSpace(expected); + String got = removeNonWordAndStripSpace(cr.getResult().toString()); + assertEquals(expected, got); + } + + /** + * Test case for command 'compaction showarchived'. + */ + @Test + public void testCompactionShowArchived() throws IOException { + generateCompactionInstances(); + + String instance = "001"; + // get compaction plan before compaction + HoodieCompactionPlan plan = TimelineMetadataUtils.deserializeCompactionPlan( + HoodieCLI.getTableMetaClient().reloadActiveTimeline().readCompactionPlanAsBytes( + HoodieTimeline.getCompactionRequestedInstant(instance)).get()); + + generateArchive(); + + CommandResult cr = getShell().executeCommand("compaction showarchived --instant " + instance); + + // generate expected + String expected = new CompactionCommand().printCompaction(plan, "", false, -1, false); + + expected = removeNonWordAndStripSpace(expected); + String got = removeNonWordAndStripSpace(cr.getResult().toString()); + assertEquals(expected, got); + } +} diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java new file mode 100644 index 000000000..37a2098d0 --- /dev/null +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.cli.integ; + +import org.apache.hudi.cli.HoodieCLI; +import org.apache.hudi.cli.commands.TableCommand; +import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest; +import org.apache.hudi.client.CompactionAdminClient; +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.TestCompactionAdminClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.model.CompactionOperation; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.testutils.CompactionTestUtils; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.CompactionUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.testutils.HoodieClientTestBase; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.shell.core.CommandResult; + +import java.io.BufferedWriter; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Integration test class for {@link org.apache.hudi.cli.commands.CompactionCommand}. + *

+ * Commands that use SparkLauncher need to load the jars under lib, which are generated during mvn package, + * so they are covered by an integration test instead of a unit test. + */ +public class ITTestCompactionCommand extends AbstractShellIntegrationTest { + + private String tablePath; + private String tableName; + + @BeforeEach + public void init() throws IOException { + tableName = "test_table_" + ITTestCompactionCommand.class.getName(); + tablePath = Paths.get(basePath, tableName).toString(); + + HoodieCLI.conf = jsc.hadoopConfiguration(); + // Create table and connect + new TableCommand().createTable( + tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(), + "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); + metaClient.setBasePath(tablePath); + metaClient = HoodieTableMetaClient.reload(metaClient); + } + + /** + * Test case for command 'compaction schedule'. + */ + @Test + public void testScheduleCompact() throws IOException { + // generate commits + generateCommits(); + + CommandResult cr = getShell().executeCommand( + String.format("compaction schedule --hoodieConfigs hoodie.compact.inline.max.delta.commits=1 --sparkMaster %s", + "local")); + assertAll("Command run failed", + () -> assertTrue(cr.isSuccess()), + () -> assertTrue( + cr.getResult().toString().startsWith("Attempted to schedule compaction for"))); + + // there is 1 requested compaction + HoodieActiveTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); + assertEquals(1, timeline.filterPendingCompactionTimeline().countInstants()); + } + + /** + * Test case for command 'compaction run'. + */ + @Test + public void testCompact() throws IOException { + // generate commits + generateCommits(); + + String instance = prepareScheduleCompaction(); + + String schemaPath = Paths.get(basePath, "compaction.schema").toString(); + writeSchemaToTmpFile(schemaPath); + + CommandResult cr2 = getShell().executeCommand( + String.format("compaction run --parallelism %s --schemaFilePath %s --sparkMaster %s", + 2, schemaPath, "local")); + + assertAll("Command run failed", + () -> assertTrue(cr2.isSuccess()), + () -> assertTrue( + cr2.getResult().toString().startsWith("Compaction successfully completed for"))); + + // assert compaction complete + assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload() + .filterCompletedInstants().getInstants() + .map(HoodieInstant::getTimestamp).collect(Collectors.toList()).contains(instance), + "Pending compaction must be completed"); + } + + /** + * Test case for command 'compaction validate'. + */ + @Test + public void testValidateCompaction() throws IOException { + // generate commits + generateCommits(); + + String instance = prepareScheduleCompaction(); + + CommandResult cr = getShell().executeCommand( + String.format("compaction validate --instant %s --sparkMaster %s", instance, "local")); + + assertAll("Command run failed", + () -> assertTrue(cr.isSuccess()), + () -> assertTrue( + // compaction requested should be valid + cr.getResult().toString().contains("COMPACTION PLAN VALID"))); + } + + /** + * This function mainly tests the workflow of 'compaction unschedule' command. + * The real test of {@link org.apache.hudi.client.CompactionAdminClient#unscheduleCompactionPlan} + * is {@link TestCompactionAdminClient#testUnscheduleCompactionPlan()}. 
+ */ + @Test + public void testUnscheduleCompaction() throws Exception { + // generate commits + generateCommits(); + + String instance = prepareScheduleCompaction(); + + CommandResult cr = getShell().executeCommand( + String.format("compaction unschedule --instant %s --sparkMaster %s", instance, "local")); + + // There is never a file to rename in this scenario + assertAll("Command run failed", + () -> assertTrue(cr.isSuccess()), + () -> assertEquals("No File renames needed to unschedule pending compaction. Operation successful.", + cr.getResult().toString())); + } + + /** + * This function mainly tests the workflow of 'compaction unscheduleFileId' command. + * The real test of {@link org.apache.hudi.client.CompactionAdminClient#unscheduleCompactionFileId} + * is {@link TestCompactionAdminClient#testUnscheduleCompactionFileId}. + */ + @Test + public void testUnscheduleCompactFile() throws IOException { + int numEntriesPerInstant = 10; + CompactionTestUtils.setupAndValidateCompactionOperations(metaClient, false, numEntriesPerInstant, + numEntriesPerInstant, numEntriesPerInstant, numEntriesPerInstant); + + CompactionOperation op = CompactionOperation.convertFromAvroRecordInstance( + CompactionUtils.getCompactionPlan(metaClient, "001").getOperations().stream().findFirst().get()); + + CommandResult cr = getShell().executeCommand( + String.format("compaction unscheduleFileId --fileId %s --partitionPath %s --sparkMaster %s", + op.getFileGroupId().getFileId(), op.getFileGroupId().getPartitionPath(), "local")); + + assertAll("Command run failed", + () -> assertTrue(cr.isSuccess()), + () -> assertTrue(removeNonWordAndStripSpace(cr.getResult().toString()).contains("true")), + () -> assertFalse(removeNonWordAndStripSpace(cr.getResult().toString()).contains("false"))); + } + + /** + * This function mainly tests the workflow of 'compaction repair' command. + * The real test of {@link org.apache.hudi.client.CompactionAdminClient#repairCompaction} + * is {@link TestCompactionAdminClient#testRepairCompactionPlan}. + */ + @Test + public void testRepairCompaction() throws Exception { + int numEntriesPerInstant = 10; + String compactionInstant = "001"; + CompactionTestUtils.setupAndValidateCompactionOperations(metaClient, false, numEntriesPerInstant, + numEntriesPerInstant, numEntriesPerInstant, numEntriesPerInstant); + + metaClient.reloadActiveTimeline(); + CompactionAdminClient client = new CompactionAdminClient(new HoodieSparkEngineContext(jsc), metaClient.getBasePath()); + List> renameFiles = + client.getRenamingActionsForUnschedulingCompactionPlan(metaClient, compactionInstant, 1, Option.empty(), false); + + renameFiles.forEach(lfPair -> { + try { + metaClient.getFs().rename(lfPair.getLeft().getPath(), lfPair.getRight().getPath()); + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + }); + + client.unscheduleCompactionPlan(compactionInstant, false, 1, false); + + CommandResult cr = getShell().executeCommand( + String.format("compaction repair --instant %s --sparkMaster %s", compactionInstant, "local")); + + // All renames were executed successfully, so the result contains true and no false + // Expected: + // ║ File Id │ Source File Path │ Destination File Path │ Rename Executed? │ Rename Succeeded? 
│ Error ║ + // ║ * │ * │ * │ true │ true │ ║ + assertAll("Command run failed", + () -> assertTrue(cr.isSuccess()), + () -> assertTrue(removeNonWordAndStripSpace(cr.getResult().toString()).contains("true")), + () -> assertFalse(removeNonWordAndStripSpace(cr.getResult().toString()).contains("false"))); + } + + private String prepareScheduleCompaction() { + // generate requested compaction + CommandResult cr = getShell().executeCommand( + String.format("compaction schedule --hoodieConfigs hoodie.compact.inline.max.delta.commits=1 --sparkMaster %s", + "local")); + assertTrue(cr.isSuccess()); + + // get compaction instance + HoodieActiveTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); + Option instance = + timeline.filterPendingCompactionTimeline().firstInstant().map(HoodieInstant::getTimestamp); + assertTrue(instance.isPresent(), "Must have pending compaction."); + return instance.get(); + } + + private void writeSchemaToTmpFile(String schemaPath) throws IOException { + try (BufferedWriter out = new BufferedWriter(new FileWriter(schemaPath))) { + out.write(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); + } + } + + private void generateCommits() throws IOException { + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + + // Create the write client to write some records in + HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) + .withDeleteParallelism(2).forTable(tableName) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(); + + SparkRDDWriteClient client = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), cfg); + + List records = insert(jsc, client, dataGen); + upsert(jsc, client, dataGen, records); + delete(jsc, client, records); + } + + private List insert(JavaSparkContext jsc, SparkRDDWriteClient client, + HoodieTestDataGenerator dataGen) throws IOException { + // inserts + String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + + List records = dataGen.generateInserts(newCommitTime, 10); + JavaRDD writeRecords = jsc.parallelize(records, 1); + operateFunc(SparkRDDWriteClient::insert, client, writeRecords, newCommitTime); + return records; + } + + private void upsert(JavaSparkContext jsc, SparkRDDWriteClient client, + HoodieTestDataGenerator dataGen, List records) + throws IOException { + // updates + String newCommitTime = "002"; + client.startCommitWithTime(newCommitTime); + + List toBeUpdated = dataGen.generateUpdates(newCommitTime, 2); + records.addAll(toBeUpdated); + JavaRDD writeRecords = jsc.parallelize(records, 1); + operateFunc(SparkRDDWriteClient::upsert, client, writeRecords, newCommitTime); + } + + private void delete(JavaSparkContext jsc, SparkRDDWriteClient client, + List records) { + // Delete + String newCommitTime = "003"; + client.startCommitWithTime(newCommitTime); + + // just delete half of the records + int numToDelete = records.size() / 2; + List toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList()); + JavaRDD deleteRecords = jsc.parallelize(toBeDeleted, 1); + client.delete(deleteRecords, newCommitTime); + } + + private JavaRDD operateFunc( + HoodieClientTestBase.Function3, SparkRDDWriteClient, JavaRDD, String> writeFn, + SparkRDDWriteClient client, JavaRDD writeRecords, String commitTime) + throws IOException { + return writeFn.apply(client, writeRecords, commitTime); + } +} diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/AbstractShellIntegrationTest.java b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/AbstractShellIntegrationTest.java index a7cf85cf9..67449dc98 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/AbstractShellIntegrationTest.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/AbstractShellIntegrationTest.java @@ -18,6 +18,7 @@ package org.apache.hudi.cli.testutils; +import org.apache.hudi.common.model.HoodieTableType; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -37,4 +38,8 @@ public abstract class AbstractShellIntegrationTest extends AbstractShellBaseInte public void teardown() throws Exception { cleanupResources(); } + + protected HoodieTableType getTableType() { + return HoodieTableType.MERGE_ON_READ; + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java index 9ace03a6d..1c869e46f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java @@ -383,7 +383,7 @@ public class CompactionAdminClient extends AbstractHoodieClient { * @return list of pairs of log-files (old, new) and for each pair, rename must be done to successfully unschedule * compaction. */ - protected List> getRenamingActionsForUnschedulingCompactionPlan( + public List> getRenamingActionsForUnschedulingCompactionPlan( HoodieTableMetaClient metaClient, String compactionInstant, int parallelism, Option fsViewOpt, boolean skipValidation) throws IOException { HoodieTableFileSystemView fsView = fsViewOpt.isPresent() ? 
fsViewOpt.get() diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java index 5b1888101..f0f13923e 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java @@ -464,7 +464,8 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { assertTrue(result); HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(); List archivedInstants = Arrays.asList(instant1, instant2, instant3); - assertEquals(new HashSet<>(archivedInstants), archivedTimeline.getInstants().collect(Collectors.toSet())); + assertEquals(new HashSet<>(archivedInstants), + archivedTimeline.filterCompletedInstants().getInstants().collect(Collectors.toSet())); assertFalse(wrapperFs.exists(markerPath)); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java index 9f8c4393b..6b05eddcc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java @@ -18,12 +18,14 @@ package org.apache.hudi.common.table.timeline; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; @@ -43,6 +45,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -66,6 +69,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { private static final String HOODIE_COMMIT_ARCHIVE_LOG_FILE_PREFIX = "commits"; private static final String ACTION_TYPE_KEY = "actionType"; + private static final String ACTION_STATE = "actionState"; private HoodieTableMetaClient metaClient; private Map readCommits = new HashMap<>(); @@ -108,6 +112,22 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { loadInstants(startTs, endTs); } + public void loadCompactionDetailsInMemory(String compactionInstantTime) { + loadCompactionDetailsInMemory(compactionInstantTime, compactionInstantTime); + } + + public void loadCompactionDetailsInMemory(String startTs, String endTs) { + // load compactionPlan + loadInstants(new TimeRangeFilter(startTs, endTs), true, record -> + record.get(ACTION_TYPE_KEY).toString().equals(HoodieTimeline.COMPACTION_ACTION) + && HoodieInstant.State.INFLIGHT.toString().equals(record.get(ACTION_STATE).toString()) + ); + } + + public void clearInstantDetailsFromMemory(String instantTime) { + this.readCommits.remove(instantTime); + } + public void clearInstantDetailsFromMemory(String startTs, String endTs) { this.findInstantsInRange(startTs, 
endTs).getInstants().forEach(instant -> this.readCommits.remove(instant.getTimestamp())); @@ -126,11 +146,16 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { final String instantTime = record.get(HoodiePartitionMetadata.COMMIT_TIME_KEY).toString(); final String action = record.get(ACTION_TYPE_KEY).toString(); if (loadDetails) { - Option.ofNullable(record.get(getMetadataKey(action))).map(actionData -> - this.readCommits.put(instantTime, actionData.toString().getBytes(StandardCharsets.UTF_8)) - ); + Option.ofNullable(record.get(getMetadataKey(action))).map(actionData -> { + if (action.equals(HoodieTimeline.COMPACTION_ACTION)) { + this.readCommits.put(instantTime, HoodieAvroUtils.indexedRecordToBytes((IndexedRecord)actionData)); + } else { + this.readCommits.put(instantTime, actionData.toString().getBytes(StandardCharsets.UTF_8)); + } + return null; + }); } - return new HoodieInstant(false, action, instantTime); + return new HoodieInstant(HoodieInstant.State.valueOf(record.get(ACTION_STATE).toString()), action, instantTime); } private String getMetadataKey(String action) { @@ -145,6 +170,8 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { return "hoodieRollbackMetadata"; case HoodieTimeline.SAVEPOINT_ACTION: return "hoodieSavePointMetadata"; + case HoodieTimeline.COMPACTION_ACTION: + return "hoodieCompactionPlan"; default: throw new HoodieIOException("Unknown action in metadata " + action); } @@ -158,12 +185,18 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { return loadInstants(new TimeRangeFilter(startTs, endTs), true); } + private List loadInstants(TimeRangeFilter filter, boolean loadInstantDetails) { + return loadInstants(filter, loadInstantDetails, record -> true); + } + /** * This is method to read selected instants. 
Do NOT use this directly use one of the helper methods above * If loadInstantDetails is set to true, this would also update 'readCommits' map with commit details * If filter is specified, only the filtered instants are loaded + * If commitsFilter is specified, only the filtered records are loaded */ - private List loadInstants(TimeRangeFilter filter, boolean loadInstantDetails) { + private List loadInstants(TimeRangeFilter filter, boolean loadInstantDetails, + Function commitsFilter) { try { // list all files FileStatus[] fsStatuses = metaClient.getFs().globStatus( @@ -187,6 +220,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { List records = blk.getRecords(); // filter blocks in desired time window Stream instantsInBlkStream = records.stream() + .filter(r -> commitsFilter.apply((GenericRecord) r)) .map(r -> readCommit((GenericRecord) r, loadInstantDetails)); if (filter != null) { @@ -254,4 +288,13 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { return 0; } } + + @Override + public HoodieDefaultTimeline getWriteTimeline() { + // filter in-memory instants + Set validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION); + return new HoodieDefaultTimeline(getInstants().filter(i -> + readCommits.keySet().contains(i.getTimestamp())) + .filter(s -> validActions.contains(s.getAction())), details); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java index 9b419caed..a50c2998a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.table.timeline; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; @@ -33,12 +34,14 @@ import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.avro.Schema; import org.apache.avro.file.DataFileReader; import org.apache.avro.file.DataFileWriter; import org.apache.avro.file.FileReader; import org.apache.avro.file.SeekableByteArrayInput; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumWriter; +import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.specific.SpecificRecordBase; @@ -176,4 +179,13 @@ public class TimelineMetadataUtils { ValidationUtils.checkArgument(fileReader.hasNext(), "Could not deserialize metadata of type " + clazz); return fileReader.next(); } + + public static T deserializeAvroRecordMetadata(byte[] bytes, Schema schema) + throws IOException { + return deserializeAvroRecordMetadata(HoodieAvroUtils.bytesToAvro(bytes, schema), schema); + } + + public static T deserializeAvroRecordMetadata(Object object, Schema schema) { + return (T) SpecificData.get().deepCopy(schema, object); + } }
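
A note on the new argument contract between CompactionCommand and SparkMain: every compaction command now passes the Spark master and executor memory ahead of the table arguments, which is why the index-based reads in SparkMain shift for COMPACT_SCHEDULE, COMPACT_RUN and COMPACT_UNSCHEDULE_FILE. Below is a minimal sketch of the COMPACT_RUN layout; the base path, table name and schema path are placeholder values for illustration, not taken from the patch.

    // Hypothetical sketch: the argument order CompactionCommand#compact builds for COMPACT_RUN.
    // SparkMain#main reads basePath at args[3], tableName at args[4], the instant at args[5],
    // parallelism at args[6], the schema file at args[7], retry at args[8], propsFilePath at
    // args[9]; any extra hoodie configs start at args[10].
    public class CompactRunArgsSketch {
      public static void main(String[] ignored) {
        String[] args = {
            "COMPACT_RUN",        // args[0]: SparkCommand name
            "local",              // args[1]: sparkMaster
            "4G",                 // args[2]: sparkMemory
            "/tmp/hoodie/trips",  // args[3]: basePath (placeholder)
            "trips",              // args[4]: tableName (placeholder)
            "001",                // args[5]: compactionInstantTime
            "3",                  // args[6]: parallelism
            "/tmp/schema.avsc",   // args[7]: schemaFilePath (placeholder)
            "1",                  // args[8]: retry
            ""                    // args[9]: propsFilePath
        };
        System.out.println(args.length >= 10); // matches the assert in the COMPACT_RUN case
      }
    }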