1
0

[HUDI-654] Rename hudi-hive to hudi-hive-sync

This commit is contained in:
lamber-ken
2020-03-04 22:45:44 +08:00
committed by leesf
parent 5f85c26704
commit ccbf543607
27 changed files with 13 additions and 13 deletions

View File

@@ -0,0 +1,46 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
<id>jar-with-dependencies</id>
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<outputDirectory>/</outputDirectory>
<unpack>true</unpack>
<scope>runtime</scope>
<excludes>
<exclude>junit:junit</exclude>
<exclude>com.google.code.findbugs:*</exclude>
<exclude>org.apache.hbase:*</exclude>
</excludes>
</dependencySet>
<dependencySet>
<unpack>true</unpack>
<scope>provided</scope>
</dependencySet>
</dependencySets>
</assembly>

View File

@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive;
import com.beust.jcommander.Parameter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* Configs needed to sync data into Hive.
*/
public class HiveSyncConfig implements Serializable {
@Parameter(names = {"--database"}, description = "name of the target database in Hive", required = true)
public String databaseName;
@Parameter(names = {"--table"}, description = "name of the target table in Hive", required = true)
public String tableName;
@Parameter(names = {"--user"}, description = "Hive username", required = true)
public String hiveUser;
@Parameter(names = {"--pass"}, description = "Hive password", required = true)
public String hivePass;
@Parameter(names = {"--jdbc-url"}, description = "Hive jdbc connect url", required = true)
public String jdbcUrl;
@Parameter(names = {"--base-path"}, description = "Basepath of hoodie table to sync", required = true)
public String basePath;
@Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by")
public List<String> partitionFields = new ArrayList<>();
@Parameter(names = "--partition-value-extractor", description = "Class which implements PartitionValueExtractor "
+ "to extract the partition values from HDFS path")
public String partitionValueExtractorClass = SlashEncodedDayPartitionValueExtractor.class.getName();
@Parameter(names = {"--assume-date-partitioning"}, description = "Assume standard yyyy/mm/dd partitioning, this"
+ " exists to support backward compatibility. If you use hoodie 0.3.x, do not set this parameter")
public Boolean assumeDatePartitioning = false;
@Parameter(names = {"--use-pre-apache-input-format"},
description = "Use InputFormat under com.uber.hoodie package "
+ "instead of org.apache.hudi package. Use this when you are in the process of migrating from "
+ "com.uber.hoodie to org.apache.hudi. Stop using this after you migrated the table definition to "
+ "org.apache.hudi input format.")
public Boolean usePreApacheInputFormat = false;
@Parameter(names = {"--use-jdbc"}, description = "Hive jdbc connect url")
public Boolean useJdbc = true;
@Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` suffix for Read optimized table, when registering")
public Boolean skipROSuffix = false;
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
public static HiveSyncConfig copy(HiveSyncConfig cfg) {
HiveSyncConfig newConfig = new HiveSyncConfig();
newConfig.basePath = cfg.basePath;
newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
newConfig.databaseName = cfg.databaseName;
newConfig.hivePass = cfg.hivePass;
newConfig.hiveUser = cfg.hiveUser;
newConfig.partitionFields = cfg.partitionFields;
newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass;
newConfig.jdbcUrl = cfg.jdbcUrl;
newConfig.tableName = cfg.tableName;
newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
return newConfig;
}
@Override
public String toString() {
return "HiveSyncConfig{databaseName='" + databaseName + '\'' + ", tableName='" + tableName + '\''
+ ", hiveUser='" + hiveUser + '\'' + ", hivePass='" + hivePass + '\'' + ", jdbcUrl='" + jdbcUrl + '\''
+ ", basePath='" + basePath + '\'' + ", partitionFields=" + partitionFields + ", partitionValueExtractorClass='"
+ partitionValueExtractorClass + '\'' + ", assumeDatePartitioning=" + assumeDatePartitioning
+ ", usePreApacheInputFormat=" + usePreApacheInputFormat + ", useJdbc=" + useJdbc + ", help=" + help + '}';
}
}

View File

@@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.InvalidTableException;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent;
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent.PartitionEventType;
import org.apache.hudi.hive.util.SchemaUtil;
import com.beust.jcommander.JCommander;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Tool to sync a hoodie HDFS table with a hive metastore table. Either use it as a api
* HiveSyncTool.syncHoodieTable(HiveSyncConfig) or as a command line java -cp hoodie-hive.jar HiveSyncTool [args]
* <p>
* This utility will get the schema from the latest commit and will sync hive table schema Also this will sync the
* partitions incrementally (all the partitions modified since the last commit)
*/
@SuppressWarnings("WeakerAccess")
public class HiveSyncTool {
private static final Logger LOG = LogManager.getLogger(HiveSyncTool.class);
public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
private final HiveSyncConfig cfg;
private final HoodieHiveClient hoodieHiveClient;
private final String snapshotTableName;
private final Option<String> roTableTableName;
public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
this.hoodieHiveClient = new HoodieHiveClient(cfg, configuration, fs);
this.cfg = cfg;
switch (hoodieHiveClient.getTableType()) {
case COPY_ON_WRITE:
this.snapshotTableName = cfg.tableName;
this.roTableTableName = Option.empty();
break;
case MERGE_ON_READ:
this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
this.roTableTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
break;
default:
LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
throw new InvalidTableException(hoodieHiveClient.getBasePath());
}
}
public void syncHoodieTable() {
try {
switch (hoodieHiveClient.getTableType()) {
case COPY_ON_WRITE:
syncHoodieTable(snapshotTableName, false);
break;
case MERGE_ON_READ:
// sync a RO table for MOR
syncHoodieTable(roTableTableName.get(), false);
// sync a RT table for MOR
syncHoodieTable(snapshotTableName, true);
break;
default:
LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
throw new InvalidTableException(hoodieHiveClient.getBasePath());
}
} catch (RuntimeException re) {
LOG.error("Got runtime exception when hive syncing", re);
} finally {
hoodieHiveClient.close();
}
}
private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat) {
LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieHiveClient.getBasePath()
+ " of type " + hoodieHiveClient.getTableType());
// Check if the necessary table exists
boolean tableExists = hoodieHiveClient.doesTableExist(tableName);
// Get the parquet schema for this table looking at the latest commit
MessageType schema = hoodieHiveClient.getDataSchema();
// Sync schema if needed
syncSchema(tableName, tableExists, useRealtimeInputFormat, schema);
LOG.info("Schema sync complete. Syncing partitions for " + tableName);
// Get the last time we successfully synced partitions
Option<String> lastCommitTimeSynced = Option.empty();
if (tableExists) {
lastCommitTimeSynced = hoodieHiveClient.getLastCommitTimeSynced(tableName);
}
LOG.info("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null"));
List<String> writtenPartitionsSince = hoodieHiveClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size());
// Sync the partitions if needed
syncPartitions(tableName, writtenPartitionsSince);
hoodieHiveClient.updateLastCommitTimeSynced(tableName);
LOG.info("Sync complete for " + tableName);
}
/**
* Get the latest schema from the last commit and check if its in sync with the hive table schema. If not, evolves the
* table schema.
*
* @param tableExists - does table exist
* @param schema - extracted schema
*/
private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat, MessageType schema) {
// Check and sync schema
if (!tableExists) {
LOG.info("Hive table " + tableName + " is not found. Creating it");
if (!useRealTimeInputFormat) {
String inputFormatClassName = cfg.usePreApacheInputFormat ? com.uber.hoodie.hadoop.HoodieInputFormat.class.getName()
: HoodieParquetInputFormat.class.getName();
hoodieHiveClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
ParquetHiveSerDe.class.getName());
} else {
// Custom serde will not work with ALTER TABLE REPLACE COLUMNS
// https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive
// /ql/exec/DDLTask.java#L3488
String inputFormatClassName =
cfg.usePreApacheInputFormat ? com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat.class.getName()
: HoodieParquetRealtimeInputFormat.class.getName();
hoodieHiveClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
ParquetHiveSerDe.class.getName());
}
} else {
// Check if the table schema has evolved
Map<String, String> tableSchema = hoodieHiveClient.getTableSchema(tableName);
SchemaDifference schemaDiff = SchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields);
if (!schemaDiff.isEmpty()) {
LOG.info("Schema difference found for " + tableName);
hoodieHiveClient.updateTableDefinition(tableName, schema);
} else {
LOG.info("No Schema difference for " + tableName);
}
}
}
/**
* Syncs the list of storage parititions passed in (checks if the partition is in hive, if not adds it or if the
* partition path does not match, it updates the partition path).
*/
private void syncPartitions(String tableName, List<String> writtenPartitionsSince) {
try {
List<Partition> hivePartitions = hoodieHiveClient.scanTablePartitions(tableName);
List<PartitionEvent> partitionEvents =
hoodieHiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
List<String> newPartitions = filterPartitions(partitionEvents, PartitionEventType.ADD);
LOG.info("New Partitions " + newPartitions);
hoodieHiveClient.addPartitionsToTable(tableName, newPartitions);
List<String> updatePartitions = filterPartitions(partitionEvents, PartitionEventType.UPDATE);
LOG.info("Changed Partitions " + updatePartitions);
hoodieHiveClient.updatePartitionsToTable(tableName, updatePartitions);
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to sync partitions for table " + tableName, e);
}
}
private List<String> filterPartitions(List<PartitionEvent> events, PartitionEventType eventType) {
return events.stream().filter(s -> s.eventType == eventType).map(s -> s.storagePartition)
.collect(Collectors.toList());
}
public static void main(String[] args) {
// parse the params
final HiveSyncConfig cfg = new HiveSyncConfig();
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
HiveConf hiveConf = new HiveConf();
hiveConf.addResource(fs.getConf());
new HiveSyncTool(cfg, hiveConf, fs).syncHoodieTable();
}
}

View File

@@ -0,0 +1,687 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.storage.StorageSchemes;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.InvalidTableException;
import org.apache.hudi.hive.util.SchemaUtil;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.jdbc.HiveDriver;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import org.apache.thrift.TException;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class HoodieHiveClient {
private static final String HOODIE_LAST_COMMIT_TIME_SYNC = "last_commit_time_sync";
// Make sure we have the hive JDBC driver in classpath
private static String driverName = HiveDriver.class.getName();
private static final String HIVE_ESCAPE_CHARACTER = SchemaUtil.HIVE_ESCAPE_CHARACTER;
static {
try {
Class.forName(driverName);
} catch (ClassNotFoundException e) {
throw new IllegalStateException("Could not find " + driverName + " in classpath. ", e);
}
}
private static final Logger LOG = LogManager.getLogger(HoodieHiveClient.class);
private final HoodieTableMetaClient metaClient;
private final HoodieTableType tableType;
private final PartitionValueExtractor partitionValueExtractor;
private IMetaStoreClient client;
private HiveSyncConfig syncConfig;
private FileSystem fs;
private Connection connection;
private HoodieTimeline activeTimeline;
private HiveConf configuration;
public HoodieHiveClient(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
this.syncConfig = cfg;
this.fs = fs;
this.metaClient = new HoodieTableMetaClient(fs.getConf(), cfg.basePath, true);
this.tableType = metaClient.getTableType();
this.configuration = configuration;
// Support both JDBC and metastore based implementations for backwards compatiblity. Future users should
// disable jdbc and depend on metastore client for all hive registrations
if (cfg.useJdbc) {
LOG.info("Creating hive connection " + cfg.jdbcUrl);
createHiveConnection();
}
try {
this.client = Hive.get(configuration).getMSC();
} catch (MetaException | HiveException e) {
throw new HoodieHiveSyncException("Failed to create HiveMetaStoreClient", e);
}
try {
this.partitionValueExtractor =
(PartitionValueExtractor) Class.forName(cfg.partitionValueExtractorClass).newInstance();
} catch (Exception e) {
throw new HoodieHiveSyncException(
"Failed to initialize PartitionValueExtractor class " + cfg.partitionValueExtractorClass, e);
}
activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
}
public HoodieTimeline getActiveTimeline() {
return activeTimeline;
}
/**
* Add the (NEW) partitions to the table.
*/
void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
if (partitionsToAdd.isEmpty()) {
LOG.info("No partitions to add for " + tableName);
return;
}
LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName);
String sql = constructAddPartitions(tableName, partitionsToAdd);
updateHiveSQL(sql);
}
/**
* Partition path has changed - update the path for te following partitions.
*/
void updatePartitionsToTable(String tableName, List<String> changedPartitions) {
if (changedPartitions.isEmpty()) {
LOG.info("No partitions to change for " + tableName);
return;
}
LOG.info("Changing partitions " + changedPartitions.size() + " on " + tableName);
List<String> sqls = constructChangePartitions(tableName, changedPartitions);
for (String sql : sqls) {
updateHiveSQL(sql);
}
}
private String constructAddPartitions(String tableName, List<String> partitions) {
StringBuilder alterSQL = new StringBuilder("ALTER TABLE ");
alterSQL.append(HIVE_ESCAPE_CHARACTER).append(syncConfig.databaseName)
.append(HIVE_ESCAPE_CHARACTER).append(".").append(HIVE_ESCAPE_CHARACTER)
.append(tableName).append(HIVE_ESCAPE_CHARACTER).append(" ADD IF NOT EXISTS ");
for (String partition : partitions) {
String partitionClause = getPartitionClause(partition);
String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
alterSQL.append(" PARTITION (").append(partitionClause).append(") LOCATION '").append(fullPartitionPath)
.append("' ");
}
return alterSQL.toString();
}
/**
* Generate Hive Partition from partition values.
*
* @param partition Partition path
* @return
*/
private String getPartitionClause(String partition) {
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
Preconditions.checkArgument(syncConfig.partitionFields.size() == partitionValues.size(),
"Partition key parts " + syncConfig.partitionFields + " does not match with partition values " + partitionValues
+ ". Check partition strategy. ");
List<String> partBuilder = new ArrayList<>();
for (int i = 0; i < syncConfig.partitionFields.size(); i++) {
partBuilder.add("`" + syncConfig.partitionFields.get(i) + "`='" + partitionValues.get(i) + "'");
}
return String.join(",", partBuilder);
}
private List<String> constructChangePartitions(String tableName, List<String> partitions) {
List<String> changePartitions = new ArrayList<>();
// Hive 2.x doesn't like db.table name for operations, hence we need to change to using the database first
String useDatabase = "USE " + HIVE_ESCAPE_CHARACTER + syncConfig.databaseName + HIVE_ESCAPE_CHARACTER;
changePartitions.add(useDatabase);
String alterTable = "ALTER TABLE " + HIVE_ESCAPE_CHARACTER + tableName + HIVE_ESCAPE_CHARACTER;
for (String partition : partitions) {
String partitionClause = getPartitionClause(partition);
Path partitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition);
String fullPartitionPath = partitionPath.toUri().getScheme().equals(StorageSchemes.HDFS.getScheme())
? FSUtils.getDFSFullPartitionPath(fs, partitionPath) : partitionPath.toString();
String changePartition =
alterTable + " PARTITION (" + partitionClause + ") SET LOCATION '" + fullPartitionPath + "'";
changePartitions.add(changePartition);
}
return changePartitions;
}
/**
* Iterate over the storage partitions and find if there are any new partitions that need to be added or updated.
* Generate a list of PartitionEvent based on the changes required.
*/
List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions) {
Map<String, String> paths = new HashMap<>();
for (Partition tablePartition : tablePartitions) {
List<String> hivePartitionValues = tablePartition.getValues();
Collections.sort(hivePartitionValues);
String fullTablePartitionPath =
Path.getPathWithoutSchemeAndAuthority(new Path(tablePartition.getSd().getLocation())).toUri().getPath();
paths.put(String.join(", ", hivePartitionValues), fullTablePartitionPath);
}
List<PartitionEvent> events = new ArrayList<>();
for (String storagePartition : partitionStoragePartitions) {
Path storagePartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, storagePartition);
String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
// Check if the partition values or if hdfs path is the same
List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
Collections.sort(storagePartitionValues);
if (!storagePartitionValues.isEmpty()) {
String storageValue = String.join(", ", storagePartitionValues);
if (!paths.containsKey(storageValue)) {
events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
} else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
}
}
}
return events;
}
/**
* Scan table partitions.
*/
public List<Partition> scanTablePartitions(String tableName) throws TException {
return client.listPartitions(syncConfig.databaseName, tableName, (short) -1);
}
void updateTableDefinition(String tableName, MessageType newSchema) {
try {
String newSchemaStr = SchemaUtil.generateSchemaString(newSchema, syncConfig.partitionFields);
// Cascade clause should not be present for non-partitioned tables
String cascadeClause = syncConfig.partitionFields.size() > 0 ? " cascade" : "";
StringBuilder sqlBuilder = new StringBuilder("ALTER TABLE ").append(HIVE_ESCAPE_CHARACTER)
.append(syncConfig.databaseName).append(HIVE_ESCAPE_CHARACTER).append(".")
.append(HIVE_ESCAPE_CHARACTER).append(tableName)
.append(HIVE_ESCAPE_CHARACTER).append(" REPLACE COLUMNS(")
.append(newSchemaStr).append(" )").append(cascadeClause);
LOG.info("Updating table definition with " + sqlBuilder);
updateHiveSQL(sqlBuilder.toString());
} catch (IOException e) {
throw new HoodieHiveSyncException("Failed to update table for " + tableName, e);
}
}
void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass) {
try {
String createSQLQuery =
SchemaUtil.generateCreateDDL(tableName, storageSchema, syncConfig, inputFormatClass, outputFormatClass, serdeClass);
LOG.info("Creating table with " + createSQLQuery);
updateHiveSQL(createSQLQuery);
} catch (IOException e) {
throw new HoodieHiveSyncException("Failed to create table " + tableName, e);
}
}
/**
* Get the table schema.
*/
public Map<String, String> getTableSchema(String tableName) {
if (syncConfig.useJdbc) {
if (!doesTableExist(tableName)) {
throw new IllegalArgumentException(
"Failed to get schema for table " + tableName + " does not exist");
}
Map<String, String> schema = new HashMap<>();
ResultSet result = null;
try {
DatabaseMetaData databaseMetaData = connection.getMetaData();
result = databaseMetaData.getColumns(null, syncConfig.databaseName, tableName, null);
while (result.next()) {
String columnName = result.getString(4);
String columnType = result.getString(6);
if ("DECIMAL".equals(columnType)) {
int columnSize = result.getInt("COLUMN_SIZE");
int decimalDigits = result.getInt("DECIMAL_DIGITS");
columnType += String.format("(%s,%s)", columnSize, decimalDigits);
}
schema.put(columnName, columnType);
}
return schema;
} catch (SQLException e) {
throw new HoodieHiveSyncException("Failed to get table schema for " + tableName, e);
} finally {
closeQuietly(result, null);
}
} else {
return getTableSchemaUsingMetastoreClient(tableName);
}
}
public Map<String, String> getTableSchemaUsingMetastoreClient(String tableName) {
try {
// HiveMetastoreClient returns partition keys separate from Columns, hence get both and merge to
// get the Schema of the table.
final long start = System.currentTimeMillis();
Table table = this.client.getTable(syncConfig.databaseName, tableName);
Map<String, String> partitionKeysMap =
table.getPartitionKeys().stream().collect(Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase()));
Map<String, String> columnsMap =
table.getSd().getCols().stream().collect(Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase()));
Map<String, String> schema = new HashMap<>();
schema.putAll(columnsMap);
schema.putAll(partitionKeysMap);
final long end = System.currentTimeMillis();
LOG.info(String.format("Time taken to getTableSchema: %s ms", (end - start)));
return schema;
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get table schema for : " + tableName, e);
}
}
/**
* Gets the schema for a hoodie table. Depending on the type of table, read from any file written in the latest
* commit. We will assume that the schema has not changed within a single atomic write.
*
* @return Parquet schema for this table
*/
@SuppressWarnings("WeakerAccess")
public MessageType getDataSchema() {
try {
switch (tableType) {
case COPY_ON_WRITE:
// If this is COW, get the last commit and read the schema from a file written in the
// last commit
HoodieInstant lastCommit =
activeTimeline.lastInstant().orElseThrow(() -> new InvalidTableException(syncConfig.basePath));
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(activeTimeline.getInstantDetails(lastCommit).get(), HoodieCommitMetadata.class);
String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
.orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for commit "
+ lastCommit + ", could not get schema for table " + metaClient.getBasePath() + ", Metadata :"
+ commitMetadata));
return readSchemaFromBaseFile(new Path(filePath));
case MERGE_ON_READ:
// If this is MOR, depending on whether the latest commit is a delta commit or
// compaction commit
// Get a datafile written and get the schema from that file
Option<HoodieInstant> lastCompactionCommit =
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
LOG.info("Found the last compaction commit as " + lastCompactionCommit);
Option<HoodieInstant> lastDeltaCommit;
if (lastCompactionCommit.isPresent()) {
lastDeltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
.findInstantsAfter(lastCompactionCommit.get().getTimestamp(), Integer.MAX_VALUE).lastInstant();
} else {
lastDeltaCommit =
metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
}
LOG.info("Found the last delta commit " + lastDeltaCommit);
if (lastDeltaCommit.isPresent()) {
HoodieInstant lastDeltaInstant = lastDeltaCommit.get();
// read from the log file wrote
commitMetadata = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(lastDeltaInstant).get(),
HoodieCommitMetadata.class);
Pair<String, HoodieFileFormat> filePathWithFormat =
commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream()
.filter(s -> s.contains(HoodieLogFile.DELTA_EXTENSION)).findAny()
.map(f -> Pair.of(f, HoodieFileFormat.HOODIE_LOG)).orElseGet(() -> {
// No Log files in Delta-Commit. Check if there are any parquet files
return commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream()
.filter(s -> s.contains((metaClient.getTableConfig().getBaseFileFormat().getFileExtension())))
.findAny().map(f -> Pair.of(f, HoodieFileFormat.PARQUET)).orElseThrow(() ->
new IllegalArgumentException("Could not find any data file written for commit "
+ lastDeltaInstant + ", could not get schema for table " + metaClient.getBasePath()
+ ", CommitMetadata :" + commitMetadata));
});
switch (filePathWithFormat.getRight()) {
case HOODIE_LOG:
return readSchemaFromLogFile(lastCompactionCommit, new Path(filePathWithFormat.getLeft()));
case PARQUET:
return readSchemaFromBaseFile(new Path(filePathWithFormat.getLeft()));
default:
throw new IllegalArgumentException("Unknown file format :" + filePathWithFormat.getRight()
+ " for file " + filePathWithFormat.getLeft());
}
} else {
return readSchemaFromLastCompaction(lastCompactionCommit);
}
default:
LOG.error("Unknown table type " + tableType);
throw new InvalidTableException(syncConfig.basePath);
}
} catch (IOException e) {
throw new HoodieHiveSyncException("Failed to read data schema", e);
}
}
/**
* Read schema from a data file from the last compaction commit done.
*/
private MessageType readSchemaFromLastCompaction(Option<HoodieInstant> lastCompactionCommitOpt) throws IOException {
HoodieInstant lastCompactionCommit = lastCompactionCommitOpt.orElseThrow(() -> new HoodieHiveSyncException(
"Could not read schema from last compaction, no compaction commits found on path " + syncConfig.basePath));
// Read from the compacted file wrote
HoodieCommitMetadata compactionMetadata = HoodieCommitMetadata
.fromBytes(activeTimeline.getInstantDetails(lastCompactionCommit).get(), HoodieCommitMetadata.class);
String filePath = compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
.orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for compaction "
+ lastCompactionCommit + ", could not get schema for table " + metaClient.getBasePath()));
return readSchemaFromBaseFile(new Path(filePath));
}
/**
* Read the schema from the log file on path.
*/
private MessageType readSchemaFromLogFile(Option<HoodieInstant> lastCompactionCommitOpt, Path path)
throws IOException {
MessageType messageType = SchemaUtil.readSchemaFromLogFile(fs, path);
// Fall back to read the schema from last compaction
if (messageType == null) {
LOG.info("Falling back to read the schema from last compaction " + lastCompactionCommitOpt);
return readSchemaFromLastCompaction(lastCompactionCommitOpt);
}
return messageType;
}
/**
* Read the parquet schema from a parquet File.
*/
private MessageType readSchemaFromBaseFile(Path parquetFilePath) throws IOException {
LOG.info("Reading schema from " + parquetFilePath);
if (!fs.exists(parquetFilePath)) {
throw new IllegalArgumentException(
"Failed to read schema from data file " + parquetFilePath + ". File does not exist.");
}
ParquetMetadata fileFooter =
ParquetFileReader.readFooter(fs.getConf(), parquetFilePath, ParquetMetadataConverter.NO_FILTER);
return fileFooter.getFileMetaData().getSchema();
}
/**
* @return true if the configured table exists
*/
public boolean doesTableExist(String tableName) {
try {
return client.tableExists(syncConfig.databaseName, tableName);
} catch (TException e) {
throw new HoodieHiveSyncException("Failed to check if table exists " + tableName, e);
}
}
/**
* Execute a update in hive metastore with this SQL.
*
* @param s SQL to execute
*/
public void updateHiveSQL(String s) {
if (syncConfig.useJdbc) {
Statement stmt = null;
try {
stmt = connection.createStatement();
LOG.info("Executing SQL " + s);
stmt.execute(s);
} catch (SQLException e) {
throw new HoodieHiveSyncException("Failed in executing SQL " + s, e);
} finally {
closeQuietly(null, stmt);
}
} else {
updateHiveSQLUsingHiveDriver(s);
}
}
/**
* Execute a update in hive using Hive Driver.
*
* @param sql SQL statement to execute
*/
public CommandProcessorResponse updateHiveSQLUsingHiveDriver(String sql) {
List<CommandProcessorResponse> responses = updateHiveSQLs(Collections.singletonList(sql));
return responses.get(responses.size() - 1);
}
private List<CommandProcessorResponse> updateHiveSQLs(List<String> sqls) {
SessionState ss = null;
org.apache.hadoop.hive.ql.Driver hiveDriver = null;
List<CommandProcessorResponse> responses = new ArrayList<>();
try {
final long startTime = System.currentTimeMillis();
ss = SessionState.start(configuration);
hiveDriver = new org.apache.hadoop.hive.ql.Driver(configuration);
final long endTime = System.currentTimeMillis();
LOG.info(String.format("Time taken to start SessionState and create Driver: %s ms", (endTime - startTime)));
for (String sql : sqls) {
final long start = System.currentTimeMillis();
responses.add(hiveDriver.run(sql));
final long end = System.currentTimeMillis();
LOG.info(String.format("Time taken to execute [%s]: %s ms", sql, (end - start)));
}
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed in executing SQL", e);
} finally {
if (ss != null) {
try {
ss.close();
} catch (IOException ie) {
LOG.error("Error while closing SessionState", ie);
}
}
if (hiveDriver != null) {
try {
hiveDriver.close();
} catch (Exception e) {
LOG.error("Error while closing hiveDriver", e);
}
}
}
return responses;
}
private void createHiveConnection() {
if (connection == null) {
try {
Class.forName(HiveDriver.class.getCanonicalName());
} catch (ClassNotFoundException e) {
LOG.error("Unable to load Hive driver class", e);
return;
}
try {
this.connection = DriverManager.getConnection(syncConfig.jdbcUrl, syncConfig.hiveUser, syncConfig.hivePass);
LOG.info("Successfully established Hive connection to " + syncConfig.jdbcUrl);
} catch (SQLException e) {
throw new HoodieHiveSyncException("Cannot create hive connection " + getHiveJdbcUrlWithDefaultDBName(), e);
}
}
}
private String getHiveJdbcUrlWithDefaultDBName() {
String hiveJdbcUrl = syncConfig.jdbcUrl;
String urlAppend = null;
// If the hive url contains addition properties like ;transportMode=http;httpPath=hs2
if (hiveJdbcUrl.contains(";")) {
urlAppend = hiveJdbcUrl.substring(hiveJdbcUrl.indexOf(";"));
hiveJdbcUrl = hiveJdbcUrl.substring(0, hiveJdbcUrl.indexOf(";"));
}
if (!hiveJdbcUrl.endsWith("/")) {
hiveJdbcUrl = hiveJdbcUrl + "/";
}
return hiveJdbcUrl + (urlAppend == null ? "" : urlAppend);
}
private static void closeQuietly(ResultSet resultSet, Statement stmt) {
try {
if (stmt != null) {
stmt.close();
}
} catch (SQLException e) {
LOG.error("Could not close the statement opened ", e);
}
try {
if (resultSet != null) {
resultSet.close();
}
} catch (SQLException e) {
LOG.error("Could not close the resultset opened ", e);
}
}
public String getBasePath() {
return metaClient.getBasePath();
}
HoodieTableType getTableType() {
return tableType;
}
public FileSystem getFs() {
return fs;
}
public Option<String> getLastCommitTimeSynced(String tableName) {
// Get the last commit time from the TBLproperties
try {
Table database = client.getTable(syncConfig.databaseName, tableName);
return Option.ofNullable(database.getParameters().getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC, null));
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get the last commit time synced from the database", e);
}
}
public void close() {
try {
if (connection != null) {
connection.close();
}
if (client != null) {
Hive.closeCurrent();
client = null;
}
} catch (SQLException e) {
LOG.error("Could not close connection ", e);
}
}
List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSynced) {
if (!lastCommitTimeSynced.isPresent()) {
LOG.info("Last commit time synced is not known, listing all partitions in " + syncConfig.basePath + ",FS :" + fs);
try {
return FSUtils.getAllPartitionPaths(fs, syncConfig.basePath, syncConfig.assumeDatePartitioning);
} catch (IOException e) {
throw new HoodieIOException("Failed to list all partitions in " + syncConfig.basePath, e);
}
} else {
LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then");
HoodieTimeline timelineToSync = activeTimeline.findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE);
return timelineToSync.getInstants().map(s -> {
try {
return HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(s).get(), HoodieCommitMetadata.class);
} catch (IOException e) {
throw new HoodieIOException("Failed to get partitions written since " + lastCommitTimeSynced, e);
}
}).flatMap(s -> s.getPartitionToWriteStats().keySet().stream()).distinct().collect(Collectors.toList());
}
}
List<String> getAllTables(String db) throws Exception {
return client.getAllTables(db);
}
void updateLastCommitTimeSynced(String tableName) {
// Set the last commit time from the TBLproperties
String lastCommitSynced = activeTimeline.lastInstant().get().getTimestamp();
try {
Table table = client.getTable(syncConfig.databaseName, tableName);
table.putToParameters(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced);
client.alter_table(syncConfig.databaseName, tableName, table);
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get update last commit time synced to " + lastCommitSynced, e);
}
}
/**
* Partition Event captures any partition that needs to be added or updated.
*/
static class PartitionEvent {
public enum PartitionEventType {
ADD, UPDATE
}
PartitionEventType eventType;
String storagePartition;
PartitionEvent(PartitionEventType eventType, String storagePartition) {
this.eventType = eventType;
this.storagePartition = storagePartition;
}
static PartitionEvent newPartitionAddEvent(String storagePartition) {
return new PartitionEvent(PartitionEventType.ADD, storagePartition);
}
static PartitionEvent newPartitionUpdateEvent(String storagePartition) {
return new PartitionEvent(PartitionEventType.UPDATE, storagePartition);
}
}
}

View File

@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive;
public class HoodieHiveSyncException extends RuntimeException {
public HoodieHiveSyncException() {
super();
}
public HoodieHiveSyncException(String message) {
super(message);
}
public HoodieHiveSyncException(String message, Throwable t) {
super(message, t);
}
public HoodieHiveSyncException(Throwable t) {
super(t);
}
protected static String format(String message, Object... args) {
return String.format(String.valueOf(message), (Object[]) args);
}
}

View File

@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive;
import com.google.common.base.Preconditions;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/**
* Partition Key extractor treating each value delimited by slash as separate key.
*/
public class MultiPartKeysValueExtractor implements PartitionValueExtractor {
@Override
public List<String> extractPartitionValuesInPath(String partitionPath) {
String[] splits = partitionPath.split("/");
return Arrays.stream(splits).map(s -> {
if (s.contains("=")) {
String[] moreSplit = s.split("=");
Preconditions.checkArgument(moreSplit.length == 2, "Partition Field (" + s + ") not in expected format");
return moreSplit[1];
}
return s;
}).collect(Collectors.toList());
}
}

View File

@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive;
import java.util.ArrayList;
import java.util.List;
/**
* Extractor for Non-partitioned hive tables.
*/
public class NonPartitionedExtractor implements PartitionValueExtractor {
@Override
public List<String> extractPartitionValuesInPath(String partitionPath) {
return new ArrayList<>();
}
}

View File

@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive;
import java.io.Serializable;
import java.util.List;
/**
* HDFS Path contain hive partition values for the keys it is partitioned on. This mapping is not straight forward and
* requires a pluggable implementation to extract the partition value from HDFS path.
* <p>
* e.g. Hive table partitioned by datestr=yyyy-mm-dd and hdfs path /app/hoodie/dataset1/YYYY=[yyyy]/MM=[mm]/DD=[dd]
*/
public interface PartitionValueExtractor extends Serializable {
List<String> extractPartitionValuesInPath(String partitionPath);
}

View File

@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive;
import org.apache.parquet.schema.MessageType;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
/**
* Represents the schema difference between the storage schema and hive table schema.
*/
public class SchemaDifference {
private final MessageType storageSchema;
private final Map<String, String> tableSchema;
private final List<String> deleteColumns;
private final Map<String, String> updateColumnTypes;
private final Map<String, String> addColumnTypes;
private SchemaDifference(MessageType storageSchema, Map<String, String> tableSchema, List<String> deleteColumns,
Map<String, String> updateColumnTypes, Map<String, String> addColumnTypes) {
this.storageSchema = storageSchema;
this.tableSchema = tableSchema;
this.deleteColumns = Collections.unmodifiableList(deleteColumns);
this.updateColumnTypes = Collections.unmodifiableMap(updateColumnTypes);
this.addColumnTypes = Collections.unmodifiableMap(addColumnTypes);
}
public List<String> getDeleteColumns() {
return deleteColumns;
}
public Map<String, String> getUpdateColumnTypes() {
return updateColumnTypes;
}
public Map<String, String> getAddColumnTypes() {
return addColumnTypes;
}
public static Builder newBuilder(MessageType storageSchema, Map<String, String> tableSchema) {
return new Builder(storageSchema, tableSchema);
}
public boolean isEmpty() {
return deleteColumns.isEmpty() && updateColumnTypes.isEmpty() && addColumnTypes.isEmpty();
}
@Override
public String toString() {
return new StringJoiner(", ", SchemaDifference.class.getSimpleName() + "[", "]")
.add("storageSchema=" + storageSchema)
.add("tableSchema=" + tableSchema)
.add("deleteColumns=" + deleteColumns)
.add("updateColumnTypes=" + updateColumnTypes)
.add("addColumnTypes=" + addColumnTypes)
.toString();
}
public static class Builder {
private final MessageType storageSchema;
private final Map<String, String> tableSchema;
private List<String> deleteColumns;
private Map<String, String> updateColumnTypes;
private Map<String, String> addColumnTypes;
public Builder(MessageType storageSchema, Map<String, String> tableSchema) {
this.storageSchema = storageSchema;
this.tableSchema = tableSchema;
deleteColumns = new ArrayList<>();
updateColumnTypes = new HashMap<>();
addColumnTypes = new HashMap<>();
}
public Builder deleteTableColumn(String column) {
deleteColumns.add(column);
return this;
}
public Builder updateTableColumn(String column, String storageColumnType) {
updateColumnTypes.put(column, storageColumnType);
return this;
}
public Builder addTableColumn(String name, String type) {
addColumnTypes.put(name, type);
return this;
}
public SchemaDifference build() {
return new SchemaDifference(storageSchema, tableSchema, deleteColumns, updateColumnTypes, addColumnTypes);
}
}
}

View File

@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;
/**
* HDFS Path contain hive partition values for the keys it is partitioned on. This mapping is not straight forward and
* requires a pluggable implementation to extract the partition value from HDFS path.
* <p>
* This implementation extracts datestr=yyyy-mm-dd from path of type /yyyy/mm/dd
*/
public class SlashEncodedDayPartitionValueExtractor implements PartitionValueExtractor {
private transient DateTimeFormatter dtfOut;
public SlashEncodedDayPartitionValueExtractor() {
this.dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd");
}
private DateTimeFormatter getDtfOut() {
if (dtfOut == null) {
dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd");
}
return dtfOut;
}
@Override
public List<String> extractPartitionValuesInPath(String partitionPath) {
// partition path is expected to be in this format yyyy/mm/dd
String[] splits = partitionPath.split("/");
if (splits.length != 3) {
throw new IllegalArgumentException("Partition path " + partitionPath + " is not in the form yyyy/mm/dd ");
}
// Get the partition part and remove the / as well at the end
int year = Integer.parseInt(splits[0].contains("=") ? splits[0].split("=")[1] : splits[0]);
int mm = Integer.parseInt(splits[1].contains("=") ? splits[1].split("=")[1] : splits[1]);
int dd = Integer.parseInt(splits[2].contains("=") ? splits[2].split("=")[1] : splits[2]);
DateTime dateTime = new DateTime(year, mm, dd, 0, 0);
return Collections.singletonList(getDtfOut().print(dateTime));
}
}

View File

@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive.util;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class ColumnNameXLator {
private static Map<String, String> xformMap = new HashMap<>();
public static String translateNestedColumn(String colName) {
Map.Entry<String,String> entry;
for (Iterator<Map.Entry<String, String>> ic = xformMap.entrySet().iterator(); ic.hasNext(); colName =
colName.replaceAll(entry.getKey(), entry.getValue())) {
entry = ic.next();
}
return colName;
}
public static String translateColumn(String colName) {
return colName;
}
public static String translate(String colName, boolean nestedColumn) {
return !nestedColumn ? translateColumn(colName) : translateNestedColumn(colName);
}
static {
xformMap.put("\\$", "_dollar_");
}
}

View File

@@ -0,0 +1,447 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive.util;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncException;
import org.apache.hudi.hive.SchemaDifference;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.DecimalMetadata;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Schema Utilities.
*/
public class SchemaUtil {
private static final Logger LOG = LogManager.getLogger(SchemaUtil.class);
public static final String HIVE_ESCAPE_CHARACTER = "`";
/**
* Get the schema difference between the storage schema and hive table schema.
*/
public static SchemaDifference getSchemaDifference(MessageType storageSchema, Map<String, String> tableSchema,
List<String> partitionKeys) {
Map<String, String> newTableSchema;
try {
newTableSchema = convertParquetSchemaToHiveSchema(storageSchema);
} catch (IOException e) {
throw new HoodieHiveSyncException("Failed to convert parquet schema to hive schema", e);
}
LOG.info("Getting schema difference for " + tableSchema + "\r\n\r\n" + newTableSchema);
SchemaDifference.Builder schemaDiffBuilder = SchemaDifference.newBuilder(storageSchema, tableSchema);
Set<String> tableColumns = new HashSet<>();
for (Map.Entry<String, String> field : tableSchema.entrySet()) {
String fieldName = field.getKey().toLowerCase();
String tickSurroundedFieldName = tickSurround(fieldName);
if (!isFieldExistsInSchema(newTableSchema, tickSurroundedFieldName) && !partitionKeys.contains(fieldName)) {
schemaDiffBuilder.deleteTableColumn(fieldName);
} else {
// check type
String tableColumnType = field.getValue();
if (!isFieldExistsInSchema(newTableSchema, tickSurroundedFieldName)) {
if (partitionKeys.contains(fieldName)) {
// Partition key does not have to be part of the storage schema
continue;
}
// We will log this and continue. Hive schema is a superset of all parquet schemas
LOG.warn("Ignoring table column " + fieldName + " as its not present in the parquet schema");
continue;
}
tableColumnType = tableColumnType.replaceAll("\\s+", "");
String expectedType = getExpectedType(newTableSchema, tickSurroundedFieldName);
expectedType = expectedType.replaceAll("\\s+", "");
expectedType = expectedType.replaceAll("`", "");
if (!tableColumnType.equalsIgnoreCase(expectedType)) {
// check for incremental queries, the schema type change is allowed as per evolution
// rules
if (!isSchemaTypeUpdateAllowed(tableColumnType, expectedType)) {
throw new HoodieHiveSyncException("Could not convert field Type from " + tableColumnType + " to "
+ expectedType + " for field " + fieldName);
}
schemaDiffBuilder.updateTableColumn(fieldName, getExpectedType(newTableSchema, tickSurroundedFieldName));
}
}
tableColumns.add(tickSurroundedFieldName);
}
for (Map.Entry<String, String> entry : newTableSchema.entrySet()) {
if (!tableColumns.contains(entry.getKey().toLowerCase())) {
schemaDiffBuilder.addTableColumn(entry.getKey(), entry.getValue());
}
}
LOG.info("Difference between schemas: " + schemaDiffBuilder.build().toString());
return schemaDiffBuilder.build();
}
private static String getExpectedType(Map<String, String> newTableSchema, String fieldName) {
for (Map.Entry<String, String> entry : newTableSchema.entrySet()) {
if (entry.getKey().toLowerCase().equals(fieldName)) {
return entry.getValue();
}
}
return null;
}
private static boolean isFieldExistsInSchema(Map<String, String> newTableSchema, String fieldName) {
for (String entry : newTableSchema.keySet()) {
if (entry.toLowerCase().equals(fieldName)) {
return true;
}
}
return false;
}
/**
* Returns equivalent Hive table schema read from a parquet file.
*
* @param messageType : Parquet Schema
* @return : Hive Table schema read from parquet file MAP[String,String]
*/
public static Map<String, String> convertParquetSchemaToHiveSchema(MessageType messageType) throws IOException {
Map<String, String> schema = new LinkedHashMap<>();
List<Type> parquetFields = messageType.getFields();
for (Type parquetType : parquetFields) {
StringBuilder result = new StringBuilder();
String key = parquetType.getName();
if (parquetType.isRepetition(Type.Repetition.REPEATED)) {
result.append(createHiveArray(parquetType, ""));
} else {
result.append(convertField(parquetType));
}
schema.put(hiveCompatibleFieldName(key, false), result.toString());
}
return schema;
}
/**
* Convert one field data type of parquet schema into an equivalent Hive schema.
*
* @param parquetType : Single paruet field
* @return : Equivalent sHive schema
*/
private static String convertField(final Type parquetType) {
StringBuilder field = new StringBuilder();
if (parquetType.isPrimitive()) {
final PrimitiveType.PrimitiveTypeName parquetPrimitiveTypeName =
parquetType.asPrimitiveType().getPrimitiveTypeName();
final OriginalType originalType = parquetType.getOriginalType();
if (originalType == OriginalType.DECIMAL) {
final DecimalMetadata decimalMetadata = parquetType.asPrimitiveType().getDecimalMetadata();
return field.append("DECIMAL(").append(decimalMetadata.getPrecision()).append(" , ")
.append(decimalMetadata.getScale()).append(")").toString();
} else if (originalType == OriginalType.DATE) {
return field.append("DATE").toString();
}
// TODO - fix the method naming here
return parquetPrimitiveTypeName.convert(new PrimitiveType.PrimitiveTypeNameConverter<String, RuntimeException>() {
@Override
public String convertBOOLEAN(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
return "boolean";
}
@Override
public String convertINT32(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
return "int";
}
@Override
public String convertINT64(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
return "bigint";
}
@Override
public String convertINT96(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
return "timestamp-millis";
}
@Override
public String convertFLOAT(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
return "float";
}
@Override
public String convertDOUBLE(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
return "double";
}
@Override
public String convertFIXED_LEN_BYTE_ARRAY(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
return "binary";
}
@Override
public String convertBINARY(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
if (originalType == OriginalType.UTF8 || originalType == OriginalType.ENUM) {
return "string";
} else {
return "binary";
}
}
});
} else {
GroupType parquetGroupType = parquetType.asGroupType();
OriginalType originalType = parquetGroupType.getOriginalType();
if (originalType != null) {
switch (originalType) {
case LIST:
if (parquetGroupType.getFieldCount() != 1) {
throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
}
Type elementType = parquetGroupType.getType(0);
if (!elementType.isRepetition(Type.Repetition.REPEATED)) {
throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
}
return createHiveArray(elementType, parquetGroupType.getName());
case MAP:
if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) {
throw new UnsupportedOperationException("Invalid map type " + parquetGroupType);
}
GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType();
if (!mapKeyValType.isRepetition(Type.Repetition.REPEATED)
|| !mapKeyValType.getOriginalType().equals(OriginalType.MAP_KEY_VALUE)
|| mapKeyValType.getFieldCount() != 2) {
throw new UnsupportedOperationException("Invalid map type " + parquetGroupType);
}
Type keyType = mapKeyValType.getType(0);
if (!keyType.isPrimitive()
|| !keyType.asPrimitiveType().getPrimitiveTypeName().equals(PrimitiveType.PrimitiveTypeName.BINARY)
|| !keyType.getOriginalType().equals(OriginalType.UTF8)) {
throw new UnsupportedOperationException("Map key type must be binary (UTF8): " + keyType);
}
Type valueType = mapKeyValType.getType(1);
return createHiveMap(convertField(keyType), convertField(valueType));
case ENUM:
case UTF8:
return "string";
case MAP_KEY_VALUE:
// MAP_KEY_VALUE was supposed to be used to annotate key and
// value group levels in a
// MAP. However, that is always implied by the structure of
// MAP. Hence, PARQUET-113
// dropped the requirement for having MAP_KEY_VALUE.
default:
throw new UnsupportedOperationException("Cannot convert Parquet type " + parquetType);
}
} else {
// if no original type then it's a record
return createHiveStruct(parquetGroupType.getFields());
}
}
}
/**
* Return a 'struct' Hive schema from a list of Parquet fields.
*
* @param parquetFields : list of parquet fields
* @return : Equivalent 'struct' Hive schema
*/
private static String createHiveStruct(List<Type> parquetFields) {
StringBuilder struct = new StringBuilder();
struct.append("STRUCT< ");
for (Type field : parquetFields) {
// TODO: struct field name is only translated to support special char($)
// We will need to extend it to other collection type
struct.append(hiveCompatibleFieldName(field.getName(), true)).append(" : ");
struct.append(convertField(field)).append(", ");
}
struct.delete(struct.length() - 2, struct.length()); // Remove the last
// ", "
struct.append(">");
String finalStr = struct.toString();
// Struct cannot have - in them. userstore_udr_entities has uuid in struct. This breaks the
// schema.
// HDrone sync should not fail because of this.
finalStr = finalStr.replaceAll("-", "_");
return finalStr;
}
private static String hiveCompatibleFieldName(String fieldName, boolean isNested) {
String result = fieldName;
if (isNested) {
result = ColumnNameXLator.translateNestedColumn(fieldName);
}
return tickSurround(result);
}
private static String tickSurround(String result) {
if (!result.startsWith("`")) {
result = "`" + result;
}
if (!result.endsWith("`")) {
result = result + "`";
}
return result;
}
private static String removeSurroundingTick(String result) {
if (result.startsWith("`") && result.endsWith("`")) {
result = result.substring(1, result.length() - 1);
}
return result;
}
/**
* Create a 'Map' schema from Parquet map field.
*/
private static String createHiveMap(String keyType, String valueType) {
return "MAP< " + keyType + ", " + valueType + ">";
}
/**
* Create an Array Hive schema from equivalent parquet list type.
*/
private static String createHiveArray(Type elementType, String elementName) {
StringBuilder array = new StringBuilder();
array.append("ARRAY< ");
if (elementType.isPrimitive()) {
array.append(convertField(elementType));
} else {
final GroupType groupType = elementType.asGroupType();
final List<Type> groupFields = groupType.getFields();
if (groupFields.size() > 1 || (groupFields.size() == 1
&& (elementType.getName().equals("array") || elementType.getName().equals(elementName + "_tuple")))) {
array.append(convertField(elementType));
} else {
array.append(convertField(groupType.getFields().get(0)));
}
}
array.append(">");
return array.toString();
}
public static boolean isSchemaTypeUpdateAllowed(String prevType, String newType) {
if (prevType == null || prevType.trim().isEmpty() || newType == null || newType.trim().isEmpty()) {
return false;
}
prevType = prevType.toLowerCase();
newType = newType.toLowerCase();
if (prevType.equals(newType)) {
return true;
} else if (prevType.equalsIgnoreCase("int") && newType.equalsIgnoreCase("bigint")) {
return true;
} else if (prevType.equalsIgnoreCase("float") && newType.equalsIgnoreCase("double")) {
return true;
} else {
return prevType.contains("struct") && newType.toLowerCase().contains("struct");
}
}
public static String generateSchemaString(MessageType storageSchema) throws IOException {
return generateSchemaString(storageSchema, new ArrayList<>());
}
public static String generateSchemaString(MessageType storageSchema, List<String> colsToSkip) throws IOException {
Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema);
StringBuilder columns = new StringBuilder();
for (Map.Entry<String, String> hiveSchemaEntry : hiveSchema.entrySet()) {
if (!colsToSkip.contains(removeSurroundingTick(hiveSchemaEntry.getKey()))) {
columns.append(hiveSchemaEntry.getKey()).append(" ");
columns.append(hiveSchemaEntry.getValue()).append(", ");
}
}
// Remove the last ", "
columns.delete(columns.length() - 2, columns.length());
return columns.toString();
}
public static String generateCreateDDL(String tableName, MessageType storageSchema, HiveSyncConfig config, String inputFormatClass,
String outputFormatClass, String serdeClass) throws IOException {
Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema);
String columns = generateSchemaString(storageSchema, config.partitionFields);
List<String> partitionFields = new ArrayList<>();
for (String partitionKey : config.partitionFields) {
String partitionKeyWithTicks = tickSurround(partitionKey);
partitionFields.add(new StringBuilder().append(partitionKeyWithTicks).append(" ")
.append(getPartitionKeyType(hiveSchema, partitionKeyWithTicks)).toString());
}
String partitionsStr = String.join(",", partitionFields);
StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE IF NOT EXISTS ");
sb.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName).append(HIVE_ESCAPE_CHARACTER)
.append(".").append(HIVE_ESCAPE_CHARACTER).append(tableName).append(HIVE_ESCAPE_CHARACTER);
sb.append("( ").append(columns).append(")");
if (!config.partitionFields.isEmpty()) {
sb.append(" PARTITIONED BY (").append(partitionsStr).append(")");
}
sb.append(" ROW FORMAT SERDE '").append(serdeClass).append("'");
sb.append(" STORED AS INPUTFORMAT '").append(inputFormatClass).append("'");
sb.append(" OUTPUTFORMAT '").append(outputFormatClass).append("' LOCATION '").append(config.basePath).append("'");
return sb.toString();
}
private static String getPartitionKeyType(Map<String, String> hiveSchema, String partitionKey) {
if (hiveSchema.containsKey(partitionKey)) {
return hiveSchema.get(partitionKey);
}
// Default the unknown partition fields to be String
// TODO - all partition fields should be part of the schema. datestr is treated as special.
// Dont do that
return "String";
}
/**
* Read the schema from the log file on path.
*
* @return
*/
public static MessageType readSchemaFromLogFile(FileSystem fs, Path path) throws IOException {
Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null);
HoodieAvroDataBlock lastBlock = null;
while (reader.hasNext()) {
HoodieLogBlock block = reader.next();
if (block instanceof HoodieAvroDataBlock) {
lastBlock = (HoodieAvroDataBlock) block;
}
}
reader.close();
if (lastBlock != null) {
return new AvroSchemaConverter().convert(lastBlock.getSchema());
}
return null;
}
}

