Sync Tool registers 2 tables, RO and RT Tables
This commit is contained in:
committed by
prazanna
parent
5cc071f74e
commit
e5d9b818bc
@@ -26,10 +26,6 @@ import com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat;
|
||||
import com.uber.hoodie.hive.HoodieHiveClient.PartitionEvent;
|
||||
import com.uber.hoodie.hive.HoodieHiveClient.PartitionEvent.PartitionEventType;
|
||||
import com.uber.hoodie.hive.util.SchemaUtil;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hive.conf.HiveConf;
|
||||
import org.apache.hadoop.hive.metastore.api.Partition;
|
||||
@@ -39,6 +35,12 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import parquet.schema.MessageType;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
/**
|
||||
* Tool to sync a hoodie HDFS dataset with a hive metastore table.
|
||||
* Either use it as an API: HiveSyncTool.syncHoodieTable(HiveSyncConfig)
|
||||
@@ -53,6 +55,7 @@ public class HiveSyncTool {
|
||||
|
||||
private static Logger LOG = LoggerFactory.getLogger(HiveSyncTool.class);
|
||||
private final HoodieHiveClient hoodieHiveClient;
|
||||
public final static String SUFFIX_REALTIME_TABLE = "_rt";
|
||||
private final HiveSyncConfig cfg;
|
||||
|
||||
public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
|
||||
@@ -61,15 +64,38 @@ public class HiveSyncTool {
|
||||
}
|
||||
|
||||
/**
 * Entry point of the sync: dispatches on the table type reported by the hive client.
 *
 * COPY_ON_WRITE datasets are synced once, under cfg.tableName.
 * MERGE_ON_READ datasets are synced twice: first under cfg.tableName with isRealTime = false,
 * then under cfg.tableName + SUFFIX_REALTIME_TABLE with isRealTime = true. cfg.tableName is
 * mutated for the second pass and set back to its original value afterwards.
 * Any other table type is logged and rejected with an InvalidDatasetException.
 *
 * The hive client is closed once the switch completes normally.
 *
 * NOTE(review): neither the restore of cfg.tableName nor hoodieHiveClient.close() sits in a
 * finally block, so an exception thrown by either sync pass (or by the default branch) leaves
 * the client open, and possibly leaves cfg.tableName carrying the realtime suffix.
 */
