1
0

HUDI-180 : Adding support for hive registration using metastore along with JDBC

This commit is contained in:
Nishith Agarwal
2019-08-30 12:30:56 -07:00
committed by Balaji Varadarajan
parent 69ca45b2da
commit 1104f9526f
3 changed files with 177 additions and 55 deletions

View File

@@ -48,11 +48,10 @@ public class HiveSyncConfig implements Serializable {
"--base-path"}, description = "Basepath of hoodie dataset to sync", required = true) "--base-path"}, description = "Basepath of hoodie dataset to sync", required = true)
public String basePath; public String basePath;
@Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by", @Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by")
required = false)
public List<String> partitionFields = new ArrayList<>(); public List<String> partitionFields = new ArrayList<>();
@Parameter(names = "-partition-value-extractor", description = "Class which implements " @Parameter(names = "--partition-value-extractor", description = "Class which implements "
+ "PartitionValueExtractor " + "PartitionValueExtractor "
+ "to extract the partition " + "to extract the partition "
+ "values from HDFS path") + "values from HDFS path")
@@ -74,25 +73,12 @@ public class HiveSyncConfig implements Serializable {
+ "org.apache.hudi input format.") + "org.apache.hudi input format.")
public Boolean usePreApacheInputFormat = false; public Boolean usePreApacheInputFormat = false;
// Selects how Hive is reached: a JDBC connection when true, the metastore client and
// Hive driver when false.
@Parameter(names = {"--use-jdbc"}, description = "Use JDBC to talk to Hive; when false, the "
    + "metastore client and Hive driver are used instead")
public Boolean useJdbc = true;
@Parameter(names = {"--help", "-h"}, help = true) @Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false; public Boolean help = false;
/**
 * Renders the sync configuration for logging and debugging.
 *
 * <p>The Hive password is replaced by a fixed mask so that logging this object does not
 * leak the credential; an unset password still prints as {@code null}.
 */
@Override
public String toString() {
  return "HiveSyncConfig{"
      + "databaseName='" + databaseName + '\''
      + ", tableName='" + tableName + '\''
      + ", hiveUser='" + hiveUser + '\''
      + ", hivePass='" + (hivePass == null ? "null" : "******") + '\''
      + ", jdbcUrl='" + jdbcUrl + '\''
      + ", basePath='" + basePath + '\''
      + ", partitionFields=" + partitionFields
      + ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\''
      + ", assumeDatePartitioning=" + assumeDatePartitioning
      + ", help=" + help
      + '}';
}
public static HiveSyncConfig copy(HiveSyncConfig cfg) { public static HiveSyncConfig copy(HiveSyncConfig cfg) {
HiveSyncConfig newConfig = new HiveSyncConfig(); HiveSyncConfig newConfig = new HiveSyncConfig();
newConfig.basePath = cfg.basePath; newConfig.basePath = cfg.basePath;
@@ -107,4 +93,22 @@ public class HiveSyncConfig implements Serializable {
newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat; newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
return newConfig; return newConfig;
} }
/**
 * Renders the sync configuration for logging and debugging.
 *
 * <p>The Hive password is replaced by a fixed mask so that logging this object does not
 * leak the credential; an unset password still prints as {@code null}.
 */
@Override
public String toString() {
  return "HiveSyncConfig{"
      + "databaseName='" + databaseName + '\''
      + ", tableName='" + tableName + '\''
      + ", hiveUser='" + hiveUser + '\''
      + ", hivePass='" + (hivePass == null ? "null" : "******") + '\''
      + ", jdbcUrl='" + jdbcUrl + '\''
      + ", basePath='" + basePath + '\''
      + ", partitionFields=" + partitionFields
      + ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\''
      + ", assumeDatePartitioning=" + assumeDatePartitioning
      + ", usePreApacheInputFormat=" + usePreApacheInputFormat
      + ", useJdbc=" + useJdbc
      + ", help=" + help
      + '}';
}
} }

View File

