[HUDI-2831] Securing usages of SimpleDateFormat to be thread-safe (#4073)
This commit is contained in:
@@ -39,14 +39,17 @@ import java.util.stream.Collectors;
|
||||
*/
|
||||
public class BoundedPartitionAwareCompactionStrategy extends DayBasedCompactionStrategy {
|
||||
|
||||
SimpleDateFormat dateFormat = new SimpleDateFormat(DayBasedCompactionStrategy.DATE_PARTITION_FORMAT);
|
||||
// NOTE: {@code SimpleDateFormat} is NOT thread-safe
|
||||
// TODO replace w/ DateTimeFormatter
|
||||
private final ThreadLocal<SimpleDateFormat> dateFormat =
|
||||
ThreadLocal.withInitial(() -> new SimpleDateFormat(DayBasedCompactionStrategy.DATE_PARTITION_FORMAT));
|
||||
|
||||
@Override
|
||||
public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
|
||||
List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
|
||||
// The earliest partition to compact - current day minus the target partitions limit
|
||||
String earliestPartitionPathToCompact =
|
||||
dateFormat.format(getDateAtOffsetFromToday(-1 * writeConfig.getTargetPartitionsPerDayBasedCompaction()));
|
||||
dateFormat.get().format(getDateAtOffsetFromToday(-1 * writeConfig.getTargetPartitionsPerDayBasedCompaction()));
|
||||
// Filter out all partitions greater than earliestPartitionPathToCompact
|
||||
|
||||
return operations.stream().collect(Collectors.groupingBy(HoodieCompactionOperation::getPartitionPath)).entrySet()
|
||||
@@ -59,7 +62,7 @@ public class BoundedPartitionAwareCompactionStrategy extends DayBasedCompactionS
|
||||
public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> partitionPaths) {
|
||||
// The earliest partition to compact - current day minus the target partitions limit
|
||||
String earliestPartitionPathToCompact =
|
||||
dateFormat.format(getDateAtOffsetFromToday(-1 * writeConfig.getTargetPartitionsPerDayBasedCompaction()));
|
||||
dateFormat.get().format(getDateAtOffsetFromToday(-1 * writeConfig.getTargetPartitionsPerDayBasedCompaction()));
|
||||
// Get all partitions and sort them
|
||||
return partitionPaths.stream().map(partition -> partition.replace("/", "-"))
|
||||
.sorted(Comparator.reverseOrder()).map(partitionPath -> partitionPath.replace("-", "/"))
|
||||
|
||||
@@ -41,12 +41,18 @@ public class DayBasedCompactionStrategy extends CompactionStrategy {
|
||||
// For now, use SimpleDateFormat as default partition format
|
||||
protected static final String DATE_PARTITION_FORMAT = "yyyy/MM/dd";
|
||||
// Sorts compaction in LastInFirstCompacted order
|
||||
|
||||
// NOTE: {@code SimpleDateFormat} is NOT thread-safe
|
||||
// TODO replace w/ DateTimeFormatter
|
||||
private static final ThreadLocal<SimpleDateFormat> DATE_FORMAT =
|
||||
ThreadLocal.withInitial(() -> new SimpleDateFormat(DATE_PARTITION_FORMAT, Locale.ENGLISH));
|
||||
|
||||
protected static Comparator<String> comparator = (String leftPartition, String rightPartition) -> {
|
||||
try {
|
||||
leftPartition = getPartitionPathWithoutPartitionKeys(leftPartition);
|
||||
rightPartition = getPartitionPathWithoutPartitionKeys(rightPartition);
|
||||
Date left = new SimpleDateFormat(DATE_PARTITION_FORMAT, Locale.ENGLISH).parse(leftPartition);
|
||||
Date right = new SimpleDateFormat(DATE_PARTITION_FORMAT, Locale.ENGLISH).parse(rightPartition);
|
||||
Date left = DATE_FORMAT.get().parse(leftPartition);
|
||||
Date right = DATE_FORMAT.get().parse(rightPartition);
|
||||
return left.after(right) ? -1 : right.after(left) ? 1 : 0;
|
||||
} catch (ParseException e) {
|
||||
throw new HoodieException("Invalid Partition Date Format", e);
|
||||
|
||||
@@ -48,7 +48,12 @@ import java.text.SimpleDateFormat
|
||||
import scala.collection.immutable.Map
|
||||
|
||||
object HoodieSqlUtils extends SparkAdapterSupport {
|
||||
private val defaultDateFormat = new SimpleDateFormat("yyyy-MM-dd")
|
||||
// NOTE: {@code SimpleDateFormat} is NOT thread-safe
|
||||
// TODO replace w/ DateTimeFormatter
|
||||
private val defaultDateFormat =
|
||||
ThreadLocal.withInitial(new java.util.function.Supplier[SimpleDateFormat] {
|
||||
override def get() = new SimpleDateFormat("yyyy-MM-dd")
|
||||
})
|
||||
|
||||
def isHoodieTable(table: CatalogTable): Boolean = {
|
||||
table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi"
|
||||
@@ -298,7 +303,7 @@ object HoodieSqlUtils extends SparkAdapterSupport {
|
||||
HoodieActiveTimeline.parseDateFromInstantTime(queryInstant) // validate the format
|
||||
queryInstant
|
||||
} else if (instantLength == 10) { // for yyyy-MM-dd
|
||||
HoodieActiveTimeline.formatDate(defaultDateFormat.parse(queryInstant))
|
||||
HoodieActiveTimeline.formatDate(defaultDateFormat.get().parse(queryInstant))
|
||||
} else {
|
||||
throw new IllegalArgumentException(s"Unsupported query instant time format: $queryInstant,"
|
||||
+ s"Supported time format are: 'yyyy-MM-dd: HH:mm:ss' or 'yyyy-MM-dd' or 'yyyyMMddHHmmss'")
|
||||
|
||||
Reference in New Issue
Block a user