
[HUDI-509] Renaming code in sync with cWiki restructuring (#1212)

- Storage Type replaced with Table Type (remaining instances)
- View types replaced with query types
- ReadOptimized view now referred to as Snapshot Query
- TableFileSystemView sub-interfaces renamed to BaseFileOnly and Slice views
- HoodieDataFile renamed to HoodieBaseFile
- Hive Sync tool will register RO tables for MOR with a `_ro` suffix
- Datasource/DeltaStreamer options renamed accordingly
- Fallback to the old config values is also supported, so migration is painless (see the sketch below)
- Config added for controlling the `_ro` suffix addition
- DataFile renamed to BaseFile across DTOs, HoodieFileSlice and AbstractTableFileSystemView
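
The config fallback works by consulting the renamed key first and only then falling back to the deprecated one. A minimal sketch of the idea, assuming the Spark datasource keys this release renames; the getWithFallback helper is illustrative, not the project's API:

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ConfigFallbackSketch {
  // Consult the new key first; honor the pre-rename key so existing jobs keep working.
  static Optional<String> getWithFallback(Map<String, String> opts, String newKey, String oldKey) {
    if (opts.containsKey(newKey)) {
      return Optional.of(opts.get(newKey));
    }
    return Optional.ofNullable(opts.get(oldKey));
  }

  public static void main(String[] args) {
    Map<String, String> opts = new HashMap<>();
    opts.put("hoodie.datasource.write.storage.type", "MERGE_ON_READ"); // only the old key is set
    // The deprecated "storage.type" key still resolves after the rename to "table.type".
    System.out.println(getWithFallback(opts,
        "hoodie.datasource.write.table.type",
        "hoodie.datasource.write.storage.type").orElse("COPY_ON_WRITE")); // prints MERGE_ON_READ
  }
}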
vinoth chandar
2020-01-16 23:58:47 -08:00
committed by GitHub
parent 8a3a50309b
commit c2c0f6b13d
92 changed files with 907 additions and 822 deletions

HiveSyncConfig.java

@@ -68,6 +68,9 @@ public class HiveSyncConfig implements Serializable {
@Parameter(names = {"--use-jdbc"}, description = "Hive jdbc connect url")
public Boolean useJdbc = true;
@Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` suffix for Read optimized table, when registering")
public Boolean skipROSuffix = false;
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
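
The new flag can be passed on the command line as --skip-ro-suffix or set programmatically on the config object; a brief sketch against the fields above, with assumed example values:

HiveSyncConfig cfg = new HiveSyncConfig();
cfg.databaseName = "default";   // assumed example values
cfg.tableName = "trips";
cfg.useJdbc = true;             // sync over the Hive JDBC connection
cfg.skipROSuffix = false;       // the MOR read-optimized table registers as trips_ro, not trips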

HiveSyncTool.java

@@ -53,30 +53,44 @@ import java.util.stream.Collectors;
public class HiveSyncTool {
private static final Logger LOG = LogManager.getLogger(HiveSyncTool.class);
private final HoodieHiveClient hoodieHiveClient;
public static final String SUFFIX_REALTIME_TABLE = "_rt";
public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
private final HiveSyncConfig cfg;
private final HoodieHiveClient hoodieHiveClient;
private final String snapshotTableName;
private final Option<String> roTableTableName;
public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
this.hoodieHiveClient = new HoodieHiveClient(cfg, configuration, fs);
this.cfg = cfg;
switch (hoodieHiveClient.getTableType()) {
case COPY_ON_WRITE:
this.snapshotTableName = cfg.tableName;
this.roTableTableName = Option.empty();
break;
case MERGE_ON_READ:
this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
this.roTableTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
break;
default:
LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
throw new InvalidTableException(hoodieHiveClient.getBasePath());
}
}
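
Concretely, for cfg.tableName = "trips" the constructor above resolves the registered Hive table names as follows (a worked example; the table name is assumed):

// COPY_ON_WRITE : snapshotTableName = "trips",    roTableTableName = Option.empty()
// MERGE_ON_READ : snapshotTableName = "trips_rt", roTableTableName = Option.of("trips_ro")
//                 (or Option.of("trips") when --skip-ro-suffix is set)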
public void syncHoodieTable() throws ClassNotFoundException {
try {
switch (hoodieHiveClient.getTableType()) {
case COPY_ON_WRITE:
syncHoodieTable(false);
syncHoodieTable(snapshotTableName, false);
break;
case MERGE_ON_READ:
// sync a RO table for MOR
syncHoodieTable(false);
String originalTableName = cfg.tableName;
// TODO : Make realtime table registration optional using a config param
cfg.tableName = cfg.tableName + SUFFIX_REALTIME_TABLE;
syncHoodieTable(roTableTableName.get(), false);
// sync a RT table for MOR
syncHoodieTable(true);
cfg.tableName = originalTableName;
syncHoodieTable(snapshotTableName, true);
break;
default:
LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
@@ -89,31 +103,30 @@ public class HiveSyncTool {
}
}
private void syncHoodieTable(boolean isRealTime) throws ClassNotFoundException {
LOG.info("Trying to sync hoodie table " + cfg.tableName + " with base path " + hoodieHiveClient.getBasePath()
private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat) {
LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieHiveClient.getBasePath()
+ " of type " + hoodieHiveClient.getTableType());
// Check if the necessary table exists
boolean tableExists = hoodieHiveClient.doesTableExist();
boolean tableExists = hoodieHiveClient.doesTableExist(tableName);
// Get the parquet schema for this table looking at the latest commit
MessageType schema = hoodieHiveClient.getDataSchema();
// Sync schema if needed
syncSchema(tableExists, isRealTime, schema);
syncSchema(tableName, tableExists, useRealtimeInputFormat, schema);
LOG.info("Schema sync complete. Syncing partitions for " + cfg.tableName);
LOG.info("Schema sync complete. Syncing partitions for " + tableName);
// Get the last time we successfully synced partitions
Option<String> lastCommitTimeSynced = Option.empty();
if (tableExists) {
lastCommitTimeSynced = hoodieHiveClient.getLastCommitTimeSynced();
lastCommitTimeSynced = hoodieHiveClient.getLastCommitTimeSynced(tableName);
}
LOG.info("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null"));
List<String> writtenPartitionsSince = hoodieHiveClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size());
// Sync the partitions if needed
syncPartitions(writtenPartitionsSince);
syncPartitions(tableName, writtenPartitionsSince);
hoodieHiveClient.updateLastCommitTimeSynced();
LOG.info("Sync complete for " + cfg.tableName);
hoodieHiveClient.updateLastCommitTimeSynced(tableName);
LOG.info("Sync complete for " + tableName);
}
/**
@@ -123,17 +136,14 @@ public class HiveSyncTool {
* @param tableExists - does table exist
* @param schema - extracted schema
*/
private void syncSchema(boolean tableExists, boolean isRealTime, MessageType schema) throws ClassNotFoundException {
private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat, MessageType schema) {
// Check and sync schema
if (!tableExists) {
LOG.info("Table " + cfg.tableName + " is not found. Creating it");
if (!isRealTime) {
// TODO - RO Table for MOR only after major compaction (UnboundedCompaction is default
// for now)
String inputFormatClassName =
cfg.usePreApacheInputFormat ? com.uber.hoodie.hadoop.HoodieInputFormat.class.getName()
: HoodieParquetInputFormat.class.getName();
hoodieHiveClient.createTable(schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
LOG.info("Hive table " + tableName + " is not found. Creating it");
if (!useRealTimeInputFormat) {
String inputFormatClassName = cfg.usePreApacheInputFormat ? com.uber.hoodie.hadoop.HoodieInputFormat.class.getName()
: HoodieParquetInputFormat.class.getName();
hoodieHiveClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
ParquetHiveSerDe.class.getName());
} else {
// Custom serde will not work with ALTER TABLE REPLACE COLUMNS
@@ -142,18 +152,18 @@ public class HiveSyncTool {
String inputFormatClassName =
cfg.usePreApacheInputFormat ? com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat.class.getName()
: HoodieParquetRealtimeInputFormat.class.getName();
hoodieHiveClient.createTable(schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
hoodieHiveClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
ParquetHiveSerDe.class.getName());
}
} else {
// Check if the table schema has evolved
Map<String, String> tableSchema = hoodieHiveClient.getTableSchema();
Map<String, String> tableSchema = hoodieHiveClient.getTableSchema(tableName);
SchemaDifference schemaDiff = SchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields);
if (!schemaDiff.isEmpty()) {
LOG.info("Schema difference found for " + cfg.tableName);
hoodieHiveClient.updateTableDefinition(schema);
LOG.info("Schema difference found for " + tableName);
hoodieHiveClient.updateTableDefinition(tableName, schema);
} else {
LOG.info("No Schema difference for " + cfg.tableName);
LOG.info("No Schema difference for " + tableName);
}
}
}
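
The net effect for a MOR table is two Hive registrations that differ only in input format while sharing the output format and serde (a summary of the branches above, reusing the assumed "trips" example):

// trips_ro -> HoodieParquetInputFormat         + MapredParquetOutputFormat + ParquetHiveSerDe
// trips_rt -> HoodieParquetRealtimeInputFormat + MapredParquetOutputFormat + ParquetHiveSerDe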
@@ -162,19 +172,19 @@ public class HiveSyncTool {
* Syncs the list of storage partitions passed in (checks if each partition is in Hive; adds it if not, or
updates the partition path if it does not match).
*/
private void syncPartitions(List<String> writtenPartitionsSince) {
private void syncPartitions(String tableName, List<String> writtenPartitionsSince) {
try {
List<Partition> hivePartitions = hoodieHiveClient.scanTablePartitions();
List<Partition> hivePartitions = hoodieHiveClient.scanTablePartitions(tableName);
List<PartitionEvent> partitionEvents =
hoodieHiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
List<String> newPartitions = filterPartitions(partitionEvents, PartitionEventType.ADD);
LOG.info("New Partitions " + newPartitions);
hoodieHiveClient.addPartitionsToTable(newPartitions);
hoodieHiveClient.addPartitionsToTable(tableName, newPartitions);
List<String> updatePartitions = filterPartitions(partitionEvents, PartitionEventType.UPDATE);
LOG.info("Changed Partitions " + updatePartitions);
hoodieHiveClient.updatePartitionsToTable(updatePartitions);
hoodieHiveClient.updatePartitionsToTable(tableName, updatePartitions);
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to sync partitions for table " + cfg.tableName, e);
throw new HoodieHiveSyncException("Failed to sync partitions for table " + tableName, e);
}
}
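
The partition events driving this method classify each storage partition written since the last sync as ADD (not yet in Hive) or UPDATE (present, but registered under a different path). A simplified sketch of that classification, assuming hivePartitionPaths maps each existing Hive partition to its current location; the real getPartitionEvents also normalizes paths before comparing:

for (String storagePartition : writtenPartitionsSince) {
  Path expectedPath = FSUtils.getPartitionPath(syncConfig.basePath, storagePartition);
  if (!hivePartitionPaths.containsKey(storagePartition)) {
    events.add(PartitionEvent.newPartitionAddEvent(storagePartition));    // new in storage
  } else if (!hivePartitionPaths.get(storagePartition).equals(expectedPath.toString())) {
    events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition)); // path changed
  }
}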

HoodieHiveClient.java

@@ -18,6 +18,7 @@
package org.apache.hudi.hive;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -135,36 +136,36 @@ public class HoodieHiveClient {
/**
* Add the (NEW) partitions to the table.
*/
void addPartitionsToTable(List<String> partitionsToAdd) {
void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
if (partitionsToAdd.isEmpty()) {
LOG.info("No partitions to add for " + syncConfig.tableName);
LOG.info("No partitions to add for " + tableName);
return;
}
LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + syncConfig.tableName);
String sql = constructAddPartitions(partitionsToAdd);
LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName);
String sql = constructAddPartitions(tableName, partitionsToAdd);
updateHiveSQL(sql);
}
/**
* Partition path has changed - update the path for the following partitions.
*/
void updatePartitionsToTable(List<String> changedPartitions) {
void updatePartitionsToTable(String tableName, List<String> changedPartitions) {
if (changedPartitions.isEmpty()) {
LOG.info("No partitions to change for " + syncConfig.tableName);
LOG.info("No partitions to change for " + tableName);
return;
}
LOG.info("Changing partitions " + changedPartitions.size() + " on " + syncConfig.tableName);
List<String> sqls = constructChangePartitions(changedPartitions);
LOG.info("Changing partitions " + changedPartitions.size() + " on " + tableName);
List<String> sqls = constructChangePartitions(tableName, changedPartitions);
for (String sql : sqls) {
updateHiveSQL(sql);
}
}
private String constructAddPartitions(List<String> partitions) {
private String constructAddPartitions(String tableName, List<String> partitions) {
StringBuilder alterSQL = new StringBuilder("ALTER TABLE ");
alterSQL.append(HIVE_ESCAPE_CHARACTER).append(syncConfig.databaseName)
.append(HIVE_ESCAPE_CHARACTER).append(".").append(HIVE_ESCAPE_CHARACTER)
.append(syncConfig.tableName).append(HIVE_ESCAPE_CHARACTER).append(" ADD IF NOT EXISTS ");
.append(tableName).append(HIVE_ESCAPE_CHARACTER).append(" ADD IF NOT EXISTS ");
for (String partition : partitions) {
String partitionClause = getPartitionClause(partition);
String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
@@ -192,12 +193,12 @@ public class HoodieHiveClient {
return partBuilder.stream().collect(Collectors.joining(","));
}
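
For a table partitioned on datestr, the ADD statement assembled above comes out roughly like this (illustrative output only; database, table, and locations are assumed):

ALTER TABLE `default`.`trips_ro` ADD IF NOT EXISTS
  PARTITION (`datestr`='2020-01-15') LOCATION 'hdfs://warehouse/trips/2020/01/15'
  PARTITION (`datestr`='2020-01-16') LOCATION 'hdfs://warehouse/trips/2020/01/16'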
private List<String> constructChangePartitions(List<String> partitions) {
private List<String> constructChangePartitions(String tableName, List<String> partitions) {
List<String> changePartitions = Lists.newArrayList();
// Hive 2.x doesn't like db.table name for operations, hence we need to change to using the database first
String useDatabase = "USE " + HIVE_ESCAPE_CHARACTER + syncConfig.databaseName + HIVE_ESCAPE_CHARACTER;
changePartitions.add(useDatabase);
String alterTable = "ALTER TABLE " + HIVE_ESCAPE_CHARACTER + syncConfig.tableName + HIVE_ESCAPE_CHARACTER;
String alterTable = "ALTER TABLE " + HIVE_ESCAPE_CHARACTER + tableName + HIVE_ESCAPE_CHARACTER;
for (String partition : partitions) {
String partitionClause = getPartitionClause(partition);
Path partitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition);
@@ -246,52 +247,52 @@ public class HoodieHiveClient {
/**
* Scan table partitions.
*/
public List<Partition> scanTablePartitions() throws TException {
return client.listPartitions(syncConfig.databaseName, syncConfig.tableName, (short) -1);
public List<Partition> scanTablePartitions(String tableName) throws TException {
return client.listPartitions(syncConfig.databaseName, tableName, (short) -1);
}
void updateTableDefinition(MessageType newSchema) {
void updateTableDefinition(String tableName, MessageType newSchema) {
try {
String newSchemaStr = SchemaUtil.generateSchemaString(newSchema, syncConfig.partitionFields);
// Cascade clause should not be present for non-partitioned tables
String cascadeClause = syncConfig.partitionFields.size() > 0 ? " cascade" : "";
StringBuilder sqlBuilder = new StringBuilder("ALTER TABLE ").append(HIVE_ESCAPE_CHARACTER)
.append(syncConfig.databaseName).append(HIVE_ESCAPE_CHARACTER).append(".")
.append(HIVE_ESCAPE_CHARACTER).append(syncConfig.tableName)
.append(HIVE_ESCAPE_CHARACTER).append(tableName)
.append(HIVE_ESCAPE_CHARACTER).append(" REPLACE COLUMNS(")
.append(newSchemaStr).append(" )").append(cascadeClause);
LOG.info("Updating table definition with " + sqlBuilder);
updateHiveSQL(sqlBuilder.toString());
} catch (IOException e) {
throw new HoodieHiveSyncException("Failed to update table for " + syncConfig.tableName, e);
throw new HoodieHiveSyncException("Failed to update table for " + tableName, e);
}
}
void createTable(MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass) {
void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass) {
try {
String createSQLQuery =
SchemaUtil.generateCreateDDL(storageSchema, syncConfig, inputFormatClass, outputFormatClass, serdeClass);
SchemaUtil.generateCreateDDL(tableName, storageSchema, syncConfig, inputFormatClass, outputFormatClass, serdeClass);
LOG.info("Creating table with " + createSQLQuery);
updateHiveSQL(createSQLQuery);
} catch (IOException e) {
throw new HoodieHiveSyncException("Failed to create table " + syncConfig.tableName, e);
throw new HoodieHiveSyncException("Failed to create table " + tableName, e);
}
}
/**
* Get the table schema.
*/
public Map<String, String> getTableSchema() {
public Map<String, String> getTableSchema(String tableName) {
if (syncConfig.useJdbc) {
if (!doesTableExist()) {
if (!doesTableExist(tableName)) {
throw new IllegalArgumentException(
"Failed to get schema for table " + syncConfig.tableName + " does not exist");
"Failed to get schema for table " + tableName + " does not exist");
}
Map<String, String> schema = Maps.newHashMap();
ResultSet result = null;
try {
DatabaseMetaData databaseMetaData = connection.getMetaData();
result = databaseMetaData.getColumns(null, syncConfig.databaseName, syncConfig.tableName, null);
result = databaseMetaData.getColumns(null, syncConfig.databaseName, tableName, null);
while (result.next()) {
String columnName = result.getString(4);
String columnType = result.getString(6);
@@ -304,26 +305,26 @@ public class HoodieHiveClient {
}
return schema;
} catch (SQLException e) {
throw new HoodieHiveSyncException("Failed to get table schema for " + syncConfig.tableName, e);
throw new HoodieHiveSyncException("Failed to get table schema for " + tableName, e);
} finally {
closeQuietly(result, null);
}
} else {
return getTableSchemaUsingMetastoreClient();
return getTableSchemaUsingMetastoreClient(tableName);
}
}
public Map<String, String> getTableSchemaUsingMetastoreClient() {
public Map<String, String> getTableSchemaUsingMetastoreClient(String tableName) {
try {
// HiveMetastoreClient returns partition keys separate from Columns, hence get both and merge to
// get the Schema of the table.
final long start = System.currentTimeMillis();
Table table = this.client.getTable(syncConfig.databaseName, syncConfig.tableName);
Table table = this.client.getTable(syncConfig.databaseName, tableName);
Map<String, String> partitionKeysMap =
table.getPartitionKeys().stream().collect(Collectors.toMap(f -> f.getName(), f -> f.getType().toUpperCase()));
table.getPartitionKeys().stream().collect(Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase()));
Map<String, String> columnsMap =
table.getSd().getCols().stream().collect(Collectors.toMap(f -> f.getName(), f -> f.getType().toUpperCase()));
table.getSd().getCols().stream().collect(Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase()));
Map<String, String> schema = new HashMap<>();
schema.putAll(columnsMap);
@@ -332,7 +333,7 @@ public class HoodieHiveClient {
LOG.info(String.format("Time taken to getTableSchema: %s ms", (end - start)));
return schema;
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get table schema for : " + syncConfig.tableName, e);
throw new HoodieHiveSyncException("Failed to get table schema for : " + tableName, e);
}
}
@@ -357,7 +358,7 @@ public class HoodieHiveClient {
.orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for commit "
+ lastCommit + ", could not get schema for table " + metaClient.getBasePath() + ", Metadata :"
+ commitMetadata));
return readSchemaFromDataFile(new Path(filePath));
return readSchemaFromBaseFile(new Path(filePath));
case MERGE_ON_READ:
// If this is MOR, depending on whether the latest commit is a delta commit or
// compaction commit
@@ -387,18 +388,17 @@ public class HoodieHiveClient {
.map(f -> Pair.of(f, HoodieFileFormat.HOODIE_LOG)).orElseGet(() -> {
// No Log files in Delta-Commit. Check if there are any parquet files
return commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream()
.filter(s -> s.contains((metaClient.getTableConfig().getROFileFormat().getFileExtension())))
.findAny().map(f -> Pair.of(f, HoodieFileFormat.PARQUET)).orElseThrow(() -> {
return new IllegalArgumentException("Could not find any data file written for commit "
+ lastDeltaInstant + ", could not get schema for table " + metaClient.getBasePath()
+ ", CommitMetadata :" + commitMetadata);
});
.filter(s -> s.contains((metaClient.getTableConfig().getBaseFileFormat().getFileExtension())))
.findAny().map(f -> Pair.of(f, HoodieFileFormat.PARQUET)).orElseThrow(() ->
new IllegalArgumentException("Could not find any data file written for commit "
+ lastDeltaInstant + ", could not get schema for table " + metaClient.getBasePath()
+ ", CommitMetadata :" + commitMetadata));
});
switch (filePathWithFormat.getRight()) {
case HOODIE_LOG:
return readSchemaFromLogFile(lastCompactionCommit, new Path(filePathWithFormat.getLeft()));
case PARQUET:
return readSchemaFromDataFile(new Path(filePathWithFormat.getLeft()));
return readSchemaFromBaseFile(new Path(filePathWithFormat.getLeft()));
default:
throw new IllegalArgumentException("Unknown file format :" + filePathWithFormat.getRight()
+ " for file " + filePathWithFormat.getLeft());
@@ -411,7 +411,7 @@ public class HoodieHiveClient {
throw new InvalidTableException(syncConfig.basePath);
}
} catch (IOException e) {
throw new HoodieHiveSyncException("Failed to get table schema for " + syncConfig.tableName, e);
throw new HoodieHiveSyncException("Failed to read data schema", e);
}
}
@@ -429,7 +429,7 @@ public class HoodieHiveClient {
String filePath = compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
.orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for compaction "
+ lastCompactionCommit + ", could not get schema for table " + metaClient.getBasePath()));
return readSchemaFromDataFile(new Path(filePath));
return readSchemaFromBaseFile(new Path(filePath));
}
/**
@@ -450,7 +450,7 @@ public class HoodieHiveClient {
/**
* Read the parquet schema from a parquet File.
*/
private MessageType readSchemaFromDataFile(Path parquetFilePath) throws IOException {
private MessageType readSchemaFromBaseFile(Path parquetFilePath) throws IOException {
LOG.info("Reading schema from " + parquetFilePath);
if (!fs.exists(parquetFilePath)) {
throw new IllegalArgumentException(
@@ -464,11 +464,11 @@ public class HoodieHiveClient {
/**
* @return true if the configured table exists
*/
public boolean doesTableExist() {
public boolean doesTableExist(String tableName) {
try {
return client.tableExists(syncConfig.databaseName, syncConfig.tableName);
return client.tableExists(syncConfig.databaseName, tableName);
} catch (TException e) {
throw new HoodieHiveSyncException("Failed to check if table exists " + syncConfig.tableName, e);
throw new HoodieHiveSyncException("Failed to check if table exists " + tableName, e);
}
}
@@ -603,10 +603,10 @@ public class HoodieHiveClient {
return fs;
}
public Option<String> getLastCommitTimeSynced() {
public Option<String> getLastCommitTimeSynced(String tableName) {
// Get the last commit time from the TBLproperties
try {
Table database = client.getTable(syncConfig.databaseName, syncConfig.tableName);
Table database = client.getTable(syncConfig.databaseName, tableName);
return Option.ofNullable(database.getParameters().getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC, null));
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get the last commit time synced from the database", e);
@@ -654,17 +654,16 @@ public class HoodieHiveClient {
return client.getAllTables(db);
}
void updateLastCommitTimeSynced() {
void updateLastCommitTimeSynced(String tableName) {
// Set the last commit time from the TBLproperties
String lastCommitSynced = activeTimeline.lastInstant().get().getTimestamp();
try {
Table table = client.getTable(syncConfig.databaseName, syncConfig.tableName);
Table table = client.getTable(syncConfig.databaseName, tableName);
table.putToParameters(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced);
client.alter_table(syncConfig.databaseName, syncConfig.tableName, table);
client.alter_table(syncConfig.databaseName, tableName, table);
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get update last commit time synced to " + lastCommitSynced, e);
}
}
/**

SchemaUtil.java

@@ -391,7 +391,7 @@ public class SchemaUtil {
return columns.toString();
}
public static String generateCreateDDL(MessageType storageSchema, HiveSyncConfig config, String inputFormatClass,
public static String generateCreateDDL(String tableName, MessageType storageSchema, HiveSyncConfig config, String inputFormatClass,
String outputFormatClass, String serdeClass) throws IOException {
Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema);
String columns = generateSchemaString(storageSchema, config.partitionFields);
@@ -406,7 +406,7 @@ public class SchemaUtil {
String partitionsStr = partitionFields.stream().collect(Collectors.joining(","));
StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE IF NOT EXISTS ");
sb = sb.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName).append(HIVE_ESCAPE_CHARACTER)
.append(".").append(HIVE_ESCAPE_CHARACTER).append(config.tableName).append(HIVE_ESCAPE_CHARACTER);
.append(".").append(HIVE_ESCAPE_CHARACTER).append(tableName).append(HIVE_ESCAPE_CHARACTER);
sb = sb.append("( ").append(columns).append(")");
if (!config.partitionFields.isEmpty()) {
sb = sb.append(" PARTITIONED BY (").append(partitionsStr).append(")");

TestHiveSyncTool.java

@@ -158,18 +158,19 @@ public class TestHiveSyncTool {
HoodieHiveClient hiveClient =
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
assertFalse("Table " + TestUtil.hiveSyncConfig.tableName + " should not exist initially",
hiveClient.doesTableExist());
hiveClient.doesTableExist(TestUtil.hiveSyncConfig.tableName));
// Lets do the sync
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable();
assertTrue("Table " + TestUtil.hiveSyncConfig.tableName + " should exist after sync completes",
hiveClient.doesTableExist());
assertEquals("Hive Schema should match the table schema + partition field", hiveClient.getTableSchema().size(),
hiveClient.doesTableExist(TestUtil.hiveSyncConfig.tableName));
assertEquals("Hive Schema should match the table schema + partition field",
hiveClient.getTableSchema(TestUtil.hiveSyncConfig.tableName).size(),
hiveClient.getDataSchema().getColumns().size() + 1);
assertEquals("Table partitions should match the number of partitions we wrote", 5,
hiveClient.scanTablePartitions().size());
hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", commitTime,
hiveClient.getLastCommitTimeSynced().get());
hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get());
}
@Test
@@ -183,9 +184,9 @@ public class TestHiveSyncTool {
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable();
assertEquals("Table partitions should match the number of partitions we wrote", 5,
hiveClient.scanTablePartitions().size());
hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", commitTime1,
hiveClient.getLastCommitTimeSynced().get());
hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get());
// Now lets create more partitions and these are the only ones which needs to be synced
DateTime dateTime = DateTime.now().plusDays(6);
@@ -196,7 +197,7 @@ public class TestHiveSyncTool {
hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(commitTime1));
assertEquals("We should have one partition written after 100 commit", 1, writtenPartitionsSince.size());
List<Partition> hivePartitions = hiveClient.scanTablePartitions();
List<Partition> hivePartitions = hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName);
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
assertEquals("There should be only one paritition event", 1, partitionEvents.size());
assertEquals("The one partition event must of type ADD", PartitionEventType.ADD,
@@ -205,9 +206,10 @@ public class TestHiveSyncTool {
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable();
// Sync should add the one partition
assertEquals("The one partition we wrote should be added to hive", 6, hiveClient.scanTablePartitions().size());
assertEquals("The one partition we wrote should be added to hive", 6,
hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
assertEquals("The last commit that was sycned should be 101", commitTime2,
hiveClient.getLastCommitTimeSynced().get());
hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get());
}
@Test
@@ -221,7 +223,7 @@ public class TestHiveSyncTool {
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable();
int fields = hiveClient.getTableSchema().size();
int fields = hiveClient.getTableSchema(TestUtil.hiveSyncConfig.tableName).size();
// Now lets create more partitions and these are the only ones which needs to be synced
DateTime dateTime = DateTime.now().plusDays(6);
@@ -233,16 +235,17 @@ public class TestHiveSyncTool {
tool.syncHoodieTable();
assertEquals("Hive Schema has evolved and should not be 3 more field", fields + 3,
hiveClient.getTableSchema().size());
hiveClient.getTableSchema(TestUtil.hiveSyncConfig.tableName).size());
assertEquals("Hive Schema has evolved - Field favorite_number has evolved from int to long", "BIGINT",
hiveClient.getTableSchema().get("favorite_number"));
hiveClient.getTableSchema(TestUtil.hiveSyncConfig.tableName).get("favorite_number"));
assertTrue("Hive Schema has evolved - Field favorite_movie was added",
hiveClient.getTableSchema().containsKey("favorite_movie"));
hiveClient.getTableSchema(TestUtil.hiveSyncConfig.tableName).containsKey("favorite_movie"));
// Sync should add the one partition
assertEquals("The one partition we wrote should be added to hive", 6, hiveClient.scanTablePartitions().size());
assertEquals("The one partition we wrote should be added to hive", 6,
hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
assertEquals("The last commit that was sycned should be 101", commitTime2,
hiveClient.getLastCommitTimeSynced().get());
hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get());
}
@Test
@@ -251,24 +254,24 @@ public class TestHiveSyncTool {
String commitTime = "100";
String deltaCommitTime = "101";
TestUtil.createMORTable(commitTime, deltaCommitTime, 5);
HoodieHiveClient hiveClient =
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
assertFalse("Table " + TestUtil.hiveSyncConfig.tableName + " should not exist initially",
hiveClient.doesTableExist());
String roTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
assertFalse("Table " + TestUtil.hiveSyncConfig.tableName + " should not exist initially", hiveClient.doesTableExist(roTableName));
// Lets do the sync
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable();
assertTrue("Table " + TestUtil.hiveSyncConfig.tableName + " should exist after sync completes",
hiveClient.doesTableExist());
assertEquals("Hive Schema should match the table schema + partition field", hiveClient.getTableSchema().size(),
assertTrue("Table " + roTableName + " should exist after sync completes",
hiveClient.doesTableExist(roTableName));
assertEquals("Hive Schema should match the table schema + partition field", hiveClient.getTableSchema(roTableName).size(),
SchemaTestUtil.getSimpleSchema().getFields().size() + 1);
assertEquals("Table partitions should match the number of partitions we wrote", 5,
hiveClient.scanTablePartitions().size());
hiveClient.scanTablePartitions(roTableName).size());
assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", deltaCommitTime,
hiveClient.getLastCommitTimeSynced().get());
hiveClient.getLastCommitTimeSynced(roTableName).get());
// Now lets create more parititions and these are the only ones which needs to be synced
// Now lets create more partitions and these are the only ones which needs to be synced
DateTime dateTime = DateTime.now().plusDays(6);
String commitTime2 = "102";
String deltaCommitTime2 = "103";
@@ -281,11 +284,11 @@ public class TestHiveSyncTool {
hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
assertEquals("Hive Schema should match the evolved table schema + partition field",
hiveClient.getTableSchema().size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1);
hiveClient.getTableSchema(roTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1);
// Sync should add the one partition
assertEquals("The 2 partitions we wrote should be added to hive", 6, hiveClient.scanTablePartitions().size());
assertEquals("The 2 partitions we wrote should be added to hive", 6, hiveClient.scanTablePartitions(roTableName).size());
assertEquals("The last commit that was synced should be 103", deltaCommitTime2,
hiveClient.getLastCommitTimeSynced().get());
hiveClient.getLastCommitTimeSynced(roTableName).get());
}
@Test
@@ -293,28 +296,27 @@ public class TestHiveSyncTool {
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
String commitTime = "100";
String deltaCommitTime = "101";
String roTablename = TestUtil.hiveSyncConfig.tableName;
TestUtil.hiveSyncConfig.tableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_REALTIME_TABLE;
String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
TestUtil.createMORTable(commitTime, deltaCommitTime, 5);
HoodieHiveClient hiveClientRT =
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
assertFalse("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_REALTIME_TABLE
+ " should not exist initially", hiveClientRT.doesTableExist());
assertFalse("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
+ " should not exist initially", hiveClientRT.doesTableExist(snapshotTableName));
// Lets do the sync
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable();
assertTrue("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_REALTIME_TABLE
+ " should exist after sync completes", hiveClientRT.doesTableExist());
assertTrue("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
+ " should exist after sync completes", hiveClientRT.doesTableExist(snapshotTableName));
assertEquals("Hive Schema should match the table schema + partition field", hiveClientRT.getTableSchema().size(),
assertEquals("Hive Schema should match the table schema + partition field", hiveClientRT.getTableSchema(snapshotTableName).size(),
SchemaTestUtil.getSimpleSchema().getFields().size() + 1);
assertEquals("Table partitions should match the number of partitions we wrote", 5,
hiveClientRT.scanTablePartitions().size());
hiveClientRT.scanTablePartitions(snapshotTableName).size());
assertEquals("The last commit that was synced should be updated in the TBLPROPERTIES", deltaCommitTime,
hiveClientRT.getLastCommitTimeSynced().get());
hiveClientRT.getLastCommitTimeSynced(snapshotTableName).get());
// Now lets create more partitions and these are the only ones which needs to be synced
DateTime dateTime = DateTime.now().plusDays(6);
@@ -329,12 +331,11 @@ public class TestHiveSyncTool {
hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
assertEquals("Hive Schema should match the evolved table schema + partition field",
hiveClientRT.getTableSchema().size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1);
hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1);
// Sync should add the one partition
assertEquals("The 2 partitions we wrote should be added to hive", 6, hiveClientRT.scanTablePartitions().size());
assertEquals("The 2 partitions we wrote should be added to hive", 6, hiveClientRT.scanTablePartitions(snapshotTableName).size());
assertEquals("The last commit that was sycned should be 103", deltaCommitTime2,
hiveClientRT.getLastCommitTimeSynced().get());
TestUtil.hiveSyncConfig.tableName = roTablename;
hiveClientRT.getLastCommitTimeSynced(snapshotTableName).get());
}
@Test
@@ -350,16 +351,19 @@ public class TestHiveSyncTool {
TestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
assertFalse("Table " + hiveSyncConfig.tableName + " should not exist initially", hiveClient.doesTableExist());
assertFalse("Table " + hiveSyncConfig.tableName + " should not exist initially",
hiveClient.doesTableExist(hiveSyncConfig.tableName));
// Lets do the sync
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable();
assertTrue("Table " + hiveSyncConfig.tableName + " should exist after sync completes", hiveClient.doesTableExist());
assertEquals("Hive Schema should match the table schema + partition fields", hiveClient.getTableSchema().size(),
assertTrue("Table " + hiveSyncConfig.tableName + " should exist after sync completes",
hiveClient.doesTableExist(hiveSyncConfig.tableName));
assertEquals("Hive Schema should match the table schema + partition fields",
hiveClient.getTableSchema(hiveSyncConfig.tableName).size(),
hiveClient.getDataSchema().getColumns().size() + 3);
assertEquals("Table partitions should match the number of partitions we wrote", 5,
hiveClient.scanTablePartitions().size());
hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size());
assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", commitTime,
hiveClient.getLastCommitTimeSynced().get());
hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get());
}
}

TestUtil.java

@@ -26,7 +26,7 @@ import org.apache.hudi.common.minicluster.HdfsTestService;
import org.apache.hudi.common.minicluster.ZookeeperTestService;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieDataFile;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
@@ -179,7 +179,7 @@ public class TestUtil {
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, commitTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createdTablesSet
.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_REALTIME_TABLE);
.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
commitMetadata.getPartitionToWriteStats()
.forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l)));
@@ -202,9 +202,8 @@ public class TestUtil {
throws IOException, URISyntaxException, InterruptedException {
HoodieCommitMetadata commitMetadata =
createPartitions(numberOfPartitions, isParquetSchemaSimple, startFrom, commitTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createdTablesSet
.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_REALTIME_TABLE);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
commitMetadata.getPartitionToWriteStats()
.forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l)));
@@ -220,7 +219,7 @@ public class TestUtil {
String partitionPath = wEntry.getKey();
for (HoodieWriteStat wStat : wEntry.getValue()) {
Path path = new Path(wStat.getPath());
HoodieDataFile dataFile = new HoodieDataFile(fileSystem.getFileStatus(path));
HoodieBaseFile dataFile = new HoodieBaseFile(fileSystem.getFileStatus(path));
HoodieLogFile logFile = generateLogData(path, isLogSchemaSimple);
HoodieDeltaWriteStat writeStat = new HoodieDeltaWriteStat();
writeStat.setFileId(dataFile.getFileId());
@@ -291,7 +290,7 @@ public class TestUtil {
private static HoodieLogFile generateLogData(Path parquetFilePath, boolean isLogSchemaSimple)
throws IOException, InterruptedException, URISyntaxException {
Schema schema = (isLogSchemaSimple ? SchemaTestUtil.getSimpleSchema() : SchemaTestUtil.getEvolvedSchema());
HoodieDataFile dataFile = new HoodieDataFile(fileSystem.getFileStatus(parquetFilePath));
HoodieBaseFile dataFile = new HoodieBaseFile(fileSystem.getFileStatus(parquetFilePath));
// Write a log file for this parquet file
Writer logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(parquetFilePath.getParent())
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(dataFile.getFileId())