public void syncHoodieTable() {
|
||||
// NOTE(review): the next statement is unterminated as shown here (no closing parenthesis or
// semicolon) and repeats the log line at the top of syncHoodieTable(boolean). It looks like a
// line this commit removed - confirm against the real file.
LOG.info("Trying to sync hoodie table" + cfg.tableName + " with base path " + hoodieHiveClient
|
||||
switch(hoodieHiveClient.getTableType()) {
|
||||
case COPY_ON_WRITE:
|
||||
syncHoodieTable(false);
|
||||
break;
|
||||
case MERGE_ON_READ:
|
||||
// Sync an RO table for MOR, registered under the unmodified table name
|
||||
syncHoodieTable(false);
|
||||
// Remember the configured name so it can be put back after the RT pass
String originalTableName = cfg.tableName;
|
||||
// TODO : Make realtime table registration optional using a config param
|
||||
cfg.tableName = cfg.tableName + SUFFIX_REALTIME_TABLE;
|
||||
// Sync an RT table for MOR, registered under the suffixed table name
|
||||
syncHoodieTable(true);
|
||||
cfg.tableName = originalTableName;
|
||||
break;
|
||||
default:
|
||||
LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
|
||||
throw new InvalidDatasetException(hoodieHiveClient.getBasePath());
|
||||
}
|
||||
// Only reached when no branch above threw
hoodieHiveClient.close();
|
||||
}
|
||||
|
||||
private void syncHoodieTable(boolean isRealTime) {
|
||||
LOG.info("Trying to sync hoodie table " + cfg.tableName + " with base path " + hoodieHiveClient
|
||||
.getBasePath() + " of type " + hoodieHiveClient
|
||||
.getTableType());
|
||||
|
||||
// Check if the necessary table exists
|
||||
boolean tableExists = hoodieHiveClient.doesTableExist();
|
||||
// Get the parquet schema for this dataset looking at the latest commit
|
||||
MessageType schema = hoodieHiveClient.getDataSchema();
|
||||
// Sync schema if needed
|
||||
syncSchema(tableExists, schema);
|
||||
syncSchema(tableExists, isRealTime, schema);
|
||||
|
||||
LOG.info("Schema sync complete. Syncing partitions for " + cfg.tableName);
|
||||
// Get the last time we successfully synced partitions
|
||||
@@ -86,8 +112,6 @@ public class HiveSyncTool {
|
||||
|
||||
hoodieHiveClient.updateLastCommitTimeSynced();
|
||||
LOG.info("Sync complete for " + cfg.tableName);
|
||||
|
||||
hoodieHiveClient.close();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -97,29 +121,19 @@ public class HiveSyncTool {
|
||||
* @param tableExists - does table exist
|
||||
* @param schema - extracted schema
|
||||
*/
|
||||
private void syncSchema(boolean tableExists, MessageType schema) {
|
||||
private void syncSchema(boolean tableExists, boolean isRealTime, MessageType schema) {
|
||||
// Check and sync schema
|
||||
if (!tableExists) {
|
||||
LOG.info("Table " + cfg.tableName + " is not found. Creating it");
|
||||
switch (hoodieHiveClient.getTableType()) {
|
||||
case COPY_ON_WRITE:
|
||||
hoodieHiveClient.createTable(schema, HoodieInputFormat.class.getName(),
|
||||
MapredParquetOutputFormat.class.getName(), ParquetHiveSerDe.class.getName());
|
||||
break;
|
||||
case MERGE_ON_READ:
|
||||
// create RT Table
|
||||
if(!isRealTime) {
|
||||
// TODO - RO Table for MOR only after major compaction (UnboundedCompaction is default for now)
|
||||
hoodieHiveClient.createTable(schema, HoodieInputFormat.class.getName(),
|
||||
MapredParquetOutputFormat.class.getName(), ParquetHiveSerDe.class.getName());
|
||||
} else {
|
||||
// Custom serde will not work with ALTER TABLE REPLACE COLUMNS
|
||||
// https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java#L3488
|
||||
// Need a fix to check instance of
|
||||
// hoodieHiveClient.createTable(schema, HoodieRealtimeInputFormat.class.getName(),
|
||||
// MapredParquetOutputFormat.class.getName(), HoodieParquetSerde.class.getName());
|
||||
hoodieHiveClient.createTable(schema, HoodieRealtimeInputFormat.class.getName(),
|
||||
MapredParquetOutputFormat.class.getName(), ParquetHiveSerDe.class.getName());
|
||||
// TODO - create RO Table
|
||||
break;
|
||||
default:
|
||||
LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
|
||||
throw new InvalidDatasetException(hoodieHiveClient.getBasePath());
|
||||
MapredParquetOutputFormat.class.getName(), ParquetHiveSerDe.class.getName());
|
||||
}
|
||||
} else {
|
||||
// Check if the dataset schema has evolved
|
||||
|
||||
@@ -305,4 +305,59 @@ public class HiveSyncToolTest {
|
||||
hiveClient.getLastCommitTimeSynced().get());
|
||||
}
|
||||
|
||||
/**
 * Syncs a merge-on-read dataset and checks the Hive table seen through a client whose
 * configured table name carries HiveSyncTool.SUFFIX_REALTIME_TABLE.
 *
 * First round: creates the MOR dataset (commit "100", delta commit "101"), runs the sync tool
 * and asserts that the table exists, that its column count equals the simple schema's field
 * count plus one, that 5 partitions are registered, and that the last synced commit is "101".
 *
 * Second round: adds partitions under commit "102" / delta commit "103", syncs again and
 * asserts the column count against the evolved schema, 6 partitions in total, and "103" as the
 * last synced commit.
 *
 * NOTE(review): this test appends the realtime suffix to the shared TestUtil.hiveSyncConfig
 * before the tool runs, and HiveSyncTool.syncHoodieTable() appends the same suffix again for
 * MERGE_ON_READ datasets. Presumably the table inspected here is therefore the one the tool
 * registers with isRealTime = false, while the realtime registration ends up under a doubly
 * suffixed name - verify that this is the intended coverage.
 *
 * NOTE(review): the original table name is only put back by the last statement; a failing
 * assertion leaves the shared static config with the suffixed name for later tests.
 */
@Test
|
||||
public void testSyncMergeOnReadRT()
|
||||
throws IOException, InitializationError, URISyntaxException, TException, InterruptedException {
|
||||
String commitTime = "100";
|
||||
String deltaCommitTime = "101";
|
||||
// Keep the configured name so it can be restored at the end of the test
String roTablename = TestUtil.hiveSyncConfig.tableName;
|
||||
// From here on the shared config points at the suffixed table name
TestUtil.hiveSyncConfig.tableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_REALTIME_TABLE;
|
||||
// The 5 is presumably the number of partitions (5 partitions are asserted below) - TODO confirm
TestUtil.createMORDataset(commitTime, deltaCommitTime, 5);
|
||||
HoodieHiveClient hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig,
|
||||
TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
|
||||
// NOTE(review): tableName already ends with the realtime suffix at this point, so this message
// (and the assertTrue message below) shows the suffix twice.
assertFalse("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_REALTIME_TABLE + " should not exist initially",
|
||||
hiveClientRT.doesTableExist());
|
||||
|
||||
// Let's do the sync
|
||||
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(),
|
||||
TestUtil.fileSystem);
|
||||
tool.syncHoodieTable();
|
||||
|
||||
assertTrue("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_REALTIME_TABLE + " should exist after sync completes",
|
||||
hiveClientRT.doesTableExist());
|
||||
|
||||
// NOTE(review): assuming JUnit's assertEquals(message, expected, actual), the two value
// arguments are in the opposite order here; that only affects the failure message.
assertEquals("Hive Schema should match the dataset schema + partition field",
|
||||
hiveClientRT.getTableSchema().size(), SchemaTestUtil.getSimpleSchema().getFields().size() + 1);
|
||||
assertEquals("Table partitions should match the number of partitions we wrote", 5,
|
||||
hiveClientRT.scanTablePartitions().size());
|
||||
assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES",
|
||||
deltaCommitTime,
|
||||
hiveClientRT.getLastCommitTimeSynced().get());
|
||||
|
||||
// Now let's create more partitions; these are the only ones which need to be synced
|
||||
DateTime dateTime = DateTime.now().plusDays(6);
|
||||
String commitTime2 = "102";
|
||||
String deltaCommitTime2 = "103";
|
||||
|
||||
// Both calls use the same start date and the same commit time "102"
TestUtil.addCOWPartitions(1, true, dateTime, commitTime2);
|
||||
TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, deltaCommitTime2);
|
||||
// Let's do the sync
|
||||
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(),
|
||||
TestUtil.fileSystem);
|
||||
tool.syncHoodieTable();
|
||||
// A fresh client is created for the second round of checks
hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig,
|
||||
TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
|
||||
// NOTE(review): same argument order as the schema assertion in the first round
assertEquals("Hive Schema should match the evolved dataset schema + partition field",
|
||||
hiveClientRT.getTableSchema().size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1);
|
||||
// Sync should add the one new partition: 6 in total, one more than the 5 asserted after the
// first sync (the assertion message speaks of 2 partitions)
|
||||
assertEquals("The 2 partitions we wrote should be added to hive", 6,
|
||||
hiveClientRT.scanTablePartitions().size());
|
||||
assertEquals("The last commit that was sycned should be 103",
|
||||
deltaCommitTime2,
|
||||
hiveClientRT.getLastCommitTimeSynced().get());
|
||||
// Put the shared config back to the original table name (skipped if anything above threw)
TestUtil.hiveSyncConfig.tableName = roTablename;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -175,6 +175,7 @@ public class TestUtil {
|
||||
DateTime dateTime = DateTime.now();
|
||||
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, commitTime);
|
||||
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
|
||||
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_REALTIME_TABLE);
|
||||
HoodieCompactionMetadata compactionMetadata = new HoodieCompactionMetadata();
|
||||
commitMetadata.getPartitionToWriteStats()
|
||||
.forEach((key, value) -> value.stream().map(k -> new CompactionWriteStat(k, key, 0, 0, 0))
|
||||
@@ -201,6 +202,7 @@ public class TestUtil {
|
||||
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions,
|
||||
isParquetSchemaSimple, startFrom, commitTime);
|
||||
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
|
||||
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_REALTIME_TABLE);
|
||||
HoodieCompactionMetadata compactionMetadata = new HoodieCompactionMetadata();
|
||||
commitMetadata.getPartitionToWriteStats()
|
||||
.forEach((key, value) -> value.stream().map(k -> new CompactionWriteStat(k, key, 0, 0, 0))
|
||||
|
||||
Reference in New Issue
Block a user