[HUDI-1914] Add fetching latest schema to table command in hudi-cli (#2964)

parent 441076b2cc
commit 919590988a
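
In brief, this change adds a `fetch table schema` command to hudi-cli that prints the table's latest schema (optionally writing it to a file via `--outputFilePath`), and extends the test commit-metadata generator so tests can plant a writer schema in commit metadata. A minimal sketch of the lookup the command plausibly performs, assuming it reads the schema stored in the latest completed commit's metadata (`FetchSchemaSketch` and `fetchLatestSchema` are hypothetical names; the real logic lives in `TableCommand`):

```java
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;

class FetchSchemaSketch {
  // Hypothetical helper: resolve the latest writer schema from commit metadata.
  static String fetchLatestSchema(HoodieTableMetaClient metaClient) throws Exception {
    HoodieTimeline completed = metaClient.getActiveTimeline()
        .getCommitsTimeline().filterCompletedInstants();
    Option<HoodieInstant> latest = completed.lastInstant();
    if (!latest.isPresent()) {
      throw new IllegalStateException("No completed commits on the timeline");
    }
    // Commit metadata is stored as JSON in the instant file; SCHEMA_KEY holds
    // the Avro schema the writer used (exactly what the test below plants).
    HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
        completed.getInstantDetails(latest.get()).get(), HoodieCommitMetadata.class);
    return metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
  }
}
```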
@@ -18,14 +18,20 @@
package org.apache.hudi.cli.commands;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.BeforeEach;
@@ -33,9 +39,13 @@ import org.junit.jupiter.api.Test;
import org.springframework.shell.core.CommandResult;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -192,4 +202,78 @@ public class TestTableCommand extends AbstractShellIntegrationTest {
    // After refresh, there are 4 instants
    assertEquals(4, timeline.countInstants(), "there should be 4 instants");
  }

  @Test
  public void testFetchTableSchema() throws Exception {
    // Create table and connect
    HoodieCLI.conf = jsc.hadoopConfiguration();
    new TableCommand().createTable(
        tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
    metaClient = HoodieCLI.getTableMetaClient();

    String schemaStr = "{\n"
        + " \"type\" : \"record\",\n"
        + " \"name\" : \"SchemaName\",\n"
        + " \"namespace\" : \"SchemaNS\",\n"
        + " \"fields\" : [ {\n"
        + " \"name\" : \"key\",\n"
        + " \"type\" : \"int\"\n"
        + " }, {\n"
        + " \"name\" : \"val\",\n"
        + " \"type\" : [ \"null\", \"string\" ],\n"
        + " \"default\" : null\n"
        + " }]}";

    generateData(schemaStr);

    CommandResult cr = getShell().executeCommand("fetch table schema");
    assertTrue(cr.isSuccess());

    String actualSchemaStr = cr.getResult().toString().substring(cr.getResult().toString().indexOf("{"));
    Schema actualSchema = new Schema.Parser().parse(actualSchemaStr);

    Schema expectedSchema = new Schema.Parser().parse(schemaStr);
    expectedSchema = HoodieAvroUtils.addMetadataFields(expectedSchema);
    assertEquals(expectedSchema, actualSchema);

    File file = File.createTempFile("temp", null);
    cr = getShell().executeCommand("fetch table schema --outputFilePath " + file.getAbsolutePath());
    assertTrue(cr.isSuccess());

    actualSchemaStr = getFileContent(file.getAbsolutePath());
    actualSchema = new Schema.Parser().parse(actualSchemaStr);
    assertEquals(expectedSchema, actualSchema);
  }
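
The `addMetadataFields` step matters because Hudi prepends its bookkeeping columns to the writer schema, so the schema the command returns is the user schema plus the `_hoodie_*` meta fields. A small illustration (field names are the standard Hudi meta columns, in the order `addMetadataFields` prepends them):

```java
// Expected schema = five Hudi meta fields prepended to the user's fields.
Schema expected = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
// expected.getFields() now begins with _hoodie_commit_time, _hoodie_commit_seqno,
// _hoodie_record_key, _hoodie_partition_path, _hoodie_file_name, then key and val.
```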

  private LinkedHashMap<String, Integer[]> generateData(String schemaStr) throws Exception {
    // generate data and metadata
    LinkedHashMap<String, Integer[]> data = new LinkedHashMap<>();
    data.put("102", new Integer[] {15, 10});
    data.put("101", new Integer[] {20, 10});
    data.put("100", new Integer[] {15, 15});
    for (Map.Entry<String, Integer[]> entry : data.entrySet()) {
      String key = entry.getKey();
      Integer[] value = entry.getValue();
      HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, key, HoodieCLI.conf,
          Option.of(value[0]), Option.of(value[1]), Collections.singletonMap(HoodieCommitMetadata.SCHEMA_KEY, schemaStr));
    }

    metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
    assertEquals(3, metaClient.reloadActiveTimeline().getCommitsTimeline().countInstants(),
        "There should be 3 commits");
    return data;
  }

  private String getFileContent(String fileToReadStr) throws IOException {
    File fileToRead = new File(fileToReadStr);
    if (!fileToRead.exists()) {
      throw new IllegalStateException("Outfile " + fileToReadStr + " not found");
    }
    FileInputStream fis = new FileInputStream(fileToRead);
    byte[] data = new byte[(int) fileToRead.length()];
    fis.read(data);
    fis.close();
    return new String(data, "UTF-8");
  }
}
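
As a side note, the hand-rolled stream reading above can be collapsed with `java.nio` (a sketch of an equivalent helper, not part of the commit; `FileContents.read` is a hypothetical stand-in):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

class FileContents {
  // Equivalent to getFileContent above: read all bytes and decode as UTF-8,
  // letting java.nio handle buffering and stream closing.
  static String read(String path) throws IOException {
    return new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
  }
}
```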
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -68,17 +69,27 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator {

  public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration,
      Option<Integer> writes, Option<Integer> updates) throws Exception {
    createCommitFileWithMetadata(basePath, commitTime, configuration, writes, updates, Collections.emptyMap());
  }

  public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration,
      Option<Integer> writes, Option<Integer> updates, Map<String, String> extraMetadata) throws Exception {
    createCommitFileWithMetadata(basePath, commitTime, configuration, UUID.randomUUID().toString(),
-        UUID.randomUUID().toString(), writes, updates);
+        UUID.randomUUID().toString(), writes, updates, extraMetadata);
  }

  public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration,
      String fileId1, String fileId2, Option<Integer> writes, Option<Integer> updates) throws Exception {
    createCommitFileWithMetadata(basePath, commitTime, configuration, fileId1, fileId2, writes, updates, Collections.emptyMap());
  }

  public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration,
      String fileId1, String fileId2, Option<Integer> writes, Option<Integer> updates, Map<String, String> extraMetadata) throws Exception {
    List<String> commitFileNames = Arrays.asList(HoodieTimeline.makeCommitFileName(commitTime), HoodieTimeline.makeInflightCommitFileName(commitTime),
        HoodieTimeline.makeRequestedCommitFileName(commitTime));
    for (String name : commitFileNames) {
      HoodieCommitMetadata commitMetadata =
-          generateCommitMetadata(basePath, commitTime, fileId1, fileId2, writes, updates);
+          generateCommitMetadata(basePath, commitTime, fileId1, fileId2, writes, updates, extraMetadata);
      String content = commitMetadata.toJsonString();
      createFileWithMetadata(basePath, configuration, name, content);
    }
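
The new `extraMetadata` overloads let tests attach arbitrary key/value pairs to the synthetic commit files; `testFetchTableSchema` uses this to plant the writer schema. Usage as in the test (the values shown are the test's own):

```java
// Create a synthetic completed commit "100" whose metadata carries the schema.
HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(
    tablePath, "100", HoodieCLI.conf,
    Option.of(15), Option.of(15),
    Collections.singletonMap(HoodieCommitMetadata.SCHEMA_KEY, schemaStr));
```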
@@ -106,6 +117,11 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator {

  public static HoodieCommitMetadata generateCommitMetadata(String basePath, String commitTime, String fileId1,
      String fileId2, Option<Integer> writes, Option<Integer> updates) throws Exception {
    return generateCommitMetadata(basePath, commitTime, fileId1, fileId2, writes, updates, Collections.emptyMap());
  }

  public static HoodieCommitMetadata generateCommitMetadata(String basePath, String commitTime, String fileId1,
      String fileId2, Option<Integer> writes, Option<Integer> updates, Map<String, String> extraMetadata) throws Exception {
    FileCreateUtils.createBaseFile(basePath, DEFAULT_FIRST_PARTITION_PATH, commitTime, fileId1);
    FileCreateUtils.createBaseFile(basePath, DEFAULT_SECOND_PARTITION_PATH, commitTime, fileId2);
    return generateCommitMetadata(new HashMap<String, List<String>>() {
@@ -113,15 +129,23 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator {
        put(DEFAULT_FIRST_PARTITION_PATH, createImmutableList(baseFileName(DEFAULT_FIRST_PARTITION_PATH, fileId1)));
        put(DEFAULT_SECOND_PARTITION_PATH, createImmutableList(baseFileName(DEFAULT_SECOND_PARTITION_PATH, fileId2)));
      }
-    }, writes, updates);
+    }, writes, updates, extraMetadata);
  }

  private static HoodieCommitMetadata generateCommitMetadata(Map<String, List<String>> partitionToFilePaths,
      Option<Integer> writes, Option<Integer> updates) {
    return generateCommitMetadata(partitionToFilePaths, writes, updates, Collections.emptyMap());
  }

  /**
   * Method to generate commit metadata.
   */
  private static HoodieCommitMetadata generateCommitMetadata(Map<String, List<String>> partitionToFilePaths,
-      Option<Integer> writes, Option<Integer> updates) {
+      Option<Integer> writes, Option<Integer> updates, Map<String, String> extraMetadata) {
    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
    for (Map.Entry<String, String> entry : extraMetadata.entrySet()) {
      metadata.addMetadata(entry.getKey(), entry.getValue());
    }
    partitionToFilePaths.forEach((key, value) -> value.forEach(f -> {
      HoodieWriteStat writeStat = new HoodieWriteStat();
      writeStat.setPartitionPath(key);