- * Refer to the Parquet-113 spec: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
- */
- @Test
- public void testSchemaConvertArray() throws IOException {
- // Testing the 3-level annotation structure
- MessageType schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
- .optional(PrimitiveType.PrimitiveTypeName.INT32).named("element").named("list").named("int_list")
- .named("ArrayOfInts");
-
- String schemaString = HiveSchemaUtil.generateSchemaString(schema);
- assertEquals("`int_list` ARRAY< int>", schemaString);
-
- // A array of arrays
- schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup().requiredGroup()
- .as(OriginalType.LIST).repeatedGroup().required(PrimitiveType.PrimitiveTypeName.INT32).named("element")
- .named("list").named("element").named("list").named("int_list_list").named("ArrayOfArrayOfInts");
-
- schemaString = HiveSchemaUtil.generateSchemaString(schema);
- assertEquals("`int_list_list` ARRAY< ARRAY< int>>", schemaString);
-
- // A list of integers
- schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeated(PrimitiveType.PrimitiveTypeName.INT32)
- .named("element").named("int_list").named("ArrayOfInts");
-
- schemaString = HiveSchemaUtil.generateSchemaString(schema);
- assertEquals("`int_list` ARRAY< int>", schemaString);
-
- // A list of structs with two fields
- schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
- .required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("num").named("element").named("tuple_list").named("ArrayOfTuples");
-
- schemaString = HiveSchemaUtil.generateSchemaString(schema);
- assertEquals("`tuple_list` ARRAY< STRUCT< `str` : binary, `num` : int>>", schemaString);
-
- // A list of structs with a single field
- // For this case, since the inner group name is "array", we treat the
- // element type as a one-element struct.
- schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
- .required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("array").named("one_tuple_list")
- .named("ArrayOfOneTuples");
-
- schemaString = HiveSchemaUtil.generateSchemaString(schema);
- assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>", schemaString);
-
- // A list of structs with a single field
- // For this case, since the inner group name ends with "_tuple", we also treat the
- // element type as a one-element struct.
- schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
- .required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("one_tuple_list_tuple")
- .named("one_tuple_list").named("ArrayOfOneTuples2");
-
- schemaString = HiveSchemaUtil.generateSchemaString(schema);
- assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>", schemaString);
-
- // A list of structs with a single field
- // Unlike the above two cases, for this the element type is the type of the
- // only field in the struct.
- schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
- .required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("one_tuple_list").named("one_tuple_list")
- .named("ArrayOfOneTuples3");
-
- schemaString = HiveSchemaUtil.generateSchemaString(schema);
- assertEquals("`one_tuple_list` ARRAY< binary>", schemaString);
-
- // A list of maps
- schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup().as(OriginalType.MAP)
- .repeatedGroup().as(OriginalType.MAP_KEY_VALUE).required(PrimitiveType.PrimitiveTypeName.BINARY)
- .as(OriginalType.UTF8).named("string_key").required(PrimitiveType.PrimitiveTypeName.INT32).named("int_value")
- .named("key_value").named("array").named("map_list").named("ArrayOfMaps");
-
- schemaString = HiveSchemaUtil.generateSchemaString(schema);
- assertEquals("`map_list` ARRAY< MAP< string, int>>", schemaString);
- }
-
- @Test
- public void testSchemaConvertTimestampMicros() throws IOException {
- MessageType schema = Types.buildMessage().optional(PrimitiveType.PrimitiveTypeName.INT64)
- .as(OriginalType.TIMESTAMP_MICROS).named("my_element").named("my_timestamp");
- String schemaString = HiveSchemaUtil.generateSchemaString(schema);
- // verify backward compatibility - int64 converted to bigint type
- assertEquals("`my_element` bigint", schemaString);
- // verify new functionality - int64 converted to timestamp type when 'supportTimestamp' is enabled
- schemaString = HiveSchemaUtil.generateSchemaString(schema, Collections.emptyList(), true);
- assertEquals("`my_element` TIMESTAMP", schemaString);
- }
-
- @Test
- public void testSchemaDiffForTimestampMicros() {
- MessageType schema = Types.buildMessage().optional(PrimitiveType.PrimitiveTypeName.INT64)
- .as(OriginalType.TIMESTAMP_MICROS).named("my_element").named("my_timestamp");
- // verify backward compatibility - int64 converted to bigint type
- SchemaDifference schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
- Collections.emptyMap(), Collections.emptyList(), false);
- assertEquals("bigint", schemaDifference.getAddColumnTypes().get("`my_element`"));
- schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
- schemaDifference.getAddColumnTypes(), Collections.emptyList(), false);
- assertTrue(schemaDifference.isEmpty());
-
- // verify schema difference is calculated correctly when supportTimestamp is enabled
- schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
- Collections.emptyMap(), Collections.emptyList(), true);
- assertEquals("TIMESTAMP", schemaDifference.getAddColumnTypes().get("`my_element`"));
- schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
- schemaDifference.getAddColumnTypes(), Collections.emptyList(), true);
- assertTrue(schemaDifference.isEmpty());
- }
-
@ParameterizedTest
@MethodSource({"syncModeAndSchemaFromCommitMetadata"})
public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode) throws Exception {
@@ -580,7 +463,7 @@ public class TestHiveSyncTool {
"The last commit that was synced should be updated in the TBLPROPERTIES");
// Now lets create more partitions and these are the only ones which needs to be synced
- DateTime dateTime = DateTime.now().plusDays(6);
+ ZonedDateTime dateTime = ZonedDateTime.now().plusDays(6);
String commitTime2 = "101";
HiveTestUtil.addCOWPartitions(1, true, true, dateTime, commitTime2);
@@ -618,7 +501,7 @@ public class TestHiveSyncTool {
int fields = hiveClient.getTableSchema(hiveSyncConfig.tableName).size();
// Now lets create more partitions and these are the only ones which needs to be synced
- DateTime dateTime = DateTime.now().plusDays(6);
+ ZonedDateTime dateTime = ZonedDateTime.now().plusDays(6);
String commitTime2 = "101";
HiveTestUtil.addCOWPartitions(1, false, true, dateTime, commitTime2);
@@ -677,7 +560,7 @@ public class TestHiveSyncTool {
"The last commit that was synced should be updated in the TBLPROPERTIES");
// Now lets create more partitions and these are the only ones which needs to be synced
- DateTime dateTime = DateTime.now().plusDays(6);
+ ZonedDateTime dateTime = ZonedDateTime.now().plusDays(6);
String commitTime2 = "102";
String deltaCommitTime2 = "103";
@@ -749,7 +632,7 @@ public class TestHiveSyncTool {
"The last commit that was synced should be updated in the TBLPROPERTIES");
// Now lets create more partitions and these are the only ones which needs to be synced
- DateTime dateTime = DateTime.now().plusDays(6);
+ ZonedDateTime dateTime = ZonedDateTime.now().plusDays(6);
String commitTime2 = "102";
String deltaCommitTime2 = "103";
@@ -915,7 +798,7 @@ public class TestHiveSyncTool {
assertEquals(5, hiveClientRT.scanTablePartitions(snapshotTableName).size(), "Table partitions should match the number of partitions we wrote");
// Now lets create more partitions and these are the only ones which needs to be synced
- DateTime dateTime = DateTime.now().plusDays(6);
+ ZonedDateTime dateTime = ZonedDateTime.now().plusDays(6);
String commitTime2 = "102";
String deltaCommitTime2 = "103";
@@ -1013,7 +896,7 @@ public class TestHiveSyncTool {
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
// evolve the schema
- DateTime dateTime = DateTime.now().plusDays(6);
+ ZonedDateTime dateTime = ZonedDateTime.now().plusDays(6);
String commitTime2 = "101";
HiveTestUtil.addCOWPartitions(1, false, true, dateTime, commitTime2);
@@ -1069,7 +952,7 @@ public class TestHiveSyncTool {
verifyOldParquetFileTest(hiveClient, emptyCommitTime);
// evolve the schema
- DateTime dateTime = DateTime.now().plusDays(6);
+ ZonedDateTime dateTime = ZonedDateTime.now().plusDays(6);
String commitTime2 = "301";
HiveTestUtil.addCOWPartitions(1, false, true, dateTime, commitTime2);
//HiveTestUtil.createCommitFileWithSchema(commitMetadata, "400", false); // create another empty commit
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index 0686ac0aa..a3bc2268d 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -60,9 +60,6 @@ import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.zookeeper.server.ZooKeeperServer;
-import org.joda.time.DateTime;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
import org.junit.platform.commons.JUnitException;
import java.io.File;
@@ -71,6 +68,9 @@ import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -120,7 +120,7 @@ public class HiveTestUtil {
hiveSyncConfig.usePreApacheInputFormat = false;
hiveSyncConfig.partitionFields = Collections.singletonList("datestr");
- dtfOut = DateTimeFormat.forPattern("yyyy/MM/dd");
+ dtfOut = DateTimeFormatter.ofPattern("yyyy/MM/dd");
ddlExecutor = new HiveQueryDDLExecutor(hiveSyncConfig, fileSystem, getHiveConf());
clear();
@@ -169,7 +169,7 @@ public class HiveTestUtil {
boolean result = fileSystem.mkdirs(path);
checkResult(result);
- DateTime dateTime = DateTime.now();
+ ZonedDateTime dateTime = ZonedDateTime.now();
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true,
useSchemaFromCommitMetadata, dateTime, instantTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
@@ -188,10 +188,10 @@ public class HiveTestUtil {
boolean result = fileSystem.mkdirs(path);
checkResult(result);
- DateTime dateTime = DateTime.now().withTimeAtStartOfDay();
+ ZonedDateTime dateTime = ZonedDateTime.now().truncatedTo(ChronoUnit.DAYS);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
- String partitionPath = dtfOut.print(dateTime);
+ String partitionPath = dateTime.format(dtfOut);
Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath);
fileSystem.makeQualified(partPath);
fileSystem.mkdirs(partPath);
@@ -223,7 +223,7 @@ public class HiveTestUtil {
boolean result = fileSystem.mkdirs(path);
checkResult(result);
- DateTime dateTime = DateTime.now();
+ ZonedDateTime dateTime = ZonedDateTime.now();
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true,
useSchemaFromCommitMetadata, dateTime, commitTime);
createdTablesSet
@@ -245,7 +245,7 @@ public class HiveTestUtil {
}
public static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple,
- boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime) throws IOException, URISyntaxException {
+ boolean useSchemaFromCommitMetadata, ZonedDateTime startFrom, String instantTime) throws IOException, URISyntaxException {
HoodieCommitMetadata commitMetadata =
createPartitions(numberOfPartitions, isParquetSchemaSimple, useSchemaFromCommitMetadata, startFrom, instantTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
@@ -261,7 +261,7 @@ public class HiveTestUtil {
}
public static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple,
- boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime, String deltaCommitTime)
+ boolean useSchemaFromCommitMetadata, ZonedDateTime startFrom, String instantTime, String deltaCommitTime)
throws IOException, URISyntaxException, InterruptedException {
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, isParquetSchemaSimple,
useSchemaFromCommitMetadata, startFrom, instantTime);
@@ -299,12 +299,12 @@ public class HiveTestUtil {
}
private static HoodieCommitMetadata createPartitions(int numberOfPartitions, boolean isParquetSchemaSimple,
- boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime) throws IOException, URISyntaxException {
- startFrom = startFrom.withTimeAtStartOfDay();
+ boolean useSchemaFromCommitMetadata, ZonedDateTime startFrom, String instantTime) throws IOException, URISyntaxException {
+ startFrom = startFrom.truncatedTo(ChronoUnit.DAYS);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
for (int i = 0; i < numberOfPartitions; i++) {
- String partitionPath = dtfOut.print(startFrom);
+ String partitionPath = startFrom.format(dtfOut);
Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath);
fileSystem.makeQualified(partPath);
fileSystem.mkdirs(partPath);
@@ -470,4 +470,4 @@ public class HiveTestUtil {
public static Set