1
0

[HUDI-838] Support schema from HoodieCommitMetadata for HiveSync (#1559)

Co-authored-by: Udit Mehrotra <uditme@amazon.com>
This commit is contained in:
Udit Mehrotra
2020-05-07 16:33:09 -07:00
committed by GitHub
parent e783ab1749
commit d54b4b8a52
6 changed files with 199 additions and 57 deletions

View File

@@ -535,7 +535,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
try { try {
TableSchemaResolver schemaUtil = new TableSchemaResolver(getMetaClient()); TableSchemaResolver schemaUtil = new TableSchemaResolver(getMetaClient());
writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema()); writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableSchemaFromCommitMetadata()); tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields());
isValid = TableSchemaResolver.isSchemaCompatible(tableSchema, writerSchema); isValid = TableSchemaResolver.isSchemaCompatible(tableSchema, writerSchema);
} catch (Exception e) { } catch (Exception e) {
throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e); throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e);

View File

@@ -171,6 +171,16 @@ public class HoodieAvroUtils {
return mergedSchema; return mergedSchema;
} }
public static Schema removeMetadataFields(Schema schema) {
List<Schema.Field> filteredFields = schema.getFields()
.stream()
.filter(field -> !HoodieRecord.HOODIE_META_COLUMNS.contains(field.name()))
.collect(Collectors.toList());
Schema filteredSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false);
filteredSchema.setFields(filteredFields);
return filteredSchema;
}
public static String addMetadataColumnTypes(String hiveColumnTypes) { public static String addMetadataColumnTypes(String hiveColumnTypes) {
return "string,string,string,string,string," + hiveColumnTypes; return "string,string,string,string,string," + hiveColumnTypes;
} }

View File

