1
0

[HUDI-1383] Fixing sorting of partition vals for hive sync computation (#2402)

This commit is contained in:
Sivabalan Narayanan
2021-01-06 07:49:44 -05:00
committed by GitHub
parent 47c5e518a7
commit da2919a75f
3 changed files with 75 additions and 13 deletions

View File

@@ -207,7 +207,6 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
Map<String, String> paths = new HashMap<>(); Map<String, String> paths = new HashMap<>();
for (Partition tablePartition : tablePartitions) { for (Partition tablePartition : tablePartitions) {
List<String> hivePartitionValues = tablePartition.getValues(); List<String> hivePartitionValues = tablePartition.getValues();
Collections.sort(hivePartitionValues);
String fullTablePartitionPath = String fullTablePartitionPath =
Path.getPathWithoutSchemeAndAuthority(new Path(tablePartition.getSd().getLocation())).toUri().getPath(); Path.getPathWithoutSchemeAndAuthority(new Path(tablePartition.getSd().getLocation())).toUri().getPath();
paths.put(String.join(", ", hivePartitionValues), fullTablePartitionPath); paths.put(String.join(", ", hivePartitionValues), fullTablePartitionPath);
@@ -219,7 +218,6 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath(); String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
// Check if the partition values or if hdfs path is the same // Check if the partition values or if hdfs path is the same
List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition); List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
Collections.sort(storagePartitionValues);
if (!storagePartitionValues.isEmpty()) { if (!storagePartitionValues.isEmpty()) {
String storageValue = String.join(", ", storagePartitionValues); String storageValue = String.join(", ", storagePartitionValues);
if (!paths.containsKey(storageValue)) { if (!paths.containsKey(storageValue)) {

View File

@@ -21,10 +21,10 @@ package org.apache.hudi.hive;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
import org.apache.hudi.hive.testutils.HiveTestUtil; import org.apache.hudi.hive.testutils.HiveTestUtil;
import org.apache.hudi.hive.util.HiveSchemaUtil; import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageType;
@@ -56,7 +56,7 @@ public class TestHiveSyncTool {
} }
private static Iterable<Object[]> useJdbcAndSchemaFromCommitMetadata() { private static Iterable<Object[]> useJdbcAndSchemaFromCommitMetadata() {
return Arrays.asList(new Object[][] { { true, true }, { true, false }, { false, true }, { false, false } }); return Arrays.asList(new Object[][] {{true, true}, {true, false}, {false, true}, {false, false}});
} }
@BeforeEach @BeforeEach
@@ -347,7 +347,7 @@ public class TestHiveSyncTool {
assertEquals(hiveClient.getTableSchema(roTableName).size(), assertEquals(hiveClient.getTableSchema(roTableName).size(),
SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size() SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
+ HoodieRecord.HOODIE_META_COLUMNS.size(), + HoodieRecord.HOODIE_META_COLUMNS.size(),
"Hive Schema should match the table schema + partition field"); "Hive Schema should match the table schema + partition field");
} else { } else {
// The data generated and schema in the data file do not have metadata columns, so we need a separate check. // The data generated and schema in the data file do not have metadata columns, so we need a separate check.
assertEquals(hiveClient.getTableSchema(roTableName).size(), assertEquals(hiveClient.getTableSchema(roTableName).size(),
@@ -377,7 +377,7 @@ public class TestHiveSyncTool {
assertEquals(hiveClient.getTableSchema(roTableName).size(), assertEquals(hiveClient.getTableSchema(roTableName).size(),
SchemaTestUtil.getEvolvedSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size() SchemaTestUtil.getEvolvedSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
+ HoodieRecord.HOODIE_META_COLUMNS.size(), + HoodieRecord.HOODIE_META_COLUMNS.size(),
"Hive Schema should match the evolved table schema + partition field"); "Hive Schema should match the evolved table schema + partition field");
} else { } else {
// The data generated and schema in the data file do not have metadata columns, so we need a separate check. // The data generated and schema in the data file do not have metadata columns, so we need a separate check.
assertEquals(hiveClient.getTableSchema(roTableName).size(), assertEquals(hiveClient.getTableSchema(roTableName).size(),
@@ -418,7 +418,7 @@ public class TestHiveSyncTool {
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size() SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
+ HoodieRecord.HOODIE_META_COLUMNS.size(), + HoodieRecord.HOODIE_META_COLUMNS.size(),
"Hive Schema should match the table schema + partition field"); "Hive Schema should match the table schema + partition field");
} else { } else {
// The data generated and schema in the data file do not have metadata columns, so we need a separate check. // The data generated and schema in the data file do not have metadata columns, so we need a separate check.
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
@@ -489,6 +489,50 @@ public class TestHiveSyncTool {
"Table partitions should match the number of partitions we wrote"); "Table partitions should match the number of partitions we wrote");
assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(), assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
"The last commit that was sycned should be updated in the TBLPROPERTIES"); "The last commit that was sycned should be updated in the TBLPROPERTIES");
// HoodieHiveClient had a bug where partition vals were sorted
// and stored as keys in a map. The following tests this particular case.
// Now let's create partition "2010/01/02" followed by "2010/02/01".
String commitTime2 = "101";
HiveTestUtil.addCOWPartition("2010/01/02", true, true, commitTime2);
hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(instantTime));
assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit");
List<Partition> hivePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName);
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
assertEquals(1, partitionEvents.size(), "There should be only one paritition event");
assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD");
tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool.syncHoodieTable();
// Sync should add the one partition
assertEquals(6, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
"Table partitions should match the number of partitions we wrote");
assertEquals(commitTime2, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
"The last commit that was sycned should be 101");
// create partition "2010/02/01" and ensure sync works
String commitTime3 = "102";
HiveTestUtil.addCOWPartition("2010/02/01", true, true, commitTime3);
HiveTestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool.syncHoodieTable();
assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName),
"Table " + hiveSyncConfig.tableName + " should exist after sync completes");
assertEquals(hiveClient.getTableSchema(hiveSyncConfig.tableName).size(),
hiveClient.getDataSchema().getColumns().size() + 3,
"Hive Schema should match the table schema + partition fields");
assertEquals(7, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
"Table partitions should match the number of partitions we wrote");
assertEquals(commitTime3, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
"The last commit that was sycned should be updated in the TBLPROPERTIES");
assertEquals(1, hiveClient.getPartitionsWrittenToSince(Option.of(commitTime2)).size());
} }
@ParameterizedTest @ParameterizedTest
@@ -507,17 +551,17 @@ public class TestHiveSyncTool {
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName), assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName),
"Table " + hiveSyncConfig.tableName + " should not exist initially"); "Table " + hiveSyncConfig.tableName + " should not exist initially");
// Lets do the sync // Lets do the sync
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool.syncHoodieTable(); tool.syncHoodieTable();
assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName), assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName),
"Table " + hiveSyncConfig.tableName + " should exist after sync completes"); "Table " + hiveSyncConfig.tableName + " should exist after sync completes");
assertEquals(hiveClient.getTableSchema(hiveSyncConfig.tableName).size(), assertEquals(hiveClient.getTableSchema(hiveSyncConfig.tableName).size(),
hiveClient.getDataSchema().getColumns().size(), hiveClient.getDataSchema().getColumns().size(),
"Hive Schema should match the table schemaignoring the partition fields"); "Hive Schema should match the table schemaignoring the partition fields");
assertEquals(0, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(), assertEquals(0, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
"Table should not have partitions because of the NonPartitionedExtractor"); "Table should not have partitions because of the NonPartitionedExtractor");
} }
@ParameterizedTest @ParameterizedTest

