
[HUDI-4249] Fixing in-memory HoodieData implementation to operate lazily (#5855)

commit 4bda6afe0b (parent 80368a049d)
Author: Alexey Kudinkin (committed via GitHub)
Date: 2022-07-16 16:26:48 -07:00
35 changed files with 868 additions and 555 deletions
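
The pattern repeated across the hunks below: the eager HoodieList wrapper (HoodieList.of / HoodieList.getList) gives way to HoodieListData, which distinguishes eager from deferred evaluation (HoodieListData.eager here; HoodieListData.lazy and HoodieListPairData.lazy where deferral is wanted), and call sites materialize once at the end via collectAsList(). A minimal sketch of the migrated call-site shape, modeled on the FlinkWriteHelper/JavaWriteHelper hunks below; the wrapper class and method are illustrative, not part of the commit:

import java.util.List;

import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.table.HoodieTable;

class TagLocationSketch {
  // Mirrors the migrated tag() methods in this diff: wrap the list once,
  // let the index transform it as HoodieData, and collect exactly once.
  @SuppressWarnings("unchecked")
  static <T extends HoodieRecordPayload> List<HoodieRecord<T>> tag(
      List<HoodieRecord<T>> dedupedRecords, HoodieEngineContext context, HoodieTable table) {
    // Before this commit (eager wrap/unwrap at every step):
    //   HoodieList.getList(table.getIndex().tagLocation(HoodieList.of(dedupedRecords), context, table));
    return table.getIndex()
        .tagLocation(HoodieListData.eager(dedupedRecords), context, table)
        .collectAsList();
  }
}

Wrapping once and collecting once keeps the intermediate transformations lazy in the in-memory implementation, matching the behavior of the RDD-backed HoodieData.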

View File

@@ -40,7 +40,6 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.io.HoodieRangeInfoHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

View File

@@ -20,7 +20,6 @@
package org.apache.hudi.index.bloom;
import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieList;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
@@ -60,7 +59,7 @@ public class ListBasedHoodieBloomIndexHelper extends BaseHoodieBloomIndexHelper
HoodieData<Pair<String, HoodieKey>> fileComparisonPairs,
Map<String, List<BloomIndexFileInfo>> partitionToFileInfo, Map<String, Long> recordsPerPartition) {
List<Pair<String, HoodieKey>> fileComparisonPairList =
-HoodieList.getList(fileComparisonPairs).stream()
+fileComparisonPairs.collectAsList().stream()
.sorted(Comparator.comparing(Pair::getLeft)).collect(toList());
List<HoodieKeyLookupResult> keyLookupResults = new ArrayList<>();

View File

@@ -83,7 +83,8 @@ public abstract class HoodieBucketIndex extends HoodieIndex<Object, Object> {
Option<HoodieRecordLocation> loc = mapper.getRecordLocation(record.getKey(), record.getPartitionPath());
return HoodieIndexUtils.getTaggedRecord(record, loc);
}
-}
+},
+false
);
}

View File

@@ -20,7 +20,7 @@ package org.apache.hudi.client;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
@@ -127,8 +127,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
// Create a Hoodie table which encapsulated the commits and files visible
HoodieFlinkTable<T> table = getHoodieTable();
Timer.Context indexTimer = metrics.getIndexCtx();
-List<HoodieRecord<T>> recordsWithLocation = HoodieList.getList(
-getIndex().tagLocation(HoodieList.of(hoodieRecords), context, table));
+List<HoodieRecord<T>> recordsWithLocation = getIndex().tagLocation(HoodieListData.eager(hoodieRecords), context, table).collectAsList();
metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
return recordsWithLocation.stream().filter(v1 -> !v1.isCurrentLocationKnown()).collect(Collectors.toList());
}

View File

@@ -23,7 +23,7 @@ import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -84,12 +84,12 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
@Override
public <T> HoodieData<T> emptyHoodieData() {
-return HoodieList.of(Collections.emptyList());
+return HoodieListData.eager(Collections.emptyList());
}
@Override
public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
-return HoodieList.of(data);
+return HoodieListData.eager(data);
}
public RuntimeContext getRuntimeContext() {

View File

@@ -23,7 +23,7 @@ import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -61,8 +61,8 @@ public abstract class FlinkHoodieIndex<T extends HoodieRecordPayload> extends Ho
public <R> HoodieData<HoodieRecord<R>> tagLocation(
HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException {
-List<HoodieRecord<T>> hoodieRecords = tagLocation(HoodieList.getList(records.map(record -> (HoodieRecord<T>) record)), context, hoodieTable);
-return HoodieList.of(hoodieRecords.stream().map(r -> (HoodieRecord<R>) r).collect(Collectors.toList()));
+List<HoodieRecord<T>> hoodieRecords = tagLocation(records.map(record -> (HoodieRecord<T>) record).collectAsList(), context, hoodieTable);
+return HoodieListData.eager(hoodieRecords.stream().map(r -> (HoodieRecord<R>) r).collect(Collectors.toList()));
}
@Override
@@ -70,6 +70,6 @@ public abstract class FlinkHoodieIndex<T extends HoodieRecordPayload> extends Ho
public HoodieData<WriteStatus> updateLocation(
HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException {
-return HoodieList.of(updateLocation(HoodieList.getList(writeStatuses), context, hoodieTable));
+return HoodieListData.eager(updateLocation(writeStatuses.collectAsList(), context, hoodieTable));
}
}

View File

@@ -21,7 +21,6 @@ package org.apache.hudi.metadata;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieList;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.HoodieRecord;
@@ -106,7 +105,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is not fully initialized yet.");
HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap);
-List<HoodieRecord> preppedRecordList = HoodieList.getList(preppedRecords);
+List<HoodieRecord> preppedRecordList = preppedRecords.collectAsList();
try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) {
if (canTriggerTableService) {

View File

@@ -40,8 +40,6 @@ import org.apache.avro.specific.SpecificRecordBase;
import java.util.List;
-import static org.apache.hudi.common.data.HoodieList.getList;
public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>
implements ExplicitWriteHandleTable<T> {
@@ -78,7 +76,7 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
public static HoodieWriteMetadata<List<WriteStatus>> convertMetadata(
HoodieWriteMetadata<HoodieData<WriteStatus>> metadata) {
-return metadata.clone(getList(metadata.getWriteStatuses()));
+return metadata.clone(metadata.getWriteStatuses().collectAsList());
}
@Override

View File

@@ -19,7 +19,7 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -97,8 +97,7 @@ public class FlinkDeleteHelper<R> extends
dedupedKeys.stream().map(key -> new HoodieAvroRecord<>(key, new EmptyHoodieRecordPayload())).collect(Collectors.toList());
Instant beginTag = Instant.now();
// perform index look up to get existing location of records
-List<HoodieRecord<EmptyHoodieRecordPayload>> taggedRecords = HoodieList.getList(
-table.getIndex().tagLocation(HoodieList.of(dedupedRecords), context, table));
+List<HoodieRecord<EmptyHoodieRecordPayload>> taggedRecords = table.getIndex().tagLocation(HoodieListData.eager(dedupedRecords), context, table).collectAsList();
Duration tagLocationDuration = Duration.between(beginTag, Instant.now());
// filter out non existent keys/records

View File

@@ -19,7 +19,7 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
@@ -83,8 +83,7 @@ public class FlinkWriteHelper<T extends HoodieRecordPayload, R> extends BaseWrit
@Override
protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>> dedupedRecords, HoodieEngineContext context, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table) {
-return HoodieList.getList(
-table.getIndex().tagLocation(HoodieList.of(dedupedRecords), context, table));
+return table.getIndex().tagLocation(HoodieListData.eager(dedupedRecords), context, table).collectAsList();
}
@Override

View File

@@ -18,11 +18,12 @@
package org.apache.hudi.index.bloom;
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
-import org.apache.hudi.common.data.HoodieList;
-import org.apache.hudi.common.data.HoodieMapPair;
+import org.apache.hudi.common.data.HoodieListPairData;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -37,9 +38,6 @@ import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieFlinkClientTestHarness;
import org.apache.hudi.testutils.HoodieFlinkWriteableTestTable;
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -185,8 +183,7 @@ public class TestFlinkHoodieBloomIndex extends HoodieFlinkClientTestHarness {
partitionRecordKeyMap.put(t.getLeft(), recordKeyList);
});
-List<Pair<String, HoodieKey>> comparisonKeyList = HoodieList.getList(
-index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, HoodieMapPair.of(partitionRecordKeyMap)));
+List<Pair<String, HoodieKey>> comparisonKeyList = index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, HoodieListPairData.lazy(partitionRecordKeyMap)).collectAsList();
assertEquals(10, comparisonKeyList.size());
java.util.Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream()

View File

@@ -21,7 +21,8 @@ package org.apache.hudi.testutils;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
@@ -133,7 +134,7 @@ public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implem
protected List<HoodieRecord> tagLocation(
HoodieIndex index, List<HoodieRecord> records, HoodieTable table) {
-return HoodieList.getList(index.tagLocation(HoodieList.of(records), context, table));
+return ((HoodieData<HoodieRecord>) index.tagLocation(HoodieListData.eager(records), context, table)).collectAsList();
}
/**

View File

@@ -20,7 +20,7 @@ package org.apache.hudi.client;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
@@ -67,8 +67,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
// Create a Hoodie table which encapsulated the commits and files visible
HoodieJavaTable<T> table = HoodieJavaTable.create(config, (HoodieJavaEngineContext) context);
Timer.Context indexTimer = metrics.getIndexCtx();
-List<HoodieRecord<T>> recordsWithLocation = HoodieList.getList(
-getIndex().tagLocation(HoodieList.of(hoodieRecords), context, table));
+List<HoodieRecord<T>> recordsWithLocation = getIndex().tagLocation(HoodieListData.eager(hoodieRecords), context, table).collectAsList();
metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
return recordsWithLocation.stream().filter(v1 -> !v1.isCurrentLocationKnown()).collect(Collectors.toList());
}

View File

@@ -25,7 +25,7 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.JavaTaskContextSupplier;
import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -94,7 +94,7 @@ public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
instantTime)));
HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
-writeMetadata.setWriteStatuses(HoodieList.of(writeStatusList));
+writeMetadata.setWriteStatuses(HoodieListData.eager(writeStatusList));
return writeMetadata;
}

View File

@@ -24,7 +24,7 @@ import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -74,12 +74,12 @@ public class HoodieJavaEngineContext extends HoodieEngineContext {
@Override
public <T> HoodieData<T> emptyHoodieData() {
-return HoodieList.of(Collections.emptyList());
+return HoodieListData.eager(Collections.emptyList());
}
@Override
public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
-return HoodieList.of(data);
+return HoodieListData.eager(data);
}
@Override

View File

@@ -23,7 +23,7 @@ import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -58,8 +58,8 @@ public abstract class JavaHoodieIndex<T extends HoodieRecordPayload> extends Hoo
public <R> HoodieData<HoodieRecord<R>> tagLocation(
HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException {
-List<HoodieRecord<T>> hoodieRecords = tagLocation(HoodieList.getList(records.map(record -> (HoodieRecord<T>) record)), context, hoodieTable);
-return HoodieList.of(hoodieRecords.stream().map(r -> (HoodieRecord<R>) r).collect(Collectors.toList()));
+List<HoodieRecord<T>> hoodieRecords = tagLocation(records.map(record -> (HoodieRecord<T>) record).collectAsList(), context, hoodieTable);
+return HoodieListData.eager(hoodieRecords.stream().map(r -> (HoodieRecord<R>) r).collect(Collectors.toList()));
}
@Override
@@ -67,6 +67,6 @@ public abstract class JavaHoodieIndex<T extends HoodieRecordPayload> extends Hoo
public HoodieData<WriteStatus> updateLocation(
HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException {
-return HoodieList.of(updateLocation(HoodieList.getList(writeStatuses), context, hoodieTable));
+return HoodieListData.eager(updateLocation(writeStatuses.collectAsList(), context, hoodieTable));
}
}

View File

@@ -36,8 +36,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
import java.util.List;
-import static org.apache.hudi.common.data.HoodieList.getList;
public abstract class HoodieJavaTable<T extends HoodieRecordPayload>
extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
protected HoodieJavaTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
@@ -67,7 +65,7 @@ public abstract class HoodieJavaTable<T extends HoodieRecordPayload>
public static HoodieWriteMetadata<List<WriteStatus>> convertMetadata(
HoodieWriteMetadata<HoodieData<WriteStatus>> metadata) {
-return metadata.clone(getList(metadata.getWriteStatuses()));
+return metadata.clone(metadata.getWriteStatuses().collectAsList());
}
@Override

View File

@@ -19,7 +19,7 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -130,8 +130,7 @@ public abstract class BaseJavaCommitActionExecutor<T extends HoodieRecordPayload
protected List<WriteStatus> updateIndex(List<WriteStatus> writeStatuses, HoodieWriteMetadata<List<WriteStatus>> result) {
Instant indexStartTime = Instant.now();
// Update the index back
-List<WriteStatus> statuses = HoodieList.getList(
-table.getIndex().updateLocation(HoodieList.of(writeStatuses), context, table));
+List<WriteStatus> statuses = table.getIndex().updateLocation(HoodieListData.eager(writeStatuses), context, table).collectAsList();
result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
result.setWriteStatuses(statuses);
return statuses;
@@ -339,8 +338,7 @@ public abstract class BaseJavaCommitActionExecutor<T extends HoodieRecordPayload
public void updateIndexAndCommitIfNeeded(List<WriteStatus> writeStatuses, HoodieWriteMetadata result) {
Instant indexStartTime = Instant.now();
// Update the index back
-List<WriteStatus> statuses = HoodieList.getList(
-table.getIndex().updateLocation(HoodieList.of(writeStatuses), context, table));
+List<WriteStatus> statuses = table.getIndex().updateLocation(HoodieListData.eager(writeStatuses), context, table).collectAsList();
result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
result.setWriteStatuses(statuses);
result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(result));

View File

@@ -19,7 +19,7 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -99,8 +99,7 @@ public class JavaDeleteHelper<R> extends
dedupedKeys.stream().map(key -> new HoodieAvroRecord<>(key, new EmptyHoodieRecordPayload())).collect(Collectors.toList());
Instant beginTag = Instant.now();
// perform index look up to get existing location of records
-List<HoodieRecord<EmptyHoodieRecordPayload>> taggedRecords = HoodieList.getList(
-table.getIndex().tagLocation(HoodieList.of(dedupedRecords), context, table));
+List<HoodieRecord<EmptyHoodieRecordPayload>> taggedRecords = table.getIndex().tagLocation(HoodieListData.eager(dedupedRecords), context, table).collectAsList();
Duration tagLocationDuration = Duration.between(beginTag, Instant.now());
// filter out non existent keys/records

View File

@@ -19,7 +19,7 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
@@ -50,8 +50,7 @@ public class JavaWriteHelper<T extends HoodieRecordPayload,R> extends BaseWriteH
@Override
protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>> dedupedRecords, HoodieEngineContext context, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table) {
-return HoodieList.getList(
-table.getIndex().tagLocation(HoodieList.of(dedupedRecords), context, table));
+return table.getIndex().tagLocation(HoodieListData.eager(dedupedRecords), context, table).collectAsList();
}
@Override

View File

@@ -31,6 +31,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.storage.StorageLevel;
+import java.util.List;
import java.util.Map;
import scala.Tuple2;
@@ -41,7 +42,7 @@ import scala.Tuple2;
* @param <K> type of key.
* @param <V> type of value.
*/
-public class HoodieJavaPairRDD<K, V> extends HoodiePairData<K, V> {
+public class HoodieJavaPairRDD<K, V> implements HoodiePairData<K, V> {
private final JavaPairRDD<K, V> pairRDDData;
@@ -105,8 +106,13 @@ public class HoodieJavaPairRDD<K, V> extends HoodiePairData<K, V> {
}
@Override
-public HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, V, V> func, int parallelism) {
-return HoodieJavaPairRDD.of(pairRDDData.reduceByKey(func::apply, parallelism));
+public HoodiePairData<K, Iterable<V>> groupByKey() {
+return new HoodieJavaPairRDD<>(pairRDDData.groupByKey());
+}
+@Override
+public HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, V, V> combiner, int parallelism) {
+return HoodieJavaPairRDD.of(pairRDDData.reduceByKey(combiner::apply, parallelism));
+}
@Override
@@ -130,4 +136,9 @@ public class HoodieJavaPairRDD<K, V> extends HoodiePairData<K, V> {
.map(tuple -> new Tuple2<>(tuple._1,
new ImmutablePair<>(tuple._2._1, Option.ofNullable(tuple._2._2.orElse(null)))))));
}
+@Override
+public List<Pair<K, V>> collectAsList() {
+return pairRDDData.map(t -> Pair.of(t._1, t._2)).collect();
+}
}
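
For contrast with the RDD-backed implementation above, a sketch of how the reworked pair API reads against the in-memory implementation; the input map and values are illustrative, and only methods that appear in this diff (HoodieListPairData.lazy, reduceByKey, collectAsList) are assumed:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hudi.common.data.HoodieListPairData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.util.collection.Pair;

class LazyPairDataSketch {
  public static void main(String[] args) {
    // Illustrative input: partition path -> record keys, shaped like the
    // partitionRecordKeyMap passed to HoodieListPairData.lazy(...) in
    // TestFlinkHoodieBloomIndex above.
    Map<String, List<String>> partitionToKeys = new HashMap<>();
    partitionToKeys.put("2015/01/31", Arrays.asList("000", "001"));
    partitionToKeys.put("2015/02/01", Arrays.asList("002"));

    // lazy(...) defers evaluation, so transformations compose without
    // materializing intermediate lists (the point of HUDI-4249) ...
    HoodiePairData<String, String> pairs = HoodieListPairData.lazy(partitionToKeys);

    // ... and collectAsList() is the single terminal materialization.
    List<Pair<String, String>> firstKeyPerPartition =
        pairs.reduceByKey((a, b) -> a, 1).collectAsList();
    firstKeyPerPartition.forEach(p ->
        System.out.println(p.getLeft() + " -> " + p.getRight()));
  }
}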

View File

@@ -26,6 +26,7 @@ import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;
@@ -39,7 +40,7 @@ import scala.Tuple2;
*
* @param <T> type of object.
*/
-public class HoodieJavaRDD<T> extends HoodieData<T> {
+public class HoodieJavaRDD<T> implements HoodieData<T> {
private final JavaRDD<T> rddData;
@@ -74,17 +75,16 @@ public class HoodieJavaRDD<T> extends HoodieData<T> {
* @return the a {@link JavaRDD} of objects in type T.
*/
public static <T> JavaRDD<T> getJavaRDD(HoodieData<T> hoodieData) {
-return ((HoodieJavaRDD<T>) hoodieData).get();
+return ((HoodieJavaRDD<T>) hoodieData).rddData;
}
+public static <K, V> JavaPairRDD<K, V> getJavaRDD(HoodiePairData<K, V> hoodieData) {
+return ((HoodieJavaPairRDD<K, V>) hoodieData).get();
+}
-@Override
-public JavaRDD<T> get() {
-return rddData;
-}
@Override
-public void persist(String cacheConfig) {
-rddData.persist(StorageLevel.fromString(cacheConfig));
+public void persist(String level) {
+rddData.persist(StorageLevel.fromString(level));
}
@Override
@@ -112,20 +112,15 @@ public class HoodieJavaRDD<T> extends HoodieData<T> {
return HoodieJavaRDD.of(rddData.mapPartitions(func::apply, preservesPartitioning));
}
-@Override
-public <O> HoodieData<O> mapPartitions(SerializableFunction<Iterator<T>, Iterator<O>> func) {
-return HoodieJavaRDD.of(rddData.mapPartitions(func::apply));
-}
@Override
public <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func) {
return HoodieJavaRDD.of(rddData.flatMap(e -> func.apply(e)));
}
@Override
-public <K, V> HoodiePairData<K, V> mapToPair(SerializablePairFunction<T, K, V> mapToPairFunc) {
+public <K, V> HoodiePairData<K, V> mapToPair(SerializablePairFunction<T, K, V> func) {
return HoodieJavaPairRDD.of(rddData.mapToPair(input -> {
-Pair<K, V> pair = mapToPairFunc.call(input);
+Pair<K, V> pair = func.call(input);
return new Tuple2<>(pair.getLeft(), pair.getRight());
}));
}
@@ -140,13 +135,6 @@ public class HoodieJavaRDD<T> extends HoodieData<T> {
return HoodieJavaRDD.of(rddData.distinct(parallelism));
}
-@Override
-public <O> HoodieData<T> distinctWithKey(SerializableFunction<T, O> keyGetter, int parallelism) {
-return mapToPair(i -> Pair.of(keyGetter.apply(i), i))
-.reduceByKey((value1, value2) -> value1, parallelism)
-.values();
-}
@Override
public HoodieData<T> filter(SerializableFunction<T, Boolean> filterFunc) {
return HoodieJavaRDD.of(rddData.filter(filterFunc::apply));
@@ -154,7 +142,7 @@ public class HoodieJavaRDD<T> extends HoodieData<T> {
@Override
public HoodieData<T> union(HoodieData<T> other) {
-return HoodieJavaRDD.of(rddData.union((JavaRDD<T>) other.get()));
+return HoodieJavaRDD.of(rddData.union(((HoodieJavaRDD<T>) other).rddData));
}
@Override