diff --git a/hudi-client/src/main/java/org/apache/hudi/client/utils/SparkConfigUtils.java b/hudi-client/src/main/java/org/apache/hudi/client/utils/SparkConfigUtils.java
index 604be016d..0a6b6080b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/utils/SparkConfigUtils.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/utils/SparkConfigUtils.java
@@ -99,4 +99,8 @@ public class SparkConfigUtils {
     String fraction = properties.getProperty(MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP, DEFAULT_MAX_MEMORY_FRACTION_FOR_COMPACTION);
     return getMaxMemoryAllowedForMerge(fraction);
   }
+
+  public static StorageLevel getSimpleIndexInputStorageLevel(Properties properties) {
+    return StorageLevel.fromString(properties.getProperty(HoodieIndexConfig.SIMPLE_INDEX_INPUT_STORAGE_LEVEL));
+  }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index df2177e36..4e974af3a 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -62,6 +62,12 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
   public static final String DEFAULT_BLOOM_INDEX_FILTER_TYPE = BloomFilterTypeCode.SIMPLE.name();
   public static final String HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = "hoodie.bloom.index.filter.dynamic.max.entries";
   public static final String DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = "100000";
+  public static final String SIMPLE_INDEX_USE_CACHING_PROP = "hoodie.simple.index.use.caching";
+  public static final String DEFAULT_SIMPLE_INDEX_USE_CACHING = "true";
+  public static final String SIMPLE_INDEX_PARALLELISM_PROP = "hoodie.simple.index.parallelism";
+  public static final String DEFAULT_SIMPLE_INDEX_PARALLELISM = "0";
+  public static final String GLOBAL_SIMPLE_INDEX_PARALLELISM_PROP = "hoodie.global.simple.index.parallelism";
+  public static final String DEFAULT_GLOBAL_SIMPLE_INDEX_PARALLELISM = "0";
 
   // 1B bloom filter checks happen in 250 seconds. 500ms to read a bloom filter.
   // 10M checks in 2500ms, thus amortizing the cost of reading bloom filter across partitions.
@@ -80,6 +86,8 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
 
   public static final String BLOOM_INDEX_INPUT_STORAGE_LEVEL = "hoodie.bloom.index.input.storage.level";
   public static final String DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
+  public static final String SIMPLE_INDEX_INPUT_STORAGE_LEVEL = "hoodie.simple.index.input.storage.level";
+  public static final String DEFAULT_SIMPLE_INDEX_INPUT_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
 
   /**
    * Only applies if index type is GLOBAL_BLOOM.
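A minimal sketch (not part of the patch) of how the new simple-index keys above could flow end to end: set on a plain java.util.Properties and resolved through the new SparkConfigUtils helper. The values and the wrapper class name are illustrative; in a real write path HoodieIndexConfig.Builder fills in the defaults shown in the patch.

```java
import java.util.Properties;

import org.apache.hudi.client.utils.SparkConfigUtils;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.spark.storage.StorageLevel;

public class SimpleIndexConfigSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Example values; the patch's own defaults are "true", "0", "0" and "MEMORY_AND_DISK_SER".
    props.setProperty(HoodieIndexConfig.SIMPLE_INDEX_USE_CACHING_PROP, "true");
    props.setProperty(HoodieIndexConfig.SIMPLE_INDEX_PARALLELISM_PROP, "100");
    props.setProperty(HoodieIndexConfig.GLOBAL_SIMPLE_INDEX_PARALLELISM_PROP, "100");
    props.setProperty(HoodieIndexConfig.SIMPLE_INDEX_INPUT_STORAGE_LEVEL, "MEMORY_AND_DISK_SER");

    // The new helper added in this patch turns the string into a Spark StorageLevel.
    StorageLevel level = SparkConfigUtils.getSimpleIndexInputStorageLevel(props);
    System.out.println(level);
  }
}
```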
@@ -92,6 +100,9 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { public static final String BLOOM_INDEX_UPDATE_PARTITION_PATH = "hoodie.bloom.index.update.partition.path"; public static final String DEFAULT_BLOOM_INDEX_UPDATE_PARTITION_PATH = "false"; + public static final String SIMPLE_INDEX_UPDATE_PARTITION_PATH = "hoodie.simple.index.update.partition.path"; + public static final String DEFAULT_SIMPLE_INDEX_UPDATE_PARTITION_PATH = "false"; + private HoodieIndexConfig(Properties props) { super(props); } @@ -201,6 +212,31 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { return this; } + public Builder withSimpleIndexParallelism(int parallelism) { + props.setProperty(SIMPLE_INDEX_PARALLELISM_PROP, String.valueOf(parallelism)); + return this; + } + + public Builder simpleIndexUseCaching(boolean useCaching) { + props.setProperty(SIMPLE_INDEX_USE_CACHING_PROP, String.valueOf(useCaching)); + return this; + } + + public Builder withSimpleIndexInputStorageLevel(String level) { + props.setProperty(SIMPLE_INDEX_INPUT_STORAGE_LEVEL, level); + return this; + } + + public Builder withGlobalSimpleIndexParallelism(int parallelism) { + props.setProperty(GLOBAL_SIMPLE_INDEX_PARALLELISM_PROP, String.valueOf(parallelism)); + return this; + } + + public Builder withGlobalSimpleIndexUpdatePartitionPath(boolean updatePartitionPath) { + props.setProperty(SIMPLE_INDEX_UPDATE_PARTITION_PATH, String.valueOf(updatePartitionPath)); + return this; + } + public HoodieIndexConfig build() { HoodieIndexConfig config = new HoodieIndexConfig(props); setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP), INDEX_TYPE_PROP, DEFAULT_INDEX_TYPE); @@ -228,6 +264,16 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { BLOOM_INDEX_FILTER_TYPE, DEFAULT_BLOOM_INDEX_FILTER_TYPE); setDefaultOnCondition(props, !props.contains(HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES), HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES, DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES); + setDefaultOnCondition(props, !props.containsKey(SIMPLE_INDEX_PARALLELISM_PROP), SIMPLE_INDEX_PARALLELISM_PROP, + DEFAULT_SIMPLE_INDEX_PARALLELISM); + setDefaultOnCondition(props, !props.containsKey(SIMPLE_INDEX_USE_CACHING_PROP), SIMPLE_INDEX_USE_CACHING_PROP, + DEFAULT_SIMPLE_INDEX_USE_CACHING); + setDefaultOnCondition(props, !props.containsKey(SIMPLE_INDEX_INPUT_STORAGE_LEVEL), SIMPLE_INDEX_INPUT_STORAGE_LEVEL, + DEFAULT_SIMPLE_INDEX_INPUT_STORAGE_LEVEL); + setDefaultOnCondition(props, !props.containsKey(GLOBAL_SIMPLE_INDEX_PARALLELISM_PROP), GLOBAL_SIMPLE_INDEX_PARALLELISM_PROP, + DEFAULT_GLOBAL_SIMPLE_INDEX_PARALLELISM); + setDefaultOnCondition(props, !props.containsKey(SIMPLE_INDEX_UPDATE_PARTITION_PATH), + SIMPLE_INDEX_UPDATE_PARTITION_PATH, DEFAULT_SIMPLE_INDEX_UPDATE_PARTITION_PATH); // Throws IllegalArgumentException if the value set is not a known Hoodie Index Type HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP)); return config; diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 2d98edca4..04676579b 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -441,6 +441,22 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH)); } + public int 
getSimpleIndexParallelism() { + return Integer.parseInt(props.getProperty(HoodieIndexConfig.SIMPLE_INDEX_PARALLELISM_PROP)); + } + + public boolean getSimpleIndexUseCaching() { + return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.SIMPLE_INDEX_USE_CACHING_PROP)); + } + + public int getGlobalSimpleIndexParallelism() { + return Integer.parseInt(props.getProperty(HoodieIndexConfig.GLOBAL_SIMPLE_INDEX_PARALLELISM_PROP)); + } + + public boolean getGlobalSimpleIndexUpdatePartitionPath() { + return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.SIMPLE_INDEX_UPDATE_PARTITION_PATH)); + } + /** * storage properties. */ diff --git a/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java index 93fcc8930..03a965a95 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java @@ -32,6 +32,8 @@ import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.index.bloom.HoodieBloomIndex; import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex; import org.apache.hudi.index.hbase.HBaseIndex; +import org.apache.hudi.index.simple.HoodieGlobalSimpleIndex; +import org.apache.hudi.index.simple.HoodieSimpleIndex; import org.apache.hudi.table.HoodieTable; import org.apache.spark.api.java.JavaPairRDD; @@ -70,6 +72,10 @@ public abstract class HoodieIndex implements Seri return new HoodieBloomIndex<>(config); case GLOBAL_BLOOM: return new HoodieGlobalBloomIndex<>(config); + case SIMPLE: + return new HoodieSimpleIndex<>(config); + case GLOBAL_SIMPLE: + return new HoodieGlobalSimpleIndex<>(config); default: throw new HoodieIndexException("Index type unspecified, set " + config.getIndexType()); } @@ -87,7 +93,7 @@ public abstract class HoodieIndex implements Seri * present). */ public abstract JavaRDD> tagLocation(JavaRDD> recordRDD, JavaSparkContext jsc, - HoodieTable hoodieTable) throws HoodieIndexException; + HoodieTable hoodieTable) throws HoodieIndexException; /** * Extracts the location of written records, and updates the index. @@ -95,7 +101,7 @@ public abstract class HoodieIndex implements Seri * TODO(vc): We may need to propagate the record as well in a WriteStatus class */ public abstract JavaRDD updateLocation(JavaRDD writeStatusRDD, JavaSparkContext jsc, - HoodieTable hoodieTable) throws HoodieIndexException; + HoodieTable hoodieTable) throws HoodieIndexException; /** * Rollback the efffects of the commit made at instantTime. @@ -128,9 +134,10 @@ public abstract class HoodieIndex implements Seri /** * Each index type should implement it's own logic to release any resources acquired during the process. */ - public void close() {} + public void close() { + } public enum IndexType { - HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM + HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE } } diff --git a/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java new file mode 100644 index 000000000..806dcf515 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index; + +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.table.HoodieTable; + +import org.apache.spark.api.java.JavaSparkContext; + +import java.util.ArrayList; +import java.util.List; + +import static java.util.stream.Collectors.toList; + +/** + * Hoodie Index Utilities. + */ +public class HoodieIndexUtils { + + /** + * Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions. + * + * @param partitions list of partitions of interest + * @param jsc instance of {@link JavaSparkContext} to use + * @param hoodieTable instance of {@link HoodieTable} of interest + * @return the list of Pairs of partition path and fileId + */ + public static List> getLatestBaseFilesForAllPartitions(final List partitions, + final JavaSparkContext jsc, + final HoodieTable hoodieTable) { + return jsc.parallelize(partitions, Math.max(partitions.size(), 1)) + .flatMap(partitionPath -> { + Option latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline() + .filterCompletedInstants().lastInstant(); + List> filteredFiles = new ArrayList<>(); + if (latestCommitTime.isPresent()) { + filteredFiles = hoodieTable.getBaseFileOnlyView() + .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp()) + .map(f -> Pair.of(partitionPath, f)) + .collect(toList()); + } + return filteredFiles.iterator(); + }) + .collect(); + } + + /** + * Get tagged record for the passed in {@link HoodieRecord}. + * + * @param inputRecord instance of {@link HoodieRecord} for which tagging is requested + * @param location {@link HoodieRecordLocation} for the passed in {@link HoodieRecord} + * @return the tagged {@link HoodieRecord} + */ + public static HoodieRecord getTaggedRecord(HoodieRecord inputRecord, Option location) { + HoodieRecord record = inputRecord; + if (location.isPresent()) { + // When you have a record in multiple files in the same partition, then rowKeyRecordPairRDD + // will have 2 entries with the same exact in memory copy of the HoodieRecord and the 2 + // separate filenames that the record is found in. This will result in setting + // currentLocation 2 times and it will fail the second time. So creating a new in memory + // copy of the hoodie record. 
+ record = new HoodieRecord<>(inputRecord); + record.unseal(); + record.setCurrentLocation(location.get()); + record.seal(); + } + return record; + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java index 35ac526f4..ca960ef88 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java @@ -24,12 +24,12 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.MetadataNotFoundException; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.index.HoodieIndexUtils; import org.apache.hudi.io.HoodieRangeInfoHandle; import org.apache.hudi.table.HoodieTable; @@ -52,6 +52,7 @@ import scala.Tuple2; import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.mapping; import static java.util.stream.Collectors.toList; +import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions; /** * Indexing mechanism based on bloom filter. Each parquet file includes its row_key bloom filter in its metadata. @@ -192,18 +193,9 @@ public class HoodieBloomIndex extends HoodieIndex final HoodieTable hoodieTable) { // Obtain the latest data files from all the partitions. - List> partitionPathFileIDList = - jsc.parallelize(partitions, Math.max(partitions.size(), 1)).flatMap(partitionPath -> { - Option latestCommitTime = - hoodieTable.getMetaClient().getCommitsTimeline().filterCompletedInstants().lastInstant(); - List> filteredFiles = new ArrayList<>(); - if (latestCommitTime.isPresent()) { - filteredFiles = hoodieTable.getBaseFileOnlyView() - .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp()) - .map(f -> Pair.of(partitionPath, f.getFileId())).collect(toList()); - } - return filteredFiles.iterator(); - }).collect(); + List> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, jsc, hoodieTable).stream() + .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId())) + .collect(toList()); if (config.getBloomIndexPruneByRanges()) { // also obtain file ranges, if range pruning is enabled @@ -312,21 +304,6 @@ public class HoodieBloomIndex extends HoodieIndex .collect(Collectors.toList()).iterator()); } - HoodieRecord getTaggedRecord(HoodieRecord inputRecord, Option location) { - HoodieRecord record = inputRecord; - if (location.isPresent()) { - // When you have a record in multiple files in the same partition, then rowKeyRecordPairRDD - // will have 2 entries with the same exact in memory copy of the HoodieRecord and the 2 - // separate filenames that the record is found in. This will result in setting - // currentLocation 2 times and it will fail the second time. So creating a new in memory - // copy of the hoodie record. - record = new HoodieRecord<>(inputRecord); - record.unseal(); - record.setCurrentLocation(location.get()); - record.seal(); - } - return record; - } /** * Tag the back to the original HoodieRecord RDD. 
@@ -338,7 +315,7 @@ public class HoodieBloomIndex extends HoodieIndex // Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null), // so we do left outer join. return keyRecordPairRDD.leftOuterJoin(keyFilenamePairRDD).values() - .map(v1 -> getTaggedRecord(v1._1, Option.ofNullable(v1._2.orNull()))); + .map(v1 -> HoodieIndexUtils.getTaggedRecord(v1._1, Option.ofNullable(v1._2.orNull()))); } @Override diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java index a35d32fc0..1e57a3891 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java @@ -28,6 +28,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.index.HoodieIndexUtils; import org.apache.hudi.table.HoodieTable; import org.apache.spark.api.java.JavaPairRDD; @@ -125,17 +126,17 @@ public class HoodieGlobalBloomIndex extends Hoodi HoodieRecord emptyRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2, new EmptyHoodieRecordPayload()); // Tag the incoming record for inserting to the new partition - HoodieRecord taggedRecord = getTaggedRecord(hoodieRecord, Option.empty()); + HoodieRecord taggedRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty()); return Arrays.asList(emptyRecord, taggedRecord).iterator(); } else { // Ignore the incoming record's partition, regardless of whether it differs from its old partition or not. // When it differs, the record will still be updated at its old partition. return Collections.singletonList( - getTaggedRecord(new HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()), + (HoodieRecord) HoodieIndexUtils.getTaggedRecord(new HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()), Option.ofNullable(recordLocationHoodieKeyPair.get()._1))).iterator(); } } else { - return Collections.singletonList(getTaggedRecord(hoodieRecord, Option.empty())).iterator(); + return Collections.singletonList((HoodieRecord) HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty())).iterator(); } }); } diff --git a/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java new file mode 100644 index 000000000..bb1d8d69b --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.simple; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.EmptyHoodieRecordPayload; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.index.HoodieIndexUtils; +import org.apache.hudi.table.HoodieTable; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import scala.Tuple2; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions; + +/** + * A global simple index which reads interested fields(record key and partition path) from base files and + * joins with incoming records to find the tagged location. + * + * @param + */ +public class HoodieGlobalSimpleIndex extends HoodieSimpleIndex { + + public HoodieGlobalSimpleIndex(HoodieWriteConfig config) { + super(config); + } + + @Override + public JavaRDD> tagLocation(JavaRDD> recordRDD, JavaSparkContext jsc, + HoodieTable hoodieTable) { + return tagLocationInternal(recordRDD, jsc, hoodieTable); + } + + /** + * Tags records location for incoming records. + * + * @param inputRecordRDD {@link JavaRDD} of incoming records + * @param jsc instance of {@link JavaSparkContext} to use + * @param hoodieTable instance of {@link HoodieTable} to use + * @return {@link JavaRDD} of records with record locations set + */ + protected JavaRDD> tagLocationInternal(JavaRDD> inputRecordRDD, JavaSparkContext jsc, + HoodieTable hoodieTable) { + + JavaPairRDD> keyedInputRecordRDD = inputRecordRDD.mapToPair(entry -> new Tuple2<>(entry.getRecordKey(), entry)); + JavaPairRDD allRecordLocationsInTable = fetchAllRecordLocations(jsc, hoodieTable, + config.getGlobalSimpleIndexParallelism()); + return getTaggedRecords(keyedInputRecordRDD, allRecordLocationsInTable); + } + + /** + * Fetch record locations for passed in {@link HoodieKey}s. + * + * @param jsc instance of {@link JavaSparkContext} to use + * @param hoodieTable instance of {@link HoodieTable} of interest + * @param parallelism parallelism to use + * @return {@link JavaPairRDD} of {@link HoodieKey} and {@link HoodieRecordLocation} + */ + protected JavaPairRDD fetchAllRecordLocations(JavaSparkContext jsc, + HoodieTable hoodieTable, + int parallelism) { + List> latestBaseFiles = getAllBaseFilesInTable(jsc, hoodieTable); + return fetchRecordLocations(jsc, hoodieTable, parallelism, latestBaseFiles); + } + + /** + * Load all files for all partitions as pair RDD. 
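The global variant prices every lookup over the whole table, so the method documented above starts by listing every partition path under the base path. Below is an illustrative sketch (not part of the patch) of just that listing step, using the same FSUtils call the index relies on; the base path is hypothetical and date partitioning is assumed off.

```java
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.fs.FSUtils;

public class ListAllPartitionsSketch {
  public static void main(String[] args) throws IOException {
    Configuration hadoopConf = new Configuration();
    String basePath = "/tmp/hoodie/test-trip-table"; // hypothetical table base path
    FileSystem fs = FileSystem.get(hadoopConf);
    // Same call the global simple index uses before collecting the latest base file per partition.
    List<String> partitions = FSUtils.getAllPartitionPaths(fs, basePath, false);
    partitions.forEach(System.out::println);
  }
}
```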
+ */ + protected List> getAllBaseFilesInTable(final JavaSparkContext jsc, final HoodieTable hoodieTable) { + HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); + try { + List allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), config.shouldAssumeDatePartitioning()); + // Obtain the latest data files from all the partitions. + return getLatestBaseFilesForAllPartitions(allPartitionPaths, jsc, hoodieTable); + } catch (IOException e) { + throw new HoodieIOException("Failed to load all partitions", e); + } + } + + /** + * Tag records with right {@link HoodieRecordLocation}. + * + * @param incomingRecords incoming {@link HoodieRecord}s + * @param existingRecords existing records with {@link HoodieRecordLocation}s + * @return {@link JavaRDD} of {@link HoodieRecord}s with tagged {@link HoodieRecordLocation}s + */ + private JavaRDD> getTaggedRecords(JavaPairRDD> incomingRecords, JavaPairRDD existingRecords) { + JavaPairRDD> existingRecordByRecordKey = existingRecords + .mapToPair(entry -> new Tuple2<>(entry._1.getRecordKey(), Pair.of(entry._1.getPartitionPath(), entry._2))); + + return incomingRecords.leftOuterJoin(existingRecordByRecordKey).values() + .flatMap(entry -> { + HoodieRecord inputRecord = entry._1; + Option> partitionPathLocationPair = Option.ofNullable(entry._2.orNull()); + List> taggedRecords; + + if (partitionPathLocationPair.isPresent()) { + String partitionPath = partitionPathLocationPair.get().getKey(); + HoodieRecordLocation location = partitionPathLocationPair.get().getRight(); + if (config.getGlobalSimpleIndexUpdatePartitionPath() && !(inputRecord.getPartitionPath().equals(partitionPath))) { + // Create an empty record to delete the record in the old partition + HoodieRecord emptyRecord = new HoodieRecord(new HoodieKey(inputRecord.getRecordKey(), partitionPath), new EmptyHoodieRecordPayload()); + // Tag the incoming record for inserting to the new partition + HoodieRecord taggedRecord = (HoodieRecord) HoodieIndexUtils.getTaggedRecord(inputRecord, Option.empty()); + taggedRecords = Arrays.asList(emptyRecord, taggedRecord); + } else { + // Ignore the incoming record's partition, regardless of whether it differs from its old partition or not. + // When it differs, the record will still be updated at its old partition. + HoodieRecord newRecord = new HoodieRecord<>(new HoodieKey(inputRecord.getRecordKey(), partitionPath), inputRecord.getData()); + taggedRecords = Collections.singletonList((HoodieRecord) HoodieIndexUtils.getTaggedRecord(newRecord, Option.ofNullable(location))); + } + } else { + taggedRecords = Collections.singletonList((HoodieRecord) HoodieIndexUtils.getTaggedRecord(inputRecord, Option.empty())); + } + return taggedRecords.iterator(); + }); + } + + /** + * Returns an RDD mapping each HoodieKey with a partitionPath/fileID which contains it. Option.Empty if the key is not. + * found. 
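To make the update-partition-path branch above concrete, here is a plain-Java walk-through (not Hudi code; types reduced to strings) of the two outcomes: with the flag on and a changed partition, the existing copy is deleted via an empty payload and the record is re-routed untagged to the new partition; otherwise the incoming record is forced back to the partition it already lives in.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class UpdatePartitionPathSketch {

  // Mirrors the branching logic of getTaggedRecords for a single matched key.
  static List<String> tag(boolean updatePartitionPath, String existingPartition,
                          String incomingPartition, String key) {
    if (updatePartitionPath && !incomingPartition.equals(existingPartition)) {
      return Arrays.asList(
          "DELETE " + key + " in " + existingPartition,
          "INSERT " + key + " into " + incomingPartition + " (untagged)");
    }
    return Collections.singletonList(
        "UPSERT " + key + " in " + existingPartition + " (tagged to existing location)");
  }

  public static void main(String[] args) {
    System.out.println(tag(true, "2016/01/31", "2016/02/28", "000"));
    System.out.println(tag(false, "2016/01/31", "2016/02/28", "000"));
  }
}
```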
+ * + * @param hoodieKeys keys to lookup + * @param jsc spark context + * @param hoodieTable hoodie table object + */ + @Override + public JavaPairRDD>> fetchRecordLocation(JavaRDD hoodieKeys, + JavaSparkContext jsc, + HoodieTable hoodieTable) { + return fetchRecordLocationInternal(hoodieKeys, jsc, hoodieTable, config.getGlobalSimpleIndexParallelism()); + } + + @Override + public boolean isGlobal() { + return true; + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java new file mode 100644 index 000000000..af963aa96 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.simple; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.utils.SparkConfigUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.index.HoodieIndexUtils; +import org.apache.hudi.io.HoodieKeyLocationFetchHandle; +import org.apache.hudi.table.HoodieTable; + +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +import java.util.List; + +import scala.Tuple2; + +import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions; + +/** + * A simple index which reads interested fields(record key and partition path) from base files and + * joins with incoming records to find the tagged location. 
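The simple index's per-file primitive is reading only the record key and partition path columns out of a base file. A hedged sketch of that read, using the ParquetUtils call that HoodieKeyLocationFetchHandle invokes further down in this patch; the file path is hypothetical and the List&lt;HoodieKey&gt; return type is assumed from how the handle consumes it.

```java
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.util.ParquetUtils;

public class ReadKeysFromBaseFileSketch {
  public static void main(String[] args) {
    Configuration hadoopConf = new Configuration();
    // Hypothetical path to an existing Hudi base (parquet) file.
    Path baseFilePath = new Path("/tmp/hoodie/2016/01/31/some-file_1-0-1_000.parquet");
    // Reads only the record key and partition path fields, not the full rows.
    List<HoodieKey> keys =
        ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, baseFilePath);
    keys.forEach(key ->
        System.out.println(key.getRecordKey() + " @ " + key.getPartitionPath()));
  }
}
```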
+ * + * @param + */ +public class HoodieSimpleIndex extends HoodieIndex { + + public HoodieSimpleIndex(HoodieWriteConfig config) { + super(config); + } + + @Override + public JavaRDD updateLocation(JavaRDD writeStatusRDD, JavaSparkContext jsc, + HoodieTable hoodieTable) { + return writeStatusRDD; + } + + @Override + public boolean rollbackCommit(String commitTime) { + return true; + } + + @Override + public boolean isGlobal() { + return false; + } + + @Override + public boolean canIndexLogFiles() { + return false; + } + + @Override + public boolean isImplicitWithStorage() { + return true; + } + + @Override + public JavaRDD> tagLocation(JavaRDD> recordRDD, JavaSparkContext jsc, + HoodieTable hoodieTable) { + return tagLocationInternal(recordRDD, jsc, hoodieTable); + } + + /** + * Returns an RDD mapping each HoodieKey with a partitionPath/fileID which contains it. Option. Empty if the key is not + * found. + * + * @param hoodieKeys keys to lookup + * @param jsc spark context + * @param hoodieTable hoodie table object + */ + @Override + public JavaPairRDD>> fetchRecordLocation(JavaRDD hoodieKeys, + JavaSparkContext jsc, HoodieTable hoodieTable) { + + return fetchRecordLocationInternal(hoodieKeys, jsc, hoodieTable, config.getSimpleIndexParallelism()); + } + + /** + * Tags records location for incoming records. + * + * @param inputRecordRDD {@link JavaRDD} of incoming records + * @param jsc instance of {@link JavaSparkContext} to use + * @param hoodieTable instance of {@link HoodieTable} to use + * @return {@link JavaRDD} of records with record locations set + */ + protected JavaRDD> tagLocationInternal(JavaRDD> inputRecordRDD, JavaSparkContext jsc, + HoodieTable hoodieTable) { + if (config.getSimpleIndexUseCaching()) { + inputRecordRDD.persist(SparkConfigUtils.getSimpleIndexInputStorageLevel(config.getProps())); + } + + JavaPairRDD> keyedInputRecordRDD = inputRecordRDD.mapToPair(record -> new Tuple2<>(record.getKey(), record)); + JavaPairRDD existingLocationsOnTable = fetchRecordLocationsForAffectedPartitions(keyedInputRecordRDD.keys(), jsc, hoodieTable, + config.getSimpleIndexParallelism()); + + JavaRDD> taggedRecordRDD = keyedInputRecordRDD.leftOuterJoin(existingLocationsOnTable) + .map(entry -> { + final HoodieRecord untaggedRecord = entry._2._1; + final Option location = Option.ofNullable(entry._2._2.orNull()); + return HoodieIndexUtils.getTaggedRecord(untaggedRecord, location); + }); + + if (config.getSimpleIndexUseCaching()) { + inputRecordRDD.unpersist(); + } + return taggedRecordRDD; + } + + /** + * Fetch record locations for passed in {@link JavaRDD} of HoodieKeys. 
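tagLocationInternal above persists the incoming RDD before the join and unpersists it afterwards, with the storage level coming from hoodie.simple.index.input.storage.level. A standalone sketch of that persist/unpersist pattern on a toy RDD (not Hudi code; the app name and data are placeholders):

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class PersistPatternSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("persist-sketch").setMaster("local[2]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      JavaRDD<String> input = jsc.parallelize(Arrays.asList("key-1", "key-2"));

      // Resolve the storage level from a string, the way the config value is resolved.
      StorageLevel level = StorageLevel.fromString("MEMORY_AND_DISK_SER");
      input.persist(level);                          // cache so the input is not recomputed
      long distinctKeys = input.distinct().count();  // first pass (stand-in for keying records)
      long total = input.count();                    // second pass (stand-in for the join)
      input.unpersist();                             // release cached blocks once tagging is done

      System.out.println(distinctKeys + " distinct of " + total);
    }
  }
}
```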
+ * + * @param lookupKeys {@link JavaRDD} of {@link HoodieKey}s + * @param jsc instance of {@link JavaSparkContext} to use + * @param hoodieTable instance of {@link HoodieTable} of interest + * @param parallelism parallelism to use + * @return Hoodiekeys mapped to partitionpath and filenames + */ + JavaPairRDD>> fetchRecordLocationInternal(JavaRDD lookupKeys, JavaSparkContext jsc, + HoodieTable hoodieTable, int parallelism) { + JavaPairRDD> keyLocationsRDD = lookupKeys.mapToPair(key -> new Tuple2<>(key, Option.empty())); + JavaPairRDD existingRecords = fetchRecordLocationsForAffectedPartitions(lookupKeys, jsc, hoodieTable, parallelism); + + return keyLocationsRDD.leftOuterJoin(existingRecords) + .mapToPair(entry -> { + final Option locationOpt = Option.ofNullable(entry._2._2.orNull()); + final HoodieKey key = entry._1; + return locationOpt + .map(location -> new Tuple2<>(key, Option.of(Pair.of(key.getPartitionPath(), location.getFileId())))) + .orElse(new Tuple2<>(key, Option.empty())); + }); + } + + /** + * Fetch record locations for passed in {@link HoodieKey}s. + * + * @param hoodieKeys {@link JavaRDD} of {@link HoodieKey}s for which locations are fetched + * @param jsc instance of {@link JavaSparkContext} to use + * @param hoodieTable instance of {@link HoodieTable} of interest + * @param parallelism parallelism to use + * @return {@link JavaPairRDD} of {@link HoodieKey} and {@link HoodieRecordLocation} + */ + protected JavaPairRDD fetchRecordLocationsForAffectedPartitions(JavaRDD hoodieKeys, JavaSparkContext jsc, HoodieTable hoodieTable, + int parallelism) { + List affectedPartitionPathList = hoodieKeys.map(HoodieKey::getPartitionPath).distinct().collect(); + List> latestBaseFiles = getLatestBaseFilesForAllPartitions(affectedPartitionPathList, jsc, hoodieTable); + return fetchRecordLocations(jsc, hoodieTable, parallelism, latestBaseFiles); + } + + protected JavaPairRDD fetchRecordLocations(JavaSparkContext jsc, HoodieTable hoodieTable, int parallelism, List> baseFiles) { + int fetchParallelism = Math.max(1, Math.max(baseFiles.size(), parallelism)); + return jsc.parallelize(baseFiles, fetchParallelism) + .flatMapToPair(partitionPathBaseFile -> new HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile).locations()); + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java new file mode 100644 index 000000000..3aa13987e --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io; + +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.ParquetUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.Path; + +import java.util.Iterator; + +import scala.Tuple2; + +/** + * {@link HoodieRecordLocation} fetch handle for all records from {@link HoodieBaseFile} of interest. + * + * @param + */ +public class HoodieKeyLocationFetchHandle extends HoodieReadHandle { + + private final Pair partitionPathBaseFilePair; + + public HoodieKeyLocationFetchHandle(HoodieWriteConfig config, HoodieTable hoodieTable, + Pair partitionPathBaseFilePair) { + super(config, null, hoodieTable, Pair.of(partitionPathBaseFilePair.getLeft(), partitionPathBaseFilePair.getRight().getFileId())); + this.partitionPathBaseFilePair = partitionPathBaseFilePair; + } + + public Iterator> locations() { + HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight(); + return ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hoodieTable.getHadoopConf(), new Path(baseFile.getPath())).stream() + .map(entry -> new Tuple2<>(entry, + new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()))).iterator(); + } +} diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java index 9e93da305..58a703176 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java @@ -18,15 +18,30 @@ package org.apache.hudi.index; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.HoodieClientTestHarness; +import org.apache.hudi.common.HoodieClientTestUtils; +import org.apache.hudi.common.HoodieTestDataGenerator; +import org.apache.hudi.common.TestRawTripPayload; +import org.apache.hudi.common.fs.ConsistencyGuardConfig; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.EmptyHoodieRecordPayload; import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.table.view.FileSystemViewStorageType; +import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieHBaseIndexConfig; import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIndexException; @@ -34,63 +49,126 @@ import org.apache.hudi.index.HoodieIndex.IndexType; import org.apache.hudi.index.bloom.HoodieBloomIndex; import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex; import org.apache.hudi.index.hbase.HBaseIndex; +import 
org.apache.hudi.index.simple.HoodieSimpleIndex; import org.apache.hudi.table.HoodieTable; +import org.apache.avro.Schema; +import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + +import scala.Tuple2; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class TestHoodieIndex extends HoodieClientTestHarness { - private HoodieWriteConfig.Builder clientConfigBuilder; - private HoodieIndexConfig.Builder indexConfigBuilder; + private final Random random = new Random(); + private IndexType indexType; + private HoodieIndex index; + private HoodieWriteConfig config; + private HoodieWriteClient writeClient; + private String schemaStr; + private Schema schema; - @BeforeEach - public void setUp() throws Exception { + private void setUp(IndexType indexType) throws Exception { + setUp(indexType, true); + } + + private void setUp(IndexType indexType, boolean initializeIndex) throws Exception { + this.indexType = indexType; initSparkContexts("TestHoodieIndex"); initPath(); + initTestDataGenerator(); + initFileSystem(); + // We have some records to be tagged (two different partitions) + schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt")); + schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr)); initMetaClient(); - clientConfigBuilder = HoodieWriteConfig.newBuilder(); - indexConfigBuilder = HoodieIndexConfig.newBuilder(); + if (initializeIndex) { + instantiateIndex(); + } } @AfterEach - public void tearDown() { + public void tearDown() throws IOException { cleanupSparkContexts(); + cleanupFileSystem(); cleanupMetaClient(); } - @Test - public void testCreateIndex() { - // Different types - HoodieWriteConfig config = clientConfigBuilder.withPath(basePath) - .withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.HBASE) - .withHBaseIndexConfig(new HoodieHBaseIndexConfig.Builder().build()).build()) - .build(); - assertTrue(HoodieIndex.createIndex(config) instanceof HBaseIndex); - config = clientConfigBuilder.withPath(basePath) - .withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build(); - assertTrue(HoodieIndex.createIndex(config) instanceof InMemoryHashIndex); - config = clientConfigBuilder.withPath(basePath) - .withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(); - assertTrue(HoodieIndex.createIndex(config) instanceof HoodieBloomIndex); - config = clientConfigBuilder.withPath(basePath) - .withIndexConfig(indexConfigBuilder.withIndexType(IndexType.GLOBAL_BLOOM).build()).build(); - assertTrue(HoodieIndex.createIndex(config) instanceof HoodieGlobalBloomIndex); + @ParameterizedTest + @EnumSource(value = IndexType.class, names = {"BLOOM", "GLOBAL_BLOOM", "SIMPLE", "GLOBAL_SIMPLE", 
"HBASE"}) + public void testCreateIndex(IndexType indexType) throws Exception { + setUp(indexType, false); + HoodieWriteConfig.Builder clientConfigBuilder = HoodieWriteConfig.newBuilder(); + HoodieIndexConfig.Builder indexConfigBuilder = HoodieIndexConfig.newBuilder(); + switch (indexType) { + case INMEMORY: + config = clientConfigBuilder.withPath(basePath) + .withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build(); + assertTrue(HoodieIndex.createIndex(config) instanceof InMemoryHashIndex); + break; + case BLOOM: + config = clientConfigBuilder.withPath(basePath) + .withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(); + assertTrue(HoodieIndex.createIndex(config) instanceof HoodieBloomIndex); + break; + case GLOBAL_BLOOM: + config = clientConfigBuilder.withPath(basePath) + .withIndexConfig(indexConfigBuilder.withIndexType(IndexType.GLOBAL_BLOOM).build()).build(); + assertTrue(HoodieIndex.createIndex(config) instanceof HoodieGlobalBloomIndex); + break; + case SIMPLE: + config = clientConfigBuilder.withPath(basePath) + .withIndexConfig(indexConfigBuilder.withIndexType(IndexType.SIMPLE).build()).build(); + assertTrue(HoodieIndex.createIndex(config) instanceof HoodieSimpleIndex); + break; + case HBASE: + config = clientConfigBuilder.withPath(basePath) + .withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.HBASE) + .withHBaseIndexConfig(new HoodieHBaseIndexConfig.Builder().build()).build()) + .build(); + assertTrue(HoodieIndex.createIndex(config) instanceof HBaseIndex); + break; + default: + // no -op. just for checkstyle errors + } + } + @Test + public void testCreateDummyIndex() throws Exception { + setUp(IndexType.BLOOM, false); + HoodieWriteConfig.Builder clientConfigBuilder = HoodieWriteConfig.newBuilder(); + HoodieIndexConfig.Builder indexConfigBuilder = HoodieIndexConfig.newBuilder(); config = clientConfigBuilder.withPath(basePath) .withIndexConfig(indexConfigBuilder.withIndexClass(DummyHoodieIndex.class.getName()).build()).build(); assertTrue(HoodieIndex.createIndex(config) instanceof DummyHoodieIndex); } @Test - public void testCreateIndex_withException() { + public void testCreateIndex_withException() throws Exception { + setUp(IndexType.BLOOM, false); + HoodieWriteConfig.Builder clientConfigBuilder = HoodieWriteConfig.newBuilder(); + HoodieIndexConfig.Builder indexConfigBuilder = HoodieIndexConfig.newBuilder(); final HoodieWriteConfig config1 = clientConfigBuilder.withPath(basePath) .withIndexConfig(indexConfigBuilder.withIndexClass(IndexWithConstructor.class.getName()).build()).build(); final Throwable thrown1 = assertThrows(HoodieException.class, () -> { @@ -106,6 +184,385 @@ public class TestHoodieIndex extends HoodieClientTestHarness { assertTrue(thrown2.getMessage().contains("Unable to instantiate class")); } + @ParameterizedTest + @EnumSource(value = IndexType.class, names = {"BLOOM", "GLOBAL_BLOOM", "SIMPLE", "GLOBAL_SIMPLE"}) + public void testSimpleTagLocationAndUpdate(IndexType indexType) throws Exception { + setUp(indexType); + String newCommitTime = "001"; + int totalRecords = 10 + random.nextInt(20); + List records = dataGen.generateInserts(newCommitTime, totalRecords); + JavaRDD writeRecords = jsc.parallelize(records, 1); + + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration()); + + // Test tagLocation without any entries in index + JavaRDD javaRDD = 
index.tagLocation(writeRecords, jsc, hoodieTable); + assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0); + + // Insert totalRecords records + writeClient.startCommitWithTime(newCommitTime); + JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime); + assertNoWriteErrors(writeStatues.collect()); + + // Now tagLocation for these records, index should not tag them since it was a failed + // commit + javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); + assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0); + // Now commit this & update location of records inserted and validate no errors + writeClient.commit(newCommitTime, writeStatues); + // Now tagLocation for these records, index should tag them correctly + metaClient = HoodieTableMetaClient.reload(metaClient); + hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration()); + javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); + Map recordKeyToPartitionPathMap = new HashMap(); + List hoodieRecords = writeRecords.collect(); + hoodieRecords.forEach(entry -> recordKeyToPartitionPathMap.put(entry.getRecordKey(), entry.getPartitionPath())); + + assertEquals(totalRecords, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size()); + assertEquals(totalRecords, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count()); + assertEquals(totalRecords, javaRDD.filter(record -> (record.getCurrentLocation() != null + && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count()); + javaRDD.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry.getRecordKey()), entry.getPartitionPath(), "PartitionPath mismatch")); + + JavaRDD hoodieKeyJavaRDD = writeRecords.map(entry -> entry.getKey()); + JavaPairRDD>> recordLocations = index.fetchRecordLocation(hoodieKeyJavaRDD, jsc, hoodieTable); + List hoodieKeys = hoodieKeyJavaRDD.collect(); + assertEquals(totalRecords, recordLocations.collect().size()); + assertEquals(totalRecords, recordLocations.map(record -> record._1).distinct().count()); + recordLocations.foreach(entry -> assertTrue(hoodieKeys.contains(entry._1), "Missing HoodieKey")); + recordLocations.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry._1.getRecordKey()), entry._1.getPartitionPath(), "PartitionPath mismatch")); + } + + @ParameterizedTest + @EnumSource(value = IndexType.class, names = {"BLOOM", "GLOBAL_BLOOM", "SIMPLE", "GLOBAL_SIMPLE"}) + public void testTagLocationAndDuplicateUpdate(IndexType indexType) throws Exception { + setUp(indexType); + String newCommitTime = "001"; + int totalRecords = 10 + random.nextInt(20); + List records = dataGen.generateInserts(newCommitTime, totalRecords); + JavaRDD writeRecords = jsc.parallelize(records, 1); + + HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration()); + + writeClient.startCommitWithTime(newCommitTime); + JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime); + JavaRDD javaRDD1 = index.tagLocation(writeRecords, jsc, hoodieTable); + + // Duplicate upsert and ensure correctness is maintained + // We are trying to approximately imitate the case when the RDD is recomputed. For RDD creating, driver code is not + // recomputed. This includes the state transitions. We need to delete the inflight instance so that subsequent + // upsert will not run into conflicts. 
+ metaClient.getFs().delete(new Path(metaClient.getMetaPath(), "001.inflight")); + + writeClient.upsert(writeRecords, newCommitTime); + assertNoWriteErrors(writeStatues.collect()); + + // Now commit this & update location of records inserted and validate no errors + writeClient.commit(newCommitTime, writeStatues); + // Now tagLocation for these records, hbaseIndex should tag them correctly + metaClient = HoodieTableMetaClient.reload(metaClient); + hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration()); + JavaRDD javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); + + Map recordKeyToPartitionPathMap = new HashMap(); + List hoodieRecords = writeRecords.collect(); + hoodieRecords.forEach(entry -> recordKeyToPartitionPathMap.put(entry.getRecordKey(), entry.getPartitionPath())); + + assertEquals(totalRecords, javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size()); + assertEquals(totalRecords, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count()); + assertEquals(totalRecords, javaRDD.filter(record -> (record.getCurrentLocation() != null + && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count()); + javaRDD.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry.getRecordKey()), entry.getPartitionPath(), "PartitionPath mismatch")); + + JavaRDD hoodieKeyJavaRDD = writeRecords.map(entry -> entry.getKey()); + JavaPairRDD>> recordLocations = index.fetchRecordLocation(hoodieKeyJavaRDD, jsc, hoodieTable); + List hoodieKeys = hoodieKeyJavaRDD.collect(); + assertEquals(totalRecords, recordLocations.collect().size()); + assertEquals(totalRecords, recordLocations.map(record -> record._1).distinct().count()); + recordLocations.foreach(entry -> assertTrue(hoodieKeys.contains(entry._1), "Missing HoodieKey")); + recordLocations.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry._1.getRecordKey()), entry._1.getPartitionPath(), "PartitionPath mismatch")); + } + + @ParameterizedTest + @EnumSource(value = IndexType.class, names = {"BLOOM", "GLOBAL_BLOOM", "SIMPLE", "GLOBAL_SIMPLE"}) + public void testSimpleTagLocationAndUpdateWithRollback(IndexType indexType) throws Exception { + setUp(indexType); + String newCommitTime = writeClient.startCommit(); + int totalRecords = 20 + random.nextInt(20); + List records = dataGen.generateInserts(newCommitTime, totalRecords); + JavaRDD writeRecords = jsc.parallelize(records, 1); + metaClient = HoodieTableMetaClient.reload(metaClient); + + // Insert 200 records + JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime); + assertNoWriteErrors(writeStatues.collect()); + + // commit this upsert + writeClient.commit(newCommitTime, writeStatues); + HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration()); + + // Now tagLocation for these records, hbaseIndex should tag them + JavaRDD javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); + assert (javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == totalRecords); + + // check tagged records are tagged with correct fileIds + List fileIds = writeStatues.map(WriteStatus::getFileId).collect(); + assert (javaRDD.filter(record -> record.getCurrentLocation().getFileId() == null).collect().size() == 0); + List taggedFileIds = javaRDD.map(record -> record.getCurrentLocation().getFileId()).distinct().collect(); + + Map recordKeyToPartitionPathMap = new HashMap(); + List hoodieRecords = writeRecords.collect(); + hoodieRecords.forEach(entry 
-> recordKeyToPartitionPathMap.put(entry.getRecordKey(), entry.getPartitionPath())); + + JavaRDD hoodieKeyJavaRDD = writeRecords.map(entry -> entry.getKey()); + JavaPairRDD>> recordLocations = index.fetchRecordLocation(hoodieKeyJavaRDD, jsc, hoodieTable); + List hoodieKeys = hoodieKeyJavaRDD.collect(); + assertEquals(totalRecords, recordLocations.collect().size()); + assertEquals(totalRecords, recordLocations.map(record -> record._1).distinct().count()); + recordLocations.foreach(entry -> assertTrue(hoodieKeys.contains(entry._1), "Missing HoodieKey")); + recordLocations.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry._1.getRecordKey()), entry._1.getPartitionPath(), "PartitionPath mismatch")); + + // both lists should match + assertTrue(taggedFileIds.containsAll(fileIds) && fileIds.containsAll(taggedFileIds)); + // Rollback the last commit + writeClient.rollback(newCommitTime); + + hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration()); + // Now tagLocation for these records, hbaseIndex should not tag them since it was a rolled + // back commit + javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); + assert (javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == 0); + assert (javaRDD.filter(record -> record.getCurrentLocation() != null).collect().size() == 0); + } + + @ParameterizedTest + @EnumSource(value = IndexType.class, names = {"BLOOM", "SIMPLE",}) + public void testTagLocationAndFetchRecordLocations(IndexType indexType) throws Exception { + setUp(indexType); + String rowKey1 = UUID.randomUUID().toString(); + String rowKey2 = UUID.randomUUID().toString(); + String rowKey3 = UUID.randomUUID().toString(); + String recordStr1 = "{\"_row_key\":\"" + rowKey1 + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; + String recordStr2 = "{\"_row_key\":\"" + rowKey2 + "\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; + String recordStr3 = "{\"_row_key\":\"" + rowKey3 + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; + // place same row key under a different partition. + String recordStr4 = "{\"_row_key\":\"" + rowKey1 + "\",\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}"; + TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1); + HoodieRecord record1 = + new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1); + TestRawTripPayload rowChange2 = new TestRawTripPayload(recordStr2); + HoodieRecord record2 = + new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2); + TestRawTripPayload rowChange3 = new TestRawTripPayload(recordStr3); + HoodieRecord record3 = + new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3); + TestRawTripPayload rowChange4 = new TestRawTripPayload(recordStr4); + HoodieRecord record4 = + new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4); + JavaRDD recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record4)); + + HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration()); + + JavaRDD taggedRecordRDD = index.tagLocation(recordRDD, jsc, hoodieTable); + + // Should not find any files + for (HoodieRecord record : taggedRecordRDD.collect()) { + assertFalse(record.isCurrentLocationKnown()); + } + + // We create three parquet file, each having one record. 
(two different partitions) + String filename1 = + HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record1), schema, null, true); + String filename2 = + HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record2), schema, null, true); + String filename3 = + HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Collections.singletonList(record4), schema, null, true); + + // We do the tag again + metaClient = HoodieTableMetaClient.reload(metaClient); + hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration()); + + taggedRecordRDD = index.tagLocation(recordRDD, jsc, hoodieTable); + + // Check results + for (HoodieRecord record : taggedRecordRDD.collect()) { + if (record.getRecordKey().equals(rowKey1)) { + if (record.getPartitionPath().equals("2015/01/31")) { + assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename3)); + } else { + assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename1)); + } + } else if (record.getRecordKey().equals(rowKey2)) { + assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename2)); + } else if (record.getRecordKey().equals(rowKey3)) { + assertFalse(record.isCurrentLocationKnown()); + } + } + + JavaPairRDD>> recordLocations = index.fetchRecordLocation(recordRDD.map(entry -> entry.getKey()), jsc, hoodieTable); + + for (Tuple2>> entry : recordLocations.collect()) { + if (entry._1.getRecordKey().equals(rowKey1)) { + assertTrue(entry._2.isPresent(), "Row1 should have been present "); + if (entry._1.getPartitionPath().equals("2015/01/31")) { + assertTrue(entry._2.isPresent(), "Row1 should have been present "); + assertEquals(entry._2.get().getRight(), FSUtils.getFileId(filename3)); + } else { + assertEquals(entry._2.get().getRight(), FSUtils.getFileId(filename1)); + } + } else if (entry._1.getRecordKey().equals(rowKey2)) { + assertTrue(entry._2.isPresent(), "Row2 should have been present "); + assertEquals(entry._2.get().getRight(), FSUtils.getFileId(filename2)); + } else if (entry._1.getRecordKey().equals(rowKey3)) { + assertFalse(entry._2.isPresent(), "Row3 should have been absent "); + } + } + } + + @ParameterizedTest + @EnumSource(value = IndexType.class, names = {"GLOBAL_SIMPLE"}) + public void testSimpleGlobalIndexTagLocationWhenShouldUpdatePartitionPath(IndexType indexType) throws Exception { + setUp(indexType); + config = getConfigBuilder() + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType) + .withGlobalSimpleIndexUpdatePartitionPath(true) + .withBloomIndexUpdatePartitionPath(true) + .build()).build(); + writeClient = getHoodieWriteClient(config); + index = writeClient.getIndex(); + + // Create the original partition, and put a record, along with the meta file + // "2016/01/31": 1 file (1_0_20160131101010.parquet) + new File(basePath + "/2016/01/31").mkdirs(); + new File(basePath + "/2016/01/31/" + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile(); + + // this record will be saved in table and will be tagged to an empty record + TestRawTripPayload originalPayload = + new TestRawTripPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); + HoodieRecord originalRecord = + new HoodieRecord(new HoodieKey(originalPayload.getRowKey(), originalPayload.getPartitionPath()), + originalPayload); + + /* + This record has the same record key as originalRecord but different time so different partition + Because 
SIMPLE_INDEX_UPDATE_PARTITION_PATH = true, + the global simple index should + - tag the original partition of the originalRecord to an empty record for deletion, and + - tag the new partition of the incomingRecord + */ + TestRawTripPayload incomingPayload = + new TestRawTripPayload("{\"_row_key\":\"000\",\"time\":\"2016-02-31T03:16:41.415Z\",\"number\":12}"); + HoodieRecord incomingRecord = + new HoodieRecord(new HoodieKey(incomingPayload.getRowKey(), incomingPayload.getPartitionPath()), + incomingPayload); + /* + This record has the same record key as originalRecord and the same partition + Though SIMPLE_INDEX_UPDATE_PARTITION_PATH = true, + the global simple index should just tag the original partition + */ + TestRawTripPayload incomingPayloadSamePartition = + new TestRawTripPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T04:16:41.415Z\",\"number\":15}"); + HoodieRecord incomingRecordSamePartition = + new HoodieRecord( + new HoodieKey(incomingPayloadSamePartition.getRowKey(), incomingPayloadSamePartition.getPartitionPath()), + incomingPayloadSamePartition); + + // We have some records to be tagged (two different partitions) + String schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt")); + Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr)); + + HoodieClientTestUtils + .writeParquetFile(basePath, "2016/01/31", Collections.singletonList(originalRecord), schema, null, false); + + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration()); + + // Add some commits + new File(basePath + "/.hoodie").mkdirs(); + + // test against incoming record with a different partition + JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Collections.singletonList(incomingRecord)); + JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, table); + + assertEquals(2, taggedRecordRDD.count()); + for (HoodieRecord record : taggedRecordRDD.collect()) { + switch (record.getPartitionPath()) { + case "2016/01/31": + assertEquals("000", record.getRecordKey()); + assertTrue(record.getData() instanceof EmptyHoodieRecordPayload); + break; + case "2016/02/31": + assertEquals("000", record.getRecordKey()); + assertEquals(incomingPayload.getJsonData(), ((TestRawTripPayload) record.getData()).getJsonData()); + break; + default: + assertFalse(true, String.format("Should not get partition path: %s", record.getPartitionPath())); + } + } + + // test against incoming record with the same partition + JavaRDD<HoodieRecord> recordRDDSamePartition = jsc + .parallelize(Collections.singletonList(incomingRecordSamePartition)); + JavaRDD<HoodieRecord> taggedRecordRDDSamePartition = index.tagLocation(recordRDDSamePartition, jsc, table); + + assertEquals(1, taggedRecordRDDSamePartition.count()); + HoodieRecord record = taggedRecordRDDSamePartition.first(); + assertEquals("000", record.getRecordKey()); + assertEquals("2016/01/31", record.getPartitionPath()); + assertEquals(incomingPayloadSamePartition.getJsonData(), ((TestRawTripPayload) record.getData()).getJsonData()); + } + + /** + * Get Config builder with default configs set. + * + * @return Config Builder + */ + public HoodieWriteConfig.Builder getConfigBuilder() { + return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); + } + + HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) { + return getConfigBuilder(schemaStr, indexType); + } + + /** + * Get Config builder with default configs set. 
+ * + * @return Config Builder + */ + private HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) { + return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr) + .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2) + .withWriteStatusClass(TestRawTripPayload.MetadataMergeWriteStatus.class) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build()) + .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build()) + .forTable("test-trip-table") + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build()) + .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build()); + } + + private HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) { + return new HoodieWriteClient(jsc, cfg, false); + } + + private void instantiateIndex() { + config = getConfigBuilder() + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType) + .build()).withAutoCommit(false).build(); + writeClient = getHoodieWriteClient(config); + this.index = writeClient.getIndex(); + } + + private void assertNoWriteErrors(List<WriteStatus> statuses) { + // Verify there are no errors + for (WriteStatus status : statuses) { + assertFalse(status.hasErrors()); + } + } + public static class DummyHoodieIndex extends HoodieIndex { public DummyHoodieIndex(HoodieWriteConfig config) { @@ -157,4 +614,5 @@ public class TestHoodieIndex extends HoodieClientTestHarness { public static class IndexWithoutConstructor { } + } diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java index 8b938287b..5a85f9c06 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java @@ -475,5 +475,4 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { } } } - } diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java new file mode 100644 index 000000000..50e212733 --- /dev/null +++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io; + +import org.apache.hudi.common.HoodieClientTestHarness; +import org.apache.hudi.common.HoodieClientTestUtils; +import org.apache.hudi.common.HoodieTestDataGenerator; +import org.apache.hudi.common.TestRawTripPayload; +import org.apache.hudi.common.fs.ConsistencyGuardConfig; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieTestUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.table.view.FileSystemViewStorageType; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieStorageConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.HoodieIndexUtils; +import org.apache.hudi.table.HoodieTable; + +import org.apache.spark.api.java.JavaSparkContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import scala.Tuple2; + +import static java.util.stream.Collectors.toList; +import static org.apache.hudi.common.HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Tests {@link HoodieKeyLocationFetchHandle}. 
+ */ +public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness { + + private HoodieWriteConfig config; + + @BeforeEach + public void setUp() throws Exception { + initSparkContexts("TestRecordFetcher"); + initPath(); + initTestDataGenerator(); + initFileSystem(); + initMetaClient(); + config = getConfigBuilder() + .withIndexConfig(HoodieIndexConfig.newBuilder() + .build()).build(); + } + + @AfterEach + public void tearDown() throws IOException { + cleanupSparkContexts(); + cleanupFileSystem(); + cleanupMetaClient(); + } + + @Test + public void testFetchHandle() throws Exception { + + String commitTime = "000"; + List<HoodieRecord> records = dataGen.generateInserts(commitTime, 100); + Map<String, List<HoodieRecord>> recordsPerPartition = getRecordsPerPartition(records); + + Map<Tuple2<String, String>, List<Tuple2<HoodieKey, HoodieRecordLocation>>> expectedList = writeToParquetAndGetExpectedRecordLocations(recordsPerPartition); + + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration()); + + Files.createDirectories(Paths.get(basePath, ".hoodie")); + + List<Tuple2<String, HoodieBaseFile>> partitionPathFileIdPairs = loadAllFilesForPartitions(new ArrayList<>(recordsPerPartition.keySet()), jsc, hoodieTable); + + for (Tuple2<String, HoodieBaseFile> entry : partitionPathFileIdPairs) { + HoodieKeyLocationFetchHandle fetcherHandle = new HoodieKeyLocationFetchHandle(config, hoodieTable, Pair.of(entry._1, entry._2)); + Iterator<Tuple2<HoodieKey, HoodieRecordLocation>> result = fetcherHandle.locations(); + List<Tuple2<HoodieKey, HoodieRecordLocation>> actualList = new ArrayList<>(); + result.forEachRemaining(actualList::add); + assertEquals(expectedList.get(new Tuple2<>(entry._1, entry._2.getFileId())), actualList); + } + } + + private Map<String, List<HoodieRecord>> getRecordsPerPartition(List<HoodieRecord> records) { + Map<String, List<HoodieRecord>> recordsPerPartition = new HashMap<>(); + for (HoodieRecord record : records) { + if (!recordsPerPartition.containsKey(record.getPartitionPath())) { + recordsPerPartition.put(record.getPartitionPath(), new ArrayList<>()); + } + recordsPerPartition.get(record.getPartitionPath()).add(record); + } + return recordsPerPartition; + } + + private Map<Tuple2<String, String>, List<Tuple2<HoodieKey, HoodieRecordLocation>>> writeToParquetAndGetExpectedRecordLocations( + Map<String, List<HoodieRecord>> recordsPerPartition) throws Exception { + Map<Tuple2<String, String>, List<Tuple2<HoodieKey, HoodieRecordLocation>>> expectedList = new HashMap<>(); + for (Map.Entry<String, List<HoodieRecord>> entry : recordsPerPartition.entrySet()) { + int totalRecordsPerPartition = entry.getValue().size(); + int totalSlices = 1; + if (totalRecordsPerPartition > 5) { + totalSlices = totalRecordsPerPartition / 3; + } + int recordsPerFileSlice = totalRecordsPerPartition / totalSlices; + + List<List<HoodieRecord>> recordsForFileSlices = new ArrayList<>(); + recordsForFileSlices.add(new ArrayList<>()); + int index = 0; + int count = 0; + for (HoodieRecord record : entry.getValue()) { + if (count < recordsPerFileSlice) { + recordsForFileSlices.get(index).add(record); + count++; + } else { + recordsForFileSlices.add(new ArrayList<>()); + index++; + count = 0; + } + } + + for (List<HoodieRecord> recordsPerSlice : recordsForFileSlices) { + Tuple2<String, String> fileIdInstantTimePair = writeToParquet(entry.getKey(), recordsPerSlice); + List<Tuple2<HoodieKey, HoodieRecordLocation>> expectedEntries = new ArrayList<>(); + for (HoodieRecord record : recordsPerSlice) { + expectedEntries.add(new Tuple2<>(record.getKey(), new HoodieRecordLocation(fileIdInstantTimePair._2, fileIdInstantTimePair._1))); + } + expectedList.put(new Tuple2<>(entry.getKey(), fileIdInstantTimePair._1), expectedEntries); + } + } + return expectedList; + } + + protected List<Tuple2<String, HoodieBaseFile>> loadAllFilesForPartitions(List<String> partitions, final JavaSparkContext jsc, + final HoodieTable hoodieTable) { + + // Obtain the latest data files from all the partitions. 
+ List<Pair<String, HoodieBaseFile>> partitionPathFileIDList = HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, jsc, hoodieTable); + return partitionPathFileIDList.stream() + .map(pf -> new Tuple2<>(pf.getKey(), pf.getValue())).collect(toList()); + } + + /** + * Get Config builder with default configs set. + * + * @return Config Builder + */ + public HoodieWriteConfig.Builder getConfigBuilder() { + return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); + } + + /** + * Get Config builder with default configs set. + * + * @return Config Builder + */ + private HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) { + return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr) + .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2) + .withWriteStatusClass(TestRawTripPayload.MetadataMergeWriteStatus.class) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build()) + .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build()) + .forTable("test-trip-table") + .withIndexConfig(HoodieIndexConfig.newBuilder().build()) + .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build()); + } + + private Tuple2<String, String> writeToParquet(String partitionPath, List<HoodieRecord> records) throws Exception { + Thread.sleep(100); + String instantTime = HoodieTestUtils.makeNewCommitTime(); + String fileId = UUID.randomUUID().toString(); + String filename = FSUtils.makeDataFileName(instantTime, "1-0-1", fileId); + HoodieTestUtils.createCommitFiles(basePath, instantTime); + HoodieClientTestUtils.writeParquetFile(basePath, partitionPath, filename, records, AVRO_SCHEMA_WITH_METADATA_FIELDS, null, + true); + return new Tuple2<>(fileId, instantTime); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index bffe8df23..8c2212244 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -198,6 +198,24 @@ public class HoodieAvroUtils { return RECORD_KEY_SCHEMA; } + /** + * Fetch schema for record key and partition path. 
+ */ + public static Schema getRecordKeyPartitionPathSchema() { + List<Schema.Field> toBeAddedFields = new ArrayList<>(); + Schema recordSchema = Schema.createRecord("HoodieRecordKey", "", "", false); + + Schema.Field recordKeyField = + new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance()); + Schema.Field partitionPathField = + new Schema.Field(HoodieRecord.PARTITION_PATH_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance()); + + toBeAddedFields.add(recordKeyField); + toBeAddedFields.add(partitionPathField); + recordSchema.setFields(toBeAddedFields); + return recordSchema; + } + public static GenericRecord addHoodieKeyToRecord(GenericRecord record, String recordKey, String partitionPath, String fileName) { record.put(HoodieRecord.FILENAME_METADATA_FIELD, fileName); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java index 923b174fd..3c8d9e861 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.bloom.BloomFilterTypeCode; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; @@ -76,13 +77,27 @@ public class ParquetUtils { * @return Set<String> Set of row keys matching candidateRecordKeys */ public static Set<String> filterParquetRowKeys(Configuration configuration, Path filePath, Set<String> filter) { + return filterParquetRowKeys(configuration, filePath, filter, HoodieAvroUtils.getRecordKeySchema()); + } + + /** + * Read the rowKey list matching the given filter, from the given parquet file. If the filter is empty, then this will + * return all the rowkeys. + * + * @param filePath The parquet file path. + * @param configuration configuration to build fs object + * @param filter record keys filter + * @param readSchema schema of columns to be read + * @return Set<String> Set of row keys matching candidateRecordKeys + */ + private static Set<String> filterParquetRowKeys(Configuration configuration, Path filePath, Set<String> filter, + Schema readSchema) { Option<RecordKeysFilterFunction> filterFunction = Option.empty(); if (filter != null && !filter.isEmpty()) { filterFunction = Option.of(new RecordKeysFilterFunction(filter)); } Configuration conf = new Configuration(configuration); conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf()); - Schema readSchema = HoodieAvroUtils.getRecordKeySchema(); AvroReadSupport.setAvroReadSchema(conf, readSchema); AvroReadSupport.setRequestedProjection(conf, readSchema); Set<String> rowKeys = new HashSet<>(); @@ -105,6 +120,41 @@ public class ParquetUtils { return rowKeys; } + /** + * Fetch {@link HoodieKey}s from the given parquet file. + * + * @param filePath The parquet file path. 
+ * @param configuration configuration to build fs object + * @return {@link List} of {@link HoodieKey}s fetched from the parquet file + */ + public static List<HoodieKey> fetchRecordKeyPartitionPathFromParquet(Configuration configuration, Path filePath) { + List<HoodieKey> hoodieKeys = new ArrayList<>(); + try { + if (!filePath.getFileSystem(configuration).exists(filePath)) { + return new ArrayList<>(); + } + + Configuration conf = new Configuration(configuration); + conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf()); + Schema readSchema = HoodieAvroUtils.getRecordKeyPartitionPathSchema(); + AvroReadSupport.setAvroReadSchema(conf, readSchema); + AvroReadSupport.setRequestedProjection(conf, readSchema); + ParquetReader reader = AvroParquetReader.builder(filePath).withConf(conf).build(); + Object obj = reader.read(); + while (obj != null) { + if (obj instanceof GenericRecord) { + String recordKey = ((GenericRecord) obj).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); + String partitionPath = ((GenericRecord) obj).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); + hoodieKeys.add(new HoodieKey(recordKey, partitionPath)); + } + obj = reader.read(); + } + } catch (IOException e) { + throw new HoodieIOException("Failed to read from Parquet file " + filePath, e); + } + return hoodieKeys; + } + public static ParquetMetadata readMetadata(Configuration conf, Path parquetFilePath) { ParquetMetadata footer; try { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java index 2d2084d5a..15b160292 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java @@ -23,6 +23,7 @@ import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.bloom.BloomFilterTypeCode; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTestUtils; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; @@ -120,9 +121,38 @@ public class TestParquetUtils extends HoodieCommonTestHarness { } } + @ParameterizedTest + @MethodSource("bloomFilterTypeCodes") + public void testFetchRecordKeyPartitionPathFromParquet(String typeCode) throws Exception { + List<String> rowKeys = new ArrayList<>(); + List<HoodieKey> expected = new ArrayList<>(); + String partitionPath = "path1"; + for (int i = 0; i < 1000; i++) { + String rowKey = UUID.randomUUID().toString(); + rowKeys.add(rowKey); + expected.add(new HoodieKey(rowKey, partitionPath)); + } + + String filePath = basePath + "/test.parquet"; + Schema schema = HoodieAvroUtils.getRecordKeyPartitionPathSchema(); + writeParquetFile(typeCode, filePath, rowKeys, schema, true, partitionPath); + + // Read and verify + List<HoodieKey> fetchedRows = + ParquetUtils.fetchRecordKeyPartitionPathFromParquet(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath)); + assertEquals(rowKeys.size(), fetchedRows.size(), "Total count does not match"); + + for (HoodieKey entry : fetchedRows) { + assertTrue(expected.contains(entry), "Record key must be in the given filter"); + } + } + private void writeParquetFile(String typeCode, String filePath, List<String> rowKeys) throws Exception { + writeParquetFile(typeCode, filePath, rowKeys, HoodieAvroUtils.getRecordKeySchema(), false, ""); + } + + private void 
writeParquetFile(String typeCode, String filePath, List<String> rowKeys, Schema schema, boolean addPartitionPathField, String partitionPath) throws Exception { // Write out a parquet file - Schema schema = HoodieAvroUtils.getRecordKeySchema(); BloomFilter filter = BloomFilterFactory .createBloomFilter(1000, 0.0001, 10000, typeCode); HoodieAvroWriteSupport writeSupport = @@ -132,6 +162,9 @@ public class TestParquetUtils extends HoodieCommonTestHarness { for (String rowKey : rowKeys) { GenericRecord rec = new GenericData.Record(schema); rec.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, rowKey); + if (addPartitionPathField) { + rec.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, partitionPath); + } writer.write(rec); writeSupport.add(rowKey); }
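For reference, a minimal sketch of how the new key-fetching helper in ParquetUtils might be invoked directly; the base-file path below is a placeholder, and only the method signature (Configuration, Path) returning a List of HoodieKey is taken from the diff above. The projection built from HoodieAvroUtils.getRecordKeyPartitionPathSchema() means only the _hoodie_record_key and _hoodie_partition_path columns are materialized from the parquet file.

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.util.ParquetUtils;

public class FetchRecordKeysSketch {
  public static void main(String[] args) {
    // Placeholder path to an existing Hudi base (parquet) file under some partition.
    Path baseFilePath = new Path("/tmp/hoodie-table/2016/01/31/a1b2c3-1-0-1_000.parquet");
    Configuration conf = new Configuration();

    // Reads only the record key and partition path meta columns via the projected schema.
    List<HoodieKey> keys = ParquetUtils.fetchRecordKeyPartitionPathFromParquet(conf, baseFilePath);
    keys.forEach(key -> System.out.println(key.getRecordKey() + " -> " + key.getPartitionPath()));
  }
}

The HoodieKeyLocationFetchHandle exercised in the test above exposes the same information per base file as (HoodieKey, HoodieRecordLocation) pairs, which is the join-side input the simple index variants work against.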