1
0

Add CLI support to inspect, schedule and run compaction

This commit is contained in:
Balaji Varadarajan
2018-06-05 18:26:01 -07:00
committed by vinoth chandar
parent 2e12c86d01
commit 594059a19c
7 changed files with 653 additions and 179 deletions

View File

@@ -0,0 +1,229 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.cli.commands;
import com.uber.hoodie.avro.model.HoodieCompactionOperation;
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.cli.HoodieCLI;
import com.uber.hoodie.cli.HoodiePrintHelper;
import com.uber.hoodie.cli.TableHeader;
import com.uber.hoodie.cli.commands.SparkMain.SparkCommand;
import com.uber.hoodie.cli.utils.InputStreamConsumer;
import com.uber.hoodie.cli.utils.SparkUtil;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
import com.uber.hoodie.common.util.AvroUtils;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.util.Utils;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
@Component
public class CompactionCommand implements CommandMarker {

  // FIX: logger was initialized with HDFSParquetImportCommand.class (copy/paste slip),
  // which mis-attributed this command's log output to the importer command.
  private static Logger log = LogManager.getLogger(CompactionCommand.class);

  /**
   * Lists every compaction present in the active timeline, one row per compaction instant.
   *
   * <p>NOTE(review): assumes a dataset is already connected (HoodieCLI.tableMetadata is
   * initialized) — confirm the CLI guards against running this before `connect`.
   *
   * @param includeExtraMetadata also print the plan's extra metadata column
   * @param limit max rows to print (-1 for unlimited)
   * @param sortByField field name to sort rows by ("" for no sorting)
   * @param descending sort order when sortByField is given
   * @param headerOnly print only the table header
   * @return rendered table of compactions
   */
  @CliCommand(value = "compactions show all", help = "Shows all compactions that are in active timeline")
  public String compactionsAll(
      @CliOption(key = {"includeExtraMetadata"}, help = "Include extra metadata",
          unspecifiedDefaultValue = "false") final boolean includeExtraMetadata,
      @CliOption(key = {"limit"}, mandatory = false, help = "Limit commits",
          unspecifiedDefaultValue = "-1") final Integer limit,
      @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
      @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
      @CliOption(key = {"headeronly"}, help = "Print Header Only",
          unspecifiedDefaultValue = "false") final boolean headerOnly) throws IOException {
    HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
    HoodieTimeline timeline = activeTimeline.getCommitsAndCompactionTimeline();
    HoodieTimeline commitTimeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
    Set<String> committed = commitTimeline.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());

