1
0

tool for importing hive tables (in parquet format) into hoodie dataset (#89)

* tool for importing hive tables (in parquet format) into hoodie dataset

* review fixes

* review fixes

* review fixes
This commit is contained in:
ovj
2017-03-21 14:42:13 -07:00
committed by prazanna
parent d835710c51
commit 21898907c1
15 changed files with 842 additions and 57 deletions

View File

@@ -0,0 +1,312 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.utilities;
import com.beust.jcommander.IValueValidator;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.google.common.annotations.VisibleForTesting;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.HoodieJsonPayload;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.table.HoodieTableConfig;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.index.HoodieIndex;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
public class HDFSParquetImporter implements Serializable{
private static volatile Logger logger = LogManager.getLogger(HDFSParquetImporter.class);
private final Config cfg;
private final transient FileSystem fs;
public static final SimpleDateFormat PARTITION_FORMATTER = new SimpleDateFormat("yyyy/MM/dd");
public HDFSParquetImporter(
Config cfg) throws IOException {
this.cfg = cfg;
fs = FSUtils.getFs();
}
public static class FormatValidator implements IValueValidator<String> {
List<String> validFormats = Arrays.asList("parquet");
@Override
public void validate(String name, String value) throws ParameterException {
if (value == null || !validFormats.contains(value)) {
throw new ParameterException(String
.format("Invalid format type: value:%s: supported formats:%s", value,
validFormats));
}
}
}
public static class SourceTypeValidator implements IValueValidator<String> {
List<String> validSourceTypes = Arrays.asList("hdfs");
@Override
public void validate(String name, String value) throws ParameterException {
if (value == null || !validSourceTypes.contains(value)) {
throw new ParameterException(String
.format("Invalid source type: value:%s: supported source types:%s", value,
validSourceTypes));
}
}
}
public static class Config implements Serializable {
@Parameter(names = {"--src-path",
"-sp"}, description = "Base path for the input dataset", required = true)
public String srcPath = null;
@Parameter(names = {"--src-type",
"-st"}, description = "Source type for the input dataset", required = true,
validateValueWith = SourceTypeValidator.class)
public String srcType = null;
@Parameter(names = {"--target-path",
"-tp"}, description = "Base path for the target hoodie dataset", required = true)
public String targetPath = null;
@Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
public String tableName = null;
@Parameter(names = {"--table-type", "-tt"}, description = "Table type", required = true)
public String tableType = null;
@Parameter(names = {"--row-key-field",
"-rk"}, description = "Row key field name", required = true)
public String rowKey = null;
@Parameter(names = {"--partition-key-field",
"-pk"}, description = "Partition key field name", required = true)
public String partitionKey = null;
@Parameter(names = {"--parallelism",
"-pl"}, description = "Parallelism for hoodie insert", required = true)
public int parallelism = 1;
@Parameter(names = {"--schema-file",
"-sf"}, description = "path for Avro schema file", required = true)
public String schemaFile = null;
@Parameter(names = {"--format",
"-f"}, description = "Format for the input data.", required = false,
validateValueWith = FormatValidator.class)
public String format = null;
@Parameter(names = {"--spark-master",
"-ms"}, description = "Spark master", required = false)
public String sparkMaster = null;
@Parameter(names = {"--spark-memory",
"-sm"}, description = "spark memory to use", required = true)
public String sparkMemory = null;
@Parameter(names = {"--retry",
"-rt"}, description = "number of retries", required = false)
public int retry = 0;
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
}
public static void main(String args[]) throws Exception {
final HDFSParquetImporter.Config cfg = new HDFSParquetImporter.Config();
JCommander cmd = new JCommander(cfg, args);
if (cfg.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
dataImporter.dataImport(dataImporter.getSparkContext(), cfg.retry);
}
private JavaSparkContext getSparkContext() {
SparkConf sparkConf = new SparkConf().setAppName("hoodie-data-importer-" + cfg.tableName);
sparkConf.setMaster(cfg.sparkMaster);
if (cfg.sparkMaster.startsWith("yarn")) {
sparkConf.set("spark.eventLog.overwrite", "true");
sparkConf.set("spark.eventLog.enabled", "true");
}
sparkConf.set("spark.driver.maxResultSize", "2g");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.executor.memory", cfg.sparkMemory);
// Configure hadoop conf
sparkConf.set("spark.hadoop.mapred.output.compress", "true");
sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
sparkConf.set("spark.hadoop.mapred.output.compression.codec",
"org.apache.hadoop.io.compress.GzipCodec");
sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
sparkConf = HoodieWriteClient.registerClasses(sparkConf);
return new JavaSparkContext(sparkConf);
}
private String getSchema() throws Exception {
// Read schema file.
Path p = new Path(cfg.schemaFile);
if (!fs.exists(p)) {
throw new Exception(
String.format("Could not find - %s - schema file.", cfg.schemaFile));
}
long len = fs.getFileStatus(p).getLen();
ByteBuffer buf = ByteBuffer.allocate((int) len);
FSDataInputStream inputStream = null;
try {
inputStream = fs.open(p);
inputStream.readFully(0, buf.array(), 0, buf.array().length);
}
finally {
if (inputStream != null)
inputStream.close();
}
return new String(buf.array());
}
public int dataImport(JavaSparkContext jsc, int retry) throws Exception {
int ret = -1;
try {
// Verify that targetPath is not present.
if (fs.exists(new Path(cfg.targetPath))) {
throw new HoodieIOException(
String.format("Make sure %s is not present.", cfg.targetPath));
}
do {
ret = dataImport(jsc);
} while (ret != 0 && retry-- > 0);
} catch (Throwable t) {
logger.error(t);
}
return ret;
}
@VisibleForTesting
protected int dataImport(JavaSparkContext jsc) throws IOException {
try {
if (fs.exists(new Path(cfg.targetPath))) {
// cleanup target directory.
fs.delete(new Path(cfg.targetPath), true);
}
//Get schema.
String schemaStr = getSchema();
// Initialize target hoodie table.
Properties properties = new Properties();
properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, cfg.tableName);
properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, cfg.tableType);
HoodieTableMetaClient.initializePathAsHoodieDataset(fs, cfg.targetPath, properties);
HoodieWriteClient client = createHoodieClient(jsc, cfg.targetPath, schemaStr,
cfg.parallelism);
Job job = Job.getInstance(jsc.hadoopConfiguration());
// To parallelize reading file status.
job.getConfiguration().set(FileInputFormat.LIST_STATUS_NUM_THREADS, "1024");
AvroReadSupport.setAvroReadSchema(jsc.hadoopConfiguration(),
(new Schema.Parser().parse(schemaStr)));
ParquetInputFormat.setReadSupportClass(job, (AvroReadSupport.class));
JavaRDD<HoodieRecord<HoodieJsonPayload>> hoodieRecords = jsc
.newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class,
GenericRecord.class, job.getConfiguration())
// To reduce large number of tasks.
.coalesce(16 * cfg.parallelism)
.map(new Function<Tuple2<Void, GenericRecord>, HoodieRecord<HoodieJsonPayload>>() {
@Override
public HoodieRecord<HoodieJsonPayload> call(Tuple2<Void, GenericRecord> entry)
throws Exception {
GenericRecord genericRecord = entry._2();
Object partitionField = genericRecord.get(cfg.partitionKey);
if (partitionField == null) {
throw new HoodieIOException(
"partition key is missing. :" + cfg.partitionKey);
}
Object rowField = genericRecord.get(cfg.rowKey);
if (rowField == null) {
throw new HoodieIOException(
"row field is missing. :" + cfg.rowKey);
}
long ts = (long) ((Double) partitionField * 1000l);
String partitionPath = PARTITION_FORMATTER.format(new Date(ts));
return new HoodieRecord<HoodieJsonPayload>(
new HoodieKey((String) rowField, partitionPath),
new HoodieJsonPayload(genericRecord.toString()));
}
}
);
// Get commit time.
String commitTime = client.startCommit();
JavaRDD<WriteStatus> writeResponse = client.bulkInsert(hoodieRecords, commitTime);
Accumulator<Integer> errors = jsc.accumulator(0);
writeResponse.foreach(new VoidFunction<WriteStatus>() {
@Override
public void call(WriteStatus writeStatus) throws Exception {
if (writeStatus.hasErrors()) {
errors.add(1);
logger.error(String.format("Error processing records :writeStatus:%s",
writeStatus.getStat().toString()));
}
}
});
if (errors.value() == 0) {
logger.info(String
.format("Dataset imported into hoodie dataset with %s commit time.",
commitTime));
return 0;
}
logger.error(String.format("Import failed with %d errors.", errors.value()));
} catch (Throwable t) {
logger.error("Error occurred.", t);
}
return -1;
}
private static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath,
String schemaStr, int parallelism) throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withParallelism(parallelism, parallelism).withSchema(schemaStr)
.combineInput(true, true).withIndexConfig(
HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.build();
return new HoodieWriteClient(jsc, config);
}
}