View File

@@ -0,0 +1,366 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SchemaTestUtil;
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent;
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent.PartitionEventType;
import org.apache.hudi.hive.util.SchemaUtil;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
public class TestHiveSyncTool {
// Test sync tool using both jdbc and metastore client
private boolean useJdbc;
public TestHiveSyncTool(Boolean useJdbc) {
this.useJdbc = useJdbc;
}
@Parameterized.Parameters(name = "UseJdbc")
public static Collection<Boolean[]> data() {
return Arrays.asList(new Boolean[][] {{false}, {true}});
}
@Before
public void setUp() throws IOException, InterruptedException {
TestUtil.setUp();
}
@After
public void teardown() throws IOException {
TestUtil.clear();
}
/**
* Testing converting array types to Hive field declaration strings. According to the Parquet-113 spec:
* https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
*/
@Test
public void testSchemaConvertArray() throws IOException {
// Testing the 3-level annotation structure
MessageType schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
.optional(PrimitiveType.PrimitiveTypeName.INT32).named("element").named("list").named("int_list")
.named("ArrayOfInts");
String schemaString = SchemaUtil.generateSchemaString(schema);
assertEquals("`int_list` ARRAY< int>", schemaString);
// A array of arrays
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup().requiredGroup()
.as(OriginalType.LIST).repeatedGroup().required(PrimitiveType.PrimitiveTypeName.INT32).named("element")
.named("list").named("element").named("list").named("int_list_list").named("ArrayOfArrayOfInts");
schemaString = SchemaUtil.generateSchemaString(schema);
assertEquals("`int_list_list` ARRAY< ARRAY< int>>", schemaString);
// A list of integers
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeated(PrimitiveType.PrimitiveTypeName.INT32)
.named("element").named("int_list").named("ArrayOfInts");
schemaString = SchemaUtil.generateSchemaString(schema);
assertEquals("`int_list` ARRAY< int>", schemaString);
// A list of structs with two fields
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").required(PrimitiveType.PrimitiveTypeName.INT32)
.named("num").named("element").named("tuple_list").named("ArrayOfTuples");
schemaString = SchemaUtil.generateSchemaString(schema);
assertEquals("`tuple_list` ARRAY< STRUCT< `str` : binary, `num` : int>>", schemaString);
// A list of structs with a single field
// For this case, since the inner group name is "array", we treat the
// element type as a one-element struct.
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("array").named("one_tuple_list")
.named("ArrayOfOneTuples");
schemaString = SchemaUtil.generateSchemaString(schema);
assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>", schemaString);
// A list of structs with a single field
// For this case, since the inner group name ends with "_tuple", we also treat the
// element type as a one-element struct.
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("one_tuple_list_tuple")
.named("one_tuple_list").named("ArrayOfOneTuples2");
schemaString = SchemaUtil.generateSchemaString(schema);
assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>", schemaString);
// A list of structs with a single field
// Unlike the above two cases, for this the element type is the type of the
// only field in the struct.
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("one_tuple_list").named("one_tuple_list")
.named("ArrayOfOneTuples3");
schemaString = SchemaUtil.generateSchemaString(schema);
assertEquals("`one_tuple_list` ARRAY< binary>", schemaString);
// A list of maps
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup().as(OriginalType.MAP)
.repeatedGroup().as(OriginalType.MAP_KEY_VALUE).required(PrimitiveType.PrimitiveTypeName.BINARY)
.as(OriginalType.UTF8).named("string_key").required(PrimitiveType.PrimitiveTypeName.INT32).named("int_value")
.named("key_value").named("array").named("map_list").named("ArrayOfMaps");
schemaString = SchemaUtil.generateSchemaString(schema);
assertEquals("`map_list` ARRAY< MAP< string, int>>", schemaString);
}
@Test
public void testBasicSync() throws Exception {
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
String commitTime = "100";
TestUtil.createCOWTable(commitTime, 5);
HoodieHiveClient hiveClient =
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
assertFalse("Table " + TestUtil.hiveSyncConfig.tableName + " should not exist initially",
hiveClient.doesTableExist(TestUtil.hiveSyncConfig.tableName));
// Lets do the sync
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable();
assertTrue("Table " + TestUtil.hiveSyncConfig.tableName + " should exist after sync completes",
hiveClient.doesTableExist(TestUtil.hiveSyncConfig.tableName));
assertEquals("Hive Schema should match the table schema + partition field",
hiveClient.getTableSchema(TestUtil.hiveSyncConfig.tableName).size(),
hiveClient.getDataSchema().getColumns().size() + 1);
assertEquals("Table partitions should match the number of partitions we wrote", 5,
hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", commitTime,
hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get());
}
@Test
public void testSyncIncremental() throws Exception {
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
String commitTime1 = "100";
TestUtil.createCOWTable(commitTime1, 5);
HoodieHiveClient hiveClient =
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
// Lets do the sync
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable();
assertEquals("Table partitions should match the number of partitions we wrote", 5,
hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", commitTime1,
hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get());
// Now lets create more parititions and these are the only ones which needs to be synced
DateTime dateTime = DateTime.now().plusDays(6);
String commitTime2 = "101";
TestUtil.addCOWPartitions(1, true, dateTime, commitTime2);
// Lets do the sync
hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(commitTime1));
assertEquals("We should have one partition written after 100 commit", 1, writtenPartitionsSince.size());
List<Partition> hivePartitions = hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName);
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
assertEquals("There should be only one paritition event", 1, partitionEvents.size());
assertEquals("The one partition event must of type ADD", PartitionEventType.ADD,
partitionEvents.iterator().next().eventType);
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable();
// Sync should add the one partition
assertEquals("The one partition we wrote should be added to hive", 6,
hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
assertEquals("The last commit that was sycned should be 101", commitTime2,
hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get());
}
@Test
public void testSyncIncrementalWithSchemaEvolution() throws Exception {
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
String commitTime1 = "100";
TestUtil.createCOWTable(commitTime1, 5);
HoodieHiveClient hiveClient =
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
// Lets do the sync
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable();
int fields = hiveClient.getTableSchema(TestUtil.hiveSyncConfig.tableName).size();
// Now lets create more parititions and these are the only ones which needs to be synced
DateTime dateTime = DateTime.now().plusDays(6);
String commitTime2 = "101";
TestUtil.addCOWPartitions(1, false, dateTime, commitTime2);
// Lets do the sync
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable();
assertEquals("Hive Schema has evolved and should not be 3 more field", fields + 3,
hiveClient.getTableSchema(TestUtil.hiveSyncConfig.tableName).size());
assertEquals("Hive Schema has evolved - Field favorite_number has evolved from int to long", "BIGINT",
hiveClient.getTableSchema(TestUtil.hiveSyncConfig.tableName).get("favorite_number"));
assertTrue("Hive Schema has evolved - Field favorite_movie was added",
hiveClient.getTableSchema(TestUtil.hiveSyncConfig.tableName).containsKey("favorite_movie"));
// Sync should add the one partition
assertEquals("The one partition we wrote should be added to hive", 6,
hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
assertEquals("The last commit that was sycned should be 101", commitTime2,
hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get());
}
@Test
public void testSyncMergeOnRead() throws Exception {
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
String commitTime = "100";
String deltaCommitTime = "101";
TestUtil.createMORTable(commitTime, deltaCommitTime, 5);
String roTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
assertFalse("Table " + TestUtil.hiveSyncConfig.tableName + " should not exist initially", hiveClient.doesTableExist(roTableName));
// Lets do the sync
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable();
assertTrue("Table " + roTableName + " should exist after sync completes",
hiveClient.doesTableExist(roTableName));
assertEquals("Hive Schema should match the table schema + partition field", hiveClient.getTableSchema(roTableName).size(),
SchemaTestUtil.getSimpleSchema().getFields().size() + 1);
assertEquals("Table partitions should match the number of partitions we wrote", 5,
hiveClient.scanTablePartitions(roTableName).size());
assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", deltaCommitTime,
hiveClient.getLastCommitTimeSynced(roTableName).get());
// Now lets create more partitions and these are the only ones which needs to be synced
DateTime dateTime = DateTime.now().plusDays(6);
String commitTime2 = "102";
String deltaCommitTime2 = "103";
TestUtil.addCOWPartitions(1, true, dateTime, commitTime2);
TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, deltaCommitTime2);
// Lets do the sync
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable();
hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
assertEquals("Hive Schema should match the evolved table schema + partition field",
hiveClient.getTableSchema(roTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1);
// Sync should add the one partition
assertEquals("The 2 partitions we wrote should be added to hive", 6, hiveClient.scanTablePartitions(roTableName).size());
assertEquals("The last commit that was synced should be 103", deltaCommitTime2,
hiveClient.getLastCommitTimeSynced(roTableName).get());
}
@Test
public void testSyncMergeOnReadRT() throws Exception {
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
String commitTime = "100";
String deltaCommitTime = "101";
String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
TestUtil.createMORTable(commitTime, deltaCommitTime, 5);
HoodieHiveClient hiveClientRT =
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
assertFalse("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
+ " should not exist initially", hiveClientRT.doesTableExist(snapshotTableName));
// Lets do the sync
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable();
assertTrue("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
+ " should exist after sync completes", hiveClientRT.doesTableExist(snapshotTableName));
assertEquals("Hive Schema should match the table schema + partition field", hiveClientRT.getTableSchema(snapshotTableName).size(),
SchemaTestUtil.getSimpleSchema().getFields().size() + 1);
assertEquals("Table partitions should match the number of partitions we wrote", 5,
hiveClientRT.scanTablePartitions(snapshotTableName).size());
assertEquals("The last commit that was synced should be updated in the TBLPROPERTIES", deltaCommitTime,
hiveClientRT.getLastCommitTimeSynced(snapshotTableName).get());
// Now lets create more parititions and these are the only ones which needs to be synced
DateTime dateTime = DateTime.now().plusDays(6);
String commitTime2 = "102";
String deltaCommitTime2 = "103";
TestUtil.addCOWPartitions(1, true, dateTime, commitTime2);
TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, deltaCommitTime2);
// Lets do the sync
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable();
hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
assertEquals("Hive Schema should match the evolved table schema + partition field",
hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1);
// Sync should add the one partition
assertEquals("The 2 partitions we wrote should be added to hive", 6, hiveClientRT.scanTablePartitions(snapshotTableName).size());
assertEquals("The last commit that was sycned should be 103", deltaCommitTime2,
hiveClientRT.getLastCommitTimeSynced(snapshotTableName).get());
}
@Test
public void testMultiPartitionKeySync() throws Exception {
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
String commitTime = "100";
TestUtil.createCOWTable(commitTime, 5);
HiveSyncConfig hiveSyncConfig = HiveSyncConfig.copy(TestUtil.hiveSyncConfig);
hiveSyncConfig.partitionValueExtractorClass = MultiPartKeysValueExtractor.class.getCanonicalName();
hiveSyncConfig.tableName = "multi_part_key";
hiveSyncConfig.partitionFields = Arrays.asList("year", "month", "day");
TestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
assertFalse("Table " + hiveSyncConfig.tableName + " should not exist initially",
hiveClient.doesTableExist(hiveSyncConfig.tableName));
// Lets do the sync
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable();
assertTrue("Table " + hiveSyncConfig.tableName + " should exist after sync completes",
hiveClient.doesTableExist(hiveSyncConfig.tableName));
assertEquals("Hive Schema should match the table schema + partition fields",
hiveClient.getTableSchema(hiveSyncConfig.tableName).size(),
hiveClient.getDataSchema().getColumns().size() + 3);
assertEquals("Table partitions should match the number of partitions we wrote", 5,
hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size());
assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", commitTime,
hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get());
}
}

