1
0

[CLI] Add export to table

This commit is contained in:
Satish Kotha
2020-02-12 11:23:40 -08:00
committed by n3nash
parent ccbf543607
commit 3d3781810c
6 changed files with 288 additions and 13 deletions

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.cli;
import org.apache.hudi.cli.utils.SparkTempViewProvider;
import org.apache.hudi.cli.utils.TempViewProvider;
import org.apache.hudi.common.model.TimelineLayoutVersion;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ConsistencyGuardConfig;
@@ -43,6 +45,7 @@ public class HoodieCLI {
protected static HoodieTableMetaClient tableMetadata;
public static HoodieTableMetaClient syncTableMetadata;
public static TimelineLayoutVersion layoutVersion;
private static TempViewProvider tempViewProvider;
/**
* Enum for CLI state.
@@ -105,4 +108,12 @@ public class HoodieCLI {
return tableMetadata;
}
/**
 * Returns the process-wide {@link TempViewProvider}, lazily creating a Spark-backed
 * instance on first access. Synchronized so concurrent callers see a single instance.
 */
public static synchronized TempViewProvider getTempViewProvider() {
if (tempViewProvider != null) {
return tempViewProvider;
}
tempViewProvider = new SparkTempViewProvider(HoodieCLI.class.getSimpleName());
return tempViewProvider;
}
}

View File