View File

@@ -210,6 +210,14 @@ public class HiveTestUtil {
createCommitFile(commitMetadata, instantTime); createCommitFile(commitMetadata, instantTime);
} }
/**
 * Adds a single copy-on-write partition to the test table and records a commit for it.
 *
 * @param partitionPath relative partition path under the table base path (e.g. "2010/01/02")
 * @param isParquetSchemaSimple whether the generated parquet data uses the simple test schema
 * @param useSchemaFromCommitMetadata whether the schema is embedded in the commit metadata
 * @param instantTime commit instant time to record for the new partition
 * @throws IOException if writing the partition data or commit file fails
 * @throws URISyntaxException if building the test data paths fails
 */
public static void addCOWPartition(String partitionPath, boolean isParquetSchemaSimple,
    boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException {
  // Materialize the partition on storage and capture its commit metadata.
  HoodieCommitMetadata metadata =
      createPartition(partitionPath, isParquetSchemaSimple, useSchemaFromCommitMetadata, instantTime);
  // Track the table so test teardown can clean it up.
  createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
  // Persist the commit so sync tooling sees the new partition.
  createCommitFile(metadata, instantTime);
}
public static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple, public static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple,
boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime, String deltaCommitTime) boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime, String deltaCommitTime)
throws IOException, URISyntaxException, InterruptedException { throws IOException, URISyntaxException, InterruptedException {
@@ -266,6 +274,18 @@ public class HiveTestUtil {
return commitMetadata; return commitMetadata;
} }
/**
 * Creates a single partition directory with test data files and returns the
 * commit metadata describing the written files.
 *
 * @param partitionPath relative partition path under the table base path
 * @param isParquetSchemaSimple whether the generated parquet data uses the simple test schema
 * @param useSchemaFromCommitMetadata whether to embed the schema into the commit metadata
 * @param instantTime commit instant time stamped onto the generated data files
 * @return commit metadata containing one write stat per generated file
 * @throws IOException if creating the directory or writing data fails
 * @throws URISyntaxException if building the test data paths fails
 */
private static HoodieCommitMetadata createPartition(String partitionPath, boolean isParquetSchemaSimple,
    boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException {
  HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
  Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath);
  // NOTE(review): the original called fileSystem.makeQualified(partPath) and discarded
  // the result; makeQualified is side-effect free, so that call was a no-op and has
  // been removed. If a qualified path is actually intended, assign the return value
  // instead (partPath = fileSystem.makeQualified(partPath)) — confirm with callers.
  fileSystem.mkdirs(partPath);
  List<HoodieWriteStat> writeStats = createTestData(partPath, isParquetSchemaSimple, instantTime);
  // One write stat per generated file, all attributed to this partition.
  writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s));
  addSchemaToCommitMetadata(commitMetadata, isParquetSchemaSimple, useSchemaFromCommitMetadata);
  return commitMetadata;
}
private static List<HoodieWriteStat> createTestData(Path partPath, boolean isParquetSchemaSimple, String instantTime) private static List<HoodieWriteStat> createTestData(Path partPath, boolean isParquetSchemaSimple, String instantTime)
throws IOException, URISyntaxException { throws IOException, URISyntaxException {
List<HoodieWriteStat> writeStats = new ArrayList<>(); List<HoodieWriteStat> writeStats = new ArrayList<>();