Refactor hoodie-hive
committed by prazanna
parent c192dd60b4
commit db6150c5ef
@@ -19,64 +19,161 @@
 package com.uber.hoodie.hive;
 
 import com.beust.jcommander.JCommander;
-import com.uber.hoodie.hive.impl.DayBasedPartitionStrategy;
-import com.uber.hoodie.hive.impl.ParseSchemaFromDataStrategy;
-import com.uber.hoodie.hive.model.HoodieDatasetReference;
-
-import org.apache.hadoop.conf.Configuration;
+import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.exception.InvalidDatasetException;
+import com.uber.hoodie.hadoop.HoodieInputFormat;
+import com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat;
+import com.uber.hoodie.hive.HoodieHiveClient.PartitionEvent;
+import com.uber.hoodie.hive.HoodieHiveClient.PartitionEvent.PartitionEventType;
+import com.uber.hoodie.hive.util.SchemaUtil;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import parquet.schema.MessageType;
 
 /**
- * Tool to sync new data from commits, into Hive in terms of
+ * Tool to sync a hoodie HDFS dataset with a hive metastore table.
+ * Either use it as an API, HiveSyncTool.syncHoodieTable(HiveSyncConfig),
+ * or from the command line: java -cp hoodie-hive.jar HiveSyncTool [args]
  *
- * - New table/partitions
- * - Updated schema for table/partitions
+ * This utility will get the schema from the latest commit and will sync the hive table schema.
+ * It will also sync the partitions incrementally
+ * (all the partitions modified since the last commit).
  */
+@SuppressWarnings("WeakerAccess")
 public class HiveSyncTool {
 
+  private static Logger LOG = LoggerFactory.getLogger(HiveSyncTool.class);
+  private final HoodieHiveClient hoodieHiveClient;
+  private final HiveSyncConfig cfg;
 
-  /**
-   * Sync to Hive, based on day based partitioning
-   *
-   * @param cfg
-   */
-  public static void sync(HiveSyncConfig cfg) {
-    // Configure to point to which metastore and database to connect to
-    HoodieHiveConfiguration apiConfig =
-        HoodieHiveConfiguration.newBuilder().hadoopConfiguration(new Configuration())
-            .hivedb(cfg.databaseName)
-            .hiveJdbcUrl(cfg.jdbcUrl)
-            .jdbcUsername(cfg.hiveUser)
-            .jdbcPassword(cfg.hivePass)
-            .build();
+  public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
+    this.hoodieHiveClient = new HoodieHiveClient(cfg, configuration, fs);
+    this.cfg = cfg;
+  }
 
-    HoodieDatasetReference datasetReference =
-        new HoodieDatasetReference(cfg.tableName, cfg.basePath, cfg.databaseName);
+  public void syncHoodieTable() {
+    LOG.info("Trying to sync hoodie table " + cfg.tableName + " with base path "
+        + hoodieHiveClient.getBasePath() + " of type " + hoodieHiveClient.getTableType());
+    // Check if the necessary table exists
+    boolean tableExists = hoodieHiveClient.doesTableExist();
+    // Get the parquet schema for this dataset by looking at the latest commit
+    MessageType schema = hoodieHiveClient.getDataSchema();
+    // Sync the schema if needed
+    syncSchema(tableExists, schema);
 
-    // initialize the strategies
-    PartitionStrategy partitionStrategy = new DayBasedPartitionStrategy();
-    SchemaStrategy schemaStrategy = new ParseSchemaFromDataStrategy();
-
-    // Creates a new dataset which reflects the state at the time of creation
-    HoodieHiveDatasetSyncTask datasetSyncTask =
-        HoodieHiveDatasetSyncTask.newBuilder().withReference(datasetReference)
-            .withConfiguration(apiConfig).partitionStrategy(partitionStrategy)
-            .schemaStrategy(schemaStrategy).build();
-
-    // Sync dataset
-    datasetSyncTask.sync();
-  }
LOG.info("Schema sync complete. Syncing partitions for " + cfg.tableName);
|
||||
// Get the last time we successfully synced partitions
|
||||
Optional<String> lastCommitTimeSynced = Optional.empty();
|
||||
if (tableExists) {
|
||||
lastCommitTimeSynced = hoodieHiveClient.getLastCommitTimeSynced();
|
||||
}
|
||||
LOG.info("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null"));
|
||||
List<String> writtenPartitionsSince = hoodieHiveClient
|
||||
.getPartitionsWrittenToSince(lastCommitTimeSynced);
|
||||
LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size());
|
||||
// Sync the partitions if needed
|
||||
syncPartitions(writtenPartitionsSince);
|
||||
|
||||
hoodieHiveClient.updateLastCommitTimeSynced();
|
||||
LOG.info("Sync complete for " + cfg.tableName);
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
hoodieHiveClient.close();
|
||||
}
|
||||
|
||||
// parse the params
|
||||
final HiveSyncConfig cfg = new HiveSyncConfig();
|
||||
JCommander cmd = new JCommander(cfg, args);
|
||||
if (cfg.help || args.length == 0) {
|
||||
cmd.usage();
|
||||
System.exit(1);
|
||||
}
|
||||
|
||||
sync(cfg);
|
||||
 
+  /**
+   * Get the latest schema from the last commit and check if it is in sync with the hive table
+   * schema. If not, evolves the table schema.
+   *
+   * @param tableExists - does table exist
+   * @param schema - extracted schema
+   */
+  private void syncSchema(boolean tableExists, MessageType schema) {
+    // Check and sync schema
+    if (!tableExists) {
+      LOG.info("Table " + cfg.tableName + " is not found. Creating it");
+      switch (hoodieHiveClient.getTableType()) {
+        case COPY_ON_WRITE:
+          hoodieHiveClient.createTable(schema, HoodieInputFormat.class.getName(),
+              MapredParquetOutputFormat.class.getName(), ParquetHiveSerDe.class.getName());
+          break;
+        case MERGE_ON_READ:
+          // Create the RT table. A custom serde will not work with ALTER TABLE REPLACE COLUMNS:
+          // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java#L3488
+          // Needs a fix to check instanceof:
+          // hoodieHiveClient.createTable(schema, HoodieRealtimeInputFormat.class.getName(),
+          //     MapredParquetOutputFormat.class.getName(), HoodieParquetSerde.class.getName());
+          hoodieHiveClient.createTable(schema, HoodieRealtimeInputFormat.class.getName(),
+              MapredParquetOutputFormat.class.getName(), ParquetHiveSerDe.class.getName());
+          // TODO - create RO Table
+          break;
+        default:
+          LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
+          throw new InvalidDatasetException(hoodieHiveClient.getBasePath());
+      }
+    } else {
+      // Check if the dataset schema has evolved
+      Map<String, String> tableSchema = hoodieHiveClient.getTableSchema();
+      SchemaDifference schemaDiff = SchemaUtil
+          .getSchemaDifference(schema, tableSchema, cfg.partitionFields);
+      if (!schemaDiff.isEmpty()) {
+        LOG.info("Schema difference found for " + cfg.tableName);
+        hoodieHiveClient.updateTableDefinition(schema);
+      } else {
+        LOG.info("No Schema difference for " + cfg.tableName);
+      }
+    }
+  }
 
+  /**
+   * Syncs the list of storage partitions passed in (checks if the partition is in hive, if not
+   * adds it; if the partition path does not match, it updates the partition path).
+   */
+  private void syncPartitions(List<String> writtenPartitionsSince) {
+    try {
+      List<Partition> hivePartitions = hoodieHiveClient.scanTablePartitions();
+      List<PartitionEvent> partitionEvents = hoodieHiveClient
+          .getPartitionEvents(hivePartitions, writtenPartitionsSince);
+      List<String> newPartitions = filterPartitions(partitionEvents, PartitionEventType.ADD);
+      LOG.info("New Partitions " + newPartitions);
+      hoodieHiveClient.addPartitionsToTable(newPartitions);
+      List<String> updatePartitions = filterPartitions(partitionEvents, PartitionEventType.UPDATE);
+      LOG.info("Changed Partitions " + updatePartitions);
+      hoodieHiveClient.updatePartitionsToTable(updatePartitions);
+    } catch (Exception e) {
+      throw new HoodieHiveSyncException("Failed to sync partitions for table " + cfg.tableName,
+          e);
+    }
+  }
+
+  private List<String> filterPartitions(List<PartitionEvent> events, PartitionEventType eventType) {
+    return events.stream()
+        .filter(s -> s.eventType == eventType).map(s -> s.storagePartition)
+        .collect(Collectors.toList());
+  }
 
+  public static void main(String[] args) throws Exception {
+    // parse the params
+    final HiveSyncConfig cfg = new HiveSyncConfig();
+    JCommander cmd = new JCommander(cfg, args);
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+    FileSystem fs = FSUtils.getFs();
+    HiveConf hiveConf = new HiveConf();
+    hiveConf.addResource(fs.getConf());
+    new HiveSyncTool(cfg, hiveConf, fs).syncHoodieTable();
+  }
 }
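
Taken together, the refactor replaces the strategy-based static sync(HiveSyncConfig) entry point with an instance API: construct a HiveSyncTool from a HiveSyncConfig, a HiveConf, and a FileSystem, then call syncHoodieTable(), which syncs the schema, syncs the partitions written since the last synced commit, and records the new commit time. Below is a minimal sketch of driving the refactored tool programmatically, mirroring the new main(). The HiveSyncConfig field names (databaseName, tableName, basePath, jdbcUrl, hiveUser, hivePass) all appear in this diff; the values assigned here are placeholders, and the wrapper class name is invented for the example.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import com.uber.hoodie.common.util.FSUtils;

public class HiveSyncExample {
  public static void main(String[] args) throws Exception {
    // Populate the sync config directly instead of going through JCommander.
    // Field names come from the diff above; the values are placeholders.
    HiveSyncConfig cfg = new HiveSyncConfig();
    cfg.databaseName = "default";
    cfg.tableName = "hoodie_trips";
    cfg.basePath = "hdfs:///tmp/hoodie/trips";
    cfg.jdbcUrl = "jdbc:hive2://localhost:10000";
    cfg.hiveUser = "hive";
    cfg.hivePass = "hive";

    // Same wiring as the new main(): a FileSystem plus a HiveConf that
    // picks up the Hadoop configuration from that FileSystem.
    FileSystem fs = FSUtils.getFs();
    HiveConf hiveConf = new HiveConf();
    hiveConf.addResource(fs.getConf());

    // One call now syncs the table schema and partitions and records the commit time.
    new HiveSyncTool(cfg, hiveConf, fs).syncHoodieTable();
  }
}

From the command line, the same flow is the documented java -cp hoodie-hive.jar HiveSyncTool [args] invocation; the JCommander-parsed flags correspond to the same HiveSyncConfig fields, though the exact flag names are defined in HiveSyncConfig and are not shown in this hunk.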