[HUDI-407] Adding Simple Index to Hoodie. (#1402)
This index finds the location by joining incoming records with records from base files.
This commit is contained in:
committed by
GitHub
parent
3c9da2e5f0
commit
29edf4b3b8
@@ -99,4 +99,8 @@ public class SparkConfigUtils {
|
|||||||
String fraction = properties.getProperty(MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP, DEFAULT_MAX_MEMORY_FRACTION_FOR_COMPACTION);
|
String fraction = properties.getProperty(MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP, DEFAULT_MAX_MEMORY_FRACTION_FOR_COMPACTION);
|
||||||
return getMaxMemoryAllowedForMerge(fraction);
|
return getMaxMemoryAllowedForMerge(fraction);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static StorageLevel getSimpleIndexInputStorageLevel(Properties properties) {
|
||||||
|
return StorageLevel.fromString(properties.getProperty(HoodieIndexConfig.SIMPLE_INDEX_INPUT_STORAGE_LEVEL));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -62,6 +62,12 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
|
|||||||
public static final String DEFAULT_BLOOM_INDEX_FILTER_TYPE = BloomFilterTypeCode.SIMPLE.name();
|
public static final String DEFAULT_BLOOM_INDEX_FILTER_TYPE = BloomFilterTypeCode.SIMPLE.name();
|
||||||
public static final String HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = "hoodie.bloom.index.filter.dynamic.max.entries";
|
public static final String HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = "hoodie.bloom.index.filter.dynamic.max.entries";
|
||||||
public static final String DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = "100000";
|
public static final String DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = "100000";
|
||||||
|
public static final String SIMPLE_INDEX_USE_CACHING_PROP = "hoodie.simple.index.use.caching";
|
||||||
|
public static final String DEFAULT_SIMPLE_INDEX_USE_CACHING = "true";
|
||||||
|
public static final String SIMPLE_INDEX_PARALLELISM_PROP = "hoodie.simple.index.parallelism";
|
||||||
|
public static final String DEFAULT_SIMPLE_INDEX_PARALLELISM = "0";
|
||||||
|
public static final String GLOBAL_SIMPLE_INDEX_PARALLELISM_PROP = "hoodie.global.simple.index.parallelism";
|
||||||
|
public static final String DEFAULT_GLOBAL_SIMPLE_INDEX_PARALLELISM = "0";
|
||||||
|
|
||||||
// 1B bloom filter checks happen in 250 seconds. 500ms to read a bloom filter.
|
// 1B bloom filter checks happen in 250 seconds. 500ms to read a bloom filter.
|
||||||
// 10M checks in 2500ms, thus amortizing the cost of reading bloom filter across partitions.
|
// 10M checks in 2500ms, thus amortizing the cost of reading bloom filter across partitions.
|
||||||
@@ -80,6 +86,8 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
|
|||||||
|
|
||||||
public static final String BLOOM_INDEX_INPUT_STORAGE_LEVEL = "hoodie.bloom.index.input.storage.level";
|
public static final String BLOOM_INDEX_INPUT_STORAGE_LEVEL = "hoodie.bloom.index.input.storage.level";
|
||||||
public static final String DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
|
public static final String DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
|
||||||
|
public static final String SIMPLE_INDEX_INPUT_STORAGE_LEVEL = "hoodie.simple.index.input.storage.level";
|
||||||
|
public static final String DEFAULT_SIMPLE_INDEX_INPUT_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Only applies if index type is GLOBAL_BLOOM.
|
* Only applies if index type is GLOBAL_BLOOM.
|
||||||
@@ -92,6 +100,9 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
|
|||||||
public static final String BLOOM_INDEX_UPDATE_PARTITION_PATH = "hoodie.bloom.index.update.partition.path";
|
public static final String BLOOM_INDEX_UPDATE_PARTITION_PATH = "hoodie.bloom.index.update.partition.path";
|
||||||
public static final String DEFAULT_BLOOM_INDEX_UPDATE_PARTITION_PATH = "false";
|
public static final String DEFAULT_BLOOM_INDEX_UPDATE_PARTITION_PATH = "false";
|
||||||
|
|
||||||
|
public static final String SIMPLE_INDEX_UPDATE_PARTITION_PATH = "hoodie.simple.index.update.partition.path";
|
||||||
|
public static final String DEFAULT_SIMPLE_INDEX_UPDATE_PARTITION_PATH = "false";
|
||||||
|
|
||||||
private HoodieIndexConfig(Properties props) {
|
private HoodieIndexConfig(Properties props) {
|
||||||
super(props);
|
super(props);
|
||||||
}
|
}
|
||||||
@@ -201,6 +212,31 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder withSimpleIndexParallelism(int parallelism) {
|
||||||
|
props.setProperty(SIMPLE_INDEX_PARALLELISM_PROP, String.valueOf(parallelism));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder simpleIndexUseCaching(boolean useCaching) {
|
||||||
|
props.setProperty(SIMPLE_INDEX_USE_CACHING_PROP, String.valueOf(useCaching));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder withSimpleIndexInputStorageLevel(String level) {
|
||||||
|
props.setProperty(SIMPLE_INDEX_INPUT_STORAGE_LEVEL, level);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder withGlobalSimpleIndexParallelism(int parallelism) {
|
||||||
|
props.setProperty(GLOBAL_SIMPLE_INDEX_PARALLELISM_PROP, String.valueOf(parallelism));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder withGlobalSimpleIndexUpdatePartitionPath(boolean updatePartitionPath) {
|
||||||
|
props.setProperty(SIMPLE_INDEX_UPDATE_PARTITION_PATH, String.valueOf(updatePartitionPath));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public HoodieIndexConfig build() {
|
public HoodieIndexConfig build() {
|
||||||
HoodieIndexConfig config = new HoodieIndexConfig(props);
|
HoodieIndexConfig config = new HoodieIndexConfig(props);
|
||||||
setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP), INDEX_TYPE_PROP, DEFAULT_INDEX_TYPE);
|
setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP), INDEX_TYPE_PROP, DEFAULT_INDEX_TYPE);
|
||||||
@@ -228,6 +264,16 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
|
|||||||
BLOOM_INDEX_FILTER_TYPE, DEFAULT_BLOOM_INDEX_FILTER_TYPE);
|
BLOOM_INDEX_FILTER_TYPE, DEFAULT_BLOOM_INDEX_FILTER_TYPE);
|
||||||
setDefaultOnCondition(props, !props.contains(HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES),
|
setDefaultOnCondition(props, !props.contains(HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES),
|
||||||
HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES, DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES);
|
HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES, DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES);
|
||||||
|
setDefaultOnCondition(props, !props.containsKey(SIMPLE_INDEX_PARALLELISM_PROP), SIMPLE_INDEX_PARALLELISM_PROP,
|
||||||
|
DEFAULT_SIMPLE_INDEX_PARALLELISM);
|
||||||
|
setDefaultOnCondition(props, !props.containsKey(SIMPLE_INDEX_USE_CACHING_PROP), SIMPLE_INDEX_USE_CACHING_PROP,
|
||||||
|
DEFAULT_SIMPLE_INDEX_USE_CACHING);
|
||||||
|
setDefaultOnCondition(props, !props.containsKey(SIMPLE_INDEX_INPUT_STORAGE_LEVEL), SIMPLE_INDEX_INPUT_STORAGE_LEVEL,
|
||||||
|
DEFAULT_SIMPLE_INDEX_INPUT_STORAGE_LEVEL);
|
||||||
|
setDefaultOnCondition(props, !props.containsKey(GLOBAL_SIMPLE_INDEX_PARALLELISM_PROP), GLOBAL_SIMPLE_INDEX_PARALLELISM_PROP,
|
||||||
|
DEFAULT_GLOBAL_SIMPLE_INDEX_PARALLELISM);
|
||||||
|
setDefaultOnCondition(props, !props.containsKey(SIMPLE_INDEX_UPDATE_PARTITION_PATH),
|
||||||
|
SIMPLE_INDEX_UPDATE_PARTITION_PATH, DEFAULT_SIMPLE_INDEX_UPDATE_PARTITION_PATH);
|
||||||
// Throws IllegalArgumentException if the value set is not a known Hoodie Index Type
|
// Throws IllegalArgumentException if the value set is not a known Hoodie Index Type
|
||||||
HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP));
|
HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP));
|
||||||
return config;
|
return config;
|
||||||
|
|||||||
@@ -441,6 +441,22 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH));
|
return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getSimpleIndexParallelism() {
|
||||||
|
return Integer.parseInt(props.getProperty(HoodieIndexConfig.SIMPLE_INDEX_PARALLELISM_PROP));
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean getSimpleIndexUseCaching() {
|
||||||
|
return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.SIMPLE_INDEX_USE_CACHING_PROP));
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getGlobalSimpleIndexParallelism() {
|
||||||
|
return Integer.parseInt(props.getProperty(HoodieIndexConfig.GLOBAL_SIMPLE_INDEX_PARALLELISM_PROP));
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean getGlobalSimpleIndexUpdatePartitionPath() {
|
||||||
|
return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.SIMPLE_INDEX_UPDATE_PARTITION_PATH));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* storage properties.
|
* storage properties.
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -32,6 +32,8 @@ import org.apache.hudi.exception.HoodieIndexException;
|
|||||||
import org.apache.hudi.index.bloom.HoodieBloomIndex;
|
import org.apache.hudi.index.bloom.HoodieBloomIndex;
|
||||||
import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex;
|
import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex;
|
||||||
import org.apache.hudi.index.hbase.HBaseIndex;
|
import org.apache.hudi.index.hbase.HBaseIndex;
|
||||||
|
import org.apache.hudi.index.simple.HoodieGlobalSimpleIndex;
|
||||||
|
import org.apache.hudi.index.simple.HoodieSimpleIndex;
|
||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
|
|
||||||
import org.apache.spark.api.java.JavaPairRDD;
|
import org.apache.spark.api.java.JavaPairRDD;
|
||||||
@@ -70,6 +72,10 @@ public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Seri
|
|||||||
return new HoodieBloomIndex<>(config);
|
return new HoodieBloomIndex<>(config);
|
||||||
case GLOBAL_BLOOM:
|
case GLOBAL_BLOOM:
|
||||||
return new HoodieGlobalBloomIndex<>(config);
|
return new HoodieGlobalBloomIndex<>(config);
|
||||||
|
case SIMPLE:
|
||||||
|
return new HoodieSimpleIndex<>(config);
|
||||||
|
case GLOBAL_SIMPLE:
|
||||||
|
return new HoodieGlobalSimpleIndex<>(config);
|
||||||
default:
|
default:
|
||||||
throw new HoodieIndexException("Index type unspecified, set " + config.getIndexType());
|
throw new HoodieIndexException("Index type unspecified, set " + config.getIndexType());
|
||||||
}
|
}
|
||||||
@@ -87,7 +93,7 @@ public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Seri
|
|||||||
* present).
|
* present).
|
||||||
*/
|
*/
|
||||||
public abstract JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
|
public abstract JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
|
||||||
HoodieTable<T> hoodieTable) throws HoodieIndexException;
|
HoodieTable<T> hoodieTable) throws HoodieIndexException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extracts the location of written records, and updates the index.
|
* Extracts the location of written records, and updates the index.
|
||||||
@@ -95,7 +101,7 @@ public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Seri
|
|||||||
* TODO(vc): We may need to propagate the record as well in a WriteStatus class
|
* TODO(vc): We may need to propagate the record as well in a WriteStatus class
|
||||||
*/
|
*/
|
||||||
public abstract JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
|
public abstract JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
|
||||||
HoodieTable<T> hoodieTable) throws HoodieIndexException;
|
HoodieTable<T> hoodieTable) throws HoodieIndexException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Rollback the efffects of the commit made at instantTime.
|
* Rollback the efffects of the commit made at instantTime.
|
||||||
@@ -128,9 +134,10 @@ public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Seri
|
|||||||
/**
|
/**
|
||||||
* Each index type should implement it's own logic to release any resources acquired during the process.
|
* Each index type should implement it's own logic to release any resources acquired during the process.
|
||||||
*/
|
*/
|
||||||
public void close() {}
|
public void close() {
|
||||||
|
}
|
||||||
|
|
||||||
public enum IndexType {
|
public enum IndexType {
|
||||||
HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM
|
HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,90 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.index;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.model.HoodieBaseFile;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||||
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
|
import org.apache.hudi.common.util.Option;
|
||||||
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
|
import org.apache.hudi.table.HoodieTable;
|
||||||
|
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static java.util.stream.Collectors.toList;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Hoodie Index Utilities.
|
||||||
|
*/
|
||||||
|
public class HoodieIndexUtils {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions.
|
||||||
|
*
|
||||||
|
* @param partitions list of partitions of interest
|
||||||
|
* @param jsc instance of {@link JavaSparkContext} to use
|
||||||
|
* @param hoodieTable instance of {@link HoodieTable} of interest
|
||||||
|
* @return the list of Pairs of partition path and fileId
|
||||||
|
*/
|
||||||
|
public static List<Pair<String, HoodieBaseFile>> getLatestBaseFilesForAllPartitions(final List<String> partitions,
|
||||||
|
final JavaSparkContext jsc,
|
||||||
|
final HoodieTable hoodieTable) {
|
||||||
|
return jsc.parallelize(partitions, Math.max(partitions.size(), 1))
|
||||||
|
.flatMap(partitionPath -> {
|
||||||
|
Option<HoodieInstant> latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline()
|
||||||
|
.filterCompletedInstants().lastInstant();
|
||||||
|
List<Pair<String, HoodieBaseFile>> filteredFiles = new ArrayList<>();
|
||||||
|
if (latestCommitTime.isPresent()) {
|
||||||
|
filteredFiles = hoodieTable.getBaseFileOnlyView()
|
||||||
|
.getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp())
|
||||||
|
.map(f -> Pair.of(partitionPath, f))
|
||||||
|
.collect(toList());
|
||||||
|
}
|
||||||
|
return filteredFiles.iterator();
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get tagged record for the passed in {@link HoodieRecord}.
|
||||||
|
*
|
||||||
|
* @param inputRecord instance of {@link HoodieRecord} for which tagging is requested
|
||||||
|
* @param location {@link HoodieRecordLocation} for the passed in {@link HoodieRecord}
|
||||||
|
* @return the tagged {@link HoodieRecord}
|
||||||
|
*/
|
||||||
|
public static HoodieRecord getTaggedRecord(HoodieRecord inputRecord, Option<HoodieRecordLocation> location) {
|
||||||
|
HoodieRecord record = inputRecord;
|
||||||
|
if (location.isPresent()) {
|
||||||
|
// When you have a record in multiple files in the same partition, then rowKeyRecordPairRDD
|
||||||
|
// will have 2 entries with the same exact in memory copy of the HoodieRecord and the 2
|
||||||
|
// separate filenames that the record is found in. This will result in setting
|
||||||
|
// currentLocation 2 times and it will fail the second time. So creating a new in memory
|
||||||
|
// copy of the hoodie record.
|
||||||
|
record = new HoodieRecord<>(inputRecord);
|
||||||
|
record.unseal();
|
||||||
|
record.setCurrentLocation(location.get());
|
||||||
|
record.seal();
|
||||||
|
}
|
||||||
|
return record;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -24,12 +24,12 @@ import org.apache.hudi.common.model.HoodieKey;
|
|||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.model.HoodieRecordLocation;
|
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.common.util.collection.Pair;
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.exception.MetadataNotFoundException;
|
import org.apache.hudi.exception.MetadataNotFoundException;
|
||||||
import org.apache.hudi.index.HoodieIndex;
|
import org.apache.hudi.index.HoodieIndex;
|
||||||
|
import org.apache.hudi.index.HoodieIndexUtils;
|
||||||
import org.apache.hudi.io.HoodieRangeInfoHandle;
|
import org.apache.hudi.io.HoodieRangeInfoHandle;
|
||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
|
|
||||||
@@ -52,6 +52,7 @@ import scala.Tuple2;
|
|||||||
import static java.util.stream.Collectors.groupingBy;
|
import static java.util.stream.Collectors.groupingBy;
|
||||||
import static java.util.stream.Collectors.mapping;
|
import static java.util.stream.Collectors.mapping;
|
||||||
import static java.util.stream.Collectors.toList;
|
import static java.util.stream.Collectors.toList;
|
||||||
|
import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Indexing mechanism based on bloom filter. Each parquet file includes its row_key bloom filter in its metadata.
|
* Indexing mechanism based on bloom filter. Each parquet file includes its row_key bloom filter in its metadata.
|
||||||
@@ -192,18 +193,9 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
|
|||||||
final HoodieTable hoodieTable) {
|
final HoodieTable hoodieTable) {
|
||||||
|
|
||||||
// Obtain the latest data files from all the partitions.
|
// Obtain the latest data files from all the partitions.
|
||||||
List<Pair<String, String>> partitionPathFileIDList =
|
List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, jsc, hoodieTable).stream()
|
||||||
jsc.parallelize(partitions, Math.max(partitions.size(), 1)).flatMap(partitionPath -> {
|
.map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
|
||||||
Option<HoodieInstant> latestCommitTime =
|
.collect(toList());
|
||||||
hoodieTable.getMetaClient().getCommitsTimeline().filterCompletedInstants().lastInstant();
|
|
||||||
List<Pair<String, String>> filteredFiles = new ArrayList<>();
|
|
||||||
if (latestCommitTime.isPresent()) {
|
|
||||||
filteredFiles = hoodieTable.getBaseFileOnlyView()
|
|
||||||
.getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp())
|
|
||||||
.map(f -> Pair.of(partitionPath, f.getFileId())).collect(toList());
|
|
||||||
}
|
|
||||||
return filteredFiles.iterator();
|
|
||||||
}).collect();
|
|
||||||
|
|
||||||
if (config.getBloomIndexPruneByRanges()) {
|
if (config.getBloomIndexPruneByRanges()) {
|
||||||
// also obtain file ranges, if range pruning is enabled
|
// also obtain file ranges, if range pruning is enabled
|
||||||
@@ -312,21 +304,6 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
|
|||||||
.collect(Collectors.toList()).iterator());
|
.collect(Collectors.toList()).iterator());
|
||||||
}
|
}
|
||||||
|
|
||||||
HoodieRecord<T> getTaggedRecord(HoodieRecord<T> inputRecord, Option<HoodieRecordLocation> location) {
|
|
||||||
HoodieRecord<T> record = inputRecord;
|
|
||||||
if (location.isPresent()) {
|
|
||||||
// When you have a record in multiple files in the same partition, then rowKeyRecordPairRDD
|
|
||||||
// will have 2 entries with the same exact in memory copy of the HoodieRecord and the 2
|
|
||||||
// separate filenames that the record is found in. This will result in setting
|
|
||||||
// currentLocation 2 times and it will fail the second time. So creating a new in memory
|
|
||||||
// copy of the hoodie record.
|
|
||||||
record = new HoodieRecord<>(inputRecord);
|
|
||||||
record.unseal();
|
|
||||||
record.setCurrentLocation(location.get());
|
|
||||||
record.seal();
|
|
||||||
}
|
|
||||||
return record;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tag the <rowKey, filename> back to the original HoodieRecord RDD.
|
* Tag the <rowKey, filename> back to the original HoodieRecord RDD.
|
||||||
@@ -338,7 +315,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
|
|||||||
// Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null),
|
// Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null),
|
||||||
// so we do left outer join.
|
// so we do left outer join.
|
||||||
return keyRecordPairRDD.leftOuterJoin(keyFilenamePairRDD).values()
|
return keyRecordPairRDD.leftOuterJoin(keyFilenamePairRDD).values()
|
||||||
.map(v1 -> getTaggedRecord(v1._1, Option.ofNullable(v1._2.orNull())));
|
.map(v1 -> HoodieIndexUtils.getTaggedRecord(v1._1, Option.ofNullable(v1._2.orNull())));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
|
|||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
|
import org.apache.hudi.index.HoodieIndexUtils;
|
||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
|
|
||||||
import org.apache.spark.api.java.JavaPairRDD;
|
import org.apache.spark.api.java.JavaPairRDD;
|
||||||
@@ -125,17 +126,17 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends Hoodi
|
|||||||
HoodieRecord<T> emptyRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2,
|
HoodieRecord<T> emptyRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2,
|
||||||
new EmptyHoodieRecordPayload());
|
new EmptyHoodieRecordPayload());
|
||||||
// Tag the incoming record for inserting to the new partition
|
// Tag the incoming record for inserting to the new partition
|
||||||
HoodieRecord<T> taggedRecord = getTaggedRecord(hoodieRecord, Option.empty());
|
HoodieRecord<T> taggedRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty());
|
||||||
return Arrays.asList(emptyRecord, taggedRecord).iterator();
|
return Arrays.asList(emptyRecord, taggedRecord).iterator();
|
||||||
} else {
|
} else {
|
||||||
// Ignore the incoming record's partition, regardless of whether it differs from its old partition or not.
|
// Ignore the incoming record's partition, regardless of whether it differs from its old partition or not.
|
||||||
// When it differs, the record will still be updated at its old partition.
|
// When it differs, the record will still be updated at its old partition.
|
||||||
return Collections.singletonList(
|
return Collections.singletonList(
|
||||||
getTaggedRecord(new HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()),
|
(HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(new HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()),
|
||||||
Option.ofNullable(recordLocationHoodieKeyPair.get()._1))).iterator();
|
Option.ofNullable(recordLocationHoodieKeyPair.get()._1))).iterator();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return Collections.singletonList(getTaggedRecord(hoodieRecord, Option.empty())).iterator();
|
return Collections.singletonList((HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty())).iterator();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,169 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.index.simple;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
|
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
|
||||||
|
import org.apache.hudi.common.model.HoodieBaseFile;
|
||||||
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
|
import org.apache.hudi.common.util.Option;
|
||||||
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
|
import org.apache.hudi.index.HoodieIndexUtils;
|
||||||
|
import org.apache.hudi.table.HoodieTable;
|
||||||
|
import org.apache.spark.api.java.JavaPairRDD;
|
||||||
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A global simple index which reads interested fields(record key and partition path) from base files and
|
||||||
|
* joins with incoming records to find the tagged location.
|
||||||
|
*
|
||||||
|
* @param <T>
|
||||||
|
*/
|
||||||
|
public class HoodieGlobalSimpleIndex<T extends HoodieRecordPayload> extends HoodieSimpleIndex<T> {
|
||||||
|
|
||||||
|
public HoodieGlobalSimpleIndex(HoodieWriteConfig config) {
|
||||||
|
super(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
|
||||||
|
HoodieTable<T> hoodieTable) {
|
||||||
|
return tagLocationInternal(recordRDD, jsc, hoodieTable);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tags records location for incoming records.
|
||||||
|
*
|
||||||
|
* @param inputRecordRDD {@link JavaRDD} of incoming records
|
||||||
|
* @param jsc instance of {@link JavaSparkContext} to use
|
||||||
|
* @param hoodieTable instance of {@link HoodieTable} to use
|
||||||
|
* @return {@link JavaRDD} of records with record locations set
|
||||||
|
*/
|
||||||
|
protected JavaRDD<HoodieRecord<T>> tagLocationInternal(JavaRDD<HoodieRecord<T>> inputRecordRDD, JavaSparkContext jsc,
|
||||||
|
HoodieTable<T> hoodieTable) {
|
||||||
|
|
||||||
|
JavaPairRDD<String, HoodieRecord<T>> keyedInputRecordRDD = inputRecordRDD.mapToPair(entry -> new Tuple2<>(entry.getRecordKey(), entry));
|
||||||
|
JavaPairRDD<HoodieKey, HoodieRecordLocation> allRecordLocationsInTable = fetchAllRecordLocations(jsc, hoodieTable,
|
||||||
|
config.getGlobalSimpleIndexParallelism());
|
||||||
|
return getTaggedRecords(keyedInputRecordRDD, allRecordLocationsInTable);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetch record locations for passed in {@link HoodieKey}s.
|
||||||
|
*
|
||||||
|
* @param jsc instance of {@link JavaSparkContext} to use
|
||||||
|
* @param hoodieTable instance of {@link HoodieTable} of interest
|
||||||
|
* @param parallelism parallelism to use
|
||||||
|
* @return {@link JavaPairRDD} of {@link HoodieKey} and {@link HoodieRecordLocation}
|
||||||
|
*/
|
||||||
|
protected JavaPairRDD<HoodieKey, HoodieRecordLocation> fetchAllRecordLocations(JavaSparkContext jsc,
|
||||||
|
HoodieTable hoodieTable,
|
||||||
|
int parallelism) {
|
||||||
|
List<Pair<String, HoodieBaseFile>> latestBaseFiles = getAllBaseFilesInTable(jsc, hoodieTable);
|
||||||
|
return fetchRecordLocations(jsc, hoodieTable, parallelism, latestBaseFiles);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Load all files for all partitions as <Partition, filename> pair RDD.
|
||||||
|
*/
|
||||||
|
protected List<Pair<String, HoodieBaseFile>> getAllBaseFilesInTable(final JavaSparkContext jsc, final HoodieTable hoodieTable) {
|
||||||
|
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
|
||||||
|
try {
|
||||||
|
List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), config.shouldAssumeDatePartitioning());
|
||||||
|
// Obtain the latest data files from all the partitions.
|
||||||
|
return getLatestBaseFilesForAllPartitions(allPartitionPaths, jsc, hoodieTable);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new HoodieIOException("Failed to load all partitions", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tag records with right {@link HoodieRecordLocation}.
|
||||||
|
*
|
||||||
|
* @param incomingRecords incoming {@link HoodieRecord}s
|
||||||
|
* @param existingRecords existing records with {@link HoodieRecordLocation}s
|
||||||
|
* @return {@link JavaRDD} of {@link HoodieRecord}s with tagged {@link HoodieRecordLocation}s
|
||||||
|
*/
|
||||||
|
private JavaRDD<HoodieRecord<T>> getTaggedRecords(JavaPairRDD<String, HoodieRecord<T>> incomingRecords, JavaPairRDD<HoodieKey, HoodieRecordLocation> existingRecords) {
|
||||||
|
JavaPairRDD<String, Pair<String, HoodieRecordLocation>> existingRecordByRecordKey = existingRecords
|
||||||
|
.mapToPair(entry -> new Tuple2<>(entry._1.getRecordKey(), Pair.of(entry._1.getPartitionPath(), entry._2)));
|
||||||
|
|
||||||
|
return incomingRecords.leftOuterJoin(existingRecordByRecordKey).values()
|
||||||
|
.flatMap(entry -> {
|
||||||
|
HoodieRecord<T> inputRecord = entry._1;
|
||||||
|
Option<Pair<String, HoodieRecordLocation>> partitionPathLocationPair = Option.ofNullable(entry._2.orNull());
|
||||||
|
List<HoodieRecord<T>> taggedRecords;
|
||||||
|
|
||||||
|
if (partitionPathLocationPair.isPresent()) {
|
||||||
|
String partitionPath = partitionPathLocationPair.get().getKey();
|
||||||
|
HoodieRecordLocation location = partitionPathLocationPair.get().getRight();
|
||||||
|
if (config.getGlobalSimpleIndexUpdatePartitionPath() && !(inputRecord.getPartitionPath().equals(partitionPath))) {
|
||||||
|
// Create an empty record to delete the record in the old partition
|
||||||
|
HoodieRecord<T> emptyRecord = new HoodieRecord(new HoodieKey(inputRecord.getRecordKey(), partitionPath), new EmptyHoodieRecordPayload());
|
||||||
|
// Tag the incoming record for inserting to the new partition
|
||||||
|
HoodieRecord<T> taggedRecord = (HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(inputRecord, Option.empty());
|
||||||
|
taggedRecords = Arrays.asList(emptyRecord, taggedRecord);
|
||||||
|
} else {
|
||||||
|
// Ignore the incoming record's partition, regardless of whether it differs from its old partition or not.
|
||||||
|
// When it differs, the record will still be updated at its old partition.
|
||||||
|
HoodieRecord<T> newRecord = new HoodieRecord<>(new HoodieKey(inputRecord.getRecordKey(), partitionPath), inputRecord.getData());
|
||||||
|
taggedRecords = Collections.singletonList((HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(newRecord, Option.ofNullable(location)));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
taggedRecords = Collections.singletonList((HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(inputRecord, Option.empty()));
|
||||||
|
}
|
||||||
|
return taggedRecords.iterator();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns an RDD mapping each HoodieKey with a partitionPath/fileID which contains it. Option.Empty if the key is not.
|
||||||
|
* found.
|
||||||
|
*
|
||||||
|
* @param hoodieKeys keys to lookup
|
||||||
|
* @param jsc spark context
|
||||||
|
* @param hoodieTable hoodie table object
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
|
||||||
|
JavaSparkContext jsc,
|
||||||
|
HoodieTable<T> hoodieTable) {
|
||||||
|
return fetchRecordLocationInternal(hoodieKeys, jsc, hoodieTable, config.getGlobalSimpleIndexParallelism());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isGlobal() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,181 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.index.simple;
|
||||||
|
|
||||||
|
import org.apache.hudi.client.WriteStatus;
|
||||||
|
import org.apache.hudi.client.utils.SparkConfigUtils;
|
||||||
|
import org.apache.hudi.common.model.HoodieBaseFile;
|
||||||
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
|
import org.apache.hudi.common.util.Option;
|
||||||
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
|
import org.apache.hudi.index.HoodieIndex;
|
||||||
|
import org.apache.hudi.index.HoodieIndexUtils;
|
||||||
|
import org.apache.hudi.io.HoodieKeyLocationFetchHandle;
|
||||||
|
import org.apache.hudi.table.HoodieTable;
|
||||||
|
|
||||||
|
import org.apache.spark.api.java.JavaPairRDD;
|
||||||
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A simple index which reads interested fields(record key and partition path) from base files and
|
||||||
|
* joins with incoming records to find the tagged location.
|
||||||
|
*
|
||||||
|
* @param <T>
|
||||||
|
*/
|
||||||
|
public class HoodieSimpleIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
|
||||||
|
|
||||||
|
public HoodieSimpleIndex(HoodieWriteConfig config) {
|
||||||
|
super(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
|
||||||
|
HoodieTable<T> hoodieTable) {
|
||||||
|
return writeStatusRDD;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean rollbackCommit(String commitTime) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isGlobal() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean canIndexLogFiles() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isImplicitWithStorage() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
|
||||||
|
HoodieTable<T> hoodieTable) {
|
||||||
|
return tagLocationInternal(recordRDD, jsc, hoodieTable);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns an RDD mapping each HoodieKey with a partitionPath/fileID which contains it. Option. Empty if the key is not
|
||||||
|
* found.
|
||||||
|
*
|
||||||
|
* @param hoodieKeys keys to lookup
|
||||||
|
* @param jsc spark context
|
||||||
|
* @param hoodieTable hoodie table object
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
|
||||||
|
JavaSparkContext jsc, HoodieTable<T> hoodieTable) {
|
||||||
|
|
||||||
|
return fetchRecordLocationInternal(hoodieKeys, jsc, hoodieTable, config.getSimpleIndexParallelism());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tags records location for incoming records.
|
||||||
|
*
|
||||||
|
* @param inputRecordRDD {@link JavaRDD} of incoming records
|
||||||
|
* @param jsc instance of {@link JavaSparkContext} to use
|
||||||
|
* @param hoodieTable instance of {@link HoodieTable} to use
|
||||||
|
* @return {@link JavaRDD} of records with record locations set
|
||||||
|
*/
|
||||||
|
protected JavaRDD<HoodieRecord<T>> tagLocationInternal(JavaRDD<HoodieRecord<T>> inputRecordRDD, JavaSparkContext jsc,
|
||||||
|
HoodieTable<T> hoodieTable) {
|
||||||
|
if (config.getSimpleIndexUseCaching()) {
|
||||||
|
inputRecordRDD.persist(SparkConfigUtils.getSimpleIndexInputStorageLevel(config.getProps()));
|
||||||
|
}
|
||||||
|
|
||||||
|
JavaPairRDD<HoodieKey, HoodieRecord<T>> keyedInputRecordRDD = inputRecordRDD.mapToPair(record -> new Tuple2<>(record.getKey(), record));
|
||||||
|
JavaPairRDD<HoodieKey, HoodieRecordLocation> existingLocationsOnTable = fetchRecordLocationsForAffectedPartitions(keyedInputRecordRDD.keys(), jsc, hoodieTable,
|
||||||
|
config.getSimpleIndexParallelism());
|
||||||
|
|
||||||
|
JavaRDD<HoodieRecord<T>> taggedRecordRDD = keyedInputRecordRDD.leftOuterJoin(existingLocationsOnTable)
|
||||||
|
.map(entry -> {
|
||||||
|
final HoodieRecord<T> untaggedRecord = entry._2._1;
|
||||||
|
final Option<HoodieRecordLocation> location = Option.ofNullable(entry._2._2.orNull());
|
||||||
|
return HoodieIndexUtils.getTaggedRecord(untaggedRecord, location);
|
||||||
|
});
|
||||||
|
|
||||||
|
if (config.getSimpleIndexUseCaching()) {
|
||||||
|
inputRecordRDD.unpersist();
|
||||||
|
}
|
||||||
|
return taggedRecordRDD;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetch record locations for passed in {@link JavaRDD} of HoodieKeys.
|
||||||
|
*
|
||||||
|
* @param lookupKeys {@link JavaRDD} of {@link HoodieKey}s
|
||||||
|
* @param jsc instance of {@link JavaSparkContext} to use
|
||||||
|
* @param hoodieTable instance of {@link HoodieTable} of interest
|
||||||
|
* @param parallelism parallelism to use
|
||||||
|
* @return Hoodiekeys mapped to partitionpath and filenames
|
||||||
|
*/
|
||||||
|
JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocationInternal(JavaRDD<HoodieKey> lookupKeys, JavaSparkContext jsc,
|
||||||
|
HoodieTable<T> hoodieTable, int parallelism) {
|
||||||
|
JavaPairRDD<HoodieKey, Option<HoodieRecordLocation>> keyLocationsRDD = lookupKeys.mapToPair(key -> new Tuple2<>(key, Option.empty()));
|
||||||
|
JavaPairRDD<HoodieKey, HoodieRecordLocation> existingRecords = fetchRecordLocationsForAffectedPartitions(lookupKeys, jsc, hoodieTable, parallelism);
|
||||||
|
|
||||||
|
return keyLocationsRDD.leftOuterJoin(existingRecords)
|
||||||
|
.mapToPair(entry -> {
|
||||||
|
final Option<HoodieRecordLocation> locationOpt = Option.ofNullable(entry._2._2.orNull());
|
||||||
|
final HoodieKey key = entry._1;
|
||||||
|
return locationOpt
|
||||||
|
.map(location -> new Tuple2<>(key, Option.of(Pair.of(key.getPartitionPath(), location.getFileId()))))
|
||||||
|
.orElse(new Tuple2<>(key, Option.empty()));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetch record locations for passed in {@link HoodieKey}s.
|
||||||
|
*
|
||||||
|
* @param hoodieKeys {@link JavaRDD} of {@link HoodieKey}s for which locations are fetched
|
||||||
|
* @param jsc instance of {@link JavaSparkContext} to use
|
||||||
|
* @param hoodieTable instance of {@link HoodieTable} of interest
|
||||||
|
* @param parallelism parallelism to use
|
||||||
|
* @return {@link JavaPairRDD} of {@link HoodieKey} and {@link HoodieRecordLocation}
|
||||||
|
*/
|
||||||
|
protected JavaPairRDD<HoodieKey, HoodieRecordLocation> fetchRecordLocationsForAffectedPartitions(JavaRDD<HoodieKey> hoodieKeys, JavaSparkContext jsc, HoodieTable<T> hoodieTable,
|
||||||
|
int parallelism) {
|
||||||
|
List<String> affectedPartitionPathList = hoodieKeys.map(HoodieKey::getPartitionPath).distinct().collect();
|
||||||
|
List<Pair<String, HoodieBaseFile>> latestBaseFiles = getLatestBaseFilesForAllPartitions(affectedPartitionPathList, jsc, hoodieTable);
|
||||||
|
return fetchRecordLocations(jsc, hoodieTable, parallelism, latestBaseFiles);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected JavaPairRDD<HoodieKey, HoodieRecordLocation> fetchRecordLocations(JavaSparkContext jsc, HoodieTable<T> hoodieTable, int parallelism, List<Pair<String, HoodieBaseFile>> baseFiles) {
|
||||||
|
int fetchParallelism = Math.max(1, Math.max(baseFiles.size(), parallelism));
|
||||||
|
return jsc.parallelize(baseFiles, fetchParallelism)
|
||||||
|
.flatMapToPair(partitionPathBaseFile -> new HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile).locations());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,57 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.io;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.model.HoodieBaseFile;
|
||||||
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
|
import org.apache.hudi.common.util.ParquetUtils;
|
||||||
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
|
import org.apache.hudi.table.HoodieTable;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
import java.util.Iterator;
|
||||||
|
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link HoodieRecordLocation} fetch handle for all records from {@link HoodieBaseFile} of interest.
|
||||||
|
*
|
||||||
|
* @param <T>
|
||||||
|
*/
|
||||||
|
public class HoodieKeyLocationFetchHandle<T extends HoodieRecordPayload> extends HoodieReadHandle<T> {
|
||||||
|
|
||||||
|
private final Pair<String, HoodieBaseFile> partitionPathBaseFilePair;
|
||||||
|
|
||||||
|
public HoodieKeyLocationFetchHandle(HoodieWriteConfig config, HoodieTable<T> hoodieTable,
|
||||||
|
Pair<String, HoodieBaseFile> partitionPathBaseFilePair) {
|
||||||
|
super(config, null, hoodieTable, Pair.of(partitionPathBaseFilePair.getLeft(), partitionPathBaseFilePair.getRight().getFileId()));
|
||||||
|
this.partitionPathBaseFilePair = partitionPathBaseFilePair;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Iterator<Tuple2<HoodieKey, HoodieRecordLocation>> locations() {
|
||||||
|
HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
|
||||||
|
return ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hoodieTable.getHadoopConf(), new Path(baseFile.getPath())).stream()
|
||||||
|
.map(entry -> new Tuple2<>(entry,
|
||||||
|
new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()))).iterator();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -18,15 +18,30 @@
|
|||||||
|
|
||||||
package org.apache.hudi.index;
|
package org.apache.hudi.index;
|
||||||
|
|
||||||
|
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||||
|
import org.apache.hudi.client.HoodieWriteClient;
|
||||||
import org.apache.hudi.client.WriteStatus;
|
import org.apache.hudi.client.WriteStatus;
|
||||||
import org.apache.hudi.common.HoodieClientTestHarness;
|
import org.apache.hudi.common.HoodieClientTestHarness;
|
||||||
|
import org.apache.hudi.common.HoodieClientTestUtils;
|
||||||
|
import org.apache.hudi.common.HoodieTestDataGenerator;
|
||||||
|
import org.apache.hudi.common.TestRawTripPayload;
|
||||||
|
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
|
||||||
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
|
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
|
||||||
import org.apache.hudi.common.model.HoodieKey;
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
|
import org.apache.hudi.common.model.HoodiePartitionMetadata;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
|
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
||||||
|
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
|
||||||
|
import org.apache.hudi.common.util.FileIOUtils;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.common.util.collection.Pair;
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
|
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||||
import org.apache.hudi.config.HoodieHBaseIndexConfig;
|
import org.apache.hudi.config.HoodieHBaseIndexConfig;
|
||||||
import org.apache.hudi.config.HoodieIndexConfig;
|
import org.apache.hudi.config.HoodieIndexConfig;
|
||||||
|
import org.apache.hudi.config.HoodieStorageConfig;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.exception.HoodieIndexException;
|
import org.apache.hudi.exception.HoodieIndexException;
|
||||||
@@ -34,63 +49,126 @@ import org.apache.hudi.index.HoodieIndex.IndexType;
|
|||||||
import org.apache.hudi.index.bloom.HoodieBloomIndex;
|
import org.apache.hudi.index.bloom.HoodieBloomIndex;
|
||||||
import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex;
|
import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex;
|
||||||
import org.apache.hudi.index.hbase.HBaseIndex;
|
import org.apache.hudi.index.hbase.HBaseIndex;
|
||||||
|
import org.apache.hudi.index.simple.HoodieSimpleIndex;
|
||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
|
|
||||||
|
import org.apache.avro.Schema;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.spark.api.java.JavaPairRDD;
|
import org.apache.spark.api.java.JavaPairRDD;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
import org.junit.jupiter.api.AfterEach;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
|
import org.junit.jupiter.params.provider.EnumSource;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
public class TestHoodieIndex extends HoodieClientTestHarness {
|
public class TestHoodieIndex extends HoodieClientTestHarness {
|
||||||
|
|
||||||
private HoodieWriteConfig.Builder clientConfigBuilder;
|
private final Random random = new Random();
|
||||||
private HoodieIndexConfig.Builder indexConfigBuilder;
|
private IndexType indexType;
|
||||||
|
private HoodieIndex index;
|
||||||
|
private HoodieWriteConfig config;
|
||||||
|
private HoodieWriteClient writeClient;
|
||||||
|
private String schemaStr;
|
||||||
|
private Schema schema;
|
||||||
|
|
||||||
@BeforeEach
|
private void setUp(IndexType indexType) throws Exception {
|
||||||
public void setUp() throws Exception {
|
setUp(indexType, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setUp(IndexType indexType, boolean initializeIndex) throws Exception {
|
||||||
|
this.indexType = indexType;
|
||||||
initSparkContexts("TestHoodieIndex");
|
initSparkContexts("TestHoodieIndex");
|
||||||
initPath();
|
initPath();
|
||||||
|
initTestDataGenerator();
|
||||||
|
initFileSystem();
|
||||||
|
// We have some records to be tagged (two different partitions)
|
||||||
|
schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
|
||||||
|
schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
|
||||||
initMetaClient();
|
initMetaClient();
|
||||||
clientConfigBuilder = HoodieWriteConfig.newBuilder();
|
if (initializeIndex) {
|
||||||
indexConfigBuilder = HoodieIndexConfig.newBuilder();
|
instantiateIndex();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterEach
|
@AfterEach
|
||||||
public void tearDown() {
|
public void tearDown() throws IOException {
|
||||||
cleanupSparkContexts();
|
cleanupSparkContexts();
|
||||||
|
cleanupFileSystem();
|
||||||
cleanupMetaClient();
|
cleanupMetaClient();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest
|
||||||
public void testCreateIndex() {
|
@EnumSource(value = IndexType.class, names = {"BLOOM", "GLOBAL_BLOOM", "SIMPLE", "GLOBAL_SIMPLE", "HBASE"})
|
||||||
// Different types
|
public void testCreateIndex(IndexType indexType) throws Exception {
|
||||||
HoodieWriteConfig config = clientConfigBuilder.withPath(basePath)
|
setUp(indexType, false);
|
||||||
.withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.HBASE)
|
HoodieWriteConfig.Builder clientConfigBuilder = HoodieWriteConfig.newBuilder();
|
||||||
.withHBaseIndexConfig(new HoodieHBaseIndexConfig.Builder().build()).build())
|
HoodieIndexConfig.Builder indexConfigBuilder = HoodieIndexConfig.newBuilder();
|
||||||
.build();
|
switch (indexType) {
|
||||||
assertTrue(HoodieIndex.createIndex(config) instanceof HBaseIndex);
|
case INMEMORY:
|
||||||
config = clientConfigBuilder.withPath(basePath)
|
config = clientConfigBuilder.withPath(basePath)
|
||||||
.withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
|
.withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
|
||||||
assertTrue(HoodieIndex.createIndex(config) instanceof InMemoryHashIndex);
|
assertTrue(HoodieIndex.createIndex(config) instanceof InMemoryHashIndex);
|
||||||
config = clientConfigBuilder.withPath(basePath)
|
break;
|
||||||
.withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
|
case BLOOM:
|
||||||
assertTrue(HoodieIndex.createIndex(config) instanceof HoodieBloomIndex);
|
config = clientConfigBuilder.withPath(basePath)
|
||||||
config = clientConfigBuilder.withPath(basePath)
|
.withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
|
||||||
.withIndexConfig(indexConfigBuilder.withIndexType(IndexType.GLOBAL_BLOOM).build()).build();
|
assertTrue(HoodieIndex.createIndex(config) instanceof HoodieBloomIndex);
|
||||||
assertTrue(HoodieIndex.createIndex(config) instanceof HoodieGlobalBloomIndex);
|
break;
|
||||||
|
case GLOBAL_BLOOM:
|
||||||
|
config = clientConfigBuilder.withPath(basePath)
|
||||||
|
.withIndexConfig(indexConfigBuilder.withIndexType(IndexType.GLOBAL_BLOOM).build()).build();
|
||||||
|
assertTrue(HoodieIndex.createIndex(config) instanceof HoodieGlobalBloomIndex);
|
||||||
|
break;
|
||||||
|
case SIMPLE:
|
||||||
|
config = clientConfigBuilder.withPath(basePath)
|
||||||
|
.withIndexConfig(indexConfigBuilder.withIndexType(IndexType.SIMPLE).build()).build();
|
||||||
|
assertTrue(HoodieIndex.createIndex(config) instanceof HoodieSimpleIndex);
|
||||||
|
break;
|
||||||
|
case HBASE:
|
||||||
|
config = clientConfigBuilder.withPath(basePath)
|
||||||
|
.withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.HBASE)
|
||||||
|
.withHBaseIndexConfig(new HoodieHBaseIndexConfig.Builder().build()).build())
|
||||||
|
.build();
|
||||||
|
assertTrue(HoodieIndex.createIndex(config) instanceof HBaseIndex);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
// no -op. just for checkstyle errors
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateDummyIndex() throws Exception {
|
||||||
|
setUp(IndexType.BLOOM, false);
|
||||||
|
HoodieWriteConfig.Builder clientConfigBuilder = HoodieWriteConfig.newBuilder();
|
||||||
|
HoodieIndexConfig.Builder indexConfigBuilder = HoodieIndexConfig.newBuilder();
|
||||||
config = clientConfigBuilder.withPath(basePath)
|
config = clientConfigBuilder.withPath(basePath)
|
||||||
.withIndexConfig(indexConfigBuilder.withIndexClass(DummyHoodieIndex.class.getName()).build()).build();
|
.withIndexConfig(indexConfigBuilder.withIndexClass(DummyHoodieIndex.class.getName()).build()).build();
|
||||||
assertTrue(HoodieIndex.createIndex(config) instanceof DummyHoodieIndex);
|
assertTrue(HoodieIndex.createIndex(config) instanceof DummyHoodieIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCreateIndex_withException() {
|
public void testCreateIndex_withException() throws Exception {
|
||||||
|
setUp(IndexType.BLOOM, false);
|
||||||
|
HoodieWriteConfig.Builder clientConfigBuilder = HoodieWriteConfig.newBuilder();
|
||||||
|
HoodieIndexConfig.Builder indexConfigBuilder = HoodieIndexConfig.newBuilder();
|
||||||
final HoodieWriteConfig config1 = clientConfigBuilder.withPath(basePath)
|
final HoodieWriteConfig config1 = clientConfigBuilder.withPath(basePath)
|
||||||
.withIndexConfig(indexConfigBuilder.withIndexClass(IndexWithConstructor.class.getName()).build()).build();
|
.withIndexConfig(indexConfigBuilder.withIndexClass(IndexWithConstructor.class.getName()).build()).build();
|
||||||
final Throwable thrown1 = assertThrows(HoodieException.class, () -> {
|
final Throwable thrown1 = assertThrows(HoodieException.class, () -> {
|
||||||
@@ -106,6 +184,385 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
|
|||||||
assertTrue(thrown2.getMessage().contains("Unable to instantiate class"));
|
assertTrue(thrown2.getMessage().contains("Unable to instantiate class"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@EnumSource(value = IndexType.class, names = {"BLOOM", "GLOBAL_BLOOM", "SIMPLE", "GLOBAL_SIMPLE"})
|
||||||
|
public void testSimpleTagLocationAndUpdate(IndexType indexType) throws Exception {
|
||||||
|
setUp(indexType);
|
||||||
|
String newCommitTime = "001";
|
||||||
|
int totalRecords = 10 + random.nextInt(20);
|
||||||
|
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, totalRecords);
|
||||||
|
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
|
||||||
|
|
||||||
|
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||||
|
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration());
|
||||||
|
|
||||||
|
// Test tagLocation without any entries in index
|
||||||
|
JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||||
|
assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
|
||||||
|
|
||||||
|
// Insert totalRecords records
|
||||||
|
writeClient.startCommitWithTime(newCommitTime);
|
||||||
|
JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
|
||||||
|
assertNoWriteErrors(writeStatues.collect());
|
||||||
|
|
||||||
|
// Now tagLocation for these records, index should not tag them since it was a failed
|
||||||
|
// commit
|
||||||
|
javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||||
|
assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
|
||||||
|
// Now commit this & update location of records inserted and validate no errors
|
||||||
|
writeClient.commit(newCommitTime, writeStatues);
|
||||||
|
// Now tagLocation for these records, index should tag them correctly
|
||||||
|
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||||
|
hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration());
|
||||||
|
javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||||
|
Map<String, String> recordKeyToPartitionPathMap = new HashMap();
|
||||||
|
List<HoodieRecord> hoodieRecords = writeRecords.collect();
|
||||||
|
hoodieRecords.forEach(entry -> recordKeyToPartitionPathMap.put(entry.getRecordKey(), entry.getPartitionPath()));
|
||||||
|
|
||||||
|
assertEquals(totalRecords, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
|
||||||
|
assertEquals(totalRecords, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
|
||||||
|
assertEquals(totalRecords, javaRDD.filter(record -> (record.getCurrentLocation() != null
|
||||||
|
&& record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
|
||||||
|
javaRDD.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry.getRecordKey()), entry.getPartitionPath(), "PartitionPath mismatch"));
|
||||||
|
|
||||||
|
JavaRDD<HoodieKey> hoodieKeyJavaRDD = writeRecords.map(entry -> entry.getKey());
|
||||||
|
JavaPairRDD<HoodieKey, Option<Pair<String, String>>> recordLocations = index.fetchRecordLocation(hoodieKeyJavaRDD, jsc, hoodieTable);
|
||||||
|
List<HoodieKey> hoodieKeys = hoodieKeyJavaRDD.collect();
|
||||||
|
assertEquals(totalRecords, recordLocations.collect().size());
|
||||||
|
assertEquals(totalRecords, recordLocations.map(record -> record._1).distinct().count());
|
||||||
|
recordLocations.foreach(entry -> assertTrue(hoodieKeys.contains(entry._1), "Missing HoodieKey"));
|
||||||
|
recordLocations.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry._1.getRecordKey()), entry._1.getPartitionPath(), "PartitionPath mismatch"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@EnumSource(value = IndexType.class, names = {"BLOOM", "GLOBAL_BLOOM", "SIMPLE", "GLOBAL_SIMPLE"})
|
||||||
|
public void testTagLocationAndDuplicateUpdate(IndexType indexType) throws Exception {
|
||||||
|
setUp(indexType);
|
||||||
|
String newCommitTime = "001";
|
||||||
|
int totalRecords = 10 + random.nextInt(20);
|
||||||
|
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, totalRecords);
|
||||||
|
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
|
||||||
|
|
||||||
|
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration());
|
||||||
|
|
||||||
|
writeClient.startCommitWithTime(newCommitTime);
|
||||||
|
JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
|
||||||
|
JavaRDD<HoodieRecord> javaRDD1 = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||||
|
|
||||||
|
// Duplicate upsert and ensure correctness is maintained
|
||||||
|
// We are trying to approximately imitate the case when the RDD is recomputed. For RDD creating, driver code is not
|
||||||
|
// recomputed. This includes the state transitions. We need to delete the inflight instance so that subsequent
|
||||||
|
// upsert will not run into conflicts.
|
||||||
|
metaClient.getFs().delete(new Path(metaClient.getMetaPath(), "001.inflight"));
|
||||||
|
|
||||||
|
writeClient.upsert(writeRecords, newCommitTime);
|
||||||
|
assertNoWriteErrors(writeStatues.collect());
|
||||||
|
|
||||||
|
// Now commit this & update location of records inserted and validate no errors
|
||||||
|
writeClient.commit(newCommitTime, writeStatues);
|
||||||
|
// Now tagLocation for these records, hbaseIndex should tag them correctly
|
||||||
|
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||||
|
hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration());
|
||||||
|
JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||||
|
|
||||||
|
Map<String, String> recordKeyToPartitionPathMap = new HashMap();
|
||||||
|
List<HoodieRecord> hoodieRecords = writeRecords.collect();
|
||||||
|
hoodieRecords.forEach(entry -> recordKeyToPartitionPathMap.put(entry.getRecordKey(), entry.getPartitionPath()));
|
||||||
|
|
||||||
|
assertEquals(totalRecords, javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
|
||||||
|
assertEquals(totalRecords, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
|
||||||
|
assertEquals(totalRecords, javaRDD.filter(record -> (record.getCurrentLocation() != null
|
||||||
|
&& record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
|
||||||
|
javaRDD.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry.getRecordKey()), entry.getPartitionPath(), "PartitionPath mismatch"));
|
||||||
|
|
||||||
|
JavaRDD<HoodieKey> hoodieKeyJavaRDD = writeRecords.map(entry -> entry.getKey());
|
||||||
|
JavaPairRDD<HoodieKey, Option<Pair<String, String>>> recordLocations = index.fetchRecordLocation(hoodieKeyJavaRDD, jsc, hoodieTable);
|
||||||
|
List<HoodieKey> hoodieKeys = hoodieKeyJavaRDD.collect();
|
||||||
|
assertEquals(totalRecords, recordLocations.collect().size());
|
||||||
|
assertEquals(totalRecords, recordLocations.map(record -> record._1).distinct().count());
|
||||||
|
recordLocations.foreach(entry -> assertTrue(hoodieKeys.contains(entry._1), "Missing HoodieKey"));
|
||||||
|
recordLocations.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry._1.getRecordKey()), entry._1.getPartitionPath(), "PartitionPath mismatch"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@EnumSource(value = IndexType.class, names = {"BLOOM", "GLOBAL_BLOOM", "SIMPLE", "GLOBAL_SIMPLE"})
|
||||||
|
public void testSimpleTagLocationAndUpdateWithRollback(IndexType indexType) throws Exception {
|
||||||
|
setUp(indexType);
|
||||||
|
String newCommitTime = writeClient.startCommit();
|
||||||
|
int totalRecords = 20 + random.nextInt(20);
|
||||||
|
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, totalRecords);
|
||||||
|
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
|
||||||
|
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||||
|
|
||||||
|
// Insert 200 records
|
||||||
|
JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
|
||||||
|
assertNoWriteErrors(writeStatues.collect());
|
||||||
|
|
||||||
|
// commit this upsert
|
||||||
|
writeClient.commit(newCommitTime, writeStatues);
|
||||||
|
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration());
|
||||||
|
|
||||||
|
// Now tagLocation for these records, hbaseIndex should tag them
|
||||||
|
JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||||
|
assert (javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == totalRecords);
|
||||||
|
|
||||||
|
// check tagged records are tagged with correct fileIds
|
||||||
|
List<String> fileIds = writeStatues.map(WriteStatus::getFileId).collect();
|
||||||
|
assert (javaRDD.filter(record -> record.getCurrentLocation().getFileId() == null).collect().size() == 0);
|
||||||
|
List<String> taggedFileIds = javaRDD.map(record -> record.getCurrentLocation().getFileId()).distinct().collect();
|
||||||
|
|
||||||
|
Map<String, String> recordKeyToPartitionPathMap = new HashMap();
|
||||||
|
List<HoodieRecord> hoodieRecords = writeRecords.collect();
|
||||||
|
hoodieRecords.forEach(entry -> recordKeyToPartitionPathMap.put(entry.getRecordKey(), entry.getPartitionPath()));
|
||||||
|
|
||||||
|
JavaRDD<HoodieKey> hoodieKeyJavaRDD = writeRecords.map(entry -> entry.getKey());
|
||||||
|
JavaPairRDD<HoodieKey, Option<Pair<String, String>>> recordLocations = index.fetchRecordLocation(hoodieKeyJavaRDD, jsc, hoodieTable);
|
||||||
|
List<HoodieKey> hoodieKeys = hoodieKeyJavaRDD.collect();
|
||||||
|
assertEquals(totalRecords, recordLocations.collect().size());
|
||||||
|
assertEquals(totalRecords, recordLocations.map(record -> record._1).distinct().count());
|
||||||
|
recordLocations.foreach(entry -> assertTrue(hoodieKeys.contains(entry._1), "Missing HoodieKey"));
|
||||||
|
recordLocations.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry._1.getRecordKey()), entry._1.getPartitionPath(), "PartitionPath mismatch"));
|
||||||
|
|
||||||
|
// both lists should match
|
||||||
|
assertTrue(taggedFileIds.containsAll(fileIds) && fileIds.containsAll(taggedFileIds));
|
||||||
|
// Rollback the last commit
|
||||||
|
writeClient.rollback(newCommitTime);
|
||||||
|
|
||||||
|
hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration());
|
||||||
|
// Now tagLocation for these records, hbaseIndex should not tag them since it was a rolled
|
||||||
|
// back commit
|
||||||
|
javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||||
|
assert (javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == 0);
|
||||||
|
assert (javaRDD.filter(record -> record.getCurrentLocation() != null).collect().size() == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@EnumSource(value = IndexType.class, names = {"BLOOM", "SIMPLE",})
|
||||||
|
public void testTagLocationAndFetchRecordLocations(IndexType indexType) throws Exception {
|
||||||
|
setUp(indexType);
|
||||||
|
String rowKey1 = UUID.randomUUID().toString();
|
||||||
|
String rowKey2 = UUID.randomUUID().toString();
|
||||||
|
String rowKey3 = UUID.randomUUID().toString();
|
||||||
|
String recordStr1 = "{\"_row_key\":\"" + rowKey1 + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
|
||||||
|
String recordStr2 = "{\"_row_key\":\"" + rowKey2 + "\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
|
||||||
|
String recordStr3 = "{\"_row_key\":\"" + rowKey3 + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
|
||||||
|
// place same row key under a different partition.
|
||||||
|
String recordStr4 = "{\"_row_key\":\"" + rowKey1 + "\",\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}";
|
||||||
|
TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1);
|
||||||
|
HoodieRecord record1 =
|
||||||
|
new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1);
|
||||||
|
TestRawTripPayload rowChange2 = new TestRawTripPayload(recordStr2);
|
||||||
|
HoodieRecord record2 =
|
||||||
|
new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2);
|
||||||
|
TestRawTripPayload rowChange3 = new TestRawTripPayload(recordStr3);
|
||||||
|
HoodieRecord record3 =
|
||||||
|
new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3);
|
||||||
|
TestRawTripPayload rowChange4 = new TestRawTripPayload(recordStr4);
|
||||||
|
HoodieRecord record4 =
|
||||||
|
new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4);
|
||||||
|
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record4));
|
||||||
|
|
||||||
|
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration());
|
||||||
|
|
||||||
|
JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, hoodieTable);
|
||||||
|
|
||||||
|
// Should not find any files
|
||||||
|
for (HoodieRecord record : taggedRecordRDD.collect()) {
|
||||||
|
assertFalse(record.isCurrentLocationKnown());
|
||||||
|
}
|
||||||
|
|
||||||
|
// We create three parquet file, each having one record. (two different partitions)
|
||||||
|
String filename1 =
|
||||||
|
HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record1), schema, null, true);
|
||||||
|
String filename2 =
|
||||||
|
HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record2), schema, null, true);
|
||||||
|
String filename3 =
|
||||||
|
HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Collections.singletonList(record4), schema, null, true);
|
||||||
|
|
||||||
|
// We do the tag again
|
||||||
|
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||||
|
hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration());
|
||||||
|
|
||||||
|
taggedRecordRDD = index.tagLocation(recordRDD, jsc, hoodieTable);
|
||||||
|
|
||||||
|
// Check results
|
||||||
|
for (HoodieRecord record : taggedRecordRDD.collect()) {
|
||||||
|
if (record.getRecordKey().equals(rowKey1)) {
|
||||||
|
if (record.getPartitionPath().equals("2015/01/31")) {
|
||||||
|
assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename3));
|
||||||
|
} else {
|
||||||
|
assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename1));
|
||||||
|
}
|
||||||
|
} else if (record.getRecordKey().equals(rowKey2)) {
|
||||||
|
assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename2));
|
||||||
|
} else if (record.getRecordKey().equals(rowKey3)) {
|
||||||
|
assertFalse(record.isCurrentLocationKnown());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
JavaPairRDD<HoodieKey, Option<Pair<String, String>>> recordLocations = index.fetchRecordLocation(recordRDD.map(entry -> entry.getKey()), jsc, hoodieTable);
|
||||||
|
|
||||||
|
for (Tuple2<HoodieKey, Option<Pair<String, String>>> entry : recordLocations.collect()) {
|
||||||
|
if (entry._1.getRecordKey().equals(rowKey1)) {
|
||||||
|
assertTrue(entry._2.isPresent(), "Row1 should have been present ");
|
||||||
|
if (entry._1.getPartitionPath().equals("2015/01/31")) {
|
||||||
|
assertTrue(entry._2.isPresent(), "Row1 should have been present ");
|
||||||
|
assertEquals(entry._2.get().getRight(), FSUtils.getFileId(filename3));
|
||||||
|
} else {
|
||||||
|
assertEquals(entry._2.get().getRight(), FSUtils.getFileId(filename1));
|
||||||
|
}
|
||||||
|
} else if (entry._1.getRecordKey().equals(rowKey2)) {
|
||||||
|
assertTrue(entry._2.isPresent(), "Row2 should have been present ");
|
||||||
|
assertEquals(entry._2.get().getRight(), FSUtils.getFileId(filename2));
|
||||||
|
} else if (entry._1.getRecordKey().equals(rowKey3)) {
|
||||||
|
assertFalse(entry._2.isPresent(), "Row3 should have been absent ");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@EnumSource(value = IndexType.class, names = {"GLOBAL_SIMPLE"})
|
||||||
|
public void testSimpleGlobalIndexTagLocationWhenShouldUpdatePartitionPath(IndexType indexType) throws Exception {
|
||||||
|
setUp(indexType);
|
||||||
|
config = getConfigBuilder()
|
||||||
|
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
|
||||||
|
.withGlobalSimpleIndexUpdatePartitionPath(true)
|
||||||
|
.withBloomIndexUpdatePartitionPath(true)
|
||||||
|
.build()).build();
|
||||||
|
writeClient = getHoodieWriteClient(config);
|
||||||
|
index = writeClient.getIndex();
|
||||||
|
|
||||||
|
// Create the original partition, and put a record, along with the meta file
|
||||||
|
// "2016/01/31": 1 file (1_0_20160131101010.parquet)
|
||||||
|
new File(basePath + "/2016/01/31").mkdirs();
|
||||||
|
new File(basePath + "/2016/01/31/" + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile();
|
||||||
|
|
||||||
|
// this record will be saved in table and will be tagged to an empty record
|
||||||
|
TestRawTripPayload originalPayload =
|
||||||
|
new TestRawTripPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
|
||||||
|
HoodieRecord originalRecord =
|
||||||
|
new HoodieRecord(new HoodieKey(originalPayload.getRowKey(), originalPayload.getPartitionPath()),
|
||||||
|
originalPayload);
|
||||||
|
|
||||||
|
/*
|
||||||
|
This record has the same record key as originalRecord but different time so different partition
|
||||||
|
Because GLOBAL_BLOOM_INDEX_SHOULD_UPDATE_PARTITION_PATH = true,
|
||||||
|
globalBloomIndex should
|
||||||
|
- tag the original partition of the originalRecord to an empty record for deletion, and
|
||||||
|
- tag the new partition of the incomingRecord
|
||||||
|
*/
|
||||||
|
TestRawTripPayload incomingPayload =
|
||||||
|
new TestRawTripPayload("{\"_row_key\":\"000\",\"time\":\"2016-02-31T03:16:41.415Z\",\"number\":12}");
|
||||||
|
HoodieRecord incomingRecord =
|
||||||
|
new HoodieRecord(new HoodieKey(incomingPayload.getRowKey(), incomingPayload.getPartitionPath()),
|
||||||
|
incomingPayload);
|
||||||
|
/*
|
||||||
|
This record has the same record key as originalRecord and the same partition
|
||||||
|
Though GLOBAL_BLOOM_INDEX_SHOULD_UPDATE_PARTITION_PATH = true,
|
||||||
|
globalBloomIndex should just tag the original partition
|
||||||
|
*/
|
||||||
|
TestRawTripPayload incomingPayloadSamePartition =
|
||||||
|
new TestRawTripPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T04:16:41.415Z\",\"number\":15}");
|
||||||
|
HoodieRecord incomingRecordSamePartition =
|
||||||
|
new HoodieRecord(
|
||||||
|
new HoodieKey(incomingPayloadSamePartition.getRowKey(), incomingPayloadSamePartition.getPartitionPath()),
|
||||||
|
incomingPayloadSamePartition);
|
||||||
|
|
||||||
|
// We have some records to be tagged (two different partitions)
|
||||||
|
String schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
|
||||||
|
Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
|
||||||
|
|
||||||
|
HoodieClientTestUtils
|
||||||
|
.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(originalRecord), schema, null, false);
|
||||||
|
|
||||||
|
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||||
|
HoodieTable table = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration());
|
||||||
|
|
||||||
|
// Add some commits
|
||||||
|
new File(basePath + "/.hoodie").mkdirs();
|
||||||
|
|
||||||
|
// test against incoming record with a different partition
|
||||||
|
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Collections.singletonList(incomingRecord));
|
||||||
|
JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, table);
|
||||||
|
|
||||||
|
assertEquals(2, taggedRecordRDD.count());
|
||||||
|
for (HoodieRecord record : taggedRecordRDD.collect()) {
|
||||||
|
switch (record.getPartitionPath()) {
|
||||||
|
case "2016/01/31":
|
||||||
|
assertEquals("000", record.getRecordKey());
|
||||||
|
assertTrue(record.getData() instanceof EmptyHoodieRecordPayload);
|
||||||
|
break;
|
||||||
|
case "2016/02/31":
|
||||||
|
assertEquals("000", record.getRecordKey());
|
||||||
|
assertEquals(incomingPayload.getJsonData(), ((TestRawTripPayload) record.getData()).getJsonData());
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
assertFalse(true, String.format("Should not get partition path: %s", record.getPartitionPath()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// test against incoming record with the same partition
|
||||||
|
JavaRDD<HoodieRecord> recordRDDSamePartition = jsc
|
||||||
|
.parallelize(Collections.singletonList(incomingRecordSamePartition));
|
||||||
|
JavaRDD<HoodieRecord> taggedRecordRDDSamePartition = index.tagLocation(recordRDDSamePartition, jsc, table);
|
||||||
|
|
||||||
|
assertEquals(1, taggedRecordRDDSamePartition.count());
|
||||||
|
HoodieRecord record = taggedRecordRDDSamePartition.first();
|
||||||
|
assertEquals("000", record.getRecordKey());
|
||||||
|
assertEquals("2016/01/31", record.getPartitionPath());
|
||||||
|
assertEquals(incomingPayloadSamePartition.getJsonData(), ((TestRawTripPayload) record.getData()).getJsonData());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get Config builder with default configs set.
|
||||||
|
*
|
||||||
|
* @return Config Builder
|
||||||
|
*/
|
||||||
|
public HoodieWriteConfig.Builder getConfigBuilder() {
|
||||||
|
return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
|
||||||
|
}
|
||||||
|
|
||||||
|
HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
|
||||||
|
return getConfigBuilder(schemaStr, indexType);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get Config builder with default configs set.
|
||||||
|
*
|
||||||
|
* @return Config Builder
|
||||||
|
*/
|
||||||
|
private HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) {
|
||||||
|
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
|
||||||
|
.withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
|
||||||
|
.withWriteStatusClass(TestRawTripPayload.MetadataMergeWriteStatus.class)
|
||||||
|
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
|
||||||
|
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
|
||||||
|
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
|
||||||
|
.forTable("test-trip-table")
|
||||||
|
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
|
||||||
|
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
|
||||||
|
.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
|
||||||
|
}
|
||||||
|
|
||||||
|
private HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
|
||||||
|
return new HoodieWriteClient(jsc, cfg, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void instantiateIndex() {
|
||||||
|
config = getConfigBuilder()
|
||||||
|
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
|
||||||
|
.build()).withAutoCommit(false).build();
|
||||||
|
writeClient = getHoodieWriteClient(config);
|
||||||
|
this.index = writeClient.getIndex();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertNoWriteErrors(List<WriteStatus> statuses) {
|
||||||
|
// Verify there are no errors
|
||||||
|
for (WriteStatus status : statuses) {
|
||||||
|
assertFalse(status.hasErrors());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static class DummyHoodieIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
|
public static class DummyHoodieIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
|
||||||
|
|
||||||
public DummyHoodieIndex(HoodieWriteConfig config) {
|
public DummyHoodieIndex(HoodieWriteConfig config) {
|
||||||
@@ -157,4 +614,5 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
|
|||||||
public static class IndexWithoutConstructor {
|
public static class IndexWithoutConstructor {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -475,5 +475,4 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,210 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.io;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.HoodieClientTestHarness;
|
||||||
|
import org.apache.hudi.common.HoodieClientTestUtils;
|
||||||
|
import org.apache.hudi.common.HoodieTestDataGenerator;
|
||||||
|
import org.apache.hudi.common.TestRawTripPayload;
|
||||||
|
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
|
||||||
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
|
import org.apache.hudi.common.model.HoodieBaseFile;
|
||||||
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||||
|
import org.apache.hudi.common.model.HoodieTestUtils;
|
||||||
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
|
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
||||||
|
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
|
||||||
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
|
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||||
|
import org.apache.hudi.config.HoodieIndexConfig;
|
||||||
|
import org.apache.hudi.config.HoodieStorageConfig;
|
||||||
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
|
import org.apache.hudi.index.HoodieIndexUtils;
|
||||||
|
import org.apache.hudi.table.HoodieTable;
|
||||||
|
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.junit.jupiter.api.AfterEach;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
import static java.util.stream.Collectors.toList;
|
||||||
|
import static org.apache.hudi.common.HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests {@link HoodieKeyLocationFetchHandle}.
|
||||||
|
*/
|
||||||
|
public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness {
|
||||||
|
|
||||||
|
private HoodieWriteConfig config;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
initSparkContexts("TestRecordFetcher");
|
||||||
|
initPath();
|
||||||
|
initTestDataGenerator();
|
||||||
|
initFileSystem();
|
||||||
|
initMetaClient();
|
||||||
|
config = getConfigBuilder()
|
||||||
|
.withIndexConfig(HoodieIndexConfig.newBuilder()
|
||||||
|
.build()).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterEach
|
||||||
|
public void tearDown() throws IOException {
|
||||||
|
cleanupSparkContexts();
|
||||||
|
cleanupFileSystem();
|
||||||
|
cleanupMetaClient();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFetchHandle() throws Exception {
|
||||||
|
|
||||||
|
String commitTime = "000";
|
||||||
|
List<HoodieRecord> records = dataGen.generateInserts(commitTime, 100);
|
||||||
|
Map<String, List<HoodieRecord>> recordsPerPartiton = getRecordsPerPartition(records);
|
||||||
|
|
||||||
|
Map<Tuple2<String, String>, List<Tuple2<HoodieKey, HoodieRecordLocation>>> expectedList = writeToParquetAndGetExpectedRecordLocations(recordsPerPartiton);
|
||||||
|
|
||||||
|
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||||
|
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration());
|
||||||
|
|
||||||
|
Files.createDirectories(Paths.get(basePath, ".hoodie"));
|
||||||
|
|
||||||
|
List<Tuple2<String, HoodieBaseFile>> partitionPathFileIdPairs = loadAllFilesForPartitions(new ArrayList<>(recordsPerPartiton.keySet()), jsc, hoodieTable);
|
||||||
|
|
||||||
|
for (Tuple2<String, HoodieBaseFile> entry : partitionPathFileIdPairs) {
|
||||||
|
HoodieKeyLocationFetchHandle fetcherHandle = new HoodieKeyLocationFetchHandle(config, hoodieTable, Pair.of(entry._1, entry._2));
|
||||||
|
Iterator<Tuple2<HoodieKey, HoodieRecordLocation>> result = fetcherHandle.locations();
|
||||||
|
List<Tuple2<HoodieKey, HoodieRecordLocation>> actualList = new ArrayList<>();
|
||||||
|
result.forEachRemaining(actualList::add);
|
||||||
|
assertEquals(expectedList.get(new Tuple2<>(entry._1, entry._2.getFileId())), actualList);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, List<HoodieRecord>> getRecordsPerPartition(List<HoodieRecord> records) {
|
||||||
|
Map<String, List<HoodieRecord>> recordsPerPartiton = new HashMap<>();
|
||||||
|
for (HoodieRecord record : records) {
|
||||||
|
if (!recordsPerPartiton.containsKey(record.getPartitionPath())) {
|
||||||
|
recordsPerPartiton.put(record.getPartitionPath(), new ArrayList<>());
|
||||||
|
}
|
||||||
|
recordsPerPartiton.get(record.getPartitionPath()).add(record);
|
||||||
|
}
|
||||||
|
return recordsPerPartiton;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<Tuple2<String, String>, List<Tuple2<HoodieKey, HoodieRecordLocation>>> writeToParquetAndGetExpectedRecordLocations(
|
||||||
|
Map<String, List<HoodieRecord>> recordsPerPartiton) throws Exception {
|
||||||
|
Map<Tuple2<String, String>, List<Tuple2<HoodieKey, HoodieRecordLocation>>> expectedList = new HashMap<>();
|
||||||
|
for (Map.Entry<String, List<HoodieRecord>> entry : recordsPerPartiton.entrySet()) {
|
||||||
|
int totalRecordsPerPartition = entry.getValue().size();
|
||||||
|
int totalSlices = 1;
|
||||||
|
if (totalRecordsPerPartition > 5) {
|
||||||
|
totalSlices = totalRecordsPerPartition / 3;
|
||||||
|
}
|
||||||
|
int recordsPerFileSlice = totalRecordsPerPartition / totalSlices;
|
||||||
|
|
||||||
|
List<List<HoodieRecord>> recordsForFileSlices = new ArrayList<>();
|
||||||
|
recordsForFileSlices.add(new ArrayList<>());
|
||||||
|
int index = 0;
|
||||||
|
int count = 0;
|
||||||
|
for (HoodieRecord record : entry.getValue()) {
|
||||||
|
if (count < recordsPerFileSlice) {
|
||||||
|
recordsForFileSlices.get(index).add(record);
|
||||||
|
count++;
|
||||||
|
} else {
|
||||||
|
recordsForFileSlices.add(new ArrayList<>());
|
||||||
|
index++;
|
||||||
|
count = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (List<HoodieRecord> recordsPerSlice : recordsForFileSlices) {
|
||||||
|
Tuple2<String, String> fileIdInstantTimePair = writeToParquet(entry.getKey(), recordsPerSlice);
|
||||||
|
List<Tuple2<HoodieKey, HoodieRecordLocation>> expectedEntries = new ArrayList<>();
|
||||||
|
for (HoodieRecord record : recordsPerSlice) {
|
||||||
|
expectedEntries.add(new Tuple2<>(record.getKey(), new HoodieRecordLocation(fileIdInstantTimePair._2, fileIdInstantTimePair._1)));
|
||||||
|
}
|
||||||
|
expectedList.put(new Tuple2<>(entry.getKey(), fileIdInstantTimePair._1), expectedEntries);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return expectedList;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected List<Tuple2<String, HoodieBaseFile>> loadAllFilesForPartitions(List<String> partitions, final JavaSparkContext jsc,
|
||||||
|
final HoodieTable hoodieTable) {
|
||||||
|
|
||||||
|
// Obtain the latest data files from all the partitions.
|
||||||
|
List<Pair<String, HoodieBaseFile>> partitionPathFileIDList = HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, jsc, hoodieTable);
|
||||||
|
return partitionPathFileIDList.stream()
|
||||||
|
.map(pf -> new Tuple2<>(pf.getKey(), pf.getValue())).collect(toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get Config builder with default configs set.
|
||||||
|
*
|
||||||
|
* @return Config Builder
|
||||||
|
*/
|
||||||
|
public HoodieWriteConfig.Builder getConfigBuilder() {
|
||||||
|
return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get Config builder with default configs set.
|
||||||
|
*
|
||||||
|
* @return Config Builder
|
||||||
|
*/
|
||||||
|
private HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
|
||||||
|
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
|
||||||
|
.withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
|
||||||
|
.withWriteStatusClass(TestRawTripPayload.MetadataMergeWriteStatus.class)
|
||||||
|
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
|
||||||
|
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
|
||||||
|
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
|
||||||
|
.forTable("test-trip-table")
|
||||||
|
.withIndexConfig(HoodieIndexConfig.newBuilder().build())
|
||||||
|
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
|
||||||
|
.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
|
||||||
|
}
|
||||||
|
|
||||||
|
private Tuple2<String, String> writeToParquet(String partitionPath, List<HoodieRecord> records) throws Exception {
|
||||||
|
Thread.sleep(100);
|
||||||
|
String instantTime = HoodieTestUtils.makeNewCommitTime();
|
||||||
|
String fileId = UUID.randomUUID().toString();
|
||||||
|
String filename = FSUtils.makeDataFileName(instantTime, "1-0-1", fileId);
|
||||||
|
HoodieTestUtils.createCommitFiles(basePath, instantTime);
|
||||||
|
HoodieClientTestUtils.writeParquetFile(basePath, partitionPath, filename, records, AVRO_SCHEMA_WITH_METADATA_FIELDS, null,
|
||||||
|
true);
|
||||||
|
return new Tuple2<>(fileId, instantTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -198,6 +198,24 @@ public class HoodieAvroUtils {
|
|||||||
return RECORD_KEY_SCHEMA;
|
return RECORD_KEY_SCHEMA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetch schema for record key and partition path.
|
||||||
|
*/
|
||||||
|
public static Schema getRecordKeyPartitionPathSchema() {
|
||||||
|
List<Schema.Field> toBeAddedFields = new ArrayList<>();
|
||||||
|
Schema recordSchema = Schema.createRecord("HoodieRecordKey", "", "", false);
|
||||||
|
|
||||||
|
Schema.Field recordKeyField =
|
||||||
|
new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
|
||||||
|
Schema.Field partitionPathField =
|
||||||
|
new Schema.Field(HoodieRecord.PARTITION_PATH_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
|
||||||
|
|
||||||
|
toBeAddedFields.add(recordKeyField);
|
||||||
|
toBeAddedFields.add(partitionPathField);
|
||||||
|
recordSchema.setFields(toBeAddedFields);
|
||||||
|
return recordSchema;
|
||||||
|
}
|
||||||
|
|
||||||
public static GenericRecord addHoodieKeyToRecord(GenericRecord record, String recordKey, String partitionPath,
|
public static GenericRecord addHoodieKeyToRecord(GenericRecord record, String recordKey, String partitionPath,
|
||||||
String fileName) {
|
String fileName) {
|
||||||
record.put(HoodieRecord.FILENAME_METADATA_FIELD, fileName);
|
record.put(HoodieRecord.FILENAME_METADATA_FIELD, fileName);
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ import org.apache.hudi.common.bloom.BloomFilter;
|
|||||||
import org.apache.hudi.common.bloom.BloomFilterFactory;
|
import org.apache.hudi.common.bloom.BloomFilterFactory;
|
||||||
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
|
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
@@ -76,13 +77,27 @@ public class ParquetUtils {
|
|||||||
* @return Set Set of row keys matching candidateRecordKeys
|
* @return Set Set of row keys matching candidateRecordKeys
|
||||||
*/
|
*/
|
||||||
public static Set<String> filterParquetRowKeys(Configuration configuration, Path filePath, Set<String> filter) {
|
public static Set<String> filterParquetRowKeys(Configuration configuration, Path filePath, Set<String> filter) {
|
||||||
|
return filterParquetRowKeys(configuration, filePath, filter, HoodieAvroUtils.getRecordKeySchema());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read the rowKey list matching the given filter, from the given parquet file. If the filter is empty, then this will
|
||||||
|
* return all the rowkeys.
|
||||||
|
*
|
||||||
|
* @param filePath The parquet file path.
|
||||||
|
* @param configuration configuration to build fs object
|
||||||
|
* @param filter record keys filter
|
||||||
|
* @param readSchema schema of columns to be read
|
||||||
|
* @return Set Set of row keys matching candidateRecordKeys
|
||||||
|
*/
|
||||||
|
private static Set<String> filterParquetRowKeys(Configuration configuration, Path filePath, Set<String> filter,
|
||||||
|
Schema readSchema) {
|
||||||
Option<RecordKeysFilterFunction> filterFunction = Option.empty();
|
Option<RecordKeysFilterFunction> filterFunction = Option.empty();
|
||||||
if (filter != null && !filter.isEmpty()) {
|
if (filter != null && !filter.isEmpty()) {
|
||||||
filterFunction = Option.of(new RecordKeysFilterFunction(filter));
|
filterFunction = Option.of(new RecordKeysFilterFunction(filter));
|
||||||
}
|
}
|
||||||
Configuration conf = new Configuration(configuration);
|
Configuration conf = new Configuration(configuration);
|
||||||
conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf());
|
conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf());
|
||||||
Schema readSchema = HoodieAvroUtils.getRecordKeySchema();
|
|
||||||
AvroReadSupport.setAvroReadSchema(conf, readSchema);
|
AvroReadSupport.setAvroReadSchema(conf, readSchema);
|
||||||
AvroReadSupport.setRequestedProjection(conf, readSchema);
|
AvroReadSupport.setRequestedProjection(conf, readSchema);
|
||||||
Set<String> rowKeys = new HashSet<>();
|
Set<String> rowKeys = new HashSet<>();
|
||||||
@@ -105,6 +120,41 @@ public class ParquetUtils {
|
|||||||
return rowKeys;
|
return rowKeys;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetch {@link HoodieKey}s from the given parquet file.
|
||||||
|
*
|
||||||
|
* @param filePath The parquet file path.
|
||||||
|
* @param configuration configuration to build fs object
|
||||||
|
* @return {@link List} of {@link HoodieKey}s fetched from the parquet file
|
||||||
|
*/
|
||||||
|
public static List<HoodieKey> fetchRecordKeyPartitionPathFromParquet(Configuration configuration, Path filePath) {
|
||||||
|
List<HoodieKey> hoodieKeys = new ArrayList<>();
|
||||||
|
try {
|
||||||
|
if (!filePath.getFileSystem(configuration).exists(filePath)) {
|
||||||
|
return new ArrayList<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
Configuration conf = new Configuration(configuration);
|
||||||
|
conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf());
|
||||||
|
Schema readSchema = HoodieAvroUtils.getRecordKeyPartitionPathSchema();
|
||||||
|
AvroReadSupport.setAvroReadSchema(conf, readSchema);
|
||||||
|
AvroReadSupport.setRequestedProjection(conf, readSchema);
|
||||||
|
ParquetReader reader = AvroParquetReader.builder(filePath).withConf(conf).build();
|
||||||
|
Object obj = reader.read();
|
||||||
|
while (obj != null) {
|
||||||
|
if (obj instanceof GenericRecord) {
|
||||||
|
String recordKey = ((GenericRecord) obj).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
|
||||||
|
String partitionPath = ((GenericRecord) obj).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
|
||||||
|
hoodieKeys.add(new HoodieKey(recordKey, partitionPath));
|
||||||
|
obj = reader.read();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new HoodieIOException("Failed to read from Parquet file " + filePath, e);
|
||||||
|
}
|
||||||
|
return hoodieKeys;
|
||||||
|
}
|
||||||
|
|
||||||
public static ParquetMetadata readMetadata(Configuration conf, Path parquetFilePath) {
|
public static ParquetMetadata readMetadata(Configuration conf, Path parquetFilePath) {
|
||||||
ParquetMetadata footer;
|
ParquetMetadata footer;
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ import org.apache.hudi.avro.HoodieAvroWriteSupport;
|
|||||||
import org.apache.hudi.common.bloom.BloomFilter;
|
import org.apache.hudi.common.bloom.BloomFilter;
|
||||||
import org.apache.hudi.common.bloom.BloomFilterFactory;
|
import org.apache.hudi.common.bloom.BloomFilterFactory;
|
||||||
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
|
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
|
||||||
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.model.HoodieTestUtils;
|
import org.apache.hudi.common.model.HoodieTestUtils;
|
||||||
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
|
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
|
||||||
@@ -120,9 +121,38 @@ public class TestParquetUtils extends HoodieCommonTestHarness {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@MethodSource("bloomFilterTypeCodes")
|
||||||
|
public void testFetchRecordKeyPartitionPathFromParquet(String typeCode) throws Exception {
|
||||||
|
List<String> rowKeys = new ArrayList<>();
|
||||||
|
List<HoodieKey> expected = new ArrayList<>();
|
||||||
|
String partitionPath = "path1";
|
||||||
|
for (int i = 0; i < 1000; i++) {
|
||||||
|
String rowKey = UUID.randomUUID().toString();
|
||||||
|
rowKeys.add(rowKey);
|
||||||
|
expected.add(new HoodieKey(rowKey, partitionPath));
|
||||||
|
}
|
||||||
|
|
||||||
|
String filePath = basePath + "/test.parquet";
|
||||||
|
Schema schema = HoodieAvroUtils.getRecordKeyPartitionPathSchema();
|
||||||
|
writeParquetFile(typeCode, filePath, rowKeys, schema, true, partitionPath);
|
||||||
|
|
||||||
|
// Read and verify
|
||||||
|
List<HoodieKey> fetchedRows =
|
||||||
|
ParquetUtils.fetchRecordKeyPartitionPathFromParquet(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath));
|
||||||
|
assertEquals(rowKeys.size(), fetchedRows.size(), "Total count does not match");
|
||||||
|
|
||||||
|
for (HoodieKey entry : fetchedRows) {
|
||||||
|
assertTrue(expected.contains(entry), "Record key must be in the given filter");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void writeParquetFile(String typeCode, String filePath, List<String> rowKeys) throws Exception {
|
private void writeParquetFile(String typeCode, String filePath, List<String> rowKeys) throws Exception {
|
||||||
|
writeParquetFile(typeCode, filePath, rowKeys, HoodieAvroUtils.getRecordKeySchema(), false, "");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeParquetFile(String typeCode, String filePath, List<String> rowKeys, Schema schema, boolean addPartitionPathField, String partitionPath) throws Exception {
|
||||||
// Write out a parquet file
|
// Write out a parquet file
|
||||||
Schema schema = HoodieAvroUtils.getRecordKeySchema();
|
|
||||||
BloomFilter filter = BloomFilterFactory
|
BloomFilter filter = BloomFilterFactory
|
||||||
.createBloomFilter(1000, 0.0001, 10000, typeCode);
|
.createBloomFilter(1000, 0.0001, 10000, typeCode);
|
||||||
HoodieAvroWriteSupport writeSupport =
|
HoodieAvroWriteSupport writeSupport =
|
||||||
@@ -132,6 +162,9 @@ public class TestParquetUtils extends HoodieCommonTestHarness {
|
|||||||
for (String rowKey : rowKeys) {
|
for (String rowKey : rowKeys) {
|
||||||
GenericRecord rec = new GenericData.Record(schema);
|
GenericRecord rec = new GenericData.Record(schema);
|
||||||
rec.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, rowKey);
|
rec.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, rowKey);
|
||||||
|
if (addPartitionPathField) {
|
||||||
|
rec.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, partitionPath);
|
||||||
|
}
|
||||||
writer.write(rec);
|
writer.write(rec);
|
||||||
writeSupport.add(rowKey);
|
writeSupport.add(rowKey);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user