diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java index 743273244..f32b7bc36 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java @@ -31,6 +31,7 @@ public class HoodieTableHeaderFields { public static final String HEADER_CLEAN_TIME = "CleanTime"; public static final String HEADER_EARLIEST_COMMAND_RETAINED = "EarliestCommandRetained"; public static final String HEADER_CLEANING_POLICY = "Cleaning policy"; + public static final String HEADER_FILE_SIZE = "File Size"; public static final String HEADER_TOTAL_FILES_DELETED = "Total Files Deleted"; public static final String HEADER_TOTAL_FILES_SUCCESSFULLY_DELETED = "Total Files Successfully Deleted"; @@ -58,7 +59,7 @@ public class HoodieTableHeaderFields { public static final String HEADER_DELTA_SIZE = "Delta Size"; public static final String HEADER_DELTA_FILES = "Delta Files"; public static final String HEADER_TOTAL_DELTA_SIZE = "Total " + HEADER_DELTA_SIZE; - public static final String HEADER_TOTAL_DELTA_FILE_SIZE = "Total Delta File Size"; + public static final String HEADER_TOTAL_DELTA_FILE_SIZE = "Total Delta " + HEADER_FILE_SIZE; public static final String HEADER_NUM_DELTA_FILES = "Num " + HEADER_DELTA_FILES; /** @@ -78,7 +79,7 @@ public class HoodieTableHeaderFields { * Fields of Repair. 
*/ public static final String HEADER_METADATA_PRESENT = "Metadata Present?"; - public static final String HEADER_REPAIR_ACTION = "Action"; + public static final String HEADER_ACTION = "Action"; public static final String HEADER_HOODIE_PROPERTY = "Property"; public static final String HEADER_OLD_VALUE = "Old Value"; public static final String HEADER_NEW_VALUE = "New Value"; @@ -113,4 +114,30 @@ public class HoodieTableHeaderFields { public static final String HEADER_HISTOGRAM_NUM_FILES = "NumFiles"; public static final String HEADER_HISTOGRAM_STD_DEV = "StdDev"; + /** + * Fields of Commit. + */ + public static final String HEADER_TOTAL_BYTES_WRITTEN = "Total Bytes Written"; + public static final String HEADER_TOTAL_FILES_ADDED = "Total Files Added"; + public static final String HEADER_TOTAL_FILES_UPDATED = "Total Files Updated"; + public static final String HEADER_TOTAL_PARTITIONS_WRITTEN = "Total Partitions Written"; + public static final String HEADER_TOTAL_RECORDS_WRITTEN = "Total Records Written"; + public static final String HEADER_TOTAL_UPDATE_RECORDS_WRITTEN = "Total Update Records Written"; + public static final String HEADER_TOTAL_RECORDS_INSERTED = "Total Records Inserted"; + public static final String HEADER_TOTAL_RECORDS_UPDATED = "Total Records Updated"; + public static final String HEADER_TOTAL_ERRORS = "Total Errors"; + + /** + * Fields of commit metadata. 
+ */ + public static final String HEADER_PREVIOUS_COMMIT = "Previous Commit"; + public static final String HEADER_NUM_WRITES = "Num Writes"; + public static final String HEADER_NUM_INSERTS = "Num Inserts"; + public static final String HEADER_NUM_DELETES = "Num Deletes"; + public static final String HEADER_NUM_UPDATE_WRITES = "Num Update Writes"; + public static final String HEADER_TOTAL_LOG_BLOCKS = "Total Log Blocks"; + public static final String HEADER_TOTAL_CORRUPT_LOG_BLOCKS = "Total Corrupt LogBlocks"; + public static final String HEADER_TOTAL_ROLLBACK_BLOCKS = "Total Rollback Blocks"; + public static final String HEADER_TOTAL_LOG_RECORDS = "Total Log Records"; + public static final String HEADER_TOTAL_UPDATED_RECORDS_COMPACTED = "Total Updated Records Compacted"; } diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java index 6670067d3..852a413b0 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java @@ -20,6 +20,7 @@ package org.apache.hudi.cli.commands; import org.apache.hudi.cli.HoodieCLI; import org.apache.hudi.cli.HoodiePrintHelper; +import org.apache.hudi.cli.HoodieTableHeaderFields; import org.apache.hudi.cli.TableHeader; import org.apache.hudi.cli.utils.CommitUtil; import org.apache.hudi.cli.utils.InputStreamConsumer; @@ -84,19 +85,19 @@ public class CommitsCommand implements CommandMarker { } final Map> fieldNameToConverterMap = new HashMap<>(); - fieldNameToConverterMap.put("Total Bytes Written", entry -> { + fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, entry -> { return NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString()))); }); final TableHeader header = new TableHeader() - .addTableHeaderField("CommitTime") - .addTableHeaderField("Total Bytes Written") - .addTableHeaderField("Total Files 
Added") - .addTableHeaderField("Total Files Updated") - .addTableHeaderField("Total Partitions Written") - .addTableHeaderField("Total Records Written") - .addTableHeaderField("Total Update Records Written") - .addTableHeaderField("Total Errors"); + .addTableHeaderField(HoodieTableHeaderFields.HEADER_COMMIT_TIME) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_PARTITIONS_WRITTEN) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_UPDATE_RECORDS_WRITTEN) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS); return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows, tempTableName); @@ -136,17 +137,26 @@ public class CommitsCommand implements CommandMarker { } final Map> fieldNameToConverterMap = new HashMap<>(); - fieldNameToConverterMap.put("Total Bytes Written", entry -> { + fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, entry -> { return NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString()))); }); - TableHeader header = new TableHeader().addTableHeaderField("Action").addTableHeaderField("Instant") - .addTableHeaderField("Partition").addTableHeaderField("File Id").addTableHeaderField("Prev Instant") - .addTableHeaderField("Num Writes").addTableHeaderField("Num Inserts").addTableHeaderField("Num Deletes") - .addTableHeaderField("Num Update Writes").addTableHeaderField("Total Write Errors") - .addTableHeaderField("Total Log Blocks").addTableHeaderField("Total Corrupt LogBlocks") - .addTableHeaderField("Total Rollback Blocks").addTableHeaderField("Total Log Records") - .addTableHeaderField("Total Updated Records 
Compacted").addTableHeaderField("Total Write Bytes"); + TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_PREVIOUS_COMMIT) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_WRITES) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_INSERTS) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_DELETES) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_UPDATE_WRITES) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_LOG_BLOCKS) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_CORRUPT_LOG_BLOCKS) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ROLLBACK_BLOCKS) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_LOG_RECORDS) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_UPDATED_RECORDS_COMPACTED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN); return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows, tempTableName); @@ -216,7 +226,10 @@ public class CommitsCommand implements CommandMarker { @CliCommand(value = "commit rollback", help = "Rollback a commit") public String rollbackCommit(@CliOption(key = {"commit"}, help = "Commit to rollback") final String instantTime, - @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath) + @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath, + @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master, + @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G", + help = 
"Spark executor memory") final String sparkMemory) throws Exception { HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); @@ -226,7 +239,7 @@ public class CommitsCommand implements CommandMarker { } SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); - sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK.toString(), instantTime, + sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK.toString(), master, sparkMemory, instantTime, HoodieCLI.getTableMetaClient().getBasePath()); Process process = sparkLauncher.launch(); InputStreamConsumer.captureOutput(process); @@ -286,12 +299,16 @@ public class CommitsCommand implements CommandMarker { } Map> fieldNameToConverterMap = new HashMap<>(); - fieldNameToConverterMap.put("Total Bytes Written", entry -> NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString())))); + fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, entry -> + NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString())))); - TableHeader header = new TableHeader().addTableHeaderField("Partition Path") - .addTableHeaderField("Total Files Added").addTableHeaderField("Total Files Updated") - .addTableHeaderField("Total Records Inserted").addTableHeaderField("Total Records Updated") - .addTableHeaderField("Total Bytes Written").addTableHeaderField("Total Errors"); + TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_INSERTED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN) + 
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS); return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows, exportTableName); @@ -328,27 +345,30 @@ public class CommitsCommand implements CommandMarker { } } - TableHeader header = new TableHeader().addTableHeaderField("Partition Path").addTableHeaderField("File ID") - .addTableHeaderField("Previous Commit").addTableHeaderField("Total Records Updated") - .addTableHeaderField("Total Records Written").addTableHeaderField("Total Bytes Written") - .addTableHeaderField("Total Errors").addTableHeaderField("File Size"); + TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_PREVIOUS_COMMIT) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_SIZE); return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows, exportTableName); } @CliCommand(value = "commits compare", help = "Compare commits with another Hoodie table") - public String compareCommits(@CliOption(key = {"path"}, help = "Path of the table to compare to") final String path) - throws Exception { + public String compareCommits(@CliOption(key = {"path"}, help = "Path of the table to compare to") final String path) { HoodieTableMetaClient source = HoodieCLI.getTableMetaClient(); HoodieTableMetaClient target = new HoodieTableMetaClient(HoodieCLI.conf, path); HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); HoodieTimeline 
sourceTimeline = source.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); String targetLatestCommit = - targetTimeline.getInstants().iterator().hasNext() ? "0" : targetTimeline.lastInstant().get().getTimestamp(); + targetTimeline.getInstants().iterator().hasNext() ? targetTimeline.lastInstant().get().getTimestamp() : "0"; String sourceLatestCommit = - sourceTimeline.getInstants().iterator().hasNext() ? "0" : sourceTimeline.lastInstant().get().getTimestamp(); + sourceTimeline.getInstants().iterator().hasNext() ? sourceTimeline.lastInstant().get().getTimestamp() : "0"; if (sourceLatestCommit != null && HoodieTimeline.compareTimestamps(targetLatestCommit, HoodieTimeline.GREATER_THAN, sourceLatestCommit)) { diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java index 7b859c203..00ccf7013 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java @@ -136,7 +136,7 @@ public class RepairsCommand implements CommandMarker { } return HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH, - HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows); + HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_ACTION}, rows); } @CliCommand(value = "repair overwrite-hoodie-props", help = "Overwrite hoodie.properties with provided file. Risky operation. 
Proceed with caution!") diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java index 4feb4c195..faa778943 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java @@ -125,7 +125,7 @@ public class RollbacksCommand implements CommandMarker { /** * An Active timeline containing only rollbacks. */ - static class RollbackTimeline extends HoodieActiveTimeline { + public static class RollbackTimeline extends HoodieActiveTimeline { public RollbackTimeline(HoodieTableMetaClient metaClient) { super(metaClient, CollectionUtils.createImmutableSet(HoodieTimeline.ROLLBACK_EXTENSION)); diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java index b2871e41a..8f7aa8017 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java @@ -71,8 +71,8 @@ public class SparkMain { int returnCode = 0; switch (cmd) { case ROLLBACK: - assert (args.length == 3); - returnCode = rollback(jsc, args[1], args[2]); + assert (args.length == 5); + returnCode = rollback(jsc, args[3], args[4]); break; case DEDUPLICATE: assert (args.length == 7); @@ -174,7 +174,7 @@ public class SparkMain { List masterContained = Arrays.asList(SparkCommand.COMPACT_VALIDATE, SparkCommand.COMPACT_REPAIR, SparkCommand.COMPACT_UNSCHEDULE_PLAN, SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN, SparkCommand.IMPORT, SparkCommand.UPSERT, SparkCommand.DEDUPLICATE, SparkCommand.SAVEPOINT, - SparkCommand.DELETE_SAVEPOINT, SparkCommand.ROLLBACK_TO_SAVEPOINT); + SparkCommand.DELETE_SAVEPOINT, SparkCommand.ROLLBACK_TO_SAVEPOINT, SparkCommand.ROLLBACK); return masterContained.contains(command); } diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java new file mode 100644 index 000000000..8bacf4fe0 --- /dev/null +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.cli.commands; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hudi.cli.HoodieCLI; +import org.apache.hudi.cli.HoodiePrintHelper; +import org.apache.hudi.cli.HoodieTableHeaderFields; +import org.apache.hudi.cli.TableHeader; +import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest; +import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.util.NumericUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTimelineArchiveLog; +import org.apache.hudi.testutils.HoodieTestDataGenerator; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.shell.core.CommandResult; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test class for {@link org.apache.hudi.cli.commands.CommitsCommand}. 
+ */ +public class TestCommitsCommand extends AbstractShellIntegrationTest { + + private String tableName; + private String tablePath; + + @BeforeEach + public void init() throws IOException { + tableName = "test_table"; + tablePath = basePath + File.separator + tableName; + + HoodieCLI.conf = jsc.hadoopConfiguration(); + // Create table and connect + new TableCommand().createTable( + tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), + "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); + } + + private LinkedHashMap generateData() { + // generate data and metadata + LinkedHashMap data = new LinkedHashMap<>(); + data.put("102", new Integer[] {15, 10}); + data.put("101", new Integer[] {20, 10}); + data.put("100", new Integer[] {15, 15}); + + data.forEach((key, value) -> { + HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, key, jsc.hadoopConfiguration(), + Option.of(value[0]), Option.of(value[1])); + }); + + metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); + assertEquals(3, metaClient.reloadActiveTimeline().getCommitsTimeline().countInstants(), + "There should have 3 commits"); + return data; + } + + private String generateExpectData(int records, Map data) throws IOException { + FileSystem fs = FileSystem.get(jsc.hadoopConfiguration()); + List partitionPaths = + FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath); + + int partitions = partitionPaths.size(); + // default pre-commit is not null, file add always be 0 and update always be partition nums + int fileAdded = 0; + int fileUpdated = partitions; + int errors = 0; + + // generate expect result + List rows = new ArrayList<>(); + data.forEach((key, value) -> { + for (int i = 0; i < records; i++) { + // there are more than 1 partitions, so need to * partitions + rows.add(new Comparable[]{key, partitions * HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES, + fileAdded, fileUpdated, partitions, 
partitions * value[0], partitions * value[1], errors}); + } + }); + + final Map> fieldNameToConverterMap = new HashMap<>(); + fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, entry -> { + return NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString()))); + }); + + final TableHeader header = new TableHeader() + .addTableHeaderField(HoodieTableHeaderFields.HEADER_COMMIT_TIME) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_PARTITIONS_WRITTEN) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_UPDATE_RECORDS_WRITTEN) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS); + + return HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false, + -1, false, rows); + } + + /** + * Test case of 'commits show' command. + */ + @Test + public void testShowCommits() throws IOException { + Map data = generateData(); + + CommandResult cr = getShell().executeCommand("commits show"); + assertTrue(cr.isSuccess()); + + String expected = generateExpectData(1, data); + expected = removeNonWordAndStripSpace(expected); + String got = removeNonWordAndStripSpace(cr.getResult().toString()); + assertEquals(expected, got); + } + + /** + * Test case of 'commits showarchived' command. 
+ */ + @Test + public void testShowArchivedCommits() throws IOException { + // Generate archive + HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) + .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build()) + .forTable("test-trip-table").build(); + + // generate data and metadata + Map data = new LinkedHashMap<>(); + data.put("104", new Integer[] {20, 10}); + data.put("103", new Integer[] {15, 15}); + data.put("102", new Integer[] {25, 45}); + data.put("101", new Integer[] {35, 15}); + + data.forEach((key, value) -> { + HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, key, jsc.hadoopConfiguration(), + Option.of(value[0]), Option.of(value[1])); + }); + + // archive + metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient); + archiveLog.archiveIfRequired(jsc.hadoopConfiguration()); + + CommandResult cr = getShell().executeCommand(String.format("commits showarchived --startTs %s --endTs %s", "100", "104")); + assertTrue(cr.isSuccess()); + + // archived 101 and 102 instant, generate expect data + assertEquals(2, metaClient.reloadActiveTimeline().getCommitsTimeline().countInstants(), + "There should 2 instants not be archived!"); + + // archived 101 and 102 instants, remove 103 and 104 instant + data.remove("103"); + data.remove("104"); + String expected = generateExpectData(3, data); + expected = removeNonWordAndStripSpace(expected); + String got = removeNonWordAndStripSpace(cr.getResult().toString()); + assertEquals(expected, got); + } + + /** + * Test case of 'commit showpartitions' command. 
+ */ + @Test + public void testShowCommitPartitions() { + Map data = generateData(); + + String commitInstant = "101"; + CommandResult cr = getShell().executeCommand(String.format("commit showpartitions --commit %s", commitInstant)); + assertTrue(cr.isSuccess()); + + Integer[] value = data.get(commitInstant); + List rows = new ArrayList<>(); + // prevCommit not null, so add 0, update 1 + Arrays.asList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).stream().forEach(partition -> + rows.add(new Comparable[] {partition, 0, 1, 0, value[1], HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES, 0}) + ); + + Map> fieldNameToConverterMap = new HashMap<>(); + fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, + entry -> NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString())))); + + TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_INSERTED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS); + + String expected = HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false, -1, false, rows); + expected = removeNonWordAndStripSpace(expected); + String got = removeNonWordAndStripSpace(cr.getResult().toString()); + assertEquals(expected, got); + } + + /** + * Test case of 'commit showfiles' command. 
+ */ + @Test + public void testShowCommitFiles() { + Map data = generateData(); + + String commitInstant = "101"; + CommandResult cr = getShell().executeCommand(String.format("commit showfiles --commit %s", commitInstant)); + assertTrue(cr.isSuccess()); + + Integer[] value = data.get(commitInstant); + List rows = new ArrayList<>(); + Arrays.asList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).stream().forEach(partition -> + rows.add(new Comparable[] {partition, HoodieTestCommitMetadataGenerator.DEFAULT_FILEID, + HoodieTestCommitMetadataGenerator.DEFAULT_PRE_COMMIT, + value[1], value[0], HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES, + // default 0 errors and blank file with 0 size + 0, 0})); + TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_PREVIOUS_COMMIT) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_SIZE); + + String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows); + expected = removeNonWordAndStripSpace(expected); + String got = removeNonWordAndStripSpace(cr.getResult().toString()); + assertEquals(expected, got); + } + + /** + * Test case of 'commits compare' command. 
+ */ + @Test + public void testCompareCommits() throws IOException { + Map data = generateData(); + + String tableName2 = "test_table2"; + String tablePath2 = basePath + File.separator + tableName2; + HoodieTestUtils.init(jsc.hadoopConfiguration(), tablePath2, getTableType()); + + data.remove("102"); + data.forEach((key, value) -> { + HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath2, key, jsc.hadoopConfiguration(), + Option.of(value[0]), Option.of(value[1])); + }); + + CommandResult cr = getShell().executeCommand(String.format("commits compare --path %s", tablePath2)); + assertTrue(cr.isSuccess()); + + // the latest instant of test_table2 is 101 + List commitsToCatchup = metaClient.getActiveTimeline().findInstantsAfter("101", Integer.MAX_VALUE) + .getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + String expected = String.format("Source %s is ahead by %d commits. Commits to catch up - %s", + tableName, commitsToCatchup.size(), commitsToCatchup); + assertEquals(expected, cr.getResult().toString()); + } + + /** + * Test case of 'commits sync' command. 
+ */ + @Test + public void testSyncCommits() throws IOException { + Map data = generateData(); + + String tableName2 = "test_table2"; + String tablePath2 = basePath + File.separator + tableName2; + HoodieTestUtils.init(jsc.hadoopConfiguration(), tablePath2, getTableType(), tableName2); + + data.remove("102"); + data.forEach((key, value) -> { + HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath2, key, jsc.hadoopConfiguration(), + Option.of(value[0]), Option.of(value[1])); + }); + + CommandResult cr = getShell().executeCommand(String.format("commits sync --path %s", tablePath2)); + assertTrue(cr.isSuccess()); + + String expected = String.format("Load sync state between %s and %s", tableName, tableName2); + assertEquals(expected, cr.getResult().toString()); + } +} diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java index 1ada2a2ed..9764b9a1e 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java @@ -95,7 +95,7 @@ public class TestRepairsCommand extends AbstractShellIntegrationTest { .map(partition -> new String[]{partition, "No", "None"}) .toArray(String[][]::new); String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH, - HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows); + HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_ACTION}, rows); expected = removeNonWordAndStripSpace(expected); String got = removeNonWordAndStripSpace(cr.getResult().toString()); assertEquals(expected, got); @@ -126,7 +126,7 @@ public class TestRepairsCommand extends AbstractShellIntegrationTest { .map(partition -> new String[]{partition, "No", "Repaired"}) .toArray(String[][]::new); String expected = 
HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH, - HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows); + HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_ACTION}, rows); expected = removeNonWordAndStripSpace(expected); String got = removeNonWordAndStripSpace(cr.getResult().toString()); assertEquals(expected, got); @@ -138,7 +138,7 @@ public class TestRepairsCommand extends AbstractShellIntegrationTest { .map(partition -> new String[]{partition, "Yes", "None"}) .toArray(String[][]::new); expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH, - HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows); + HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_ACTION}, rows); expected = removeNonWordAndStripSpace(expected); got = removeNonWordAndStripSpace(cr.getResult().toString()); assertEquals(expected, got); diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java new file mode 100644 index 000000000..dc1b51ee7 --- /dev/null +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.cli.integ; + +import org.apache.hudi.cli.HoodieCLI; +import org.apache.hudi.cli.commands.RollbacksCommand; +import org.apache.hudi.cli.commands.TableCommand; +import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.testutils.HoodieTestDataGenerator; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.shell.core.CommandResult; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Integration test class for {@link org.apache.hudi.cli.commands.CommitsCommand}. + *&#13;

