From b7ee341e14507f2562f8c3a3212daf8686631e05 Mon Sep 17 00:00:00 2001 From: Prashant Wason Date: Fri, 5 Nov 2021 06:31:42 -0700 Subject: [PATCH] [HUDI-1794] Moved static COMMIT_FORMATTER to thread local variable as SimpleDateFormat is not thread safe. (#2819) --- .../org/apache/hudi/cli/utils/CommitUtil.java | 6 +- .../client/AbstractHoodieWriteClient.java | 2 +- .../ScheduleCompactionActionExecutor.java | 2 +- .../hudi/client/HoodieFlinkWriteClient.java | 2 +- .../hudi/client/SparkRDDWriteClient.java | 4 +- .../index/hbase/SparkHoodieHBaseIndex.java | 2 +- .../table/timeline/HoodieActiveTimeline.java | 50 +++++++---- .../timeline/HoodieInstantTimeGenerator.java | 84 +++++++++++++++++++ .../hudi/metadata/HoodieTableMetadata.java | 2 +- .../apache/hudi/common/fs/TestFSUtils.java | 10 +-- .../common/model/TestHoodieWriteStat.java | 5 +- .../timeline/TestHoodieActiveTimeline.java | 46 +++++++++- .../common/testutils/HoodieTestTable.java | 4 +- .../org/apache/hudi/util/StreamerUtil.java | 10 +-- .../TestStreamWriteOperatorCoordinator.java | 2 +- .../spark/sql/hudi/HoodieSqlUtils.scala | 6 +- .../hudi/streaming/HoodieStreamSource.scala | 4 +- .../hudi/functional/TestTimeTravelQuery.scala | 4 +- .../functional/TestHDFSParquetImporter.java | 4 +- 19 files changed, 196 insertions(+), 53 deletions(-) create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java index 5a1c457b1..5f08f0097 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java @@ -51,7 +51,7 @@ public class CommitUtil { public static String getTimeDaysAgo(int numberOfDays) { Date date = Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant()); - return HoodieActiveTimeline.COMMIT_FORMATTER.format(date); + return 
HoodieActiveTimeline.formatInstantTime(date); } /** @@ -61,8 +61,8 @@ public class CommitUtil { * b) hours: -1, returns 20200202010000 */ public static String addHours(String compactionCommitTime, int hours) throws ParseException { - Instant instant = HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).toInstant(); + Instant instant = HoodieActiveTimeline.parseInstantTime(compactionCommitTime).toInstant(); ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault()); - return HoodieActiveTimeline.COMMIT_FORMATTER.format(Date.from(commitDateTime.plusHours(hours).toInstant())); + return HoodieActiveTimeline.formatInstantTime(Date.from(commitDateTime.plusHours(hours).toInstant())); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 3e6b7ab49..699f739ba 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -232,7 +232,7 @@ public abstract class AbstractHoodieWriteClient extends if (compactionTimer != null) { long durationInMs = metrics.getDurationInMs(compactionTimer.stop()); try { - metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(), + metrics.updateCommitMetrics(HoodieActiveTimeline.parseInstantTime(compactionCommitTime).getTime(), durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION); } catch (ParseException e) { throw new HoodieCommitException("Commit time is not of valid format. 
Failed to commit compaction " diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 173276d98..b8437d39f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -315,7 +315,7 @@ public class SparkRDDWriteClient extends if (compactionTimer != null) { long durationInMs = metrics.getDurationInMs(compactionTimer.stop()); try { - metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(), + metrics.updateCommitMetrics(HoodieActiveTimeline.parseInstantTime(compactionCommitTime).getTime(), durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION); } catch (ParseException e) { throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction " @@ -396,7 +396,7 @@ public class SparkRDDWriteClient extends if (clusteringTimer != null) { long durationInMs = metrics.getDurationInMs(clusteringTimer.stop()); try { - metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(clusteringCommitTime).getTime(), + metrics.updateCommitMetrics(HoodieActiveTimeline.parseInstantTime(clusteringCommitTime).getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION); } catch (ParseException e) { throw new HoodieCommitException("Commit time is not of valid format. 
Failed to commit compaction " diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java index 56dd49515..0317b961f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java @@ -567,7 +567,7 @@ public class SparkHoodieHBaseIndex> BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName))) { final RateLimiter limiter = RateLimiter.create(multiPutBatchSize, TimeUnit.SECONDS); - Long rollbackTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse(instantTime).getTime(); + Long rollbackTime = HoodieActiveTimeline.parseInstantTime(instantTime).getTime(); Long currentTime = new Date().getTime(); Scan scan = new Scan(); scan.addFamily(SYSTEM_COLUMN_FAMILY); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index e586815d3..37631b025 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -35,14 +35,14 @@ import org.apache.log4j.Logger; import java.io.FileNotFoundException; import java.io.IOException; import java.io.Serializable; -import java.text.SimpleDateFormat; +import java.text.ParseException; +import java.time.Instant; import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashSet; import java.util.Objects; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; /** @@ -59,8 +59,6 @@ import java.util.function.Function; */ public class HoodieActiveTimeline 
extends HoodieDefaultTimeline { - public static final SimpleDateFormat COMMIT_FORMATTER = new SimpleDateFormat("yyyyMMddHHmmss"); - public static final Set VALID_EXTENSIONS_IN_ACTIVE_TIMELINE = new HashSet<>(Arrays.asList( COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, REQUESTED_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION, INFLIGHT_DELTA_COMMIT_EXTENSION, REQUESTED_DELTA_COMMIT_EXTENSION, @@ -72,28 +70,44 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { REQUESTED_REPLACE_COMMIT_EXTENSION, INFLIGHT_REPLACE_COMMIT_EXTENSION, REPLACE_COMMIT_EXTENSION)); private static final Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class); protected HoodieTableMetaClient metaClient; - private static AtomicReference lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE)); /** - * Returns next instant time in the {@link #COMMIT_FORMATTER} format. - * Ensures each instant time is atleast 1 second apart since we create instant times at second granularity + * Parse the timestamp of an Instant and return a {@code SimpleDateFormat}. */ - public static String createNewInstantTime() { - return createNewInstantTime(0); + public static Date parseInstantTime(String timestamp) throws ParseException { + return HoodieInstantTimeGenerator.parseInstantTime(timestamp); } /** - * Returns next instant time that adds N milliseconds in the {@link #COMMIT_FORMATTER} format. + * Format the java.time.Instant to a String representing the timestamp of a Hoodie Instant. + */ + public static String formatInstantTime(Instant timestamp) { + return HoodieInstantTimeGenerator.formatInstantTime(timestamp); + } + + /** + * Format the Date to a String representing the timestamp of a Hoodie Instant. + */ + public static String formatInstantTime(Date timestamp) { + return HoodieInstantTimeGenerator.formatInstantTime(timestamp); + } + + /** + * Returns next instant time in the correct format. 
* Ensures each instant time is atleast 1 second apart since we create instant times at second granularity */ + public static String createNewInstantTime() { + return HoodieInstantTimeGenerator.createNewInstantTime(0); + } + + /** + * Returns next instant time that adds N milliseconds to current time. + * Ensures each instant time is atleast 1 second apart since we create instant times at second granularity + * + * @param milliseconds Milliseconds to add to current time while generating the new instant time + */ public static String createNewInstantTime(long milliseconds) { - return lastInstantTime.updateAndGet((oldVal) -> { - String newCommitTime; - do { - newCommitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date(System.currentTimeMillis() + milliseconds)); - } while (HoodieTimeline.compareTimestamps(newCommitTime, LESSER_THAN_OR_EQUALS, oldVal)); - return newCommitTime; - }); + return HoodieInstantTimeGenerator.createNewInstantTime(milliseconds); } protected HoodieActiveTimeline(HoodieTableMetaClient metaClient, Set includedExtensions) { @@ -129,6 +143,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { * * @deprecated */ + @Deprecated public HoodieActiveTimeline() { } @@ -137,6 +152,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { * * @deprecated */ + @Deprecated private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java new file mode 100644 index 000000000..817b39254 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.timeline; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; +import java.time.temporal.TemporalAccessor; +import java.util.Date; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Utility class to generate and parse timestamps used in Instants. + */ +public class HoodieInstantTimeGenerator { + // Format of the timestamp used for an Instant + private static final String INSTANT_TIMESTAMP_FORMAT = "yyyyMMddHHmmss"; + // Formatter to generate Instant timestamps + private static DateTimeFormatter INSTANT_TIME_FORMATTER = DateTimeFormatter.ofPattern(INSTANT_TIMESTAMP_FORMAT); + // The last Instant timestamp generated + private static AtomicReference lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE)); + private static final String ALL_ZERO_TIMESTAMP = "00000000000000"; + + /** + * Returns next instant time that adds N milliseconds to the current time. 
+ * Ensures each instant time is at least 1 second apart since we create instant times at second granularity + * + * @param milliseconds Milliseconds to add to current time while generating the new instant time + */ + public static String createNewInstantTime(long milliseconds) { + return lastInstantTime.updateAndGet((oldVal) -> { + String newCommitTime; + do { + Date d = new Date(System.currentTimeMillis() + milliseconds); + newCommitTime = INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(d)); + } while (HoodieTimeline.compareTimestamps(newCommitTime, HoodieActiveTimeline.LESSER_THAN_OR_EQUALS, oldVal)); + return newCommitTime; + }); + } + + public static Date parseInstantTime(String timestamp) { + try { + LocalDateTime dt = LocalDateTime.parse(timestamp, INSTANT_TIME_FORMATTER); + return Date.from(dt.atZone(ZoneId.systemDefault()).toInstant()); + } catch (DateTimeParseException e) { + // Special handling for all zero timestamp which is not parsable by DateTimeFormatter + if (timestamp.equals(ALL_ZERO_TIMESTAMP)) { + return new Date(0); + } + + throw e; + } + } + + public static String formatInstantTime(Instant timestamp) { + // A pattern-based formatter without a zone cannot format a raw Instant (throws UnsupportedTemporalTypeException); convert to a zoned LocalDateTime first, mirroring the Date overload. + return INSTANT_TIME_FORMATTER.format(timestamp.atZone(ZoneId.systemDefault()).toLocalDateTime()); + } + + public static String formatInstantTime(Date timestamp) { + return INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(timestamp)); + } + + private static TemporalAccessor convertDateToTemporalAccessor(Date d) { + return d.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime(); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java index 68273b009..f5c176261 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java @@ -43,7 +43,7 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable { * {@link
org.apache.hudi.common.table.timeline.HoodieTimeline#INIT_INSTANT_TS}, such that the metadata table * can be prepped even before bootstrap is done. */ - String SOLO_COMMIT_TIMESTAMP = "0000000000000"; + String SOLO_COMMIT_TIMESTAMP = "00000000000000"; // Key for the record which saves list of all partitions String RECORDKEY_PARTITION_LIST = "__all_partitions__"; // The partition name used for non-partitioned tables diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java index ef8b09b51..65c729e7a 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.exception.HoodieException; @@ -51,7 +52,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG; -import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; @@ -79,14 +79,14 @@ public class TestFSUtils extends HoodieCommonTestHarness { @Test public void testMakeDataFileName() { - String instantTime = COMMIT_FORMATTER.format(new Date()); + String instantTime = HoodieActiveTimeline.formatInstantTime(new Date()); String fileName = UUID.randomUUID().toString(); 
assertEquals(FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName), fileName + "_" + TEST_WRITE_TOKEN + "_" + instantTime + BASE_FILE_EXTENSION); } @Test public void testMaskFileName() { - String instantTime = COMMIT_FORMATTER.format(new Date()); + String instantTime = HoodieActiveTimeline.formatInstantTime(new Date()); int taskPartitionId = 2; assertEquals(FSUtils.maskWithoutFileId(instantTime, taskPartitionId), "*_" + taskPartitionId + "_" + instantTime + BASE_FILE_EXTENSION); } @@ -154,7 +154,7 @@ public class TestFSUtils extends HoodieCommonTestHarness { @Test public void testGetCommitTime() { - String instantTime = COMMIT_FORMATTER.format(new Date()); + String instantTime = HoodieActiveTimeline.formatInstantTime(new Date()); String fileName = UUID.randomUUID().toString(); String fullFileName = FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName); assertEquals(instantTime, FSUtils.getCommitTime(fullFileName)); @@ -165,7 +165,7 @@ public class TestFSUtils extends HoodieCommonTestHarness { @Test public void testGetFileNameWithoutMeta() { - String instantTime = COMMIT_FORMATTER.format(new Date()); + String instantTime = HoodieActiveTimeline.formatInstantTime(new Date()); String fileName = UUID.randomUUID().toString(); String fullFileName = FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName); assertEquals(fileName, FSUtils.getFileId(fullFileName)); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java index 7136ce7d3..8fb9dddaa 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java @@ -19,14 +19,13 @@ package org.apache.hudi.common.model; import org.apache.hudi.common.fs.FSUtils; - +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hadoop.fs.Path; 
import org.junit.jupiter.api.Test; import java.util.Date; import java.util.UUID; -import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -37,7 +36,7 @@ public class TestHoodieWriteStat { @Test public void testSetPaths() { - String instantTime = COMMIT_FORMATTER.format(new Date()); + String instantTime = HoodieActiveTimeline.formatInstantTime(new Date()); String basePathString = "/data/tables/some-hoodie-table"; String partitionPathString = "2017/12/31"; String fileName = UUID.randomUUID().toString(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java index 5c4c911e1..5f2d6928c 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java @@ -25,7 +25,7 @@ import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.testutils.MockHoodieTimeline; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; - +import org.apache.hudi.exception.HoodieException; import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -33,10 +33,15 @@ import org.junit.jupiter.api.Test; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Random; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Supplier; 
import java.util.stream.Collectors; @@ -428,6 +433,45 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness { assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, validReplaceInstants.get(0).getAction()); } + @Test + public void testCreateNewInstantTime() throws Exception { + String lastInstantTime = HoodieActiveTimeline.createNewInstantTime(); + for (int i = 0; i < 3; ++i) { + String newInstantTime = HoodieActiveTimeline.createNewInstantTime(); + assertTrue(HoodieTimeline.compareTimestamps(lastInstantTime, HoodieTimeline.LESSER_THAN, newInstantTime)); + lastInstantTime = newInstantTime; + } + + // All zero timestamp can be parsed + HoodieActiveTimeline.parseInstantTime("00000000000000"); + + // Multiple thread test + final int numChecks = 100000; + final int numThreads = 100; + final long milliSecondsInYear = 365L * 24 * 3600 * 1000; + ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + List futures = new ArrayList<>(numThreads); + for (int idx = 0; idx < numThreads; ++idx) { + futures.add(executorService.submit(() -> { + Date date = new Date(System.currentTimeMillis() + (int)(Math.random() * numThreads) * milliSecondsInYear); + final String expectedFormat = HoodieActiveTimeline.formatInstantTime(date); + for (int tidx = 0; tidx < numChecks; ++tidx) { + final String curFormat = HoodieActiveTimeline.formatInstantTime(date); + if (!curFormat.equals(expectedFormat)) { + throw new HoodieException("Format error: expected=" + expectedFormat + ", curFormat=" + curFormat); + } + } + })); + } + + executorService.shutdown(); + assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS)); + // required to catch exceptions + for (Future f : futures) { + f.get(); + } + } + /** * Returns an exhaustive list of all possible HoodieInstant.
* @return list of HoodieInstant diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index 2018ae28c..95d0657cb 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -44,6 +44,7 @@ import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; @@ -84,7 +85,6 @@ import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ; import static org.apache.hudi.common.model.WriteOperationType.CLUSTER; import static org.apache.hudi.common.model.WriteOperationType.COMPACT; import static org.apache.hudi.common.model.WriteOperationType.UPSERT; -import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER; import static org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION; import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION; import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName; @@ -147,7 +147,7 @@ public class HoodieTestTable { } public static String makeNewCommitTime(Instant dateTime) { - return COMMIT_FORMATTER.format(Date.from(dateTime)); + return HoodieActiveTimeline.formatInstantTime(Date.from(dateTime)); } public static List makeIncrementalCommitTimes(int num) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 
5aab5cb05..8bbd4aad8 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -403,12 +403,12 @@ public class StreamerUtil { */ public static String medianInstantTime(String highVal, String lowVal) { try { - long high = HoodieActiveTimeline.COMMIT_FORMATTER.parse(highVal).getTime(); - long low = HoodieActiveTimeline.COMMIT_FORMATTER.parse(lowVal).getTime(); + long high = HoodieActiveTimeline.parseInstantTime(highVal).getTime(); + long low = HoodieActiveTimeline.parseInstantTime(lowVal).getTime(); ValidationUtils.checkArgument(high > low, "Instant [" + highVal + "] should have newer timestamp than instant [" + lowVal + "]"); long median = low + (high - low) / 2; - return HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date(median)); + return HoodieActiveTimeline.formatInstantTime(new Date(median)); } catch (ParseException e) { throw new HoodieException("Get median instant time with interval [" + lowVal + ", " + highVal + "] error", e); } @@ -419,8 +419,8 @@ public class StreamerUtil { */ public static long instantTimeDiffSeconds(String newInstantTime, String oldInstantTime) { try { - long newTimestamp = HoodieActiveTimeline.COMMIT_FORMATTER.parse(newInstantTime).getTime(); - long oldTimestamp = HoodieActiveTimeline.COMMIT_FORMATTER.parse(oldInstantTime).getTime(); + long newTimestamp = HoodieActiveTimeline.parseInstantTime(newInstantTime).getTime(); + long oldTimestamp = HoodieActiveTimeline.parseInstantTime(oldInstantTime).getTime(); return (newTimestamp - oldTimestamp) / 1000; } catch (ParseException e) { throw new HoodieException("Get instant time diff with interval [" + oldInstantTime + ", " + newInstantTime + "] error", e); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index 3683f4888..be2e334a4 100644 --- 
a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -204,7 +204,7 @@ public class TestStreamWriteOperatorCoordinator { HoodieTableMetaClient metadataTableMetaClient = StreamerUtil.createMetaClient(metadataTableBasePath); HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants(); assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(1L)); - assertThat(completedTimeline.lastInstant().get().getTimestamp(), is("0000000000000")); + assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP)); // test metadata table compaction // write another 3 commits diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala index 182d891dd..cf9c49ef0 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala @@ -293,12 +293,12 @@ object HoodieSqlUtils extends SparkAdapterSupport { */ def formatQueryInstant(queryInstant: String): String = { if (queryInstant.length == 19) { // for yyyy-MM-dd HH:mm:ss - HoodieActiveTimeline.COMMIT_FORMATTER.format(defaultDateTimeFormat.parse(queryInstant)) + HoodieActiveTimeline.formatInstantTime(defaultDateTimeFormat.parse(queryInstant)) } else if (queryInstant.length == 14) { // for yyyyMMddHHmmss - HoodieActiveTimeline.COMMIT_FORMATTER.parse(queryInstant) // validate the format + HoodieActiveTimeline.parseInstantTime(queryInstant) // validate the format queryInstant } else if (queryInstant.length == 10) { // for yyyy-MM-dd - 
HoodieActiveTimeline.COMMIT_FORMATTER.format(defaultDateFormat.parse(queryInstant)) + HoodieActiveTimeline.formatInstantTime(defaultDateFormat.parse(queryInstant)) } else { throw new IllegalArgumentException(s"Unsupported query instant time format: $queryInstant," + s"Supported time format are: 'yyyy-MM-dd: HH:mm:ss' or 'yyyy-MM-dd' or 'yyyyMMddHHmmss'") diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala index 0482e7488..a60a63b7a 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala @@ -179,10 +179,10 @@ class HoodieStreamSource( startOffset match { case INIT_OFFSET => startOffset.commitTime case HoodieSourceOffset(commitTime) => - val time = HoodieActiveTimeline.COMMIT_FORMATTER.parse(commitTime).getTime + val time = HoodieActiveTimeline.parseInstantTime(commitTime).getTime // As we consume the data between (start, end], start is not included, // so we +1s to the start commit time here. 
- HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date(time + 1000)) + HoodieActiveTimeline.formatInstantTime(new Date(time + 1000)) case _=> throw new IllegalStateException("UnKnow offset type.") } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala index 9482ae32f..c4af71768 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala @@ -218,13 +218,13 @@ class TestTimeTravelQuery extends HoodieClientTestBase { } private def defaultDateTimeFormat(queryInstant: String): String = { - val date = HoodieActiveTimeline.COMMIT_FORMATTER.parse(queryInstant) + val date = HoodieActiveTimeline.parseInstantTime(queryInstant) val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") format.format(date) } private def defaultDateFormat(queryInstant: String): String = { - val date = HoodieActiveTimeline.COMMIT_FORMATTER.parse(queryInstant) + val date = HoodieActiveTimeline.parseInstantTime(queryInstant) val format = new SimpleDateFormat("yyyy-MM-dd") format.format(date) } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java index 6d0141e40..28ba17efa 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java @@ -230,7 +230,7 @@ public class TestHDFSParquetImporter extends FunctionalTestHarness implements Se public List createInsertRecords(Path srcFolder) throws ParseException, IOException { Path srcFile = new Path(srcFolder.toString(), "file1.parquet"); - long 
startTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000; + long startTime = HoodieActiveTimeline.parseInstantTime("20170203000000").getTime() / 1000; List records = new ArrayList(); for (long recordNum = 0; recordNum < 96; recordNum++) { records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "0", "rider-" + recordNum, @@ -247,7 +247,7 @@ public class TestHDFSParquetImporter extends FunctionalTestHarness implements Se public List createUpsertRecords(Path srcFolder) throws ParseException, IOException { Path srcFile = new Path(srcFolder.toString(), "file1.parquet"); - long startTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000; + long startTime = HoodieActiveTimeline.parseInstantTime("20170203000000").getTime() / 1000; List records = new ArrayList(); // 10 for update for (long recordNum = 0; recordNum < 11; recordNum++) {