[HUDI-1332] Introduce FlinkHoodieBloomIndex to hudi-flink-client (#2375)
* [HUDI] Add bloom index for hudi-flink-client Co-authored-by: yangxiang <yangxiang@oppo.com>
This commit is contained in:
@@ -29,6 +29,7 @@ import org.apache.hudi.common.util.ReflectionUtils;
|
|||||||
import org.apache.hudi.common.util.StringUtils;
|
import org.apache.hudi.common.util.StringUtils;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.exception.HoodieIndexException;
|
import org.apache.hudi.exception.HoodieIndexException;
|
||||||
|
import org.apache.hudi.index.bloom.FlinkHoodieBloomIndex;
|
||||||
import org.apache.hudi.index.state.FlinkInMemoryStateIndex;
|
import org.apache.hudi.index.state.FlinkInMemoryStateIndex;
|
||||||
import org.apache.hudi.PublicAPIMethod;
|
import org.apache.hudi.PublicAPIMethod;
|
||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
@@ -58,6 +59,8 @@ public abstract class FlinkHoodieIndex<T extends HoodieRecordPayload> extends Ho
|
|||||||
switch (config.getIndexType()) {
|
switch (config.getIndexType()) {
|
||||||
case INMEMORY:
|
case INMEMORY:
|
||||||
return new FlinkInMemoryStateIndex<>(context, config);
|
return new FlinkInMemoryStateIndex<>(context, config);
|
||||||
|
case BLOOM:
|
||||||
|
return new FlinkHoodieBloomIndex(config);
|
||||||
default:
|
default:
|
||||||
throw new HoodieIndexException("Unsupported index type " + config.getIndexType());
|
throw new HoodieIndexException("Unsupported index type " + config.getIndexType());
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,267 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.index.bloom;
|
||||||
|
|
||||||
|
import org.apache.hudi.client.WriteStatus;
|
||||||
|
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||||
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
|
import org.apache.hudi.common.util.Option;
|
||||||
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
|
import org.apache.hudi.exception.MetadataNotFoundException;
|
||||||
|
import org.apache.hudi.index.FlinkHoodieIndex;
|
||||||
|
import org.apache.hudi.index.HoodieIndexUtils;
|
||||||
|
import org.apache.hudi.io.HoodieKeyLookupHandle;
|
||||||
|
import org.apache.hudi.io.HoodieRangeInfoHandle;
|
||||||
|
import org.apache.hudi.table.HoodieTable;
|
||||||
|
|
||||||
|
import org.apache.log4j.LogManager;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import com.beust.jcommander.internal.Lists;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
import static java.util.stream.Collectors.mapping;
|
||||||
|
import static java.util.stream.Collectors.groupingBy;
|
||||||
|
import static java.util.stream.Collectors.toList;
|
||||||
|
import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indexing mechanism based on bloom filter. Each parquet file includes its row_key bloom filter in its metadata.
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("checkstyle:LineLength")
|
||||||
|
public class FlinkHoodieBloomIndex<T extends HoodieRecordPayload> extends FlinkHoodieIndex<T> {
|
||||||
|
|
||||||
|
private static final Logger LOG = LogManager.getLogger(FlinkHoodieBloomIndex.class);
|
||||||
|
|
||||||
|
public FlinkHoodieBloomIndex(HoodieWriteConfig config) {
|
||||||
|
super(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> records, HoodieEngineContext context,
|
||||||
|
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
|
||||||
|
// Step 1: Extract out thinner Map of (partitionPath, recordKey)
|
||||||
|
Map<String, List<String>> partitionRecordKeyMap = new HashMap<>();
|
||||||
|
records.forEach(record -> {
|
||||||
|
if (partitionRecordKeyMap.containsKey(record.getPartitionPath())) {
|
||||||
|
partitionRecordKeyMap.get(record.getPartitionPath()).add(record.getRecordKey());
|
||||||
|
} else {
|
||||||
|
List<String> recordKeys = Lists.newArrayList();
|
||||||
|
recordKeys.add(record.getRecordKey());
|
||||||
|
partitionRecordKeyMap.put(record.getPartitionPath(), recordKeys);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Step 2: Lookup indexes for all the partition/recordkey pair
|
||||||
|
Map<HoodieKey, HoodieRecordLocation> keyFilenamePairMap =
|
||||||
|
lookupIndex(partitionRecordKeyMap, context, hoodieTable);
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
long totalTaggedRecords = keyFilenamePairMap.values().size();
|
||||||
|
LOG.debug("Number of update records (ones tagged with a fileID): " + totalTaggedRecords);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 3: Tag the incoming records, as inserts or updates, by joining with existing record keys
|
||||||
|
List<HoodieRecord<T>> taggedRecords = tagLocationBacktoRecords(keyFilenamePairMap, records);
|
||||||
|
|
||||||
|
return taggedRecords;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lookup the location for each record key and return the pair<record_key,location> for all record keys already
|
||||||
|
* present and drop the record keys if not present.
|
||||||
|
*/
|
||||||
|
private Map<HoodieKey, HoodieRecordLocation> lookupIndex(
|
||||||
|
Map<String, List<String>> partitionRecordKeyMap, final HoodieEngineContext context,
|
||||||
|
final HoodieTable hoodieTable) {
|
||||||
|
// Obtain records per partition, in the incoming records
|
||||||
|
Map<String, Long> recordsPerPartition = new HashMap<>();
|
||||||
|
partitionRecordKeyMap.keySet().forEach(k -> recordsPerPartition.put(k, Long.valueOf(partitionRecordKeyMap.get(k).size())));
|
||||||
|
List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());
|
||||||
|
|
||||||
|
// Step 2: Load all involved files as <Partition, filename> pairs
|
||||||
|
List<Tuple2<String, BloomIndexFileInfo>> fileInfoList =
|
||||||
|
loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable);
|
||||||
|
final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
|
||||||
|
fileInfoList.stream().collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList())));
|
||||||
|
|
||||||
|
// Step 3: Obtain a List, for each incoming record, that already exists, with the file id,
|
||||||
|
// that contains it.
|
||||||
|
List<Tuple2<String, HoodieKey>> fileComparisons =
|
||||||
|
explodeRecordsWithFileComparisons(partitionToFileInfo, partitionRecordKeyMap);
|
||||||
|
return findMatchingFilesForRecordKeys(fileComparisons, hoodieTable);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Load all involved files as <Partition, filename> pair List.
|
||||||
|
*/
|
||||||
|
//TODO duplicate code with spark, we can optimize this method later
|
||||||
|
List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final HoodieEngineContext context,
|
||||||
|
final HoodieTable hoodieTable) {
|
||||||
|
// Obtain the latest data files from all the partitions.
|
||||||
|
List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
|
||||||
|
.map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
|
||||||
|
.collect(toList());
|
||||||
|
|
||||||
|
if (config.getBloomIndexPruneByRanges()) {
|
||||||
|
// also obtain file ranges, if range pruning is enabled
|
||||||
|
context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
|
||||||
|
return context.map(partitionPathFileIDList, pf -> {
|
||||||
|
try {
|
||||||
|
HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
|
||||||
|
String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
|
||||||
|
return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
|
||||||
|
} catch (MetadataNotFoundException me) {
|
||||||
|
LOG.warn("Unable to find range metadata in file :" + pf);
|
||||||
|
return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
|
||||||
|
}
|
||||||
|
}, Math.max(partitionPathFileIDList.size(), 1));
|
||||||
|
} else {
|
||||||
|
return partitionPathFileIDList.stream()
|
||||||
|
.map(pf -> new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean rollbackCommit(String instantTime) {
|
||||||
|
// Nope, don't need to do anything.
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is not global, since we depend on the partitionPath to do the lookup.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean isGlobal() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* No indexes into log files yet.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean canIndexLogFiles() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Bloom filters are stored, into the same data files.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean isImplicitWithStorage() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be
|
||||||
|
* checked. For tables, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files
|
||||||
|
* to be compared gets cut down a lot from range pruning.
|
||||||
|
* <p>
|
||||||
|
* Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
|
||||||
|
* recordKey ranges in the index info.
|
||||||
|
*/
|
||||||
|
List<Tuple2<String, HoodieKey>> explodeRecordsWithFileComparisons(
|
||||||
|
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
|
||||||
|
Map<String, List<String>> partitionRecordKeyMap) {
|
||||||
|
IndexFileFilter indexFileFilter =
|
||||||
|
config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
|
||||||
|
: new ListBasedIndexFileFilter(partitionToFileIndexInfo);
|
||||||
|
|
||||||
|
List<Tuple2<String, HoodieKey>> fileRecordPairs = new ArrayList<>();
|
||||||
|
partitionRecordKeyMap.keySet().forEach(partitionPath -> {
|
||||||
|
List<String> hoodieRecordKeys = partitionRecordKeyMap.get(partitionPath);
|
||||||
|
hoodieRecordKeys.forEach(hoodieRecordKey -> {
|
||||||
|
indexFileFilter.getMatchingFilesAndPartition(partitionPath, hoodieRecordKey).forEach(partitionFileIdPair -> {
|
||||||
|
fileRecordPairs.add(new Tuple2<>(partitionFileIdPair.getRight(),
|
||||||
|
new HoodieKey(hoodieRecordKey, partitionPath)));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
return fileRecordPairs;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Find out <RowKey, filename> pair.
|
||||||
|
*/
|
||||||
|
Map<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
|
||||||
|
List<Tuple2<String, HoodieKey>> fileComparisons,
|
||||||
|
HoodieTable hoodieTable) {
|
||||||
|
|
||||||
|
fileComparisons = fileComparisons.stream().sorted((o1, o2) -> o1._1.compareTo(o2._1)).collect(toList());
|
||||||
|
|
||||||
|
List<HoodieKeyLookupHandle.KeyLookupResult> keyLookupResults = new ArrayList<>();
|
||||||
|
|
||||||
|
Iterator<List<HoodieKeyLookupHandle.KeyLookupResult>> iterator = new HoodieFlinkBloomIndexCheckFunction(hoodieTable, config).apply(fileComparisons.iterator());
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
keyLookupResults.addAll(iterator.next());
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<HoodieKey, HoodieRecordLocation> hoodieRecordLocationMap = new HashMap<>();
|
||||||
|
|
||||||
|
keyLookupResults = keyLookupResults.stream().filter(lr -> lr.getMatchingRecordKeys().size() > 0).collect(toList());
|
||||||
|
keyLookupResults.forEach(lookupResult -> {
|
||||||
|
lookupResult.getMatchingRecordKeys().forEach(r -> {
|
||||||
|
hoodieRecordLocationMap.put(new HoodieKey(r, lookupResult.getPartitionPath()), new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId()));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
return hoodieRecordLocationMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tag the <rowKey, filename> back to the original HoodieRecord List.
|
||||||
|
*/
|
||||||
|
protected List<HoodieRecord<T>> tagLocationBacktoRecords(
|
||||||
|
Map<HoodieKey, HoodieRecordLocation> keyFilenamePair, List<HoodieRecord<T>> records) {
|
||||||
|
Map<HoodieKey, HoodieRecord<T>> keyRecordPairMap = new HashMap<>();
|
||||||
|
records.forEach(r -> keyRecordPairMap.put(r.getKey(), r));
|
||||||
|
// Here as the record might have more data than rowKey (some rowKeys' fileId is null),
|
||||||
|
// so we do left outer join.
|
||||||
|
List<Tuple2<HoodieRecord<T>, HoodieRecordLocation>> newList = new ArrayList<>();
|
||||||
|
keyRecordPairMap.keySet().forEach(k -> {
|
||||||
|
if (keyFilenamePair.containsKey(k)) {
|
||||||
|
newList.add(new Tuple2(keyRecordPairMap.get(k), keyFilenamePair.get(k)));
|
||||||
|
} else {
|
||||||
|
newList.add(new Tuple2(keyRecordPairMap.get(k), null));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
List<HoodieRecord<T>> res = Lists.newArrayList();
|
||||||
|
for (Tuple2<HoodieRecord<T>, HoodieRecordLocation> v : newList) {
|
||||||
|
res.add(HoodieIndexUtils.getTaggedRecord(v._1, Option.ofNullable(v._2)));
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<WriteStatus> updateLocation(List<WriteStatus> writeStatusList, HoodieEngineContext context,
|
||||||
|
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
|
||||||
|
return writeStatusList;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,127 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.index.bloom;
|
||||||
|
|
||||||
|
import org.apache.hudi.client.utils.LazyIterableIterator;
|
||||||
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
|
import org.apache.hudi.exception.HoodieException;
|
||||||
|
import org.apache.hudi.exception.HoodieIndexException;
|
||||||
|
import org.apache.hudi.io.HoodieKeyLookupHandle;
|
||||||
|
import org.apache.hudi.io.HoodieKeyLookupHandle.KeyLookupResult;
|
||||||
|
import org.apache.hudi.table.HoodieTable;
|
||||||
|
|
||||||
|
import java.util.function.Function;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Function performing actual checking of list containing (fileId, hoodieKeys) against the actual files.
|
||||||
|
*/
|
||||||
|
//TODO we can move this class into the hudi-client-common and reuse it for spark client
|
||||||
|
public class HoodieFlinkBloomIndexCheckFunction
|
||||||
|
implements Function<Iterator<Tuple2<String, HoodieKey>>, Iterator<List<KeyLookupResult>>> {
|
||||||
|
|
||||||
|
private final HoodieTable hoodieTable;
|
||||||
|
|
||||||
|
private final HoodieWriteConfig config;
|
||||||
|
|
||||||
|
public HoodieFlinkBloomIndexCheckFunction(HoodieTable hoodieTable, HoodieWriteConfig config) {
|
||||||
|
this.hoodieTable = hoodieTable;
|
||||||
|
this.config = config;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterator<List<KeyLookupResult>> apply(Iterator<Tuple2<String, HoodieKey>> fileParitionRecordKeyTripletItr) {
|
||||||
|
return new LazyKeyCheckIterator(fileParitionRecordKeyTripletItr);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <V> Function<V, Iterator<List<KeyLookupResult>>> compose(Function<? super V, ? extends Iterator<Tuple2<String, HoodieKey>>> before) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <V> Function<Iterator<Tuple2<String, HoodieKey>>, V> andThen(Function<? super Iterator<List<KeyLookupResult>>, ? extends V> after) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
class LazyKeyCheckIterator extends LazyIterableIterator<Tuple2<String, HoodieKey>, List<KeyLookupResult>> {
|
||||||
|
|
||||||
|
private HoodieKeyLookupHandle keyLookupHandle;
|
||||||
|
|
||||||
|
LazyKeyCheckIterator(Iterator<Tuple2<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
|
||||||
|
super(filePartitionRecordKeyTripletItr);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void start() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<KeyLookupResult> computeNext() {
|
||||||
|
List<KeyLookupResult> ret = new ArrayList<>();
|
||||||
|
try {
|
||||||
|
// process one file in each go.
|
||||||
|
while (inputItr.hasNext()) {
|
||||||
|
Tuple2<String, HoodieKey> currentTuple = inputItr.next();
|
||||||
|
String fileId = currentTuple._1;
|
||||||
|
String partitionPath = currentTuple._2.getPartitionPath();
|
||||||
|
String recordKey = currentTuple._2.getRecordKey();
|
||||||
|
Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, fileId);
|
||||||
|
|
||||||
|
// lazily init state
|
||||||
|
if (keyLookupHandle == null) {
|
||||||
|
keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
|
||||||
|
}
|
||||||
|
|
||||||
|
// if continue on current file
|
||||||
|
if (keyLookupHandle.getPartitionPathFilePair().equals(partitionPathFilePair)) {
|
||||||
|
keyLookupHandle.addKey(recordKey);
|
||||||
|
} else {
|
||||||
|
// do the actual checking of file & break out
|
||||||
|
ret.add(keyLookupHandle.getLookupResult());
|
||||||
|
keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
|
||||||
|
keyLookupHandle.addKey(recordKey);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handle case, where we ran out of input, close pending work, update return val
|
||||||
|
if (!inputItr.hasNext()) {
|
||||||
|
ret.add(keyLookupHandle.getLookupResult());
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
if (e instanceof HoodieException) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
throw new HoodieIndexException("Error checking bloom filter index. ", e);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void end() {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,31 @@
|
|||||||
|
###
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
# or more contributor license agreements. See the NOTICE file
|
||||||
|
# distributed with this work for additional information
|
||||||
|
# regarding copyright ownership. The ASF licenses this file
|
||||||
|
# to you under the Apache License, Version 2.0 (the
|
||||||
|
# "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
###
|
||||||
|
log4j.rootLogger=WARN, CONSOLE
|
||||||
|
log4j.logger.org.apache=INFO
|
||||||
|
log4j.logger.org.apache.hudi=DEBUG
|
||||||
|
log4j.logger.org.apache.hadoop.hbase=ERROR
|
||||||
|
|
||||||
|
# A1 is set to be a ConsoleAppender.
|
||||||
|
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
|
||||||
|
# A1 uses PatternLayout.
|
||||||
|
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
|
||||||
|
log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
|
||||||
|
log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
|
||||||
|
log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
|
||||||
|
log4j.appender.CONSOLE.filter.a.LevelMin=WARN
|
||||||
|
log4j.appender.CONSOLE.filter.a.LevelMax=FATAL
|
||||||
@@ -0,0 +1,469 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.index.bloom;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.bloom.BloomFilter;
|
||||||
|
import org.apache.hudi.common.bloom.BloomFilterFactory;
|
||||||
|
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
|
||||||
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
|
import org.apache.hudi.common.testutils.RawTripTestPayload;
|
||||||
|
import org.apache.hudi.common.util.Option;
|
||||||
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
|
import org.apache.hudi.config.HoodieIndexConfig;
|
||||||
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
|
import org.apache.hudi.io.HoodieKeyLookupHandle;
|
||||||
|
import org.apache.hudi.table.HoodieFlinkTable;
|
||||||
|
import org.apache.hudi.table.HoodieTable;
|
||||||
|
import org.apache.hudi.testutils.HoodieFlinkClientTestHarness;
|
||||||
|
import org.apache.hudi.testutils.HoodieFlinkWriteableTestTable;
|
||||||
|
|
||||||
|
import org.apache.avro.Schema;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.junit.jupiter.api.AfterEach;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
|
import org.junit.jupiter.params.provider.Arguments;
|
||||||
|
import org.junit.jupiter.params.provider.MethodSource;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
import static java.util.Arrays.asList;
|
||||||
|
import static java.util.UUID.randomUUID;
|
||||||
|
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit test against FlinkHoodieBloomIndex.
|
||||||
|
*/
|
||||||
|
//TODO merge code with Spark Bloom index tests.
|
||||||
|
public class TestFlinkHoodieBloomIndex extends HoodieFlinkClientTestHarness {
|
||||||
|
|
||||||
|
private static final Schema SCHEMA = getSchemaFromResource(TestFlinkHoodieBloomIndex.class, "/exampleSchema.avsc", true);
|
||||||
|
private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with rangePruning={0}, treeFiltering={1}, bucketizedChecking={2}";
|
||||||
|
|
||||||
|
public static Stream<Arguments> configParams() {
|
||||||
|
Object[][] data =
|
||||||
|
new Object[][] {{true, true, true}, {false, true, true}, {true, true, false}, {true, false, true}};
|
||||||
|
return Stream.of(data).map(Arguments::of);
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
initPath();
|
||||||
|
initFileSystem();
|
||||||
|
// We have some records to be tagged (two different partitions)
|
||||||
|
initMetaClient();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterEach
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
cleanupResources();
|
||||||
|
}
|
||||||
|
|
||||||
|
private HoodieWriteConfig makeConfig(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) {
|
||||||
|
return HoodieWriteConfig.newBuilder().withPath(basePath)
|
||||||
|
.withIndexConfig(HoodieIndexConfig.newBuilder().bloomIndexPruneByRanges(rangePruning)
|
||||||
|
.bloomIndexTreebasedFilter(treeFiltering).bloomIndexBucketizedChecking(bucketizedChecking)
|
||||||
|
.bloomIndexKeysPerBucket(2).build())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
|
||||||
|
@MethodSource("configParams")
|
||||||
|
public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception {
|
||||||
|
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
|
||||||
|
FlinkHoodieBloomIndex index = new FlinkHoodieBloomIndex(config);
|
||||||
|
HoodieTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient);
|
||||||
|
HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(hoodieTable, SCHEMA);
|
||||||
|
|
||||||
|
// Create some partitions, and put some files
|
||||||
|
// "2016/01/21": 0 file
|
||||||
|
// "2016/04/01": 1 file (2_0_20160401010101.parquet)
|
||||||
|
// "2015/03/12": 3 files (1_0_20150312101010.parquet, 3_0_20150312101010.parquet, 4_0_20150312101010.parquet)
|
||||||
|
testTable.withPartitionMetaFiles("2016/01/21", "2016/04/01", "2015/03/12");
|
||||||
|
|
||||||
|
RawTripTestPayload rowChange1 =
|
||||||
|
new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
|
||||||
|
HoodieRecord record1 =
|
||||||
|
new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1);
|
||||||
|
RawTripTestPayload rowChange2 =
|
||||||
|
new RawTripTestPayload("{\"_row_key\":\"001\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
|
||||||
|
HoodieRecord record2 =
|
||||||
|
new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2);
|
||||||
|
RawTripTestPayload rowChange3 =
|
||||||
|
new RawTripTestPayload("{\"_row_key\":\"002\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
|
||||||
|
HoodieRecord record3 =
|
||||||
|
new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3);
|
||||||
|
RawTripTestPayload rowChange4 =
|
||||||
|
new RawTripTestPayload("{\"_row_key\":\"003\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
|
||||||
|
HoodieRecord record4 =
|
||||||
|
new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4);
|
||||||
|
|
||||||
|
List<String> partitions = asList("2016/01/21", "2016/04/01", "2015/03/12");
|
||||||
|
List<Tuple2<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, context, hoodieTable);
|
||||||
|
// Still 0, as no valid commit
|
||||||
|
assertEquals(0, filesList.size());
|
||||||
|
|
||||||
|
testTable.addCommit("20160401010101").withInserts("2016/04/01", "2");
|
||||||
|
testTable.addCommit("20150312101010").withInserts("2015/03/12", "1")
|
||||||
|
.withInserts("2015/03/12", "3", record1)
|
||||||
|
.withInserts("2015/03/12", "4", record2, record3, record4);
|
||||||
|
metaClient.reloadActiveTimeline();
|
||||||
|
|
||||||
|
filesList = index.loadInvolvedFiles(partitions, context, hoodieTable);
|
||||||
|
assertEquals(4, filesList.size());
|
||||||
|
|
||||||
|
if (rangePruning) {
|
||||||
|
// these files will not have the key ranges
|
||||||
|
assertNull(filesList.get(0)._2().getMaxRecordKey());
|
||||||
|
assertNull(filesList.get(0)._2().getMinRecordKey());
|
||||||
|
assertFalse(filesList.get(1)._2().hasKeyRanges());
|
||||||
|
assertNotNull(filesList.get(2)._2().getMaxRecordKey());
|
||||||
|
assertNotNull(filesList.get(2)._2().getMinRecordKey());
|
||||||
|
assertTrue(filesList.get(3)._2().hasKeyRanges());
|
||||||
|
|
||||||
|
// no longer sorted, but should have same files.
|
||||||
|
|
||||||
|
List<Tuple2<String, BloomIndexFileInfo>> expected =
|
||||||
|
asList(new Tuple2<>("2016/04/01", new BloomIndexFileInfo("2")),
|
||||||
|
new Tuple2<>("2015/03/12", new BloomIndexFileInfo("1")),
|
||||||
|
new Tuple2<>("2015/03/12", new BloomIndexFileInfo("3", "000", "000")),
|
||||||
|
new Tuple2<>("2015/03/12", new BloomIndexFileInfo("4", "001", "003")));
|
||||||
|
assertEquals(expected, filesList);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
|
||||||
|
@MethodSource("configParams")
|
||||||
|
public void testRangePruning(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) {
|
||||||
|
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
|
||||||
|
FlinkHoodieBloomIndex index = new FlinkHoodieBloomIndex(config);
|
||||||
|
|
||||||
|
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo = new HashMap<>();
|
||||||
|
partitionToFileIndexInfo.put("2017/10/22",
|
||||||
|
asList(new BloomIndexFileInfo("f1"), new BloomIndexFileInfo("f2", "000", "000"),
|
||||||
|
new BloomIndexFileInfo("f3", "001", "003"), new BloomIndexFileInfo("f4", "002", "007"),
|
||||||
|
new BloomIndexFileInfo("f5", "009", "010")));
|
||||||
|
|
||||||
|
Map<String, List<String>> partitionRecordKeyMap = new HashMap<>();
|
||||||
|
asList(new Tuple2<>("2017/10/22", "003"), new Tuple2<>("2017/10/22", "002"),
|
||||||
|
new Tuple2<>("2017/10/22", "005"), new Tuple2<>("2017/10/22", "004"))
|
||||||
|
.forEach(t -> {
|
||||||
|
List<String> recordKeyList = partitionRecordKeyMap.getOrDefault(t._1, new ArrayList<>());
|
||||||
|
recordKeyList.add(t._2);
|
||||||
|
partitionRecordKeyMap.put(t._1, recordKeyList);
|
||||||
|
});
|
||||||
|
|
||||||
|
List<scala.Tuple2<String, HoodieKey>> comparisonKeyList =
|
||||||
|
index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyMap);
|
||||||
|
|
||||||
|
assertEquals(10, comparisonKeyList.size());
|
||||||
|
java.util.Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream()
|
||||||
|
.collect(java.util.stream.Collectors.groupingBy(t -> t._2.getRecordKey(), java.util.stream.Collectors.mapping(t -> t._1, java.util.stream.Collectors.toList())));
|
||||||
|
|
||||||
|
assertEquals(4, recordKeyToFileComps.size());
|
||||||
|
assertEquals(new java.util.HashSet<>(asList("f1", "f3", "f4")), new java.util.HashSet<>(recordKeyToFileComps.get("002")));
|
||||||
|
assertEquals(new java.util.HashSet<>(asList("f1", "f3", "f4")), new java.util.HashSet<>(recordKeyToFileComps.get("003")));
|
||||||
|
assertEquals(new java.util.HashSet<>(asList("f1", "f4")), new java.util.HashSet<>(recordKeyToFileComps.get("004")));
|
||||||
|
assertEquals(new java.util.HashSet<>(asList("f1", "f4")), new java.util.HashSet<>(recordKeyToFileComps.get("005")));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCheckUUIDsAgainstOneFile() throws Exception {
|
||||||
|
final String partition = "2016/01/31";
|
||||||
|
// Create some records to use
|
||||||
|
String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
|
||||||
|
+ "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
|
||||||
|
String recordStr2 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\","
|
||||||
|
+ "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
|
||||||
|
String recordStr3 = "{\"_row_key\":\"3eb5b87c-1fej-4edd-87b4-6ec96dc405a0\","
|
||||||
|
+ "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
|
||||||
|
String recordStr4 = "{\"_row_key\":\"4eb5b87c-1fej-4edd-87b4-6ec96dc405a0\","
|
||||||
|
+ "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":32}";
|
||||||
|
RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1);
|
||||||
|
HoodieRecord record1 =
|
||||||
|
new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1);
|
||||||
|
RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2);
|
||||||
|
HoodieRecord record2 =
|
||||||
|
new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2);
|
||||||
|
RawTripTestPayload rowChange3 = new RawTripTestPayload(recordStr3);
|
||||||
|
HoodieRecord record3 =
|
||||||
|
new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3);
|
||||||
|
RawTripTestPayload rowChange4 = new RawTripTestPayload(recordStr4);
|
||||||
|
HoodieRecord record4 =
|
||||||
|
new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4);
|
||||||
|
|
||||||
|
// We write record1, record2 to a parquet file, but the bloom filter contains (record1,
|
||||||
|
// record2, record3).
|
||||||
|
BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name());
|
||||||
|
filter.add(record3.getRecordKey());
|
||||||
|
HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(metaClient, SCHEMA, filter);
|
||||||
|
String fileId = testTable.addCommit("000").getFileIdWithInserts(partition, record1, record2);
|
||||||
|
String filename = testTable.getBaseFileNameById(fileId);
|
||||||
|
|
||||||
|
// The bloom filter contains 3 records
|
||||||
|
assertTrue(filter.mightContain(record1.getRecordKey()));
|
||||||
|
assertTrue(filter.mightContain(record2.getRecordKey()));
|
||||||
|
assertTrue(filter.mightContain(record3.getRecordKey()));
|
||||||
|
assertFalse(filter.mightContain(record4.getRecordKey()));
|
||||||
|
|
||||||
|
// Compare with file
|
||||||
|
List<String> uuids = asList(record1.getRecordKey(), record2.getRecordKey(), record3.getRecordKey(), record4.getRecordKey());
|
||||||
|
|
||||||
|
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
|
||||||
|
HoodieFlinkTable table = HoodieFlinkTable.create(config, context, metaClient);
|
||||||
|
HoodieKeyLookupHandle keyHandle = new HoodieKeyLookupHandle<>(config, table, Pair.of(partition, fileId));
|
||||||
|
List<String> results = keyHandle.checkCandidatesAgainstFile(hadoopConf, uuids,
|
||||||
|
new Path(java.nio.file.Paths.get(basePath, partition, filename).toString()));
|
||||||
|
assertEquals(results.size(), 2);
|
||||||
|
assertTrue(results.get(0).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")
|
||||||
|
|| results.get(1).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0"));
|
||||||
|
assertTrue(results.get(0).equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")
|
||||||
|
|| results.get(1).equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0"));
|
||||||
|
// TODO(vc): Need more coverage on actual filenames
|
||||||
|
// assertTrue(results.get(0)._2().equals(filename));
|
||||||
|
// assertTrue(results.get(1)._2().equals(filename));
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
|
||||||
|
@MethodSource("configParams")
|
||||||
|
public void testTagLocationWithEmptyList(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) {
|
||||||
|
// We have some records to be tagged (two different partitions)
|
||||||
|
List<HoodieRecord> records = new ArrayList<>();
|
||||||
|
// Also create the metadata and config
|
||||||
|
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
|
||||||
|
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||||
|
HoodieFlinkTable table = HoodieFlinkTable.create(config, context, metaClient);
|
||||||
|
|
||||||
|
// Let's tag
|
||||||
|
FlinkHoodieBloomIndex bloomIndex = new FlinkHoodieBloomIndex(config);
|
||||||
|
|
||||||
|
assertDoesNotThrow(() -> {
|
||||||
|
bloomIndex.tagLocation(records, context, table);
|
||||||
|
}, "EmptyList should not result in IllegalArgumentException: Positive number of slices required");
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
|
||||||
|
@MethodSource("configParams")
|
||||||
|
public void testTagLocation(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception {
|
||||||
|
// We have some records to be tagged (two different partitions)
|
||||||
|
String rowKey1 = randomUUID().toString();
|
||||||
|
String rowKey2 = randomUUID().toString();
|
||||||
|
String rowKey3 = randomUUID().toString();
|
||||||
|
String recordStr1 = "{\"_row_key\":\"" + rowKey1 + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
|
||||||
|
String recordStr2 = "{\"_row_key\":\"" + rowKey2 + "\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
|
||||||
|
String recordStr3 = "{\"_row_key\":\"" + rowKey3 + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
|
||||||
|
// place same row key under a different partition.
|
||||||
|
String recordStr4 = "{\"_row_key\":\"" + rowKey1 + "\",\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}";
|
||||||
|
RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1);
|
||||||
|
HoodieRecord record1 =
|
||||||
|
new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1);
|
||||||
|
RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2);
|
||||||
|
HoodieRecord record2 =
|
||||||
|
new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2);
|
||||||
|
RawTripTestPayload rowChange3 = new RawTripTestPayload(recordStr3);
|
||||||
|
HoodieRecord record3 =
|
||||||
|
new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3);
|
||||||
|
RawTripTestPayload rowChange4 = new RawTripTestPayload(recordStr4);
|
||||||
|
HoodieRecord record4 =
|
||||||
|
new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4);
|
||||||
|
List<HoodieRecord> records = asList(record1, record2, record3, record4);
|
||||||
|
|
||||||
|
// Also create the metadata and config
|
||||||
|
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
|
||||||
|
HoodieFlinkTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient);
|
||||||
|
HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(hoodieTable, SCHEMA);
|
||||||
|
|
||||||
|
// Let's tag
|
||||||
|
FlinkHoodieBloomIndex bloomIndex = new FlinkHoodieBloomIndex(config);
|
||||||
|
List<HoodieRecord> taggedRecords = bloomIndex.tagLocation(records, context, hoodieTable);
|
||||||
|
|
||||||
|
// Should not find any files
|
||||||
|
for (HoodieRecord record : taggedRecords) {
|
||||||
|
assertFalse(record.isCurrentLocationKnown());
|
||||||
|
}
|
||||||
|
|
||||||
|
// We create three parquet file, each having one record. (two different partitions)
|
||||||
|
String fileId1 = testTable.addCommit("001").getFileIdWithInserts("2016/01/31", record1);
|
||||||
|
String fileId2 = testTable.addCommit("002").getFileIdWithInserts("2016/01/31", record2);
|
||||||
|
String fileId3 = testTable.addCommit("003").getFileIdWithInserts("2015/01/31", record4);
|
||||||
|
|
||||||
|
metaClient.reloadActiveTimeline();
|
||||||
|
|
||||||
|
// We do the tag again
|
||||||
|
taggedRecords = bloomIndex.tagLocation(records, context, HoodieFlinkTable.create(config, context, metaClient));
|
||||||
|
|
||||||
|
// Check results
|
||||||
|
for (HoodieRecord record : taggedRecords) {
|
||||||
|
if (record.getRecordKey().equals(rowKey1)) {
|
||||||
|
if (record.getPartitionPath().equals("2015/01/31")) {
|
||||||
|
assertEquals(record.getCurrentLocation().getFileId(), fileId3);
|
||||||
|
} else {
|
||||||
|
assertEquals(record.getCurrentLocation().getFileId(), fileId1);
|
||||||
|
}
|
||||||
|
} else if (record.getRecordKey().equals(rowKey2)) {
|
||||||
|
assertEquals(record.getCurrentLocation().getFileId(), fileId2);
|
||||||
|
} else if (record.getRecordKey().equals(rowKey3)) {
|
||||||
|
assertFalse(record.isCurrentLocationKnown());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
|
||||||
|
@MethodSource("configParams")
|
||||||
|
public void testCheckExists(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception {
|
||||||
|
// We have some records to be tagged (two different partitions)
|
||||||
|
|
||||||
|
String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
|
||||||
|
+ "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
|
||||||
|
String recordStr2 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\","
|
||||||
|
+ "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
|
||||||
|
String recordStr3 = "{\"_row_key\":\"3eb5b87c-1fej-4edd-87b4-6ec96dc405a0\","
|
||||||
|
+ "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
|
||||||
|
// record key same as recordStr2
|
||||||
|
String recordStr4 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\","
|
||||||
|
+ "\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}";
|
||||||
|
RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1);
|
||||||
|
HoodieKey key1 = new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath());
|
||||||
|
HoodieRecord record1 = new HoodieRecord(key1, rowChange1);
|
||||||
|
RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2);
|
||||||
|
HoodieKey key2 = new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath());
|
||||||
|
HoodieRecord record2 = new HoodieRecord(key2, rowChange2);
|
||||||
|
RawTripTestPayload rowChange3 = new RawTripTestPayload(recordStr3);
|
||||||
|
HoodieKey key3 = new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath());
|
||||||
|
RawTripTestPayload rowChange4 = new RawTripTestPayload(recordStr4);
|
||||||
|
HoodieKey key4 = new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath());
|
||||||
|
HoodieRecord record4 = new HoodieRecord(key4, rowChange4);
|
||||||
|
List<HoodieKey> keys = asList(key1, key2, key3, key4);
|
||||||
|
|
||||||
|
// Also create the metadata and config
|
||||||
|
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
|
||||||
|
HoodieTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient);
|
||||||
|
HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(hoodieTable, SCHEMA);
|
||||||
|
|
||||||
|
// Let's tag
|
||||||
|
FlinkHoodieBloomIndex bloomIndex = new FlinkHoodieBloomIndex(config);
|
||||||
|
List<HoodieRecord> toTagRecords = new ArrayList<>();
|
||||||
|
toTagRecords.add(new HoodieRecord(record4.getKey(), null));
|
||||||
|
List<HoodieRecord> taggedRecords = bloomIndex.tagLocation(toTagRecords, context, hoodieTable);
|
||||||
|
Map<HoodieKey, Option<Pair<String, String>>> recordLocations = new HashMap<>();
|
||||||
|
for (HoodieRecord taggedRecord : taggedRecords) {
|
||||||
|
recordLocations.put(taggedRecord.getKey(), taggedRecord.isCurrentLocationKnown()
|
||||||
|
? Option.of(Pair.of(taggedRecord.getPartitionPath(), taggedRecord.getCurrentLocation().getFileId()))
|
||||||
|
: Option.empty());
|
||||||
|
}
|
||||||
|
// Should not find any files
|
||||||
|
for (Option<Pair<String, String>> record : recordLocations.values()) {
|
||||||
|
assertTrue(!record.isPresent());
|
||||||
|
}
|
||||||
|
|
||||||
|
// We create three parquet file, each having one record. (two different partitions)
|
||||||
|
String fileId1 = testTable.addCommit("001").getFileIdWithInserts("2016/01/31", record1);
|
||||||
|
String fileId2 = testTable.addCommit("002").getFileIdWithInserts("2016/01/31", record2);
|
||||||
|
String fileId3 = testTable.addCommit("003").getFileIdWithInserts("2015/01/31", record4);
|
||||||
|
|
||||||
|
// We do the tag again
|
||||||
|
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||||
|
hoodieTable = HoodieFlinkTable.create(config, context, metaClient);
|
||||||
|
List<HoodieRecord> toTagRecords1 = new ArrayList<>();
|
||||||
|
for (HoodieKey key : keys) {
|
||||||
|
taggedRecords.add(new HoodieRecord(key, null));
|
||||||
|
}
|
||||||
|
|
||||||
|
taggedRecords = bloomIndex.tagLocation(toTagRecords1, context, hoodieTable);
|
||||||
|
recordLocations.clear();
|
||||||
|
for (HoodieRecord taggedRecord : taggedRecords) {
|
||||||
|
recordLocations.put(taggedRecord.getKey(), taggedRecord.isCurrentLocationKnown()
|
||||||
|
? Option.of(Pair.of(taggedRecord.getPartitionPath(), taggedRecord.getCurrentLocation().getFileId()))
|
||||||
|
: Option.empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check results
|
||||||
|
for (Map.Entry<HoodieKey, Option<Pair<String, String>>> record : recordLocations.entrySet()) {
|
||||||
|
if (record.getKey().getRecordKey().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")) {
|
||||||
|
assertTrue(record.getValue().isPresent());
|
||||||
|
assertEquals(fileId1, record.getValue().get().getRight());
|
||||||
|
} else if (record.getKey().getRecordKey().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")) {
|
||||||
|
assertTrue(record.getValue().isPresent());
|
||||||
|
if (record.getKey().getPartitionPath().equals("2015/01/31")) {
|
||||||
|
assertEquals(fileId3, record.getValue().get().getRight());
|
||||||
|
} else {
|
||||||
|
assertEquals(fileId2, record.getValue().get().getRight());
|
||||||
|
}
|
||||||
|
} else if (record.getKey().getRecordKey().equals("3eb5b87c-1fej-4edd-87b4-6ec96dc405a0")) {
|
||||||
|
assertFalse(record.getValue().isPresent());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
|
||||||
|
@MethodSource("configParams")
|
||||||
|
public void testBloomFilterFalseError(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception {
|
||||||
|
// We have two hoodie records
|
||||||
|
String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
|
||||||
|
+ "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
|
||||||
|
String recordStr2 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\","
|
||||||
|
+ "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
|
||||||
|
|
||||||
|
// We write record1 to a parquet file, using a bloom filter having both records
|
||||||
|
RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1);
|
||||||
|
HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1);
|
||||||
|
RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2);
|
||||||
|
HoodieRecord record2 = new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2);
|
||||||
|
|
||||||
|
BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name());
|
||||||
|
filter.add(record2.getRecordKey());
|
||||||
|
HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(metaClient, SCHEMA, filter);
|
||||||
|
String fileId = testTable.addCommit("000").getFileIdWithInserts("2016/01/31", record1);
|
||||||
|
assertTrue(filter.mightContain(record1.getRecordKey()));
|
||||||
|
assertTrue(filter.mightContain(record2.getRecordKey()));
|
||||||
|
|
||||||
|
// We do the tag
|
||||||
|
List<HoodieRecord> records = asList(record1, record2);
|
||||||
|
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
|
||||||
|
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||||
|
HoodieTable table = HoodieFlinkTable.create(config, context, metaClient);
|
||||||
|
|
||||||
|
FlinkHoodieBloomIndex bloomIndex = new FlinkHoodieBloomIndex(config);
|
||||||
|
List<HoodieRecord> taggedRecords = bloomIndex.tagLocation(records, context, table);
|
||||||
|
|
||||||
|
// Check results
|
||||||
|
for (HoodieRecord record : taggedRecords) {
|
||||||
|
if (record.getKey().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")) {
|
||||||
|
assertEquals(record.getCurrentLocation().getFileId(), fileId);
|
||||||
|
} else if (record.getRecordKey().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")) {
|
||||||
|
assertFalse(record.isCurrentLocationKnown());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -18,21 +18,31 @@
|
|||||||
|
|
||||||
package org.apache.hudi.testutils;
|
package org.apache.hudi.testutils;
|
||||||
|
|
||||||
|
import org.apache.hudi.client.FlinkTaskContextSupplier;
|
||||||
|
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
||||||
|
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.model.HoodieTableType;
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
|
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
|
||||||
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
|
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
|
||||||
import org.apache.hudi.common.testutils.HoodieTestUtils;
|
import org.apache.hudi.common.testutils.HoodieTestUtils;
|
||||||
|
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
|
||||||
|
import org.apache.hudi.index.bloom.TestFlinkHoodieBloomIndex;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
|
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
|
||||||
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
|
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
|
||||||
import org.apache.flink.test.util.MiniClusterWithClientResource;
|
import org.apache.flink.test.util.MiniClusterWithClientResource;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.LocalFileSystem;
|
import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
|
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.TestInfo;
|
import org.junit.jupiter.api.TestInfo;
|
||||||
|
|
||||||
@@ -40,7 +50,11 @@ import java.io.IOException;
|
|||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The test harness for resource initialization and cleanup.
|
||||||
|
*/
|
||||||
public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implements Serializable {
|
public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implements Serializable {
|
||||||
|
|
||||||
protected static final Logger LOG = LogManager.getLogger(HoodieFlinkClientTestHarness.class);
|
protected static final Logger LOG = LogManager.getLogger(HoodieFlinkClientTestHarness.class);
|
||||||
@@ -48,6 +62,17 @@ public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implem
|
|||||||
protected transient Configuration hadoopConf = null;
|
protected transient Configuration hadoopConf = null;
|
||||||
protected transient FileSystem fs;
|
protected transient FileSystem fs;
|
||||||
protected transient MiniClusterWithClientResource flinkCluster = null;
|
protected transient MiniClusterWithClientResource flinkCluster = null;
|
||||||
|
protected transient HoodieFlinkEngineContext context = null;
|
||||||
|
protected transient ExecutorService executorService;
|
||||||
|
protected transient HoodieFlinkWriteClient writeClient;
|
||||||
|
protected transient HoodieTableFileSystemView tableView;
|
||||||
|
|
||||||
|
protected final FlinkTaskContextSupplier supplier = new FlinkTaskContextSupplier(null);
|
||||||
|
|
||||||
|
// dfs
|
||||||
|
protected transient HdfsTestService hdfsTestService;
|
||||||
|
protected transient MiniDFSCluster dfsCluster;
|
||||||
|
protected transient DistributedFileSystem dfs;
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
public void setTestMethodName(TestInfo testInfo) {
|
public void setTestMethodName(TestInfo testInfo) {
|
||||||
@@ -69,6 +94,7 @@ public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implem
|
|||||||
protected void initFileSystem() {
|
protected void initFileSystem() {
|
||||||
hadoopConf = new Configuration();
|
hadoopConf = new Configuration();
|
||||||
initFileSystemWithConfiguration(hadoopConf);
|
initFileSystemWithConfiguration(hadoopConf);
|
||||||
|
context = new HoodieFlinkEngineContext(supplier);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initFileSystemWithConfiguration(Configuration configuration) {
|
private void initFileSystemWithConfiguration(Configuration configuration) {
|
||||||
@@ -116,6 +142,19 @@ public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implem
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanups resource group for the subclasses of {@link TestFlinkHoodieBloomIndex}.
|
||||||
|
*/
|
||||||
|
public void cleanupResources() throws java.io.IOException {
|
||||||
|
cleanupClients();
|
||||||
|
cleanupFlinkContexts();
|
||||||
|
cleanupTestDataGenerator();
|
||||||
|
cleanupFileSystem();
|
||||||
|
cleanupDFS();
|
||||||
|
cleanupExecutorService();
|
||||||
|
System.gc();
|
||||||
|
}
|
||||||
|
|
||||||
protected void cleanupFlinkMiniCluster() {
|
protected void cleanupFlinkMiniCluster() {
|
||||||
if (flinkCluster != null) {
|
if (flinkCluster != null) {
|
||||||
flinkCluster.after();
|
flinkCluster.after();
|
||||||
@@ -133,4 +172,59 @@ public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implem
|
|||||||
valuesList.add(value);
|
valuesList.add(value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanups hoodie clients.
|
||||||
|
*/
|
||||||
|
protected void cleanupClients() throws java.io.IOException {
|
||||||
|
if (metaClient != null) {
|
||||||
|
metaClient = null;
|
||||||
|
}
|
||||||
|
if (writeClient != null) {
|
||||||
|
writeClient.close();
|
||||||
|
writeClient = null;
|
||||||
|
}
|
||||||
|
if (tableView != null) {
|
||||||
|
tableView.close();
|
||||||
|
tableView = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanups the distributed file system.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
protected void cleanupDFS() throws java.io.IOException {
|
||||||
|
if (hdfsTestService != null) {
|
||||||
|
hdfsTestService.stop();
|
||||||
|
dfsCluster.shutdown();
|
||||||
|
hdfsTestService = null;
|
||||||
|
dfsCluster = null;
|
||||||
|
dfs = null;
|
||||||
|
}
|
||||||
|
// Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
|
||||||
|
// same JVM
|
||||||
|
FileSystem.closeAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanups the executor service.
|
||||||
|
*/
|
||||||
|
protected void cleanupExecutorService() {
|
||||||
|
if (this.executorService != null) {
|
||||||
|
this.executorService.shutdownNow();
|
||||||
|
this.executorService = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanups Flink contexts.
|
||||||
|
*/
|
||||||
|
protected void cleanupFlinkContexts() {
|
||||||
|
if (context != null) {
|
||||||
|
LOG.info("Closing flink engine context used in previous test-case");
|
||||||
|
context = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,136 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.testutils;
|
||||||
|
|
||||||
|
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||||
|
import org.apache.hudi.common.bloom.BloomFilter;
|
||||||
|
import org.apache.hudi.common.bloom.BloomFilterFactory;
|
||||||
|
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
|
||||||
|
import org.apache.hudi.common.model.HoodieLogFile;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||||
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
|
import org.apache.hudi.common.table.log.HoodieLogFormat;
|
||||||
|
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
|
||||||
|
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
|
||||||
|
import org.apache.hudi.table.HoodieTable;
|
||||||
|
|
||||||
|
import org.apache.avro.Schema;
|
||||||
|
import org.apache.avro.generic.GenericRecord;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.log4j.LogManager;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
public class HoodieFlinkWriteableTestTable extends HoodieWriteableTestTable {
|
||||||
|
private static final Logger LOG = LogManager.getLogger(HoodieFlinkWriteableTestTable.class);
|
||||||
|
|
||||||
|
private HoodieFlinkWriteableTestTable(String basePath, org.apache.hadoop.fs.FileSystem fs, HoodieTableMetaClient metaClient, Schema schema, BloomFilter filter) {
|
||||||
|
super(basePath, fs, metaClient, schema, filter);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static HoodieFlinkWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema, BloomFilter filter) {
|
||||||
|
return new HoodieFlinkWriteableTestTable(metaClient.getBasePath(), metaClient.getRawFs(), metaClient, schema, filter);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static HoodieFlinkWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema) {
|
||||||
|
BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name());
|
||||||
|
return of(metaClient, schema, filter);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static HoodieFlinkWriteableTestTable of(HoodieTable hoodieTable, Schema schema) {
|
||||||
|
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
|
||||||
|
return of(metaClient, schema);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static HoodieFlinkWriteableTestTable of(HoodieTable hoodieTable, Schema schema, BloomFilter filter) {
|
||||||
|
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
|
||||||
|
return of(metaClient, schema, filter);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HoodieFlinkWriteableTestTable addCommit(String instantTime) throws Exception {
|
||||||
|
return (HoodieFlinkWriteableTestTable) super.addCommit(instantTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HoodieFlinkWriteableTestTable forCommit(String instantTime) {
|
||||||
|
return (HoodieFlinkWriteableTestTable) super.forCommit(instantTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getFileIdWithInserts(String partition) throws Exception {
|
||||||
|
return getFileIdWithInserts(partition, new HoodieRecord[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getFileIdWithInserts(String partition, HoodieRecord... records) throws Exception {
|
||||||
|
return getFileIdWithInserts(partition, Arrays.asList(records));
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getFileIdWithInserts(String partition, List<HoodieRecord> records) throws Exception {
|
||||||
|
String fileId = java.util.UUID.randomUUID().toString();
|
||||||
|
withInserts(partition, fileId, records);
|
||||||
|
return fileId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public HoodieFlinkWriteableTestTable withInserts(String partition, String fileId) throws Exception {
|
||||||
|
return withInserts(partition, fileId, new HoodieRecord[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
public HoodieFlinkWriteableTestTable withInserts(String partition, String fileId, HoodieRecord... records) throws Exception {
|
||||||
|
return withInserts(partition, fileId, Arrays.asList(records));
|
||||||
|
}
|
||||||
|
|
||||||
|
public HoodieFlinkWriteableTestTable withInserts(String partition, String fileId, List<HoodieRecord> records) throws Exception {
|
||||||
|
return (HoodieFlinkWriteableTestTable) withInserts(partition, fileId, records, new org.apache.hudi.client.FlinkTaskContextSupplier(null));
|
||||||
|
}
|
||||||
|
|
||||||
|
public HoodieFlinkWriteableTestTable withLogAppends(List<HoodieRecord> records) throws Exception {
|
||||||
|
for (List<HoodieRecord> groupedRecords: records.stream().collect(Collectors.groupingBy(HoodieRecord::getCurrentLocation)).values()) {
|
||||||
|
appendRecordsToLogFile(groupedRecords);
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void appendRecordsToLogFile(List<HoodieRecord> groupedRecords) throws Exception {
|
||||||
|
String partitionPath = groupedRecords.get(0).getPartitionPath();
|
||||||
|
HoodieRecordLocation location = groupedRecords.get(0).getCurrentLocation();
|
||||||
|
try (HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(basePath, partitionPath))
|
||||||
|
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId())
|
||||||
|
.overBaseCommit(location.getInstantTime()).withFs(fs).build()) {
|
||||||
|
Map<HeaderMetadataType, String> header = new java.util.HashMap<>();
|
||||||
|
header.put(HeaderMetadataType.INSTANT_TIME, location.getInstantTime());
|
||||||
|
header.put(HeaderMetadataType.SCHEMA, schema.toString());
|
||||||
|
logWriter.appendBlock(new HoodieAvroDataBlock(groupedRecords.stream().map(r -> {
|
||||||
|
try {
|
||||||
|
GenericRecord val = (GenericRecord) r.getData().getInsertValue(schema).get();
|
||||||
|
HoodieAvroUtils.addHoodieKeyToRecord(val, r.getRecordKey(), r.getPartitionPath(), "");
|
||||||
|
return (org.apache.avro.generic.IndexedRecord) val;
|
||||||
|
} catch (java.io.IOException e) {
|
||||||
|
LOG.warn("Failed to convert record " + r.toString(), e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}).collect(Collectors.toList()), header));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user