1
0

[HUDI-717] Fixed usage of HiveDriver for DDL statements. (#1416)

When using HiveDriver mode in HoodieHiveClient, Hive 2.x DDL operations like ALTER PARTITION may fail. This is because Hive 2.x does not accept the `db`.`table_name` qualified form for these operations. In this fix, we set the name of the database in the SessionState created for the Driver.
This commit is contained in:
Prashant Wason
2020-04-03 16:23:05 -07:00
committed by GitHub
parent 639ec20412
commit 6808559b01
3 changed files with 101 additions and 10 deletions

View File

@@ -198,7 +198,8 @@ public class HoodieHiveClient {
for (String partition : partitions) {
String partitionClause = getPartitionClause(partition);
Path partitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition);
String fullPartitionPath = partitionPath.toUri().getScheme().equals(StorageSchemes.HDFS.getScheme())
String partitionScheme = partitionPath.toUri().getScheme();
String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme)
? FSUtils.getDFSFullPartitionPath(fs, partitionPath) : partitionPath.toString();
String changePartition =
alterTable + " PARTITION (" + partitionClause + ") SET LOCATION '" + fullPartitionPath + "'";
@@ -505,6 +506,7 @@ public class HoodieHiveClient {
try {
final long startTime = System.currentTimeMillis();
ss = SessionState.start(configuration);
ss.setCurrentDatabase(syncConfig.databaseName);
hiveDriver = new org.apache.hadoop.hive.ql.Driver(configuration);
final long endTime = System.currentTimeMillis();
LOG.info(String.format("Time taken to start SessionState and create Driver: %s ms", (endTime - startTime)));

View File

@@ -168,6 +168,47 @@ public class TestHiveSyncTool {
hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", instantTime,
hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get());
// Adding of new partitions
List<String> newPartition = Arrays.asList("2050/01/01");
hiveClient.addPartitionsToTable(TestUtil.hiveSyncConfig.tableName, Arrays.asList());
assertEquals("No new partition should be added", 5,
hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
hiveClient.addPartitionsToTable(TestUtil.hiveSyncConfig.tableName, newPartition);
assertEquals("New partition should be added", 6,
hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
// Update partitions
hiveClient.updatePartitionsToTable(TestUtil.hiveSyncConfig.tableName, Arrays.asList());
assertEquals("Partition count should remain the same", 6,
hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
hiveClient.updatePartitionsToTable(TestUtil.hiveSyncConfig.tableName, newPartition);
assertEquals("Partition count should remain the same", 6,
hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
// Alter partitions
// Manually change a hive partition location to check if the sync will detect
// it and generage a partition update event for it.
hiveClient.updateHiveSQL("ALTER TABLE `" + TestUtil.hiveSyncConfig.tableName
+ "` PARTITION (`datestr`='2050-01-01') SET LOCATION '/some/new/location'");
hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
List<Partition> hivePartitions = hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName);
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty());
writtenPartitionsSince.add(newPartition.get(0));
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
assertEquals("There should be only one paritition event", 1, partitionEvents.size());
assertEquals("The one partition event must of type UPDATE", PartitionEventType.UPDATE,
partitionEvents.iterator().next().eventType);
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable();
// Sync should update the changed partition to correct path
List<Partition> tablePartitions = hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName);
assertEquals("The one partition we wrote should be added to hive", 6, tablePartitions.size());
assertEquals("The last commit that was sycned should be 100", instantTime,
hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get());
}
@Test
@@ -250,7 +291,7 @@ public class TestHiveSyncTool {
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
String instantTime = "100";
String deltaCommitTime = "101";
TestUtil.createMORTable(instantTime, deltaCommitTime, 5);
TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true);
String roTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -294,7 +335,7 @@ public class TestHiveSyncTool {
String instantTime = "100";
String deltaCommitTime = "101";
String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
TestUtil.createMORTable(instantTime, deltaCommitTime, 5);
TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true);
HoodieHiveClient hiveClientRT =
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -363,4 +404,50 @@ public class TestHiveSyncTool {
assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", instantTime,
hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get());
}
@Test
public void testReadSchemaForMOR() throws Exception {
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
String commitTime = "100";
String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
TestUtil.createMORTable(commitTime, "", 5, false);
HoodieHiveClient hiveClientRT =
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
assertFalse("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
+ " should not exist initially", hiveClientRT.doesTableExist(snapshotTableName));
// Lets do the sync
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable();
assertTrue("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
+ " should exist after sync completes", hiveClientRT.doesTableExist(snapshotTableName));
// Schema being read from compacted base files
assertEquals("Hive Schema should match the table schema + partition field", hiveClientRT.getTableSchema(snapshotTableName).size(),
SchemaTestUtil.getSimpleSchema().getFields().size() + 1);
assertEquals("Table partitions should match the number of partitions we wrote", 5,
hiveClientRT.scanTablePartitions(snapshotTableName).size());
// Now lets create more partitions and these are the only ones which needs to be synced
DateTime dateTime = DateTime.now().plusDays(6);
String commitTime2 = "102";
String deltaCommitTime2 = "103";
TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, deltaCommitTime2);
// Lets do the sync
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable();
hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
// Schema being read from the log files
assertEquals("Hive Schema should match the evolved table schema + partition field",
hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1);
// Sync should add the one partition
assertEquals("The 1 partition we wrote should be added to hive", 6, hiveClientRT.scanTablePartitions(snapshotTableName).size());
assertEquals("The last commit that was sycned should be 103", deltaCommitTime2,
hiveClientRT.getLastCommitTimeSynced(snapshotTableName).get());
}
}

View File

@@ -107,7 +107,6 @@ public class TestUtil {
hiveSyncConfig = new HiveSyncConfig();
hiveSyncConfig.jdbcUrl = "jdbc:hive2://127.0.0.1:9999/";
hiveSyncConfig.databaseName = "hdrone_test";
hiveSyncConfig.hiveUser = "";
hiveSyncConfig.hivePass = "";
hiveSyncConfig.databaseName = "testdb";
@@ -167,7 +166,8 @@ public class TestUtil {
createCommitFile(commitMetadata, instantTime);
}
static void createMORTable(String instantTime, String deltaCommitTime, int numberOfPartitions)
static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions,
boolean createDeltaCommit)
throws IOException, InitializationError, URISyntaxException, InterruptedException {
Path path = new Path(hiveSyncConfig.basePath);
FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
@@ -177,17 +177,19 @@ public class TestUtil {
boolean result = fileSystem.mkdirs(path);
checkResult(result);
DateTime dateTime = DateTime.now();
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, instantTime);
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, commitTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createdTablesSet
.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
commitMetadata.getPartitionToWriteStats()
.forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l)));
createCompactionCommitFile(compactionMetadata, instantTime);
// Write a delta commit
HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), true);
createDeltaCommitFile(deltaMetadata, deltaCommitTime);
createCompactionCommitFile(compactionMetadata, commitTime);
if (createDeltaCommit) {
// Write a delta commit
HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), true);
createDeltaCommitFile(deltaMetadata, deltaCommitTime);
}
}
static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, DateTime startFrom,