1
0

[HUDI-1746] Added support for replace commits in commit showpartitions, commit show_write_stats, commit showfiles (#2678)

* Added support for replace commits in commit showpartitions, commit show_write_stats, commit showfiles

* Adding CR changes

* [HUDI-1746] Code review changes
This commit is contained in:
jsbali
2021-04-21 23:01:35 +05:30
committed by GitHub
parent b31c520c66
commit 4a3431866d
4 changed files with 282 additions and 30 deletions

View File

@@ -24,10 +24,12 @@ import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
import org.apache.hudi.cli.testutils.HoodieTestReplaceCommitMetadatGenerator;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
@@ -75,6 +77,7 @@ public class TestCommitsCommand extends AbstractShellIntegrationTest {
new TableCommand().createTable(
tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
metaClient = HoodieCLI.getTableMetaClient();
}
private LinkedHashMap<String, Integer[]> generateData() throws Exception {
@@ -97,6 +100,42 @@ public class TestCommitsCommand extends AbstractShellIntegrationTest {
return data;
}
/*
* generates both replace commit and commit data
* */
private LinkedHashMap<HoodieInstant, Integer[]> generateMixedData() throws Exception {
// generate data and metadata
LinkedHashMap<HoodieInstant, Integer[]> replaceCommitData = new LinkedHashMap<>();
replaceCommitData.put(new HoodieInstant(false, HoodieTimeline.REPLACE_COMMIT_ACTION, "103"), new Integer[] {15, 10});
LinkedHashMap<HoodieInstant, Integer[]> commitData = new LinkedHashMap<>();
commitData.put(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "102"), new Integer[] {15, 10});
commitData.put(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "101"), new Integer[] {20, 10});
for (Map.Entry<HoodieInstant, Integer[]> entry : commitData.entrySet()) {
String key = entry.getKey().getTimestamp();
Integer[] value = entry.getValue();
HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, key, jsc.hadoopConfiguration(),
Option.of(value[0]), Option.of(value[1]));
}
for (Map.Entry<HoodieInstant, Integer[]> entry : replaceCommitData.entrySet()) {
String key = entry.getKey().getTimestamp();
Integer[] value = entry.getValue();
HoodieTestReplaceCommitMetadatGenerator.createReplaceCommitFileWithMetadata(tablePath, key,
Option.of(value[0]), Option.of(value[1]), metaClient);
}
metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
assertEquals(3, metaClient.reloadActiveTimeline().getCommitsTimeline().countInstants(),
"There should be 3 commits");
LinkedHashMap<HoodieInstant, Integer[]> data = replaceCommitData;
data.putAll(commitData);
return data;
}
private String generateExpectData(int records, Map<String, Integer[]> data) throws IOException {
FileSystem fs = FileSystem.get(jsc.hadoopConfiguration());
List<String> partitionPaths =
@@ -216,14 +255,15 @@ public class TestCommitsCommand extends AbstractShellIntegrationTest {
// prevCommit not null, so add 0, update 1
Arrays.asList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).stream().forEach(partition ->
rows.add(new Comparable[] {partition, 0, 1, 0, value[1], HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES, 0})
rows.add(new Comparable[] {HoodieTimeline.COMMIT_ACTION, partition, 0, 1, 0, value[1], HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES, 0})
);
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN,
entry -> NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString()))));
TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_INSERTED)
@@ -237,6 +277,43 @@ public class TestCommitsCommand extends AbstractShellIntegrationTest {
assertEquals(expected, got);
}
@Test
public void testShowCommitPartitionsWithReplaceCommits() throws Exception {
Map<HoodieInstant, Integer[]> data = generateMixedData();
for (HoodieInstant commitInstant: data.keySet()) {
CommandResult cr = getShell().executeCommand(String.format("commit showpartitions --commit %s", commitInstant.getTimestamp()));
assertTrue(cr.isSuccess());
Integer[] value = data.get(commitInstant);
List<Comparable[]> rows = new ArrayList<>();
// prevCommit not null, so add 0, update 1
Arrays.asList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).stream().forEach(partition ->
rows.add(new Comparable[] {commitInstant.getAction(), partition, 0, 1, 0, value[1], HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES, 0})
);
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN,
entry -> NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString()))));
TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_INSERTED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS);
String expected = HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false, -1, false, rows);
expected = removeNonWordAndStripSpace(expected);
String got = removeNonWordAndStripSpace(cr.getResult().toString());
assertEquals(expected, got);
}
}
/**
* Test case of 'commit showfiles' command.
*/
@@ -252,12 +329,13 @@ public class TestCommitsCommand extends AbstractShellIntegrationTest {
List<Comparable[]> rows = new ArrayList<>();
Arrays.asList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).stream().forEach(partition ->
rows.add(new Comparable[] {partition, HoodieTestCommitMetadataGenerator.DEFAULT_FILEID,
rows.add(new Comparable[] {HoodieTimeline.COMMIT_ACTION, partition, HoodieTestCommitMetadataGenerator.DEFAULT_FILEID,
HoodieTestCommitMetadataGenerator.DEFAULT_PRE_COMMIT,
value[1], value[0], HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES,
// default 0 errors and blank file with 0 size
0, 0}));
TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_PREVIOUS_COMMIT)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED)
@@ -272,6 +350,40 @@ public class TestCommitsCommand extends AbstractShellIntegrationTest {
assertEquals(expected, got);
}
@Test
public void testShowCommitFilesWithReplaceCommits() throws Exception {
Map<HoodieInstant, Integer[]> data = generateMixedData();
for (HoodieInstant commitInstant : data.keySet()) {
CommandResult cr = getShell().executeCommand(String.format("commit showfiles --commit %s", commitInstant.getTimestamp()));
assertTrue(cr.isSuccess());
Integer[] value = data.get(commitInstant);
List<Comparable[]> rows = new ArrayList<>();
Arrays.asList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).stream().forEach(partition ->
rows.add(new Comparable[] {commitInstant.getAction(), partition, HoodieTestCommitMetadataGenerator.DEFAULT_FILEID,
HoodieTestCommitMetadataGenerator.DEFAULT_PRE_COMMIT,
value[1], value[0], HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES,
// default 0 errors and blank file with 0 size
0, 0}));
TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_PREVIOUS_COMMIT)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_SIZE);
String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows);
expected = removeNonWordAndStripSpace(expected);
String got = removeNonWordAndStripSpace(cr.getResult().toString());
assertEquals(expected, got);
}
}
/**
* Test case of 'commits compare' command.
*/