@@ -25,6 +25,7 @@ import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaCompatibility; import org.apache.avro.SchemaCompatibility;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieLogFile;
@@ -36,6 +37,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.InvalidTableException; import org.apache.hudi.exception.InvalidTableException;
@@ -66,7 +68,7 @@ public class TableSchemaResolver {
* @return Parquet schema for this table * @return Parquet schema for this table
* @throws Exception * @throws Exception
*/ */
public MessageType getDataSchema() throws Exception { private MessageType getTableParquetSchemaFromDataFile() throws Exception {
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
try { try {
@@ -139,29 +141,66 @@ public class TableSchemaResolver {
} }
} }
private Schema getTableAvroSchemaFromDataFile() throws Exception {
return convertParquetSchemaToAvro(getTableParquetSchemaFromDataFile());
}
/** /**
* Gets the schema for a hoodie table in Avro format. * Gets full schema (user + metadata) for a hoodie table in Avro format.
* *
* @return Avro schema for this table * @return Avro schema for this table
* @throws Exception * @throws Exception
*/ */
public Schema getTableSchema() throws Exception { public Schema getTableAvroSchema() throws Exception {
return convertParquetSchemaToAvro(getDataSchema()); Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(true);
return schemaFromCommitMetadata.isPresent() ? schemaFromCommitMetadata.get() : getTableAvroSchemaFromDataFile();
}
/**
* Gets full schema (user + metadata) for a hoodie table in Parquet format.
*
* @return Parquet schema for the table
* @throws Exception
*/
public MessageType getTableParquetSchema() throws Exception {
Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(true);
return schemaFromCommitMetadata.isPresent() ? convertAvroSchemaToParquet(schemaFromCommitMetadata.get()) :
getTableParquetSchemaFromDataFile();
}
/**
* Gets the user's data schema for a hoodie table in Avro format.
*
* @return Avro user data schema
* @throws Exception
*/
public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception {
Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(false);
return schemaFromCommitMetadata.isPresent() ? schemaFromCommitMetadata.get() :
HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
} }
/** /**
* Gets the schema for a hoodie table in Avro format from the HoodieCommitMetadata of the last commit. * Gets the schema for a hoodie table in Avro format from the HoodieCommitMetadata of the last commit.
* *
* @return Avro schema for this table * @return Avro schema for this table
* @throws Exception
*/ */
public Schema getTableSchemaFromCommitMetadata() throws Exception { private Option<Schema> getTableSchemaFromCommitMetadata(boolean includeMetadataFields) {
try { try {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
byte[] data = timeline.getInstantDetails(timeline.lastInstant().get()).get(); byte[] data = timeline.getInstantDetails(timeline.lastInstant().get()).get();
HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class); HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY); String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
return new Schema.Parser().parse(existingSchemaStr);
if (StringUtils.isNullOrEmpty(existingSchemaStr)) {
return Option.empty();
}
Schema schema = new Schema.Parser().parse(existingSchemaStr);
if (includeMetadataFields) {
schema = HoodieAvroUtils.addMetadataFields(schema);
}
return Option.of(schema);
} catch (Exception e) { } catch (Exception e) {
throw new HoodieException("Failed to read schema from commit metadata", e); throw new HoodieException("Failed to read schema from commit metadata", e);
} }
@@ -178,6 +217,17 @@ public class TableSchemaResolver {
return avroSchemaConverter.convert(parquetSchema); return avroSchemaConverter.convert(parquetSchema);
} }
/**
* Convert an Avro schema to the Parquet format.
*
* @param schema The avro schema to convert
* @return The converted parquet schema
*/
public MessageType convertAvroSchemaToParquet(Schema schema) {
AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getHadoopConf());
return avroSchemaConverter.convert(schema);
}
/** /**
* HUDI specific validation of schema evolution. Ensures that a newer schema can be used for the dataset by * HUDI specific validation of schema evolution. Ensures that a newer schema can be used for the dataset by
* checking if the data written using the old schema can be read using the new schema. * checking if the data written using the old schema can be read using the new schema.

View File

@@ -328,14 +328,15 @@ public class HoodieHiveClient {
} }
/** /**
* Gets the schema for a hoodie table. Depending on the type of table, read from any file written in the latest * Gets the schema for a hoodie table. Depending on the type of table, try to read schema from commit metadata if
* commit. We will assume that the schema has not changed within a single atomic write. * present, else fallback to reading from any file written in the latest commit. We will assume that the schema has
* not changed within a single atomic write.
* *
* @return Parquet schema for this table * @return Parquet schema for this table
*/ */
public MessageType getDataSchema() { public MessageType getDataSchema() {
try { try {
return new TableSchemaResolver(metaClient).getDataSchema(); return new TableSchemaResolver(metaClient).getTableParquetSchema();
} catch (Exception e) { } catch (Exception e) {
throw new HoodieHiveSyncException("Failed to read data schema", e); throw new HoodieHiveSyncException("Failed to read data schema", e);
} }

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.hive; package org.apache.hudi.hive;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SchemaTestUtil; import org.apache.hudi.common.util.SchemaTestUtil;
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent; import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent;
@@ -52,6 +53,10 @@ public class TestHiveSyncTool {
return Stream.of(false, true); return Stream.of(false, true);
} }
private static Iterable<Object[]> useJdbcAndSchemaFromCommitMetadata() {
return Arrays.asList(new Object[][] { { true, true }, { true, false }, { false, true }, { false, false } });
}
@BeforeEach @BeforeEach
public void setUp() throws IOException, InterruptedException { public void setUp() throws IOException, InterruptedException {
TestUtil.setUp(); TestUtil.setUp();
@@ -146,11 +151,11 @@ public class TestHiveSyncTool {
} }
@ParameterizedTest @ParameterizedTest
@MethodSource("useJdbc") @MethodSource({"useJdbcAndSchemaFromCommitMetadata"})
public void testBasicSync(boolean useJdbc) throws Exception { public void testBasicSync(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
TestUtil.hiveSyncConfig.useJdbc = useJdbc; TestUtil.hiveSyncConfig.useJdbc = useJdbc;
String instantTime = "100"; String instantTime = "100";
TestUtil.createCOWTable(instantTime, 5); TestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata);
HoodieHiveClient hiveClient = HoodieHiveClient hiveClient =
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
assertFalse(hiveClient.doesTableExist(TestUtil.hiveSyncConfig.tableName), assertFalse(hiveClient.doesTableExist(TestUtil.hiveSyncConfig.tableName),
@@ -214,7 +219,7 @@ public class TestHiveSyncTool {
public void testSyncIncremental(boolean useJdbc) throws Exception { public void testSyncIncremental(boolean useJdbc) throws Exception {
TestUtil.hiveSyncConfig.useJdbc = useJdbc; TestUtil.hiveSyncConfig.useJdbc = useJdbc;
String commitTime1 = "100"; String commitTime1 = "100";
TestUtil.createCOWTable(commitTime1, 5); TestUtil.createCOWTable(commitTime1, 5, true);
HoodieHiveClient hiveClient = HoodieHiveClient hiveClient =
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
// Lets do the sync // Lets do the sync
@@ -228,7 +233,7 @@ public class TestHiveSyncTool {
// Now let's create more partitions and these are the only ones which need to be synced // Now let's create more partitions and these are the only ones which need to be synced
DateTime dateTime = DateTime.now().plusDays(6); DateTime dateTime = DateTime.now().plusDays(6);
String commitTime2 = "101"; String commitTime2 = "101";
TestUtil.addCOWPartitions(1, true, dateTime, commitTime2); TestUtil.addCOWPartitions(1, true, true, dateTime, commitTime2);
// Lets do the sync // Lets do the sync
hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -253,7 +258,7 @@ public class TestHiveSyncTool {
public void testSyncIncrementalWithSchemaEvolution(boolean useJdbc) throws Exception { public void testSyncIncrementalWithSchemaEvolution(boolean useJdbc) throws Exception {
TestUtil.hiveSyncConfig.useJdbc = useJdbc; TestUtil.hiveSyncConfig.useJdbc = useJdbc;
String commitTime1 = "100"; String commitTime1 = "100";
TestUtil.createCOWTable(commitTime1, 5); TestUtil.createCOWTable(commitTime1, 5, true);
HoodieHiveClient hiveClient = HoodieHiveClient hiveClient =
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
// Lets do the sync // Lets do the sync
@@ -265,7 +270,7 @@ public class TestHiveSyncTool {
// Now let's create more partitions and these are the only ones which need to be synced // Now let's create more partitions and these are the only ones which need to be synced
DateTime dateTime = DateTime.now().plusDays(6); DateTime dateTime = DateTime.now().plusDays(6);
String commitTime2 = "101"; String commitTime2 = "101";
TestUtil.addCOWPartitions(1, false, dateTime, commitTime2); TestUtil.addCOWPartitions(1, false, true, dateTime, commitTime2);
// Lets do the sync // Lets do the sync
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -286,12 +291,13 @@ public class TestHiveSyncTool {
} }
@ParameterizedTest @ParameterizedTest
@MethodSource("useJdbc") @MethodSource("useJdbcAndSchemaFromCommitMetadata")
public void testSyncMergeOnRead(boolean useJdbc) throws Exception { public void testSyncMergeOnRead(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
TestUtil.hiveSyncConfig.useJdbc = useJdbc; TestUtil.hiveSyncConfig.useJdbc = useJdbc;
String instantTime = "100"; String instantTime = "100";
String deltaCommitTime = "101"; String deltaCommitTime = "101";
TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true); TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true,
useSchemaFromCommitMetadata);
String roTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE; String roTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -301,8 +307,19 @@ public class TestHiveSyncTool {
tool.syncHoodieTable(); tool.syncHoodieTable();
assertTrue(hiveClient.doesTableExist(roTableName), "Table " + roTableName + " should exist after sync completes"); assertTrue(hiveClient.doesTableExist(roTableName), "Table " + roTableName + " should exist after sync completes");
assertEquals(hiveClient.getTableSchema(roTableName).size(), SchemaTestUtil.getSimpleSchema().getFields().size() + 1,
if (useSchemaFromCommitMetadata) {
assertEquals(hiveClient.getTableSchema(roTableName).size(),
SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
"Hive Schema should match the table schema + partition field"); "Hive Schema should match the table schema + partition field");
} else {
// The data generated and schema in the data file do not have metadata columns, so we need a separate check.
assertEquals(hiveClient.getTableSchema(roTableName).size(),
SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size(),
"Hive Schema should match the table schema + partition field");
}
assertEquals(5, hiveClient.scanTablePartitions(roTableName).size(), assertEquals(5, hiveClient.scanTablePartitions(roTableName).size(),
"Table partitions should match the number of partitions we wrote"); "Table partitions should match the number of partitions we wrote");
assertEquals(deltaCommitTime, hiveClient.getLastCommitTimeSynced(roTableName).get(), assertEquals(deltaCommitTime, hiveClient.getLastCommitTimeSynced(roTableName).get(),
@@ -313,15 +330,25 @@ public class TestHiveSyncTool {
String commitTime2 = "102"; String commitTime2 = "102";
String deltaCommitTime2 = "103"; String deltaCommitTime2 = "103";
TestUtil.addCOWPartitions(1, true, dateTime, commitTime2); TestUtil.addCOWPartitions(1, true, useSchemaFromCommitMetadata, dateTime, commitTime2);
TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, deltaCommitTime2); TestUtil.addMORPartitions(1, true, false,
useSchemaFromCommitMetadata, dateTime, commitTime2, deltaCommitTime2);
// Lets do the sync // Lets do the sync
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable(); tool.syncHoodieTable();
hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
assertEquals(hiveClient.getTableSchema(roTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1, if (useSchemaFromCommitMetadata) {
assertEquals(hiveClient.getTableSchema(roTableName).size(),
SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
"Hive Schema should match the evolved table schema + partition field"); "Hive Schema should match the evolved table schema + partition field");
} else {
// The data generated and schema in the data file do not have metadata columns, so we need a separate check.
assertEquals(hiveClient.getTableSchema(roTableName).size(),
SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size(),
"Hive Schema should match the evolved table schema + partition field");
}
// Sync should add the one partition // Sync should add the one partition
assertEquals(6, hiveClient.scanTablePartitions(roTableName).size(), assertEquals(6, hiveClient.scanTablePartitions(roTableName).size(),
"The 2 partitions we wrote should be added to hive"); "The 2 partitions we wrote should be added to hive");
@@ -330,13 +357,13 @@ public class TestHiveSyncTool {
} }
@ParameterizedTest @ParameterizedTest
@MethodSource("useJdbc") @MethodSource("useJdbcAndSchemaFromCommitMetadata")
public void testSyncMergeOnReadRT(boolean useJdbc) throws Exception { public void testSyncMergeOnReadRT(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
TestUtil.hiveSyncConfig.useJdbc = useJdbc; TestUtil.hiveSyncConfig.useJdbc = useJdbc;
String instantTime = "100"; String instantTime = "100";
String deltaCommitTime = "101"; String deltaCommitTime = "101";
String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE; String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true); TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true, useSchemaFromCommitMetadata);
HoodieHiveClient hiveClientRT = HoodieHiveClient hiveClientRT =
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -352,8 +379,18 @@ public class TestHiveSyncTool {
"Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE "Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
+ " should exist after sync completes"); + " should exist after sync completes");
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getSimpleSchema().getFields().size() + 1, if (useSchemaFromCommitMetadata) {
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
"Hive Schema should match the table schema + partition field"); "Hive Schema should match the table schema + partition field");
} else {
// The data generated and schema in the data file do not have metadata columns, so we need a separate check.
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size(),
"Hive Schema should match the table schema + partition field");
}
assertEquals(5, hiveClientRT.scanTablePartitions(snapshotTableName).size(), assertEquals(5, hiveClientRT.scanTablePartitions(snapshotTableName).size(),
"Table partitions should match the number of partitions we wrote"); "Table partitions should match the number of partitions we wrote");
assertEquals(deltaCommitTime, hiveClientRT.getLastCommitTimeSynced(snapshotTableName).get(), assertEquals(deltaCommitTime, hiveClientRT.getLastCommitTimeSynced(snapshotTableName).get(),
@@ -364,15 +401,24 @@ public class TestHiveSyncTool {
String commitTime2 = "102"; String commitTime2 = "102";
String deltaCommitTime2 = "103"; String deltaCommitTime2 = "103";
TestUtil.addCOWPartitions(1, true, dateTime, commitTime2); TestUtil.addCOWPartitions(1, true, useSchemaFromCommitMetadata, dateTime, commitTime2);
TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, deltaCommitTime2); TestUtil.addMORPartitions(1, true, false, useSchemaFromCommitMetadata, dateTime, commitTime2, deltaCommitTime2);
// Lets do the sync // Lets do the sync
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable(); tool.syncHoodieTable();
hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1, if (useSchemaFromCommitMetadata) {
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
"Hive Schema should match the evolved table schema + partition field"); "Hive Schema should match the evolved table schema + partition field");
} else {
// The data generated and schema in the data file do not have metadata columns, so we need a separate check.
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size(),
"Hive Schema should match the evolved table schema + partition field");
}
// Sync should add the one partition // Sync should add the one partition
assertEquals(6, hiveClientRT.scanTablePartitions(snapshotTableName).size(), assertEquals(6, hiveClientRT.scanTablePartitions(snapshotTableName).size(),
"The 2 partitions we wrote should be added to hive"); "The 2 partitions we wrote should be added to hive");
@@ -385,7 +431,7 @@ public class TestHiveSyncTool {
public void testMultiPartitionKeySync(boolean useJdbc) throws Exception { public void testMultiPartitionKeySync(boolean useJdbc) throws Exception {
TestUtil.hiveSyncConfig.useJdbc = useJdbc; TestUtil.hiveSyncConfig.useJdbc = useJdbc;
String instantTime = "100"; String instantTime = "100";
TestUtil.createCOWTable(instantTime, 5); TestUtil.createCOWTable(instantTime, 5, true);
HiveSyncConfig hiveSyncConfig = HiveSyncConfig.copy(TestUtil.hiveSyncConfig); HiveSyncConfig hiveSyncConfig = HiveSyncConfig.copy(TestUtil.hiveSyncConfig);
hiveSyncConfig.partitionValueExtractorClass = MultiPartKeysValueExtractor.class.getCanonicalName(); hiveSyncConfig.partitionValueExtractorClass = MultiPartKeysValueExtractor.class.getCanonicalName();
@@ -416,7 +462,7 @@ public class TestHiveSyncTool {
TestUtil.hiveSyncConfig.useJdbc = useJdbc; TestUtil.hiveSyncConfig.useJdbc = useJdbc;
String commitTime = "100"; String commitTime = "100";
String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE; String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
TestUtil.createMORTable(commitTime, "", 5, false); TestUtil.createMORTable(commitTime, "", 5, false, true);
HoodieHiveClient hiveClientRT = HoodieHiveClient hiveClientRT =
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -431,7 +477,9 @@ public class TestHiveSyncTool {
+ " should exist after sync completes"); + " should exist after sync completes");
// Schema being read from compacted base files // Schema being read from compacted base files
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getSimpleSchema().getFields().size() + 1, assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
"Hive Schema should match the table schema + partition field"); "Hive Schema should match the table schema + partition field");
assertEquals(5, hiveClientRT.scanTablePartitions(snapshotTableName).size(), "Table partitions should match the number of partitions we wrote"); assertEquals(5, hiveClientRT.scanTablePartitions(snapshotTableName).size(), "Table partitions should match the number of partitions we wrote");
@@ -440,14 +488,16 @@ public class TestHiveSyncTool {
String commitTime2 = "102"; String commitTime2 = "102";
String deltaCommitTime2 = "103"; String deltaCommitTime2 = "103";
TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, deltaCommitTime2); TestUtil.addMORPartitions(1, true, false, true, dateTime, commitTime2, deltaCommitTime2);
// Lets do the sync // Lets do the sync
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable(); tool.syncHoodieTable();
hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
// Schema being read from the log files // Schema being read from the log files
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1, assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
"Hive Schema should match the evolved table schema + partition field"); "Hive Schema should match the evolved table schema + partition field");
// Sync should add the one partition // Sync should add the one partition
assertEquals(6, hiveClientRT.scanTablePartitions(snapshotTableName).size(), "The 1 partition we wrote should be added to hive"); assertEquals(6, hiveClientRT.scanTablePartitions(snapshotTableName).size(), "The 1 partition we wrote should be added to hive");

View File

@@ -155,7 +155,7 @@ public class TestUtil {
} }
} }
static void createCOWTable(String instantTime, int numberOfPartitions) static void createCOWTable(String instantTime, int numberOfPartitions, boolean useSchemaFromCommitMetadata)
throws IOException, URISyntaxException { throws IOException, URISyntaxException {
Path path = new Path(hiveSyncConfig.basePath); Path path = new Path(hiveSyncConfig.basePath);
FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath)); FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
@@ -164,13 +164,15 @@ public class TestUtil {
boolean result = fileSystem.mkdirs(path); boolean result = fileSystem.mkdirs(path);
checkResult(result); checkResult(result);
DateTime dateTime = DateTime.now(); DateTime dateTime = DateTime.now();
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, instantTime); HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true,
useSchemaFromCommitMetadata, dateTime, instantTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createCommitFile(commitMetadata, instantTime); createCommitFile(commitMetadata, instantTime);
} }
static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions, static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions,
boolean createDeltaCommit) throws IOException, URISyntaxException, InterruptedException { boolean createDeltaCommit, boolean useSchemaFromCommitMetadata)
throws IOException, URISyntaxException, InterruptedException {
Path path = new Path(hiveSyncConfig.basePath); Path path = new Path(hiveSyncConfig.basePath);
FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath)); FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
HoodieTableMetaClient.initTableType(configuration, hiveSyncConfig.basePath, HoodieTableType.MERGE_ON_READ, HoodieTableMetaClient.initTableType(configuration, hiveSyncConfig.basePath, HoodieTableType.MERGE_ON_READ,
@@ -179,46 +181,54 @@ public class TestUtil {
boolean result = fileSystem.mkdirs(path); boolean result = fileSystem.mkdirs(path);
checkResult(result); checkResult(result);
DateTime dateTime = DateTime.now(); DateTime dateTime = DateTime.now();
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, commitTime); HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true,
useSchemaFromCommitMetadata, dateTime, commitTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createdTablesSet createdTablesSet
.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE); .add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata(); HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
commitMetadata.getPartitionToWriteStats() commitMetadata.getPartitionToWriteStats()
.forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l))); .forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l)));
addSchemaToCommitMetadata(compactionMetadata, commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY),
useSchemaFromCommitMetadata);
createCompactionCommitFile(compactionMetadata, commitTime); createCompactionCommitFile(compactionMetadata, commitTime);
if (createDeltaCommit) { if (createDeltaCommit) {
// Write a delta commit // Write a delta commit
HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), true); HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), true,
useSchemaFromCommitMetadata);
createDeltaCommitFile(deltaMetadata, deltaCommitTime); createDeltaCommitFile(deltaMetadata, deltaCommitTime);
} }
} }
static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, DateTime startFrom, static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple,
String instantTime) throws IOException, URISyntaxException { boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime) throws IOException, URISyntaxException {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata commitMetadata =
createPartitions(numberOfPartitions, isParquetSchemaSimple, startFrom, instantTime); createPartitions(numberOfPartitions, isParquetSchemaSimple, useSchemaFromCommitMetadata, startFrom, instantTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createCommitFile(commitMetadata, instantTime); createCommitFile(commitMetadata, instantTime);
} }
static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple, static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple,
DateTime startFrom, String instantTime, String deltaCommitTime) boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime, String deltaCommitTime)
throws IOException, URISyntaxException, InterruptedException { throws IOException, URISyntaxException, InterruptedException {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, isParquetSchemaSimple,
createPartitions(numberOfPartitions, isParquetSchemaSimple, startFrom, instantTime); useSchemaFromCommitMetadata, startFrom, instantTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE); createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE); createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata(); HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
commitMetadata.getPartitionToWriteStats() commitMetadata.getPartitionToWriteStats()
.forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l))); .forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l)));
addSchemaToCommitMetadata(compactionMetadata, commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY),
useSchemaFromCommitMetadata);
createCompactionCommitFile(compactionMetadata, instantTime); createCompactionCommitFile(compactionMetadata, instantTime);
HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), isLogSchemaSimple); HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), isLogSchemaSimple,
useSchemaFromCommitMetadata);
createDeltaCommitFile(deltaMetadata, deltaCommitTime); createDeltaCommitFile(deltaMetadata, deltaCommitTime);
} }
private static HoodieCommitMetadata createLogFiles(Map<String, List<HoodieWriteStat>> partitionWriteStats, private static HoodieCommitMetadata createLogFiles(Map<String, List<HoodieWriteStat>> partitionWriteStats,
boolean isLogSchemaSimple) throws InterruptedException, IOException, URISyntaxException { boolean isLogSchemaSimple, boolean useSchemaFromCommitMetadata)
throws InterruptedException, IOException, URISyntaxException {
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
for (Entry<String, List<HoodieWriteStat>> wEntry : partitionWriteStats.entrySet()) { for (Entry<String, List<HoodieWriteStat>> wEntry : partitionWriteStats.entrySet()) {
String partitionPath = wEntry.getKey(); String partitionPath = wEntry.getKey();
@@ -232,11 +242,12 @@ public class TestUtil {
commitMetadata.addWriteStat(partitionPath, writeStat); commitMetadata.addWriteStat(partitionPath, writeStat);
} }
} }
addSchemaToCommitMetadata(commitMetadata, isLogSchemaSimple, useSchemaFromCommitMetadata);
return commitMetadata; return commitMetadata;
} }
private static HoodieCommitMetadata createPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, private static HoodieCommitMetadata createPartitions(int numberOfPartitions, boolean isParquetSchemaSimple,
DateTime startFrom, String instantTime) throws IOException, URISyntaxException { boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime) throws IOException, URISyntaxException {
startFrom = startFrom.withTimeAtStartOfDay(); startFrom = startFrom.withTimeAtStartOfDay();
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
@@ -249,6 +260,7 @@ public class TestUtil {
startFrom = startFrom.minusDays(1); startFrom = startFrom.minusDays(1);
writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s)); writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s));
} }
addSchemaToCommitMetadata(commitMetadata, isParquetSchemaSimple, useSchemaFromCommitMetadata);
return commitMetadata; return commitMetadata;
} }
@@ -271,7 +283,7 @@ public class TestUtil {
@SuppressWarnings({"unchecked", "deprecation"}) @SuppressWarnings({"unchecked", "deprecation"})
private static void generateParquetData(Path filePath, boolean isParquetSchemaSimple) private static void generateParquetData(Path filePath, boolean isParquetSchemaSimple)
throws IOException, URISyntaxException { throws IOException, URISyntaxException {
Schema schema = (isParquetSchemaSimple ? SchemaTestUtil.getSimpleSchema() : SchemaTestUtil.getEvolvedSchema()); Schema schema = getTestDataSchema(isParquetSchemaSimple);
org.apache.parquet.schema.MessageType parquetSchema = new AvroSchemaConverter().convert(schema); org.apache.parquet.schema.MessageType parquetSchema = new AvroSchemaConverter().convert(schema);
BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, -1, BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, -1,
BloomFilterTypeCode.SIMPLE.name()); BloomFilterTypeCode.SIMPLE.name());
@@ -294,7 +306,7 @@ public class TestUtil {
private static HoodieLogFile generateLogData(Path parquetFilePath, boolean isLogSchemaSimple) private static HoodieLogFile generateLogData(Path parquetFilePath, boolean isLogSchemaSimple)
throws IOException, InterruptedException, URISyntaxException { throws IOException, InterruptedException, URISyntaxException {
Schema schema = (isLogSchemaSimple ? SchemaTestUtil.getSimpleSchema() : SchemaTestUtil.getEvolvedSchema()); Schema schema = getTestDataSchema(isLogSchemaSimple);
HoodieBaseFile dataFile = new HoodieBaseFile(fileSystem.getFileStatus(parquetFilePath)); HoodieBaseFile dataFile = new HoodieBaseFile(fileSystem.getFileStatus(parquetFilePath));
// Write a log file for this parquet file // Write a log file for this parquet file
Writer logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(parquetFilePath.getParent()) Writer logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(parquetFilePath.getParent())
@@ -311,6 +323,25 @@ public class TestUtil {
return logWriter.getLogFile(); return logWriter.getLogFile();
} }
private static Schema getTestDataSchema(boolean isSimpleSchema) throws IOException {
return isSimpleSchema ? SchemaTestUtil.getSimpleSchema() : SchemaTestUtil.getEvolvedSchema();
}
private static void addSchemaToCommitMetadata(HoodieCommitMetadata commitMetadata, boolean isSimpleSchema,
boolean useSchemaFromCommitMetadata) throws IOException {
if (useSchemaFromCommitMetadata) {
Schema dataSchema = getTestDataSchema(isSimpleSchema);
commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, dataSchema.toString());
}
}
private static void addSchemaToCommitMetadata(HoodieCommitMetadata commitMetadata, String schema,
boolean useSchemaFromCommitMetadata) {
if (useSchemaFromCommitMetadata) {
commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schema);
}
}
private static void checkResult(boolean result) { private static void checkResult(boolean result) {
if (!result) { if (!result) {
throw new JUnitException("Could not initialize"); throw new JUnitException("Could not initialize");