1
0

[HUDI-1939] remove joda time in hivesync module (#3430)

This commit is contained in:
Raymond Xu
2021-08-10 20:25:41 -07:00
committed by GitHub
parent 5448cdde7e
commit 8255a86cb4
6 changed files with 46 additions and 168 deletions

View File

@@ -64,11 +64,6 @@
<artifactId>parquet-avro</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>

View File

@@ -18,10 +18,10 @@
package org.apache.hudi.hive;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;
@@ -37,12 +37,12 @@ public class SlashEncodedDayPartitionValueExtractor implements PartitionValueExt
private transient DateTimeFormatter dtfOut;
public SlashEncodedDayPartitionValueExtractor() {
this.dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd");
this.dtfOut = DateTimeFormatter.ofPattern("yyyy-MM-dd");
}
/**
 * Returns the formatter used to render partition values with the "yyyy-MM-dd" pattern,
 * creating it first when the field is still null.
 *
 * @return the cached (or newly created) output formatter
 */
private DateTimeFormatter getDtfOut() {
// The field is declared transient, so it is presumably null after deserialization -- TODO confirm.
if (dtfOut == null) {
// Diff view: the first assignment is the removed Joda-Time call, the second its java.time replacement.
dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd");
dtfOut = DateTimeFormatter.ofPattern("yyyy-MM-dd");
}
return dtfOut;
}
@@ -58,8 +58,8 @@ public class SlashEncodedDayPartitionValueExtractor implements PartitionValueExt
int year = Integer.parseInt(splits[0].contains("=") ? splits[0].split("=")[1] : splits[0]);
int mm = Integer.parseInt(splits[1].contains("=") ? splits[1].split("=")[1] : splits[1]);
int dd = Integer.parseInt(splits[2].contains("=") ? splits[2].split("=")[1] : splits[2]);
DateTime dateTime = new DateTime(year, mm, dd, 0, 0);
ZonedDateTime dateTime = ZonedDateTime.of(LocalDateTime.of(year, mm, dd, 0, 0), ZoneId.systemDefault());
return Collections.singletonList(getDtfOut().print(dateTime));
return Collections.singletonList(dateTime.format(getDtfOut()));
}
}

View File

@@ -18,10 +18,10 @@
package org.apache.hudi.hive;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;
@@ -37,12 +37,12 @@ public class SlashEncodedHourPartitionValueExtractor implements PartitionValueEx
private transient DateTimeFormatter dtfOut;
public SlashEncodedHourPartitionValueExtractor() {
this.dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd-HH");
this.dtfOut = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
}
/**
 * Returns the formatter used to render partition values with the "yyyy-MM-dd-HH" pattern,
 * creating it first when the field is still null.
 *
 * @return the cached (or newly created) output formatter
 */
private DateTimeFormatter getDtfOut() {
// The field is declared transient, so it is presumably null after deserialization -- TODO confirm.
if (dtfOut == null) {
// Diff view: the first assignment is the removed Joda-Time call, the second its java.time replacement.
dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd-HH");
dtfOut = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
}
return dtfOut;
}
@@ -60,8 +60,8 @@ public class SlashEncodedHourPartitionValueExtractor implements PartitionValueEx
int dd = Integer.parseInt(splits[2].contains("=") ? splits[2].split("=")[1] : splits[2]);
int hh = Integer.parseInt(splits[3].contains("=") ? splits[3].split("=")[1] : splits[3]);
DateTime dateTime = new DateTime(year, mm, dd, hh, 0);
ZonedDateTime dateTime = ZonedDateTime.of(LocalDateTime.of(year, mm, dd, hh, 0), ZoneId.systemDefault());
return Collections.singletonList(getDtfOut().print(dateTime));
return Collections.singletonList(dateTime.format(getDtfOut()));
}
}

View File

@@ -26,7 +26,6 @@ import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hive.testutils.HiveTestUtil;
import org.apache.hudi.hive.util.ConfigUtils;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
@@ -38,11 +37,6 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import org.joda.time.DateTime;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -52,9 +46,9 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -122,117 +116,6 @@ public class TestHiveSyncTool {
HiveTestUtil.clear();
}
/**
 * Testing converting array types to Hive field declaration strings.
 * <p>
 * Each case builds a Parquet {@code MessageType} annotated with {@code OriginalType.LIST} and
 * asserts the exact string returned by {@code HiveSchemaUtil.generateSchemaString}.
 * <p>
 * Refer to the Parquet-113 spec: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
 */