View File

@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
@@ -76,14 +77,17 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator {
List<String> commitFileNames = Arrays.asList(HoodieTimeline.makeCommitFileName(commitTime), HoodieTimeline.makeInflightCommitFileName(commitTime),
HoodieTimeline.makeRequestedCommitFileName(commitTime));
for (String name : commitFileNames) {
Path commitFilePath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + name);
try (FSDataOutputStream os = FSUtils.getFs(basePath, configuration).create(commitFilePath, true)) {
// Generate commitMetadata
HoodieCommitMetadata commitMetadata =
generateCommitMetadata(basePath, commitTime, fileId1, fileId2, writes, updates);
// Write empty commit metadata
os.writeBytes(new String(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
}
HoodieCommitMetadata commitMetadata =
generateCommitMetadata(basePath, commitTime, fileId1, fileId2, writes, updates);
String content = commitMetadata.toJsonString();
createFileWithMetadata(basePath, configuration, name, content);
}
}
static void createFileWithMetadata(String basePath, Configuration configuration, String name, String content) throws IOException {
Path commitFilePath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + name);
try (FSDataOutputStream os = FSUtils.getFs(basePath, configuration).create(commitFilePath, true)) {
os.writeBytes(new String(content.getBytes(StandardCharsets.UTF_8)));
}
}
@@ -133,4 +137,5 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator {
}));
return metadata;
}
}

View File

@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.cli.testutils;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.Option;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
import static org.apache.hudi.common.util.CollectionUtils.createImmutableList;
public class HoodieTestReplaceCommitMetadatGenerator extends HoodieTestCommitMetadataGenerator {
public static void createReplaceCommitFileWithMetadata(String basePath, String commitTime, Option<Integer> writes, Option<Integer> updates,
HoodieTableMetaClient metaclient) throws Exception {
HoodieReplaceCommitMetadata replaceMetadata = generateReplaceCommitMetadata(basePath, commitTime, UUID.randomUUID().toString(),
UUID.randomUUID().toString(), writes, updates);
HoodieRequestedReplaceMetadata requestedReplaceMetadata = getHoodieRequestedReplaceMetadata();
HoodieTestTable.of(metaclient).addReplaceCommit(commitTime, requestedReplaceMetadata, replaceMetadata);
}
private static HoodieRequestedReplaceMetadata getHoodieRequestedReplaceMetadata() {
return HoodieRequestedReplaceMetadata.newBuilder()
.setOperationType(WriteOperationType.INSERT_OVERWRITE.toString())
.setVersion(1)
.setExtraMetadata(Collections.emptyMap())
.build();
}
private static HoodieReplaceCommitMetadata generateReplaceCommitMetadata(String basePath, String commitTime, String fileId1, String fileId2, Option<Integer> writes, Option<Integer> updates)
throws Exception {
FileCreateUtils.createBaseFile(basePath, DEFAULT_FIRST_PARTITION_PATH, commitTime, fileId1);
FileCreateUtils.createBaseFile(basePath, DEFAULT_SECOND_PARTITION_PATH, commitTime, fileId2);
return generateReplaceCommitMetadata(new HashMap<String, List<String>>() {
{
put(DEFAULT_FIRST_PARTITION_PATH, createImmutableList(baseFileName(DEFAULT_FIRST_PARTITION_PATH, fileId1)));
put(DEFAULT_SECOND_PARTITION_PATH, createImmutableList(baseFileName(DEFAULT_SECOND_PARTITION_PATH, fileId2)));
}
}, writes, updates);
}
private static HoodieReplaceCommitMetadata generateReplaceCommitMetadata(HashMap<String, List<String>> partitionToFilePaths, Option<Integer> writes, Option<Integer> updates) {
HoodieReplaceCommitMetadata metadata = new HoodieReplaceCommitMetadata();
partitionToFilePaths.forEach((key, value) -> value.forEach(f -> {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setPartitionPath(key);
writeStat.setPath(DEFAULT_PATH);
writeStat.setFileId(DEFAULT_FILEID);
writeStat.setTotalWriteBytes(DEFAULT_TOTAL_WRITE_BYTES);
writeStat.setPrevCommit(DEFAULT_PRE_COMMIT);
writeStat.setNumWrites(writes.orElse(DEFAULT_NUM_WRITES));
writeStat.setNumUpdateWrites(updates.orElse(DEFAULT_NUM_UPDATE_WRITES));
writeStat.setTotalLogBlocks(DEFAULT_TOTAL_LOG_BLOCKS);
writeStat.setTotalLogRecords(DEFAULT_TOTAL_LOG_RECORDS);
metadata.addWriteStat(key, writeStat);
}));
metadata.setPartitionToReplaceFileIds(new HashMap<String, List<String>>() {
{
//TODO fix
put(DEFAULT_FIRST_PARTITION_PATH, createImmutableList(baseFileName(DEFAULT_FIRST_PARTITION_PATH, "1")));
}
});
return metadata;
}
}