[HUDI-702] Add test for HoodieLogFileCommand (#1522)
This commit is contained in:
@@ -26,15 +26,25 @@ public class HoodieTableHeaderFields {
|
||||
public static final String HEADER_PARTITION_PATH = HEADER_PARTITION + " Path";
|
||||
public static final String HEADER_FILE_ID = "FileId";
|
||||
public static final String HEADER_BASE_INSTANT = "Base-Instant";
|
||||
|
||||
public static final String HEADER_INSTANT_TIME = "InstantTime";
|
||||
public static final String HEADER_CLEAN_TIME = "CleanTime";
|
||||
public static final String HEADER_EARLIEST_COMMAND_RETAINED = "EarliestCommandRetained";
|
||||
public static final String HEADER_CLEANING_POLICY = "Cleaning policy";
|
||||
|
||||
public static final String HEADER_TOTAL_FILES_DELETED = "Total Files Deleted";
|
||||
public static final String HEADER_TOTAL_FILES_SUCCESSFULLY_DELETED = "Total Files Successfully Deleted";
|
||||
public static final String HEADER_TOTAL_FAILED_DELETIONS = "Total Failed Deletions";
|
||||
public static final String HEADER_TOTAL_TIME_TAKEN = "Total Time Taken";
|
||||
|
||||
/**
|
||||
* Fields of log file.
|
||||
*/
|
||||
public static final String HEADER_RECORDS = "Records";
|
||||
public static final String HEADER_RECORD_COUNT = "RecordCount";
|
||||
public static final String HEADER_BLOCK_TYPE = "BlockType";
|
||||
public static final String HEADER_HEADER_METADATA = "HeaderMetadata";
|
||||
public static final String HEADER_FOOTER_METADATA = "FooterMetadata";
|
||||
|
||||
/**
|
||||
* Fields of data header.
|
||||
*/
|
||||
|
||||
@@ -20,6 +20,7 @@ package org.apache.hudi.cli.commands;
|
||||
|
||||
import org.apache.hudi.cli.HoodieCLI;
|
||||
import org.apache.hudi.cli.HoodiePrintHelper;
|
||||
import org.apache.hudi.cli.HoodieTableHeaderFields;
|
||||
import org.apache.hudi.cli.TableHeader;
|
||||
import org.apache.hudi.common.model.HoodieLogFile;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
@@ -53,6 +54,7 @@ import org.springframework.stereotype.Component;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -134,7 +136,6 @@ public class HoodieLogFileCommand implements CommandMarker {
|
||||
reader.close();
|
||||
}
|
||||
List<Comparable[]> rows = new ArrayList<>();
|
||||
int i = 0;
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
for (Map.Entry<String, List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>>> entry : commitCountAndMetadata
|
||||
.entrySet()) {
|
||||
@@ -148,12 +149,14 @@ public class HoodieLogFileCommand implements CommandMarker {
|
||||
output[3] = objectMapper.writeValueAsString(tuple3._2()._1());
|
||||
output[4] = objectMapper.writeValueAsString(tuple3._2()._2());
|
||||
rows.add(output);
|
||||
i++;
|
||||
}
|
||||
}
|
||||
|
||||
TableHeader header = new TableHeader().addTableHeaderField("InstantTime").addTableHeaderField("RecordCount")
|
||||
.addTableHeaderField("BlockType").addTableHeaderField("HeaderMetadata").addTableHeaderField("FooterMetadata");
|
||||
TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT_TIME)
|
||||
.addTableHeaderField(HoodieTableHeaderFields.HEADER_RECORD_COUNT)
|
||||
.addTableHeaderField(HoodieTableHeaderFields.HEADER_BLOCK_TYPE)
|
||||
.addTableHeaderField(HoodieTableHeaderFields.HEADER_HEADER_METADATA)
|
||||
.addTableHeaderField(HoodieTableHeaderFields.HEADER_FOOTER_METADATA);
|
||||
|
||||
return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
|
||||
}
|
||||
@@ -173,7 +176,11 @@ public class HoodieLogFileCommand implements CommandMarker {
|
||||
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
|
||||
FileSystem fs = client.getFs();
|
||||
List<String> logFilePaths = Arrays.stream(fs.globStatus(new Path(logFilePathPattern)))
|
||||
.map(status -> status.getPath().toString()).collect(Collectors.toList());
|
||||
.map(status -> status.getPath().toString()).sorted(Comparator.reverseOrder())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// there must be at least one matching log file
|
||||
assert logFilePaths.size() > 0 : "There is no log file";
|
||||
|
||||
// TODO : readerSchema can change across blocks/log files, fix this inside Scanner
|
||||
AvroSchemaConverter converter = new AvroSchemaConverter();
|
||||
@@ -232,6 +239,6 @@ public class HoodieLogFileCommand implements CommandMarker {
|
||||
rows[i] = data;
|
||||
i++;
|
||||
}
|
||||
return HoodiePrintHelper.print(new String[] {"Records"}, rows);
|
||||
return HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_RECORDS}, rows);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,222 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.cli.commands;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.IndexedRecord;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||
import org.apache.hudi.cli.AbstractShellIntegrationTest;
|
||||
import org.apache.hudi.cli.HoodieCLI;
|
||||
import org.apache.hudi.cli.HoodieTableHeaderFields;
|
||||
import org.apache.hudi.cli.HoodiePrintHelper;
|
||||
import org.apache.hudi.cli.TableHeader;
|
||||
import org.apache.hudi.cli.common.HoodieTestCommitMetadataGenerator;
|
||||
import org.apache.hudi.common.model.HoodieLogFile;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.table.log.HoodieLogFormat;
|
||||
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
|
||||
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
|
||||
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
|
||||
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.SchemaTestUtil;
|
||||
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||
import org.apache.hudi.config.HoodieMemoryConfig;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.springframework.shell.core.CommandResult;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Arrays;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hudi.common.util.SchemaTestUtil.getSimpleSchema;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
/**
|
||||
* Test Cases for {@link HoodieLogFileCommand}.
|
||||
*/
|
||||
public class TestHoodieLogFileCommand extends AbstractShellIntegrationTest {
|
||||
|
||||
private String partitionPath;
|
||||
private HoodieAvroDataBlock dataBlock;
|
||||
private String tablePath;
|
||||
|
||||
private static final String INSTANT_TIME = "100";
|
||||
|
||||
@BeforeEach
|
||||
public void init() throws IOException, InterruptedException, URISyntaxException {
|
||||
HoodieCLI.conf = jsc.hadoopConfiguration();
|
||||
|
||||
// Create table and connect
|
||||
String tableName = "test_table";
|
||||
tablePath = basePath + File.separator + tableName;
|
||||
partitionPath = tablePath + File.separator + HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH;
|
||||
new TableCommand().createTable(
|
||||
tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
|
||||
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
|
||||
|
||||
Files.createDirectories(Paths.get(partitionPath));
|
||||
|
||||
HoodieLogFormat.Writer writer = null;
|
||||
try {
|
||||
writer =
|
||||
HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionPath))
|
||||
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
|
||||
.withFileId("test-log-fileid1").overBaseCommit("100").withFs(fs).build();
|
||||
|
||||
// write data to file
|
||||
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
|
||||
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
|
||||
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
|
||||
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
|
||||
dataBlock = new HoodieAvroDataBlock(records, header);
|
||||
writer = writer.appendBlock(dataBlock);
|
||||
} finally {
|
||||
if (writer != null) {
|
||||
writer.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test case for 'show logfile metadata'.
|
||||
*/
|
||||
@Test
|
||||
public void testShowLogFileCommits() throws JsonProcessingException {
|
||||
CommandResult cr = getShell().executeCommand("show logfile metadata --logFilePathPattern " + partitionPath + "/*");
|
||||
assertTrue(cr.isSuccess());
|
||||
|
||||
TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT_TIME)
|
||||
.addTableHeaderField(HoodieTableHeaderFields.HEADER_RECORD_COUNT)
|
||||
.addTableHeaderField(HoodieTableHeaderFields.HEADER_BLOCK_TYPE)
|
||||
.addTableHeaderField(HoodieTableHeaderFields.HEADER_HEADER_METADATA)
|
||||
.addTableHeaderField(HoodieTableHeaderFields.HEADER_FOOTER_METADATA);
|
||||
|
||||
// construct expect result, there is only 1 line.
|
||||
List<Comparable[]> rows = new ArrayList<>();
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
String headerStr = objectMapper.writeValueAsString(dataBlock.getLogBlockHeader());
|
||||
String footerStr = objectMapper.writeValueAsString(dataBlock.getLogBlockFooter());
|
||||
Comparable[] output = new Comparable[]{INSTANT_TIME, 100, dataBlock.getBlockType(), headerStr, footerStr};
|
||||
rows.add(output);
|
||||
|
||||
String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows);
|
||||
|
||||
assertEquals(expected, cr.getResult().toString());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test case for 'show logfile records'.
|
||||
*/
|
||||
@Test
|
||||
public void testShowLogFileRecords() throws IOException, URISyntaxException {
|
||||
CommandResult cr = getShell().executeCommand("show logfile records --logFilePathPattern " + partitionPath + "/*");
|
||||
assertTrue(cr.isSuccess());
|
||||
|
||||
// construct expect result, get 10 records.
|
||||
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 10);
|
||||
String[][] rows = records.stream().map(r -> new String[]{r.toString()}).toArray(String[][]::new);
|
||||
String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_RECORDS}, rows);
|
||||
|
||||
assertEquals(expected, cr.getResult().toString());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test case for 'show logfile records' with merge.
|
||||
*/
|
||||
@Test
|
||||
public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedException, URISyntaxException {
|
||||
// create commit instant
|
||||
HoodieTestCommitMetadataGenerator.createCommitFile(tablePath, INSTANT_TIME, HoodieCLI.conf);
|
||||
|
||||
// write to path '2015/03/16'.
|
||||
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
|
||||
partitionPath = tablePath + File.separator + HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH;
|
||||
Files.createDirectories(Paths.get(partitionPath));
|
||||
|
||||
HoodieLogFormat.Writer writer = null;
|
||||
try {
|
||||
// set little threshold to split file.
|
||||
writer =
|
||||
HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionPath))
|
||||
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
|
||||
.withFileId("test-log-fileid1").overBaseCommit(INSTANT_TIME).withFs(fs).withSizeThreshold(500).build();
|
||||
|
||||
List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
|
||||
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
|
||||
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
|
||||
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
|
||||
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
|
||||
writer = writer.appendBlock(dataBlock);
|
||||
} finally {
|
||||
if (writer != null) {
|
||||
writer.close();
|
||||
}
|
||||
}
|
||||
|
||||
CommandResult cr = getShell().executeCommand("show logfile records --logFilePathPattern "
|
||||
+ partitionPath + "/* --mergeRecords true");
|
||||
assertTrue(cr.isSuccess());
|
||||
|
||||
// get expected result of 10 records.
|
||||
List<String> logFilePaths = Arrays.stream(fs.globStatus(new Path(partitionPath + "/*")))
|
||||
.map(status -> status.getPath().toString()).collect(Collectors.toList());
|
||||
HoodieMergedLogRecordScanner scanner =
|
||||
new HoodieMergedLogRecordScanner(fs, tablePath, logFilePaths, schema, INSTANT_TIME,
|
||||
HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES,
|
||||
Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
|
||||
Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
|
||||
HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE,
|
||||
HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
|
||||
|
||||
Iterator<HoodieRecord<? extends HoodieRecordPayload>> records = scanner.iterator();
|
||||
int num = 0;
|
||||
int maxSize = 10;
|
||||
List<IndexedRecord> indexRecords = new ArrayList<>();
|
||||
while (records.hasNext() && num < maxSize) {
|
||||
Option<IndexedRecord> hoodieRecord = records.next().getData().getInsertValue(schema);
|
||||
indexRecords.add(hoodieRecord.get());
|
||||
num++;
|
||||
}
|
||||
String[][] rows = indexRecords.stream().map(r -> new String[]{r.toString()}).toArray(String[][]::new);
|
||||
assertNotNull(rows);
|
||||
|
||||
String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_RECORDS}, rows);
|
||||
|
||||
assertEquals(expected, cr.getResult().toString());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user