diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java new file mode 100644 index 000000000..7ccc3c81a --- /dev/null +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.cli.commands; + +import org.apache.hudi.cli.AbstractShellIntegrationTest; +import org.apache.hudi.cli.HoodieCLI; +import org.apache.hudi.cli.HoodiePrintHelper; +import org.apache.hudi.cli.TableHeader; +import org.apache.hudi.cli.common.HoodieTestCommitMetadataGenerator; +import org.apache.hudi.cli.common.HoodieTestCommitUtilities; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieCommitArchiveLog; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.springframework.shell.core.CommandResult; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test Cases for {@link ArchivedCommitsCommand}. 
+ */ +public class TestArchivedCommitsCommand extends AbstractShellIntegrationTest { + + private String tablePath; + + @Before + public void init() throws IOException { + initDFS(); + jsc.hadoopConfiguration().addResource(dfs.getConf()); + HoodieCLI.conf = dfs.getConf(); + + // Create table and connect + String tableName = "test_table"; + tablePath = basePath + File.separator + tableName; + new TableCommand().createTable( + tablePath, tableName, + "COPY_ON_WRITE", "", 1, "org.apache.hudi.common.model.HoodieAvroPayload"); + + metaClient = HoodieCLI.getTableMetaClient(); + + // Generate archive + HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) + .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build()) + .forTable("test-trip-table").build(); + + // Create six commits + for (int i = 100; i < 106; i++) { + String timestamp = String.valueOf(i); + // Requested Compaction + HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath, + new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, timestamp), dfs.getConf()); + // Inflight Compaction + HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath, + new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, timestamp), dfs.getConf()); + HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, dfs.getConf()); + } + + metaClient = HoodieTableMetaClient.reload(metaClient); + // reload the timeline and get all the commits before archive + metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants(); + + // archive + HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); + archiveLog.archiveIfRequired(jsc); + } + + @After + public void clean() throws IOException { + cleanupDFS(); + } + + 
/** + * Test for command: show archived commit stats. + */ + @Test + public void testShowArchivedCommits() { + CommandResult cr = getShell().executeCommand("show archived commit stats"); + assertTrue(cr.isSuccess()); + + TableHeader header = new TableHeader().addTableHeaderField("action").addTableHeaderField("instant") + .addTableHeaderField("partition").addTableHeaderField("file_id").addTableHeaderField("prev_instant") + .addTableHeaderField("num_writes").addTableHeaderField("num_inserts").addTableHeaderField("num_deletes") + .addTableHeaderField("num_update_writes").addTableHeaderField("total_log_files") + .addTableHeaderField("total_log_blocks").addTableHeaderField("total_corrupt_log_blocks") + .addTableHeaderField("total_rollback_blocks").addTableHeaderField("total_log_records") + .addTableHeaderField("total_updated_records_compacted").addTableHeaderField("total_write_bytes") + .addTableHeaderField("total_write_errors"); + + // Generate expected data + final List rows = new ArrayList<>(); + for (int i = 100; i < 104; i++) { + String instant = String.valueOf(i); + for (int j = 0; j < 3; j++) { + Comparable[] defaultComp = new Comparable[]{"commit", instant, + HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH, + HoodieTestCommitMetadataGenerator.DEFAULT_FILEID, + HoodieTestCommitMetadataGenerator.DEFAULT_PRE_COMMIT, + HoodieTestCommitMetadataGenerator.DEFAULT_NUM_WRITES, + HoodieTestCommitMetadataGenerator.DEFAULT_OTHER_VALUE, + HoodieTestCommitMetadataGenerator.DEFAULT_OTHER_VALUE, + HoodieTestCommitMetadataGenerator.DEFAULT_NUM_UPDATE_WRITES, + HoodieTestCommitMetadataGenerator.DEFAULT_NULL_VALUE, + HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_LOG_BLOCKS, + HoodieTestCommitMetadataGenerator.DEFAULT_OTHER_VALUE, + HoodieTestCommitMetadataGenerator.DEFAULT_OTHER_VALUE, + HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_LOG_RECORDS, + HoodieTestCommitMetadataGenerator.DEFAULT_OTHER_VALUE, + 
HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES, + HoodieTestCommitMetadataGenerator.DEFAULT_OTHER_VALUE}; + rows.add(defaultComp.clone()); + defaultComp[2] = HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH; + rows.add(defaultComp); + } + } + + String expectedResult = HoodiePrintHelper.print( + header, new HashMap<>(), "", false, -1, false, rows); + assertEquals(expectedResult, cr.getResult().toString()); + } + + /** + * Test for command: show archived commits. + */ + @Test + public void testShowCommits() throws IOException { + CommandResult cr = getShell().executeCommand("show archived commits"); + assertTrue(cr.isSuccess()); + final List rows = new ArrayList<>(); + + // Test default skipMetadata and limit 10 + TableHeader header = new TableHeader().addTableHeaderField("CommitTime").addTableHeaderField("CommitType"); + for (int i = 100; i < 103; i++) { + String instant = String.valueOf(i); + Comparable[] result = new Comparable[]{instant, "commit"}; + rows.add(result); + rows.add(result); + rows.add(result); + } + rows.add(new Comparable[]{"103", "commit"}); + String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, 10, false, rows); + assertEquals(expected, cr.getResult().toString()); + + // Test with Metadata and no limit + cr = getShell().executeCommand("show archived commits --skipMetadata false --limit -1"); + assertTrue(cr.isSuccess()); + + rows.clear(); + + HoodieCommitMetadata metadata = HoodieTestCommitMetadataGenerator.generateCommitMetadata(tablePath); + for (int i = 100; i < 104; i++) { + String instant = String.valueOf(i); + // Since HoodiePrintHelper order data by default, need to order commitMetadata + Comparable[] result = new Comparable[]{ + instant, "commit", HoodieTestCommitUtilities.convertAndOrderCommitMetadata(metadata)}; + rows.add(result); + rows.add(result); + rows.add(result); + } + header = header.addTableHeaderField("CommitDetails"); + expected = HoodiePrintHelper.print(header, new 
HashMap<>(), "", false, -1, false, rows); + assertEquals(expected, cr.getResult().toString()); + } +} diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/common/HoodieTestCommitMetadataGenerator.java b/hudi-cli/src/test/java/org/apache/hudi/cli/common/HoodieTestCommitMetadataGenerator.java new file mode 100644 index 000000000..659b15b88 --- /dev/null +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/common/HoodieTestCommitMetadataGenerator.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.cli.common; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.HoodieTestDataGenerator; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieTestUtils; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTimeline; +import org.apache.hudi.common.util.FSUtils; +import org.apache.hudi.exception.HoodieIOException; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * Test utility class that generates commit files populated with default commit metadata. + */ +public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator { + + // default commit metadata value + public static final String DEFAULT_PATH = "path"; + public static final String DEFAULT_FILEID = "fileId"; + public static final int DEFAULT_TOTAL_WRITE_BYTES = 50; + public static final String DEFAULT_PRE_COMMIT = "commit-1"; + public static final int DEFAULT_NUM_WRITES = 10; + public static final int DEFAULT_NUM_UPDATE_WRITES = 15; + public static final int DEFAULT_TOTAL_LOG_BLOCKS = 1; + public static final int DEFAULT_TOTAL_LOG_RECORDS = 10; + public static final int DEFAULT_OTHER_VALUE = 0; + public static final String DEFAULT_NULL_VALUE = "null"; + + /** + * Creates completed, inflight and requested commit files with default CommitMetadata. 
+ */ + public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration) { + Arrays.asList(HoodieTimeline.makeCommitFileName(commitTime), HoodieTimeline.makeInflightCommitFileName(commitTime), + HoodieTimeline.makeRequestedCommitFileName(commitTime)) + .forEach(f -> { + Path commitFile = new Path( + basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f); + FSDataOutputStream os = null; + try { + FileSystem fs = FSUtils.getFs(basePath, configuration); + os = fs.create(commitFile, true); + // Generate commitMetadata + HoodieCommitMetadata commitMetadata = generateCommitMetadata(basePath); + // Write the generated commit metadata to the commit file as JSON + os.writeBytes(new String(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } finally { + if (null != os) { + try { + os.close(); + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + } + } + }); + } + + /** + * Generate commitMetadata in path. + */ + public static HoodieCommitMetadata generateCommitMetadata(String basePath) throws IOException { + String file1P0C0 = + HoodieTestUtils.createNewDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, "000"); + String file1P1C0 = + HoodieTestUtils.createNewDataFile(basePath, DEFAULT_SECOND_PARTITION_PATH, "000"); + return generateCommitMetadata(new ImmutableMap.Builder() + .put(DEFAULT_FIRST_PARTITION_PATH, new ImmutableList.Builder<>().add(file1P0C0).build()) + .put(DEFAULT_SECOND_PARTITION_PATH, new ImmutableList.Builder<>().add(file1P1C0).build()) + .build()); + } + + /** + * Method to generate commit metadata. 
+ */ + private static HoodieCommitMetadata generateCommitMetadata(Map> partitionToFilePaths) { + HoodieCommitMetadata metadata = new HoodieCommitMetadata(); + partitionToFilePaths.forEach((key, value) -> value.forEach(f -> { + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setPartitionPath(key); + writeStat.setPath(DEFAULT_PATH); + writeStat.setFileId(DEFAULT_FILEID); + writeStat.setTotalWriteBytes(DEFAULT_TOTAL_WRITE_BYTES); + writeStat.setPrevCommit(DEFAULT_PRE_COMMIT); + writeStat.setNumWrites(DEFAULT_NUM_WRITES); + writeStat.setNumUpdateWrites(DEFAULT_NUM_UPDATE_WRITES); + writeStat.setTotalLogBlocks(DEFAULT_TOTAL_LOG_BLOCKS); + writeStat.setTotalLogRecords(DEFAULT_TOTAL_LOG_RECORDS); + metadata.addWriteStat(key, writeStat); + })); + return metadata; + } +} diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/common/HoodieTestCommitUtilities.java b/hudi-cli/src/test/java/org/apache/hudi/cli/common/HoodieTestCommitUtilities.java new file mode 100644 index 000000000..bfd0f0fe7 --- /dev/null +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/common/HoodieTestCommitUtilities.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.cli.common; + +import org.apache.hudi.avro.model.HoodieWriteStat; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.table.HoodieCommitArchiveLog; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Utility methods for working with commit instants in tests. + */ +public class HoodieTestCommitUtilities { + + /** + * Converts HoodieCommitMetadata to Avro format and orders it by partition. + */ + public static org.apache.hudi.avro.model.HoodieCommitMetadata convertAndOrderCommitMetadata( + HoodieCommitMetadata hoodieCommitMetadata) { + return orderCommitMetadata(HoodieCommitArchiveLog.convertCommitMetadata(hoodieCommitMetadata)); + } + + /** + * Orders the partition-to-write-stats map by partition key, ascending. + */ + public static org.apache.hudi.avro.model.HoodieCommitMetadata orderCommitMetadata( + org.apache.hudi.avro.model.HoodieCommitMetadata hoodieCommitMetadata) { + Map> result = new LinkedHashMap<>(); + hoodieCommitMetadata.getPartitionToWriteStats().entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .forEachOrdered(e -> result.put(e.getKey(), e.getValue())); + hoodieCommitMetadata.setPartitionToWriteStats(result); + return hoodieCommitMetadata; + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java index e99c1e797..a3a11a9db 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java @@ -295,7 +295,7 @@ public class HoodieCommitArchiveLog { case HoodieTimeline.COMMIT_ACTION: { HoodieCommitMetadata commitMetadata = HoodieCommitMetadata .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class); - archivedMetaWrapper.setHoodieCommitMetadata(commitMetadataConverter(commitMetadata)); + 
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata)); archivedMetaWrapper.setActionType(ActionType.commit.name()); break; } @@ -314,7 +314,7 @@ public class HoodieCommitArchiveLog { case HoodieTimeline.DELTA_COMMIT_ACTION: { HoodieCommitMetadata commitMetadata = HoodieCommitMetadata .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class); - archivedMetaWrapper.setHoodieCommitMetadata(commitMetadataConverter(commitMetadata)); + archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata)); archivedMetaWrapper.setActionType(ActionType.commit.name()); break; } @@ -331,7 +331,7 @@ public class HoodieCommitArchiveLog { return archivedMetaWrapper; } - public org.apache.hudi.avro.model.HoodieCommitMetadata commitMetadataConverter( + public static org.apache.hudi.avro.model.HoodieCommitMetadata convertCommitMetadata( HoodieCommitMetadata hoodieCommitMetadata) { ObjectMapper mapper = new ObjectMapper(); // Need this to ignore other public get() methods diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java index 8d00d3820..2969effed 100644 --- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java @@ -417,7 +417,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness { } @Test - public void testCommitMetadataConverter() { + public void testConvertCommitMetadata() { HoodieCommitMetadata hoodieCommitMetadata = new HoodieCommitMetadata(); hoodieCommitMetadata.setOperationType(WriteOperationType.INSERT); @@ -428,7 +428,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness { metaClient = HoodieTableMetaClient.reload(metaClient); HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); - 
org.apache.hudi.avro.model.HoodieCommitMetadata expectedCommitMetadata = archiveLog.commitMetadataConverter(hoodieCommitMetadata); + org.apache.hudi.avro.model.HoodieCommitMetadata expectedCommitMetadata = archiveLog.convertCommitMetadata(hoodieCommitMetadata); assertEquals(expectedCommitMetadata.getOperationType(), WriteOperationType.INSERT.toString()); } }