[HUDI-3014] Add table option to set utc timezone (#4306)
This commit is contained in:
@@ -0,0 +1,37 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.common.model;
|
||||
|
||||
/**
 * Timezone choices for generating instant (commit) times on the Hoodie timeline.
 *
 * <p>Each constant carries the lower-case identifier that users supply in
 * table configuration (e.g. {@code hoodie.table.timeline.timezone}).
 */
public enum HoodieTimelineTimeZone {
  LOCAL("local"),
  UTC("utc");

  // Lower-case identifier exposed to users in configuration values.
  private final String zoneId;

  HoodieTimelineTimeZone(String zoneId) {
    this.zoneId = zoneId;
  }

  /**
   * Returns the lower-case timezone identifier for this constant
   * ({@code "local"} or {@code "utc"}).
   */
  public String getTimeZone() {
    return zoneId;
  }
}
|
||||
@@ -27,7 +27,9 @@ import org.apache.hudi.common.config.HoodieConfig;
|
||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.model.HoodieTimelineTimeZone;
|
||||
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
|
||||
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
|
||||
import org.apache.hudi.common.util.FileIOUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
@@ -167,6 +169,11 @@ public class HoodieTableConfig extends HoodieConfig {
|
||||
.noDefaultValue()
|
||||
.withDocumentation("Key Generator class property for the hoodie table");
|
||||
|
||||
public static final ConfigProperty<HoodieTimelineTimeZone> TIMELINE_TIMEZONE = ConfigProperty
|
||||
.key("hoodie.table.timeline.timezone")
|
||||
.defaultValue(HoodieTimelineTimeZone.LOCAL)
|
||||
.withDocumentation("User can set hoodie commit timeline timezone, such as utc, local and so on. local is default");
|
||||
|
||||
public static final ConfigProperty<String> URL_ENCODE_PARTITIONING = KeyGeneratorOptions.URL_ENCODE_PARTITIONING;
|
||||
public static final ConfigProperty<String> HIVE_STYLE_PARTITIONING_ENABLE = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE;
|
||||
|
||||
@@ -315,6 +322,9 @@ public class HoodieTableConfig extends HoodieConfig {
|
||||
// Use the default bootstrap index class.
|
||||
hoodieConfig.setDefaultValue(BOOTSTRAP_INDEX_CLASS_NAME, getDefaultBootstrapIndexClass(properties));
|
||||
}
|
||||
if (hoodieConfig.contains(TIMELINE_TIMEZONE)) {
|
||||
HoodieInstantTimeGenerator.setCommitTimeZone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getString(TIMELINE_TIMEZONE)));
|
||||
}
|
||||
hoodieConfig.getProps().store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
|
||||
import org.apache.hudi.common.fs.NoOpConsistencyGuard;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.model.HoodieTimelineTimeZone;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
|
||||
@@ -639,6 +640,7 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
private String keyGeneratorClassProp;
|
||||
private Boolean hiveStylePartitioningEnable;
|
||||
private Boolean urlEncodePartitioning;
|
||||
private HoodieTimelineTimeZone commitTimeZone;
|
||||
|
||||
private PropertyBuilder() {
|
||||
|
||||
@@ -737,6 +739,11 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
return this;
|
||||
}
|
||||
|
||||
public PropertyBuilder setCommitTimezone(HoodieTimelineTimeZone timelineTimeZone) {
|
||||
this.commitTimeZone = timelineTimeZone;
|
||||
return this;
|
||||
}
|
||||
|
||||
public PropertyBuilder fromMetaClient(HoodieTableMetaClient metaClient) {
|
||||
return setTableType(metaClient.getTableType())
|
||||
.setTableName(metaClient.getTableConfig().getTableName())
|
||||
@@ -873,6 +880,9 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
if (null != urlEncodePartitioning) {
|
||||
tableConfig.setValue(HoodieTableConfig.URL_ENCODE_PARTITIONING, Boolean.toString(urlEncodePartitioning));
|
||||
}
|
||||
if (null != commitTimeZone) {
|
||||
tableConfig.setValue(HoodieTableConfig.TIMELINE_TIMEZONE, commitTimeZone.toString());
|
||||
}
|
||||
return tableConfig.getProps();
|
||||
}
|
||||
|
||||
@@ -886,5 +896,6 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
throws IOException {
|
||||
return HoodieTableMetaClient.initTableAndGetMetaClient(configuration, basePath, build());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,10 +18,12 @@
|
||||
|
||||
package org.apache.hudi.common.table.timeline;
|
||||
|
||||
import org.apache.hudi.common.model.HoodieTimelineTimeZone;
|
||||
import java.text.ParseException;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZoneOffset;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.time.format.DateTimeFormatterBuilder;
|
||||
import java.time.format.DateTimeParseException;
|
||||
@@ -56,6 +58,8 @@ public class HoodieInstantTimeGenerator {
|
||||
// when performing comparisons such as LESS_THAN_OR_EQUAL_TO
|
||||
private static final String DEFAULT_MILLIS_EXT = "999";
|
||||
|
||||
private static HoodieTimelineTimeZone commitTimeZone = HoodieTimelineTimeZone.LOCAL;
|
||||
|
||||
/**
|
||||
* Returns next instant time that adds N milliseconds to the current time.
|
||||
* Ensures each instant time is atleast 1 second apart since we create instant times at second granularity
|
||||
@@ -66,8 +70,13 @@ public class HoodieInstantTimeGenerator {
|
||||
return lastInstantTime.updateAndGet((oldVal) -> {
|
||||
String newCommitTime;
|
||||
do {
|
||||
Date d = new Date(System.currentTimeMillis() + milliseconds);
|
||||
newCommitTime = MILLIS_INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(d));
|
||||
if (commitTimeZone.equals(HoodieTimelineTimeZone.UTC.toString())) {
|
||||
LocalDateTime now = LocalDateTime.now(ZoneOffset.UTC);
|
||||
newCommitTime = now.format(MILLIS_INSTANT_TIME_FORMATTER);
|
||||
} else {
|
||||
Date d = new Date(System.currentTimeMillis() + milliseconds);
|
||||
newCommitTime = MILLIS_INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(d));
|
||||
}
|
||||
} while (HoodieTimeline.compareTimestamps(newCommitTime, HoodieActiveTimeline.LESSER_THAN_OR_EQUALS, oldVal));
|
||||
return newCommitTime;
|
||||
});
|
||||
@@ -131,4 +140,8 @@ public class HoodieInstantTimeGenerator {
|
||||
private static TemporalAccessor convertDateToTemporalAccessor(Date d) {
|
||||
return d.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
|
||||
}
|
||||
|
||||
public static void setCommitTimeZone(HoodieTimelineTimeZone commitTimeZone) {
|
||||
HoodieInstantTimeGenerator.commitTimeZone = commitTimeZone;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,18 +19,16 @@ package org.apache.hudi
|
||||
|
||||
import org.apache.avro.Schema
|
||||
import org.apache.avro.generic.GenericRecord
|
||||
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.{FileSystem, Path}
|
||||
import org.apache.hadoop.hive.conf.HiveConf
|
||||
|
||||
import org.apache.hudi.DataSourceWriteOptions._
|
||||
import org.apache.hudi.HoodieWriterUtils._
|
||||
import org.apache.hudi.avro.HoodieAvroUtils
|
||||
import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
|
||||
import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties}
|
||||
import org.apache.hudi.common.fs.FSUtils
|
||||
import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
|
||||
import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, HoodieTimelineTimeZone, WriteOperationType}
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
|
||||
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
|
||||
import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils, StringUtils}
|
||||
@@ -44,16 +42,13 @@ import org.apache.hudi.internal.DataSourceInternalWriterHelper
|
||||
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
|
||||
import org.apache.hudi.sync.common.AbstractSyncTool
|
||||
import org.apache.hudi.table.BulkInsertPartitioner
|
||||
|
||||
import org.apache.log4j.LogManager
|
||||
|
||||
import org.apache.spark.api.java.JavaSparkContext
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
|
||||
import org.apache.spark.sql.types.StructType
|
||||
import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
|
||||
import org.apache.spark.{SPARK_VERSION, SparkContext}
|
||||
|
||||
import java.util.Properties
|
||||
|
||||
import scala.collection.JavaConversions._
|
||||
@@ -147,6 +142,7 @@ object HoodieSparkSqlWriter {
|
||||
.setKeyGeneratorClassProp(HoodieWriterUtils.getOriginKeyGenerator(parameters))
|
||||
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
|
||||
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
|
||||
.setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
|
||||
.initTable(sparkContext.hadoopConfiguration, path)
|
||||
tableConfig = tableMetaClient.getTableConfig
|
||||
}
|
||||
@@ -397,6 +393,7 @@ object HoodieSparkSqlWriter {
|
||||
.setKeyGeneratorClassProp(keyGenProp)
|
||||
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
|
||||
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
|
||||
.setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
|
||||
.initTable(sparkContext.hadoopConfiguration, path)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user