View File

@@ -0,0 +1,348 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.bloom.filter.BloomFilter;
import org.apache.hudi.common.bloom.filter.BloomFilterFactory;
import org.apache.hudi.common.bloom.filter.BloomFilterTypeCode;
import org.apache.hudi.common.minicluster.HdfsTestService;
import org.apache.hudi.common.minicluster.ZookeeperTestService;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.SchemaTestUtil;
import org.apache.hudi.hive.util.HiveTestService;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.server.HiveServer2;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.junit.runners.model.InitializationError;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import static org.junit.Assert.fail;
@SuppressWarnings("SameParameterValue")
public class TestUtil {
private static MiniDFSCluster dfsCluster;
private static ZooKeeperServer zkServer;
private static HiveServer2 hiveServer;
private static Configuration configuration;
static HiveSyncConfig hiveSyncConfig;
private static DateTimeFormatter dtfOut;
static FileSystem fileSystem;
private static Set<String> createdTablesSet = new HashSet<>();
public static void setUp() throws IOException, InterruptedException {
if (dfsCluster == null) {
HdfsTestService service = new HdfsTestService();
dfsCluster = service.start(true);
configuration = service.getHadoopConf();
}
if (zkServer == null) {
ZookeeperTestService zkService = new ZookeeperTestService(configuration);
zkServer = zkService.start();
}
if (hiveServer == null) {
HiveTestService hiveService = new HiveTestService(configuration);
hiveServer = hiveService.start();
}
fileSystem = FileSystem.get(configuration);
hiveSyncConfig = new HiveSyncConfig();
hiveSyncConfig.jdbcUrl = "jdbc:hive2://127.0.0.1:9999/";
hiveSyncConfig.databaseName = "hdrone_test";
hiveSyncConfig.hiveUser = "";
hiveSyncConfig.hivePass = "";
hiveSyncConfig.databaseName = "testdb";
hiveSyncConfig.tableName = "test1";
hiveSyncConfig.basePath = "/tmp/hdfs/TestHiveSyncTool/";
hiveSyncConfig.assumeDatePartitioning = true;
hiveSyncConfig.usePreApacheInputFormat = false;
hiveSyncConfig.partitionFields = Collections.singletonList("datestr");
dtfOut = DateTimeFormat.forPattern("yyyy/MM/dd");
clear();
}
static void clear() throws IOException {
fileSystem.delete(new Path(hiveSyncConfig.basePath), true);
HoodieTableMetaClient.initTableType(configuration, hiveSyncConfig.basePath, HoodieTableType.COPY_ON_WRITE,
hiveSyncConfig.tableName, HoodieAvroPayload.class.getName());
HoodieHiveClient client = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), fileSystem);
for (String tableName : createdTablesSet) {
client.updateHiveSQL("drop table if exists " + tableName);
}
createdTablesSet.clear();
client.updateHiveSQL("drop database if exists " + hiveSyncConfig.databaseName);
client.updateHiveSQL("create database " + hiveSyncConfig.databaseName);
}
static HiveConf getHiveConf() {
return hiveServer.getHiveConf();
}
@SuppressWarnings("unused")
public static void shutdown() {
if (hiveServer != null) {
hiveServer.stop();
}
if (dfsCluster != null) {
dfsCluster.shutdown();
}
if (zkServer != null) {
zkServer.shutdown();
}
}
static void createCOWTable(String commitTime, int numberOfPartitions)
throws IOException, InitializationError, URISyntaxException {
Path path = new Path(hiveSyncConfig.basePath);
FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
HoodieTableMetaClient.initTableType(configuration, hiveSyncConfig.basePath, HoodieTableType.COPY_ON_WRITE,
hiveSyncConfig.tableName, HoodieAvroPayload.class.getName());
boolean result = fileSystem.mkdirs(path);
checkResult(result);
DateTime dateTime = DateTime.now();
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, commitTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createCommitFile(commitMetadata, commitTime);
}
static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions)
throws IOException, InitializationError, URISyntaxException, InterruptedException {
Path path = new Path(hiveSyncConfig.basePath);
FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
HoodieTableMetaClient.initTableType(configuration, hiveSyncConfig.basePath, HoodieTableType.MERGE_ON_READ,
hiveSyncConfig.tableName, HoodieAvroPayload.class.getName());
boolean result = fileSystem.mkdirs(path);
checkResult(result);
DateTime dateTime = DateTime.now();
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, commitTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createdTablesSet
.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
commitMetadata.getPartitionToWriteStats()
.forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l)));
createCompactionCommitFile(compactionMetadata, commitTime);
// Write a delta commit
HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), true);
createDeltaCommitFile(deltaMetadata, deltaCommitTime);
}
static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, DateTime startFrom,
String commitTime) throws IOException, URISyntaxException {
HoodieCommitMetadata commitMetadata =
createPartitions(numberOfPartitions, isParquetSchemaSimple, startFrom, commitTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createCommitFile(commitMetadata, commitTime);
}
static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple,
DateTime startFrom, String commitTime, String deltaCommitTime)
throws IOException, URISyntaxException, InterruptedException {
HoodieCommitMetadata commitMetadata =
createPartitions(numberOfPartitions, isParquetSchemaSimple, startFrom, commitTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
commitMetadata.getPartitionToWriteStats()
.forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l)));
createCompactionCommitFile(compactionMetadata, commitTime);
HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), isLogSchemaSimple);
createDeltaCommitFile(deltaMetadata, deltaCommitTime);
}
private static HoodieCommitMetadata createLogFiles(Map<String, List<HoodieWriteStat>> partitionWriteStats,
boolean isLogSchemaSimple) throws InterruptedException, IOException, URISyntaxException {
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
for (Entry<String, List<HoodieWriteStat>> wEntry : partitionWriteStats.entrySet()) {
String partitionPath = wEntry.getKey();
for (HoodieWriteStat wStat : wEntry.getValue()) {
Path path = new Path(wStat.getPath());
HoodieBaseFile dataFile = new HoodieBaseFile(fileSystem.getFileStatus(path));
HoodieLogFile logFile = generateLogData(path, isLogSchemaSimple);
HoodieDeltaWriteStat writeStat = new HoodieDeltaWriteStat();
writeStat.setFileId(dataFile.getFileId());
writeStat.setPath(logFile.getPath().toString());
commitMetadata.addWriteStat(partitionPath, writeStat);
}
}
return commitMetadata;
}
private static HoodieCommitMetadata createPartitions(int numberOfPartitions, boolean isParquetSchemaSimple,
DateTime startFrom, String commitTime) throws IOException, URISyntaxException {
startFrom = startFrom.withTimeAtStartOfDay();
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
for (int i = 0; i < numberOfPartitions; i++) {
String partitionPath = dtfOut.print(startFrom);
Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath);
fileSystem.makeQualified(partPath);
fileSystem.mkdirs(partPath);
List<HoodieWriteStat> writeStats = createTestData(partPath, isParquetSchemaSimple, commitTime);
startFrom = startFrom.minusDays(1);
writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s));
}
return commitMetadata;
}
private static List<HoodieWriteStat> createTestData(Path partPath, boolean isParquetSchemaSimple, String commitTime)
throws IOException, URISyntaxException {
List<HoodieWriteStat> writeStats = new ArrayList<>();
for (int i = 0; i < 5; i++) {
// Create 5 files
String fileId = UUID.randomUUID().toString();
Path filePath = new Path(partPath.toString() + "/" + FSUtils.makeDataFileName(commitTime, "1-0-1", fileId));
generateParquetData(filePath, isParquetSchemaSimple);
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setFileId(fileId);
writeStat.setPath(filePath.toString());
writeStats.add(writeStat);
}
return writeStats;
}
@SuppressWarnings({"unchecked", "deprecation"})
private static void generateParquetData(Path filePath, boolean isParquetSchemaSimple)
throws IOException, URISyntaxException {
Schema schema = (isParquetSchemaSimple ? SchemaTestUtil.getSimpleSchema() : SchemaTestUtil.getEvolvedSchema());
org.apache.parquet.schema.MessageType parquetSchema = new AvroSchemaConverter().convert(schema);
BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, -1,
BloomFilterTypeCode.SIMPLE.name());
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(parquetSchema, schema, filter);
ParquetWriter writer = new ParquetWriter(filePath, writeSupport, CompressionCodecName.GZIP, 120 * 1024 * 1024,
ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED, ParquetWriter.DEFAULT_WRITER_VERSION, fileSystem.getConf());
List<IndexedRecord> testRecords = (isParquetSchemaSimple ? SchemaTestUtil.generateTestRecords(0, 100)
: SchemaTestUtil.generateEvolvedTestRecords(100, 100));
testRecords.forEach(s -> {
try {
writer.write(s);
} catch (IOException e) {
fail("IOException while writing test records as parquet" + e.toString());
}
});
writer.close();
}
private static HoodieLogFile generateLogData(Path parquetFilePath, boolean isLogSchemaSimple)
throws IOException, InterruptedException, URISyntaxException {
Schema schema = (isLogSchemaSimple ? SchemaTestUtil.getSimpleSchema() : SchemaTestUtil.getEvolvedSchema());
HoodieBaseFile dataFile = new HoodieBaseFile(fileSystem.getFileStatus(parquetFilePath));
// Write a log file for this parquet file
Writer logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(parquetFilePath.getParent())
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(dataFile.getFileId())
.overBaseCommit(dataFile.getCommitTime()).withFs(fileSystem).build();
List<IndexedRecord> records = (isLogSchemaSimple ? SchemaTestUtil.generateTestRecords(0, 100)
: SchemaTestUtil.generateEvolvedTestRecords(100, 100));
Map<HeaderMetadataType, String> header = new HashMap<>(2);
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, dataFile.getCommitTime());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
logWriter.appendBlock(dataBlock);
logWriter.close();
return logWriter.getLogFile();
}
private static void checkResult(boolean result) throws InitializationError {
if (!result) {
throw new InitializationError("Could not initialize");
}
}
private static void createCommitFile(HoodieCommitMetadata commitMetadata, String commitTime) throws IOException {
byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
Path fullPath = new Path(hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ HoodieTimeline.makeCommitFileName(commitTime));
FSDataOutputStream fsout = fileSystem.create(fullPath, true);
fsout.write(bytes);
fsout.close();
}
private static void createCompactionCommitFile(HoodieCommitMetadata commitMetadata, String commitTime)
throws IOException {
byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
Path fullPath = new Path(hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ HoodieTimeline.makeCommitFileName(commitTime));
FSDataOutputStream fsout = fileSystem.create(fullPath, true);
fsout.write(bytes);
fsout.close();
}
private static void createDeltaCommitFile(HoodieCommitMetadata deltaCommitMetadata, String deltaCommitTime)
throws IOException {
byte[] bytes = deltaCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
Path fullPath = new Path(hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ HoodieTimeline.makeDeltaFileName(deltaCommitTime));
FSDataOutputStream fsout = fileSystem.create(fullPath, true);
fsout.write(bytes);
fsout.close();
}
public static Set<String> getCreatedTablesSet() {
return createdTablesSet;
}
}

