[HUDI-2081] Move schema util tests out from TestHiveSyncTool (#3166)
This commit is contained in:
@@ -26,7 +26,6 @@ import org.apache.hudi.common.testutils.SchemaTestUtil;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.hive.testutils.HiveTestUtil;
|
||||
import org.apache.hudi.hive.util.ConfigUtils;
|
||||
import org.apache.hudi.hive.util.HiveSchemaUtil;
|
||||
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
|
||||
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
|
||||
|
||||
@@ -36,10 +35,6 @@ import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hive.metastore.api.Partition;
|
||||
import org.apache.hadoop.hive.ql.Driver;
|
||||
import org.apache.hadoop.hive.ql.session.SessionState;
|
||||
import org.apache.parquet.schema.MessageType;
|
||||
import org.apache.parquet.schema.OriginalType;
|
||||
import org.apache.parquet.schema.PrimitiveType;
|
||||
import org.apache.parquet.schema.Types;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
@@ -52,7 +47,6 @@ import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -87,117 +81,6 @@ public class TestHiveSyncTool {
|
||||
HiveTestUtil.shutdown();
|
||||
}
|
||||
|
||||
/**
|
||||
* Testing converting array types to Hive field declaration strings.
|
||||
* <p>
|
||||
* Refer to the Parquet-113 spec: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
|
||||
*/
|
||||
@Test
|
||||
public void testSchemaConvertArray() throws IOException {
|
||||
// Testing the 3-level annotation structure
|
||||
MessageType schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
|
||||
.optional(PrimitiveType.PrimitiveTypeName.INT32).named("element").named("list").named("int_list")
|
||||
.named("ArrayOfInts");
|
||||
|
||||
String schemaString = HiveSchemaUtil.generateSchemaString(schema);
|
||||
assertEquals("`int_list` ARRAY< int>", schemaString);
|
||||
|
||||
// A array of arrays
|
||||
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup().requiredGroup()
|
||||
.as(OriginalType.LIST).repeatedGroup().required(PrimitiveType.PrimitiveTypeName.INT32).named("element")
|
||||
.named("list").named("element").named("list").named("int_list_list").named("ArrayOfArrayOfInts");
|
||||
|
||||
schemaString = HiveSchemaUtil.generateSchemaString(schema);
|
||||
assertEquals("`int_list_list` ARRAY< ARRAY< int>>", schemaString);
|
||||
|
||||
// A list of integers
|
||||
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeated(PrimitiveType.PrimitiveTypeName.INT32)
|
||||
.named("element").named("int_list").named("ArrayOfInts");
|
||||
|
||||
schemaString = HiveSchemaUtil.generateSchemaString(schema);
|
||||
assertEquals("`int_list` ARRAY< int>", schemaString);
|
||||
|
||||
// A list of structs with two fields
|
||||
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
|
||||
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").required(PrimitiveType.PrimitiveTypeName.INT32)
|
||||
.named("num").named("element").named("tuple_list").named("ArrayOfTuples");
|
||||
|
||||
schemaString = HiveSchemaUtil.generateSchemaString(schema);
|
||||
assertEquals("`tuple_list` ARRAY< STRUCT< `str` : binary, `num` : int>>", schemaString);
|
||||
|
||||
// A list of structs with a single field
|
||||
// For this case, since the inner group name is "array", we treat the
|
||||
// element type as a one-element struct.
|
||||
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
|
||||
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("array").named("one_tuple_list")
|
||||
.named("ArrayOfOneTuples");
|
||||
|
||||
schemaString = HiveSchemaUtil.generateSchemaString(schema);
|
||||
assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>", schemaString);
|
||||
|
||||
// A list of structs with a single field
|
||||
// For this case, since the inner group name ends with "_tuple", we also treat the
|
||||
// element type as a one-element struct.
|
||||
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
|
||||
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("one_tuple_list_tuple")
|
||||
.named("one_tuple_list").named("ArrayOfOneTuples2");
|
||||
|
||||
schemaString = HiveSchemaUtil.generateSchemaString(schema);
|
||||
assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>", schemaString);
|
||||
|
||||
// A list of structs with a single field
|
||||
// Unlike the above two cases, for this the element type is the type of the
|
||||
// only field in the struct.
|
||||
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
|
||||
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("one_tuple_list").named("one_tuple_list")
|
||||
.named("ArrayOfOneTuples3");
|
||||
|
||||
schemaString = HiveSchemaUtil.generateSchemaString(schema);
|
||||
assertEquals("`one_tuple_list` ARRAY< binary>", schemaString);
|
||||
|
||||
// A list of maps
|
||||
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup().as(OriginalType.MAP)
|
||||
.repeatedGroup().as(OriginalType.MAP_KEY_VALUE).required(PrimitiveType.PrimitiveTypeName.BINARY)
|
||||
.as(OriginalType.UTF8).named("string_key").required(PrimitiveType.PrimitiveTypeName.INT32).named("int_value")
|
||||
.named("key_value").named("array").named("map_list").named("ArrayOfMaps");
|
||||
|
||||
schemaString = HiveSchemaUtil.generateSchemaString(schema);
|
||||
assertEquals("`map_list` ARRAY< MAP< string, int>>", schemaString);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSchemaConvertTimestampMicros() throws IOException {
|
||||
MessageType schema = Types.buildMessage().optional(PrimitiveType.PrimitiveTypeName.INT64)
|
||||
.as(OriginalType.TIMESTAMP_MICROS).named("my_element").named("my_timestamp");
|
||||
String schemaString = HiveSchemaUtil.generateSchemaString(schema);
|
||||
// verify backward compatibility - int64 converted to bigint type
|
||||
assertEquals("`my_element` bigint", schemaString);
|
||||
// verify new functionality - int64 converted to timestamp type when 'supportTimestamp' is enabled
|
||||
schemaString = HiveSchemaUtil.generateSchemaString(schema, Collections.emptyList(), true);
|
||||
assertEquals("`my_element` TIMESTAMP", schemaString);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSchemaDiffForTimestampMicros() {
|
||||
MessageType schema = Types.buildMessage().optional(PrimitiveType.PrimitiveTypeName.INT64)
|
||||
.as(OriginalType.TIMESTAMP_MICROS).named("my_element").named("my_timestamp");
|
||||
// verify backward compatibility - int64 converted to bigint type
|
||||
SchemaDifference schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
|
||||
Collections.emptyMap(), Collections.emptyList(), false);
|
||||
assertEquals("bigint", schemaDifference.getAddColumnTypes().get("`my_element`"));
|
||||
schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
|
||||
schemaDifference.getAddColumnTypes(), Collections.emptyList(), false);
|
||||
assertTrue(schemaDifference.isEmpty());
|
||||
|
||||
// verify schema difference is calculated correctly when supportTimestamp is enabled
|
||||
schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
|
||||
Collections.emptyMap(), Collections.emptyList(), true);
|
||||
assertEquals("TIMESTAMP", schemaDifference.getAddColumnTypes().get("`my_element`"));
|
||||
schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
|
||||
schemaDifference.getAddColumnTypes(), Collections.emptyList(), true);
|
||||
assertTrue(schemaDifference.isEmpty());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource({"useJdbcAndSchemaFromCommitMetadata"})
|
||||
public void testBasicSync(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
|
||||
|
||||
@@ -0,0 +1,148 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.hive.util;
|
||||
|
||||
import org.apache.hudi.hive.SchemaDifference;
|
||||
|
||||
import org.apache.parquet.schema.MessageType;
|
||||
import org.apache.parquet.schema.OriginalType;
|
||||
import org.apache.parquet.schema.PrimitiveType;
|
||||
import org.apache.parquet.schema.Types;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class TestHiveSchemaUtil {
|
||||
|
||||
/**
|
||||
* Testing converting array types to Hive field declaration strings.
|
||||
* <p>
|
||||
* Refer to the Parquet-113 spec: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
|
||||
*/
|
||||
@Test
|
||||
public void testSchemaConvertArray() throws IOException {
|
||||
// Testing the 3-level annotation structure
|
||||
MessageType schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
|
||||
.optional(PrimitiveType.PrimitiveTypeName.INT32).named("element").named("list").named("int_list")
|
||||
.named("ArrayOfInts");
|
||||
|
||||
String schemaString = HiveSchemaUtil.generateSchemaString(schema);
|
||||
assertEquals("`int_list` ARRAY< int>", schemaString);
|
||||
|
||||
// A array of arrays
|
||||
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup().requiredGroup()
|
||||
.as(OriginalType.LIST).repeatedGroup().required(PrimitiveType.PrimitiveTypeName.INT32).named("element")
|
||||
.named("list").named("element").named("list").named("int_list_list").named("ArrayOfArrayOfInts");
|
||||
|
||||
schemaString = HiveSchemaUtil.generateSchemaString(schema);
|
||||
assertEquals("`int_list_list` ARRAY< ARRAY< int>>", schemaString);
|
||||
|
||||
// A list of integers
|
||||
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeated(PrimitiveType.PrimitiveTypeName.INT32)
|
||||
.named("element").named("int_list").named("ArrayOfInts");
|
||||
|
||||
schemaString = HiveSchemaUtil.generateSchemaString(schema);
|
||||
assertEquals("`int_list` ARRAY< int>", schemaString);
|
||||
|
||||
// A list of structs with two fields
|
||||
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
|
||||
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").required(PrimitiveType.PrimitiveTypeName.INT32)
|
||||
.named("num").named("element").named("tuple_list").named("ArrayOfTuples");
|
||||
|
||||
schemaString = HiveSchemaUtil.generateSchemaString(schema);
|
||||
assertEquals("`tuple_list` ARRAY< STRUCT< `str` : binary, `num` : int>>", schemaString);
|
||||
|
||||
// A list of structs with a single field
|
||||
// For this case, since the inner group name is "array", we treat the
|
||||
// element type as a one-element struct.
|
||||
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
|
||||
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("array").named("one_tuple_list")
|
||||
.named("ArrayOfOneTuples");
|
||||
|
||||
schemaString = HiveSchemaUtil.generateSchemaString(schema);
|
||||
assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>", schemaString);
|
||||
|
||||
// A list of structs with a single field
|
||||
// For this case, since the inner group name ends with "_tuple", we also treat the
|
||||
// element type as a one-element struct.
|
||||
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
|
||||
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("one_tuple_list_tuple")
|
||||
.named("one_tuple_list").named("ArrayOfOneTuples2");
|
||||
|
||||
schemaString = HiveSchemaUtil.generateSchemaString(schema);
|
||||
assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>", schemaString);
|
||||
|
||||
// A list of structs with a single field
|
||||
// Unlike the above two cases, for this the element type is the type of the
|
||||
// only field in the struct.
|
||||
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
|
||||
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("one_tuple_list").named("one_tuple_list")
|
||||
.named("ArrayOfOneTuples3");
|
||||
|
||||
schemaString = HiveSchemaUtil.generateSchemaString(schema);
|
||||
assertEquals("`one_tuple_list` ARRAY< binary>", schemaString);
|
||||
|
||||
// A list of maps
|
||||
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup().as(OriginalType.MAP)
|
||||
.repeatedGroup().as(OriginalType.MAP_KEY_VALUE).required(PrimitiveType.PrimitiveTypeName.BINARY)
|
||||
.as(OriginalType.UTF8).named("string_key").required(PrimitiveType.PrimitiveTypeName.INT32).named("int_value")
|
||||
.named("key_value").named("array").named("map_list").named("ArrayOfMaps");
|
||||
|
||||
schemaString = HiveSchemaUtil.generateSchemaString(schema);
|
||||
assertEquals("`map_list` ARRAY< MAP< string, int>>", schemaString);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSchemaConvertTimestampMicros() throws IOException {
|
||||
MessageType schema = Types.buildMessage().optional(PrimitiveType.PrimitiveTypeName.INT64)
|
||||
.as(OriginalType.TIMESTAMP_MICROS).named("my_element").named("my_timestamp");
|
||||
String schemaString = HiveSchemaUtil.generateSchemaString(schema);
|
||||
// verify backward compatibility - int64 converted to bigint type
|
||||
assertEquals("`my_element` bigint", schemaString);
|
||||
// verify new functionality - int64 converted to timestamp type when 'supportTimestamp' is enabled
|
||||
schemaString = HiveSchemaUtil.generateSchemaString(schema, Collections.emptyList(), true);
|
||||
assertEquals("`my_element` TIMESTAMP", schemaString);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSchemaDiffForTimestampMicros() {
|
||||
MessageType schema = Types.buildMessage().optional(PrimitiveType.PrimitiveTypeName.INT64)
|
||||
.as(OriginalType.TIMESTAMP_MICROS).named("my_element").named("my_timestamp");
|
||||
// verify backward compatibility - int64 converted to bigint type
|
||||
SchemaDifference schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
|
||||
Collections.emptyMap(), Collections.emptyList(), false);
|
||||
assertEquals("bigint", schemaDifference.getAddColumnTypes().get("`my_element`"));
|
||||
schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
|
||||
schemaDifference.getAddColumnTypes(), Collections.emptyList(), false);
|
||||
assertTrue(schemaDifference.isEmpty());
|
||||
|
||||
// verify schema difference is calculated correctly when supportTimestamp is enabled
|
||||
schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
|
||||
Collections.emptyMap(), Collections.emptyList(), true);
|
||||
assertEquals("TIMESTAMP", schemaDifference.getAddColumnTypes().get("`my_element`"));
|
||||
schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
|
||||
schemaDifference.getAddColumnTypes(), Collections.emptyList(), true);
|
||||
assertTrue(schemaDifference.isEmpty());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user