diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java index 561e49970..af68035cb 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java @@ -18,6 +18,8 @@ package org.apache.hudi.cli; +import org.apache.hudi.cli.utils.SparkTempViewProvider; +import org.apache.hudi.cli.utils.TempViewProvider; import org.apache.hudi.common.model.TimelineLayoutVersion; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.ConsistencyGuardConfig; @@ -43,6 +45,7 @@ public class HoodieCLI { protected static HoodieTableMetaClient tableMetadata; public static HoodieTableMetaClient syncTableMetadata; public static TimelineLayoutVersion layoutVersion; + private static TempViewProvider tempViewProvider; /** * Enum for CLI state. @@ -105,4 +108,12 @@ public class HoodieCLI { return tableMetadata; } + public static synchronized TempViewProvider getTempViewProvider() { + if (tempViewProvider == null) { + tempViewProvider = new SparkTempViewProvider(HoodieCLI.class.getSimpleName()); + } + + return tempViewProvider; + } + } diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java index 53114cefa..be640376e 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java @@ -19,12 +19,15 @@ package org.apache.hudi.cli; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import com.jakewharton.fliptables.FlipTable; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.function.Function; +import java.util.stream.Collectors; /** @@ -57,11 +60,38 @@ public class HoodiePrintHelper { */ public static String print(TableHeader rowHeader, Map> fieldNameToConverterMap, String sortByField, boolean isDescending, Integer limit, boolean headerOnly, List rows) { + return print(rowHeader, fieldNameToConverterMap, sortByField, isDescending, limit, headerOnly, rows, ""); + } + + /** + * Serialize Table to printable string and also export a temporary view to easily write sql queries. + * + * Ideally, exporting view needs to be outside PrintHelper, but all commands use this. So this is easy + * way to add support for all commands + * + * @param rowHeader Row Header + * @param fieldNameToConverterMap Field Specific Converters + * @param sortByField Sorting field + * @param isDescending Order + * @param limit Limit + * @param headerOnly Headers only + * @param rows List of rows + * @param tempTableName table name to export + * @return Serialized form for printing + */ + public static String print(TableHeader rowHeader, Map> fieldNameToConverterMap, + String sortByField, boolean isDescending, Integer limit, boolean headerOnly, List rows, + String tempTableName) { if (headerOnly) { return HoodiePrintHelper.print(rowHeader); } + if (!StringUtils.isNullOrEmpty(tempTableName)) { + HoodieCLI.getTempViewProvider().createOrReplace(tempTableName, rowHeader.getFieldNames(), + rows.stream().map(columns -> Arrays.asList(columns)).collect(Collectors.toList())); + } + if (!sortByField.isEmpty() && !rowHeader.containsField(sortByField)) { return String.format("Field[%s] is not in table, given columns[%s]", sortByField, rowHeader.getFieldNames()); } diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java index 804096b8f..3c08305bb 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java @@ -33,7 +33,6 @@ import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.NumericUtils; - import org.apache.hudi.common.util.StringUtils; import org.apache.spark.launcher.SparkLauncher; import org.springframework.shell.core.CommandMarker; @@ -59,7 +58,8 @@ public class CommitsCommand implements CommandMarker { private String printCommits(HoodieDefaultTimeline timeline, final Integer limit, final String sortByField, final boolean descending, - final boolean headerOnly) throws IOException { + final boolean headerOnly, + final String tempTableName) throws IOException { final List rows = new ArrayList<>(); final List commits = timeline.getCommitsTimeline().filterCompletedInstants() @@ -96,13 +96,16 @@ public class CommitsCommand implements CommandMarker { .addTableHeaderField("Total Records Written") .addTableHeaderField("Total Update Records Written") .addTableHeaderField("Total Errors"); - return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows); + + return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, + limit, headerOnly, rows, tempTableName); } private String printCommitsWithMetadata(HoodieDefaultTimeline timeline, final Integer limit, final String sortByField, final boolean descending, - final boolean headerOnly) throws IOException { + final boolean headerOnly, + final String tempTableName) throws IOException { final List rows = new ArrayList<>(); final List commits = timeline.getCommitsTimeline().filterCompletedInstants() @@ -144,13 +147,16 @@ public class CommitsCommand implements CommandMarker { .addTableHeaderField("Total Rollback Blocks").addTableHeaderField("Total Log Records") .addTableHeaderField("Total Updated Records Compacted").addTableHeaderField("Total Write Bytes"); - return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows); + return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, + limit, headerOnly, rows, tempTableName); } @CliCommand(value = "commits show", help = "Show the commits") public String showCommits( @CliOption(key = {"includeExtraMetadata"}, help = "Include extra metadata", unspecifiedDefaultValue = "false") final boolean includeExtraMetadata, + @CliOption(key = {"createView"}, mandatory = false, help = "view name to store output table", + unspecifiedDefaultValue = "") final String exportTableName, @CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit, @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField, @@ -161,9 +167,9 @@ public class CommitsCommand implements CommandMarker { HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); if (includeExtraMetadata) { - return printCommitsWithMetadata(activeTimeline, limit, sortByField, descending, headerOnly); + return printCommitsWithMetadata(activeTimeline, limit, sortByField, descending, headerOnly, exportTableName); } else { - return printCommits(activeTimeline, limit, sortByField, descending, headerOnly); + return printCommits(activeTimeline, limit, sortByField, descending, headerOnly, exportTableName); } } @@ -171,6 +177,8 @@ public class CommitsCommand implements CommandMarker { public String showArchivedCommits( @CliOption(key = {"includeExtraMetadata"}, help = "Include extra metadata", unspecifiedDefaultValue = "false") final boolean includeExtraMetadata, + @CliOption(key = {"createView"}, mandatory = false, help = "view name to store output table", + unspecifiedDefaultValue = "") final String exportTableName, @CliOption(key = {"startTs"}, mandatory = false, help = "start time for commits, default: now - 10 days") String startTs, @CliOption(key = {"endTs"}, mandatory = false, help = "end time for commits, default: now - 1 day") @@ -195,9 +203,9 @@ public class CommitsCommand implements CommandMarker { archivedTimeline.loadInstantDetailsInMemory(startTs, endTs); HoodieDefaultTimeline timelineRange = archivedTimeline.findInstantsInRange(startTs, endTs); if (includeExtraMetadata) { - return printCommitsWithMetadata(timelineRange, limit, sortByField, descending, headerOnly); + return printCommitsWithMetadata(timelineRange, limit, sortByField, descending, headerOnly, exportTableName); } else { - return printCommits(timelineRange, limit, sortByField, descending, headerOnly); + return printCommits(timelineRange, limit, sortByField, descending, headerOnly, exportTableName); } } finally { // clear the instant details from memory after printing to reduce usage @@ -237,7 +245,10 @@ public class CommitsCommand implements CommandMarker { } @CliCommand(value = "commit showpartitions", help = "Show partition level details of a commit") - public String showCommitPartitions(@CliOption(key = {"commit"}, help = "Commit to show") final String commitTime, + public String showCommitPartitions( + @CliOption(key = {"createView"}, mandatory = false, help = "view name to store output table", + unspecifiedDefaultValue = "") final String exportTableName, + @CliOption(key = {"commit"}, help = "Commit to show") final String commitTime, @CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit, @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField, @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending, @@ -287,11 +298,15 @@ public class CommitsCommand implements CommandMarker { .addTableHeaderField("Total Records Inserted").addTableHeaderField("Total Records Updated") .addTableHeaderField("Total Bytes Written").addTableHeaderField("Total Errors"); - return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows); + return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, + limit, headerOnly, rows, exportTableName); } @CliCommand(value = "commit showfiles", help = "Show file level details of a commit") - public String showCommitFiles(@CliOption(key = {"commit"}, help = "Commit to show") final String commitTime, + public String showCommitFiles( + @CliOption(key = {"createView"}, mandatory = false, help = "view name to store output table", + unspecifiedDefaultValue = "") final String exportTableName, + @CliOption(key = {"commit"}, help = "Commit to show") final String commitTime, @CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit, @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField, @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending, @@ -323,7 +338,8 @@ public class CommitsCommand implements CommandMarker { .addTableHeaderField("Total Records Written").addTableHeaderField("Total Bytes Written") .addTableHeaderField("Total Errors").addTableHeaderField("File Size"); - return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows); + return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, + limit, headerOnly, rows, exportTableName); } @CliCommand(value = "commits compare", help = "Compare commits with another Hoodie table") diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TempViewCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TempViewCommand.java new file mode 100644 index 000000000..39e3767e2 --- /dev/null +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TempViewCommand.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.cli.commands; + +import org.apache.hudi.cli.HoodieCLI; + +import org.springframework.shell.core.CommandMarker; +import org.springframework.shell.core.annotation.CliCommand; +import org.springframework.shell.core.annotation.CliOption; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + * CLI command to query/delete temp views. + */ +@Component +public class TempViewCommand implements CommandMarker { + + private static final String EMPTY_STRING = ""; + + @CliCommand(value = "temp_query", help = "query against created temp view") + public String query( + @CliOption(key = {"sql"}, mandatory = true, help = "select query to run against view") final String sql) + throws IOException { + + HoodieCLI.getTempViewProvider().runQuery(sql); + return EMPTY_STRING; + } + + @CliCommand(value = "temp_delete", help = "Delete view name") + public String delete( + @CliOption(key = {"view"}, mandatory = true, help = "view name") final String tableName) + throws IOException { + + HoodieCLI.getTempViewProvider().deleteTable(tableName); + return EMPTY_STRING; + } +} diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkTempViewProvider.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkTempViewProvider.java new file mode 100644 index 000000000..68c18f917 --- /dev/null +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkTempViewProvider.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.cli.utils; + +import org.apache.hudi.exception.HoodieException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructType; + +import java.util.List; +import java.util.stream.Collectors; + +public class SparkTempViewProvider implements TempViewProvider { + private static final Logger LOG = LogManager.getLogger(SparkTempViewProvider.class); + + private JavaSparkContext jsc; + private SQLContext sqlContext; + + public SparkTempViewProvider(String appName) { + try { + SparkConf sparkConf = new SparkConf().setAppName(appName) + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[8]"); + jsc = new JavaSparkContext(sparkConf); + jsc.setLogLevel("ERROR"); + + sqlContext = new SQLContext(jsc); + } catch (Throwable ex) { + // log full stack trace and rethrow. Without this its difficult to debug failures, if any + LOG.error("unable to initialize spark context ", ex); + throw new HoodieException(ex); + } + } + + @Override + public void createOrReplace(String tableName, List headers, List> rows) { + try { + if (headers.isEmpty() || rows.isEmpty()) { + return; + } + + if (rows.stream().filter(row -> row.size() != headers.size()).count() > 0) { + throw new HoodieException("Invalid row, does not match headers " + headers.size() + " " + rows.size()); + } + + // replace all whitespaces in headers to make it easy to write sql queries + List headersNoSpaces = headers.stream().map(title -> title.replaceAll("\\s+","")) + .collect(Collectors.toList()); + + // generate schema for table + StructType structType = new StructType(); + for (int i = 0; i < headersNoSpaces.size(); i++) { + // try guessing data type from column data. + DataType headerDataType = getDataType(rows.get(0).get(i)); + structType = structType.add(DataTypes.createStructField(headersNoSpaces.get(i), headerDataType, true)); + } + List records = rows.stream().map(row -> RowFactory.create(row.toArray(new Comparable[row.size()]))) + .collect(Collectors.toList()); + Dataset dataset = this.sqlContext.createDataFrame(records, structType); + dataset.createOrReplaceTempView(tableName); + System.out.println("Wrote table view: " + tableName); + } catch (Throwable ex) { + // log full stack trace and rethrow. Without this its difficult to debug failures, if any + LOG.error("unable to write ", ex); + throw new HoodieException(ex); + } + } + + @Override + public void runQuery(String sqlText) { + try { + this.sqlContext.sql(sqlText).show(Integer.MAX_VALUE, false); + } catch (Throwable ex) { + // log full stack trace and rethrow. Without this its difficult to debug failures, if any + LOG.error("unable to read ", ex); + throw new HoodieException(ex); + } + } + + @Override + public void deleteTable(String tableName) { + try { + sqlContext.sql("DROP TABLE IF EXISTS " + tableName); + } catch (Throwable ex) { + // log full stack trace and rethrow. Without this its difficult to debug failures, if any + LOG.error("unable to initialize spark context ", ex); + throw new HoodieException(ex); + } + } + + private DataType getDataType(Comparable comparable) { + if (comparable instanceof Integer) { + return DataTypes.IntegerType; + } + + if (comparable instanceof Double) { + return DataTypes.DoubleType; + } + + if (comparable instanceof Long) { + return DataTypes.LongType; + } + + if (comparable instanceof Boolean) { + return DataTypes.BooleanType; + } + + // TODO add additional types when needed. default to string + return DataTypes.StringType; + } +} diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/TempViewProvider.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/TempViewProvider.java new file mode 100644 index 000000000..1075fddc1 --- /dev/null +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/TempViewProvider.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.cli.utils; + +import java.util.List; + +public interface TempViewProvider { + void createOrReplace(String tableName, List headers, List> rows); + + void runQuery(String sqlText); + + void deleteTable(String tableName); +}