1
0

[HUDI-2987] Update all deprecated calls to new apis in HoodieRecordPayload (#4681)

This commit is contained in:
Sivabalan Narayanan
2022-02-10 19:19:33 -05:00
committed by GitHub
parent 2fe7a3a41f
commit ba4e732ba7
8 changed files with 68 additions and 21 deletions

View File

@@ -7,13 +7,14 @@
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing,
* distributed under the License is distributed on an "AS IS" BASIS, * software distributed under the License is distributed on an
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* See the License for the specific language governing permissions and * KIND, either express or implied. See the License for the
* limitations under the License. * specific language governing permissions and limitations
* under the License.
*/ */
package org.apache.hudi.common.table.log; package org.apache.hudi.common.table.log;
@@ -23,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReader;
@@ -53,10 +55,11 @@ public class HoodieFileSliceReader<T extends HoodieRecordPayload> implements Ite
return new HoodieFileSliceReader(scanner.iterator()); return new HoodieFileSliceReader(scanner.iterator());
} else { } else {
Iterable<HoodieRecord<? extends HoodieRecordPayload>> iterable = () -> scanner.iterator(); Iterable<HoodieRecord<? extends HoodieRecordPayload>> iterable = () -> scanner.iterator();
HoodiePayloadConfig payloadConfig = HoodiePayloadConfig.newBuilder().withPayloadOrderingField(preCombineField).build();
return new HoodieFileSliceReader(StreamSupport.stream(iterable.spliterator(), false) return new HoodieFileSliceReader(StreamSupport.stream(iterable.spliterator(), false)
.map(e -> { .map(e -> {
try { try {
GenericRecord record = (GenericRecord) e.getData().getInsertValue(schema).get(); GenericRecord record = (GenericRecord) e.getData().getInsertValue(schema, payloadConfig.getProps()).get();
return transform(record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt); return transform(record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt);
} catch (IOException io) { } catch (IOException io) {
throw new HoodieIOException("Error while creating reader for file slice with no base file.", io); throw new HoodieIOException("Error while creating reader for file slice with no base file.", io);

View File

@@ -31,6 +31,7 @@ import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.List; import java.util.List;
import java.util.Properties;
/** /**
* Provides support for seamlessly applying changes captured via Debezium for PostgresDB. * Provides support for seamlessly applying changes captured via Debezium for PostgresDB.
@@ -71,6 +72,19 @@ public class PostgresDebeziumAvroPayload extends AbstractDebeziumAvroPayload {
return insertSourceLSN < currentSourceLSN; return insertSourceLSN < currentSourceLSN;
} }
/**
 * Combines this payload with the currently stored record, forwarding the payload
 * properties to the parent implementation.
 *
 * <p>Postgres-specific step: an updated record may carry TOASTED columns, for which the
 * previous value has to be kept. When the parent merge yields a record, it is handed to
 * {@code mergeToastedValuesIfPresent} together with {@code currentValue} for that purpose.
 * See https://debezium.io/documentation/reference/connectors/postgresql.html#postgresql-toasted-values
 *
 * @param currentValue the record currently stored
 * @param schema       schema passed through to the parent merge
 * @param properties   payload properties passed through to the parent merge
 * @return whatever the parent merge returned; an empty Option is returned untouched
 * @throws IOException propagated from the parent merge
 */
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
  Option<IndexedRecord> mergedRecord = super.combineAndGetUpdateValue(currentValue, schema, properties);
  if (!mergedRecord.isPresent()) {
    // The parent produced no record, so there are no TOASTED columns to restore.
    return mergedRecord;
  }
  mergeToastedValuesIfPresent(mergedRecord.get(), currentValue);
  return mergedRecord;
}
@Override @Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException { public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
// Specific to Postgres: If the updated record has TOASTED columns, // Specific to Postgres: If the updated record has TOASTED columns,

View File

@@ -52,6 +52,7 @@ import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@@ -67,7 +68,7 @@ import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_L
* <p> * <p>
* METADATA_TYPE_PARTITION_LIST (1): * METADATA_TYPE_PARTITION_LIST (1):
* -- List of all partitions. There is a single such record * -- List of all partitions. There is a single such record
* -- key = @{@link HoodieTableMetadata.RECORDKEY_PARTITION_LIST} * -- key = @{@link HoodieTableMetadata#RECORDKEY_PARTITION_LIST}
* <p> * <p>
* METADATA_TYPE_FILE_LIST (2): * METADATA_TYPE_FILE_LIST (2):
* -- List of all files in a partition. There is one such record for each partition * -- List of all files in a partition. There is one such record for each partition
@@ -289,14 +290,19 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
} }
@Override @Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord oldRecord, Schema schema) throws IOException { public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord oldRecord, Schema schema, Properties properties) throws IOException {
HoodieMetadataPayload anotherPayload = new HoodieMetadataPayload(Option.of((GenericRecord) oldRecord)); HoodieMetadataPayload anotherPayload = new HoodieMetadataPayload(Option.of((GenericRecord) oldRecord));
HoodieRecordPayload combinedPayload = preCombine(anotherPayload); HoodieRecordPayload combinedPayload = preCombine(anotherPayload);
return combinedPayload.getInsertValue(schema); return combinedPayload.getInsertValue(schema, properties);
} }
@Override @Override
public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException { public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord oldRecord, Schema schema) throws IOException {
return combineAndGetUpdateValue(oldRecord, schema, new Properties());
}
@Override
public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties) throws IOException {
if (key == null) { if (key == null) {
return Option.empty(); return Option.empty();
} }
@@ -306,6 +312,11 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
return Option.of(record); return Option.of(record);
} }
/**
 * Overload without payload properties; delegates to
 * {@code getInsertValue(Schema, Properties)} with a fresh, empty {@link Properties}.
 *
 * @param schema schema passed through to the properties-aware overload
 * @return the result of the properties-aware overload
 * @throws IOException propagated from the properties-aware overload
 */
@Override
public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
  Properties emptyProps = new Properties();
  return getInsertValue(schema, emptyProps);
}
/** /**
* Returns the list of filenames added as part of this record. * Returns the list of filenames added as part of this record.
*/ */

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.hadoop.realtime; package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodiePayloadProps;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.LogReaderUtils; import org.apache.hudi.common.table.log.LogReaderUtils;
import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIOException;
@@ -39,6 +40,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@@ -50,6 +52,7 @@ public abstract class AbstractRealtimeRecordReader {
protected final RealtimeSplit split; protected final RealtimeSplit split;
protected final JobConf jobConf; protected final JobConf jobConf;
protected final boolean usesCustomPayload; protected final boolean usesCustomPayload;
protected Properties payloadProps = new Properties();
// Schema handles // Schema handles
private Schema readerSchema; private Schema readerSchema;
private Schema writerSchema; private Schema writerSchema;
@@ -62,7 +65,11 @@ public abstract class AbstractRealtimeRecordReader {
LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
LOG.info("partitioningColumns ==> " + job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "")); LOG.info("partitioningColumns ==> " + job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""));
try { try {
this.usesCustomPayload = usesCustomPayload(); HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(split.getBasePath()).build();
if (metaClient.getTableConfig().getPreCombineField() != null) {
this.payloadProps.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, metaClient.getTableConfig().getPreCombineField());
}
this.usesCustomPayload = usesCustomPayload(metaClient);
LOG.info("usesCustomPayload ==> " + this.usesCustomPayload); LOG.info("usesCustomPayload ==> " + this.usesCustomPayload);
init(); init();
} catch (IOException e) { } catch (IOException e) {
@@ -70,8 +77,7 @@ public abstract class AbstractRealtimeRecordReader {
} }
} }
private boolean usesCustomPayload() { private boolean usesCustomPayload(HoodieTableMetaClient metaClient) {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(split.getBasePath()).build();
return !(metaClient.getTableConfig().getPayloadClass().contains(HoodieAvroPayload.class.getName()) return !(metaClient.getTableConfig().getPayloadClass().contains(HoodieAvroPayload.class.getName())
|| metaClient.getTableConfig().getPayloadClass().contains("org.apache.hudi.OverwriteWithLatestAvroPayload")); || metaClient.getTableConfig().getPayloadClass().contains("org.apache.hudi.OverwriteWithLatestAvroPayload"));
} }

View File

@@ -97,9 +97,9 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
private Option<GenericRecord> buildGenericRecordwithCustomPayload(HoodieRecord record) throws IOException { private Option<GenericRecord> buildGenericRecordwithCustomPayload(HoodieRecord record) throws IOException {
if (usesCustomPayload) { if (usesCustomPayload) {
return ((HoodieAvroRecord) record).getData().getInsertValue(getWriterSchema()); return ((HoodieAvroRecord) record).getData().getInsertValue(getWriterSchema(), payloadProps);
} else { } else {
return ((HoodieAvroRecord) record).getData().getInsertValue(getReaderSchema()); return ((HoodieAvroRecord) record).getData().getInsertValue(getReaderSchema(), payloadProps);
} }
} }

View File

@@ -88,7 +88,7 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
.withBufferSize(this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)) .withBufferSize(this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
.withLogRecordScannerCallback(record -> { .withLogRecordScannerCallback(record -> {
// convert Hoodie log record to Hadoop AvroWritable and buffer // convert Hoodie log record to Hadoop AvroWritable and buffer
GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema()).get(); GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema(), payloadProps).get();
ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema()); ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema());
this.executor.getQueue().insertRecord(aWritable); this.executor.getQueue().insertRecord(aWritable);
}) })

