From 919590988a4db1b73a67188d8b1b0d971f8b766d Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Mon, 7 Jun 2021 19:04:35 -0400 Subject: [PATCH] [HUDI-1914] Add fetching latest schema to table command in hudi-cli (#2964) --- .../hudi/cli/commands/TableCommand.java | 46 ++++++++++ .../hudi/cli/commands/TestTableCommand.java | 84 +++++++++++++++++++ .../HoodieTestCommitMetadataGenerator.java | 32 ++++++- 3 files changed, 158 insertions(+), 4 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java index d25e0c853..d1fd694ef 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java @@ -23,14 +23,21 @@ import org.apache.hudi.cli.HoodiePrintHelper; import org.apache.hudi.cli.TableHeader; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.exception.TableNotFoundException; +import org.apache.avro.Schema; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.springframework.shell.core.CommandMarker; import org.springframework.shell.core.annotation.CliCommand; import org.springframework.shell.core.annotation.CliOption; import org.springframework.stereotype.Component; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -41,6 +48,8 @@ import java.util.List; @Component public class TableCommand implements CommandMarker { + private static final Logger LOG = LogManager.getLogger(TableCommand.class); + static { System.out.println("Table command getting loaded"); } @@ -142,4 +151,41 @@ public class TableCommand implements CommandMarker { 
HoodieCLI.refreshTableMetadata(); return "Metadata for table " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " refreshed."; } + + /** + * Fetches table schema in Avro format. + */ + @CliCommand(value = "fetch table schema", help = "Fetches latest table schema") + public String fetchTableSchema( + @CliOption(key = {"outputFilePath"}, mandatory = false, help = "File path to write schema") final String outputFilePath) throws Exception { + HoodieTableMetaClient client = HoodieCLI.getTableMetaClient(); + TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(client); + Schema schema = tableSchemaResolver.getTableAvroSchema(); + if (outputFilePath != null) { + LOG.info("Latest table schema : " + schema.toString(true)); + writeToFile(outputFilePath, schema.toString(true)); + return String.format("Latest table schema written to %s", outputFilePath); + } else { + return String.format("Latest table schema %s", schema.toString(true)); + } + } + + /** + * Writes the given data to the given file path, replacing any existing file. + * @param filePath output file path. + * @param data to be written to file. 
+ */ + private static void writeToFile(String filePath, String data) throws IOException { + File outFile = new File(filePath); + if (outFile.exists()) { + outFile.delete(); + } + OutputStream os = null; + try { + os = new FileOutputStream(outFile); + os.write(data.getBytes(), 0, data.length()); + } finally { + os.close(); + } + } } diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java index cdf9db344..fe3407fcd 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java @@ -18,14 +18,20 @@ package org.apache.hudi.cli.commands; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.cli.HoodieCLI; import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest; +import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator; import org.apache.hudi.common.fs.ConsistencyGuardConfig; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.Option; +import org.apache.avro.Schema; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.BeforeEach; @@ -33,9 +39,13 @@ import org.junit.jupiter.api.Test; import org.springframework.shell.core.CommandResult; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import static 
org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -192,4 +202,78 @@ public class TestTableCommand extends AbstractShellIntegrationTest { // After refresh, there are 4 instants assertEquals(4, timeline.countInstants(), "there should have 4 instants"); } + + @Test + public void testFetchTableSchema() throws Exception { + // Create table and connect + HoodieCLI.conf = jsc.hadoopConfiguration(); + new TableCommand().createTable( + tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), + "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); + metaClient = HoodieCLI.getTableMetaClient(); + + String schemaStr = "{\n" + + " \"type\" : \"record\",\n" + + " \"name\" : \"SchemaName\",\n" + + " \"namespace\" : \"SchemaNS\",\n" + + " \"fields\" : [ {\n" + + " \"name\" : \"key\",\n" + + " \"type\" : \"int\"\n" + + " }, {\n" + + " \"name\" : \"val\",\n" + + " \"type\" : [ \"null\", \"string\" ],\n" + + " \"default\" : null\n" + + " }]};"; + + generateData(schemaStr); + + CommandResult cr = getShell().executeCommand("fetch table schema"); + assertTrue(cr.isSuccess()); + + String actualSchemaStr = cr.getResult().toString().substring(cr.getResult().toString().indexOf("{")); + Schema actualSchema = new Schema.Parser().parse(actualSchemaStr); + + Schema expectedSchema = new Schema.Parser().parse(schemaStr); + expectedSchema = HoodieAvroUtils.addMetadataFields(expectedSchema); + assertEquals(actualSchema, expectedSchema); + + File file = File.createTempFile("temp", null); + cr = getShell().executeCommand("fetch table schema --outputFilePath " + file.getAbsolutePath()); + assertTrue(cr.isSuccess()); + + actualSchemaStr = getFileContent(file.getAbsolutePath()); + actualSchema = new Schema.Parser().parse(actualSchemaStr); + assertEquals(actualSchema, expectedSchema); + } + + private LinkedHashMap generateData(String schemaStr) throws Exception { + // generate data and 
metadata + LinkedHashMap data = new LinkedHashMap<>(); + data.put("102", new Integer[] {15, 10}); + data.put("101", new Integer[] {20, 10}); + data.put("100", new Integer[] {15, 15}); + for (Map.Entry entry : data.entrySet()) { + String key = entry.getKey(); + Integer[] value = entry.getValue(); + HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, key, HoodieCLI.conf, + Option.of(value[0]), Option.of(value[1]), Collections.singletonMap(HoodieCommitMetadata.SCHEMA_KEY, schemaStr)); + } + + metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); + assertEquals(3, metaClient.reloadActiveTimeline().getCommitsTimeline().countInstants(), + "There should have 3 commits"); + return data; + } + + private String getFileContent(String fileToReadStr) throws IOException { + File fileToRead = new File(fileToReadStr); + if (!fileToRead.exists()) { + throw new IllegalStateException("Outfile " + fileToReadStr + " not found"); + } + FileInputStream fis = new FileInputStream(fileToRead); + byte[] data = new byte[(int) fileToRead.length()]; + fis.read(data); + fis.close(); + return new String(data, "UTF-8"); + } } diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java index c33bb2613..105a9f639 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -68,17 +69,27 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator { public static void createCommitFileWithMetadata(String basePath, 
String commitTime, Configuration configuration, Option writes, Option updates) throws Exception { + createCommitFileWithMetadata(basePath, commitTime, configuration, writes, updates, Collections.emptyMap()); + } + + public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration, + Option writes, Option updates, Map extraMetadata) throws Exception { createCommitFileWithMetadata(basePath, commitTime, configuration, UUID.randomUUID().toString(), - UUID.randomUUID().toString(), writes, updates); + UUID.randomUUID().toString(), writes, updates, extraMetadata); } public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration, String fileId1, String fileId2, Option writes, Option updates) throws Exception { + createCommitFileWithMetadata(basePath, commitTime, configuration, fileId1, fileId2, writes, updates, Collections.emptyMap()); + } + + public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration, + String fileId1, String fileId2, Option writes, Option updates, Map extraMetadata) throws Exception { List commitFileNames = Arrays.asList(HoodieTimeline.makeCommitFileName(commitTime), HoodieTimeline.makeInflightCommitFileName(commitTime), HoodieTimeline.makeRequestedCommitFileName(commitTime)); for (String name : commitFileNames) { HoodieCommitMetadata commitMetadata = - generateCommitMetadata(basePath, commitTime, fileId1, fileId2, writes, updates); + generateCommitMetadata(basePath, commitTime, fileId1, fileId2, writes, updates, extraMetadata); String content = commitMetadata.toJsonString(); createFileWithMetadata(basePath, configuration, name, content); } @@ -106,6 +117,11 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator { public static HoodieCommitMetadata generateCommitMetadata(String basePath, String commitTime, String fileId1, String fileId2, Option writes, Option updates) throws 
Exception { + return generateCommitMetadata(basePath, commitTime, fileId1, fileId2, writes, updates, Collections.emptyMap()); + } + + public static HoodieCommitMetadata generateCommitMetadata(String basePath, String commitTime, String fileId1, + String fileId2, Option writes, Option updates, Map extraMetadata) throws Exception { FileCreateUtils.createBaseFile(basePath, DEFAULT_FIRST_PARTITION_PATH, commitTime, fileId1); FileCreateUtils.createBaseFile(basePath, DEFAULT_SECOND_PARTITION_PATH, commitTime, fileId2); return generateCommitMetadata(new HashMap>() { @@ -113,15 +129,23 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator { put(DEFAULT_FIRST_PARTITION_PATH, createImmutableList(baseFileName(DEFAULT_FIRST_PARTITION_PATH, fileId1))); put(DEFAULT_SECOND_PARTITION_PATH, createImmutableList(baseFileName(DEFAULT_SECOND_PARTITION_PATH, fileId2))); } - }, writes, updates); + }, writes, updates, extraMetadata); + } + + private static HoodieCommitMetadata generateCommitMetadata(Map> partitionToFilePaths, + Option writes, Option updates) { + return generateCommitMetadata(partitionToFilePaths, writes, updates, Collections.emptyMap()); } /** * Method to generate commit metadata. */ private static HoodieCommitMetadata generateCommitMetadata(Map> partitionToFilePaths, - Option writes, Option updates) { + Option writes, Option updates, Map extraMetadata) { HoodieCommitMetadata metadata = new HoodieCommitMetadata(); + for (Map.Entry entry: extraMetadata.entrySet()) { + metadata.addMetadata(entry.getKey(), entry.getValue()); + } partitionToFilePaths.forEach((key, value) -> value.forEach(f -> { HoodieWriteStat writeStat = new HoodieWriteStat(); writeStat.setPartitionPath(key);