1
0

[HUDI-2439] Replace RDD with HoodieData in HoodieSparkTable and commit executors (#4856)

- Adopt HoodieData in Spark action commit executors
- Make Spark independent DeleteHelper, WriteHelper, MergeHelper in hudi-client-common
- Make HoodieTable in WriteClient APIs have raw type to decouple with Client's generic types
This commit is contained in:
Raymond Xu
2022-03-17 19:17:56 +08:00
committed by GitHub
parent bf191f8d46
commit 7446ff95a7
69 changed files with 723 additions and 769 deletions

View File

@@ -18,8 +18,6 @@
package org.apache.hudi.client;
import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.async.AsyncArchiveService;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -78,6 +76,9 @@ import org.apache.hudi.table.action.savepoint.SavepointHelpers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -242,11 +243,11 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
}
protected HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
return createTable(config, hadoopConf, false);
}
protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf, boolean refreshTimeline);
protected abstract HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, boolean refreshTimeline);
void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
try {
@@ -397,7 +398,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* @return Collection of WriteStatus to inspect errors and counts
*/
public abstract O bulkInsert(I records, final String instantTime,
Option<BulkInsertPartitioner<I>> userDefinedBulkInsertPartitioner);
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner);
/**
@@ -417,7 +418,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* @return Collection of WriteStatus to inspect errors and counts
*/
public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instantTime,
Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);
Option<BulkInsertPartitioner> bulkInsertPartitioner);
/**
* Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied instantTime {@link HoodieKey}s will be
@@ -458,7 +459,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* @param hoodieTable Hoodie Table
* @return Write Status
*/
protected abstract O postWrite(HoodieWriteMetadata<O> result, String instantTime, HoodieTable<T, I, K, O> hoodieTable);
protected abstract O postWrite(HoodieWriteMetadata<O> result, String instantTime, HoodieTable hoodieTable);
/**
* Post Commit Hook. Derived classes use this method to perform post-commit processing
@@ -468,7 +469,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* @param instantTime Instant Time
* @param extraMetadata Additional Metadata passed by user
*/
protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
try {
// Delete the marker directory for the instant.
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
@@ -480,7 +481,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
}
}
protected void runTableServicesInline(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
if (!tableServicesEnabled(config)) {
return;
}
@@ -524,7 +525,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
}
}
protected void runAnyPendingCompactions(HoodieTable<T, I, K, O> table) {
protected void runAnyPendingCompactions(HoodieTable table) {
table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().getInstants()
.forEach(instant -> {
LOG.info("Running previously failed inflight compaction at instant " + instant);
@@ -532,7 +533,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
});
}
protected void runAnyPendingClustering(HoodieTable<T, I, K, O> table) {
protected void runAnyPendingClustering(HoodieTable table) {
table.getActiveTimeline().filterPendingReplaceTimeline().getInstants().forEach(instant -> {
Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant);
if (instantPlan.isPresent()) {
@@ -558,7 +559,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
}
}
protected void autoArchiveOnCommit(HoodieTable<T, I, K, O> table) {
protected void autoArchiveOnCommit(HoodieTable table) {
if (!config.isAutoArchive()) {
return;
}
@@ -808,7 +809,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* and keep increasing unbounded over time.
* @param table table to commit on.
*/
protected void archive(HoodieTable<T, I, K, O> table) {
protected void archive(HoodieTable table) {
if (!tableServicesEnabled(config)) {
return;
}
@@ -937,7 +938,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
/**
* Commit Compaction and track metrics.
*/
protected abstract void completeCompaction(HoodieCommitMetadata metadata, HoodieTable<T, I, K, O> table, String compactionCommitTime);
protected abstract void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime);
/**
* Get inflight time line exclude compaction and clustering.
@@ -1223,7 +1224,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
return scheduleClustering(extraMetadata);
}
protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable<T, I, K, O> table) {
protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) {
Option<HoodiePendingRollbackInfo> pendingRollbackInstantInfo = getPendingRollbackInfo(table.getMetaClient(), inflightInstant.getTimestamp(), false);
String commitTime = pendingRollbackInstantInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
table.scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
@@ -1238,7 +1239,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* @param instantTime Instant Time
* @param stats Hoodie Write Stat
*/
protected void finalizeWrite(HoodieTable<T, I, K, O> table, String instantTime, List<HoodieWriteStat> stats) {
protected void finalizeWrite(HoodieTable table, String instantTime, List<HoodieWriteStat> stats) {
try {
final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
table.finalizeWrite(context, instantTime, stats);
@@ -1273,7 +1274,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* @param instantTime current inflight instant time
* @return instantiated {@link HoodieTable}
*/
protected abstract HoodieTable<T, I, K, O> doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime);
protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime);
/**
* Instantiates and initializes instance of {@link HoodieTable}, performing crucial bootstrapping
@@ -1288,14 +1289,14 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* <li>Initializing metrics contexts</li>
* </ul>
*/
protected final HoodieTable<T, I, K, O> initTable(WriteOperationType operationType, Option<String> instantTime) {
protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime) {
HoodieTableMetaClient metaClient = createMetaClient(true);
// Setup write schemas for deletes
if (operationType == WriteOperationType.DELETE) {
setWriteSchemaForDeletes(metaClient);
}
HoodieTable<T, I, K, O> table;
HoodieTable table;
this.txnManager.beginTransaction();
try {
@@ -1381,7 +1382,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
this.txnManager.close();
}
private void setWriteTimer(HoodieTable<T, I, K, O> table) {
private void setWriteTimer(HoodieTable table) {
String commitType = table.getMetaClient().getCommitActionType();
if (commitType.equals(HoodieTimeline.COMMIT_ACTION)) {
writeTimer = metrics.getCommitCtx();

View File

@@ -180,7 +180,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, String instantTime,
I records, Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);
I records, Option<BulkInsertPartitioner> bulkInsertPartitioner);
/**
* Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied instantTime {@link HoodieKey}s will be
@@ -237,7 +237,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata<O> bulkInsertPrepped(HoodieEngineContext context, String instantTime,
I preppedRecords, Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);
I preppedRecords, Option<BulkInsertPartitioner> bulkInsertPartitioner);
/**
* Replaces all the existing records and inserts the specified new records into Hoodie table at the supplied instantTime,

View File

@@ -28,7 +28,7 @@ import java.util.Set;
/**
* When file groups in clustering, write records to these file group need to check.
*/
public abstract class UpdateStrategy<T extends HoodieRecordPayload<T>, I> {
public abstract class UpdateStrategy<T extends HoodieRecordPayload, I> {
protected final HoodieEngineContext engineContext;
protected Set<HoodieFileGroupId> fileGroupsInPendingClustering;

View File

@@ -34,7 +34,7 @@ public abstract class BaseBulkInsertHelper<T extends HoodieRecordPayload, I, K,
public abstract HoodieWriteMetadata<O> bulkInsert(I inputRecords, String instantTime,
HoodieTable<T, I, K, O> table, HoodieWriteConfig config,
BaseCommitActionExecutor<T, I, K, O, R> executor, boolean performDedupe,
Option<BulkInsertPartitioner<I>> userDefinedBulkInsertPartitioner);
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner);
/**
* Only write input records. Does not change timeline/index. Return information about new files created.
@@ -42,7 +42,7 @@ public abstract class BaseBulkInsertHelper<T extends HoodieRecordPayload, I, K,
public abstract O bulkInsert(I inputRecords, String instantTime,
HoodieTable<T, I, K, O> table, HoodieWriteConfig config,
boolean performDedupe,
Option<BulkInsertPartitioner<I>> userDefinedBulkInsertPartitioner,
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner,
boolean addMetadataFields,
int parallelism,
WriteHandleFactory writeHandleFactory);

View File

@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
/**
 * A {@link BaseDeleteHelper} implementation built on the engine-agnostic {@link HoodieData}
 * abstraction, so it can be shared by engine-specific clients (moved out of Spark-only code).
 *
 * @param <T> payload type of the records stored in the table
 * @param <R> extra type parameter forwarded to {@link BaseCommitActionExecutor}
 */
@SuppressWarnings("checkstyle:LineLength")
public class HoodieDeleteHelper<T extends HoodieRecordPayload, R> extends
    BaseDeleteHelper<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {

  // Stateless helper: callers obtain the shared singleton via newInstance().
  private HoodieDeleteHelper() {
  }

  // Initialization-on-demand holder idiom: lazy, thread-safe singleton creation.
  private static class DeleteHelperHolder {
    private static final HoodieDeleteHelper HOODIE_DELETE_HELPER = new HoodieDeleteHelper<>();
  }

  public static HoodieDeleteHelper newInstance() {
    return DeleteHelperHolder.HOODIE_DELETE_HELPER;
  }

  /**
   * Removes duplicate keys before tagging. With a global index the record key alone
   * identifies a record across partitions, so duplicates are collapsed by record key;
   * otherwise the full key (record key + partition path) must match.
   */
  @Override
  public HoodieData<HoodieKey> deduplicateKeys(HoodieData<HoodieKey> keys, HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table, int parallelism) {
    boolean isIndexingGlobal = table.getIndex().isGlobal();
    if (isIndexingGlobal) {
      return keys.distinctWithKey(HoodieKey::getRecordKey, parallelism);
    } else {
      return keys.distinct(parallelism);
    }
  }

  /**
   * Executes the delete: de-dupes/repartitions the incoming keys, wraps them as
   * empty-payload records, tags them against the index, and hands the records that
   * actually exist to the delete executor. Wraps any failure in
   * {@link HoodieUpsertException}.
   */
  @Override
  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(String instantTime,
                                                              HoodieData<HoodieKey> keys,
                                                              HoodieEngineContext context,
                                                              HoodieWriteConfig config,
                                                              HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
                                                              BaseCommitActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> deleteExecutor) {
    try {
      HoodieData<HoodieKey> dedupedKeys = keys;
      final int parallelism = config.getDeleteShuffleParallelism();
      if (config.shouldCombineBeforeDelete()) {
        // De-dupe/merge if needed
        dedupedKeys = deduplicateKeys(keys, table, parallelism);
      } else if (!keys.isEmpty()) {
        // No combine requested: just spread the keys over the configured parallelism.
        dedupedKeys = keys.repartition(parallelism);
      }

      // Deletes are modeled as upserts of records with an empty payload.
      HoodieData<HoodieRecord<T>> dedupedRecords =
          dedupedKeys.map(key -> new HoodieAvroRecord(key, new EmptyHoodieRecordPayload()));
      Instant beginTag = Instant.now();
      // perform index lookup to get existing location of records
      HoodieData<HoodieRecord<T>> taggedRecords = table.getIndex().tagLocation(dedupedRecords, context, table);
      Duration tagLocationDuration = Duration.between(beginTag, Instant.now());

      // filter out non existent keys/records
      HoodieData<HoodieRecord<T>> taggedValidRecords = taggedRecords.filter(HoodieRecord::isCurrentLocationKnown);

      HoodieWriteMetadata<HoodieData<WriteStatus>> result;
      if (!taggedValidRecords.isEmpty()) {
        result = deleteExecutor.execute(taggedValidRecords);
        result.setIndexLookupDuration(tagLocationDuration);
      } else {
        // if entire set of keys are non existent: still record an (empty) workload
        // profile and empty write statuses so the commit can proceed/auto-commit.
        deleteExecutor.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), instantTime);
        result = new HoodieWriteMetadata<>();
        result.setWriteStatuses(context.emptyHoodieData());
        deleteExecutor.commitOnAutoCommit(result);
      }
      return result;
    } catch (Throwable e) {
      // Re-throw upsert failures unchanged; wrap everything else for context.
      if (e instanceof HoodieUpsertException) {
        throw (HoodieUpsertException) e;
      }
      throw new HoodieUpsertException("Failed to delete for commit time " + instantTime, e);
    }
  }
}

View File

@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
import java.util.Iterator;
/**
 * A {@link BaseMergeHelper} implementation built on the engine-agnostic {@link HoodieData}
 * abstraction. Streams the records of an existing base file through a
 * {@link BoundedInMemoryExecutor} into the supplied {@link HoodieMergeHandle}, which
 * merges them with the incoming changes.
 */
public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
    BaseMergeHelper<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> {

  // Stateless helper: callers obtain the shared singleton via newInstance().
  private HoodieMergeHelper() {
  }

  // Initialization-on-demand holder idiom: lazy, thread-safe singleton creation.
  private static class MergeHelperHolder {
    private static final HoodieMergeHelper HOODIE_MERGE_HELPER = new HoodieMergeHelper<>();
  }

  public static HoodieMergeHelper newInstance() {
    return MergeHelperHolder.HOODIE_MERGE_HELPER;
  }

  @Override
  public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
                       HoodieMergeHandle<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> mergeHandle) throws IOException {
    final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
    Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
    HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();

    final GenericDatumWriter<GenericRecord> gWriter;
    final GenericDatumReader<GenericRecord> gReader;
    Schema readSchema;
    if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
      // Read with the old file's own schema and set up an Avro writer/reader pair
      // used to re-encode each record into the writer schema (with meta fields).
      readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema();
      gWriter = new GenericDatumWriter<>(readSchema);
      gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetaFields());
    } else {
      // No transformation needed: read directly with the writer schema.
      gReader = null;
      gWriter = null;
      readSchema = mergeHandle.getWriterSchemaWithMetaFields();
    }

    BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
    HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
    try {
      final Iterator<GenericRecord> readerIterator;
      if (baseFile.getBootstrapBaseFile().isPresent()) {
        // Bootstrap case: iterate the skeleton file joined with its bootstrap base file.
        readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
      } else {
        readerIterator = reader.getRecordIterator(readSchema);
      }

      // Encoder/decoder caches are ThreadLocal — the transform may run on the
      // executor's worker threads rather than this thread.
      ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
      ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
      wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), readerIterator,
          new UpdateHandler(mergeHandle), record -> {
        if (!externalSchemaTransformation) {
          return record;
        }
        // Re-serialize the record from readSchema into the writer schema.
        return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record);
      }, table.getPreExecuteRunnable());
      wrapper.execute();
    } catch (Exception e) {
      throw new HoodieException(e);
    } finally {
      // Release order matters: close the file reader, then the merge handle,
      // then stop any executor threads that are still running.
      if (reader != null) {
        reader.close();
      }
      mergeHandle.close();
      if (null != wrapper) {
        wrapper.shutdownNow();
      }
    }
  }
}

View File

@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
/**
 * A {@link BaseWriteHelper} implementation built on the engine-agnostic {@link HoodieData}
 * abstraction, shareable by engine-specific write clients.
 */
public class HoodieWriteHelper<T extends HoodieRecordPayload, R> extends BaseWriteHelper<T, HoodieData<HoodieRecord<T>>,
    HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {

  // Stateless; obtain the shared instance through newInstance().
  private HoodieWriteHelper() {
  }

  /** Initialization-on-demand holder for the lazily created singleton. */
  private static class WriteHelperHolder {
    private static final HoodieWriteHelper HOODIE_WRITE_HELPER = new HoodieWriteHelper<>();
  }

  public static HoodieWriteHelper newInstance() {
    return WriteHelperHolder.HOODIE_WRITE_HELPER;
  }

  /** Stamps each record with its existing file location (if any) via the table's index. */
  @Override
  protected HoodieData<HoodieRecord<T>> tag(HoodieData<HoodieRecord<T>> dedupedRecords, HoodieEngineContext context,
                                            HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table) {
    return table.getIndex().tagLocation(dedupedRecords, context, table);
  }

  /**
   * Collapses records sharing the same key into one, combining payloads with
   * {@code preCombine}. A global index groups by record key alone (record keys are
   * unique across partitions there); otherwise the full HoodieKey is the grouping key.
   */
  @Override
  public HoodieData<HoodieRecord<T>> deduplicateRecords(
      HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
    final boolean globalIndex = index.isGlobal();
    return records.mapToPair(record -> {
      HoodieKey hoodieKey = record.getKey();
      // Under a global index, records are expected to differ only in partitionPath,
      // so the record key alone identifies duplicates.
      Object groupingKey;
      if (globalIndex) {
        groupingKey = hoodieKey.getRecordKey();
      } else {
        groupingKey = hoodieKey;
      }
      return Pair.of(groupingKey, record);
    }).reduceByKey((left, right) -> {
      @SuppressWarnings("unchecked")
      T mergedPayload = (T) right.getData().preCombine(left.getData());
      // Keep the key belonging to whichever record's payload survived the pre-combine.
      HoodieKey mergedKey;
      if (left.getData().equals(mergedPayload)) {
        mergedKey = left.getKey();
      } else {
        mergedKey = right.getKey();
      }
      return new HoodieAvroRecord<>(mergedKey, mergedPayload);
    }, parallelism).map(Pair::getRight);
  }
}