View File

@@ -0,0 +1,287 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive.util;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.util.FileIOUtils;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStore;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IHMSHandler;
import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
import org.apache.hadoop.hive.metastore.TUGIBasedProcessor;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.thrift.TUGIContainingTransport;
import org.apache.hive.service.server.HiveServer2;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class HiveTestService {
private static final Logger LOG = LogManager.getLogger(HiveTestService.class);
private static final int CONNECTION_TIMEOUT = 30000;
/**
* Configuration settings.
*/
private Configuration hadoopConf;
private String workDir;
private String bindIP = "127.0.0.1";
private int metastorePort = 9083;
private int serverPort = 9999;
private boolean clean = true;
private Map<String, String> sysProps = Maps.newHashMap();
private ExecutorService executorService;
private TServer tServer;
private HiveServer2 hiveServer;
public HiveTestService(Configuration configuration) {
this.workDir = Files.createTempDir().getAbsolutePath();
}
public Configuration getHadoopConf() {
return hadoopConf;
}
public HiveServer2 start() throws IOException {
Preconditions.checkState(workDir != null, "The work dir must be set before starting cluster.");
if (hadoopConf == null) {
hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
}
String localHiveLocation = getHiveLocation(workDir);
if (clean) {
LOG.info("Cleaning Hive cluster data at: " + localHiveLocation + " and starting fresh.");
File file = new File(localHiveLocation);
FileIOUtils.deleteDirectory(file);
}
HiveConf serverConf = configureHive(hadoopConf, localHiveLocation);
executorService = Executors.newSingleThreadExecutor();
tServer = startMetaStore(bindIP, metastorePort, serverConf);
hiveServer = startHiveServer(serverConf);
String serverHostname;
if (bindIP.equals("0.0.0.0")) {
serverHostname = "localhost";
} else {
serverHostname = bindIP;
}
if (!waitForServerUp(serverConf, serverHostname, metastorePort, CONNECTION_TIMEOUT)) {
throw new IOException("Waiting for startup of standalone server");
}
LOG.info("Hive Minicluster service started.");
return hiveServer;
}
private HiveConf configureHive(Configuration conf, String localHiveLocation) throws IOException {
conf.set("hive.metastore.local", "false");
conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://" + bindIP + ":" + metastorePort);
conf.set(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, bindIP);
conf.setInt(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, serverPort);
// The following line to turn of SASL has no effect since HiveAuthFactory calls
// 'new HiveConf()'. This is fixed by https://issues.apache.org/jira/browse/HIVE-6657,
// in Hive 0.14.
// As a workaround, the property is set in hive-site.xml in this module.
// conf.set(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname, "NOSASL");
File localHiveDir = new File(localHiveLocation);
localHiveDir.mkdirs();
File metastoreDbDir = new File(localHiveDir, "metastore_db");
conf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
"jdbc:derby:" + metastoreDbDir.getPath() + ";create=true");
File derbyLogFile = new File(localHiveDir, "derby.log");
derbyLogFile.createNewFile();
setSystemProperty("derby.stream.error.file", derbyLogFile.getPath());
conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, Files.createTempDir().getAbsolutePath());
conf.set("datanucleus.schema.autoCreateTables", "true");
conf.set("hive.metastore.schema.verification", "false");
setSystemProperty("derby.stream.error.file", derbyLogFile.getPath());
return new HiveConf(conf, this.getClass());
}
private boolean waitForServerUp(HiveConf serverConf, String hostname, int port, int timeout) {
long start = System.currentTimeMillis();
while (true) {
try {
new HiveMetaStoreClient(serverConf);
return true;
} catch (MetaException e) {
// ignore as this is expected
LOG.info("server " + hostname + ":" + port + " not up " + e);
}
if (System.currentTimeMillis() > start + timeout) {
break;
}
try {
Thread.sleep(250);
} catch (InterruptedException e) {
// ignore
}
}
return false;
}
private void setSystemProperty(String name, String value) {
if (!sysProps.containsKey(name)) {
String currentValue = System.getProperty(name);
sysProps.put(name, currentValue);
}
if (value != null) {
System.setProperty(name, value);
} else {
System.getProperties().remove(name);
}
}
private static String getHiveLocation(String baseLocation) {
return baseLocation + Path.SEPARATOR + "hive";
}
private HiveServer2 startHiveServer(HiveConf serverConf) {
HiveServer2 hiveServer = new HiveServer2();
hiveServer.init(serverConf);
hiveServer.start();
return hiveServer;
}
// XXX: From org.apache.hadoop.hive.metastore.HiveMetaStore,
// with changes to support binding to a specified IP address (not only 0.0.0.0)
private static final class ChainedTTransportFactory extends TTransportFactory {
private final TTransportFactory parentTransFactory;
private final TTransportFactory childTransFactory;
private ChainedTTransportFactory(TTransportFactory parentTransFactory, TTransportFactory childTransFactory) {
this.parentTransFactory = parentTransFactory;
this.childTransFactory = childTransFactory;
}
@Override
public TTransport getTransport(TTransport trans) {
return childTransFactory.getTransport(parentTransFactory.getTransport(trans));
}
}
private static final class TServerSocketKeepAlive extends TServerSocket {
public TServerSocketKeepAlive(int port) throws TTransportException {
super(port, 0);
}
public TServerSocketKeepAlive(InetSocketAddress address) throws TTransportException {
super(address, 0);
}
@Override
protected TSocket acceptImpl() throws TTransportException {
TSocket ts = super.acceptImpl();
try {
ts.getSocket().setKeepAlive(true);
} catch (SocketException e) {
throw new TTransportException(e);
}
return ts;
}
}
public TServer startMetaStore(String forceBindIP, int port, HiveConf conf) throws IOException {
try {
// Server will create new threads up to max as necessary. After an idle
// period, it will destory threads to keep the number of threads in the
// pool to min.
int minWorkerThreads = conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMINTHREADS);
int maxWorkerThreads = conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMAXTHREADS);
boolean tcpKeepAlive = conf.getBoolVar(HiveConf.ConfVars.METASTORE_TCP_KEEP_ALIVE);
boolean useFramedTransport = conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
// don't support SASL yet
// boolean useSasl = conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL);
TServerTransport serverTransport;
if (forceBindIP != null) {
InetSocketAddress address = new InetSocketAddress(forceBindIP, port);
serverTransport = tcpKeepAlive ? new TServerSocketKeepAlive(address) : new TServerSocket(address);
} else {
serverTransport = tcpKeepAlive ? new TServerSocketKeepAlive(port) : new TServerSocket(port);
}
TProcessor processor;
TTransportFactory transFactory;
IHMSHandler handler = (IHMSHandler) HiveMetaStore.newRetryingHMSHandler("new db based metaserver", conf, true);
if (conf.getBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI)) {
transFactory = useFramedTransport
? new ChainedTTransportFactory(new TFramedTransport.Factory(), new TUGIContainingTransport.Factory())
: new TUGIContainingTransport.Factory();
processor = new TUGIBasedProcessor<>(handler);
LOG.info("Starting DB backed MetaStore Server with SetUGI enabled");
} else {
transFactory = useFramedTransport ? new TFramedTransport.Factory() : new TTransportFactory();
processor = new TSetIpAddressProcessor<>(handler);
LOG.info("Starting DB backed MetaStore Server");
}
TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport).processor(processor)
.transportFactory(transFactory).protocolFactory(new TBinaryProtocol.Factory())
.minWorkerThreads(minWorkerThreads).maxWorkerThreads(maxWorkerThreads);
final TServer tServer = new TThreadPoolServer(args);
executorService.submit(tServer::serve);
return tServer;
} catch (Throwable x) {
throw new IOException(x);
}
}
}

View File

@@ -0,0 +1,23 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
log4j.rootLogger=WARN, A1
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=[%-5p] %d %c %x - %m%n

View File

@@ -0,0 +1,26 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
log4j.rootLogger=WARN, A1
log4j.category.org.apache=INFO
log4j.category.org.apache.parquet.hadoop=WARN
log4j.category.parquet.hadoop=WARN
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n