1
0

[HUDI-1003] Handle partitions correctly for syncing hudi non-partitioned table to hive (#1720)

This commit is contained in:
Yajun Luo
2020-06-15 19:02:03 +08:00
committed by GitHub
parent ede6c9bda4
commit 043eb564c2
2 changed files with 35 additions and 0 deletions

View File

@@ -38,6 +38,7 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -64,6 +65,11 @@ public class HiveSyncTool {
public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
this.hoodieHiveClient = new HoodieHiveClient(cfg, configuration, fs);
this.cfg = cfg;
// Set partitionFields to empty, when the NonPartitionedExtractor is used
if (NonPartitionedExtractor.class.getName().equals(cfg.partitionValueExtractorClass)) {
LOG.warn("Set partitionFields to empty, since the NonPartitionedExtractor is used");
cfg.partitionFields = new ArrayList<>();
}
switch (hoodieHiveClient.getTableType()) {
case COPY_ON_WRITE:
this.snapshotTableName = cfg.tableName;

View File

@@ -457,6 +457,35 @@ public class TestHiveSyncTool {
"The last commit that was sycned should be updated in the TBLPROPERTIES");
}
@ParameterizedTest
@MethodSource("useJdbc")
public void testNonPartitionedSync(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
String instantTime = "100";
HiveTestUtil.createCOWTable(instantTime, 5, true);
HiveSyncConfig hiveSyncConfig = HiveSyncConfig.copy(HiveTestUtil.hiveSyncConfig);
// Set partition value extractor to NonPartitionedExtractor
hiveSyncConfig.partitionValueExtractorClass = NonPartitionedExtractor.class.getCanonicalName();
hiveSyncConfig.tableName = "non_partitioned";
hiveSyncConfig.partitionFields = Arrays.asList("year", "month", "day");
HiveTestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName),
"Table " + hiveSyncConfig.tableName + " should not exist initially");
// Lets do the sync
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool.syncHoodieTable();
assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName),
"Table " + hiveSyncConfig.tableName + " should exist after sync completes");
assertEquals(hiveClient.getTableSchema(hiveSyncConfig.tableName).size(),
hiveClient.getDataSchema().getColumns().size(),
"Hive Schema should match the table schemaignoring the partition fields");
assertEquals(0, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
"Table should not have partitions because of the NonPartitionedExtractor");
}
@ParameterizedTest
@MethodSource("useJdbc")
public void testReadSchemaForMOR(boolean useJdbc) throws Exception {