[HUDI-697]Add unit test for ArchivedCommitsCommand (#1424)
This commit is contained in:
@@ -0,0 +1,194 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.cli.commands;
|
||||
|
||||
import org.apache.hudi.cli.AbstractShellIntegrationTest;
|
||||
import org.apache.hudi.cli.HoodieCLI;
|
||||
import org.apache.hudi.cli.HoodiePrintHelper;
|
||||
import org.apache.hudi.cli.TableHeader;
|
||||
import org.apache.hudi.cli.common.HoodieTestCommitMetadataGenerator;
|
||||
import org.apache.hudi.cli.common.HoodieTestCommitUtilities;
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieCommitArchiveLog;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.springframework.shell.core.CommandResult;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Test Cases for {@link ArchivedCommitsCommand}.
|
||||
*/
|
||||
public class TestArchivedCommitsCommand extends AbstractShellIntegrationTest {
|
||||
|
||||
private String tablePath;
|
||||
|
||||
@Before
|
||||
public void init() throws IOException {
|
||||
initDFS();
|
||||
jsc.hadoopConfiguration().addResource(dfs.getConf());
|
||||
HoodieCLI.conf = dfs.getConf();
|
||||
|
||||
// Create table and connect
|
||||
String tableName = "test_table";
|
||||
tablePath = basePath + File.separator + tableName;
|
||||
new TableCommand().createTable(
|
||||
tablePath, tableName,
|
||||
"COPY_ON_WRITE", "", 1, "org.apache.hudi.common.model.HoodieAvroPayload");
|
||||
|
||||
metaClient = HoodieCLI.getTableMetaClient();
|
||||
|
||||
// Generate archive
|
||||
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
|
||||
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
|
||||
.forTable("test-trip-table").build();
|
||||
|
||||
// Create six commits
|
||||
for (int i = 100; i < 106; i++) {
|
||||
String timestamp = String.valueOf(i);
|
||||
// Requested Compaction
|
||||
HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath,
|
||||
new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, timestamp), dfs.getConf());
|
||||
// Inflight Compaction
|
||||
HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath,
|
||||
new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, timestamp), dfs.getConf());
|
||||
HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, dfs.getConf());
|
||||
}
|
||||
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
// reload the timeline and get all the commits before archive
|
||||
metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
|
||||
|
||||
// archive
|
||||
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
|
||||
archiveLog.archiveIfRequired(jsc);
|
||||
}
|
||||
|
||||
@After
|
||||
public void clean() throws IOException {
|
||||
cleanupDFS();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for command: show archived commit stats.
|
||||
*/
|
||||
@Test
|
||||
public void testShowArchivedCommits() {
|
||||
CommandResult cr = getShell().executeCommand("show archived commit stats");
|
||||
assertTrue(cr.isSuccess());
|
||||
|
||||
TableHeader header = new TableHeader().addTableHeaderField("action").addTableHeaderField("instant")
|
||||
.addTableHeaderField("partition").addTableHeaderField("file_id").addTableHeaderField("prev_instant")
|
||||
.addTableHeaderField("num_writes").addTableHeaderField("num_inserts").addTableHeaderField("num_deletes")
|
||||
.addTableHeaderField("num_update_writes").addTableHeaderField("total_log_files")
|
||||
.addTableHeaderField("total_log_blocks").addTableHeaderField("total_corrupt_log_blocks")
|
||||
.addTableHeaderField("total_rollback_blocks").addTableHeaderField("total_log_records")
|
||||
.addTableHeaderField("total_updated_records_compacted").addTableHeaderField("total_write_bytes")
|
||||
.addTableHeaderField("total_write_errors");
|
||||
|
||||
// Generate expected data
|
||||
final List<Comparable[]> rows = new ArrayList<>();
|
||||
for (int i = 100; i < 104; i++) {
|
||||
String instant = String.valueOf(i);
|
||||
for (int j = 0; j < 3; j++) {
|
||||
Comparable[] defaultComp = new Comparable[]{"commit", instant,
|
||||
HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH,
|
||||
HoodieTestCommitMetadataGenerator.DEFAULT_FILEID,
|
||||
HoodieTestCommitMetadataGenerator.DEFAULT_PRE_COMMIT,
|
||||
HoodieTestCommitMetadataGenerator.DEFAULT_NUM_WRITES,
|
||||
HoodieTestCommitMetadataGenerator.DEFAULT_OTHER_VALUE,
|
||||
HoodieTestCommitMetadataGenerator.DEFAULT_OTHER_VALUE,
|
||||
HoodieTestCommitMetadataGenerator.DEFAULT_NUM_UPDATE_WRITES,
|
||||
HoodieTestCommitMetadataGenerator.DEFAULT_NULL_VALUE,
|
||||
HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_LOG_BLOCKS,
|
||||
HoodieTestCommitMetadataGenerator.DEFAULT_OTHER_VALUE,
|
||||
HoodieTestCommitMetadataGenerator.DEFAULT_OTHER_VALUE,
|
||||
HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_LOG_RECORDS,
|
||||
HoodieTestCommitMetadataGenerator.DEFAULT_OTHER_VALUE,
|
||||
HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES,
|
||||
HoodieTestCommitMetadataGenerator.DEFAULT_OTHER_VALUE};
|
||||
rows.add(defaultComp.clone());
|
||||
defaultComp[2] = HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH;
|
||||
rows.add(defaultComp);
|
||||
}
|
||||
}
|
||||
|
||||
String expectedResult = HoodiePrintHelper.print(
|
||||
header, new HashMap<>(), "", false, -1, false, rows);
|
||||
assertEquals(expectedResult, cr.getResult().toString());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for command: show archived commits.
|
||||
*/
|
||||
@Test
|
||||
public void testShowCommits() throws IOException {
|
||||
CommandResult cr = getShell().executeCommand("show archived commits");
|
||||
assertTrue(cr.isSuccess());
|
||||
final List<Comparable[]> rows = new ArrayList<>();
|
||||
|
||||
// Test default skipMetadata and limit 10
|
||||
TableHeader header = new TableHeader().addTableHeaderField("CommitTime").addTableHeaderField("CommitType");
|
||||
for (int i = 100; i < 103; i++) {
|
||||
String instant = String.valueOf(i);
|
||||
Comparable[] result = new Comparable[]{instant, "commit"};
|
||||
rows.add(result);
|
||||
rows.add(result);
|
||||
rows.add(result);
|
||||
}
|
||||
rows.add(new Comparable[]{"103", "commit"});
|
||||
String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, 10, false, rows);
|
||||
assertEquals(expected, cr.getResult().toString());
|
||||
|
||||
// Test with Metadata and no limit
|
||||
cr = getShell().executeCommand("show archived commits --skipMetadata false --limit -1");
|
||||
assertTrue(cr.isSuccess());
|
||||
|
||||
rows.clear();
|
||||
|
||||
HoodieCommitMetadata metadata = HoodieTestCommitMetadataGenerator.generateCommitMetadata(tablePath);
|
||||
for (int i = 100; i < 104; i++) {
|
||||
String instant = String.valueOf(i);
|
||||
// Since HoodiePrintHelper order data by default, need to order commitMetadata
|
||||
Comparable[] result = new Comparable[]{
|
||||
instant, "commit", HoodieTestCommitUtilities.convertAndOrderCommitMetadata(metadata)};
|
||||
rows.add(result);
|
||||
rows.add(result);
|
||||
rows.add(result);
|
||||
}
|
||||
header = header.addTableHeaderField("CommitDetails");
|
||||
expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows);
|
||||
assertEquals(expected, cr.getResult().toString());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,124 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.cli.common;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.common.HoodieTestDataGenerator;
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieTestUtils;
|
||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.HoodieTimeline;
|
||||
import org.apache.hudi.common.util.FSUtils;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Class to be used in tests to keep generating test inserts and updates against a corpus.
|
||||
*/
|
||||
public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator {
|
||||
|
||||
// default commit metadata value
|
||||
public static final String DEFAULT_PATH = "path";
|
||||
public static final String DEFAULT_FILEID = "fileId";
|
||||
public static final int DEFAULT_TOTAL_WRITE_BYTES = 50;
|
||||
public static final String DEFAULT_PRE_COMMIT = "commit-1";
|
||||
public static final int DEFAULT_NUM_WRITES = 10;
|
||||
public static final int DEFAULT_NUM_UPDATE_WRITES = 15;
|
||||
public static final int DEFAULT_TOTAL_LOG_BLOCKS = 1;
|
||||
public static final int DEFAULT_TOTAL_LOG_RECORDS = 10;
|
||||
public static final int DEFAULT_OTHER_VALUE = 0;
|
||||
public static final String DEFAULT_NULL_VALUE = "null";
|
||||
|
||||
/**
|
||||
* Create a commit file with default CommitMetadata.
|
||||
*/
|
||||
public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration) {
|
||||
Arrays.asList(HoodieTimeline.makeCommitFileName(commitTime), HoodieTimeline.makeInflightCommitFileName(commitTime),
|
||||
HoodieTimeline.makeRequestedCommitFileName(commitTime))
|
||||
.forEach(f -> {
|
||||
Path commitFile = new Path(
|
||||
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f);
|
||||
FSDataOutputStream os = null;
|
||||
try {
|
||||
FileSystem fs = FSUtils.getFs(basePath, configuration);
|
||||
os = fs.create(commitFile, true);
|
||||
// Generate commitMetadata
|
||||
HoodieCommitMetadata commitMetadata = generateCommitMetadata(basePath);
|
||||
// Write empty commit metadata
|
||||
os.writeBytes(new String(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
|
||||
} catch (IOException ioe) {
|
||||
throw new HoodieIOException(ioe.getMessage(), ioe);
|
||||
} finally {
|
||||
if (null != os) {
|
||||
try {
|
||||
os.close();
|
||||
} catch (IOException e) {
|
||||
throw new HoodieIOException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate commitMetadata in path.
|
||||
*/
|
||||
public static HoodieCommitMetadata generateCommitMetadata(String basePath) throws IOException {
|
||||
String file1P0C0 =
|
||||
HoodieTestUtils.createNewDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, "000");
|
||||
String file1P1C0 =
|
||||
HoodieTestUtils.createNewDataFile(basePath, DEFAULT_SECOND_PARTITION_PATH, "000");
|
||||
return generateCommitMetadata(new ImmutableMap.Builder()
|
||||
.put(DEFAULT_FIRST_PARTITION_PATH, new ImmutableList.Builder<>().add(file1P0C0).build())
|
||||
.put(DEFAULT_SECOND_PARTITION_PATH, new ImmutableList.Builder<>().add(file1P1C0).build())
|
||||
.build());
|
||||
}
|
||||
|
||||
/**
|
||||
* Method to generate commit metadata.
|
||||
*/
|
||||
private static HoodieCommitMetadata generateCommitMetadata(Map<String, List<String>> partitionToFilePaths) {
|
||||
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
|
||||
partitionToFilePaths.forEach((key, value) -> value.forEach(f -> {
|
||||
HoodieWriteStat writeStat = new HoodieWriteStat();
|
||||
writeStat.setPartitionPath(key);
|
||||
writeStat.setPath(DEFAULT_PATH);
|
||||
writeStat.setFileId(DEFAULT_FILEID);
|
||||
writeStat.setTotalWriteBytes(DEFAULT_TOTAL_WRITE_BYTES);
|
||||
writeStat.setPrevCommit(DEFAULT_PRE_COMMIT);
|
||||
writeStat.setNumWrites(DEFAULT_NUM_WRITES);
|
||||
writeStat.setNumUpdateWrites(DEFAULT_NUM_UPDATE_WRITES);
|
||||
writeStat.setTotalLogBlocks(DEFAULT_TOTAL_LOG_BLOCKS);
|
||||
writeStat.setTotalLogRecords(DEFAULT_TOTAL_LOG_RECORDS);
|
||||
metadata.addWriteStat(key, writeStat);
|
||||
}));
|
||||
return metadata;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,54 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.cli.common;
|
||||
|
||||
import org.apache.hudi.avro.model.HoodieWriteStat;
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.table.HoodieCommitArchiveLog;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Utility methods to commit instant for test.
|
||||
*/
|
||||
public class HoodieTestCommitUtilities {
|
||||
|
||||
/**
|
||||
* Converter HoodieCommitMetadata to avro format and ordered by partition.
|
||||
*/
|
||||
public static org.apache.hudi.avro.model.HoodieCommitMetadata convertAndOrderCommitMetadata(
|
||||
HoodieCommitMetadata hoodieCommitMetadata) {
|
||||
return orderCommitMetadata(HoodieCommitArchiveLog.convertCommitMetadata(hoodieCommitMetadata));
|
||||
}
|
||||
|
||||
/**
|
||||
* Ordered by partition asc.
|
||||
*/
|
||||
public static org.apache.hudi.avro.model.HoodieCommitMetadata orderCommitMetadata(
|
||||
org.apache.hudi.avro.model.HoodieCommitMetadata hoodieCommitMetadata) {
|
||||
Map<String, List<HoodieWriteStat>> result = new LinkedHashMap<>();
|
||||
hoodieCommitMetadata.getPartitionToWriteStats().entrySet().stream()
|
||||
.sorted(Map.Entry.comparingByKey())
|
||||
.forEachOrdered(e -> result.put(e.getKey(), e.getValue()));
|
||||
hoodieCommitMetadata.setPartitionToWriteStats(result);
|
||||
return hoodieCommitMetadata;
|
||||
}
|
||||
}
|
||||
@@ -295,7 +295,7 @@ public class HoodieCommitArchiveLog {
|
||||
case HoodieTimeline.COMMIT_ACTION: {
|
||||
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
|
||||
.fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class);
|
||||
archivedMetaWrapper.setHoodieCommitMetadata(commitMetadataConverter(commitMetadata));
|
||||
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata));
|
||||
archivedMetaWrapper.setActionType(ActionType.commit.name());
|
||||
break;
|
||||
}
|
||||
@@ -314,7 +314,7 @@ public class HoodieCommitArchiveLog {
|
||||
case HoodieTimeline.DELTA_COMMIT_ACTION: {
|
||||
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
|
||||
.fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class);
|
||||
archivedMetaWrapper.setHoodieCommitMetadata(commitMetadataConverter(commitMetadata));
|
||||
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata));
|
||||
archivedMetaWrapper.setActionType(ActionType.commit.name());
|
||||
break;
|
||||
}
|
||||
@@ -331,7 +331,7 @@ public class HoodieCommitArchiveLog {
|
||||
return archivedMetaWrapper;
|
||||
}
|
||||
|
||||
public org.apache.hudi.avro.model.HoodieCommitMetadata commitMetadataConverter(
|
||||
public static org.apache.hudi.avro.model.HoodieCommitMetadata convertCommitMetadata(
|
||||
HoodieCommitMetadata hoodieCommitMetadata) {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
// Need this to ignore other public get() methods
|
||||
|
||||
@@ -417,7 +417,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCommitMetadataConverter() {
|
||||
public void testConvertCommitMetadata() {
|
||||
HoodieCommitMetadata hoodieCommitMetadata = new HoodieCommitMetadata();
|
||||
hoodieCommitMetadata.setOperationType(WriteOperationType.INSERT);
|
||||
|
||||
@@ -428,7 +428,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
|
||||
|
||||
org.apache.hudi.avro.model.HoodieCommitMetadata expectedCommitMetadata = archiveLog.commitMetadataConverter(hoodieCommitMetadata);
|
||||
org.apache.hudi.avro.model.HoodieCommitMetadata expectedCommitMetadata = archiveLog.convertCommitMetadata(hoodieCommitMetadata);
|
||||
assertEquals(expectedCommitMetadata.getOperationType(), WriteOperationType.INSERT.toString());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user