[HUDI-1879] Fix RO Tables Returning Snapshot Result (#2925)
This commit is contained in:
@@ -23,7 +23,6 @@ import org.apache.hudi.common.model.WriteOperationType
|
|||||||
import org.apache.hudi.config.HoodieWriteConfig
|
import org.apache.hudi.config.HoodieWriteConfig
|
||||||
import org.apache.hudi.hive.HiveSyncTool
|
import org.apache.hudi.hive.HiveSyncTool
|
||||||
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
|
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
|
||||||
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config
|
|
||||||
import org.apache.hudi.keygen.{CustomKeyGenerator, SimpleKeyGenerator}
|
import org.apache.hudi.keygen.{CustomKeyGenerator, SimpleKeyGenerator}
|
||||||
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
|
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
|
||||||
import org.apache.log4j.LogManager
|
import org.apache.log4j.LogManager
|
||||||
|
|||||||
@@ -437,7 +437,14 @@ object HoodieSparkSqlWriter {
|
|||||||
DataSourceWriteOptions.DEFAULT_HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean
|
DataSourceWriteOptions.DEFAULT_HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean
|
||||||
if (syncAsDtaSourceTable) {
|
if (syncAsDtaSourceTable) {
|
||||||
hiveSyncConfig.tableProperties = parameters.getOrElse(HIVE_TABLE_PROPERTIES, null)
|
hiveSyncConfig.tableProperties = parameters.getOrElse(HIVE_TABLE_PROPERTIES, null)
|
||||||
hiveSyncConfig.serdeProperties = createSqlTableSerdeProperties(parameters, basePath.toString)
|
val serdePropText = createSqlTableSerdeProperties(parameters, basePath.toString)
|
||||||
|
val serdeProp = ConfigUtils.toMap(serdePropText)
|
||||||
|
serdeProp.put(ConfigUtils.SPARK_QUERY_TYPE_KEY, DataSourceReadOptions.QUERY_TYPE_OPT_KEY)
|
||||||
|
serdeProp.put(ConfigUtils.SPARK_QUERY_AS_RO_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
|
||||||
|
serdeProp.put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
|
||||||
|
|
||||||
|
hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProp)
|
||||||
|
|
||||||
}
|
}
|
||||||
hiveSyncConfig
|
hiveSyncConfig
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -570,8 +570,10 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
|
|||||||
"{\"name\":\"_row_key\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}," +
|
"{\"name\":\"_row_key\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}," +
|
||||||
"{\"name\":\"ts\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}," +
|
"{\"name\":\"ts\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}," +
|
||||||
"{\"name\":\"partition\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}")(hiveSyncConfig.tableProperties)
|
"{\"name\":\"partition\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}")(hiveSyncConfig.tableProperties)
|
||||||
|
assertResult("path=/tmp/hoodie_test\n" +
|
||||||
assertResult("path=/tmp/hoodie_test")(hiveSyncConfig.serdeProperties)
|
"spark.query.type.key=hoodie.datasource.query.type\n" +
|
||||||
|
"spark.query.as.rt.key=snapshot\n" +
|
||||||
|
"spark.query.as.ro.key=read_optimized")(hiveSyncConfig.serdeProperties)
|
||||||
}
|
}
|
||||||
|
|
||||||
test("Test build sync config for skip Ro Suffix vals") {
|
test("Test build sync config for skip Ro Suffix vals") {
|
||||||
|
|||||||
@@ -106,13 +106,13 @@ public class HiveSyncTool extends AbstractSyncTool {
|
|||||||
if (hoodieHiveClient != null) {
|
if (hoodieHiveClient != null) {
|
||||||
switch (hoodieHiveClient.getTableType()) {
|
switch (hoodieHiveClient.getTableType()) {
|
||||||
case COPY_ON_WRITE:
|
case COPY_ON_WRITE:
|
||||||
syncHoodieTable(snapshotTableName, false);
|
syncHoodieTable(snapshotTableName, false, false);
|
||||||
break;
|
break;
|
||||||
case MERGE_ON_READ:
|
case MERGE_ON_READ:
|
||||||
// sync a RO table for MOR
|
// sync a RO table for MOR
|
||||||
syncHoodieTable(roTableName.get(), false);
|
syncHoodieTable(roTableName.get(), false, true);
|
||||||
// sync a RT table for MOR
|
// sync a RT table for MOR
|
||||||
syncHoodieTable(snapshotTableName, true);
|
syncHoodieTable(snapshotTableName, true, false);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
|
LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
|
||||||
@@ -128,7 +128,8 @@ public class HiveSyncTool extends AbstractSyncTool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat) {
|
private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
|
||||||
|
boolean readAsOptimized) {
|
||||||
LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieHiveClient.getBasePath()
|
LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieHiveClient.getBasePath()
|
||||||
+ " of type " + hoodieHiveClient.getTableType());
|
+ " of type " + hoodieHiveClient.getTableType());
|
||||||
|
|
||||||
@@ -152,7 +153,7 @@ public class HiveSyncTool extends AbstractSyncTool {
|
|||||||
// Get the parquet schema for this table looking at the latest commit
|
// Get the parquet schema for this table looking at the latest commit
|
||||||
MessageType schema = hoodieHiveClient.getDataSchema();
|
MessageType schema = hoodieHiveClient.getDataSchema();
|
||||||
// Sync schema if needed
|
// Sync schema if needed
|
||||||
syncSchema(tableName, tableExists, useRealtimeInputFormat, schema);
|
syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
|
||||||
|
|
||||||
LOG.info("Schema sync complete. Syncing partitions for " + tableName);
|
LOG.info("Schema sync complete. Syncing partitions for " + tableName);
|
||||||
// Get the last time we successfully synced partitions
|
// Get the last time we successfully synced partitions
|
||||||
@@ -177,7 +178,8 @@ public class HiveSyncTool extends AbstractSyncTool {
|
|||||||
* @param tableExists - does table exist
|
* @param tableExists - does table exist
|
||||||
* @param schema - extracted schema
|
* @param schema - extracted schema
|
||||||
*/
|
*/
|
||||||
private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat, MessageType schema) {
|
private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
|
||||||
|
boolean readAsOptimized, MessageType schema) {
|
||||||
// Check and sync schema
|
// Check and sync schema
|
||||||
if (!tableExists) {
|
if (!tableExists) {
|
||||||
LOG.info("Hive table " + tableName + " is not found. Creating it");
|
LOG.info("Hive table " + tableName + " is not found. Creating it");
|
||||||
@@ -194,11 +196,27 @@ public class HiveSyncTool extends AbstractSyncTool {
|
|||||||
String outputFormatClassName = HoodieInputFormatUtils.getOutputFormatClassName(baseFileFormat);
|
String outputFormatClassName = HoodieInputFormatUtils.getOutputFormatClassName(baseFileFormat);
|
||||||
String serDeFormatClassName = HoodieInputFormatUtils.getSerDeClassName(baseFileFormat);
|
String serDeFormatClassName = HoodieInputFormatUtils.getSerDeClassName(baseFileFormat);
|
||||||
|
|
||||||
|
Map<String, String> serdeProperties = ConfigUtils.toMap(cfg.serdeProperties);
|
||||||
|
|
||||||
|
// Currently, serdeProperties is non-empty only when the metadata is synced from Spark.
|
||||||
|
if (!serdeProperties.isEmpty()) {
|
||||||
|
String queryTypeKey = serdeProperties.remove(ConfigUtils.SPARK_QUERY_TYPE_KEY);
|
||||||
|
String queryAsROKey = serdeProperties.remove(ConfigUtils.SPARK_QUERY_AS_RO_KEY);
|
||||||
|
String queryAsRTKey = serdeProperties.remove(ConfigUtils.SPARK_QUERY_AS_RT_KEY);
|
||||||
|
|
||||||
|
if (queryTypeKey != null && queryAsROKey != null && queryAsRTKey != null) {
|
||||||
|
if (readAsOptimized) { // read optimized
|
||||||
|
serdeProperties.put(queryTypeKey, queryAsROKey);
|
||||||
|
} else { // read snapshot
|
||||||
|
serdeProperties.put(queryTypeKey, queryAsRTKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
// Custom serde will not work with ALTER TABLE REPLACE COLUMNS
|
// Custom serde will not work with ALTER TABLE REPLACE COLUMNS
|
||||||
// https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive
|
// https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive
|
||||||
// /ql/exec/DDLTask.java#L3488
|
// /ql/exec/DDLTask.java#L3488
|
||||||
hoodieHiveClient.createTable(tableName, schema, inputFormatClassName,
|
hoodieHiveClient.createTable(tableName, schema, inputFormatClassName,
|
||||||
outputFormatClassName, serDeFormatClassName, ConfigUtils.toMap(cfg.serdeProperties), ConfigUtils.toMap(cfg.tableProperties));
|
outputFormatClassName, serDeFormatClassName, serdeProperties, ConfigUtils.toMap(cfg.tableProperties));
|
||||||
} else {
|
} else {
|
||||||
// Check if the table schema has evolved
|
// Check if the table schema has evolved
|
||||||
Map<String, String> tableSchema = hoodieHiveClient.getTableSchema(tableName);
|
Map<String, String> tableSchema = hoodieHiveClient.getTableSchema(tableName);
|
||||||
|
|||||||
@@ -24,6 +24,12 @@ import org.apache.hudi.common.util.StringUtils;
|
|||||||
|
|
||||||
public class ConfigUtils {
|
public class ConfigUtils {
|
||||||
|
|
||||||
|
public static final String SPARK_QUERY_TYPE_KEY = "spark.query.type.key";
|
||||||
|
|
||||||
|
public static final String SPARK_QUERY_AS_RO_KEY = "spark.query.as.ro.key";
|
||||||
|
|
||||||
|
public static final String SPARK_QUERY_AS_RT_KEY = "spark.query.as.rt.key";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convert the key-value config to a map. The format of the config
|
* Convert the key-value config to a map. The format of the config
|
||||||
* is a list of newline-separated key=value pairs, e.g. "k1=v1\nk2=v2\nk3=v3".
|
* is a list of newline-separated key=value pairs, e.g. "k1=v1\nk2=v2\nk3=v3".
|
||||||
|
|||||||
@@ -264,11 +264,15 @@ public class TestHiveSyncTool {
|
|||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@MethodSource({"useJdbcAndSchemaFromCommitMetadata"})
|
@MethodSource({"useJdbcAndSchemaFromCommitMetadata"})
|
||||||
public void testSyncWithProperties(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
|
public void testSyncCOWTableWithProperties(boolean useJdbc,
|
||||||
|
boolean useSchemaFromCommitMetadata) throws Exception {
|
||||||
HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig;
|
HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig;
|
||||||
Map<String, String> serdeProperties = new HashMap<String, String>() {
|
Map<String, String> serdeProperties = new HashMap<String, String>() {
|
||||||
{
|
{
|
||||||
put("path", hiveSyncConfig.basePath);
|
put("path", hiveSyncConfig.basePath);
|
||||||
|
put(ConfigUtils.SPARK_QUERY_TYPE_KEY, "hoodie.datasource.query.type");
|
||||||
|
put(ConfigUtils.SPARK_QUERY_AS_RO_KEY, "read_optimized");
|
||||||
|
put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, "snapshot");
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -304,10 +308,79 @@ public class TestHiveSyncTool {
|
|||||||
assertTrue(results.get(results.size() - 1).startsWith("transient_lastDdlTime"));
|
assertTrue(results.get(results.size() - 1).startsWith("transient_lastDdlTime"));
|
||||||
|
|
||||||
results.clear();
|
results.clear();
|
||||||
|
// validate serde properties
|
||||||
hiveDriver.run("SHOW CREATE TABLE " + dbTableName);
|
hiveDriver.run("SHOW CREATE TABLE " + dbTableName);
|
||||||
hiveDriver.getResults(results);
|
hiveDriver.getResults(results);
|
||||||
String ddl = String.join("\n", results);
|
String ddl = String.join("\n", results);
|
||||||
assertTrue(ddl.contains("'path'='" + hiveSyncConfig.basePath + "'"));
|
assertTrue(ddl.contains("'path'='" + hiveSyncConfig.basePath + "'"));
|
||||||
|
assertTrue(ddl.contains("'hoodie.datasource.query.type'='snapshot'"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@MethodSource({"useJdbcAndSchemaFromCommitMetadata"})
|
||||||
|
public void testSyncMORTableWithProperties(boolean useJdbc,
|
||||||
|
boolean useSchemaFromCommitMetadata) throws Exception {
|
||||||
|
HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig;
|
||||||
|
Map<String, String> serdeProperties = new HashMap<String, String>() {
|
||||||
|
{
|
||||||
|
put("path", hiveSyncConfig.basePath);
|
||||||
|
put(ConfigUtils.SPARK_QUERY_TYPE_KEY, "hoodie.datasource.query.type");
|
||||||
|
put(ConfigUtils.SPARK_QUERY_AS_RO_KEY, "read_optimized");
|
||||||
|
put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, "snapshot");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Map<String, String> tableProperties = new HashMap<String, String>() {
|
||||||
|
{
|
||||||
|
put("tp_0", "p0");
|
||||||
|
put("tp_1", "p1");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
hiveSyncConfig.useJdbc = useJdbc;
|
||||||
|
hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProperties);
|
||||||
|
hiveSyncConfig.tableProperties = ConfigUtils.configToString(tableProperties);
|
||||||
|
String instantTime = "100";
|
||||||
|
String deltaCommitTime = "101";
|
||||||
|
HiveTestUtil.createMORTable(instantTime, deltaCommitTime, 5, true,
|
||||||
|
useSchemaFromCommitMetadata);
|
||||||
|
|
||||||
|
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||||
|
tool.syncHoodieTable();
|
||||||
|
|
||||||
|
String roTableName = hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
|
||||||
|
String rtTableName = hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
|
||||||
|
|
||||||
|
String[] tableNames = new String[] {roTableName, rtTableName};
|
||||||
|
String[] expectQueryTypes = new String[] {"read_optimized", "snapshot"};
|
||||||
|
|
||||||
|
SessionState.start(HiveTestUtil.getHiveConf());
|
||||||
|
Driver hiveDriver = new org.apache.hadoop.hive.ql.Driver(HiveTestUtil.getHiveConf());
|
||||||
|
|
||||||
|
for (int i = 0;i < 2; i++) {
|
||||||
|
String dbTableName = hiveSyncConfig.databaseName + "." + tableNames[i];
|
||||||
|
String expectQueryType = expectQueryTypes[i];
|
||||||
|
|
||||||
|
hiveDriver.run("SHOW TBLPROPERTIES " + dbTableName);
|
||||||
|
List<String> results = new ArrayList<>();
|
||||||
|
hiveDriver.getResults(results);
|
||||||
|
|
||||||
|
String tblPropertiesWithoutDdlTime = String.join("\n",
|
||||||
|
results.subList(0, results.size() - 1));
|
||||||
|
assertEquals(
|
||||||
|
"EXTERNAL\tTRUE\n"
|
||||||
|
+ "last_commit_time_sync\t101\n"
|
||||||
|
+ "tp_0\tp0\n"
|
||||||
|
+ "tp_1\tp1", tblPropertiesWithoutDdlTime);
|
||||||
|
assertTrue(results.get(results.size() - 1).startsWith("transient_lastDdlTime"));
|
||||||
|
|
||||||
|
results.clear();
|
||||||
|
// validate serde properties
|
||||||
|
hiveDriver.run("SHOW CREATE TABLE " + dbTableName);
|
||||||
|
hiveDriver.getResults(results);
|
||||||
|
String ddl = String.join("\n", results);
|
||||||
|
assertTrue(ddl.contains("'path'='" + hiveSyncConfig.basePath + "'"));
|
||||||
|
assertTrue(ddl.contains("'hoodie.datasource.query.type'='" + expectQueryType + "'"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
|
|||||||
@@ -188,7 +188,8 @@ public class HiveTestUtil {
|
|||||||
DateTime dateTime = DateTime.now();
|
DateTime dateTime = DateTime.now();
|
||||||
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true,
|
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true,
|
||||||
useSchemaFromCommitMetadata, dateTime, commitTime);
|
useSchemaFromCommitMetadata, dateTime, commitTime);
|
||||||
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
|
createdTablesSet
|
||||||
|
.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE);
|
||||||
createdTablesSet
|
createdTablesSet
|
||||||
.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
|
.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
|
||||||
HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
|
HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
|
||||||
|
|||||||
Reference in New Issue
Block a user