    List<HoodieInstant> instants = timeline.getInstants().collect(Collectors.toList());
    List<Comparable[]> rows = new ArrayList<>();
    // Most recent instants first.
    Collections.reverse(instants);
    for (HoodieInstant instant : instants) {
      HoodieCompactionPlan workload = null;
      if (!instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)) {
        try {
          // This could be a completed compaction. Assume a compaction request file is present but skip if fails
          workload = AvroUtils.deserializeCompactionPlan(
              activeTimeline.getInstantAuxiliaryDetails(
                  HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
        } catch (HoodieIOException ioe) {
          // SKIP - this (non-compaction) instant has no compaction plan on disk.
        }
      } else {
        workload = AvroUtils.deserializeCompactionPlan(activeTimeline.getInstantAuxiliaryDetails(
            HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
      }

      if (null != workload) {
        HoodieInstant.State state = instant.getState();
        if (committed.contains(instant.getTimestamp())) {
          // A completed commit exists for this timestamp, so report the compaction as
          // COMPLETED regardless of the state recorded on the instant itself.
          state = State.COMPLETED;
        }
        int numFileIds = workload.getOperations() == null ? 0 : workload.getOperations().size();
        if (includeExtraMetadata) {
          rows.add(new Comparable[]{instant.getTimestamp(), state.toString(), numFileIds,
              workload.getExtraMetadata().toString()});
        } else {
          rows.add(new Comparable[]{instant.getTimestamp(), state.toString(), numFileIds});
        }
      }
    }

    Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
    TableHeader header = new TableHeader()
        .addTableHeaderField("Compaction Instant Time")
        .addTableHeaderField("State")
        .addTableHeaderField("Total FileIds to be Compacted");
    if (includeExtraMetadata) {
      header = header.addTableHeaderField("Extra Metadata");
    }
    return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
  }

  /**
   * Shows the operations (one row per file-group) planned for a single compaction instant.
   *
   * @param compactionInstantTime instant time of the compaction whose plan is printed
   * @param limit max rows to print (-1 for unlimited)
   * @param sortByField field name to sort rows by ("" for no sorting)
   * @param descending sort order when sortByField is given
   * @param headerOnly print only the table header
   * @return rendered table of compaction operations
   */
  @CliCommand(value = "compaction show", help = "Shows compaction details for a specific compaction instant")
  public String compactionShow(
      // FIX: help text wrongly said "Base path for the target hoodie dataset" (copy/paste).
      @CliOption(key = "instant", mandatory = true, help = "Compaction instant time") final
          String compactionInstantTime,
      @CliOption(key = {"limit"}, mandatory = false, help = "Limit commits",
          unspecifiedDefaultValue = "-1") final Integer limit,
      @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
      @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
      @CliOption(key = {"headeronly"}, help = "Print Header Only",
          unspecifiedDefaultValue = "false") final boolean headerOnly)
      throws Exception {
    HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
    HoodieCompactionPlan workload = AvroUtils.deserializeCompactionPlan(
        activeTimeline.getInstantAuxiliaryDetails(
            HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get());

    List<Comparable[]> rows = new ArrayList<>();
    if ((null != workload) && (null != workload.getOperations())) {
      for (HoodieCompactionOperation op : workload.getOperations()) {
        rows.add(new Comparable[]{op.getPartitionPath(),
            op.getFileId(),
            op.getBaseInstantTime(),
            op.getDataFilePath(),
            op.getDeltaFilePaths().size(),
            op.getMetrics().toString()
        });
      }
    }

    Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
    TableHeader header = new TableHeader()
        .addTableHeaderField("Partition Path")
        .addTableHeaderField("File Id")
        .addTableHeaderField("Base Instant")
        .addTableHeaderField("Data File Path")
        .addTableHeaderField("Total Delta Files")
        .addTableHeaderField("getMetrics");
    return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
  }

  /**
   * Schedules a new compaction (generates a compaction plan) via a spark-launcher job.
   * Only valid for MERGE_ON_READ tables.
   *
   * @return human-readable status message including the generated compaction instant time
   * @throws Exception when the table type is not MERGE_ON_READ
   */
  @CliCommand(value = "compaction schedule", help = "Schedule Compaction")
  public String scheduleCompact(
      @CliOption(key = "tableName", mandatory = true, help = "Table name") final String tableName,
      @CliOption(key = "rowKeyField", mandatory = true, help = "Row key field name") final String rowKeyField,
      @CliOption(key = {"parallelism"}, mandatory = true,
          help = "Parallelism for hoodie compaction") final String parallelism,
      @CliOption(key = "schemaFilePath", mandatory = true, help = "Path for Avro schema file") final String
          schemaFilePath,
      @CliOption(key = "sparkMemory", mandatory = true, help = "Spark executor memory") final String sparkMemory,
      @CliOption(key = "retry", mandatory = true, help = "Number of retries") final String retry) throws Exception {
    boolean initialized = HoodieCLI.initConf();
    HoodieCLI.initFS(initialized);

    // First get a compaction instant time and pass it to spark launcher for scheduling compaction
    String compactionInstantTime = HoodieActiveTimeline.createNewCommitTime();

    if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) {
      String sparkPropertiesPath = Utils.getDefaultPropertiesFile(
          scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
      SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
      sparkLauncher.addAppArgs(SparkCommand.COMPACT_SCHEDULE.toString(), HoodieCLI.tableMetadata.getBasePath(),
          tableName, compactionInstantTime, rowKeyField, parallelism, schemaFilePath, sparkMemory, retry);
      Process process = sparkLauncher.launch();
      InputStreamConsumer.captureOutput(process);
      int exitCode = process.waitFor();
      // FIX: messages said "run"/"completed" although this command only schedules.
      if (exitCode != 0) {
        return "Failed to schedule compaction for " + compactionInstantTime;
      }
      return "Compaction successfully scheduled with instant time " + compactionInstantTime;
    } else {
      throw new Exception("Compactions can only be run for table type : MERGE_ON_READ");
    }
  }

  /**
   * Runs a previously scheduled compaction (identified by its instant time) via a
   * spark-launcher job. Only valid for MERGE_ON_READ tables.
   *
   * @return human-readable status message for the given compaction instant
   * @throws Exception when the table type is not MERGE_ON_READ
   */
  @CliCommand(value = "compaction run", help = "Run Compaction for given instant time")
  public String compact(
      @CliOption(key = "tableName", mandatory = true, help = "Table name") final String tableName,
      @CliOption(key = "rowKeyField", mandatory = true, help = "Row key field name") final String rowKeyField,
      @CliOption(key = {"parallelism"}, mandatory = true,
          help = "Parallelism for hoodie compaction") final String parallelism,
      @CliOption(key = "schemaFilePath", mandatory = true, help = "Path for Avro schema file") final String
          schemaFilePath,
      @CliOption(key = "sparkMemory", mandatory = true, help = "Spark executor memory") final String sparkMemory,
      @CliOption(key = "retry", mandatory = true, help = "Number of retries") final String retry,
      // FIX: help text wrongly said "Base path for the target hoodie dataset" (copy/paste).
      @CliOption(key = "compactionInstant", mandatory = true, help = "Instant time of scheduled compaction") final
          String compactionInstantTime) throws Exception {
    boolean initialized = HoodieCLI.initConf();
    HoodieCLI.initFS(initialized);

    if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) {
      String sparkPropertiesPath = Utils.getDefaultPropertiesFile(
          scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
      SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
      sparkLauncher.addAppArgs(SparkCommand.COMPACT_RUN.toString(), HoodieCLI.tableMetadata.getBasePath(),
          tableName, compactionInstantTime, rowKeyField, parallelism, schemaFilePath, sparkMemory, retry);
      Process process = sparkLauncher.launch();
      InputStreamConsumer.captureOutput(process);
      int exitCode = process.waitFor();
      if (exitCode != 0) {
        return "Failed to run compaction for " + compactionInstantTime;
      }
      return "Compaction successfully completed for " + compactionInstantTime;
    } else {
      throw new Exception("Compactions can only be run for table type : MERGE_ON_READ");
    }
  }
}

View File

@@ -29,14 +29,17 @@ import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
import scala.collection.JavaConverters;
@Component
public class HDFSParquetImportCommand implements CommandMarker {
private static Logger log = LogManager.getLogger(HDFSParquetImportCommand.class);
@CliCommand(value = "hdfsparquetimport", help = "Imports hdfs dataset to a hoodie dataset")
@CliCommand(value = "hdfsparquetimport", help = "Imports Parquet dataset to a hoodie dataset")
public String convert(
@CliOption(key = "upsert", mandatory = false, unspecifiedDefaultValue = "false",
help = "Uses upsert API instead of the default insert API of WriteClient") boolean useUpsert,
@CliOption(key = "srcPath", mandatory = true, help = "Base path for the input dataset") final String srcPath,
@CliOption(key = "srcType", mandatory = true, help = "Source type for the input dataset") final String srcType,
@CliOption(key = "targetPath", mandatory = true, help = "Base path for the target hoodie dataset") final String
@@ -59,10 +62,16 @@ public class HDFSParquetImportCommand implements CommandMarker {
boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized);
String sparkPropertiesPath = Utils.getDefaultPropertiesFile(
scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.IMPORT.toString(), srcPath, targetPath, tableName, tableType, rowKeyField,
String cmd = SparkCommand.IMPORT.toString();
if (useUpsert) {
cmd = SparkCommand.UPSERT.toString();
}
sparkLauncher.addAppArgs(cmd, srcPath, targetPath, tableName, tableType, rowKeyField,
partitionPathField, parallelism, schemaFilePath, sparkMemory, retry);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);

View File

@@ -24,6 +24,7 @@ import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.utilities.HDFSParquetImporter;
import com.uber.hoodie.utilities.HoodieCompactor;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
@@ -32,12 +33,11 @@ public class SparkMain {
protected static final Logger LOG = Logger.getLogger(SparkMain.class);
/**
* Commands
*/
enum SparkCommand {
ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT
ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN,
}
public static void main(String[] args) throws Exception {
@@ -62,9 +62,20 @@ public class SparkMain {
returnCode = rollbackToSavepoint(jsc, args[1], args[2]);
break;
case IMPORT:
case UPSERT:
assert (args.length == 11);
returnCode = dataImport(jsc, args[1], args[2], args[3], args[4], args[5], args[6], Integer.parseInt(args[7]),
args[8], SparkUtil.DEFUALT_SPARK_MASTER, args[9], Integer.parseInt(args[10]));
returnCode = dataLoad(jsc, command, args[1], args[2], args[3], args[4], args[5], args[6],
Integer.parseInt(args[7]), args[8], SparkUtil.DEFUALT_SPARK_MASTER, args[9], Integer.parseInt(args[10]));
break;
case COMPACT_RUN:
assert (args.length == 9);
returnCode = compact(jsc, args[1], args[2], args[3], args[4], args[5], Integer.parseInt(args[6]),
args[7], args[8], Integer.parseInt(args[9]), false);
break;
case COMPACT_SCHEDULE:
assert (args.length == 10);
returnCode = compact(jsc, args[1], args[2], args[3], args[4], args[5], Integer.parseInt(args[6]),
args[7], args[8], Integer.parseInt(args[9]), true);
break;
default:
break;
@@ -73,10 +84,12 @@ public class SparkMain {
System.exit(returnCode);
}
private static int dataImport(JavaSparkContext jsc, String srcPath, String targetPath, String tableName,
private static int dataLoad(JavaSparkContext jsc, String command,
String srcPath, String targetPath, String tableName,
String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile, String sparkMaster,
String sparkMemory, int retry) throws Exception {
HDFSParquetImporter.Config cfg = new HDFSParquetImporter.Config();
cfg.command = command;
cfg.srcPath = srcPath;
cfg.targetPath = targetPath;
cfg.tableName = tableName;
@@ -89,6 +102,22 @@ public class SparkMain {
return new HDFSParquetImporter(cfg).dataImport(jsc, retry);
}
/**
 * Translates CLI arguments into a HoodieCompactor.Config and either schedules
 * (schedule == true) or executes (schedule == false) a compaction.
 *
 * @return the compactor's exit code (0 on success, non-zero on failure)
 */
private static int compact(JavaSparkContext jsc, String basePath, String tableName, String compactionInstant,
String rowKey, String partitionKey, int parallelism, String schemaFile,
String sparkMemory, int retry, boolean schedule) throws Exception {
HoodieCompactor.Config cfg = new HoodieCompactor.Config();
cfg.basePath = basePath;
cfg.tableName = tableName;
cfg.compactionInstantTime = compactionInstant;
cfg.rowKey = rowKey;
cfg.partitionKey = partitionKey;
cfg.parallelism = parallelism;
cfg.schemaFile = schemaFile;
cfg.runSchedule = schedule;
// NOTE(review): setting spark.executor.memory on the conf of an already-created
// SparkContext usually has no effect on executors — confirm this takes effect here.
jsc.getConf().set("spark.executor.memory", sparkMemory);
return new HoodieCompactor(cfg).compact(jsc, retry);
}
private static int deduplicatePartitionPath(JavaSparkContext jsc, String duplicatedPartitionPath,
String repairedOutputPath, String basePath) throws Exception {
DedupeSparkJob job = new DedupeSparkJob(basePath, duplicatedPartitionPath, repairedOutputPath, new SQLContext(jsc),

View File

@@ -149,6 +149,10 @@ public class HoodieInstant implements Serializable {
&& Objects.equals(timestamp, that.timestamp);
}
/**
 * @return the current {@link State} recorded on this instant
 */
public State getState() {
return state;
}
@Override
public int hashCode() {
return Objects.hash(state, action, timestamp);

View File

@@ -26,24 +26,21 @@ import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.HoodieJsonPayload;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.HoodieTableConfig;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.index.HoodieIndex;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
@@ -52,23 +49,150 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
/**
* Loads data from Parquet Sources
*/
public class HDFSParquetImporter implements Serializable {
public static final SimpleDateFormat PARTITION_FORMATTER = new SimpleDateFormat("yyyy/MM/dd");
private static volatile Logger logger = LogManager.getLogger(HDFSParquetImporter.class);
private final Config cfg;
private transient FileSystem fs;
public static final SimpleDateFormat PARTITION_FORMATTER = new SimpleDateFormat("yyyy/MM/dd");
public HDFSParquetImporter(Config cfg) throws IOException {
this.cfg = cfg;
}
public static void main(String[] args) throws Exception {
final Config cfg = new Config();
JCommander cmd = new JCommander(cfg, args);
if (cfg.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
dataImporter.dataImport(UtilHelpers.buildSparkContext(cfg.tableName, cfg.sparkMaster, cfg.sparkMemory), cfg.retry);
}
/**
 * Runs the import, retrying up to {@code retry} additional times while the attempt
 * returns a non-zero code. Fails fast if the target path already exists.
 *
 * @return 0 on success, -1 on failure (errors are logged, never rethrown)
 */
public int dataImport(JavaSparkContext jsc, int retry) throws Exception {
this.fs = FSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration());
int ret = -1;
try {
// Verify that targetPath is not present.
if (fs.exists(new Path(cfg.targetPath))) {
throw new HoodieIOException(String.format("Make sure %s is not present.", cfg.targetPath));
}
do {
ret = dataImport(jsc);
} while (ret != 0 && retry-- > 0);
} catch (Throwable t) {
// Deliberate best-effort: failure is reported via the -1 return code, not rethrown.
logger.error(t);
}
return ret;
}
@VisibleForTesting
protected int dataImport(JavaSparkContext jsc) throws IOException {
try {
if (fs.exists(new Path(cfg.targetPath))) {
// cleanup target directory.
fs.delete(new Path(cfg.targetPath), true);
}
//Get schema.
String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
// Initialize target hoodie table.
Properties properties = new Properties();
properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, cfg.tableName);
properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, cfg.tableType);
HoodieTableMetaClient
.initializePathAsHoodieDataset(jsc.hadoopConfiguration(), cfg.targetPath, properties);
HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.targetPath, schemaStr,
cfg.parallelism, Optional.empty());
JavaRDD<HoodieRecord<HoodieRecordPayload>> hoodieRecords = buildHoodieRecordsForImport(jsc, schemaStr);
// Get instant time.
String instantTime = client.startCommit();
JavaRDD<WriteStatus> writeResponse = load(client, instantTime, hoodieRecords);
return UtilHelpers.handleErrors(jsc, instantTime, writeResponse);
} catch (Throwable t) {
logger.error("Error occurred.", t);
}
return -1;
}
/**
 * Reads the source Parquet files (recursively) as Avro GenericRecords and converts each
 * into a HoodieRecord keyed by cfg.rowKey, partitioned by cfg.partitionKey.
 *
 * <p>Records missing the partition or row key field cause a HoodieIOException to be
 * thrown from the Spark task.
 *
 * @param jsc spark context used to build the input RDD
 * @param schemaStr Avro schema (JSON string) used for reading Parquet
 * @return RDD of HoodieRecords ready to be written by the client
 */
protected JavaRDD<HoodieRecord<HoodieRecordPayload>> buildHoodieRecordsForImport(
JavaSparkContext jsc, String schemaStr) throws IOException {
Job job = Job.getInstance(jsc.hadoopConfiguration());
// Allow recursive directories to be found
job.getConfiguration().set(FileInputFormat.INPUT_DIR_RECURSIVE, "true");
// To parallelize reading file status.
job.getConfiguration().set(FileInputFormat.LIST_STATUS_NUM_THREADS, "1024");
AvroReadSupport
.setAvroReadSchema(jsc.hadoopConfiguration(), (new Schema.Parser().parse(schemaStr)));
ParquetInputFormat.setReadSupportClass(job, (AvroReadSupport.class));
return jsc.newAPIHadoopFile(cfg.srcPath,
ParquetInputFormat.class, Void.class, GenericRecord.class, job.getConfiguration())
// To reduce large number of
// tasks.
.coalesce(16 * cfg.parallelism)
.map(entry -> {
GenericRecord genericRecord
= ((Tuple2<Void, GenericRecord>) entry)._2();
Object partitionField =
genericRecord.get(cfg.partitionKey);
if (partitionField == null) {
throw new HoodieIOException(
"partition key is missing. :"
+ cfg.partitionKey);
}
Object rowField = genericRecord.get(cfg.rowKey);
if (rowField == null) {
throw new HoodieIOException(
"row field is missing. :" + cfg.rowKey);
}
String partitionPath = partitionField.toString();
logger.info("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")");
if (partitionField instanceof Number) {
// Numeric partition values are treated as a timestamp and rendered as yyyy/MM/dd.
// NOTE(review): the * 1000L suggests the value is epoch seconds — confirm the unit.
try {
long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L);
partitionPath =
PARTITION_FORMATTER.format(new Date(ts));
} catch (NumberFormatException nfe) {
logger.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")");
}
}
return new HoodieRecord<>(
new HoodieKey(
(String) rowField, partitionPath),
new HoodieJsonPayload(
genericRecord.toString()));
});
}
/**
 * Writes the records to the Hoodie dataset with the API selected by {@code cfg.command}:
 * "insert" (case-insensitive) uses the insert API, anything else falls back to upsert.
 *
 * @param client Hoodie write client
 * @param instantTime instant time the write is attributed to
 * @param hoodieRecords records to write
 * @param <T> payload type
 * @return write statuses produced by the client
 */
protected <T extends HoodieRecordPayload> JavaRDD<WriteStatus> load(HoodieWriteClient client,
    String instantTime, JavaRDD<HoodieRecord<T>> hoodieRecords) {
  // FIX: was cfg.command.toLowerCase().equals("insert") — toLowerCase() is locale
  // sensitive (e.g. Turkish dotless i would break the match) and allocates a new
  // string; equalsIgnoreCase avoids both and is also null-safe for cfg.command.
  if ("insert".equalsIgnoreCase(cfg.command)) {
    return client.insert(hoodieRecords, instantTime);
  }
  return client.upsert(hoodieRecords, instantTime);
}
public static class FormatValidator implements IValueValidator<String> {
List<String> validFormats = Arrays.asList("parquet");
@@ -97,6 +221,10 @@ public class HDFSParquetImporter implements Serializable {
public static class Config implements Serializable {
@Parameter(names = {"--command", "-c"},
description = "Write command Valid values are insert(default)/upsert",
required = false)
public String command = "INSERT";
@Parameter(names = {"--src-path",
"-sp"}, description = "Base path for the input dataset", required = true)
public String srcPath = null;
@@ -137,167 +265,4 @@ public class HDFSParquetImporter implements Serializable {
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
}
public static void main(String[] args) throws Exception {
final HDFSParquetImporter.Config cfg = new HDFSParquetImporter.Config();
JCommander cmd = new JCommander(cfg, args);
if (cfg.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
dataImporter.dataImport(dataImporter.getSparkContext(), cfg.retry);
}
private JavaSparkContext getSparkContext() {
SparkConf sparkConf = new SparkConf().setAppName("hoodie-data-importer-" + cfg.tableName);
sparkConf.setMaster(cfg.sparkMaster);
if (cfg.sparkMaster.startsWith("yarn")) {
sparkConf.set("spark.eventLog.overwrite", "true");
sparkConf.set("spark.eventLog.enabled", "true");
}
sparkConf.set("spark.driver.maxResultSize", "2g");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.executor.memory", cfg.sparkMemory);
// Configure hadoop conf
sparkConf.set("spark.hadoop.mapred.output.compress", "true");
sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
sparkConf.set("spark.hadoop.mapred.output.compression.codec",
"org.apache.hadoop.io.compress.GzipCodec");
sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
sparkConf = HoodieWriteClient.registerClasses(sparkConf);
return new JavaSparkContext(sparkConf);
}
private String getSchema() throws Exception {
// Read schema file.
Path p = new Path(cfg.schemaFile);
if (!fs.exists(p)) {
throw new Exception(String.format("Could not find - %s - schema file.", cfg.schemaFile));
}
long len = fs.getFileStatus(p).getLen();
ByteBuffer buf = ByteBuffer.allocate((int) len);
FSDataInputStream inputStream = null;
try {
inputStream = fs.open(p);
inputStream.readFully(0, buf.array(), 0, buf.array().length);
} finally {
if (inputStream != null) {
inputStream.close();
}
}
return new String(buf.array());
}
public int dataImport(JavaSparkContext jsc, int retry) throws Exception {
this.fs = FSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration());
int ret = -1;
try {
// Verify that targetPath is not present.
if (fs.exists(new Path(cfg.targetPath))) {
throw new HoodieIOException(String.format("Make sure %s is not present.", cfg.targetPath));
}
do {
ret = dataImport(jsc);
} while (ret != 0 && retry-- > 0);
} catch (Throwable t) {
logger.error(t);
}
return ret;
}
@VisibleForTesting
protected int dataImport(JavaSparkContext jsc) throws IOException {
try {
if (fs.exists(new Path(cfg.targetPath))) {
// cleanup target directory.
fs.delete(new Path(cfg.targetPath), true);
}
//Get schema.
String schemaStr = getSchema();
// Initialize target hoodie table.
Properties properties = new Properties();
properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, cfg.tableName);
properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, cfg.tableType);
HoodieTableMetaClient
.initializePathAsHoodieDataset(jsc.hadoopConfiguration(), cfg.targetPath, properties);
HoodieWriteClient client = createHoodieClient(jsc, cfg.targetPath, schemaStr,
cfg.parallelism);
Job job = Job.getInstance(jsc.hadoopConfiguration());
// To parallelize reading file status.
job.getConfiguration().set(FileInputFormat.LIST_STATUS_NUM_THREADS, "1024");
AvroReadSupport
.setAvroReadSchema(jsc.hadoopConfiguration(), (new Schema.Parser().parse(schemaStr)));
ParquetInputFormat.setReadSupportClass(job, (AvroReadSupport.class));
JavaRDD<HoodieRecord<HoodieJsonPayload>> hoodieRecords = jsc.newAPIHadoopFile(cfg.srcPath,
ParquetInputFormat.class, Void.class, GenericRecord.class, job.getConfiguration())
// To reduce large number of
// tasks.
.coalesce(16 * cfg.parallelism)
.map(entry -> {
GenericRecord genericRecord
= ((Tuple2<Void, GenericRecord>) entry)._2();
Object partitionField =
genericRecord.get(cfg.partitionKey);
if (partitionField == null) {
throw new HoodieIOException(
"partition key is missing. :"
+ cfg.partitionKey);
}
Object rowField = genericRecord.get(cfg.rowKey);
if (rowField == null) {
throw new HoodieIOException(
"row field is missing. :" + cfg.rowKey);
}
long ts = (long) ((Double) partitionField * 1000L);
String partitionPath =
PARTITION_FORMATTER.format(new Date(ts));
return new HoodieRecord<>(
new HoodieKey(
(String) rowField, partitionPath),
new HoodieJsonPayload(
genericRecord.toString()));
});
// Get commit time.
String commitTime = client.startCommit();
JavaRDD<WriteStatus> writeResponse = client.bulkInsert(hoodieRecords, commitTime);
Accumulator<Integer> errors = jsc.accumulator(0);
writeResponse.foreach(writeStatus -> {
if (writeStatus.hasErrors()) {
errors.add(1);
logger.error(String.format("Error processing records :writeStatus:%s",
writeStatus.getStat().toString()));
}
});
if (errors.value() == 0) {
logger.info(
String.format("Dataset imported into hoodie dataset with %s commit time.", commitTime));
return 0;
}
logger.error(String.format("Import failed with %d errors.", errors.value()));
} catch (Throwable t) {
logger.error("Error occurred.", t);
}
return -1;
}
private static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath,
String schemaStr, int parallelism) throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withParallelism(parallelism, parallelism).withSchema(schemaStr)
.combineInput(true, true).withIndexConfig(
HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.build();
return new HoodieWriteClient(jsc, config);
}
}

View File

@@ -0,0 +1,129 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.util.FSUtils;
import java.io.Serializable;
import java.util.Optional;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
/**
 * Spark utility to schedule or run a compaction on a hoodie dataset, driven either from
 * the command line (see {@link Config}) or programmatically (see SparkMain).
 */
public class HoodieCompactor {

  // FIX: logger was created against HDFSParquetImporter.class (copy/paste slip),
  // mis-attributing this tool's log output.
  private static volatile Logger logger = LogManager.getLogger(HoodieCompactor.class);
  private final Config cfg;
  private transient FileSystem fs;

  public HoodieCompactor(Config cfg) {
    this.cfg = cfg;
  }

  /**
   * Command-line configuration (JCommander).
   */
  public static class Config implements Serializable {
    @Parameter(names = {"--base-path",
        "-sp"}, description = "Base path for the dataset", required = true)
    public String basePath = null;
    @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
    public String tableName = null;
    // FIX: short name was "-sp", duplicating --base-path's short name. JCommander
    // rejects duplicate option names at construction, so the tool could never start.
    @Parameter(names = {"--instant-time",
        "-it"}, description = "Compaction Instant time", required = true)
    public String compactionInstantTime = null;
    @Parameter(names = {"--row-key-field",
        "-rk"}, description = "Row key field name", required = true)
    public String rowKey = null;
    @Parameter(names = {"--partition-key-field",
        "-pk"}, description = "Partition key field name", required = true)
    public String partitionKey = null;
    @Parameter(names = {"--parallelism",
        "-pl"}, description = "Parallelism for hoodie insert", required = true)
    public int parallelism = 1;
    @Parameter(names = {"--schema-file",
        "-sf"}, description = "path for Avro schema file", required = true)
    public String schemaFile = null;
    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false)
    public String sparkMaster = null;
    @Parameter(names = {"--spark-memory",
        "-sm"}, description = "spark memory to use", required = true)
    public String sparkMemory = null;
    @Parameter(names = {"--retry", "-rt"}, description = "number of retries", required = false)
    public int retry = 0;
    @Parameter(names = {"--schedule", "-sc"}, description = "Schedule compaction", required = false)
    public Boolean runSchedule = false;
    // FIX: description typo "Stratgey" -> "Strategy".
    @Parameter(names = {"--strategy", "-st"}, description = "Strategy Class", required = false)
    public String strategyClassName = null;
    @Parameter(names = {"--help", "-h"}, help = true)
    public Boolean help = false;
  }

  public static void main(String[] args) throws Exception {
    final Config cfg = new Config();
    JCommander cmd = new JCommander(cfg, args);
    if (cfg.help || args.length == 0) {
      cmd.usage();
      System.exit(1);
    }
    HoodieCompactor compactor = new HoodieCompactor(cfg);
    compactor.compact(UtilHelpers.buildSparkContext(cfg.tableName, cfg.sparkMaster, cfg.sparkMemory), cfg.retry);
  }

  /**
   * Schedules (cfg.runSchedule) or executes a compaction, retrying up to {@code retry}
   * additional times while the attempt returns non-zero.
   *
   * @return 0 on success, -1 on failure (errors are logged, never rethrown)
   */
  public int compact(JavaSparkContext jsc, int retry) {
    this.fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
    int ret = -1;
    try {
      do {
        if (cfg.runSchedule) {
          // A strategy class is mandatory when generating a new compaction plan.
          if (null == cfg.strategyClassName) {
            throw new IllegalArgumentException("Missing Strategy class name for running compaction");
          }
          ret = doSchedule(jsc);
        } else {
          ret = doCompact(jsc);
        }
      } while (ret != 0 && retry-- > 0);
    } catch (Throwable t) {
      // Deliberate best-effort: failure is reported via the -1 return code, not rethrown.
      logger.error(t);
    }
    return ret;
  }

  /** Executes the compaction already scheduled at cfg.compactionInstantTime. */
  private int doCompact(JavaSparkContext jsc) throws Exception {
    //Get schema.
    String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
    HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism,
        Optional.empty());
    JavaRDD<WriteStatus> writeResponse = client.compact(cfg.compactionInstantTime);
    return UtilHelpers.handleErrors(jsc, cfg.compactionInstantTime, writeResponse);
  }

  /** Generates a compaction plan at cfg.compactionInstantTime using cfg.strategyClassName. */
  private int doSchedule(JavaSparkContext jsc) throws Exception {
    //Get schema.
    String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
    HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism,
        Optional.of(cfg.strategyClassName));
    client.scheduleCompactionAtInstant(cfg.compactionInstantTime, Optional.empty());
    return 0;
  }
}

View File

@@ -18,24 +18,39 @@
package com.uber.hoodie.utilities;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import com.uber.hoodie.utilities.sources.Source;
import com.uber.hoodie.utilities.sources.SourceDataFormat;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
/**
* Bunch of helper methods
*/
public class UtilHelpers {
private static Logger logger = LogManager.getLogger(UtilHelpers.class);
public static Source createSource(String sourceClass, PropertiesConfiguration cfg,
JavaSparkContext jssc, SourceDataFormat dataFormat, SchemaProvider schemaProvider)
@@ -76,4 +91,98 @@ public class UtilHelpers {
}
}
/**
* Parse Schema from file
*
* @param fs File System
* @param schemaFile Schema File
*/
/**
 * Parse Schema from file.
 *
 * @param fs File System
 * @param schemaFile Schema File
 * @return the full contents of the schema file, decoded as UTF-8
 * @throws Exception if the file does not exist or cannot be read
 */
public static String parseSchema(FileSystem fs, String schemaFile) throws Exception {
  // Read schema file.
  Path p = new Path(schemaFile);
  if (!fs.exists(p)) {
    throw new Exception(String.format("Could not find - %s - schema file.", schemaFile));
  }
  long len = fs.getFileStatus(p).getLen();
  ByteBuffer buf = ByteBuffer.allocate((int) len);
  // try-with-resources replaces the manual try/finally close.
  try (FSDataInputStream inputStream = fs.open(p)) {
    inputStream.readFully(0, buf.array(), 0, buf.array().length);
  }
  // BUG FIX: new String(byte[]) used the platform default charset; Avro schema
  // files are JSON text, so decode explicitly as UTF-8.
  return new String(buf.array(), StandardCharsets.UTF_8);
}
/**
* Build Spark Context for ingestion/compaction
* @return
*/
/**
 * Build Spark Context for ingestion/compaction.
 *
 * @param tableName dataset name, embedded into the Spark application name
 * @param sparkMaster Spark master URL
 * @param sparkMemory executor memory setting (e.g. "4g")
 * @return a configured {@link JavaSparkContext}
 */
public static JavaSparkContext buildSparkContext(String tableName, String sparkMaster, String sparkMemory) {
  SparkConf sparkConf = new SparkConf().setAppName("hoodie-data-importer-" + tableName);
  sparkConf.setMaster(sparkMaster);
  if (sparkMaster.startsWith("yarn")) {
    sparkConf.set("spark.eventLog.overwrite", "true");
    sparkConf.set("spark.eventLog.enabled", "true");
  }
  sparkConf.set("spark.driver.maxResultSize", "2g");
  sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
  sparkConf.set("spark.executor.memory", sparkMemory);
  // Configure hadoop conf
  sparkConf.set("spark.hadoop.mapred.output.compress", "true");
  // BUG FIX: the codec key was first set to the literal "true" (not a codec class)
  // and then immediately overwritten; set it once, to the codec class name.
  sparkConf.set("spark.hadoop.mapred.output.compression.codec",
      "org.apache.hadoop.io.compress.GzipCodec");
  sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
  sparkConf = HoodieWriteClient.registerClasses(sparkConf);
  return new JavaSparkContext(sparkConf);
}
/**
* Build Hoodie write client
*
* @param jsc Java Spark Context
* @param basePath Base Path
* @param schemaStr Schema
* @param parallelism Parallelism
*/
/**
 * Build a Hoodie write client for the given dataset.
 *
 * @param jsc Java Spark Context
 * @param basePath Base Path of the dataset
 * @param schemaStr Avro schema string for the records
 * @param parallelism insert/upsert shuffle parallelism
 * @param compactionStrategyClass optional fully-qualified compaction strategy class name
 * @return a configured {@link HoodieWriteClient}
 */
public static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath,
    String schemaStr, int parallelism, Optional<String> compactionStrategyClass) throws Exception {
  // Inline compaction is always off here; when a strategy class is supplied it
  // is loaded reflectively and wired into the compaction config.
  HoodieCompactionConfig compactionConfig;
  if (compactionStrategyClass.isPresent()) {
    compactionConfig = HoodieCompactionConfig.newBuilder()
        .withInlineCompaction(false)
        .withCompactionStrategy(ReflectionUtils.loadClass(compactionStrategyClass.get()))
        .build();
  } else {
    compactionConfig = HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build();
  }
  HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
      .withPath(basePath)
      .withParallelism(parallelism, parallelism)
      .withSchema(schemaStr)
      .combineInput(true, true)
      .withCompactionConfig(compactionConfig)
      .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
      .build();
  return new HoodieWriteClient(jsc, writeConfig);
}
/**
 * Count per-record write errors across the write response and log the outcome.
 * Shared by both the import and compaction code paths.
 *
 * @param jsc Java Spark Context
 * @param instantTime instant time of the commit being checked
 * @param writeResponse per-partition write statuses produced by the job
 * @return 0 if no record-level errors were reported, -1 otherwise
 */
public static int handleErrors(JavaSparkContext jsc, String instantTime, JavaRDD<WriteStatus> writeResponse) {
  Accumulator<Integer> errors = jsc.accumulator(0);
  writeResponse.foreach(writeStatus -> {
    if (writeStatus.hasErrors()) {
      errors.add(1);
      logger.error(String.format("Error processing records :writeStatus:%s",
          writeStatus.getStat().toString()));
    }
  });
  if (errors.value() == 0) {
    // BUG FIX: messages previously said "imported"/"Import failed", which is
    // misleading when this helper runs for compaction; use neutral wording.
    logger.info(
        String.format("Dataset committed to hoodie dataset with %s instant time.", instantTime));
    return 0;
  }
  logger.error(String.format("Job failed with %d errors.", errors.value()));
  return -1;
}
}