View File

@@ -153,7 +153,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
override def hasNext: Boolean = { override def hasNext: Boolean = {
if (logRecordsKeyIterator.hasNext) { if (logRecordsKeyIterator.hasNext) {
val curAvrokey = logRecordsKeyIterator.next() val curAvrokey = logRecordsKeyIterator.next()
val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema) val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
if (!curAvroRecord.isPresent) { if (!curAvroRecord.isPresent) {
// delete record found, skipping // delete record found, skipping
this.hasNext this.hasNext
@@ -210,7 +210,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
} else { } else {
if (logRecordsKeyIterator.hasNext) { if (logRecordsKeyIterator.hasNext) {
val curAvrokey = logRecordsKeyIterator.next() val curAvrokey = logRecordsKeyIterator.next()
val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema) val curAvroRecord =logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
if (!curAvroRecord.isPresent) { if (!curAvroRecord.isPresent) {
// delete record found, skipping // delete record found, skipping
this.hasNext this.hasNext
@@ -298,8 +298,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
if (keyToSkip.contains(curKey)) { if (keyToSkip.contains(curKey)) {
this.hasNext this.hasNext
} else { } else {
val insertAvroRecord = val insertAvroRecord = logRecords.get(curKey).getData.getInsertValue(tableAvroSchema, payloadProps)
logRecords.get(curKey).getData.getInsertValue(tableAvroSchema)
if (!insertAvroRecord.isPresent) { if (!insertAvroRecord.isPresent) {
// stand alone delete record, skipping // stand alone delete record, skipping
this.hasNext this.hasNext

View File

@@ -26,6 +26,7 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord; import org.apache.avro.generic.IndexedRecord;
import java.io.IOException; import java.io.IOException;
import java.util.Properties;
/** /**
* Provides support for seamlessly applying changes captured via Amazon Database Migration Service onto S3. * Provides support for seamlessly applying changes captured via Amazon Database Migration Service onto S3.
@@ -68,12 +69,25 @@ public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload {
return delete ? Option.empty() : Option.of(insertValue); return delete ? Option.empty() : Option.of(insertValue);
} }
/**
 * Resolves the insert value through the parent payload and then applies the delete
 * handling of this payload ({@code handleDeleteOperation}).
 *
 * <p>The parent may return an empty Option. In that case there is no record to inspect,
 * so the empty Option is returned as-is instead of being unwrapped, which would throw.
 *
 * @param schema     schema passed through to the parent
 * @param properties payload properties passed through to the parent
 * @return empty when the parent yields no record; otherwise the result of
 *         {@code handleDeleteOperation} on the parent's record
 * @throws IOException propagated from the parent or from the delete handling
 */
@Override
public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties) throws IOException {
  Option<IndexedRecord> insertValue = super.getInsertValue(schema, properties);
  if (!insertValue.isPresent()) {
    // No record from the parent: propagate the empty Option rather than calling get() on it.
    return insertValue;
  }
  return handleDeleteOperation(insertValue.get());
}
@Override @Override
public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException { public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
IndexedRecord insertValue = super.getInsertValue(schema).get(); IndexedRecord insertValue = super.getInsertValue(schema).get();
return handleDeleteOperation(insertValue); return handleDeleteOperation(insertValue);
} }
/**
 * Produces the update value by resolving this payload's insert value through the parent
 * and applying the delete handling of this payload ({@code handleDeleteOperation}).
 * {@code currentValue} is not consulted here.
 *
 * <p>The parent may return an empty Option. In that case there is no record to inspect,
 * so the empty Option is returned as-is instead of being unwrapped, which would throw.
 *
 * @param currentValue the record currently stored (unused by this implementation)
 * @param schema       schema passed through to the parent
 * @param properties   payload properties passed through to the parent
 * @return empty when the parent yields no record; otherwise the result of
 *         {@code handleDeleteOperation} on the parent's record
 * @throws IOException propagated from the parent or from the delete handling
 */
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties)
    throws IOException {
  Option<IndexedRecord> insertValue = super.getInsertValue(schema, properties);
  if (!insertValue.isPresent()) {
    // No record from the parent: propagate the empty Option rather than calling get() on it.
    return insertValue;
  }
  return handleDeleteOperation(insertValue.get());
}
@Override @Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
throws IOException { throws IOException {