@Test
public void testSchemaConvertArray() throws IOException {
// Testing the 3-level annotation structure
MessageType schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
.optional(PrimitiveType.PrimitiveTypeName.INT32).named("element").named("list").named("int_list")
.named("ArrayOfInts");
String schemaString = HiveSchemaUtil.generateSchemaString(schema);
assertEquals("`int_list` ARRAY< int>", schemaString);
// An array of arrays
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup().requiredGroup()
.as(OriginalType.LIST).repeatedGroup().required(PrimitiveType.PrimitiveTypeName.INT32).named("element")
.named("list").named("element").named("list").named("int_list_list").named("ArrayOfArrayOfInts");
schemaString = HiveSchemaUtil.generateSchemaString(schema);
assertEquals("`int_list_list` ARRAY< ARRAY< int>>", schemaString);
// A list of integers
// Here the repeated INT32 field sits directly under the LIST group, with no intermediate repeated group.
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeated(PrimitiveType.PrimitiveTypeName.INT32)
.named("element").named("int_list").named("ArrayOfInts");
schemaString = HiveSchemaUtil.generateSchemaString(schema);
assertEquals("`int_list` ARRAY< int>", schemaString);
// A list of structs with two fields
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").required(PrimitiveType.PrimitiveTypeName.INT32)
.named("num").named("element").named("tuple_list").named("ArrayOfTuples");
schemaString = HiveSchemaUtil.generateSchemaString(schema);
assertEquals("`tuple_list` ARRAY< STRUCT< `str` : binary, `num` : int>>", schemaString);
// A list of structs with a single field
// For this case, since the inner group name is "array", we treat the
// element type as a one-element struct.
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("array").named("one_tuple_list")
.named("ArrayOfOneTuples");
schemaString = HiveSchemaUtil.generateSchemaString(schema);
assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>", schemaString);
// A list of structs with a single field
// For this case, since the inner group name ends with "_tuple", we also treat the
// element type as a one-element struct.
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("one_tuple_list_tuple")
.named("one_tuple_list").named("ArrayOfOneTuples2");
schemaString = HiveSchemaUtil.generateSchemaString(schema);
assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>", schemaString);
// A list of structs with a single field
// Unlike the above two cases, for this the element type is the type of the
// only field in the struct.
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("one_tuple_list").named("one_tuple_list")
.named("ArrayOfOneTuples3");
schemaString = HiveSchemaUtil.generateSchemaString(schema);
assertEquals("`one_tuple_list` ARRAY< binary>", schemaString);
// A list of maps
// The element group is annotated as MAP with a UTF8 binary key and an INT32 value.
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup().as(OriginalType.MAP)
.repeatedGroup().as(OriginalType.MAP_KEY_VALUE).required(PrimitiveType.PrimitiveTypeName.BINARY)
.as(OriginalType.UTF8).named("string_key").required(PrimitiveType.PrimitiveTypeName.INT32).named("int_value")
.named("key_value").named("array").named("map_list").named("ArrayOfMaps");
schemaString = HiveSchemaUtil.generateSchemaString(schema);
assertEquals("`map_list` ARRAY< MAP< string, int>>", schemaString);
}
/**
 * Checks the Hive field declaration generated for an optional INT64 column annotated with
 * {@code OriginalType.TIMESTAMP_MICROS}, using both overloads of
 * {@code HiveSchemaUtil.generateSchemaString} exercised below.
 */
@Test
public void testSchemaConvertTimestampMicros() throws IOException {
// Single-column schema: optional INT64 "my_element" annotated as TIMESTAMP_MICROS.
MessageType schema = Types.buildMessage().optional(PrimitiveType.PrimitiveTypeName.INT64)
.as(OriginalType.TIMESTAMP_MICROS).named("my_element").named("my_timestamp");
String schemaString = HiveSchemaUtil.generateSchemaString(schema);
// verify backward compatibility - int64 converted to bigint type
assertEquals("`my_element` bigint", schemaString);
// verify new functionality - int64 converted to timestamp type when 'supportTimestamp' is enabled
schemaString = HiveSchemaUtil.generateSchemaString(schema, Collections.emptyList(), true);
assertEquals("`my_element` TIMESTAMP", schemaString);
}
/**
 * Checks {@code HiveSchemaUtil.getSchemaDifference} for an optional INT64 column annotated with
 * {@code OriginalType.TIMESTAMP_MICROS}, with the last boolean argument set to false and then true.
 * In each mode the first call is made against an empty column map, and the second call feeds the
 * reported added columns back in and expects an empty difference.
 */
