[HUDI-1759] Save one connection retry to hive metastore when hiveSyncTool run with useJdbc=false (#2759)
* [HUDI-1759] Save one connection retry to hive metastore when hiveSyncTool run with useJdbc=false * Fix review comment
This commit is contained in:
@@ -26,9 +26,12 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
|
|||||||
import org.apache.hadoop.hive.metastore.api.Partition;
|
import org.apache.hadoop.hive.metastore.api.Partition;
|
||||||
import org.apache.hadoop.hive.metastore.api.Table;
|
import org.apache.hadoop.hive.metastore.api.Table;
|
||||||
import org.apache.hadoop.hive.metastore.api.Database;
|
import org.apache.hadoop.hive.metastore.api.Database;
|
||||||
|
import org.apache.hadoop.hive.ql.Driver;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
import org.apache.hudi.common.fs.StorageSchemes;
|
import org.apache.hudi.common.fs.StorageSchemes;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||||
|
import org.apache.hudi.common.util.HoodieTimer;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.common.util.ValidationUtils;
|
import org.apache.hudi.common.util.ValidationUtils;
|
||||||
import org.apache.hudi.hive.util.HiveSchemaUtil;
|
import org.apache.hudi.hive.util.HiveSchemaUtil;
|
||||||
@@ -69,6 +72,8 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
|
|||||||
private static final Logger LOG = LogManager.getLogger(HoodieHiveClient.class);
|
private static final Logger LOG = LogManager.getLogger(HoodieHiveClient.class);
|
||||||
private final PartitionValueExtractor partitionValueExtractor;
|
private final PartitionValueExtractor partitionValueExtractor;
|
||||||
private IMetaStoreClient client;
|
private IMetaStoreClient client;
|
||||||
|
private SessionState sessionState;
|
||||||
|
private Driver hiveDriver;
|
||||||
private HiveSyncConfig syncConfig;
|
private HiveSyncConfig syncConfig;
|
||||||
private FileSystem fs;
|
private FileSystem fs;
|
||||||
private Connection connection;
|
private Connection connection;
|
||||||
@@ -88,8 +93,26 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
|
|||||||
createHiveConnection();
|
createHiveConnection();
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
HoodieTimer timer = new HoodieTimer().startTimer();
|
||||||
|
this.sessionState = new SessionState(configuration,
|
||||||
|
UserGroupInformation.getCurrentUser().getShortUserName());
|
||||||
|
SessionState.start(this.sessionState);
|
||||||
|
this.sessionState.setCurrentDatabase(syncConfig.databaseName);
|
||||||
|
hiveDriver = new org.apache.hadoop.hive.ql.Driver(configuration);
|
||||||
this.client = Hive.get(configuration).getMSC();
|
this.client = Hive.get(configuration).getMSC();
|
||||||
} catch (MetaException | HiveException e) {
|
LOG.info(String.format("Time taken to start SessionState and create Driver and client: "
|
||||||
|
+ "%s ms", (timer.endTimer())));
|
||||||
|
} catch (MetaException | HiveException | IOException e) {
|
||||||
|
if (this.sessionState != null) {
|
||||||
|
try {
|
||||||
|
this.sessionState.close();
|
||||||
|
} catch (IOException ioException) {
|
||||||
|
LOG.error("Error while closing SessionState", ioException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (this.hiveDriver != null) {
|
||||||
|
this.hiveDriver.close();
|
||||||
|
}
|
||||||
throw new HoodieHiveSyncException("Failed to create HiveMetaStoreClient", e);
|
throw new HoodieHiveSyncException("Failed to create HiveMetaStoreClient", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -402,39 +425,18 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private List<CommandProcessorResponse> updateHiveSQLs(List<String> sqls) {
|
private List<CommandProcessorResponse> updateHiveSQLs(List<String> sqls) {
|
||||||
SessionState ss = null;
|
|
||||||
org.apache.hadoop.hive.ql.Driver hiveDriver = null;
|
|
||||||
List<CommandProcessorResponse> responses = new ArrayList<>();
|
List<CommandProcessorResponse> responses = new ArrayList<>();
|
||||||
try {
|
try {
|
||||||
final long startTime = System.currentTimeMillis();
|
|
||||||
ss = SessionState.start(configuration);
|
|
||||||
ss.setCurrentDatabase(syncConfig.databaseName);
|
|
||||||
hiveDriver = new org.apache.hadoop.hive.ql.Driver(configuration);
|
|
||||||
final long endTime = System.currentTimeMillis();
|
|
||||||
LOG.info(String.format("Time taken to start SessionState and create Driver: %s ms", (endTime - startTime)));
|
|
||||||
for (String sql : sqls) {
|
for (String sql : sqls) {
|
||||||
final long start = System.currentTimeMillis();
|
if (hiveDriver != null) {
|
||||||
responses.add(hiveDriver.run(sql));
|
final long start = System.currentTimeMillis();
|
||||||
final long end = System.currentTimeMillis();
|
responses.add(hiveDriver.run(sql));
|
||||||
LOG.info(String.format("Time taken to execute [%s]: %s ms", sql, (end - start)));
|
final long end = System.currentTimeMillis();
|
||||||
|
LOG.info(String.format("Time taken to execute [%s]: %s ms", sql, (end - start)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new HoodieHiveSyncException("Failed in executing SQL", e);
|
throw new HoodieHiveSyncException("Failed in executing SQL", e);
|
||||||
} finally {
|
|
||||||
if (ss != null) {
|
|
||||||
try {
|
|
||||||
ss.close();
|
|
||||||
} catch (IOException ie) {
|
|
||||||
LOG.error("Error while closing SessionState", ie);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (hiveDriver != null) {
|
|
||||||
try {
|
|
||||||
hiveDriver.close();
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("Error while closing hiveDriver", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return responses;
|
return responses;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -205,6 +205,10 @@ public class TestHiveSyncTool {
|
|||||||
// Lets do the sync
|
// Lets do the sync
|
||||||
HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||||
tool.syncHoodieTable();
|
tool.syncHoodieTable();
|
||||||
|
// we need renew the hiveclient after tool.syncHoodieTable(), because it will close hive
|
||||||
|
// session, then lead to connection retry, we can see there is a exception at log.
|
||||||
|
hiveClient =
|
||||||
|
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||||
assertTrue(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
|
assertTrue(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
|
||||||
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should exist after sync completes");
|
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should exist after sync completes");
|
||||||
assertEquals(hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),
|
assertEquals(hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),
|
||||||
@@ -249,6 +253,8 @@ public class TestHiveSyncTool {
|
|||||||
|
|
||||||
tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||||
tool.syncHoodieTable();
|
tool.syncHoodieTable();
|
||||||
|
hiveClient =
|
||||||
|
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||||
// Sync should update the changed partition to correct path
|
// Sync should update the changed partition to correct path
|
||||||
List<Partition> tablePartitions = hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName);
|
List<Partition> tablePartitions = hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName);
|
||||||
assertEquals(6, tablePartitions.size(), "The one partition we wrote should be added to hive");
|
assertEquals(6, tablePartitions.size(), "The one partition we wrote should be added to hive");
|
||||||
|
|||||||
Reference in New Issue
Block a user