1
0

[HUDI-1794] Moved static COMMIT_FORMATTER to thread local variable as SimpleDateFormat is not thread safe. (#2819)

This commit is contained in:
Prashant Wason
2021-11-05 06:31:42 -07:00
committed by GitHub
parent 3af6568d31
commit b7ee341e14
19 changed files with 196 additions and 53 deletions

View File

@@ -35,14 +35,14 @@ import org.apache.log4j.Logger;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.text.ParseException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
/**
@@ -59,8 +59,6 @@ import java.util.function.Function;
*/
public class HoodieActiveTimeline extends HoodieDefaultTimeline {
public static final SimpleDateFormat COMMIT_FORMATTER = new SimpleDateFormat("yyyyMMddHHmmss");
public static final Set<String> VALID_EXTENSIONS_IN_ACTIVE_TIMELINE = new HashSet<>(Arrays.asList(
COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, REQUESTED_COMMIT_EXTENSION,
DELTA_COMMIT_EXTENSION, INFLIGHT_DELTA_COMMIT_EXTENSION, REQUESTED_DELTA_COMMIT_EXTENSION,
@@ -72,28 +70,44 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
REQUESTED_REPLACE_COMMIT_EXTENSION, INFLIGHT_REPLACE_COMMIT_EXTENSION, REPLACE_COMMIT_EXTENSION));
private static final Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class);
protected HoodieTableMetaClient metaClient;
private static AtomicReference<String> lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE));
/**
* Returns next instant time in the {@link #COMMIT_FORMATTER} format.
* Ensures each instant time is atleast 1 second apart since we create instant times at second granularity
* Parse the timestamp of an Instant and return a {@code Date}.
*/
public static String createNewInstantTime() {
return createNewInstantTime(0);
public static Date parseInstantTime(String timestamp) throws ParseException {
return HoodieInstantTimeGenerator.parseInstantTime(timestamp);
}
/**
* Returns next instant time that adds N milliseconds in the {@link #COMMIT_FORMATTER} format.
* Format the java.time.Instant to a String representing the timestamp of a Hoodie Instant.
*/
public static String formatInstantTime(Instant timestamp) {
  // Formatting is owned by HoodieInstantTimeGenerator; this method only forwards the call.
  final String instantTime = HoodieInstantTimeGenerator.formatInstantTime(timestamp);
  return instantTime;
}
/**
 * Renders a {@link Date} as the timestamp string of a Hoodie Instant.
 *
 * @param timestamp the date to format
 * @return the value produced by {@link HoodieInstantTimeGenerator#formatInstantTime(Date)}
 */
public static String formatInstantTime(Date timestamp) {
  final String instantTime = HoodieInstantTimeGenerator.formatInstantTime(timestamp);
  return instantTime;
}
/**
 * Generates the next instant time from the current time, with no offset applied.
 * Instant times have second granularity, so consecutive values are at least 1 second apart.
 *
 * @return the value produced by {@link HoodieInstantTimeGenerator#createNewInstantTime(long)} for an offset of 0
 */
public static String createNewInstantTime() {
  final long noOffsetMillis = 0;
  return HoodieInstantTimeGenerator.createNewInstantTime(noOffsetMillis);
}
/**
* Returns next instant time that adds N milliseconds to current time.
* Ensures each instant time is at least 1 second apart since we create instant times at second granularity
*
* @param milliseconds Milliseconds to add to current time while generating the new instant time
*/
public static String createNewInstantTime(long milliseconds) {
return lastInstantTime.updateAndGet((oldVal) -> {
String newCommitTime;
do {
newCommitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date(System.currentTimeMillis() + milliseconds));
} while (HoodieTimeline.compareTimestamps(newCommitTime, LESSER_THAN_OR_EQUALS, oldVal));
return newCommitTime;
});
return HoodieInstantTimeGenerator.createNewInstantTime(milliseconds);
}
protected HoodieActiveTimeline(HoodieTableMetaClient metaClient, Set<String> includedExtensions) {
@@ -129,6 +143,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
*
* @deprecated
*/
@Deprecated
public HoodieActiveTimeline() {
// No-arg constructor with an empty body: nothing is initialised here.
}
@@ -137,6 +152,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
*
* @deprecated
*/
@Deprecated
private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
// Restore the non-static, non-transient fields from the stream using Java's default deserialization.
in.defaultReadObject();
}

View File

@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.table.timeline;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.temporal.TemporalAccessor;
import java.util.Date;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Utility class to generate, format and parse the timestamps used in Instants.
 *
 * <p>Timestamps use the pattern {@code yyyyMMddHHmmss} (second granularity) and are rendered and
 * parsed in the JVM's default time zone ({@link ZoneId#systemDefault()}).
 */
