From ba4e732ba75e12a02d07c09f2662678bdcafbdc2 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Thu, 10 Feb 2022 19:19:33 -0500 Subject: [PATCH] [HUDI-2987] Update all deprecated calls to new apis in HoodieRecordPayload (#4681) --- .../table/log/HoodieFileSliceReader.java | 17 ++++++++++------- .../debezium/PostgresDebeziumAvroPayload.java | 14 ++++++++++++++ .../hudi/metadata/HoodieMetadataPayload.java | 19 +++++++++++++++---- .../AbstractRealtimeRecordReader.java | 12 +++++++++--- .../RealtimeCompactedRecordReader.java | 4 ++-- .../RealtimeUnmergedRecordReader.java | 2 +- .../apache/hudi/HoodieMergeOnReadRDD.scala | 7 +++---- .../hudi/payload/AWSDmsAvroPayload.java | 14 ++++++++++++++ 8 files changed, 68 insertions(+), 21 deletions(-) rename {hudi-common => hudi-client/hudi-client-common}/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java (85%) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java similarity index 85% rename from hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java index a786e8305..a042255cd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java @@ -7,13 +7,14 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.hudi.common.table.log; @@ -23,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodiePayloadConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.storage.HoodieFileReader; @@ -53,10 +55,11 @@ public class HoodieFileSliceReader implements Ite return new HoodieFileSliceReader(scanner.iterator()); } else { Iterable> iterable = () -> scanner.iterator(); + HoodiePayloadConfig payloadConfig = HoodiePayloadConfig.newBuilder().withPayloadOrderingField(preCombineField).build(); return new HoodieFileSliceReader(StreamSupport.stream(iterable.spliterator(), false) .map(e -> { try { - GenericRecord record = (GenericRecord) e.getData().getInsertValue(schema).get(); + GenericRecord record = (GenericRecord) e.getData().getInsertValue(schema, payloadConfig.getProps()).get(); return transform(record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt); } catch (IOException io) { throw new HoodieIOException("Error while creating reader for file slice with no base file.", io); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/PostgresDebeziumAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/PostgresDebeziumAvroPayload.java index 448627d97..d4be1899a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/PostgresDebeziumAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/PostgresDebeziumAvroPayload.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Properties; /** * Provides support for seamlessly applying changes captured via Debezium for PostgresDB. @@ -71,6 +72,19 @@ public class PostgresDebeziumAvroPayload extends AbstractDebeziumAvroPayload { return insertSourceLSN < currentSourceLSN; } + @Override + public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException { + // Specific to Postgres: If the updated record has TOASTED columns, + // we will need to keep the previous value for those columns + // see https://debezium.io/documentation/reference/connectors/postgresql.html#postgresql-toasted-values + Option insertOrDeleteRecord = super.combineAndGetUpdateValue(currentValue, schema, properties); + + if (insertOrDeleteRecord.isPresent()) { + mergeToastedValuesIfPresent(insertOrDeleteRecord.get(), currentValue); + } + return insertOrDeleteRecord; + } + @Override public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException { // Specific to Postgres: If the updated record has TOASTED columns, diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 650e86146..6000e04f3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -52,6 +52,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -67,7 +68,7 @@ import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_L *

* METADATA_TYPE_PARTITION_LIST (1): * -- List of all partitions. There is a single such record - * -- key = @{@link HoodieTableMetadata.RECORDKEY_PARTITION_LIST} + * -- key = @{@link HoodieTableMetadata#RECORDKEY_PARTITION_LIST} *

* METADATA_TYPE_FILE_LIST (2): * -- List of all files in a partition. There is one such record for each partition @@ -289,14 +290,19 @@ public class HoodieMetadataPayload implements HoodieRecordPayload combineAndGetUpdateValue(IndexedRecord oldRecord, Schema schema) throws IOException { + public Option combineAndGetUpdateValue(IndexedRecord oldRecord, Schema schema, Properties properties) throws IOException { HoodieMetadataPayload anotherPayload = new HoodieMetadataPayload(Option.of((GenericRecord) oldRecord)); HoodieRecordPayload combinedPayload = preCombine(anotherPayload); - return combinedPayload.getInsertValue(schema); + return combinedPayload.getInsertValue(schema, properties); } @Override - public Option getInsertValue(Schema schema) throws IOException { + public Option combineAndGetUpdateValue(IndexedRecord oldRecord, Schema schema) throws IOException { + return combineAndGetUpdateValue(oldRecord, schema, new Properties()); + } + + @Override + public Option getInsertValue(Schema schema, Properties properties) throws IOException { if (key == null) { return Option.empty(); } @@ -306,6 +312,11 @@ public class HoodieMetadataPayload implements HoodieRecordPayload getInsertValue(Schema schema) throws IOException { + return getInsertValue(schema, new Properties()); + } + /** * Returns the list of filenames added as part of this record. */ diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java index 78ac8805d..030e20f22 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java @@ -19,6 +19,7 @@ package org.apache.hudi.hadoop.realtime; import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodiePayloadProps; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.LogReaderUtils; import org.apache.hudi.exception.HoodieIOException; @@ -39,6 +40,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.stream.Collectors; /** @@ -50,6 +52,7 @@ public abstract class AbstractRealtimeRecordReader { protected final RealtimeSplit split; protected final JobConf jobConf; protected final boolean usesCustomPayload; + protected Properties payloadProps = new Properties(); // Schema handles private Schema readerSchema; private Schema writerSchema; @@ -62,7 +65,11 @@ public abstract class AbstractRealtimeRecordReader { LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); LOG.info("partitioningColumns ==> " + job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "")); try { - this.usesCustomPayload = usesCustomPayload(); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(split.getBasePath()).build(); + if (metaClient.getTableConfig().getPreCombineField() != null) { + this.payloadProps.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, metaClient.getTableConfig().getPreCombineField()); + } + this.usesCustomPayload = usesCustomPayload(metaClient); LOG.info("usesCustomPayload ==> " + this.usesCustomPayload); init(); } catch (IOException e) { @@ -70,8 +77,7 @@ public abstract class AbstractRealtimeRecordReader { } } - private boolean usesCustomPayload() { - HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(split.getBasePath()).build(); + private boolean usesCustomPayload(HoodieTableMetaClient metaClient) { return !(metaClient.getTableConfig().getPayloadClass().contains(HoodieAvroPayload.class.getName()) || metaClient.getTableConfig().getPayloadClass().contains("org.apache.hudi.OverwriteWithLatestAvroPayload")); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java index c8f074652..d35df9f33 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java @@ -97,9 +97,9 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader private Option buildGenericRecordwithCustomPayload(HoodieRecord record) throws IOException { if (usesCustomPayload) { - return ((HoodieAvroRecord) record).getData().getInsertValue(getWriterSchema()); + return ((HoodieAvroRecord) record).getData().getInsertValue(getWriterSchema(), payloadProps); } else { - return ((HoodieAvroRecord) record).getData().getInsertValue(getReaderSchema()); + return ((HoodieAvroRecord) record).getData().getInsertValue(getReaderSchema(), payloadProps); } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java index 9f51e7f16..313b96488 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java @@ -88,7 +88,7 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader .withBufferSize(this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)) .withLogRecordScannerCallback(record -> { // convert Hoodie log record to Hadoop AvroWritable and buffer - GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema()).get(); + GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema(), payloadProps).get(); ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema()); this.executor.getQueue().insertRecord(aWritable); }) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index ea9d5fc3f..3257ecf1f 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -153,7 +153,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, override def hasNext: Boolean = { if (logRecordsKeyIterator.hasNext) { val curAvrokey = logRecordsKeyIterator.next() - val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema) + val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps) if (!curAvroRecord.isPresent) { // delete record found, skipping this.hasNext @@ -210,7 +210,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, } else { if (logRecordsKeyIterator.hasNext) { val curAvrokey = logRecordsKeyIterator.next() - val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema) + val curAvroRecord =logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps) if (!curAvroRecord.isPresent) { // delete record found, skipping this.hasNext @@ -298,8 +298,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, if (keyToSkip.contains(curKey)) { this.hasNext } else { - val insertAvroRecord = - logRecords.get(curKey).getData.getInsertValue(tableAvroSchema) + val insertAvroRecord = logRecords.get(curKey).getData.getInsertValue(tableAvroSchema, payloadProps) if (!insertAvroRecord.isPresent) { // stand alone delete record, skipping this.hasNext diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java index d0e132676..0eba1d9a6 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java @@ -26,6 +26,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import java.io.IOException; +import java.util.Properties; /** * Provides support for seamlessly applying changes captured via Amazon Database Migration Service onto S3. @@ -68,12 +69,25 @@ public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload { return delete ? Option.empty() : Option.of(insertValue); } + @Override + public Option getInsertValue(Schema schema, Properties properties) throws IOException { + IndexedRecord insertValue = super.getInsertValue(schema, properties).get(); + return handleDeleteOperation(insertValue); + } + @Override public Option getInsertValue(Schema schema) throws IOException { IndexedRecord insertValue = super.getInsertValue(schema).get(); return handleDeleteOperation(insertValue); } + @Override + public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) + throws IOException { + IndexedRecord insertValue = super.getInsertValue(schema, properties).get(); + return handleDeleteOperation(insertValue); + } + @Override public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {