diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java index a93f2682b..6d465d489 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java @@ -21,20 +21,28 @@ package org.apache.hudi.client; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.util.DateTimeUtils; import org.apache.hudi.common.util.Option; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + import java.io.Serializable; +import java.time.DateTimeException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; +import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY; + /** * Status of a write operation. 
*/ public class WriteStatus implements Serializable { + private static final Logger LOG = LogManager.getLogger(WriteStatus.class); private static final long serialVersionUID = 1L; private static final long RANDOM_SEED = 9038412832L; @@ -77,6 +85,18 @@ public class WriteStatus implements Serializable { writtenRecords.add(record); } totalRecords++; + + // get the min and max event time for calculating latency and freshness + if (optionalRecordMetadata.isPresent()) { + String eventTimeVal = optionalRecordMetadata.get().getOrDefault(METADATA_EVENT_TIME_KEY, null); + try { + long eventTime = DateTimeUtils.parseDateTime(eventTimeVal).toEpochMilli(); + stat.setMinEventTime(eventTime); + stat.setMaxEventTime(eventTime); + } catch (DateTimeException | IllegalArgumentException e) { + LOG.debug(String.format("Fail to parse event time value: %s", eventTimeVal), e); + } + } } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java index 8ef5575d3..489d23c42 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java @@ -19,14 +19,15 @@ package org.apache.hudi.config; import org.apache.hudi.common.config.DefaultHoodieConfig; -import org.apache.hudi.config.HoodieMemoryConfig.Builder; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.util.Properties; +import static org.apache.hudi.common.model.HoodiePayloadProps.DEFAULT_PAYLOAD_EVENT_TIME_FIELD_VAL; import static org.apache.hudi.common.model.HoodiePayloadProps.DEFAULT_PAYLOAD_ORDERING_FIELD_VAL; +import static org.apache.hudi.common.model.HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP; import static org.apache.hudi.common.model.HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP; /** @@ -63,10 +64,17 @@ 
public class HoodiePayloadConfig extends DefaultHoodieConfig { return this; } + public Builder withPayloadEventTimeField(String payloadEventTimeField) { + props.setProperty(PAYLOAD_EVENT_TIME_FIELD_PROP, String.valueOf(payloadEventTimeField)); + return this; + } + public HoodiePayloadConfig build() { HoodiePayloadConfig config = new HoodiePayloadConfig(props); setDefaultOnCondition(props, !props.containsKey(PAYLOAD_ORDERING_FIELD_PROP), PAYLOAD_ORDERING_FIELD_PROP, String.valueOf(DEFAULT_PAYLOAD_ORDERING_FIELD_VAL)); + setDefaultOnCondition(props, !props.containsKey(PAYLOAD_EVENT_TIME_FIELD_PROP), PAYLOAD_EVENT_TIME_FIELD_PROP, + String.valueOf(DEFAULT_PAYLOAD_EVENT_TIME_FIELD_VAL)); return config; } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java index c8c112fc2..5db28e241 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java @@ -20,6 +20,8 @@ package org.apache.hudi.metrics; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import com.codahale.metrics.Timer; @@ -130,6 +132,7 @@ public class HoodieMetrics { public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, HoodieCommitMetadata metadata, String actionType) { + updateCommitTimingMetrics(commitEpochTimeInMs, durationInMs, metadata, actionType); if (config.isMetricsOn()) { long totalPartitionsWritten = metadata.fetchTotalPartitionsWritten(); long totalFilesInsert = metadata.fetchTotalFilesInsert(); @@ -144,7 +147,6 @@ public class HoodieMetrics { long totalCompactedRecordsUpdated = 
metadata.getTotalCompactedRecordsUpdated(); long totalLogFilesCompacted = metadata.getTotalLogFilesCompacted(); long totalLogFilesSize = metadata.getTotalLogFilesSize(); - Metrics.registerGauge(getMetricsName(actionType, "duration"), durationInMs); Metrics.registerGauge(getMetricsName(actionType, "totalPartitionsWritten"), totalPartitionsWritten); Metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"), totalFilesInsert); Metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"), totalFilesUpdate); @@ -152,7 +154,6 @@ public class HoodieMetrics { Metrics.registerGauge(getMetricsName(actionType, "totalUpdateRecordsWritten"), totalUpdateRecordsWritten); Metrics.registerGauge(getMetricsName(actionType, "totalInsertRecordsWritten"), totalInsertRecordsWritten); Metrics.registerGauge(getMetricsName(actionType, "totalBytesWritten"), totalBytesWritten); - Metrics.registerGauge(getMetricsName(actionType, "commitTime"), commitEpochTimeInMs); Metrics.registerGauge(getMetricsName(actionType, "totalScanTime"), totalTimeTakenByScanner); Metrics.registerGauge(getMetricsName(actionType, "totalCreateTime"), totalTimeTakenForInsert); Metrics.registerGauge(getMetricsName(actionType, "totalUpsertTime"), totalTimeTakenForUpsert); @@ -162,6 +163,23 @@ public class HoodieMetrics { } } + private void updateCommitTimingMetrics(long commitEpochTimeInMs, long durationInMs, HoodieCommitMetadata metadata, + String actionType) { + if (config.isMetricsOn()) { + Pair<Option<Long>, Option<Long>> eventTimePairMinMax = metadata.getMinAndMaxEventTime(); + if (eventTimePairMinMax.getLeft().isPresent()) { + long commitLatencyInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getLeft().get(); + Metrics.registerGauge(getMetricsName(actionType, "commitLatencyInMs"), commitLatencyInMs); + } + if (eventTimePairMinMax.getRight().isPresent()) { + long commitFreshnessInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getRight().get(); + 
Metrics.registerGauge(getMetricsName(actionType, "commitFreshnessInMs"), commitFreshnessInMs); + } + Metrics.registerGauge(getMetricsName(actionType, "commitTime"), commitEpochTimeInMs); + Metrics.registerGauge(getMetricsName(actionType, "duration"), durationInMs); + } + } + public void updateRollbackMetrics(long durationInMs, long numFilesDeleted) { if (config.isMetricsOn()) { LOG.info( diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java index 41842b1be..e669a677e 100755 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java @@ -19,6 +19,8 @@ package org.apache.hudi.metrics; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import com.codahale.metrics.Timer; @@ -123,6 +125,7 @@ public class TestHoodieMetrics { when(metadata.getTotalCompactedRecordsUpdated()).thenReturn(randomValue + 11); when(metadata.getTotalLogFilesCompacted()).thenReturn(randomValue + 12); when(metadata.getTotalLogFilesSize()).thenReturn(randomValue + 13); + when(metadata.getMinAndMaxEventTime()).thenReturn(Pair.of(Option.empty(), Option.empty())); metrics.updateCommitMetrics(randomValue + 14, commitTimer.stop(), metadata, action); String metricname = metrics.getMetricsName(action, "duration"); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java index 91878e199..78e711ed7 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java +++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java @@ -19,6 +19,7 @@ package org.apache.hudi.client; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; import org.junit.jupiter.api.Test; @@ -44,8 +45,8 @@ public class TestWriteStatus { WriteStatus status = new WriteStatus(false, 1.0); Throwable t = new Exception("some error in writing"); for (int i = 0; i < 1000; i++) { - status.markSuccess(mock(HoodieRecord.class), null); - status.markFailure(mock(HoodieRecord.class), t, null); + status.markSuccess(mock(HoodieRecord.class), Option.empty()); + status.markFailure(mock(HoodieRecord.class), t, Option.empty()); } assertEquals(1000, status.getFailedRecords().size()); assertTrue(status.hasErrors()); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java index 77f65cbe0..db41a3b1d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java @@ -25,6 +25,8 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import static org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro; @@ -37,6 +39,9 @@ import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal; */ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload { + public static final String METADATA_EVENT_TIME_KEY = "metadata.event_time.key"; + private Option<Object> eventTime = Option.empty(); + public DefaultHoodieRecordPayload(GenericRecord record, Comparable orderingVal) { super(record, orderingVal); } @@ -71,6 +76,10 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload { /* * We 
reached a point where the value is disk is older than the incoming record. + */ + eventTime = Option.ofNullable(getNestedFieldVal(incomingRecord, properties.getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP), true)); + + /* * Now check if the incoming record is a delete record. */ if (isDeleteRecord(incomingRecord)) { @@ -79,4 +88,13 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload { return Option.of(incomingRecord); } } + + @Override + public Option<Map<String, String>> getMetadata() { + Map<String, String> metadata = new HashMap<>(); + if (eventTime.isPresent()) { + metadata.put(METADATA_EVENT_TIME_KEY, String.valueOf(eventTime.get())); + } + return metadata.isEmpty() ? Option.empty() : Option.of(metadata); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java index 3e760f6bd..bc9a4ba7e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java @@ -18,14 +18,16 @@ package org.apache.hudi.common.model; -import org.apache.hadoop.fs.Path; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -323,6 +325,20 @@ public class HoodieCommitMetadata implements Serializable { return totalUpsertTime; } + public Pair<Option<Long>, Option<Long>> getMinAndMaxEventTime() { + long minEventTime = Long.MAX_VALUE; + long maxEventTime = Long.MIN_VALUE; + for (Map.Entry<String, List<HoodieWriteStat>> entry 
: partitionToWriteStats.entrySet()) { + for (HoodieWriteStat writeStat : entry.getValue()) { + minEventTime = writeStat.getMinEventTime() != null ? Math.min(writeStat.getMinEventTime(), minEventTime) : minEventTime; + maxEventTime = writeStat.getMaxEventTime() != null ? Math.max(writeStat.getMaxEventTime(), maxEventTime) : maxEventTime; + } + } + return Pair.of( + minEventTime == Long.MAX_VALUE ? Option.empty() : Option.of(minEventTime), + maxEventTime == Long.MIN_VALUE ? Option.empty() : Option.of(maxEventTime)); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java index 5d71ec3cb..48dde2a81 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java @@ -24,9 +24,20 @@ package org.apache.hudi.common.model; */ public class HoodiePayloadProps { - // payload ordering field. This could be used to merge incoming record with that in storage. Implementations of - // {@link HoodieRecordPayload} can leverage if required. + /** + * Property for payload ordering field; to be used to merge incoming record with that in storage. + * Implementations of {@link HoodieRecordPayload} can leverage if required. + * + * @see DefaultHoodieRecordPayload + */ public static final String PAYLOAD_ORDERING_FIELD_PROP = "hoodie.payload.ordering.field"; public static String DEFAULT_PAYLOAD_ORDERING_FIELD_VAL = "ts"; + /** + * Property for payload event time field; to be used to extract source event time info. 
+ * + * @see DefaultHoodieRecordPayload + */ + public static final String PAYLOAD_EVENT_TIME_FIELD_PROP = "hoodie.payload.event.time.field"; + public static String DEFAULT_PAYLOAD_EVENT_TIME_FIELD_VAL = "ts"; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java index 9a640bede..ba43526f9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java @@ -143,6 +143,18 @@ public class HoodieWriteStat implements Serializable { */ private long fileSizeInBytes; + /** + * The earliest of incoming records' event times (Epoch ms) for calculating latency. + */ + @Nullable + private Long minEventTime; + + /** + * The latest of incoming records' event times (Epoch ms) for calculating freshness. + */ + @Nullable + private Long maxEventTime; + @Nullable @JsonIgnore private RuntimeStats runtimeStats; @@ -303,6 +315,30 @@ public class HoodieWriteStat implements Serializable { this.fileSizeInBytes = fileSizeInBytes; } + public Long getMinEventTime() { + return minEventTime; + } + + public void setMinEventTime(Long minEventTime) { + if (this.minEventTime == null) { + this.minEventTime = minEventTime; + } else { + this.minEventTime = Math.min(minEventTime, this.minEventTime); + } + } + + public Long getMaxEventTime() { + return maxEventTime; + } + + public void setMaxEventTime(Long maxEventTime) { + if (this.maxEventTime == null) { + this.maxEventTime = maxEventTime; + } else { + this.maxEventTime = Math.max(maxEventTime, this.maxEventTime); + } + } + @Nullable public RuntimeStats getRuntimeStats() { return runtimeStats; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java new file mode 100644 index 000000000..1c18a77d6 --- /dev/null +++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.util; + +import java.time.Instant; +import java.time.format.DateTimeParseException; +import java.util.Objects; + +public class DateTimeUtils { + + /** + * Parse input String to a {@link java.time.Instant}. + * @param s Input String should be Epoch time in millisecond or ISO-8601 format. 
+ */ + public static Instant parseDateTime(String s) throws DateTimeParseException { + ValidationUtils.checkArgument(Objects.nonNull(s), "Input String cannot be null."); + try { + return Instant.ofEpochMilli(Long.parseLong(s)); + } catch (NumberFormatException e) { + return Instant.parse(s); + } + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java index 791415428..4c43903e0 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java @@ -18,12 +18,16 @@ package org.apache.hudi.common.model; +import org.apache.hudi.common.util.Option; + import org.apache.avro.Schema; import org.apache.avro.Schema.Type; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.util.Arrays; @@ -31,6 +35,7 @@ import java.util.Properties; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Unit tests {@link DefaultHoodieRecordPayload}. 
@@ -50,6 +55,7 @@ public class TestDefaultHoodieRecordPayload { )); props = new Properties(); props.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP, "ts"); + props.setProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP, "ts"); } @Test @@ -104,4 +110,36 @@ public class TestDefaultHoodieRecordPayload { assertFalse(payload2.combineAndGetUpdateValue(record1, schema, props).isPresent()); } + @Test + public void testGetEmptyMetadata() { + GenericRecord record = new GenericData.Record(schema); + record.put("id", "1"); + record.put("partition", "partition0"); + record.put("ts", 0L); + record.put("_hoodie_is_deleted", false); + DefaultHoodieRecordPayload payload = new DefaultHoodieRecordPayload(Option.of(record)); + assertFalse(payload.getMetadata().isPresent()); + } + + @ParameterizedTest + @ValueSource(longs = {1L, 1612542030000L}) + public void testGetEventTimeInMetadata(long eventTime) throws IOException { + GenericRecord record1 = new GenericData.Record(schema); + record1.put("id", "1"); + record1.put("partition", "partition0"); + record1.put("ts", 0L); + record1.put("_hoodie_is_deleted", false); + + GenericRecord record2 = new GenericData.Record(schema); + record2.put("id", "1"); + record2.put("partition", "partition0"); + record2.put("ts", eventTime); + record2.put("_hoodie_is_deleted", false); + + DefaultHoodieRecordPayload payload2 = new DefaultHoodieRecordPayload(record2, eventTime); + payload2.combineAndGetUpdateValue(record1, schema, props); + assertTrue(payload2.getMetadata().isPresent()); + assertEquals(eventTime, + Long.parseLong(payload2.getMetadata().get().get(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY))); + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestDateTimeUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestDateTimeUtils.java new file mode 100644 index 000000000..996c8ba6c --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestDateTimeUtils.java @@ -0,0 
+1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.util; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.time.format.DateTimeParseException; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class TestDateTimeUtils { + + @ParameterizedTest + @ValueSource(strings = {"0", "1612542030000", "2020-01-01T01:01:00Z", "1970-01-01T00:00:00.123456Z"}) + public void testParseStringIntoInstant(String s) { + assertDoesNotThrow(() -> { + DateTimeUtils.parseDateTime(s); + }); + } + + @ParameterizedTest + @ValueSource(strings = {"#", "0L", ""}) + public void testParseDateTimeThrowsException(String s) { + assertThrows(DateTimeParseException.class, () -> { + DateTimeUtils.parseDateTime(s); + }); + } + + @Test + public void testParseDateTimeWithNull() { + assertThrows(IllegalArgumentException.class, () -> { + DateTimeUtils.parseDateTime(null); + }); + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index d204d2d19..8c8655b98 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -18,24 +18,22 @@ package org.apache.hudi -import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner -import org.apache.hudi.exception.HoodieException -import org.apache.hudi.hadoop.config.HoodieRealtimeConfig -import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS import org.apache.avro.Schema import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} import org.apache.hadoop.conf.Configuration -import org.apache.hudi.common.model.HoodiePayloadProps -import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext} +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner +import org.apache.hudi.config.HoodiePayloadConfig +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hadoop.config.HoodieRealtimeConfig +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS import org.apache.spark.rdd.RDD import org.apache.spark.sql.avro.{AvroDeserializer, AvroSerializer} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection} import org.apache.spark.sql.execution.datasources.PartitionedFile import org.apache.spark.sql.vectorized.ColumnarBatch - -import java.util.Properties +import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext} import scala.collection.JavaConverters._ import scala.collection.mutable @@ -53,9 +51,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, private val confBroadcast = sc.broadcast(new 
SerializableWritable(config)) private val preCombineField = tableState.preCombineField private val payloadProps = if (preCombineField.isDefined) { - val properties = new Properties() - properties.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP, preCombineField.get) - Some(properties) + Some(HoodiePayloadConfig.newBuilder.withPayloadOrderingField(preCombineField.get).build.getProps) } else { None } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala index 4c69950c0..8c3f18c19 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala @@ -21,7 +21,7 @@ import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.common.config.TypedProperties -import org.apache.hudi.common.model.{BaseAvroPayload, DefaultHoodieRecordPayload, EmptyHoodieRecordPayload, HoodieKey, HoodiePayloadProps, OverwriteWithLatestAvroPayload} +import org.apache.hudi.common.model._ import org.apache.hudi.common.testutils.SchemaTestUtil import org.apache.hudi.common.util.Option import org.apache.hudi.config.HoodiePayloadConfig @@ -33,8 +33,6 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.{BeforeEach, Test} import org.scalatest.Assertions.fail -import scala.collection.JavaConverters.mapAsJavaMapConverter - /** * Tests on the default key generator, payload classes. 
*/ @@ -591,10 +589,9 @@ class TestDataSourceDefaults { } @Test def testDefaultHoodieRecordPayloadCombineAndGetUpdateValue() = { - val baseOrderingVal: Object = baseRecord.get("favoriteIntNumber") val fieldSchema: Schema = baseRecord.getSchema().getField("favoriteIntNumber").schema() - val props = new TypedProperties() - props.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP, "favoriteIntNumber"); + val props = HoodiePayloadConfig.newBuilder() + .withPayloadOrderingField("favoriteIntNumber").build().getProps; val laterRecord = SchemaTestUtil .generateAvroRecordFromJson(schema, 2, "001", "f1")