public class HoodieInstantTimeGenerator {
  // Format of the timestamp used for an Instant
  private static final String INSTANT_TIMESTAMP_FORMAT = "yyyyMMddHHmmss";
  // Formatter to generate Instant timestamps. DateTimeFormatter is immutable and thread-safe,
  // so a single shared constant is sufficient.
  private static final DateTimeFormatter INSTANT_TIME_FORMATTER = DateTimeFormatter.ofPattern(INSTANT_TIMESTAMP_FORMAT);
  // The last Instant timestamp generated
  private static AtomicReference<String> lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE));
  // Sentinel timestamp that the formatter cannot parse (month 00 / day 00 are not valid dates)
  private static final String ALL_ZERO_TIMESTAMP = "00000000000000";

  /**
   * Returns next instant time that adds N milliseconds to the current time.
   * Ensures each instant time is at least 1 second apart since we create instant times at second granularity.
   *
   * <p>The candidate is regenerated in a loop for as long as
   * {@code HoodieTimeline.compareTimestamps} reports it as lesser than or equal to the previously
   * generated instant time.
   *
   * @param milliseconds Milliseconds to add to current time while generating the new instant time
   * @return the newly generated instant time, which also becomes the new "last generated" value
   */
  public static String createNewInstantTime(long milliseconds) {
    return lastInstantTime.updateAndGet((oldVal) -> {
      String newCommitTime;
      do {
        newCommitTime = formatInstantTime(new Date(System.currentTimeMillis() + milliseconds));
      } while (HoodieTimeline.compareTimestamps(newCommitTime, HoodieActiveTimeline.LESSER_THAN_OR_EQUALS, oldVal));
      return newCommitTime;
    });
  }

  /**
   * Parses an instant timestamp into a {@link Date}, interpreting it in the system default time zone.
   *
   * @param timestamp timestamp in {@code yyyyMMddHHmmss} format
   * @return the parsed date; {@code new Date(0)} for the all-zero timestamp
   * @throws DateTimeParseException if the timestamp is neither all-zero nor parsable
   */
  public static Date parseInstantTime(String timestamp) {
    // Special handling for all zero timestamp which is not parsable by DateTimeFormatter.
    // Checked up front so that the sentinel does not rely on an exception for control flow.
    if (ALL_ZERO_TIMESTAMP.equals(timestamp)) {
      return new Date(0);
    }
    LocalDateTime dt = LocalDateTime.parse(timestamp, INSTANT_TIME_FORMATTER);
    return Date.from(dt.atZone(ZoneId.systemDefault()).toInstant());
  }

  /**
   * Formats a {@link Instant} as an instant timestamp in the system default time zone.
   *
   * @param timestamp the point in time to format
   * @return the timestamp in {@code yyyyMMddHHmmss} format
   */
  public static String formatInstantTime(Instant timestamp) {
    // An Instant carries no calendar fields, so a formatter without a zone cannot render it
    // (it fails with UnsupportedTemporalTypeException). Convert to local date-time first,
    // using the same zone as the Date overload so both overloads agree.
    return INSTANT_TIME_FORMATTER.format(LocalDateTime.ofInstant(timestamp, ZoneId.systemDefault()));
  }

  /**
   * Formats a {@link Date} as an instant timestamp in the system default time zone.
   *
   * @param timestamp the date to format
   * @return the timestamp in {@code yyyyMMddHHmmss} format
   */
  public static String formatInstantTime(Date timestamp) {
    return INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(timestamp));
  }

  // Converts a Date to a LocalDateTime in the system default zone so the formatter can read its fields.
  private static TemporalAccessor convertDateToTemporalAccessor(Date d) {
    return d.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
  }
}

View File