@Test
public void testSchemaDiffForTimestampMicros() {
// Single-column schema: optional INT64 "my_element" annotated as TIMESTAMP_MICROS.
MessageType schema = Types.buildMessage().optional(PrimitiveType.PrimitiveTypeName.INT64)
.as(OriginalType.TIMESTAMP_MICROS).named("my_element").named("my_timestamp");
// verify backward compatibility - int64 converted to bigint type
SchemaDifference schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
Collections.emptyMap(), Collections.emptyList(), false);
assertEquals("bigint", schemaDifference.getAddColumnTypes().get("`my_element`"));
// Re-diff against the columns just reported as added: nothing further should differ.
schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
schemaDifference.getAddColumnTypes(), Collections.emptyList(), false);
assertTrue(schemaDifference.isEmpty());
// verify schema difference is calculated correctly when supportTimestamp is enabled
schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
Collections.emptyMap(), Collections.emptyList(), true);
assertEquals("TIMESTAMP", schemaDifference.getAddColumnTypes().get("`my_element`"));
// Same round trip with the flag enabled.
schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
schemaDifference.getAddColumnTypes(), Collections.emptyList(), true);
assertTrue(schemaDifference.isEmpty());
}
@ParameterizedTest
@MethodSource({"syncModeAndSchemaFromCommitMetadata"})
public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode) throws Exception {
@@ -580,7 +463,7 @@ public class TestHiveSyncTool {
"The last commit that was synced should be updated in the TBLPROPERTIES");
// Now lets create more partitions and these are the only ones which needs to be synced
DateTime dateTime = DateTime.now().plusDays(6);
ZonedDateTime dateTime = ZonedDateTime.now().plusDays(6);
String commitTime2 = "101";
HiveTestUtil.addCOWPartitions(1, true, true, dateTime, commitTime2);
@@ -618,7 +501,7 @@ public class TestHiveSyncTool {
int fields = hiveClient.getTableSchema(hiveSyncConfig.tableName).size();
// Now lets create more partitions and these are the only ones which needs to be synced
DateTime dateTime = DateTime.now().plusDays(6);
ZonedDateTime dateTime = ZonedDateTime.now().plusDays(6);
String commitTime2 = "101";
HiveTestUtil.addCOWPartitions(1, false, true, dateTime, commitTime2);
@@ -677,7 +560,7 @@ public class TestHiveSyncTool {
"The last commit that was synced should be updated in the TBLPROPERTIES");
// Now lets create more partitions and these are the only ones which needs to be synced
DateTime dateTime = DateTime.now().plusDays(6);
ZonedDateTime dateTime = ZonedDateTime.now().plusDays(6);
String commitTime2 = "102";
String deltaCommitTime2 = "103";
@@ -749,7 +632,7 @@ public class TestHiveSyncTool {
"The last commit that was synced should be updated in the TBLPROPERTIES");
// Now lets create more partitions and these are the only ones which needs to be synced
DateTime dateTime = DateTime.now().plusDays(6);
ZonedDateTime dateTime = ZonedDateTime.now().plusDays(6);
String commitTime2 = "102";
String deltaCommitTime2 = "103";
@@ -915,7 +798,7 @@ public class TestHiveSyncTool {
assertEquals(5, hiveClientRT.scanTablePartitions(snapshotTableName).size(), "Table partitions should match the number of partitions we wrote");
// Now lets create more partitions and these are the only ones which needs to be synced
DateTime dateTime = DateTime.now().plusDays(6);
ZonedDateTime dateTime = ZonedDateTime.now().plusDays(6);
String commitTime2 = "102";
String deltaCommitTime2 = "103";
@@ -1013,7 +896,7 @@ public class TestHiveSyncTool {
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
// evolve the schema
DateTime dateTime = DateTime.now().plusDays(6);
ZonedDateTime dateTime = ZonedDateTime.now().plusDays(6);
String commitTime2 = "101";
HiveTestUtil.addCOWPartitions(1, false, true, dateTime, commitTime2);
@@ -1069,7 +952,7 @@ public class TestHiveSyncTool {
verifyOldParquetFileTest(hiveClient, emptyCommitTime);
// evolve the schema
DateTime dateTime = DateTime.now().plusDays(6);
ZonedDateTime dateTime = ZonedDateTime.now().plusDays(6);
String commitTime2 = "301";
HiveTestUtil.addCOWPartitions(1, false, true, dateTime, commitTime2);
//HiveTestUtil.createCommitFileWithSchema(commitMetadata, "400", false); // create another empty commit

View File

@@ -60,9 +60,6 @@ import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.junit.platform.commons.JUnitException;
import java.io.File;
@@ -71,6 +68,9 @@ import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -120,7 +120,7 @@ public class HiveTestUtil {
hiveSyncConfig.usePreApacheInputFormat = false;
hiveSyncConfig.partitionFields = Collections.singletonList("datestr");
dtfOut = DateTimeFormat.forPattern("yyyy/MM/dd");
dtfOut = DateTimeFormatter.ofPattern("yyyy/MM/dd");
ddlExecutor = new HiveQueryDDLExecutor(hiveSyncConfig, fileSystem, getHiveConf());
clear();
@@ -169,7 +169,7 @@ public class HiveTestUtil {
boolean result = fileSystem.mkdirs(path);
checkResult(result);
DateTime dateTime = DateTime.now();
ZonedDateTime dateTime = ZonedDateTime.now();
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true,
useSchemaFromCommitMetadata, dateTime, instantTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
@@ -188,10 +188,10 @@ public class HiveTestUtil {
boolean result = fileSystem.mkdirs(path);
checkResult(result);
DateTime dateTime = DateTime.now().withTimeAtStartOfDay();
ZonedDateTime dateTime = ZonedDateTime.now().truncatedTo(ChronoUnit.DAYS);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
String partitionPath = dtfOut.print(dateTime);
String partitionPath = dateTime.format(dtfOut);
Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath);
fileSystem.makeQualified(partPath);
fileSystem.mkdirs(partPath);
@@ -223,7 +223,7 @@ public class HiveTestUtil {
boolean result = fileSystem.mkdirs(path);
checkResult(result);
DateTime dateTime = DateTime.now();
ZonedDateTime dateTime = ZonedDateTime.now();
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true,
useSchemaFromCommitMetadata, dateTime, commitTime);
createdTablesSet
@@ -245,7 +245,7 @@ public class HiveTestUtil {
}
public static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple,
boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime) throws IOException, URISyntaxException {
boolean useSchemaFromCommitMetadata, ZonedDateTime startFrom, String instantTime) throws IOException, URISyntaxException {
HoodieCommitMetadata commitMetadata =
createPartitions(numberOfPartitions, isParquetSchemaSimple, useSchemaFromCommitMetadata, startFrom, instantTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
@@ -261,7 +261,7 @@ public class HiveTestUtil {
}
public static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple,
boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime, String deltaCommitTime)
boolean useSchemaFromCommitMetadata, ZonedDateTime startFrom, String instantTime, String deltaCommitTime)
throws IOException, URISyntaxException, InterruptedException {
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, isParquetSchemaSimple,
useSchemaFromCommitMetadata, startFrom, instantTime);
@@ -299,12 +299,12 @@ public class HiveTestUtil {
}
private static HoodieCommitMetadata createPartitions(int numberOfPartitions, boolean isParquetSchemaSimple,
boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime) throws IOException, URISyntaxException {
startFrom = startFrom.withTimeAtStartOfDay();
boolean useSchemaFromCommitMetadata, ZonedDateTime startFrom, String instantTime) throws IOException, URISyntaxException {
startFrom = startFrom.truncatedTo(ChronoUnit.DAYS);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
for (int i = 0; i < numberOfPartitions; i++) {
String partitionPath = dtfOut.print(startFrom);
String partitionPath = startFrom.format(dtfOut);
Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath);
fileSystem.makeQualified(partPath);
fileSystem.mkdirs(partPath);
@@ -470,4 +470,4 @@ public class HiveTestUtil {
public static Set<String> getCreatedTablesSet() {
return createdTablesSet;
}
}
}

View File

@@ -51,9 +51,6 @@ import org.apache.hive.service.server.HiveServer2;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
@@ -67,6 +64,9 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -114,7 +114,7 @@ public class TestCluster implements BeforeAllCallback, AfterAllCallback,
conf.setInt(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname, port++);
hiveTestService = new HiveTestService(conf);
server2 = hiveTestService.start();
dtfOut = DateTimeFormat.forPattern("yyyy/MM/dd");
dtfOut = DateTimeFormatter.ofPattern("yyyy/MM/dd");
hiveSiteXml = File.createTempFile("hive-site", ".xml");
hiveSiteXml.deleteOnExit();
try (OutputStream os = new FileOutputStream(hiveSiteXml)) {
@@ -172,7 +172,7 @@ public class TestCluster implements BeforeAllCallback, AfterAllCallback,
if (!result) {
throw new InitializationError("cannot initialize table");
}
DateTime dateTime = DateTime.now();
ZonedDateTime dateTime = ZonedDateTime.now();
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, commitTime, path.toString());
createCommitFile(commitMetadata, commitTime, path.toString());
}
@@ -187,12 +187,12 @@ public class TestCluster implements BeforeAllCallback, AfterAllCallback,
}
private HoodieCommitMetadata createPartitions(int numberOfPartitions, boolean isParquetSchemaSimple,
DateTime startFrom, String commitTime, String basePath) throws IOException, URISyntaxException {
startFrom = startFrom.withTimeAtStartOfDay();
ZonedDateTime startFrom, String commitTime, String basePath) throws IOException, URISyntaxException {
startFrom = startFrom.truncatedTo(ChronoUnit.DAYS);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
for (int i = 0; i < numberOfPartitions; i++) {
String partitionPath = dtfOut.print(startFrom);
String partitionPath = startFrom.format(dtfOut);
Path partPath = new Path(basePath + "/" + partitionPath);
dfsCluster.getFileSystem().makeQualified(partPath);
dfsCluster.getFileSystem().mkdirs(partPath);