1
0

[HUDI-1932] Update Hive sync timestamp when change detected (#3053)

* Update Hive sync timestamp when change detected

Only update the last commit timestamp on the Hive table when the table schema
has changed or a partition is created/updated.

When using AWS Glue Data Catalog as the metastore for Hive this will ensure
that table versions are substantive (including schema and/or partition
changes). Prior to this change when a Hive sync is performed without schema
or partition changes the table in the Glue Data Catalog would have a new
version published with the only change being the timestamp property.

https://issues.apache.org/jira/browse/HUDI-1932

* add conditional sync flag

* fix testSyncWithoutDiffs

* fix HiveSyncConfig

Co-authored-by: Raymond Xu <2701446+xushiyan@users.noreply.github.com>
This commit is contained in:
Nate Radtke
2021-11-21 00:41:05 -06:00
committed by GitHub
parent 520538b15d
commit 887787e8b9
3 changed files with 51 additions and 5 deletions

View File

@@ -120,6 +120,9 @@ public class HiveSyncConfig implements Serializable {
@Parameter(names = {"--with-operation-field"}, description = "Whether to include the '_hoodie_operation' field in the metadata fields")
public Boolean withOperationField = false;
@Parameter(names = {"--conditional-sync"}, description = "If true, only sync on conditions like schema change or partition change.")
public Boolean isConditionalSync = false;
// enhance the similar function in child class
public static HiveSyncConfig copy(HiveSyncConfig cfg) {
HiveSyncConfig newConfig = new HiveSyncConfig();
@@ -143,6 +146,7 @@ public class HiveSyncConfig implements Serializable {
newConfig.syncAsSparkDataSourceTable = cfg.syncAsSparkDataSourceTable;
newConfig.sparkSchemaLengthThreshold = cfg.sparkSchemaLengthThreshold;
newConfig.withOperationField = cfg.withOperationField;
newConfig.isConditionalSync = cfg.isConditionalSync;
return newConfig;
}
@@ -174,6 +178,7 @@ public class HiveSyncConfig implements Serializable {
+ ", syncAsSparkDataSourceTable=" + syncAsSparkDataSourceTable
+ ", sparkSchemaLengthThreshold=" + sparkSchemaLengthThreshold
+ ", withOperationField=" + withOperationField
+ ", isConditionalSync=" + isConditionalSync
+ '}';
}
}

View File

@@ -179,7 +179,7 @@ public class HiveSyncTool extends AbstractSyncTool {
cfg.syncAsSparkDataSourceTable = false;
}
// Sync schema if needed
syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
boolean schemaChanged = syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
LOG.info("Schema sync complete. Syncing partitions for " + tableName);
// Get the last time we successfully synced partitions
@@ -192,8 +192,11 @@ public class HiveSyncTool extends AbstractSyncTool {
LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size());
// Sync the partitions if needed
syncPartitions(tableName, writtenPartitionsSince);
hoodieHiveClient.updateLastCommitTimeSynced(tableName);
boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince);
boolean meetSyncConditions = schemaChanged || partitionsChanged;
if (!cfg.isConditionalSync || meetSyncConditions) {
hoodieHiveClient.updateLastCommitTimeSynced(tableName);
}
LOG.info("Sync complete for " + tableName);
}
@@ -204,7 +207,7 @@ public class HiveSyncTool extends AbstractSyncTool {
* @param tableExists - does table exist
* @param schema - extracted schema
*/
private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
private boolean syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
boolean readAsOptimized, MessageType schema) {
// Append spark table properties & serde properties
Map<String, String> tableProperties = ConfigUtils.toMap(cfg.tableProperties);
@@ -215,6 +218,7 @@ public class HiveSyncTool extends AbstractSyncTool {
tableProperties.putAll(sparkTableProperties);
serdeProperties.putAll(sparkSerdeProperties);
}
boolean schemaChanged = false;
// Check and sync schema
if (!tableExists) {
LOG.info("Hive table " + tableName + " is not found. Creating it");
@@ -236,6 +240,7 @@ public class HiveSyncTool extends AbstractSyncTool {
// /ql/exec/DDLTask.java#L3488
hoodieHiveClient.createTable(tableName, schema, inputFormatClassName,
outputFormatClassName, serDeFormatClassName, serdeProperties, tableProperties);
schemaChanged = true;
} else {
// Check if the table schema has evolved
Map<String, String> tableSchema = hoodieHiveClient.getTableSchema(tableName);
@@ -248,10 +253,12 @@ public class HiveSyncTool extends AbstractSyncTool {
hoodieHiveClient.updateTableProperties(tableName, tableProperties);
LOG.info("Sync table properties for " + tableName + ", table properties is: " + cfg.tableProperties);
}
schemaChanged = true;
} else {
LOG.info("No Schema difference for " + tableName);
}
}
return schemaChanged;
}
/**
@@ -324,7 +331,8 @@ public class HiveSyncTool extends AbstractSyncTool {
* Syncs the list of storage partitions passed in (checks if the partition is in hive, if not adds it or if the
* partition path does not match, it updates the partition path).
*/
private void syncPartitions(String tableName, List<String> writtenPartitionsSince) {
private boolean syncPartitions(String tableName, List<String> writtenPartitionsSince) {
boolean partitionsChanged;
try {
List<Partition> hivePartitions = hoodieHiveClient.scanTablePartitions(tableName);
List<PartitionEvent> partitionEvents =
@@ -335,9 +343,11 @@ public class HiveSyncTool extends AbstractSyncTool {
List<String> updatePartitions = filterPartitions(partitionEvents, PartitionEventType.UPDATE);
LOG.info("Changed Partitions " + updatePartitions);
hoodieHiveClient.updatePartitionsToTable(tableName, updatePartitions);
partitionsChanged = !updatePartitions.isEmpty() || !newPartitions.isEmpty();
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to sync partitions for table " + tableName, e);
}
return partitionsChanged;
}
private List<String> filterPartitions(List<PartitionEvent> events, PartitionEventType eventType) {

View File

@@ -1017,4 +1017,35 @@ public class TestHiveSyncTool {
.containsValue("BIGINT"), errorMsg);
ddlExecutor.runSQL(dropTableSql);
}
@ParameterizedTest
@MethodSource("syncMode")
public void testSyncWithoutDiffs(String syncMode) throws Exception {
hiveSyncConfig.syncMode = syncMode;
hiveSyncConfig.isConditionalSync = true;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
String tableName = HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
String commitTime0 = "100";
String commitTime1 = "101";
String commitTime2 = "102";
HiveTestUtil.createMORTable(commitTime0, commitTime1, 2, true, true);
HoodieHiveClient hiveClient =
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool.syncHoodieTable();
assertTrue(hiveClient.doesTableExist(tableName));
assertEquals(commitTime1, hiveClient.getLastCommitTimeSynced(tableName).get());
HiveTestUtil.addMORPartitions(0, true, true, true, ZonedDateTime.now().plusDays(2), commitTime1, commitTime2);
tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool.syncHoodieTable();
hiveClient = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
assertEquals(commitTime1, hiveClient.getLastCommitTimeSynced(tableName).get());
}
}