View File

@@ -0,0 +1,291 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.utilities;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import com.uber.hoodie.HoodieReadClient;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.minicluster.HdfsTestService;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.util.FSUtils;
import java.io.IOException;
import java.io.Serializable;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestHDFSParquetImporter implements Serializable {
private static String dfsBasePath;
private static HdfsTestService hdfsTestService;
private static MiniDFSCluster dfsCluster;
private static DistributedFileSystem dfs;
@BeforeClass
public static void initClass() throws Exception {
hdfsTestService = new HdfsTestService();
dfsCluster = hdfsTestService.start(true);
// Create a temp folder as the base path
dfs = dfsCluster.getFileSystem();
dfsBasePath = dfs.getWorkingDirectory().toString();
dfs.mkdirs(new Path(dfsBasePath));
FSUtils.setFs(dfs);
}
@AfterClass
public static void cleanupClass() throws Exception {
if (hdfsTestService != null) {
hdfsTestService.stop();
}
FSUtils.setFs(null);
}
/**
* Test successful data import with retries.
*/
@Test
public void testDatasetImportWithRetries() throws Exception {
JavaSparkContext jsc = null;
try {
jsc = getJavaSparkContext();
// Test root folder.
String basePath = (new Path(dfsBasePath,
Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
// Hoodie root folder
Path hoodieFolder = new Path(basePath, "testTarget");
// Create schema file.
String schemaFile = new Path(basePath, "file.schema").toString();
//Create generic records.
Path srcFolder = new Path(basePath, "testSrc");
createRecords(srcFolder);
HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
"testTable", "COPY_ON_WRITE", "_row_key", "timestamp",
1, schemaFile);
AtomicInteger retry = new AtomicInteger(3);
AtomicInteger fileCreated = new AtomicInteger(0);
HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg) {
@Override
protected int dataImport(JavaSparkContext jsc) throws IOException {
int ret = super.dataImport(jsc);
if (retry.decrementAndGet() == 0) {
fileCreated.incrementAndGet();
createSchemaFile(schemaFile);
}
return ret;
}
};
// Schema file is not created so this operation should fail.
assertEquals(0, dataImporter.dataImport(jsc, retry.get()));
assertEquals(retry.get(), -1);
assertEquals(fileCreated.get(), 1);
// Check if
// 1. .commit file is present
// 2. number of records in each partition == 24
// 3. total number of partitions == 4;
boolean isCommitFilePresent = false;
Map<String, Long> recordCounts = new HashMap<String, Long>();
RemoteIterator<LocatedFileStatus> hoodieFiles = dfs.listFiles(hoodieFolder, true);
while (hoodieFiles.hasNext()) {
LocatedFileStatus f = hoodieFiles.next();
isCommitFilePresent = isCommitFilePresent || f.getPath().toString().endsWith(HoodieTimeline.COMMIT_EXTENSION);
if (f.getPath().toString().endsWith("parquet")) {
SQLContext sc = new SQLContext(jsc);
String partitionPath = f.getPath().getParent().toString();
long count = sc.read().parquet(f.getPath().toString()).count();
if (!recordCounts.containsKey(partitionPath)) recordCounts.put(partitionPath, 0L);
recordCounts.put(partitionPath, recordCounts.get(partitionPath) + count);
}
}
assertTrue("commit file is missing", isCommitFilePresent);
assertEquals("partition is missing", 4, recordCounts.size());
for (Entry<String, Long> e : recordCounts.entrySet()) {
assertEquals( "missing records", 24, e.getValue().longValue());
}
} finally {
if (jsc != null) {
jsc.stop();
}
}
}
private void createRecords(Path srcFolder) throws ParseException, IOException {
Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
long startTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
List<GenericRecord> records = new ArrayList<GenericRecord>();
for (long recordNum = 0; recordNum < 96; recordNum++) {
records.add(HoodieTestDataGenerator
.generateGenericRecord(Long.toString(recordNum), "rider-" + recordNum,
"driver-" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
}
ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(srcFile)
.withSchema(HoodieTestDataGenerator.avroSchema)
.withConf(new Configuration())
.build();
for (GenericRecord record : records) {
writer.write(record);
}
writer.close();
}
private void createSchemaFile(String schemaFile) throws IOException {
FSDataOutputStream schemaFileOS = dfs.create(new Path(schemaFile));
schemaFileOS.write(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA.getBytes());
schemaFileOS.close();
}
/**
* Tests for scheme file.
* 1. File is missing.
* 2. File has invalid data.
*/
@Test
public void testSchemaFile() throws Exception {
JavaSparkContext jsc = null;
try {
jsc = getJavaSparkContext();
// Test root folder.
String basePath = (new Path(dfsBasePath,
Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
// Hoodie root folder
Path hoodieFolder = new Path(basePath, "testTarget");
Path srcFolder = new Path(basePath.toString(), "srcTest");
Path schemaFile = new Path(basePath.toString(), "missingFile.schema");
HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
"testTable", "COPY_ON_WRITE", "_row_key", "timestamp",
1, schemaFile.toString());
HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
// Should fail - return : -1.
assertEquals(-1, dataImporter.dataImport(jsc, 0));
dfs.create(schemaFile).write("Random invalid schema data".getBytes());
// Should fail - return : -1.
assertEquals(-1, dataImporter.dataImport(jsc, 0));
} finally {
if (jsc != null) {
jsc.stop();
}
}
}
/**
* Test for missing rowKey and partitionKey.
*/
@Test
public void testRowAndPartitionKey() throws Exception {
JavaSparkContext jsc = null;
try {
jsc = getJavaSparkContext();
// Test root folder.
String basePath = (new Path(dfsBasePath,
Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
// Hoodie root folder
Path hoodieFolder = new Path(basePath, "testTarget");
//Create generic records.
Path srcFolder = new Path(basePath, "testSrc");
createRecords(srcFolder);
// Create schema file.
Path schemaFile = new Path(basePath.toString(), "missingFile.schema");
createSchemaFile(schemaFile.toString());
HDFSParquetImporter dataImporter;
HDFSParquetImporter.Config cfg;
// Check for invalid row key.
cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
"testTable", "COPY_ON_WRITE", "invalidRowKey", "timestamp",
1, schemaFile.toString());
dataImporter = new HDFSParquetImporter(cfg);
assertEquals(-1, dataImporter.dataImport(jsc, 0));
// Check for invalid partition key.
cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
"testTable", "COPY_ON_WRITE", "_row_key", "invalidTimeStamp",
1, schemaFile.toString());
dataImporter = new HDFSParquetImporter(cfg);
assertEquals(-1, dataImporter.dataImport(jsc, 0));
} finally {
if (jsc != null) {
jsc.stop();
}
}
}
private HDFSParquetImporter.Config getHDFSParquetImporterConfig(String srcPath, String targetPath,
String tableName, String tableType, String rowKey, String partitionKey, int parallelism,
String schemaFile) {
HDFSParquetImporter.Config cfg = new HDFSParquetImporter.Config();
cfg.srcPath = srcPath;
cfg.targetPath = targetPath;
cfg.tableName = tableName;
cfg.tableType = tableType;
cfg.rowKey = rowKey;
cfg.partitionKey = partitionKey;
cfg.parallelism = parallelism;
cfg.schemaFile = schemaFile;
return cfg;
}
private JavaSparkContext getJavaSparkContext() {
// Initialize a local spark env
SparkConf sparkConf = new SparkConf().setAppName("TestConversionCommand").setMaster("local[4]");
sparkConf = HoodieWriteClient.registerClasses(sparkConf);
return new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf));
}
}

View File

@@ -0,0 +1,24 @@
#
# Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
log4j.rootLogger=WARN, A1
log4j.category.com.uber=INFO
log4j.category.org.apache.parquet.hadoop=WARN
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n