@@ -43,7 +43,7 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable {
* {@link org.apache.hudi.common.table.timeline.HoodieTimeline#INIT_INSTANT_TS}, such that the metadata table
* can be prepped even before bootstrap is done.
*/
String SOLO_COMMIT_TIMESTAMP = "0000000000000";
String SOLO_COMMIT_TIMESTAMP = "00000000000000";
// Key for the record which saves list of all partitions
String RECORDKEY_PARTITION_LIST = "__all_partitions__";
// The partition name used for non-partitioned tables

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.exception.HoodieException;
@@ -51,7 +52,6 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -79,14 +79,14 @@ public class TestFSUtils extends HoodieCommonTestHarness {
@Test
public void testMakeDataFileName() {
String instantTime = COMMIT_FORMATTER.format(new Date());
String instantTime = HoodieActiveTimeline.formatInstantTime(new Date());
String fileName = UUID.randomUUID().toString();
assertEquals(FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName), fileName + "_" + TEST_WRITE_TOKEN + "_" + instantTime + BASE_FILE_EXTENSION);
}
@Test
public void testMaskFileName() {
String instantTime = COMMIT_FORMATTER.format(new Date());
String instantTime = HoodieActiveTimeline.formatInstantTime(new Date());
int taskPartitionId = 2;
assertEquals(FSUtils.maskWithoutFileId(instantTime, taskPartitionId), "*_" + taskPartitionId + "_" + instantTime + BASE_FILE_EXTENSION);
}
@@ -154,7 +154,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
@Test
public void testGetCommitTime() {
String instantTime = COMMIT_FORMATTER.format(new Date());
String instantTime = HoodieActiveTimeline.formatInstantTime(new Date());
String fileName = UUID.randomUUID().toString();
String fullFileName = FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName);
assertEquals(instantTime, FSUtils.getCommitTime(fullFileName));
@@ -165,7 +165,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
@Test
public void testGetFileNameWithoutMeta() {
String instantTime = COMMIT_FORMATTER.format(new Date());
String instantTime = HoodieActiveTimeline.formatInstantTime(new Date());
String fileName = UUID.randomUUID().toString();
String fullFileName = FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName);
assertEquals(fileName, FSUtils.getFileId(fullFileName));

View File

@@ -19,14 +19,13 @@
package org.apache.hudi.common.model;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Test;
import java.util.Date;
import java.util.UUID;
import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -37,7 +36,7 @@ public class TestHoodieWriteStat {
@Test
public void testSetPaths() {
String instantTime = COMMIT_FORMATTER.format(new Date());
String instantTime = HoodieActiveTimeline.formatInstantTime(new Date());
String basePathString = "/data/tables/some-hoodie-table";
String partitionPathString = "2017/12/31";
String fileName = UUID.randomUUID().toString();

View File

@@ -25,7 +25,7 @@ import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.MockHoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -33,10 +33,15 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -428,6 +433,45 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, validReplaceInstants.get(0).getAction());
}
/**
 * Verifies that generated instant times are strictly increasing, that the all-zero timestamp
 * can be parsed, and that formatting the same date concurrently from many threads always
 * yields the same string.
 */
@Test
public void testCreateNewInstantTime() throws Exception {
  String lastInstantTime = HoodieActiveTimeline.createNewInstantTime();
  for (int i = 0; i < 3; ++i) {
    String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
    assertTrue(HoodieTimeline.compareTimestamps(lastInstantTime, HoodieTimeline.LESSER_THAN, newInstantTime));
    lastInstantTime = newInstantTime;
  }

  // All zero timestamp can be parsed
  assertEquals(new Date(0), HoodieActiveTimeline.parseInstantTime("00000000000000"));

  // Multiple thread test
  final int numChecks = 100000;
  final int numThreads = 100;
  // Force long arithmetic: as an int expression this product overflows (31,536,000,000 > Integer.MAX_VALUE).
  final long milliSecondsInYear = 365L * 24 * 3600 * 1000;
  ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
  List<Future<?>> futures = new ArrayList<>(numThreads);
  try {
    for (int idx = 0; idx < numThreads; ++idx) {
      futures.add(executorService.submit(() -> {
        // Each task formats its own date, offset by a random whole number of years from now.
        Date date = new Date(System.currentTimeMillis() + (int)(Math.random() * numThreads) * milliSecondsInYear);
        final String expectedFormat = HoodieActiveTimeline.formatInstantTime(date);
        for (int tidx = 0; tidx < numChecks; ++tidx) {
          final String curFormat = HoodieActiveTimeline.formatInstantTime(date);
          if (!curFormat.equals(expectedFormat)) {
            throw new HoodieException("Format error: expected=" + expectedFormat + ", curFormat=" + curFormat);
          }
        }
      }));
    }

    executorService.shutdown();
    assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS));
    // required to catch exceptions
    for (Future<?> f : futures) {
      f.get();
    }
  } finally {
    // Do not leak worker threads if an assertion or a task fails; a no-op after a clean termination.
    executorService.shutdownNow();
  }
}
/**
* Returns an exhaustive list of all possible HoodieInstant.
* @return list of HoodieInstant

View File

@@ -44,6 +44,7 @@ import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -84,7 +85,6 @@ import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
import static org.apache.hudi.common.model.WriteOperationType.CLUSTER;
import static org.apache.hudi.common.model.WriteOperationType.COMPACT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
@@ -147,7 +147,7 @@ public class HoodieTestTable {
}
public static String makeNewCommitTime(Instant dateTime) {
return COMMIT_FORMATTER.format(Date.from(dateTime));
return HoodieActiveTimeline.formatInstantTime(Date.from(dateTime));
}
public static List<String> makeIncrementalCommitTimes(int num) {