From e5d9b818bce3fcd9714a5973c68745cce0845fcc Mon Sep 17 00:00:00 2001 From: Nishith Agarwal Date: Thu, 22 Jun 2017 20:33:33 -0700 Subject: [PATCH] Sync Tool registers 2 tables, RO and RT Tables --- .../com/uber/hoodie/hive/HiveSyncTool.java | 64 +++++++++++-------- .../uber/hoodie/hive/HiveSyncToolTest.java | 55 ++++++++++++++++ .../java/com/uber/hoodie/hive/TestUtil.java | 2 + 3 files changed, 96 insertions(+), 25 deletions(-) diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncTool.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncTool.java index 7e2abee33..1268e69e8 100644 --- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncTool.java +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncTool.java @@ -26,10 +26,6 @@ import com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat; import com.uber.hoodie.hive.HoodieHiveClient.PartitionEvent; import com.uber.hoodie.hive.HoodieHiveClient.PartitionEvent.PartitionEventType; import com.uber.hoodie.hive.util.SchemaUtil; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Partition; @@ -39,6 +35,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import parquet.schema.MessageType; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + + /** * Tool to sync a hoodie HDFS dataset with a hive metastore table. 
* Either use it as a api HiveSyncTool.syncHoodieTable(HiveSyncConfig) @@ -53,6 +55,7 @@ public class HiveSyncTool { private static Logger LOG = LoggerFactory.getLogger(HiveSyncTool.class); private final HoodieHiveClient hoodieHiveClient; + public final static String SUFFIX_REALTIME_TABLE = "_rt"; private final HiveSyncConfig cfg; public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) { @@ -61,15 +64,38 @@ public class HiveSyncTool { } public void syncHoodieTable() { - LOG.info("Trying to sync hoodie table" + cfg.tableName + " with base path " + hoodieHiveClient + switch(hoodieHiveClient.getTableType()) { + case COPY_ON_WRITE: + syncHoodieTable(false); + break; + case MERGE_ON_READ: + //sync a RO table for MOR + syncHoodieTable(false); + String originalTableName = cfg.tableName; + //TODO : Make realtime table registration optional using a config param + cfg.tableName = cfg.tableName + SUFFIX_REALTIME_TABLE; + //sync a RT table for MOR + syncHoodieTable(true); + cfg.tableName = originalTableName; + break; + default: + LOG.error("Unknown table type " + hoodieHiveClient.getTableType()); + throw new InvalidDatasetException(hoodieHiveClient.getBasePath()); + } + hoodieHiveClient.close(); + } + + private void syncHoodieTable(boolean isRealTime) { + LOG.info("Trying to sync hoodie table " + cfg.tableName + " with base path " + hoodieHiveClient .getBasePath() + " of type " + hoodieHiveClient .getTableType()); + // Check if the necessary table exists boolean tableExists = hoodieHiveClient.doesTableExist(); // Get the parquet schema for this dataset looking at the latest commit MessageType schema = hoodieHiveClient.getDataSchema(); // Sync schema if needed - syncSchema(tableExists, schema); + syncSchema(tableExists, isRealTime, schema); LOG.info("Schema sync complete. 
Syncing partitions for " + cfg.tableName); // Get the last time we successfully synced partitions @@ -86,8 +112,6 @@ public class HiveSyncTool { hoodieHiveClient.updateLastCommitTimeSynced(); LOG.info("Sync complete for " + cfg.tableName); - - hoodieHiveClient.close(); } /** @@ -97,29 +121,19 @@ public class HiveSyncTool { * @param tableExists - does table exist * @param schema - extracted schema */ - private void syncSchema(boolean tableExists, MessageType schema) { + private void syncSchema(boolean tableExists, boolean isRealTime, MessageType schema) { // Check and sync schema if (!tableExists) { LOG.info("Table " + cfg.tableName + " is not found. Creating it"); - switch (hoodieHiveClient.getTableType()) { - case COPY_ON_WRITE: - hoodieHiveClient.createTable(schema, HoodieInputFormat.class.getName(), - MapredParquetOutputFormat.class.getName(), ParquetHiveSerDe.class.getName()); - break; - case MERGE_ON_READ: - // create RT Table + if(!isRealTime) { + // TODO - RO Table for MOR only after major compaction (UnboundedCompaction is default for now) + hoodieHiveClient.createTable(schema, HoodieInputFormat.class.getName(), + MapredParquetOutputFormat.class.getName(), ParquetHiveSerDe.class.getName()); + } else { // Custom serde will not work with ALTER TABLE REPLACE COLUMNS // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java#L3488 - // Need a fix to check instance of - // hoodieHiveClient.createTable(schema, HoodieRealtimeInputFormat.class.getName(), - // MapredParquetOutputFormat.class.getName(), HoodieParquetSerde.class.getName()); hoodieHiveClient.createTable(schema, HoodieRealtimeInputFormat.class.getName(), - MapredParquetOutputFormat.class.getName(), ParquetHiveSerDe.class.getName()); - // TODO - create RO Table - break; - default: - LOG.error("Unknown table type " + hoodieHiveClient.getTableType()); - throw new InvalidDatasetException(hoodieHiveClient.getBasePath()); + 
MapredParquetOutputFormat.class.getName(), ParquetHiveSerDe.class.getName()); } } else { // Check if the dataset schema has evolved diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/HiveSyncToolTest.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/HiveSyncToolTest.java index fd59d20dd..398d6e0a8 100644 --- a/hoodie-hive/src/test/java/com/uber/hoodie/hive/HiveSyncToolTest.java +++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/HiveSyncToolTest.java @@ -305,4 +305,59 @@ public class HiveSyncToolTest { hiveClient.getLastCommitTimeSynced().get()); } + @Test + public void testSyncMergeOnReadRT() + throws IOException, InitializationError, URISyntaxException, TException, InterruptedException { + String commitTime = "100"; + String deltaCommitTime = "101"; + String roTablename = TestUtil.hiveSyncConfig.tableName; + TestUtil.hiveSyncConfig.tableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_REALTIME_TABLE; + TestUtil.createMORDataset(commitTime, deltaCommitTime, 5); + HoodieHiveClient hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, + TestUtil.getHiveConf(), TestUtil.fileSystem); + + assertFalse("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_REALTIME_TABLE + " should not exist initially", + hiveClientRT.doesTableExist()); + + // Lets do the sync + HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), + TestUtil.fileSystem); + tool.syncHoodieTable(); + + assertTrue("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_REALTIME_TABLE + " should exist after sync completes", + hiveClientRT.doesTableExist()); + + assertEquals("Hive Schema should match the dataset schema + partition field", + hiveClientRT.getTableSchema().size(), SchemaTestUtil.getSimpleSchema().getFields().size() + 1); + assertEquals("Table partitions should match the number of partitions we wrote", 5, + hiveClientRT.scanTablePartitions().size()); + assertEquals("The last commit that was synced should be 
updated in the TBLPROPERTIES", + deltaCommitTime, + hiveClientRT.getLastCommitTimeSynced().get()); + + // Now lets create more partitions and these are the only ones which needs to be synced + DateTime dateTime = DateTime.now().plusDays(6); + String commitTime2 = "102"; + String deltaCommitTime2 = "103"; + + TestUtil.addCOWPartitions(1, true, dateTime, commitTime2); + TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, deltaCommitTime2); + // Lets do the sync + tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), + TestUtil.fileSystem); + tool.syncHoodieTable(); + hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, + TestUtil.getHiveConf(), TestUtil.fileSystem); + + assertEquals("Hive Schema should match the evolved dataset schema + partition field", + hiveClientRT.getTableSchema().size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1); + // Sync should add the one partition + assertEquals("The 2 partitions we wrote should be added to hive", 6, + hiveClientRT.scanTablePartitions().size()); + assertEquals("The last commit that was synced should be 103", + deltaCommitTime2, + hiveClientRT.getLastCommitTimeSynced().get()); + TestUtil.hiveSyncConfig.tableName = roTablename; + } + } \ No newline at end of file diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java index 38b9efcb7..8c2036270 100644 --- a/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java +++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java @@ -175,6 +175,7 @@ public class TestUtil { DateTime dateTime = DateTime.now(); HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, commitTime); createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); + createdTablesSet.add(hiveSyncConfig.databaseName + "." 
+ hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_REALTIME_TABLE); HoodieCompactionMetadata compactionMetadata = new HoodieCompactionMetadata(); commitMetadata.getPartitionToWriteStats() .forEach((key, value) -> value.stream().map(k -> new CompactionWriteStat(k, key, 0, 0, 0)) @@ -201,6 +202,7 @@ public class TestUtil { HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, isParquetSchemaSimple, startFrom, commitTime); createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); + createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_REALTIME_TABLE); HoodieCompactionMetadata compactionMetadata = new HoodieCompactionMetadata(); commitMetadata.getPartitionToWriteStats() .forEach((key, value) -> value.stream().map(k -> new CompactionWriteStat(k, key, 0, 0, 0))