@@ -19,12 +19,15 @@
package org.apache.hudi.cli;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import com.jakewharton.fliptables.FlipTable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
@@ -57,11 +60,38 @@ public class HoodiePrintHelper {
*/
/**
 * Serializes the table to a printable string without exporting a temp view.
 * Convenience overload that delegates to the variant taking a temp table name,
 * passing an empty name so no view is registered.
 */
public static String print(TableHeader rowHeader, Map<String, Function<Object, String>> fieldNameToConverterMap,
String sortByField, boolean isDescending, Integer limit, boolean headerOnly, List<Comparable[]> rows) {
final String noTempTable = "";
return print(rowHeader, fieldNameToConverterMap, sortByField, isDescending, limit, headerOnly, rows, noTempTable);
}
/**
* Serialize Table to printable string and also export a temporary view to easily write sql queries.
*
* Ideally, exporting view needs to be outside PrintHelper, but all commands use this. So this is easy
* way to add support for all commands
*
* @param rowHeader Row Header
* @param fieldNameToConverterMap Field Specific Converters
* @param sortByField Sorting field
* @param isDescending Order
* @param limit Limit
* @param headerOnly Headers only
* @param rows List of rows
* @param tempTableName table name to export
* @return Serialized form for printing
*/
public static String print(TableHeader rowHeader, Map<String, Function<Object, String>> fieldNameToConverterMap,
String sortByField, boolean isDescending, Integer limit, boolean headerOnly, List<Comparable[]> rows,
String tempTableName) {
if (headerOnly) {
return HoodiePrintHelper.print(rowHeader);
}
if (!StringUtils.isNullOrEmpty(tempTableName)) {
HoodieCLI.getTempViewProvider().createOrReplace(tempTableName, rowHeader.getFieldNames(),
rows.stream().map(columns -> Arrays.asList(columns)).collect(Collectors.toList()));
}
if (!sortByField.isEmpty() && !rowHeader.containsField(sortByField)) {
return String.format("Field[%s] is not in table, given columns[%s]", sortByField, rowHeader.getFieldNames());
}

View File

@@ -33,7 +33,6 @@ import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.NumericUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.spark.launcher.SparkLauncher;
import org.springframework.shell.core.CommandMarker;
@@ -59,7 +58,8 @@ public class CommitsCommand implements CommandMarker {
private String printCommits(HoodieDefaultTimeline timeline,
final Integer limit, final String sortByField,
final boolean descending,
final boolean headerOnly) throws IOException {
final boolean headerOnly,
final String tempTableName) throws IOException {
final List<Comparable[]> rows = new ArrayList<>();
final List<HoodieInstant> commits = timeline.getCommitsTimeline().filterCompletedInstants()
@@ -96,13 +96,16 @@ public class CommitsCommand implements CommandMarker {
.addTableHeaderField("Total Records Written")
.addTableHeaderField("Total Update Records Written")
.addTableHeaderField("Total Errors");
return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending,
limit, headerOnly, rows, tempTableName);
}
private String printCommitsWithMetadata(HoodieDefaultTimeline timeline,
final Integer limit, final String sortByField,
final boolean descending,
final boolean headerOnly) throws IOException {
final boolean headerOnly,
final String tempTableName) throws IOException {
final List<Comparable[]> rows = new ArrayList<>();
final List<HoodieInstant> commits = timeline.getCommitsTimeline().filterCompletedInstants()
@@ -144,13 +147,16 @@ public class CommitsCommand implements CommandMarker {
.addTableHeaderField("Total Rollback Blocks").addTableHeaderField("Total Log Records")
.addTableHeaderField("Total Updated Records Compacted").addTableHeaderField("Total Write Bytes");
return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending,
limit, headerOnly, rows, tempTableName);
}
@CliCommand(value = "commits show", help = "Show the commits")
public String showCommits(
@CliOption(key = {"includeExtraMetadata"}, help = "Include extra metadata",
unspecifiedDefaultValue = "false") final boolean includeExtraMetadata,
@CliOption(key = {"createView"}, mandatory = false, help = "view name to store output table",
unspecifiedDefaultValue = "") final String exportTableName,
@CliOption(key = {"limit"}, help = "Limit commits",
unspecifiedDefaultValue = "-1") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@@ -161,9 +167,9 @@ public class CommitsCommand implements CommandMarker {
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
if (includeExtraMetadata) {
return printCommitsWithMetadata(activeTimeline, limit, sortByField, descending, headerOnly);
return printCommitsWithMetadata(activeTimeline, limit, sortByField, descending, headerOnly, exportTableName);
} else {
return printCommits(activeTimeline, limit, sortByField, descending, headerOnly);
return printCommits(activeTimeline, limit, sortByField, descending, headerOnly, exportTableName);
}
}
@@ -171,6 +177,8 @@ public class CommitsCommand implements CommandMarker {
public String showArchivedCommits(
@CliOption(key = {"includeExtraMetadata"}, help = "Include extra metadata",
unspecifiedDefaultValue = "false") final boolean includeExtraMetadata,
@CliOption(key = {"createView"}, mandatory = false, help = "view name to store output table",
unspecifiedDefaultValue = "") final String exportTableName,
@CliOption(key = {"startTs"}, mandatory = false, help = "start time for commits, default: now - 10 days")
String startTs,
@CliOption(key = {"endTs"}, mandatory = false, help = "end time for commits, default: now - 1 day")
@@ -195,9 +203,9 @@ public class CommitsCommand implements CommandMarker {
archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
HoodieDefaultTimeline timelineRange = archivedTimeline.findInstantsInRange(startTs, endTs);
if (includeExtraMetadata) {
return printCommitsWithMetadata(timelineRange, limit, sortByField, descending, headerOnly);
return printCommitsWithMetadata(timelineRange, limit, sortByField, descending, headerOnly, exportTableName);
} else {
return printCommits(timelineRange, limit, sortByField, descending, headerOnly);
return printCommits(timelineRange, limit, sortByField, descending, headerOnly, exportTableName);
}
} finally {
// clear the instant details from memory after printing to reduce usage
@@ -237,7 +245,10 @@ public class CommitsCommand implements CommandMarker {
}
@CliCommand(value = "commit showpartitions", help = "Show partition level details of a commit")
public String showCommitPartitions(@CliOption(key = {"commit"}, help = "Commit to show") final String commitTime,
public String showCommitPartitions(
@CliOption(key = {"createView"}, mandatory = false, help = "view name to store output table",
unspecifiedDefaultValue = "") final String exportTableName,
@CliOption(key = {"commit"}, help = "Commit to show") final String commitTime,
@CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@@ -287,11 +298,15 @@ public class CommitsCommand implements CommandMarker {
.addTableHeaderField("Total Records Inserted").addTableHeaderField("Total Records Updated")
.addTableHeaderField("Total Bytes Written").addTableHeaderField("Total Errors");
return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending,
limit, headerOnly, rows, exportTableName);
}
@CliCommand(value = "commit showfiles", help = "Show file level details of a commit")
public String showCommitFiles(@CliOption(key = {"commit"}, help = "Commit to show") final String commitTime,
public String showCommitFiles(
@CliOption(key = {"createView"}, mandatory = false, help = "view name to store output table",
unspecifiedDefaultValue = "") final String exportTableName,
@CliOption(key = {"commit"}, help = "Commit to show") final String commitTime,
@CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@@ -323,7 +338,8 @@ public class CommitsCommand implements CommandMarker {
.addTableHeaderField("Total Records Written").addTableHeaderField("Total Bytes Written")
.addTableHeaderField("Total Errors").addTableHeaderField("File Size");
return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending,
limit, headerOnly, rows, exportTableName);
}
@CliCommand(value = "commits compare", help = "Compare commits with another Hoodie table")

View File

@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.cli.commands;
import org.apache.hudi.cli.HoodieCLI;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* CLI command to query/delete temp views.
*/
@Component
public class TempViewCommand implements CommandMarker {

/**
 * Runs a SQL query against previously created temp views; output is printed
 * by the provider, so the command itself returns an empty string.
 */
@CliCommand(value = "temp_query", help = "query against created temp view")
public String query(
@CliOption(key = {"sql"}, mandatory = true, help = "select query to run against view") final String sql)
throws IOException {
HoodieCLI.getTempViewProvider().runQuery(sql);
return "";
}

/**
 * Drops the named temp view if it exists. Returns an empty string since the
 * provider produces no printable output.
 */
@CliCommand(value = "temp_delete", help = "Delete view name")
public String delete(
@CliOption(key = {"view"}, mandatory = true, help = "view name") final String tableName)
throws IOException {
HoodieCLI.getTempViewProvider().deleteTable(tableName);
return "";
}
}

View File

@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.cli.utils;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import java.util.List;
import java.util.stream.Collectors;
public class SparkTempViewProvider implements TempViewProvider {
private static final Logger LOG = LogManager.getLogger(SparkTempViewProvider.class);
private JavaSparkContext jsc;
private SQLContext sqlContext;
public SparkTempViewProvider(String appName) {
try {
SparkConf sparkConf = new SparkConf().setAppName(appName)
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[8]");
jsc = new JavaSparkContext(sparkConf);
jsc.setLogLevel("ERROR");
sqlContext = new SQLContext(jsc);
} catch (Throwable ex) {
// log full stack trace and rethrow. Without this its difficult to debug failures, if any
LOG.error("unable to initialize spark context ", ex);
throw new HoodieException(ex);
}
}
@Override
public void createOrReplace(String tableName, List<String> headers, List<List<Comparable>> rows) {
try {
if (headers.isEmpty() || rows.isEmpty()) {
return;
}
if (rows.stream().filter(row -> row.size() != headers.size()).count() > 0) {
throw new HoodieException("Invalid row, does not match headers " + headers.size() + " " + rows.size());
}
// replace all whitespaces in headers to make it easy to write sql queries
List<String> headersNoSpaces = headers.stream().map(title -> title.replaceAll("\\s+",""))
.collect(Collectors.toList());
// generate schema for table
StructType structType = new StructType();
for (int i = 0; i < headersNoSpaces.size(); i++) {
// try guessing data type from column data.
DataType headerDataType = getDataType(rows.get(0).get(i));
structType = structType.add(DataTypes.createStructField(headersNoSpaces.get(i), headerDataType, true));
}
List<Row> records = rows.stream().map(row -> RowFactory.create(row.toArray(new Comparable[row.size()])))
.collect(Collectors.toList());
Dataset<Row> dataset = this.sqlContext.createDataFrame(records, structType);
dataset.createOrReplaceTempView(tableName);
System.out.println("Wrote table view: " + tableName);
} catch (Throwable ex) {
// log full stack trace and rethrow. Without this its difficult to debug failures, if any
LOG.error("unable to write ", ex);
throw new HoodieException(ex);
}
}
@Override
public void runQuery(String sqlText) {
try {
this.sqlContext.sql(sqlText).show(Integer.MAX_VALUE, false);
} catch (Throwable ex) {
// log full stack trace and rethrow. Without this its difficult to debug failures, if any
LOG.error("unable to read ", ex);
throw new HoodieException(ex);
}
}
@Override
public void deleteTable(String tableName) {
try {
sqlContext.sql("DROP TABLE IF EXISTS " + tableName);
} catch (Throwable ex) {
// log full stack trace and rethrow. Without this its difficult to debug failures, if any
LOG.error("unable to initialize spark context ", ex);
throw new HoodieException(ex);
}
}
private DataType getDataType(Comparable comparable) {
if (comparable instanceof Integer) {
return DataTypes.IntegerType;
}
if (comparable instanceof Double) {
return DataTypes.DoubleType;
}
if (comparable instanceof Long) {
return DataTypes.LongType;
}
if (comparable instanceof Boolean) {
return DataTypes.BooleanType;
}
// TODO add additional types when needed. default to string
return DataTypes.StringType;
}
}

View File

@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.cli.utils;
import java.util.List;
/**
 * Abstraction for registering CLI table output as queryable temporary views.
 * Implementations decide the backing engine (e.g. a Spark SQL session).
 */
public interface TempViewProvider {

/**
 * Creates or replaces a temp view with the given name.
 *
 * @param tableName name under which the view is registered
 * @param headers   column names, one per column
 * @param rows      row data; each row is expected to match {@code headers} in size
 *                  (presumably enforced by implementations — confirm per implementation)
 */
void createOrReplace(String tableName, List<String> headers, List<List<Comparable>> rows);

/**
 * Executes the given SQL text against previously registered views.
 *
 * @param sqlText query to run; result handling (printing vs. returning) is
 *                implementation-defined
 */
void runQuery(String sqlText);

/**
 * Removes the named view, if present.
 *
 * @param tableName name of the view to drop
 */
void deleteTable(String tableName);
}