@@ -29,7 +29,9 @@ import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@@ -40,6 +42,8 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.jdbc.HiveDriver; import org.apache.hive.jdbc.HiveDriver;
import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieFileFormat;
@@ -86,6 +90,7 @@ public class HoodieHiveClient {
private FileSystem fs; private FileSystem fs;
private Connection connection; private Connection connection;
private HoodieTimeline activeTimeline; private HoodieTimeline activeTimeline;
private HiveConf configuration;
public HoodieHiveClient(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) { public HoodieHiveClient(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
this.syncConfig = cfg; this.syncConfig = cfg;
@@ -93,8 +98,13 @@ public class HoodieHiveClient {
this.metaClient = new HoodieTableMetaClient(fs.getConf(), cfg.basePath, true); this.metaClient = new HoodieTableMetaClient(fs.getConf(), cfg.basePath, true);
this.tableType = metaClient.getTableType(); this.tableType = metaClient.getTableType();
LOG.info("Creating hive connection " + cfg.jdbcUrl); this.configuration = configuration;
createHiveConnection(); // Support both JDBC and metastore based implementations for backwards compatibility. Future users should
// disable jdbc and depend on metastore client for all hive registrations
if (cfg.useJdbc) {
LOG.info("Creating hive connection " + cfg.jdbcUrl);
createHiveConnection();
}
try { try {
this.client = new HiveMetaStoreClient(configuration); this.client = new HiveMetaStoreClient(configuration);
} catch (MetaException e) { } catch (MetaException e) {
@@ -269,32 +279,59 @@ public class HoodieHiveClient {
* Get the table schema * Get the table schema
*/ */
public Map<String, String> getTableSchema() { public Map<String, String> getTableSchema() {
if (!doesTableExist()) { if (syncConfig.useJdbc) {
throw new IllegalArgumentException( if (!doesTableExist()) {
"Failed to get schema for table " + syncConfig.tableName + " does not exist"); throw new IllegalArgumentException(
} "Failed to get schema for table " + syncConfig.tableName + " does not exist");
Map<String, String> schema = Maps.newHashMap();
ResultSet result = null;
try {
DatabaseMetaData databaseMetaData = connection.getMetaData();
result = databaseMetaData
.getColumns(null, syncConfig.databaseName, syncConfig.tableName, null);
while (result.next()) {
String columnName = result.getString(4);
String columnType = result.getString(6);
if ("DECIMAL".equals(columnType)) {
int columnSize = result.getInt("COLUMN_SIZE");
int decimalDigits = result.getInt("DECIMAL_DIGITS");
columnType += String.format("(%s,%s)", columnSize, decimalDigits);
}
schema.put(columnName, columnType);
} }
Map<String, String> schema = Maps.newHashMap();
ResultSet result = null;
try {
DatabaseMetaData databaseMetaData = connection.getMetaData();
result = databaseMetaData
.getColumns(null, syncConfig.databaseName, syncConfig.tableName, null);
while (result.next()) {
String columnName = result.getString(4);
String columnType = result.getString(6);
if ("DECIMAL".equals(columnType)) {
int columnSize = result.getInt("COLUMN_SIZE");
int decimalDigits = result.getInt("DECIMAL_DIGITS");
columnType += String.format("(%s,%s)", columnSize, decimalDigits);
}
schema.put(columnName, columnType);
}
return schema;
} catch (SQLException e) {
throw new HoodieHiveSyncException("Failed to get table schema for " + syncConfig.tableName,
e);
} finally {
closeQuietly(result, null);
}
} else {
return getTableSchemaUsingMetastoreClient();
}
}
public Map<String, String> getTableSchemaUsingMetastoreClient() {
try {
// HiveMetastoreClient returns partition keys separate from Columns, hence get both and merge to
// get the Schema of the table.
final long start = System.currentTimeMillis();
Table table = this.client.getTable(syncConfig.databaseName, syncConfig.tableName);
Map<String, String> partitionKeysMap = table.getPartitionKeys().stream()
.collect(Collectors.toMap(f -> f.getName(), f -> f.getType().toUpperCase()));
Map<String, String> columnsMap = table.getSd().getCols().stream()
.collect(Collectors.toMap(f -> f.getName(), f -> f.getType().toUpperCase()));
Map<String, String> schema = new HashMap<>();
schema.putAll(columnsMap);
schema.putAll(partitionKeysMap);
final long end = System.currentTimeMillis();
LOG.info(String.format("Time taken to getTableSchema: %s ms", (end - start)));
return schema; return schema;
} catch (SQLException e) { } catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get table schema for " + syncConfig.tableName, throw new HoodieHiveSyncException("Failed to get table schema for : " + syncConfig.tableName, e);
e);
} finally {
closeQuietly(result, null);
} }
} }
@@ -455,18 +492,70 @@ public class HoodieHiveClient {
* @param s SQL to execute * @param s SQL to execute
*/ */
public void updateHiveSQL(String s) { public void updateHiveSQL(String s) {
Statement stmt = null; if (syncConfig.useJdbc) {
try { Statement stmt = null;
stmt = connection.createStatement(); try {
LOG.info("Executing SQL " + s); stmt = connection.createStatement();
stmt.execute(s); LOG.info("Executing SQL " + s);
} catch (SQLException e) { stmt.execute(s);
throw new HoodieHiveSyncException("Failed in executing SQL " + s, e); } catch (SQLException e) {
} finally { throw new HoodieHiveSyncException("Failed in executing SQL " + s, e);
closeQuietly(null, stmt); } finally {
closeQuietly(null, stmt);
}
} else {
updateHiveSQLUsingHiveDriver(s);
} }
} }
/**
 * Executes a single update statement in Hive through the Hive driver.
 *
 * @param sql SQL statement to execute
 * @return the last response produced for the statement
 * @throws HoodieHiveSyncException if the statement cannot be executed
 */
public CommandProcessorResponse updateHiveSQLUsingHiveDriver(String sql) throws HoodieHiveSyncException {
  // Delegate to the batch variant with a one-element batch and hand back its final response.
  final List<CommandProcessorResponse> results = updateHiveSQLs(Collections.singletonList(sql));
  final int lastIndex = results.size() - 1;
  return results.get(lastIndex);
}
/**
 * Runs the given statements in order through a Hive {@code Driver} inside a newly started
 * {@code SessionState}; both are closed before this method returns.
 *
 * @param sqls SQL statements to execute, in order
 * @return one response per statement, in execution order
 * @throws HoodieHiveSyncException if the session or driver cannot be set up, or if any
 *         statement throws or comes back with a non-zero response code
 */
private List<CommandProcessorResponse> updateHiveSQLs(List<String> sqls) throws HoodieHiveSyncException {
  SessionState ss = null;
  org.apache.hadoop.hive.ql.Driver hiveDriver = null;
  List<CommandProcessorResponse> responses = new ArrayList<>();
  try {
    final long startTime = System.currentTimeMillis();
    ss = SessionState.start(configuration);
    hiveDriver = new org.apache.hadoop.hive.ql.Driver(configuration);
    final long endTime = System.currentTimeMillis();
    LOG.info(String.format("Time taken to start SessionState and create Driver: %s ms", (endTime - startTime)));
    for (String sql : sqls) {
      final long start = System.currentTimeMillis();
      CommandProcessorResponse response = hiveDriver.run(sql);
      final long end = System.currentTimeMillis();
      LOG.info(String.format("Time taken to execute [%s]: %s ms", sql, (end - start)));
      // The driver can report a failed statement through the response code instead of
      // throwing; surface it so the caller does not treat a failed statement as applied.
      if (response.getResponseCode() != 0) {
        throw new HoodieHiveSyncException("Failed in executing SQL " + sql + ", response code "
            + response.getResponseCode() + ": " + response.getErrorMessage(), response.getException());
      }
      responses.add(response);
    }
  } catch (HoodieHiveSyncException e) {
    // Already names the failing statement; do not wrap it a second time.
    throw e;
  } catch (Exception e) {
    throw new HoodieHiveSyncException("Failed in executing SQL", e);
  } finally {
    // Best-effort cleanup: a failure to close is logged and never masks the primary outcome.
    if (ss != null) {
      try {
        ss.close();
      } catch (IOException ie) {
        LOG.error("Error while closing SessionState", ie);
      }
    }
    if (hiveDriver != null) {
      try {
        hiveDriver.close();
      } catch (Exception e) {
        LOG.error("Error while closing hiveDriver", e);
      }
    }
  }
  return responses;
}
private void createHiveConnection() { private void createHiveConnection() {
if (connection == null) { if (connection == null) {
@@ -505,6 +594,11 @@ public class HoodieHiveClient {
if (stmt != null) { if (stmt != null) {
stmt.close(); stmt.close();
} }
} catch (SQLException e) {
LOG.error("Could not close the statement opened ", e);
}
try {
if (resultSet != null) { if (resultSet != null) {
resultSet.close(); resultSet.close();
} }
@@ -544,6 +638,7 @@ public class HoodieHiveClient {
} }
if (client != null) { if (client != null) {
client.close(); client.close();
client = null;
} }
} catch (SQLException e) { } catch (SQLException e) {
LOG.error("Could not close connection ", e); LOG.error("Could not close connection ", e);
@@ -622,4 +717,4 @@ public class HoodieHiveClient {
return new PartitionEvent(PartitionEventType.UPDATE, storagePartition); return new PartitionEvent(PartitionEventType.UPDATE, storagePartition);
} }
} }
} }

View File

@@ -25,6 +25,8 @@ import static org.junit.Assert.assertTrue;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import java.io.IOException; import java.io.IOException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List; import java.util.List;
import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
@@ -39,10 +41,25 @@ import org.apache.parquet.schema.Types;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@SuppressWarnings("ConstantConditions") @SuppressWarnings("ConstantConditions")
@RunWith(Parameterized.class)
public class HiveSyncToolTest { public class HiveSyncToolTest {
// Test sync tool using both jdbc and metastore client
private boolean useJdbc;
/**
 * Stores the parameter value for this run in the {@code useJdbc} field, which the test
 * methods copy into the sync config.
 *
 * @param useJdbc value assigned to the {@code useJdbc} field
 */
public HiveSyncToolTest(Boolean useJdbc) {
this.useJdbc = useJdbc;
}
/**
 * Supplies the parameter sets for the parameterized runner: one run with {@code useJdbc}
 * set to {@code false} and one with it set to {@code true}.
 *
 * <p>The {@code {0}} placeholder puts the parameter value into each run's display name so
 * the two runs can be told apart in test reports.
 */
@Parameterized.Parameters(name = "UseJdbc={0}")
public static Collection<Boolean[]> data() {
  return Arrays.asList(new Boolean[][]{{false}, {true}});
}
@Before @Before
public void setUp() throws IOException, InterruptedException, URISyntaxException { public void setUp() throws IOException, InterruptedException, URISyntaxException {
TestUtil.setUp(); TestUtil.setUp();
@@ -146,6 +163,7 @@ public class HiveSyncToolTest {
@Test @Test
public void testBasicSync() throws Exception { public void testBasicSync() throws Exception {
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
String commitTime = "100"; String commitTime = "100";
TestUtil.createCOWDataset(commitTime, 5); TestUtil.createCOWDataset(commitTime, 5);
HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig,
@@ -168,6 +186,7 @@ public class HiveSyncToolTest {
@Test @Test
public void testSyncIncremental() throws Exception { public void testSyncIncremental() throws Exception {
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
String commitTime1 = "100"; String commitTime1 = "100";
TestUtil.createCOWDataset(commitTime1, 5); TestUtil.createCOWDataset(commitTime1, 5);
HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig,
@@ -211,6 +230,7 @@ public class HiveSyncToolTest {
@Test @Test
public void testSyncIncrementalWithSchemaEvolution() throws Exception { public void testSyncIncrementalWithSchemaEvolution() throws Exception {
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
String commitTime1 = "100"; String commitTime1 = "100";
TestUtil.createCOWDataset(commitTime1, 5); TestUtil.createCOWDataset(commitTime1, 5);
HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig,
@@ -247,6 +267,7 @@ public class HiveSyncToolTest {
@Test @Test
public void testSyncMergeOnRead() throws Exception { public void testSyncMergeOnRead() throws Exception {
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
String commitTime = "100"; String commitTime = "100";
String deltaCommitTime = "101"; String deltaCommitTime = "101";
TestUtil.createMORDataset(commitTime, deltaCommitTime, 5); TestUtil.createMORDataset(commitTime, deltaCommitTime, 5);
@@ -295,6 +316,7 @@ public class HiveSyncToolTest {
@Test @Test
public void testSyncMergeOnReadRT() public void testSyncMergeOnReadRT()
throws Exception { throws Exception {
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
String commitTime = "100"; String commitTime = "100";
String deltaCommitTime = "101"; String deltaCommitTime = "101";
String roTablename = TestUtil.hiveSyncConfig.tableName; String roTablename = TestUtil.hiveSyncConfig.tableName;
@@ -350,6 +372,7 @@ public class HiveSyncToolTest {
@Test @Test
public void testMultiPartitionKeySync() public void testMultiPartitionKeySync()
throws Exception { throws Exception {
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
String commitTime = "100"; String commitTime = "100";
TestUtil.createCOWDataset(commitTime, 5); TestUtil.createCOWDataset(commitTime, 5);