diff --git a/hoodie-cli/hoodie-cli.sh b/hoodie-cli/hoodie-cli.sh index 05adb9496..70a54a4c7 100755 --- a/hoodie-cli/hoodie-cli.sh +++ b/hoodie-cli/hoodie-cli.sh @@ -1,4 +1,12 @@ #!/usr/bin/env bash DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" HOODIE_JAR=`ls $DIR/target/hoodie-cli-*-SNAPSHOT.jar` -java -cp /etc/hadoop/conf:/etc/spark/conf:$DIR/target/lib/*:$HOODIE_JAR org.springframework.shell.Bootstrap +if [ -z "$HADOOP_CONF_DIR" ]; then + echo "setting hadoop conf dir" + HADOOP_CONF_DIR="/etc/hadoop/conf" +fi +if [ -z "$SPARK_CONF_DIR" ]; then + echo "setting spark conf dir" + SPARK_CONF_DIR="/etc/spark/conf" +fi +java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:$DIR/target/lib/*:$HOODIE_JAR org.springframework.shell.Bootstrap diff --git a/hoodie-cli/pom.xml b/hoodie-cli/pom.xml index d3a586624..b9724d37d 100644 --- a/hoodie-cli/pom.xml +++ b/hoodie-cli/pom.xml @@ -151,8 +151,6 @@ org.apache.spark spark-sql_2.10 - ${spark.version} - provided @@ -195,6 +193,11 @@ joda-time 2.9.6 + + com.uber.hoodie + hoodie-utilities + ${project.version} + diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HDFSParquetImportCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HDFSParquetImportCommand.java new file mode 100644 index 000000000..4053d4e72 --- /dev/null +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HDFSParquetImportCommand.java @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.cli.commands; + +import com.uber.hoodie.cli.HoodieCLI; +import com.uber.hoodie.cli.commands.SparkMain.SparkCommand; +import com.uber.hoodie.cli.utils.InputStreamConsumer; +import com.uber.hoodie.cli.utils.SparkUtil; +import com.uber.hoodie.utilities.HDFSParquetImporter; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.launcher.SparkLauncher; +import org.apache.spark.util.Utils; +import org.springframework.shell.core.CommandMarker; +import org.springframework.shell.core.annotation.CliCommand; +import org.springframework.shell.core.annotation.CliOption; +import org.springframework.stereotype.Component; + +@Component +public class HDFSParquetImportCommand implements CommandMarker { + + private static Logger log = LogManager.getLogger(HDFSParquetImportCommand.class); + + @CliCommand(value = "hdfsparquetimport", help = "Imports hdfs dataset to a hoodie dataset") + public String convert( + @CliOption(key = "srcPath", mandatory = true, help = "Base path for the input dataset") + final String srcPath, + @CliOption(key = "srcType", mandatory = true, help = "Source type for the input dataset") + final String srcType, + @CliOption(key = "targetPath", mandatory = true, help = "Base path for the target hoodie dataset") + final String targetPath, + @CliOption(key = "tableName", mandatory = true, help = "Table name") + final String tableName, + @CliOption(key = "tableType", mandatory = true, help = "Table type") + final String tableType, + @CliOption(key = "rowKeyField", mandatory = true, help = "Row key field name") + final String rowKeyField, + @CliOption(key = "partitionPathField", mandatory = true, help = "Partition path field name") + final String partitionPathField, + @CliOption(key = {"parallelism"}, mandatory = true, help = "Parallelism for hoodie insert") + final String parallelism, + 
@CliOption(key = "schemaFilePath", mandatory = true, help = "Path for Avro schema file") + final String schemaFilePath, + @CliOption(key = "format", mandatory = true, help = "Format for the input data") + final String format, + @CliOption(key = "sparkMemory", mandatory = true, help = "Spark executor memory") + final String sparkMemory, + @CliOption(key = "retry", mandatory = true, help = "Number of retries") + final String retry) + throws Exception { + + validate(format, srcType); + + boolean initialized = HoodieCLI.initConf(); + HoodieCLI.initFS(initialized); + + String sparkPropertiesPath = Utils + .getDefaultPropertiesFile(scala.collection.JavaConversions.asScalaMap(System.getenv())); + SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); + + sparkLauncher.addAppArgs(SparkCommand.IMPORT.toString(), srcPath, targetPath, tableName, + tableType, rowKeyField, partitionPathField, parallelism, schemaFilePath, sparkMemory, + retry); + Process process = sparkLauncher.launch(); + InputStreamConsumer.captureOutput(process); + int exitCode = process.waitFor(); + if (exitCode != 0) { + return "Failed to import dataset to hoodie format"; + } + return "Dataset imported to hoodie format"; + } + + private void validate(String format, String srcType) { + (new HDFSParquetImporter.FormatValidator()).validate("format", format); + (new HDFSParquetImporter.SourceTypeValidator()).validate("srcType", srcType); + } +} diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SparkMain.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SparkMain.java index 227226f00..1564b87b2 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SparkMain.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SparkMain.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * Copyright (c) 2016,2017 Uber Technologies, Inc. 
(hoodie-dev-group@uber.com) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,13 +23,11 @@ import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.index.HoodieIndex; - +import com.uber.hoodie.utilities.HDFSParquetImporter; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; -import java.util.Date; - public class SparkMain { protected final static Logger LOG = Logger.getLogger(SparkMain.class); @@ -41,7 +39,9 @@ public class SparkMain { enum SparkCommand { ROLLBACK, DEDUPLICATE, - ROLLBACK_TO_SAVEPOINT, SAVEPOINT + ROLLBACK_TO_SAVEPOINT, + SAVEPOINT, + IMPORT } public static void main(String[] args) throws Exception { @@ -52,20 +52,46 @@ public class SparkMain { JavaSparkContext jsc = SparkUtil.initJavaSparkConf("hoodie-cli-" + command); int returnCode = 0; - if (SparkCommand.ROLLBACK.equals(cmd)) { - assert (args.length == 3); - returnCode = rollback(jsc, args[1], args[2]); - } else if(SparkCommand.DEDUPLICATE.equals(cmd)) { - assert (args.length == 4); - returnCode = deduplicatePartitionPath(jsc, args[1], args[2], args[3]); - } else if(SparkCommand.ROLLBACK_TO_SAVEPOINT.equals(cmd)) { - assert (args.length == 3); - returnCode = rollbackToSavepoint(jsc, args[1], args[2]); + switch(cmd) { + case ROLLBACK: + assert (args.length == 3); + returnCode = rollback(jsc, args[1], args[2]); + break; + case DEDUPLICATE: + assert (args.length == 4); + returnCode = deduplicatePartitionPath(jsc, args[1], args[2], args[3]); + break; + case ROLLBACK_TO_SAVEPOINT: + assert (args.length == 3); + returnCode = rollbackToSavepoint(jsc, args[1], args[2]); + break; + case IMPORT: + assert (args.length == 11); + returnCode = dataImport(jsc, args[1], args[2], args[3], args[4], args[5], args[6], + Integer.parseInt(args[7]), 
args[8], SparkUtil.DEFUALT_SPARK_MASTER, args[9], + Integer.parseInt(args[10])); + break; } System.exit(returnCode); } + private static int dataImport(JavaSparkContext jsc, String srcPath, String targetPath, + String tableName, String tableType, String rowKey, String partitionKey, int parallelism, + String schemaFile, String sparkMaster, String sparkMemory, int retry) throws Exception { + HDFSParquetImporter.Config cfg = new HDFSParquetImporter.Config(); + cfg.srcPath = srcPath; + cfg.targetPath = targetPath; + cfg.tableName = tableName; + cfg.tableType = tableType; + cfg.rowKey = rowKey; + cfg.partitionKey = partitionKey; + cfg.parallelism = parallelism; + cfg.schemaFile = schemaFile; + jsc.getConf().set("spark.executor.memory", sparkMemory); + return new HDFSParquetImporter(cfg).dataImport(jsc, retry); + } + private static int deduplicatePartitionPath(JavaSparkContext jsc, String duplicatedPartitionPath, String repairedOutputPath, diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/utils/SparkUtil.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/utils/SparkUtil.java index d2d5e4c8f..5cb5e4bd2 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/utils/SparkUtil.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/utils/SparkUtil.java @@ -30,6 +30,7 @@ import java.net.URISyntaxException; public class SparkUtil { public static Logger logger = Logger.getLogger(SparkUtil.class); + public static final String DEFUALT_SPARK_MASTER = "yarn-client"; /** * @@ -55,7 +56,7 @@ public class SparkUtil { public static JavaSparkContext initJavaSparkConf(String name) { SparkConf sparkConf = new SparkConf().setAppName(name); - sparkConf.setMaster("yarn-client"); + sparkConf.setMaster(DEFUALT_SPARK_MASTER); sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); sparkConf.set("spark.driver.maxResultSize", "2g"); sparkConf.set("spark.eventLog.overwrite", "true"); diff --git a/hoodie-client/pom.xml b/hoodie-client/pom.xml index 
b811a2118..617dc9c4b 100644 --- a/hoodie-client/pom.xml +++ b/hoodie-client/pom.xml @@ -154,14 +154,11 @@ org.apache.spark spark-core_2.10 - provided org.apache.spark spark-sql_2.10 - ${spark.version} - provided diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index dc8251874..c1359f5c7 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -37,12 +37,10 @@ import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; -import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; import com.uber.hoodie.common.util.AvroUtils; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieCommitException; -import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.HoodieInsertException; import com.uber.hoodie.exception.HoodieRollbackException; @@ -55,44 +53,31 @@ import com.uber.hoodie.io.HoodieCommitArchiveLog; import com.uber.hoodie.metrics.HoodieMetrics; import com.uber.hoodie.table.HoodieTable; import com.uber.hoodie.table.WorkloadProfile; - -import org.apache.commons.lang3.tuple.Pair; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.apache.spark.Accumulator; -import org.apache.spark.Partitioner; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; -import 
org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.api.java.function.VoidFunction; -import org.apache.spark.storage.StorageLevel; - import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.text.ParseException; -import java.util.Arrays; +import java.text.SimpleDateFormat; import java.util.Collections; import java.util.Date; - import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; -import java.util.stream.Stream; - -import org.apache.spark.util.AccumulatorV2; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.Partitioner; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.storage.StorageLevel; import org.apache.spark.util.LongAccumulator; import scala.Option; import scala.Tuple2; @@ -776,7 +761,7 @@ public class HoodieWriteClient implements Seriali * Provides a new commit time for a write operation (insert/update) */ public String startCommit() { - String commitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()); + String commitTime = HoodieActiveTimeline.createNewCommitTime(); startCommitWithTime(commitTime); return commitTime; } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java 
b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java index 6eff5fe4f..e2b50c544 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java @@ -50,6 +50,7 @@ public class HoodieTestDataGenerator { public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ " + + "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"}," + "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"}," @@ -146,17 +147,25 @@ public class HoodieTestDataGenerator { * provided. */ public static TestRawTripPayload generateRandomValue(HoodieKey key, String commitTime) throws IOException { + GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, + "driver-" + commitTime, 0.0); + HoodieAvroUtils.addCommitMetadataToRecord(rec, commitTime, "-1"); + return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA); + } + + public static GenericRecord generateGenericRecord(String rowKey, String riderName, + String driverName, double timestamp) { GenericRecord rec = new GenericData.Record(avroSchema); - rec.put("_row_key", key.getRecordKey()); - rec.put("rider", "rider-" + commitTime); - rec.put("driver", "driver-" + commitTime); + rec.put("_row_key", rowKey); + rec.put("timestamp", timestamp); + rec.put("rider", riderName); + rec.put("driver", driverName); rec.put("begin_lat", rand.nextDouble()); rec.put("begin_lon", rand.nextDouble()); rec.put("end_lat", rand.nextDouble()); rec.put("end_lon", rand.nextDouble()); rec.put("fare", rand.nextDouble() * 100); - HoodieAvroUtils.addCommitMetadataToRecord(rec, commitTime, "-1"); - return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA); + return rec; } 
public static void createCommitFile(String basePath, String commitTime) throws IOException { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java index 1c991ad26..475f3ffd4 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java @@ -23,6 +23,7 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.exception.HoodieIOException; +import java.util.Date; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -56,11 +57,20 @@ import java.util.stream.Stream; public class HoodieActiveTimeline extends HoodieDefaultTimeline { public static final SimpleDateFormat COMMIT_FORMATTER = new SimpleDateFormat("yyyyMMddHHmmss"); + private final transient static Logger log = LogManager.getLogger(HoodieActiveTimeline.class); private String metaPath; private transient FileSystem fs; + /** + * Returns next commit time in the {@link #COMMIT_FORMATTER} format. 
+ * @return the current time rendered with {@link #COMMIT_FORMATTER} + */ + public static String createNewCommitTime() { + return HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()); + } + protected HoodieActiveTimeline(FileSystem fs, String metaPath, String[] includedExtensions) { // Filter all the filter in the metapath and include only the extensions passed and // convert them into HoodieInstant diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java index 4385b746b..77448d058 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java @@ -16,6 +16,7 @@ package com.uber.hoodie.common.util; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.log.HoodieLogFile; @@ -52,8 +53,21 @@ public class FSUtils { private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10; private static final long MIN_CLEAN_TO_KEEP = 10; private static final long MIN_ROLLBACK_TO_KEEP = 10; + private static FileSystem fs; + + /** + * Only to be used for testing. 
+ */ + @VisibleForTesting + public static void setFs(FileSystem fs) { + FSUtils.fs = fs; + } + public static FileSystem getFs() { + if (fs != null) { + return fs; + } Configuration conf = new Configuration(); conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); @@ -66,6 +80,7 @@ public class FSUtils { } LOG.info(String.format("Hadoop Configuration: fs.defaultFS: [%s], Config:[%s], FileSystem: [%s]", conf.getRaw("fs.defaultFS"), conf.toString(), fs.toString())); + return fs; } diff --git a/hoodie-utilities/pom.xml b/hoodie-utilities/pom.xml index a708ebc56..d76569ac4 100644 --- a/hoodie-utilities/pom.xml +++ b/hoodie-utilities/pom.xml @@ -79,6 +79,10 @@ + + org.apache.spark + spark-sql_2.10 + com.uber.hoodie hoodie-common @@ -219,7 +223,6 @@ org.apache.spark spark-core_2.10 - provided javax.servlet diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java new file mode 100644 index 000000000..a8338b727 --- /dev/null +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java @@ -0,0 +1,312 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.utilities; + +import com.beust.jcommander.IValueValidator; +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; +import com.google.common.annotations.VisibleForTesting; +import com.uber.hoodie.HoodieWriteClient; +import com.uber.hoodie.WriteStatus; +import com.uber.hoodie.common.HoodieJsonPayload; +import com.uber.hoodie.common.model.HoodieKey; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.table.HoodieTableConfig; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.config.HoodieIndexConfig; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.exception.HoodieIOException; +import com.uber.hoodie.index.HoodieIndex; +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.Properties; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.spark.Accumulator; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.VoidFunction; +import scala.Tuple2; + +public class HDFSParquetImporter implements Serializable{ + + private static volatile Logger logger = 
LogManager.getLogger(HDFSParquetImporter.class); + private final Config cfg; + private final transient FileSystem fs; + public static final SimpleDateFormat PARTITION_FORMATTER = new SimpleDateFormat("yyyy/MM/dd"); + + public HDFSParquetImporter( + Config cfg) throws IOException { + this.cfg = cfg; + fs = FSUtils.getFs(); + } + + public static class FormatValidator implements IValueValidator { + List validFormats = Arrays.asList("parquet"); + + @Override + public void validate(String name, String value) throws ParameterException { + if (value == null || !validFormats.contains(value)) { + throw new ParameterException(String + .format("Invalid format type: value:%s: supported formats:%s", value, + validFormats)); + } + } + } + + public static class SourceTypeValidator implements IValueValidator { + List validSourceTypes = Arrays.asList("hdfs"); + + @Override + public void validate(String name, String value) throws ParameterException { + if (value == null || !validSourceTypes.contains(value)) { + throw new ParameterException(String + .format("Invalid source type: value:%s: supported source types:%s", value, + validSourceTypes)); + } + } + } + + public static class Config implements Serializable { + + @Parameter(names = {"--src-path", + "-sp"}, description = "Base path for the input dataset", required = true) + public String srcPath = null; + @Parameter(names = {"--src-type", + "-st"}, description = "Source type for the input dataset", required = true, + validateValueWith = SourceTypeValidator.class) + public String srcType = null; + @Parameter(names = {"--target-path", + "-tp"}, description = "Base path for the target hoodie dataset", required = true) + public String targetPath = null; + @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true) + public String tableName = null; + @Parameter(names = {"--table-type", "-tt"}, description = "Table type", required = true) + public String tableType = null; + @Parameter(names = 
{"--row-key-field", + "-rk"}, description = "Row key field name", required = true) + public String rowKey = null; + @Parameter(names = {"--partition-key-field", + "-pk"}, description = "Partition key field name", required = true) + public String partitionKey = null; + @Parameter(names = {"--parallelism", + "-pl"}, description = "Parallelism for hoodie insert", required = true) + public int parallelism = 1; + @Parameter(names = {"--schema-file", + "-sf"}, description = "path for Avro schema file", required = true) + public String schemaFile = null; + @Parameter(names = {"--format", + "-f"}, description = "Format for the input data.", required = false, + validateValueWith = FormatValidator.class) + public String format = null; + @Parameter(names = {"--spark-master", + "-ms"}, description = "Spark master", required = false) + public String sparkMaster = null; + @Parameter(names = {"--spark-memory", + "-sm"}, description = "spark memory to use", required = true) + public String sparkMemory = null; + @Parameter(names = {"--retry", + "-rt"}, description = "number of retries", required = false) + public int retry = 0; + @Parameter(names = {"--help", "-h"}, help = true) + public Boolean help = false; + } + + public static void main(String args[]) throws Exception { + final HDFSParquetImporter.Config cfg = new HDFSParquetImporter.Config(); + JCommander cmd = new JCommander(cfg, args); + if (cfg.help || args.length == 0) { + cmd.usage(); + System.exit(1); + } + HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg); + dataImporter.dataImport(dataImporter.getSparkContext(), cfg.retry); + } + + private JavaSparkContext getSparkContext() { + SparkConf sparkConf = new SparkConf().setAppName("hoodie-data-importer-" + cfg.tableName); + sparkConf.setMaster(cfg.sparkMaster); + + if (cfg.sparkMaster.startsWith("yarn")) { + sparkConf.set("spark.eventLog.overwrite", "true"); + sparkConf.set("spark.eventLog.enabled", "true"); + } + + sparkConf.set("spark.driver.maxResultSize", 
"2g"); + sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + sparkConf.set("spark.executor.memory", cfg.sparkMemory); + + // Configure hadoop conf + sparkConf.set("spark.hadoop.mapred.output.compress", "true"); + sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true"); + sparkConf.set("spark.hadoop.mapred.output.compression.codec", + "org.apache.hadoop.io.compress.GzipCodec"); + sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK"); + + sparkConf = HoodieWriteClient.registerClasses(sparkConf); + return new JavaSparkContext(sparkConf); + } + + private String getSchema() throws Exception { + // Read schema file. + Path p = new Path(cfg.schemaFile); + if (!fs.exists(p)) { + throw new Exception( + String.format("Could not find - %s - schema file.", cfg.schemaFile)); + } + long len = fs.getFileStatus(p).getLen(); + ByteBuffer buf = ByteBuffer.allocate((int) len); + FSDataInputStream inputStream = null; + try { + inputStream = fs.open(p); + inputStream.readFully(0, buf.array(), 0, buf.array().length); + } + finally { + if (inputStream != null) + inputStream.close(); + } + return new String(buf.array()); + } + + public int dataImport(JavaSparkContext jsc, int retry) throws Exception { + int ret = -1; + try { + // Verify that targetPath is not present. + if (fs.exists(new Path(cfg.targetPath))) { + throw new HoodieIOException( + String.format("Make sure %s is not present.", cfg.targetPath)); + } + do { + ret = dataImport(jsc); + } while (ret != 0 && retry-- > 0); + } catch (Throwable t) { + logger.error(t); + } + return ret; + } + + @VisibleForTesting + protected int dataImport(JavaSparkContext jsc) throws IOException { + try { + if (fs.exists(new Path(cfg.targetPath))) { + // cleanup target directory. + fs.delete(new Path(cfg.targetPath), true); + } + + //Get schema. + String schemaStr = getSchema(); + + // Initialize target hoodie table. 
+ Properties properties = new Properties(); + properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, cfg.tableName); + properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, cfg.tableType); + HoodieTableMetaClient.initializePathAsHoodieDataset(fs, cfg.targetPath, properties); + + HoodieWriteClient client = createHoodieClient(jsc, cfg.targetPath, schemaStr, + cfg.parallelism); + + Job job = Job.getInstance(jsc.hadoopConfiguration()); + // To parallelize reading file status. + job.getConfiguration().set(FileInputFormat.LIST_STATUS_NUM_THREADS, "1024"); + AvroReadSupport.setAvroReadSchema(jsc.hadoopConfiguration(), + (new Schema.Parser().parse(schemaStr))); + ParquetInputFormat.setReadSupportClass(job, (AvroReadSupport.class)); + + JavaRDD> hoodieRecords = jsc + .newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class, + GenericRecord.class, job.getConfiguration()) + // To reduce large number of tasks. + .coalesce(16 * cfg.parallelism) + .map(new Function, HoodieRecord>() { + @Override + public HoodieRecord call(Tuple2 entry) + throws Exception { + GenericRecord genericRecord = entry._2(); + Object partitionField = genericRecord.get(cfg.partitionKey); + if (partitionField == null) { + throw new HoodieIOException( + "partition key is missing. :" + cfg.partitionKey); + } + Object rowField = genericRecord.get(cfg.rowKey); + if (rowField == null) { + throw new HoodieIOException( + "row field is missing. :" + cfg.rowKey); + } + long ts = (long) ((Double) partitionField * 1000l); + String partitionPath = PARTITION_FORMATTER.format(new Date(ts)); + return new HoodieRecord( + new HoodieKey((String) rowField, partitionPath), + new HoodieJsonPayload(genericRecord.toString())); + } + } + ); + // Get commit time. 
+ String commitTime = client.startCommit(); + + JavaRDD writeResponse = client.bulkInsert(hoodieRecords, commitTime); + Accumulator errors = jsc.accumulator(0); + writeResponse.foreach(new VoidFunction() { + @Override + public void call(WriteStatus writeStatus) throws Exception { + if (writeStatus.hasErrors()) { + errors.add(1); + logger.error(String.format("Error processing records :writeStatus:%s", + writeStatus.getStat().toString())); + } + } + }); + if (errors.value() == 0) { + logger.info(String + .format("Dataset imported into hoodie dataset with %s commit time.", + commitTime)); + return 0; + } + logger.error(String.format("Import failed with %d errors.", errors.value())); + } catch (Throwable t) { + logger.error("Error occurred.", t); + } + return -1; + } + + private static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, + String schemaStr, int parallelism) throws Exception { + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withParallelism(parallelism, parallelism).withSchema(schemaStr) + .combineInput(true, true).withIndexConfig( + HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) + .build(); + return new HoodieWriteClient(jsc, config); + } +} diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHDFSParquetImporter.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHDFSParquetImporter.java new file mode 100644 index 000000000..36021c2fa --- /dev/null +++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHDFSParquetImporter.java @@ -0,0 +1,291 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.utilities;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.uber.hoodie.HoodieReadClient;
+import com.uber.hoodie.HoodieWriteClient;
+import com.uber.hoodie.common.HoodieTestDataGenerator;
+import com.uber.hoodie.common.minicluster.HdfsTestService;
+import com.uber.hoodie.common.table.HoodieTimeline;
+import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
+import com.uber.hoodie.common.util.FSUtils;
+import java.io.IOException;
+import java.io.Serializable;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SQLContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests for {@link HDFSParquetImporter}. The tests run against the HDFS instance started by
+ * {@link HdfsTestService} and a local Spark context created per test.
+ */
+public class TestHDFSParquetImporter implements Serializable {
+
+    private static String dfsBasePath;
+    private static HdfsTestService hdfsTestService;
+    private static MiniDFSCluster dfsCluster;
+    private static DistributedFileSystem dfs;
+
+
+    /**
+     * Starts the HDFS test service once for the whole class and registers its file system with
+     * {@link FSUtils}.
+     */
+    @BeforeClass
+    public static void initClass() throws Exception {
+        hdfsTestService = new HdfsTestService();
+        dfsCluster = hdfsTestService.start(true);
+
+        // Use the working directory of the test file system as the base path.
+        dfs = dfsCluster.getFileSystem();
+        dfsBasePath = dfs.getWorkingDirectory().toString();
+        dfs.mkdirs(new Path(dfsBasePath));
+        FSUtils.setFs(dfs);
+    }
+
+    /**
+     * Stops the HDFS test service (if it was created) and clears the file system registered with
+     * {@link FSUtils}.
+     */
+    @AfterClass
+    public static void cleanupClass() throws Exception {
+        if (hdfsTestService != null) {
+            hdfsTestService.stop();
+        }
+        FSUtils.setFs(null);
+    }
+
+    /**
+     * Test successful data import with retries.
+     *
+     * <p>The schema file does not exist when the import starts. The overridden single-argument
+     * {@code dataImport} creates it once the retry counter reaches zero, and the test expects
+     * the overall import to return 0 afterwards.
+     */
+    @Test
+    public void testDatasetImportWithRetries() throws Exception {
+        JavaSparkContext jsc = null;
+        try {
+            jsc = getJavaSparkContext();
+
+            // Test root folder.
+            String basePath = (new Path(dfsBasePath,
+                Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
+
+            // Hoodie root folder
+            Path hoodieFolder = new Path(basePath, "testTarget");
+
+            // Location of the schema file; it is deliberately NOT created here.
+            final String schemaFile = new Path(basePath, "file.schema").toString();
+
+            // Create generic records.
+            Path srcFolder = new Path(basePath, "testSrc");
+            createRecords(srcFolder);
+
+            HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(),
+                hoodieFolder.toString(), "testTable", "COPY_ON_WRITE", "_row_key", "timestamp",
+                1, schemaFile);
+            final AtomicInteger retry = new AtomicInteger(3);
+            final AtomicInteger fileCreated = new AtomicInteger(0);
+            HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg) {
+                @Override
+                protected int dataImport(JavaSparkContext jsc) throws IOException {
+                    int ret = super.dataImport(jsc);
+                    // Every attempt decrements the counter; the schema file appears only when
+                    // the counter hits zero, so a later attempt can succeed.
+                    if (retry.decrementAndGet() == 0) {
+                        fileCreated.incrementAndGet();
+                        createSchemaFile(schemaFile);
+                    }
+
+                    return ret;
+                }
+            };
+            // The import as a whole must succeed. A final counter of -1 means the single-argument
+            // dataImport ran four times (3 -> -1), and the schema file was written exactly once.
+            assertEquals(0, dataImporter.dataImport(jsc, retry.get()));
+            assertEquals(-1, retry.get());
+            assertEquals(1, fileCreated.get());
+
+            // Check if
+            // 1. .commit file is present
+            // 2. number of records in each partition == 24
+            // 3. total number of partitions == 4;
+            boolean isCommitFilePresent = false;
+            Map<String, Long> recordCounts = new HashMap<>();
+            // One SQLContext is enough for all files; no need to rebuild it per parquet file.
+            SQLContext sqlContext = new SQLContext(jsc);
+            RemoteIterator<LocatedFileStatus> hoodieFiles = dfs.listFiles(hoodieFolder, true);
+            while (hoodieFiles.hasNext()) {
+                LocatedFileStatus f = hoodieFiles.next();
+                String filePath = f.getPath().toString();
+                isCommitFilePresent =
+                    isCommitFilePresent || filePath.endsWith(HoodieTimeline.COMMIT_EXTENSION);
+
+                if (filePath.endsWith("parquet")) {
+                    // Accumulate record counts per parent (partition) directory.
+                    String partitionPath = f.getPath().getParent().toString();
+                    long count = sqlContext.read().parquet(filePath).count();
+                    recordCounts.merge(partitionPath, count, Long::sum);
+                }
+            }
+            assertTrue("commit file is missing", isCommitFilePresent);
+            assertEquals("partition is missing", 4, recordCounts.size());
+            for (Entry<String, Long> e : recordCounts.entrySet()) {
+                assertEquals("missing records", 24, e.getValue().longValue());
+            }
+        } finally {
+            if (jsc != null) {
+                jsc.stop();
+            }
+        }
+    }
+
+    /**
+     * Writes 96 generated records into {@code file1.parquet} under {@code srcFolder}. Record
+     * {@code i} carries a timestamp {@code i} hours after the parsed start instant.
+     *
+     * @param srcFolder folder in which the source parquet file is created
+     * @throws ParseException if the start timestamp cannot be parsed
+     * @throws IOException if the parquet file cannot be written
+     */
+    private void createRecords(Path srcFolder) throws ParseException, IOException {
+        Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
+        // getTime() is in milliseconds; the generated records take seconds.
+        long startTime =
+            HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
+        List<GenericRecord> records = new ArrayList<>();
+        for (long recordNum = 0; recordNum < 96; recordNum++) {
+            records.add(HoodieTestDataGenerator
+                .generateGenericRecord(Long.toString(recordNum), "rider-" + recordNum,
+                    "driver-" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
+        }
+        // try-with-resources closes the writer even if a write fails.
+        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
+            .<GenericRecord>builder(srcFile)
+            .withSchema(HoodieTestDataGenerator.avroSchema)
+            .withConf(new Configuration())
+            .build()) {
+            for (GenericRecord record : records) {
+                writer.write(record);
+            }
+        }
+    }
+
+    /**
+     * Writes {@link HoodieTestDataGenerator#TRIP_EXAMPLE_SCHEMA} to the given path on the test
+     * file system.
+     *
+     * @param schemaFile path of the schema file to create
+     * @throws IOException if the file cannot be created or written
+     */
+    private void createSchemaFile(String schemaFile) throws IOException {
+        try (FSDataOutputStream schemaFileOS = dfs.create(new Path(schemaFile))) {
+            schemaFileOS.write(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA.getBytes());
+        }
+    }
+
+    /**
+     * Tests for schema file.
+     * 1. File is missing.
+     * 2. File has invalid data.
+     */
+    @Test
+    public void testSchemaFile() throws Exception {
+        JavaSparkContext jsc = null;
+        try {
+            jsc = getJavaSparkContext();
+
+            // Test root folder.
+            String basePath = (new Path(dfsBasePath,
+                Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
+            // Hoodie root folder
+            Path hoodieFolder = new Path(basePath, "testTarget");
+            Path srcFolder = new Path(basePath, "srcTest");
+            Path schemaFile = new Path(basePath, "missingFile.schema");
+            HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(),
+                hoodieFolder.toString(), "testTable", "COPY_ON_WRITE", "_row_key", "timestamp",
+                1, schemaFile.toString());
+            HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+            // Should fail - return : -1.
+            assertEquals(-1, dataImporter.dataImport(jsc, 0));
+
+            // Close the stream before importing so the invalid content is actually persisted
+            // and the handle is not leaked.
+            try (FSDataOutputStream invalidSchemaOS = dfs.create(schemaFile)) {
+                invalidSchemaOS.write("Random invalid schema data".getBytes());
+            }
+            // Should fail - return : -1.
+            assertEquals(-1, dataImporter.dataImport(jsc, 0));
+
+        } finally {
+            if (jsc != null) {
+                jsc.stop();
+            }
+        }
+    }
+
+    /**
+     * Test for missing rowKey and partitionKey.
+     */
+    @Test
+    public void testRowAndPartitionKey() throws Exception {
+        JavaSparkContext jsc = null;
+        try {
+            jsc = getJavaSparkContext();
+
+            // Test root folder.
+            String basePath = (new Path(dfsBasePath,
+                Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
+            // Hoodie root folder
+            Path hoodieFolder = new Path(basePath, "testTarget");
+
+            // Create generic records.
+            Path srcFolder = new Path(basePath, "testSrc");
+            createRecords(srcFolder);
+
+            // Create schema file.
+            Path schemaFile = new Path(basePath, "missingFile.schema");
+            createSchemaFile(schemaFile.toString());
+
+            HDFSParquetImporter dataImporter;
+            HDFSParquetImporter.Config cfg;
+
+            // Check for invalid row key.
+            cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+                "testTable", "COPY_ON_WRITE", "invalidRowKey", "timestamp",
+                1, schemaFile.toString());
+            dataImporter = new HDFSParquetImporter(cfg);
+            assertEquals(-1, dataImporter.dataImport(jsc, 0));
+
+            // Check for invalid partition key.
+            cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+                "testTable", "COPY_ON_WRITE", "_row_key", "invalidTimeStamp",
+                1, schemaFile.toString());
+            dataImporter = new HDFSParquetImporter(cfg);
+            assertEquals(-1, dataImporter.dataImport(jsc, 0));
+
+        } finally {
+            if (jsc != null) {
+                jsc.stop();
+            }
+        }
+    }
+
+    /**
+     * Builds an {@link HDFSParquetImporter.Config} with the given values copied into its fields.
+     *
+     * @param srcPath source dataset path
+     * @param targetPath target hoodie dataset path
+     * @param tableName table name
+     * @param tableType table type, e.g. {@code COPY_ON_WRITE}
+     * @param rowKey name of the row key field
+     * @param partitionKey name of the partition key field
+     * @param parallelism parallelism passed to the importer
+     * @param schemaFile path of the schema file
+     * @return the populated config
+     */
+    private HDFSParquetImporter.Config getHDFSParquetImporterConfig(String srcPath,
+        String targetPath, String tableName, String tableType, String rowKey,
+        String partitionKey, int parallelism, String schemaFile) {
+        HDFSParquetImporter.Config cfg = new HDFSParquetImporter.Config();
+        cfg.srcPath = srcPath;
+        cfg.targetPath = targetPath;
+        cfg.tableName = tableName;
+        cfg.tableType = tableType;
+        cfg.rowKey = rowKey;
+        cfg.partitionKey = partitionKey;
+        cfg.parallelism = parallelism;
+        cfg.schemaFile = schemaFile;
+        return cfg;
+    }
+
+    /**
+     * Creates a Spark context with master {@code local[4]}, configured through
+     * {@link HoodieWriteClient#registerClasses} and {@link HoodieReadClient#addHoodieSupport}.
+     *
+     * @return a new context; the caller is responsible for stopping it
+     */
+    private JavaSparkContext getJavaSparkContext() {
+        // Initialize a local spark env
+        SparkConf sparkConf =
+            new SparkConf().setAppName("TestConversionCommand").setMaster("local[4]");
+        sparkConf = HoodieWriteClient.registerClasses(sparkConf);
+        return new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf));
+    }
+}
diff --git a/hoodie-utilities/src/test/resources/log4j-surefire.properties b/hoodie-utilities/src/test/resources/log4j-surefire.properties
new file
mode 100644 index 000000000..eab225528 --- /dev/null +++ b/hoodie-utilities/src/test/resources/log4j-surefire.properties @@ -0,0 +1,24 @@ +# +# Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +log4j.rootLogger=WARN, A1 +log4j.category.com.uber=INFO +log4j.category.org.apache.parquet.hadoop=WARN + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n diff --git a/pom.xml b/pom.xml index e45a9ccfc..4f65a9ff3 100644 --- a/pom.xml +++ b/pom.xml @@ -71,6 +71,10 @@ Siddhartha Gunda Uber + + Omkar Joshi + Uber + 2015-2016 @@ -413,6 +417,12 @@ ${spark.version} provided + + org.apache.spark + spark-sql_2.10 + ${spark.version} + provided + org.apache.hbase