+ * Commands that use SparkLauncher need to load the jars under lib, which are generated during mvn package, + * so an integration test is used instead of a unit test. + */ +public class ITTestCommitsCommand extends AbstractShellIntegrationTest { + + private String tablePath; + + @BeforeEach + public void init() throws IOException { + String tableName = "test_table"; + tablePath = basePath + File.separator + tableName; + + HoodieCLI.conf = jsc.hadoopConfiguration(); + // Create table and connect + new TableCommand().createTable( + tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), + "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); + } + + /** + * Test case of 'commit rollback' command: rolls back commit 102 and checks that one rollback instant exists and two commits remain. + */ + @Test + public void testRollbackCommit() throws IOException { + // Create some commit files and parquet files + String commitTime1 = "100"; + String commitTime2 = "101"; + String commitTime3 = "102"; + HoodieTestDataGenerator.writePartitionMetadata(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, tablePath); + + // create the three commit files + HoodieTestUtils.createCommitFiles(tablePath, commitTime1, commitTime2, commitTime3); + + // generate one data file per partition for each commit + for (String commitTime : Arrays.asList(commitTime1, commitTime2, commitTime3)) { + HoodieTestUtils.createDataFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, commitTime, "file-1"); + HoodieTestUtils.createDataFile(tablePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, commitTime, "file-2"); + HoodieTestUtils.createDataFile(tablePath, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, commitTime, "file-3"); + } + + CommandResult cr = getShell().executeCommand(String.format("commit rollback --commit %s --sparkMaster %s --sparkMemory %s", + commitTime3, "local", "4G")); + assertTrue(cr.isSuccess()); + + metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); + + HoodieActiveTimeline rollbackTimeline = new 
RollbacksCommand.RollbackTimeline(metaClient); + assertEquals(1, rollbackTimeline.getRollbackTimeline().countInstants(), "There should have 1 rollback instant."); + + HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline(); + assertEquals(2, timeline.getCommitsTimeline().countInstants(), "There should have 2 instants."); + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java index 18dce0329..a7ef208e8 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java @@ -126,8 +126,13 @@ public class HoodieTestUtils { public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType) throws IOException { + return init(hadoopConf, basePath, tableType, RAW_TRIPS_TEST_NAME); + } + + public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType, + String tableName) throws IOException { Properties properties = new Properties(); - properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, RAW_TRIPS_TEST_NAME); + properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tableName); properties.setProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name()); properties.setProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, HoodieAvroPayload.class.getName()); return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties);