Add hoodie-hive module to support hive registration of hoodie datasets
This commit is contained in:
@@ -0,0 +1,186 @@
|
||||
/*
|
||||
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.uber.hoodie.hive;
|
||||
|
||||
import com.uber.hoodie.hive.client.SchemaUtil;
|
||||
import com.uber.hoodie.hive.model.HoodieDatasetReference;
|
||||
import com.uber.hoodie.hive.model.SchemaDifference;
|
||||
import com.uber.hoodie.hive.util.TestUtil;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runners.model.InitializationError;
|
||||
import parquet.schema.MessageType;
|
||||
import parquet.schema.OriginalType;
|
||||
import parquet.schema.PrimitiveType;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class DatasetSchemaTest {
|
||||
@Before
|
||||
public void setUp() throws IOException, InterruptedException {
|
||||
TestUtil.setUp();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSchemaDiff() throws IOException, InitializationError {
|
||||
HoodieDatasetReference metadata = TestUtil
|
||||
.createDataset("test1", "/tmp/hdfs/DatasetSchemaTest/testSchema/", 5, "/nation.schema");
|
||||
HoodieHiveSchemaSyncTask schema =
|
||||
HoodieHiveSchemaSyncTask.newBuilder().withReference(metadata)
|
||||
.withConfiguration(TestUtil.hDroneConfiguration).build();
|
||||
SchemaDifference diff = schema.getSchemaDifference();
|
||||
assertEquals("There should be 4 columns to be added", 4, diff.getAddColumnTypes().size());
|
||||
assertEquals("No update columns expected", 0, diff.getUpdateColumnTypes().size());
|
||||
assertEquals("No delete columns expected", 0, diff.getDeleteColumns().size());
|
||||
schema.sync();
|
||||
|
||||
schema = HoodieHiveSchemaSyncTask.newBuilder().withReference(metadata)
|
||||
.withConfiguration(TestUtil.hDroneConfiguration).build();
|
||||
diff = schema.getSchemaDifference();
|
||||
assertEquals("After sync, there should not be any new columns to add", 0,
|
||||
diff.getAddColumnTypes().size());
|
||||
assertEquals("After sync, there should not be any new columns to update", 0,
|
||||
diff.getUpdateColumnTypes().size());
|
||||
assertEquals("After sync, there should not be any new columns to delete", 0,
|
||||
diff.getDeleteColumns().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSchemaEvolution() throws IOException, InitializationError {
|
||||
int initialPartitionsCount = 5;
|
||||
HoodieDatasetReference metadata = TestUtil
|
||||
.createDataset("test1", "/tmp/hdfs/DatasetSchemaTest/testSchema/",
|
||||
initialPartitionsCount, "/nation.schema");
|
||||
HoodieHiveSchemaSyncTask schema =
|
||||
HoodieHiveSchemaSyncTask.newBuilder().withReference(metadata)
|
||||
.withConfiguration(TestUtil.hDroneConfiguration).build();
|
||||
schema.sync();
|
||||
|
||||
schema = HoodieHiveSchemaSyncTask.newBuilder().withReference(metadata)
|
||||
.withConfiguration(TestUtil.hDroneConfiguration).build();
|
||||
SchemaDifference diff = schema.getSchemaDifference();
|
||||
assertEquals("After sync, diff should be empty", true, diff.isEmpty());
|
||||
int newSchemaversion = 2;
|
||||
int newPartitionsCount = 2;
|
||||
TestUtil.evolveDataset(metadata, newPartitionsCount, "/nation_evolved.schema",
|
||||
DateTime.now().getMillis(), newSchemaversion);
|
||||
schema = HoodieHiveSchemaSyncTask.newBuilder().withReference(metadata)
|
||||
.withConfiguration(TestUtil.hDroneConfiguration).build();
|
||||
diff = schema.getSchemaDifference();
|
||||
assertEquals("Schema has evolved, there should be a diff", false, diff.isEmpty());
|
||||
assertEquals("Schema has evolved, there should be 1 column to add", 1,
|
||||
diff.getAddColumnTypes().size());
|
||||
assertEquals("Schema has evolved, there should be 1 column to update", 1,
|
||||
diff.getUpdateColumnTypes().size());
|
||||
assertEquals(0, diff.getDeleteColumns().size());
|
||||
}
|
||||
|
||||
/**
|
||||
* Testing converting array types to Hive field declaration strings,
|
||||
* according to the Parquet-113 spec:
|
||||
* https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
|
||||
*/
|
||||
@Test
|
||||
public void testSchemaConvertArray() throws IOException {
|
||||
// Testing the 3-level annotation structure
|
||||
MessageType schema =
|
||||
parquet.schema.Types.buildMessage().optionalGroup().as(parquet.schema.OriginalType.LIST)
|
||||
.repeatedGroup().optional(PrimitiveType.PrimitiveTypeName.INT32).named("element")
|
||||
.named("list").named("int_list").named("ArrayOfInts");
|
||||
|
||||
String schemaString = SchemaUtil.generateSchemaString(schema);
|
||||
assertEquals("`int_list` ARRAY< int>", schemaString);
|
||||
|
||||
// A array of arrays
|
||||
schema =
|
||||
parquet.schema.Types.buildMessage().optionalGroup().as(parquet.schema.OriginalType.LIST)
|
||||
.repeatedGroup().requiredGroup().as(OriginalType.LIST).repeatedGroup()
|
||||
.required(PrimitiveType.PrimitiveTypeName.INT32).named("element").named("list")
|
||||
.named("element").named("list").named("int_list_list").named("ArrayOfArrayOfInts");
|
||||
|
||||
schemaString = SchemaUtil.generateSchemaString(schema);
|
||||
assertEquals("`int_list_list` ARRAY< ARRAY< int>>", schemaString);
|
||||
|
||||
// A list of integers
|
||||
schema =
|
||||
parquet.schema.Types.buildMessage().optionalGroup().as(parquet.schema.OriginalType.LIST)
|
||||
.repeated(PrimitiveType.PrimitiveTypeName.INT32).named("element").named("int_list")
|
||||
.named("ArrayOfInts");
|
||||
|
||||
schemaString = SchemaUtil.generateSchemaString(schema);
|
||||
assertEquals("`int_list` ARRAY< int>", schemaString);
|
||||
|
||||
// A list of structs with two fields
|
||||
schema =
|
||||
parquet.schema.Types.buildMessage().optionalGroup().as(parquet.schema.OriginalType.LIST)
|
||||
.repeatedGroup().required(PrimitiveType.PrimitiveTypeName.BINARY).named("str")
|
||||
.required(PrimitiveType.PrimitiveTypeName.INT32).named("num").named("element")
|
||||
.named("tuple_list").named("ArrayOfTuples");
|
||||
|
||||
schemaString = SchemaUtil.generateSchemaString(schema);
|
||||
assertEquals("`tuple_list` ARRAY< STRUCT< `str` : binary, `num` : int>>", schemaString);
|
||||
|
||||
// A list of structs with a single field
|
||||
// For this case, since the inner group name is "array", we treat the
|
||||
// element type as a one-element struct.
|
||||
schema =
|
||||
parquet.schema.Types.buildMessage().optionalGroup().as(parquet.schema.OriginalType.LIST)
|
||||
.repeatedGroup().required(PrimitiveType.PrimitiveTypeName.BINARY).named("str")
|
||||
.named("array").named("one_tuple_list").named("ArrayOfOneTuples");
|
||||
|
||||
schemaString = SchemaUtil.generateSchemaString(schema);
|
||||
assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>", schemaString);
|
||||
|
||||
// A list of structs with a single field
|
||||
// For this case, since the inner group name ends with "_tuple", we also treat the
|
||||
// element type as a one-element struct.
|
||||
schema =
|
||||
parquet.schema.Types.buildMessage().optionalGroup().as(parquet.schema.OriginalType.LIST)
|
||||
.repeatedGroup().required(PrimitiveType.PrimitiveTypeName.BINARY).named("str")
|
||||
.named("one_tuple_list_tuple").named("one_tuple_list").named("ArrayOfOneTuples2");
|
||||
|
||||
schemaString = SchemaUtil.generateSchemaString(schema);
|
||||
assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>", schemaString);
|
||||
|
||||
// A list of structs with a single field
|
||||
// Unlike the above two cases, for this the element type is the type of the
|
||||
// only field in the struct.
|
||||
schema =
|
||||
parquet.schema.Types.buildMessage().optionalGroup().as(parquet.schema.OriginalType.LIST)
|
||||
.repeatedGroup().required(PrimitiveType.PrimitiveTypeName.BINARY).named("str")
|
||||
.named("one_tuple_list").named("one_tuple_list").named("ArrayOfOneTuples3");
|
||||
|
||||
schemaString = SchemaUtil.generateSchemaString(schema);
|
||||
assertEquals("`one_tuple_list` ARRAY< binary>", schemaString);
|
||||
|
||||
// A list of maps
|
||||
schema =
|
||||
parquet.schema.Types.buildMessage().optionalGroup().as(parquet.schema.OriginalType.LIST)
|
||||
.repeatedGroup().as(OriginalType.MAP).repeatedGroup().as(OriginalType.MAP_KEY_VALUE)
|
||||
.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8)
|
||||
.named("string_key").required(PrimitiveType.PrimitiveTypeName.INT32)
|
||||
.named("int_value").named("key_value").named("array").named("map_list")
|
||||
.named("ArrayOfMaps");
|
||||
|
||||
schemaString = SchemaUtil.generateSchemaString(schema);
|
||||
assertEquals("`map_list` ARRAY< MAP< string, int>>", schemaString);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,98 @@
|
||||
/*
|
||||
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.uber.hoodie.hive;
|
||||
|
||||
import com.uber.hoodie.hive.client.HoodieHiveClient;
|
||||
import com.uber.hoodie.hive.model.HoodieDatasetReference;
|
||||
import com.uber.hoodie.hive.util.TestUtil;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runners.model.InitializationError;
|
||||
import parquet.schema.MessageType;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class HDroneDatasetTest {
|
||||
private HoodieHiveClient hiveClient;
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException, InterruptedException {
|
||||
TestUtil.setUp();
|
||||
hiveClient = new HoodieHiveClient(TestUtil.hDroneConfiguration);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDatasetCreation() throws IOException, InitializationError {
|
||||
HoodieDatasetReference metadata = TestUtil
|
||||
.createDataset("test1", "/tmp/hdfs/DatasetSchemaTest/testSchema/", 5, "/nation.schema");
|
||||
HoodieHiveDatasetSyncTask dataset =
|
||||
HoodieHiveDatasetSyncTask.newBuilder().withReference(metadata)
|
||||
.withConfiguration(TestUtil.hDroneConfiguration).build();
|
||||
assertEquals("There should be 5 new partitions", 5, dataset.getNewPartitions().size());
|
||||
assertEquals("There should not be any changed partitions", 0,
|
||||
dataset.getChangedPartitions().size());
|
||||
assertFalse("Table should not exist", hiveClient.checkTableExists(metadata));
|
||||
dataset.sync();
|
||||
|
||||
dataset = HoodieHiveDatasetSyncTask.newBuilder().withReference(metadata)
|
||||
.withConfiguration(TestUtil.hDroneConfiguration).build();
|
||||
assertTrue("Table should exist after sync", hiveClient.checkTableExists(metadata));
|
||||
assertEquals("After sync, There should not be any new partitions to sync", 0,
|
||||
dataset.getNewPartitions().size());
|
||||
assertEquals("After sync, There should not be any modified partitions to sync", 0,
|
||||
dataset.getChangedPartitions().size());
|
||||
|
||||
assertEquals("Table Schema should have 5 fields", 5,
|
||||
hiveClient.getTableSchema(metadata).size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDatasetEvolution() throws IOException, InitializationError {
|
||||
int initialPartitionsCount = 5;
|
||||
HoodieDatasetReference metadata = TestUtil
|
||||
.createDataset("test1", "/tmp/hdfs/DatasetSchemaTest/testSchema/",
|
||||
initialPartitionsCount, "/nation.schema");
|
||||
HoodieHiveDatasetSyncTask dataset =
|
||||
HoodieHiveDatasetSyncTask.newBuilder().withReference(metadata)
|
||||
.withConfiguration(TestUtil.hDroneConfiguration).build();
|
||||
dataset.sync();
|
||||
|
||||
dataset = HoodieHiveDatasetSyncTask.newBuilder(dataset).build();
|
||||
int newSchemaversion = 2;
|
||||
int newPartitionsCount = 2;
|
||||
TestUtil.evolveDataset(metadata, newPartitionsCount, "/nation_evolved.schema",
|
||||
DateTime.now().getMillis(), newSchemaversion);
|
||||
dataset = HoodieHiveDatasetSyncTask.newBuilder(dataset).build();
|
||||
assertEquals("There should be " + newPartitionsCount + " partitions to be added",
|
||||
newPartitionsCount, dataset.getNewPartitions().size());
|
||||
dataset.sync();
|
||||
|
||||
dataset = HoodieHiveDatasetSyncTask.newBuilder(dataset).build();
|
||||
MessageType newDatasetSchema = dataset.getSchemaSyncTask().getStorageSchema();
|
||||
MessageType expectedSchema = TestUtil.readSchema("/nation_evolved.schema");
|
||||
assertEquals("Table schema should be evolved schema", expectedSchema, newDatasetSchema);
|
||||
assertEquals("Table schema should have 6 fields", 6,
|
||||
hiveClient.getTableSchema(metadata).size());
|
||||
assertEquals("", "BIGINT", hiveClient.getTableSchema(metadata).get("region_key"));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,44 @@
|
||||
/*
|
||||
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.uber.hoodie.hive.util;
|
||||
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import parquet.hadoop.ParquetWriter;
|
||||
import parquet.hadoop.metadata.CompressionCodecName;
|
||||
import parquet.schema.MessageType;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
public class CsvParquetWriter extends ParquetWriter<List<String>> {
|
||||
|
||||
public CsvParquetWriter(Path file, MessageType schema) throws IOException {
|
||||
this(file, schema, false);
|
||||
}
|
||||
|
||||
public CsvParquetWriter(Path file, MessageType schema, boolean enableDictionary)
|
||||
throws IOException {
|
||||
this(file, schema, CompressionCodecName.UNCOMPRESSED, enableDictionary);
|
||||
}
|
||||
|
||||
public CsvParquetWriter(Path file, MessageType schema, CompressionCodecName codecName,
|
||||
boolean enableDictionary) throws IOException {
|
||||
super(file, new CsvWriteSupport(schema), codecName,
|
||||
DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, enableDictionary, false);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,94 @@
|
||||
/*
|
||||
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.uber.hoodie.hive.util;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import parquet.column.ColumnDescriptor;
|
||||
import parquet.hadoop.api.WriteSupport;
|
||||
import parquet.io.ParquetEncodingException;
|
||||
import parquet.io.api.Binary;
|
||||
import parquet.io.api.RecordConsumer;
|
||||
import parquet.schema.MessageType;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
public class CsvWriteSupport extends WriteSupport<List<String>> {
|
||||
MessageType schema;
|
||||
RecordConsumer recordConsumer;
|
||||
List<ColumnDescriptor> cols;
|
||||
|
||||
// TODO: support specifying encodings and compression
|
||||
public CsvWriteSupport(MessageType schema) {
|
||||
this.schema = schema;
|
||||
this.cols = schema.getColumns();
|
||||
}
|
||||
|
||||
@Override public WriteContext init(Configuration config) {
|
||||
return new WriteContext(schema, new HashMap<String, String>());
|
||||
}
|
||||
|
||||
@Override public void prepareForWrite(RecordConsumer r) {
|
||||
recordConsumer = r;
|
||||
}
|
||||
|
||||
@Override public void write(List<String> values) {
|
||||
if (values.size() != cols.size()) {
|
||||
throw new ParquetEncodingException("Invalid input data. Expecting " +
|
||||
cols.size() + " columns. Input had " + values.size() + " columns (" + cols + ") : "
|
||||
+ values);
|
||||
}
|
||||
|
||||
recordConsumer.startMessage();
|
||||
for (int i = 0; i < cols.size(); ++i) {
|
||||
String val = values.get(i);
|
||||
// val.length() == 0 indicates a NULL value.
|
||||
if (val.length() > 0) {
|
||||
recordConsumer.startField(cols.get(i).getPath()[0], i);
|
||||
switch (cols.get(i).getType()) {
|
||||
case BOOLEAN:
|
||||
recordConsumer.addBoolean(Boolean.parseBoolean(val));
|
||||
break;
|
||||
case FLOAT:
|
||||
recordConsumer.addFloat(Float.parseFloat(val));
|
||||
break;
|
||||
case DOUBLE:
|
||||
recordConsumer.addDouble(Double.parseDouble(val));
|
||||
break;
|
||||
case INT32:
|
||||
recordConsumer.addInteger(Integer.parseInt(val));
|
||||
break;
|
||||
case INT64:
|
||||
recordConsumer.addLong(Long.parseLong(val));
|
||||
break;
|
||||
case BINARY:
|
||||
recordConsumer.addBinary(stringToBinary(val));
|
||||
break;
|
||||
default:
|
||||
throw new ParquetEncodingException(
|
||||
"Unsupported column type: " + cols.get(i).getType());
|
||||
}
|
||||
recordConsumer.endField(cols.get(i).getPath()[0], i);
|
||||
}
|
||||
}
|
||||
recordConsumer.endMessage();
|
||||
}
|
||||
|
||||
private Binary stringToBinary(Object value) {
|
||||
return Binary.fromString(value.toString());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,166 @@
|
||||
/*
|
||||
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.uber.hoodie.hive.util;
|
||||
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.io.Files;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* An HDFS minicluster service implementation.
|
||||
*/
|
||||
public class HdfsTestService {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(HdfsTestService.class);
|
||||
|
||||
/**
|
||||
* Configuration settings
|
||||
*/
|
||||
private Configuration hadoopConf;
|
||||
private String workDir;
|
||||
private String bindIP = "127.0.0.1";
|
||||
private int namenodeRpcPort = 8020;
|
||||
private int namenodeHttpPort = 50070;
|
||||
private int datanodePort = 50010;
|
||||
private int datanodeIpcPort = 50020;
|
||||
private int datanodeHttpPort = 50075;
|
||||
|
||||
/**
|
||||
* Embedded HDFS cluster
|
||||
*/
|
||||
private MiniDFSCluster miniDfsCluster;
|
||||
|
||||
public HdfsTestService() {
|
||||
hadoopConf = new Configuration();
|
||||
workDir = Files.createTempDir().getAbsolutePath();
|
||||
}
|
||||
|
||||
public Configuration getHadoopConf() {
|
||||
return hadoopConf;
|
||||
}
|
||||
|
||||
public MiniDFSCluster start(boolean format) throws IOException {
|
||||
Preconditions
|
||||
.checkState(workDir != null, "The work dir must be set before starting cluster.");
|
||||
|
||||
if (hadoopConf == null) {
|
||||
hadoopConf = new Configuration();
|
||||
}
|
||||
|
||||
// If clean, then remove the work dir so we can start fresh.
|
||||
String localDFSLocation = getDFSLocation(workDir);
|
||||
if (format) {
|
||||
logger.info(
|
||||
"Cleaning HDFS cluster data at: " + localDFSLocation + " and starting fresh.");
|
||||
File file = new File(localDFSLocation);
|
||||
FileUtils.deleteDirectory(file);
|
||||
}
|
||||
|
||||
// Configure and start the HDFS cluster
|
||||
// boolean format = shouldFormatDFSCluster(localDFSLocation, clean);
|
||||
hadoopConf = configureDFSCluster(hadoopConf, localDFSLocation, bindIP, namenodeRpcPort,
|
||||
namenodeHttpPort, datanodePort, datanodeIpcPort, datanodeHttpPort);
|
||||
miniDfsCluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(1).format(format)
|
||||
.checkDataNodeAddrConfig(true).checkDataNodeHostConfig(true).build();
|
||||
logger.info("HDFS Minicluster service started.");
|
||||
return miniDfsCluster;
|
||||
}
|
||||
|
||||
public void stop() throws IOException {
|
||||
miniDfsCluster.shutdown();
|
||||
logger.info("HDFS Minicluster service shut down.");
|
||||
miniDfsCluster = null;
|
||||
hadoopConf = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the location on the local FS where we store the HDFS data.
|
||||
*
|
||||
* @param baseFsLocation The base location on the local filesystem we have write access to
|
||||
* create dirs.
|
||||
* @return The location for HDFS data.
|
||||
*/
|
||||
private static String getDFSLocation(String baseFsLocation) {
|
||||
return baseFsLocation + Path.SEPARATOR + "dfs";
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if we should format the DFS Cluster. We'll format if clean is
|
||||
* true, or if the dfsFsLocation does not exist.
|
||||
*
|
||||
* @param localDFSLocation The location on the local FS to hold the HDFS metadata and block
|
||||
* data
|
||||
* @param clean Specifies if we want to start a clean cluster
|
||||
* @return Returns true if we should format a DFSCluster, otherwise false
|
||||
*/
|
||||
private static boolean shouldFormatDFSCluster(String localDFSLocation, boolean clean) {
|
||||
boolean format = true;
|
||||
File f = new File(localDFSLocation);
|
||||
if (f.exists() && f.isDirectory() && !clean) {
|
||||
format = false;
|
||||
}
|
||||
return format;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure the DFS Cluster before launching it.
|
||||
*
|
||||
* @param config The already created Hadoop configuration we'll further configure
|
||||
* for HDFS
|
||||
* @param localDFSLocation The location on the local filesystem where cluster data is stored
|
||||
* @param bindIP An IP address we want to force the datanode and namenode to bind
|
||||
* to.
|
||||
* @param namenodeRpcPort
|
||||
* @param namenodeHttpPort
|
||||
* @param datanodePort
|
||||
* @param datanodeIpcPort
|
||||
* @param datanodeHttpPort
|
||||
* @return The updated Configuration object.
|
||||
*/
|
||||
private static Configuration configureDFSCluster(Configuration config, String localDFSLocation,
|
||||
String bindIP, int namenodeRpcPort, int namenodeHttpPort, int datanodePort,
|
||||
int datanodeIpcPort, int datanodeHttpPort) {
|
||||
|
||||
logger.info("HDFS force binding to ip: " + bindIP);
|
||||
config.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + bindIP + ":" + namenodeRpcPort);
|
||||
config.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, bindIP + ":" + datanodePort);
|
||||
config.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, bindIP + ":" + datanodeIpcPort);
|
||||
config.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, bindIP + ":" + datanodeHttpPort);
|
||||
// When a datanode registers with the namenode, the Namenode do a hostname
|
||||
// check of the datanode which will fail on OpenShift due to reverse DNS
|
||||
// issues with the internal IP addresses. This config disables that check,
|
||||
// and will allow a datanode to connect regardless.
|
||||
config.setBoolean("dfs.namenode.datanode.registration.ip-hostname-check", false);
|
||||
config.set("hdfs.minidfs.basedir", localDFSLocation);
|
||||
// allow current user to impersonate others
|
||||
String user = System.getProperty("user.name");
|
||||
config.set("hadoop.proxyuser." + user + ".groups", "*");
|
||||
config.set("hadoop.proxyuser." + user + ".hosts", "*");
|
||||
return config;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,322 @@
|
||||
/*
|
||||
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.uber.hoodie.hive.util;
|
||||
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.io.Files;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hive.conf.HiveConf;
|
||||
import org.apache.hadoop.hive.metastore.HiveMetaStore;
|
||||
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
|
||||
import org.apache.hadoop.hive.metastore.IHMSHandler;
|
||||
import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
|
||||
import org.apache.hadoop.hive.metastore.TUGIBasedProcessor;
|
||||
import org.apache.hadoop.hive.metastore.api.MetaException;
|
||||
import org.apache.hadoop.hive.thrift.TUGIContainingTransport;
|
||||
import org.apache.hive.service.server.HiveServer2;
|
||||
import org.apache.thrift.TProcessor;
|
||||
import org.apache.thrift.protocol.TBinaryProtocol;
|
||||
import org.apache.thrift.server.TServer;
|
||||
import org.apache.thrift.server.TThreadPoolServer;
|
||||
import org.apache.thrift.transport.TFramedTransport;
|
||||
import org.apache.thrift.transport.TServerSocket;
|
||||
import org.apache.thrift.transport.TServerTransport;
|
||||
import org.apache.thrift.transport.TSocket;
|
||||
import org.apache.thrift.transport.TTransport;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
import org.apache.thrift.transport.TTransportFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
public class HiveTestService {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(HiveTestService.class);
|
||||
|
||||
private static final int CONNECTION_TIMEOUT = 30000;
|
||||
|
||||
/**
|
||||
* Configuration settings
|
||||
*/
|
||||
private Configuration hadoopConf;
|
||||
private String workDir;
|
||||
private String bindIP = "127.0.0.1";
|
||||
private int metastorePort = 9083;
|
||||
private int serverPort = 9999;
|
||||
private boolean clean = true;
|
||||
|
||||
private Map<String, String> sysProps = Maps.newHashMap();
|
||||
private ExecutorService executorService;
|
||||
private TServer tServer;
|
||||
private HiveServer2 hiveServer;
|
||||
|
||||
/**
 * Creates the service, basing the embedded metastore/HiveServer2 on the
 * supplied Hadoop configuration and allocating a temporary work directory.
 *
 * @param configuration Hadoop configuration for the embedded Hive services
 */
public HiveTestService(Configuration configuration) {
    // Fix: the constructor previously ignored its parameter, so start()
    // always fell back to a default 'new Configuration()'.
    this.hadoopConf = configuration;
    this.workDir = Files.createTempDir().getAbsolutePath();
}
|
||||
|
||||
/**
 * @return the Hadoop configuration backing this service; may be null until
 *         {@code start()} initializes it, and is nulled out by {@code stop()}
 */
public Configuration getHadoopConf() {
    return hadoopConf;
}
|
||||
|
||||
/**
 * Starts an embedded Hive metastore (thrift server) and a HiveServer2, then
 * blocks until the metastore answers client connections.
 *
 * <p>Relies on helpers defined elsewhere in this class:
 * {@code getHiveLocation}, {@code startMetaStore} and {@code startHiveServer}.
 *
 * @return the running HiveServer2 instance
 * @throws IOException if cleaning the work dir fails or the metastore does
 *         not come up within CONNECTION_TIMEOUT
 */
public HiveServer2 start() throws IOException {
    Preconditions
        .checkState(workDir != null, "The work dir must be set before starting cluster.");

    if (hadoopConf == null) {
      hadoopConf = new Configuration();
    }

    // 'clean' defaults to true: wipe any previous metastore_db/derby state.
    String localHiveLocation = getHiveLocation(workDir);
    if (clean) {
      LOG.info(
          "Cleaning Hive cluster data at: " + localHiveLocation + " and starting fresh.");
      File file = new File(localHiveLocation);
      FileUtils.deleteDirectory(file);
    }

    HiveConf serverConf = configureHive(hadoopConf, localHiveLocation);

    // Metastore must be up before HiveServer2, which connects to it via thrift.
    executorService = Executors.newSingleThreadExecutor();
    tServer = startMetaStore(bindIP, metastorePort, serverConf);

    hiveServer = startHiveServer(serverConf);

    // 0.0.0.0 is a bind-all address, not a connectable one; probe localhost.
    String serverHostname;
    if (bindIP.equals("0.0.0.0")) {
      serverHostname = "localhost";
    } else {
      serverHostname = bindIP;
    }
    if (!waitForServerUp(serverConf, serverHostname, metastorePort, CONNECTION_TIMEOUT)) {
      throw new IOException("Waiting for startup of standalone server");
    }

    LOG.info("Hive Minicluster service started.");
    return hiveServer;
}
|
||||
|
||||
/**
 * Stops the metastore thrift server and HiveServer2 (if running), restores
 * any JVM system properties changed via {@code setSystemProperty}, and clears
 * the service state. Safe to call when nothing was started.
 */
public void stop() throws IOException {
    resetSystemProperties();
    if (tServer != null) {
      tServer.stop();
    }
    if (hiveServer != null) {
      hiveServer.stop();
    }
    LOG.info("Hive Minicluster service shut down.");
    tServer = null;
    hiveServer = null;
    hadoopConf = null;
}
|
||||
|
||||
/**
 * Builds the HiveConf for the embedded services: points clients at the local
 * thrift metastore, binds HiveServer2 to {@code bindIP}:{@code serverPort},
 * and backs the metastore with an embedded Derby database under
 * {@code localHiveLocation}.
 *
 * @param conf              base Hadoop configuration to extend
 * @param localHiveLocation local directory for metastore_db and derby.log
 * @return a HiveConf layered over {@code conf}
 * @throws IOException if the derby log file cannot be created
 */
private HiveConf configureHive(Configuration conf, String localHiveLocation)
    throws IOException {
    conf.set("hive.metastore.local", "false");
    conf.set(HiveConf.ConfVars.METASTOREURIS.varname,
        "thrift://" + bindIP + ":" + metastorePort);
    conf.set(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, bindIP);
    conf.setInt(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, serverPort);
    // The following line to turn off SASL has no effect since HiveAuthFactory calls
    // 'new HiveConf()'. This is fixed by https://issues.apache.org/jira/browse/HIVE-6657,
    // in Hive 0.14.
    // As a workaround, the property is set in hive-site.xml in this module.
    //conf.set(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname, "NOSASL");
    File localHiveDir = new File(localHiveLocation);
    localHiveDir.mkdirs();
    File metastoreDbDir = new File(localHiveDir, "metastore_db");
    conf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
        "jdbc:derby:" + metastoreDbDir.getPath() + ";create=true");
    File derbyLogFile = new File(localHiveDir, "derby.log");
    derbyLogFile.createNewFile();
    // Redirect derby's log; recorded via setSystemProperty so stop() restores it.
    setSystemProperty("derby.stream.error.file", derbyLogFile.getPath());
    // Warehouse goes to a throwaway temp dir so runs don't interfere.
    conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
        Files.createTempDir().getAbsolutePath());

    return new HiveConf(conf, this.getClass());
}
|
||||
|
||||
private boolean waitForServerUp(HiveConf serverConf, String hostname, int port, int timeout) {
|
||||
long start = System.currentTimeMillis();
|
||||
while (true) {
|
||||
try {
|
||||
new HiveMetaStoreClient(serverConf);
|
||||
return true;
|
||||
} catch (MetaException e) {
|
||||
// ignore as this is expected
|
||||
LOG.info("server " + hostname + ":" + port + " not up " + e);
|
||||
}
|
||||
|
||||
if (System.currentTimeMillis() > start + timeout) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(250);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private void setSystemProperty(String name, String value) {
|
||||
if (!sysProps.containsKey(name)) {
|
||||
String currentValue = System.getProperty(name);
|
||||
sysProps.put(name, currentValue);
|
||||
}
|
||||
if (value != null) {
|
||||
System.setProperty(name, value);
|
||||
} else {
|
||||
System.getProperties().remove(name);
|
||||
}
|
||||
}
|
||||
|
||||
private void resetSystemProperties() {
|
||||
for (Map.Entry<String, String> entry : sysProps.entrySet()) {
|
||||
if (entry.getValue() != null) {
|
||||
System.setProperty(entry.getKey(), entry.getValue());
|
||||
} else {
|
||||
System.getProperties().remove(entry.getKey());
|
||||
}
|
||||
}
|
||||
sysProps.clear();
|
||||
}
|
||||
|
||||
private static String getHiveLocation(String baseLocation) {
|
||||
return baseLocation + Path.SEPARATOR + "hive";
|
||||
}
|
||||
|
||||
private HiveServer2 startHiveServer(HiveConf serverConf) {
|
||||
HiveServer2 hiveServer = new HiveServer2();
|
||||
hiveServer.init(serverConf);
|
||||
hiveServer.start();
|
||||
return hiveServer;
|
||||
}
|
||||
|
||||
// XXX: From org.apache.hadoop.hive.metastore.HiveMetaStore,
|
||||
// with changes to support binding to a specified IP address (not only 0.0.0.0)
|
||||
|
||||
|
||||
private static final class ChainedTTransportFactory extends TTransportFactory {
|
||||
private final TTransportFactory parentTransFactory;
|
||||
private final TTransportFactory childTransFactory;
|
||||
|
||||
private ChainedTTransportFactory(TTransportFactory parentTransFactory,
|
||||
TTransportFactory childTransFactory) {
|
||||
this.parentTransFactory = parentTransFactory;
|
||||
this.childTransFactory = childTransFactory;
|
||||
}
|
||||
|
||||
@Override public TTransport getTransport(TTransport trans) {
|
||||
return childTransFactory.getTransport(parentTransFactory.getTransport(trans));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static final class TServerSocketKeepAlive extends TServerSocket {
|
||||
public TServerSocketKeepAlive(int port) throws TTransportException {
|
||||
super(port, 0);
|
||||
}
|
||||
|
||||
public TServerSocketKeepAlive(InetSocketAddress address) throws TTransportException {
|
||||
super(address, 0);
|
||||
}
|
||||
|
||||
@Override protected TSocket acceptImpl() throws TTransportException {
|
||||
TSocket ts = super.acceptImpl();
|
||||
try {
|
||||
ts.getSocket().setKeepAlive(true);
|
||||
} catch (SocketException e) {
|
||||
throw new TTransportException(e);
|
||||
}
|
||||
return ts;
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Starts a thrift metastore service and serves requests on a thread from
 * {@code executorService}. Adapted from
 * org.apache.hadoop.hive.metastore.HiveMetaStore, with changes to support
 * binding to a specific IP address rather than only the wildcard address.
 *
 * @param forceBindIP IP address to bind to, or null to bind to the wildcard address
 * @param port thrift port the metastore listens on
 * @param conf Hive configuration supplying thread-pool and transport settings
 * @return the running {@link TServer}; the caller is responsible for stopping it
 * @throws IOException wrapping any failure during server construction or startup
 */
public TServer startMetaStore(String forceBindIP, int port, HiveConf conf) throws IOException {
  try {
    // Server will create new threads up to max as necessary. After an idle
    // period, it will destroy threads to keep the number of threads in the
    // pool to min.
    int minWorkerThreads = conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMINTHREADS);
    int maxWorkerThreads = conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMAXTHREADS);
    boolean tcpKeepAlive = conf.getBoolVar(HiveConf.ConfVars.METASTORE_TCP_KEEP_ALIVE);
    boolean useFramedTransport =
        conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);

    // don't support SASL yet
    //boolean useSasl = conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL);

    // This is the local change vs. Hive's original: honor an explicit bind IP
    // instead of always listening on the wildcard address.
    TServerTransport serverTransport;
    if (forceBindIP != null) {
      InetSocketAddress address = new InetSocketAddress(forceBindIP, port);
      serverTransport =
          tcpKeepAlive ? new TServerSocketKeepAlive(address) : new TServerSocket(address);

    } else {
      serverTransport =
          tcpKeepAlive ? new TServerSocketKeepAlive(port) : new TServerSocket(port);
    }

    TProcessor processor;
    TTransportFactory transFactory;

    IHMSHandler handler = (IHMSHandler) HiveMetaStore
        .newRetryingHMSHandler("new db based metaserver", conf, true);

    if (conf.getBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI)) {
      // Carry the client's UGI over the transport; when framed transport is
      // also requested, chain the framed factory under the UGI factory.
      transFactory = useFramedTransport ?
          new ChainedTTransportFactory(new TFramedTransport.Factory(),
              new TUGIContainingTransport.Factory()) :
          new TUGIContainingTransport.Factory();

      processor = new TUGIBasedProcessor<IHMSHandler>(handler);
      LOG.info("Starting DB backed MetaStore Server with SetUGI enabled");
    } else {
      transFactory =
          useFramedTransport ? new TFramedTransport.Factory() : new TTransportFactory();
      processor = new TSetIpAddressProcessor<IHMSHandler>(handler);
      LOG.info("Starting DB backed MetaStore Server");
    }

    TThreadPoolServer.Args args =
        new TThreadPoolServer.Args(serverTransport).processor(processor)
            .transportFactory(transFactory).protocolFactory(new TBinaryProtocol.Factory())
            .minWorkerThreads(minWorkerThreads).maxWorkerThreads(maxWorkerThreads);

    final TServer tServer = new TThreadPoolServer(args);
    // serve() blocks, so run it on the executor owned by this service.
    executorService.submit(new Runnable() {
      @Override public void run() {
        tServer.serve();
      }
    });
    return tServer;
  } catch (Throwable x) {
    throw new IOException(x);
  }
}
|
||||
}
|
||||
@@ -0,0 +1,199 @@
|
||||
/*
|
||||
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.uber.hoodie.hive.util;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import com.uber.hoodie.hive.HoodieHiveConfiguration;
|
||||
import com.uber.hoodie.hive.client.HoodieHiveClient;
|
||||
import com.uber.hoodie.hive.model.HoodieDatasetReference;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hive.service.server.HiveServer2;
|
||||
import org.apache.zookeeper.server.ZooKeeperServer;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.format.DateTimeFormat;
|
||||
import org.joda.time.format.DateTimeFormatter;
|
||||
import org.junit.runners.model.InitializationError;
|
||||
import parquet.schema.MessageType;
|
||||
import parquet.schema.MessageTypeParser;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.util.Arrays;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
public class TestUtil {
|
||||
private static MiniDFSCluster dfsCluster;
|
||||
private static ZooKeeperServer zkServer;
|
||||
private static HiveServer2 hiveServer;
|
||||
public static Configuration configuration;
|
||||
public static HoodieHiveConfiguration hDroneConfiguration;
|
||||
private static DateTimeFormatter dtfOut;
|
||||
public static final String CSV_DELIMITER = "|";
|
||||
private static FileSystem fileSystem;
|
||||
private static Set<String> createdTablesSet = Sets.newHashSet();
|
||||
|
||||
public static void setUp() throws IOException, InterruptedException {
|
||||
if (dfsCluster == null) {
|
||||
HdfsTestService service = new HdfsTestService();
|
||||
dfsCluster = service.start(true);
|
||||
configuration = service.getHadoopConf();
|
||||
}
|
||||
if (zkServer == null) {
|
||||
ZookeeperTestService zkService = new ZookeeperTestService(configuration);
|
||||
zkServer = zkService.start();
|
||||
}
|
||||
if (hiveServer == null) {
|
||||
HiveTestService hiveService = new HiveTestService(configuration);
|
||||
hiveServer = hiveService.start();
|
||||
}
|
||||
hDroneConfiguration =
|
||||
HoodieHiveConfiguration.newBuilder().hiveJdbcUrl("jdbc:hive2://127.0.0.1:9999/")
|
||||
.hivedb("hdrone_test").jdbcUsername("").jdbcPassword("")
|
||||
.hadoopConfiguration(hiveServer.getHiveConf()).build();
|
||||
dtfOut = DateTimeFormat.forPattern("yyyy/MM/dd");
|
||||
|
||||
HoodieHiveClient client = new HoodieHiveClient(hDroneConfiguration);
|
||||
for (String tableName : createdTablesSet) {
|
||||
client.updateHiveSQL("drop table if exists " + tableName);
|
||||
}
|
||||
createdTablesSet.clear();
|
||||
client.updateHiveSQL(
|
||||
"drop database if exists " + hDroneConfiguration.getDbName());
|
||||
client.updateHiveSQL("create database " + hDroneConfiguration.getDbName());
|
||||
|
||||
fileSystem = FileSystem.get(configuration);
|
||||
}
|
||||
|
||||
public static void shutdown() {
|
||||
if (hiveServer != null) {
|
||||
hiveServer.stop();
|
||||
}
|
||||
if (dfsCluster != null) {
|
||||
dfsCluster.shutdown();
|
||||
}
|
||||
if (zkServer != null) {
|
||||
zkServer.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
public static HoodieDatasetReference createDataset(String tableName, String hdfsPath, int numberOfPartitions,
|
||||
String schemaFile) throws IOException, InitializationError {
|
||||
Path path = new Path(hdfsPath);
|
||||
FileUtils.deleteDirectory(new File(hdfsPath));
|
||||
|
||||
boolean result = fileSystem.mkdirs(path);
|
||||
checkResult(result);
|
||||
HoodieDatasetReference metadata =
|
||||
new HoodieDatasetReference(tableName, path.toString(),
|
||||
hDroneConfiguration.getDbName());
|
||||
DateTime dateTime = DateTime.now();
|
||||
createPartitions(metadata, numberOfPartitions, schemaFile, dateTime, 1);
|
||||
createdTablesSet.add(metadata.getDatabaseTableName());
|
||||
return metadata;
|
||||
}
|
||||
|
||||
private static void createPartitions(HoodieDatasetReference metadata, int numberOfPartitions,
|
||||
String schemaFile, DateTime startFrom, int schemaVersion) throws IOException {
|
||||
startFrom = startFrom.withTimeAtStartOfDay();
|
||||
|
||||
for (int i = 0; i < numberOfPartitions; i++) {
|
||||
Path partPath = new Path(metadata.getBaseDatasetPath() + "/" + dtfOut.print(startFrom));
|
||||
fileSystem.makeQualified(partPath);
|
||||
fileSystem.mkdirs(partPath);
|
||||
createTestData(partPath, schemaFile, schemaVersion);
|
||||
startFrom = startFrom.minusDays(1);
|
||||
}
|
||||
}
|
||||
|
||||
private static void createTestData(Path partPath, String schemaFile, int schemaVersion)
|
||||
throws IOException {
|
||||
for (int i = 0; i < 5; i++) {
|
||||
// Create 5 files
|
||||
Path filePath =
|
||||
new Path(partPath.toString() + "/" + getParquetFilePath(schemaVersion, i));
|
||||
generateParquetData(filePath, schemaFile);
|
||||
}
|
||||
}
|
||||
|
||||
private static String getParquetFilePath(int version, int iteration) {
|
||||
return "test.topic.name@sjc1@SV_" + version + "@" + iteration + ".parquet";
|
||||
}
|
||||
|
||||
public static MessageType readSchema(String schemaFile) throws IOException {
|
||||
return MessageTypeParser
|
||||
.parseMessageType(IOUtils.toString(TestUtil.class.getResourceAsStream(schemaFile)));
|
||||
}
|
||||
|
||||
public static void generateParquetData(Path filePath, String schemaFile) throws IOException {
|
||||
MessageType schema = readSchema(schemaFile);
|
||||
CsvParquetWriter writer = new CsvParquetWriter(filePath, schema);
|
||||
|
||||
BufferedReader br = new BufferedReader(
|
||||
new InputStreamReader(TestUtil.class.getResourceAsStream(getDataFile(schemaFile))));
|
||||
String line;
|
||||
try {
|
||||
while ((line = br.readLine()) != null) {
|
||||
String[] fields = line.split(Pattern.quote(CSV_DELIMITER));
|
||||
writer.write(Arrays.asList(fields));
|
||||
}
|
||||
writer.close();
|
||||
} finally {
|
||||
br.close();
|
||||
}
|
||||
|
||||
InputStreamReader io = null;
|
||||
FSDataOutputStream hdfsPath = null;
|
||||
try {
|
||||
io = new FileReader(filePath.toString());
|
||||
hdfsPath = fileSystem.create(filePath);
|
||||
IOUtils.copy(io, hdfsPath);
|
||||
} finally {
|
||||
if (io != null) {
|
||||
io.close();
|
||||
}
|
||||
if (hdfsPath != null) {
|
||||
hdfsPath.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static String getDataFile(String schemaFile) {
|
||||
return schemaFile.replaceAll(".schema", ".csv");
|
||||
}
|
||||
|
||||
private static void checkResult(boolean result) throws InitializationError {
|
||||
if (!result) {
|
||||
throw new InitializationError("Could not initialize");
|
||||
}
|
||||
}
|
||||
|
||||
public static void evolveDataset(HoodieDatasetReference metadata, int newPartitionCount,
|
||||
String newSchema, Long startFrom, int schemaVersion) throws IOException {
|
||||
createPartitions(metadata, newPartitionCount, newSchema,
|
||||
new DateTime(startFrom).plusDays(newPartitionCount + 1), schemaVersion);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,241 @@
|
||||
/*
|
||||
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.uber.hoodie.hive.util;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.io.Files;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.zookeeper.server.NIOServerCnxnFactory;
|
||||
import org.apache.zookeeper.server.ZooKeeperServer;
|
||||
import org.apache.zookeeper.server.persistence.FileTxnLog;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.io.Reader;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
|
||||
/**
|
||||
* A Zookeeper minicluster service implementation.
|
||||
* <p/>
|
||||
* This class was ripped from MiniZooKeeperCluster from the HBase tests. Changes
|
||||
* made include:
|
||||
* <p/>
|
||||
* 1. It will now only launch 1 zookeeper server.
|
||||
* <p/>
|
||||
* 2. It will only attempt to bind to the port specified, and will fail if it
|
||||
* can't.
|
||||
* <p/>
|
||||
* 3. The startup method now takes a bindAddress, which allows us to configure
|
||||
* which IP the ZK server binds to. This was not configurable in the original
|
||||
* class.
|
||||
* <p/>
|
||||
* 4. The ZK cluster will re-use a data dir on the local filesystem if it
|
||||
* already exists instead of blowing it away.
|
||||
*/
|
||||
public class ZookeeperTestService {

  private static final Logger logger = LoggerFactory.getLogger(ZookeeperTestService.class);

  // Zookeeper tick time (ms) used when no explicit tickTime is configured.
  private static final int TICK_TIME = 2000;
  // Milliseconds to wait for the server to come up (or go down) before failing.
  private static final int CONNECTION_TIMEOUT = 30000;

  /**
   * Configuration settings
   */
  private Configuration hadoopConf;
  private String workDir;            // temp dir that holds the ZK data directory
  private Integer clientPort = 2828; // port the ZK server listens on
  private String bindIP = "127.0.0.1";
  private Boolean clean = false;     // whether to wipe an existing data dir on start
  private int tickTime = 0;          // 0 means "fall back to TICK_TIME"

  /**
   * Embedded ZooKeeper cluster
   */
  private NIOServerCnxnFactory standaloneServerFactory;
  private ZooKeeperServer zooKeeperServer;
  private boolean started = false;

  public ZookeeperTestService(Configuration config) {
    this.workDir = Files.createTempDir().getAbsolutePath();
    this.hadoopConf = config;
  }

  public Configuration getHadoopConf() {
    return hadoopConf;
  }

  /**
   * Starts a single standalone Zookeeper server bound to bindIP:clientPort and
   * blocks until it answers the "stat" command, or fails after
   * CONNECTION_TIMEOUT ms.
   *
   * @return the running server instance
   * @throws IOException if the server does not come up within the timeout
   */
  public ZooKeeperServer start() throws IOException, InterruptedException {
    Preconditions.checkState(workDir != null,
        "The localBaseFsLocation must be set before starting cluster.");

    setupTestEnv();
    // Tear down any instance left from a previous start() on this object.
    stop();

    File dir = new File(workDir, "zookeeper").getAbsoluteFile();
    recreateDir(dir, clean);
    int tickTimeToUse;
    if (this.tickTime > 0) {
      tickTimeToUse = this.tickTime;
    } else {
      tickTimeToUse = TICK_TIME;
    }
    // The same directory is used for both the snapshot and transaction log.
    this.zooKeeperServer = new ZooKeeperServer(dir, dir, tickTimeToUse);
    standaloneServerFactory = new NIOServerCnxnFactory();

    // NOTE: Changed from the original, where InetSocketAddress was
    // originally created to bind to the wildcard IP, we now configure it.
    logger.info("Zookeeper force binding to: " + this.bindIP);
    standaloneServerFactory.configure(new InetSocketAddress(bindIP, clientPort), 1000);

    // Start up this ZK server
    standaloneServerFactory.startup(zooKeeperServer);

    // A wildcard bind is reachable via localhost; otherwise probe the bound IP.
    String serverHostname;
    if (bindIP.equals("0.0.0.0")) {
      serverHostname = "localhost";
    } else {
      serverHostname = bindIP;
    }
    if (!waitForServerUp(serverHostname, clientPort, CONNECTION_TIMEOUT)) {
      throw new IOException("Waiting for startup of standalone server");
    }

    started = true;
    logger.info("Zookeeper Minicluster service started on client port: " + clientPort);
    return zooKeeperServer;
  }

  /**
   * Shuts the server down and waits until its port stops answering.
   * No-op when the service was never started.
   *
   * @throws IOException if the server does not go down within the timeout
   */
  public void stop() throws IOException {
    if (!started) {
      return;
    }

    standaloneServerFactory.shutdown();
    if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
      throw new IOException("Waiting for shutdown of standalone server");
    }

    // clear everything
    started = false;
    standaloneServerFactory = null;
    zooKeeperServer = null;

    logger.info("Zookeeper Minicluster service shut down.");
  }

  /**
   * Ensures {@code dir} exists: wipes it first when {@code clean} is set,
   * otherwise re-uses an existing directory as-is.
   */
  private void recreateDir(File dir, boolean clean) throws IOException {
    if (dir.exists() && clean) {
      FileUtil.fullyDelete(dir);
    } else if (dir.exists() && !clean) {
      // the directory's exist, and we don't want to clean, so exit
      return;
    }
    try {
      dir.mkdirs();
    } catch (SecurityException e) {
      throw new IOException("creating dir: " + dir, e);
    }
  }

  // / XXX: From o.a.zk.t.ClientBase
  private static void setupTestEnv() {
    // during the tests we run with 100K prealloc in the logs.
    // on windows systems prealloc of 64M was seen to take ~15seconds
    // resulting in test failure (client timeout on first session).
    // set env and directly in order to handle static init/gc issues
    System.setProperty("zookeeper.preAllocSize", "100");
    FileTxnLog.setPreallocSize(100 * 1024);
  }

  // XXX: From o.a.zk.t.ClientBase
  // Sends the "stat" four-letter command in a loop; the server is considered
  // down once the connection attempt fails with an IOException.
  private static boolean waitForServerDown(int port, long timeout) {
    long start = System.currentTimeMillis();
    while (true) {
      try {
        Socket sock = new Socket("localhost", port);
        try {
          OutputStream outstream = sock.getOutputStream();
          outstream.write("stat".getBytes());
          outstream.flush();
        } finally {
          sock.close();
        }
      } catch (IOException e) {
        // Connection refused/failed — the server is down, which is what we want.
        return true;
      }

      if (System.currentTimeMillis() > start + timeout) {
        break;
      }
      try {
        Thread.sleep(250);
      } catch (InterruptedException e) {
        // ignore
      }
    }
    return false;
  }

  // XXX: From o.a.zk.t.ClientBase
  // Sends the "stat" four-letter command in a loop; the server is considered
  // up once it replies with its "Zookeeper version:" banner.
  private static boolean waitForServerUp(String hostname, int port, long timeout) {
    long start = System.currentTimeMillis();
    while (true) {
      try {
        Socket sock = new Socket(hostname, port);
        BufferedReader reader = null;
        try {
          OutputStream outstream = sock.getOutputStream();
          outstream.write("stat".getBytes());
          outstream.flush();

          Reader isr = new InputStreamReader(sock.getInputStream());
          reader = new BufferedReader(isr);
          String line = reader.readLine();
          if (line != null && line.startsWith("Zookeeper version:")) {
            return true;
          }
        } finally {
          sock.close();
          if (reader != null) {
            reader.close();
          }
        }
      } catch (IOException e) {
        // ignore as this is expected while the server is still starting
        logger.info("server " + hostname + ":" + port + " not up " + e);
      }

      if (System.currentTimeMillis() > start + timeout) {
        break;
      }
      try {
        Thread.sleep(250);
      } catch (InterruptedException e) {
        // ignore
      }
    }
    return false;
  }
}
|
||||
Reference in New Issue
Block a user