diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java index d5d668b1c..07b6d0420 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java @@ -26,9 +26,12 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.StorageSchemes; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.hive.util.HiveSchemaUtil; @@ -69,6 +72,8 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient { private static final Logger LOG = LogManager.getLogger(HoodieHiveClient.class); private final PartitionValueExtractor partitionValueExtractor; private IMetaStoreClient client; + private SessionState sessionState; + private Driver hiveDriver; private HiveSyncConfig syncConfig; private FileSystem fs; private Connection connection; @@ -88,8 +93,26 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient { createHiveConnection(); } try { + HoodieTimer timer = new HoodieTimer().startTimer(); + this.sessionState = new SessionState(configuration, + UserGroupInformation.getCurrentUser().getShortUserName()); + SessionState.start(this.sessionState); + this.sessionState.setCurrentDatabase(syncConfig.databaseName); + hiveDriver = new org.apache.hadoop.hive.ql.Driver(configuration); this.client = Hive.get(configuration).getMSC(); - } 
catch (MetaException | HiveException e) { + LOG.info(String.format("Time taken to start SessionState and create Driver and client: " + + "%s ms", (timer.endTimer()))); + } catch (MetaException | HiveException | IOException e) { + if (this.sessionState != null) { + try { + this.sessionState.close(); + } catch (IOException ioException) { + LOG.error("Error while closing SessionState", ioException); + } + } + if (this.hiveDriver != null) { + this.hiveDriver.close(); + } throw new HoodieHiveSyncException("Failed to create HiveMetaStoreClient", e); } @@ -402,39 +425,18 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient { } private List updateHiveSQLs(List sqls) { - SessionState ss = null; - org.apache.hadoop.hive.ql.Driver hiveDriver = null; List responses = new ArrayList<>(); try { - final long startTime = System.currentTimeMillis(); - ss = SessionState.start(configuration); - ss.setCurrentDatabase(syncConfig.databaseName); - hiveDriver = new org.apache.hadoop.hive.ql.Driver(configuration); - final long endTime = System.currentTimeMillis(); - LOG.info(String.format("Time taken to start SessionState and create Driver: %s ms", (endTime - startTime))); for (String sql : sqls) { - final long start = System.currentTimeMillis(); - responses.add(hiveDriver.run(sql)); - final long end = System.currentTimeMillis(); - LOG.info(String.format("Time taken to execute [%s]: %s ms", sql, (end - start))); + if (hiveDriver != null) { + final long start = System.currentTimeMillis(); + responses.add(hiveDriver.run(sql)); + final long end = System.currentTimeMillis(); + LOG.info(String.format("Time taken to execute [%s]: %s ms", sql, (end - start))); + } } } catch (Exception e) { throw new HoodieHiveSyncException("Failed in executing SQL", e); - } finally { - if (ss != null) { - try { - ss.close(); - } catch (IOException ie) { - LOG.error("Error while closing SessionState", ie); - } - } - if (hiveDriver != null) { - try { - hiveDriver.close(); - } catch (Exception e) { - 
LOG.error("Error while closing hiveDriver", e); - } - } } return responses; } diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 4ec4f0d3d..99c599084 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -205,6 +205,10 @@ public class TestHiveSyncTool { // Lets do the sync HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); tool.syncHoodieTable(); + // We need to renew the hive client after tool.syncHoodieTable(), because it closes the hive + // session, which then leads to a connection retry; an exception can be seen in the log. + hiveClient = + new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); assertTrue(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName), "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should exist after sync completes"); assertEquals(hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(), @@ -249,6 +253,8 @@ public class TestHiveSyncTool { tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); tool.syncHoodieTable(); + hiveClient = + new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); // Sync should update the changed partition to correct path List tablePartitions = hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName); assertEquals(6, tablePartitions.size(), "The one partition we wrote should be added to hive");