diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java index 4f9b9fcd5..e156310c7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java @@ -24,15 +24,20 @@ import org.apache.hudi.common.config.ConfigGroups; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.engine.EngineType; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import javax.annotation.concurrent.Immutable; import java.io.File; import java.io.FileReader; import java.io.IOException; +import java.util.Arrays; import java.util.Properties; +import java.util.stream.Collectors; import static org.apache.hudi.config.HoodieHBaseIndexConfig.GET_BATCH_SIZE; import static org.apache.hudi.config.HoodieHBaseIndexConfig.PUT_BATCH_SIZE; @@ -54,7 +59,7 @@ public class HoodieIndexConfig extends HoodieConfig { .key("hoodie.index.type") .noDefaultValue() .withDocumentation("Type of index to use. Default is Bloom filter. " - + "Possible options are [BLOOM | GLOBAL_BLOOM |SIMPLE | GLOBAL_SIMPLE | INMEMORY | HBASE]. " + + "Possible options are [BLOOM | GLOBAL_BLOOM |SIMPLE | GLOBAL_SIMPLE | INMEMORY | HBASE | BUCKET]. 
" + "Bloom filters removes the dependency on a external system " + "and is stored in the footer of the Parquet Data Files"); @@ -200,6 +205,30 @@ public class HoodieIndexConfig extends HoodieConfig { .defaultValue("true") .withDocumentation("Similar to " + BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE + ", but for simple index."); + /** + * ***** Bucket Index Configs ***** + * Bucket Index is targeted to locate the record fast by hash in big data scenarios. + * The current implementation is a basic version, so there are some constraints: + * 1. Unsupported operation: bulk insert, cluster and so on. + * 2. Bucket num change requires rewriting the partition. + * 3. Predict the table size and future data growth well to set a reasonable bucket num. + * 4. A bucket size is recommended less than 3GB and avoid bing too small. + * more details and progress see [HUDI-3039]. + */ + // Bucket num equals file groups num in each partition. + // Bucket num can be set according to partition size and file group size. + public static final ConfigProperty BUCKET_INDEX_NUM_BUCKETS = ConfigProperty + .key("hoodie.bucket.index.num.buckets") + .defaultValue(256) + .withDocumentation("Only applies if index type is BUCKET_INDEX. Determine the number of buckets in the hudi table, " + + "and each partition is divided to N buckets."); + + public static final ConfigProperty BUCKET_INDEX_HASH_FIELD = ConfigProperty + .key("hoodie.bucket.index.hash.field") + .noDefaultValue() + .withDocumentation("Index key. It is used to index the record and find its file group. " + + "If not set, use record key field as default"); + /** * Deprecated configs. These are now part of {@link HoodieHBaseIndexConfig}. 
*/ @@ -516,12 +545,23 @@ public class HoodieIndexConfig extends HoodieConfig { return this; } + public Builder withBucketNum(String bucketNum) { + hoodieIndexConfig.setValue(BUCKET_INDEX_NUM_BUCKETS, bucketNum); + return this; + } + + public Builder withIndexKeyField(String keyField) { + hoodieIndexConfig.setValue(BUCKET_INDEX_HASH_FIELD, keyField); + return this; + } + public HoodieIndexConfig build() { hoodieIndexConfig.setDefaultValue(INDEX_TYPE, getDefaultIndexType(engineType)); hoodieIndexConfig.setDefaults(HoodieIndexConfig.class.getName()); // Throws IllegalArgumentException if the value set is not a known Hoodie Index Type HoodieIndex.IndexType.valueOf(hoodieIndexConfig.getString(INDEX_TYPE)); + validateBucketIndexConfig(); return hoodieIndexConfig; } @@ -540,5 +580,27 @@ public class HoodieIndexConfig extends HoodieConfig { public EngineType getEngineType() { return engineType; } + + private void validateBucketIndexConfig() { + if (hoodieIndexConfig.getString(INDEX_TYPE).equalsIgnoreCase(HoodieIndex.IndexType.BUCKET.toString())) { + // check the bucket index hash field + if (StringUtils.isNullOrEmpty(hoodieIndexConfig.getString(BUCKET_INDEX_HASH_FIELD))) { + hoodieIndexConfig.setValue(BUCKET_INDEX_HASH_FIELD, + hoodieIndexConfig.getStringOrDefault(KeyGeneratorOptions.RECORDKEY_FIELD_NAME)); + } else { + boolean valid = Arrays + .stream(hoodieIndexConfig.getStringOrDefault(KeyGeneratorOptions.RECORDKEY_FIELD_NAME).split(",")) + .collect(Collectors.toSet()) + .containsAll(Arrays.asList(hoodieIndexConfig.getString(BUCKET_INDEX_HASH_FIELD).split(","))); + if (!valid) { + throw new HoodieIndexException("Bucket index key (if configured) must be subset of record key."); + } + } + // check the bucket num + if (hoodieIndexConfig.getIntOrDefault(BUCKET_INDEX_NUM_BUCKETS) <= 0) { + throw new HoodieIndexException("When using bucket index, hoodie.bucket.index.num.buckets must be a positive integer."); + } + } + } } } diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLayoutConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLayoutConfig.java new file mode 100644 index 000000000..183c00c7b --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLayoutConfig.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config; + +import org.apache.hudi.common.config.ConfigClassProperty; +import org.apache.hudi.common.config.ConfigGroups; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.table.storage.HoodieStorageLayout; + +import javax.annotation.concurrent.Immutable; +import java.util.Properties; + +/** + * Storage layout related config. 
+ */ +@Immutable +@ConfigClassProperty(name = "Layout Configs", + groupName = ConfigGroups.Names.WRITE_CLIENT, + description = "Configurations that control storage layout and data distribution, " + + "which defines how the files are organized within a table.") +public class HoodieLayoutConfig extends HoodieConfig { + + public static final ConfigProperty<String> LAYOUT_TYPE = ConfigProperty + .key("hoodie.storage.layout.type") + .defaultValue("DEFAULT") + .withDocumentation("Type of storage layout. Possible options are [DEFAULT | BUCKET]"); + + public static final ConfigProperty<String> LAYOUT_PARTITIONER_CLASS_NAME = ConfigProperty + .key("hoodie.storage.layout.partitioner.class") + .noDefaultValue() + .withDocumentation("Partitioner class, it is used to distribute data in a specific way.") + + private HoodieLayoutConfig() { + super(); + } + + public static HoodieLayoutConfig.Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + public HoodieLayoutConfig layoutConfig = new HoodieLayoutConfig(); + + public Builder fromProperties(Properties props) { + this.layoutConfig.getProps().putAll(props); + return this; + } + + public Builder withLayoutType(String type) { + layoutConfig.setValue(LAYOUT_TYPE, type); + return this; + } + + public Builder withLayoutPartitioner(String partitionerClass) { + layoutConfig.setValue(LAYOUT_PARTITIONER_CLASS_NAME, partitionerClass); + return this; + } + + public HoodieLayoutConfig build() { + setDefault(); + return layoutConfig; + } + + private void setDefault() { + // Constant-first equals: getString may return null when the index type is not + // configured (e.g. standalone newBuilder().build()), which would otherwise NPE. + if (HoodieIndex.IndexType.BUCKET.name().equals(layoutConfig.getString(HoodieIndexConfig.INDEX_TYPE.key()))) { + layoutConfig.setDefaultValue(LAYOUT_TYPE, HoodieStorageLayout.LayoutType.BUCKET.name()); + } + layoutConfig.setDefaultValue(LAYOUT_TYPE, LAYOUT_TYPE.defaultValue()); + } + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 3571da172..74a17bffe 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -57,6 +57,7 @@ import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hudi.table.storage.HoodieStorageLayout; import org.apache.orc.CompressionKind; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -1426,6 +1427,14 @@ public class HoodieWriteConfig extends HoodieConfig { return getBoolean(HoodieIndexConfig.SIMPLE_INDEX_UPDATE_PARTITION_PATH_ENABLE); } + public int getBucketIndexNumBuckets() { + return getIntOrDefault(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS); + } + + public String getBucketIndexHashField() { + return getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD); + } + /** * storage properties. */ @@ -1834,6 +1843,13 @@ public class HoodieWriteConfig extends HoodieConfig { return getString(FILEID_PREFIX_PROVIDER_CLASS); } + /** + * Layout configs. 
+ */ + public HoodieStorageLayout.LayoutType getLayoutType() { + return HoodieStorageLayout.LayoutType.valueOf(getString(HoodieLayoutConfig.LAYOUT_TYPE)); + } + public static class Builder { protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig(); @@ -1855,6 +1871,7 @@ public class HoodieWriteConfig extends HoodieConfig { private boolean isPreCommitValidationConfigSet = false; private boolean isMetricsJmxConfigSet = false; private boolean isMetricsGraphiteConfigSet = false; + private boolean isLayoutConfigSet = false; public Builder withEngineType(EngineType engineType) { this.engineType = engineType; @@ -2085,6 +2102,12 @@ public class HoodieWriteConfig extends HoodieConfig { return this; } + public Builder withLayoutConfig(HoodieLayoutConfig layoutConfig) { + writeConfig.getProps().putAll(layoutConfig.getProps()); + isLayoutConfigSet = true; + return this; + } + public Builder withFinalizeWriteParallelism(int parallelism) { writeConfig.setValue(FINALIZE_WRITE_PARALLELISM_VALUE, String.valueOf(parallelism)); return this; @@ -2225,6 +2248,8 @@ public class HoodieWriteConfig extends HoodieConfig { HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps()).build()); writeConfig.setDefaultOnCondition(!isPreCommitValidationConfigSet, HoodiePreCommitValidatorConfig.newBuilder().fromProperties(writeConfig.getProps()).build()); + writeConfig.setDefaultOnCondition(!isLayoutConfigSet, + HoodieLayoutConfig.newBuilder().fromProperties(writeConfig.getProps()).build()); writeConfig.setDefaultValue(TIMELINE_LAYOUT_VERSION_NUM, String.valueOf(TimelineLayoutVersion.CURR_VERSION)); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java index 974f4d546..0428d12c4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java +++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java @@ -28,6 +28,7 @@ import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.exception.HoodieNotSupportedException; @@ -122,6 +123,20 @@ public abstract class HoodieIndex implem @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE) public abstract boolean isImplicitWithStorage(); + /** + * If the `getCustomizedPartitioner` returns a partitioner, it has to be true. + */ + @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) + public boolean requiresTagging(WriteOperationType operationType) { + switch (operationType) { + case DELETE: + case UPSERT: + return true; + default: + return false; + } + } + /** * Each index type should implement it's own logic to release any resources acquired during the process. */ @@ -129,6 +144,6 @@ public abstract class HoodieIndex implem } public enum IndexType { - HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE + HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE, BUCKET } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java new file mode 100644 index 000000000..7dee9f3cd --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public class BucketIdentifier { + // compatible with the spark bucket name + private static final Pattern BUCKET_NAME = Pattern.compile(".*_(\\d+)(?:\\..*)?$"); + + public static int getBucketId(HoodieRecord record, String indexKeyFields, int numBuckets) { + return getBucketId(record.getKey(), indexKeyFields, numBuckets); + } + + public static int getBucketId(HoodieKey hoodieKey, String indexKeyFields, int numBuckets) { + List hashKeyFields; + if (!hoodieKey.getRecordKey().contains(":")) { + hashKeyFields = Arrays.asList(hoodieKey.getRecordKey()); + } else { + Map recordKeyPairs = Arrays.stream(hoodieKey.getRecordKey().split(",")) + .map(p -> p.split(":")) + .collect(Collectors.toMap(p -> p[0], p -> p[1])); + hashKeyFields = Arrays.stream(indexKeyFields.split(",")) + .map(f -> recordKeyPairs.get(f)) + .collect(Collectors.toList()); + } + return (hashKeyFields.hashCode() & Integer.MAX_VALUE) % numBuckets; + } + + // only for test + public static int getBucketId(List hashKeyFields, int numBuckets) { + return hashKeyFields.hashCode() % numBuckets; + } + + 
public static int bucketIdFromFileId(String fileId) { + return Integer.parseInt(fileId.substring(0, 8)); + } + + public static String bucketIdStr(int n) { + return String.format("%08d", n); + } + + public static String newBucketFileIdPrefix(String bucketId) { + return FSUtils.createNewFileIdPfx().replaceFirst(".{8}", bucketId); + } + + public static boolean isBucketFileName(String name) { + return BUCKET_NAME.matcher(name).matches(); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java new file mode 100644 index 000000000..acb06ea48 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.utils.LazyIterableIterator; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.index.HoodieIndexUtils; +import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.HashMap; +import java.util.Map; + +/** + * Hash indexing mechanism. 
+ * @param + */ +public class HoodieBucketIndex> + extends HoodieIndex { + + private static final Logger LOG = LogManager.getLogger(HoodieBucketIndex.class); + + private final int numBuckets; + + public HoodieBucketIndex(HoodieWriteConfig config) { + super(config); + numBuckets = config.getBucketIndexNumBuckets(); + LOG.info("use bucket index, numBuckets=" + numBuckets); + } + + @Override + public HoodieData updateLocation(HoodieData writeStatuses, + HoodieEngineContext context, + HoodieTable hoodieTable) + throws HoodieIndexException { + return writeStatuses; + } + + @Override + public HoodieData> tagLocation(HoodieData> records, + HoodieEngineContext context, + HoodieTable hoodieTable) + throws HoodieIndexException { + HoodieData> taggedRecords = records.mapPartitions(recordIter -> { + // partitionPath -> bucketId -> fileInfo + Map>> partitionPathFileIDList = new HashMap<>(); + return new LazyIterableIterator, HoodieRecord>(recordIter) { + + @Override + protected void start() { + + } + + @Override + protected HoodieRecord computeNext() { + HoodieRecord record = recordIter.next(); + int bucketId = BucketIdentifier.getBucketId(record, config.getBucketIndexHashField(), numBuckets); + String partitionPath = record.getPartitionPath(); + if (!partitionPathFileIDList.containsKey(partitionPath)) { + partitionPathFileIDList.put(partitionPath, loadPartitionBucketIdFileIdMapping(hoodieTable, partitionPath)); + } + if (partitionPathFileIDList.get(partitionPath).containsKey(bucketId)) { + Pair fileInfo = partitionPathFileIDList.get(partitionPath).get(bucketId); + return HoodieIndexUtils.getTaggedRecord(record, Option.of( + new HoodieRecordLocation(fileInfo.getRight(), fileInfo.getLeft()) + )); + } + return record; + } + + @Override + protected void end() { + + } + }; + }, true); + return taggedRecords; + } + + private Map> loadPartitionBucketIdFileIdMapping( + HoodieTable hoodieTable, + String partition) { + // bucketId -> fileIds + Map> fileIDList = new HashMap<>(); + 
HoodieIndexUtils + .getLatestBaseFilesForPartition(partition, hoodieTable) + .forEach(file -> { + String fileId = file.getFileId(); + String commitTime = file.getCommitTime(); + int bucketId = BucketIdentifier.bucketIdFromFileId(fileId); + if (!fileIDList.containsKey(bucketId)) { + fileIDList.put(bucketId, Pair.of(fileId, commitTime)); + } else { + // check if bucket data is valid + throw new HoodieIOException("Find multiple files at partition path=" + + partition + " belongs to the same bucket id = " + bucketId); + } + }); + return fileIDList; + } + + @Override + public boolean rollbackCommit(String instantTime) { + return true; + } + + @Override + public boolean isGlobal() { + return false; + } + + @Override + public boolean canIndexLogFiles() { + return false; + } + + @Override + public boolean isImplicitWithStorage() { + return true; + } + + @Override + public boolean requiresTagging(WriteOperationType operationType) { + switch (operationType) { + case INSERT: + case INSERT_OVERWRITE: + case UPSERT: + return true; + default: + return false; + } + } + + public int getNumBuckets() { + return numBuckets; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 01ad45342..22fa31539 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -119,7 +119,8 @@ public class HoodieCreateHandle extends @Override public boolean canWrite(HoodieRecord record) { - return fileWriter.canWrite() && record.getPartitionPath().equals(writeStatus.getPartitionPath()); + return (fileWriter.canWrite() && record.getPartitionPath().equals(writeStatus.getPartitionPath())) + || layoutControlsNumFiles(); } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index d8faa95a4..37721611e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -195,6 +195,10 @@ public abstract class HoodieWriteHandle return false; } + boolean layoutControlsNumFiles() { + return hoodieTable.getStorageLayout().determinesNumFileGroups(); + } + /** * Perform the actual writing of the given record into the backing file. */ diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 3c794ef04..40e3a316d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -74,6 +74,8 @@ import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hudi.table.storage.HoodieLayoutFactory; +import org.apache.hudi.table.storage.HoodieStorageLayout; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -106,6 +108,7 @@ public abstract class HoodieTable implem private SerializableConfiguration hadoopConfiguration; protected final TaskContextSupplier taskContextSupplier; private final HoodieTableMetadata metadata; + private final HoodieStorageLayout storageLayout; private transient FileSystemViewManager viewManager; protected final transient HoodieEngineContext context; @@ -123,11 +126,16 @@ public abstract class HoodieTable implem this.viewManager = FileSystemViewManager.createViewManager(context, config.getMetadataConfig(), config.getViewStorageConfig(), config.getCommonConfig(), () -> metadata); this.metaClient = metaClient; this.index = getIndex(config, 
context); + this.storageLayout = getStorageLayout(config); this.taskContextSupplier = context.getTaskContextSupplier(); } protected abstract HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext context); + protected HoodieStorageLayout getStorageLayout(HoodieWriteConfig config) { + return HoodieLayoutFactory.createLayout(config); + } + private synchronized FileSystemViewManager getViewManager() { if (null == viewManager) { viewManager = FileSystemViewManager.createViewManager(getContext(), config.getMetadataConfig(), config.getViewStorageConfig(), config.getCommonConfig(), () -> metadata); @@ -365,6 +373,10 @@ public abstract class HoodieTable implem return index; } + public HoodieStorageLayout getStorageLayout() { + return storageLayout; + } + /** * Schedule compaction for the instant time. * diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractWriteHelper.java index 47cb34fa3..3f241944c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractWriteHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractWriteHelper.java @@ -20,6 +20,7 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; @@ -38,7 +39,7 @@ public abstract class AbstractWriteHelper executor, - boolean performTagging) { + WriteOperationType operationType) { try { // De-dupe/merge if needed I dedupedRecords = @@ -46,7 +47,7 @@ public abstract class AbstractWriteHelper execute(I inputRecords); diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieBucketLayout.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieBucketLayout.java new file mode 100644 index 000000000..6247b870f --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieBucketLayout.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.storage; + +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieLayoutConfig; +import org.apache.hudi.config.HoodieWriteConfig; + +import java.util.HashSet; +import java.util.Set; + +/** + * Storage layout when using bucket index. Data distribution and files organization are in a specific way. 
+ */ +public class HoodieBucketLayout extends HoodieStorageLayout { + + public static final Set SUPPORTED_OPERATIONS = new HashSet() {{ + add(WriteOperationType.INSERT); + add(WriteOperationType.INSERT_PREPPED); + add(WriteOperationType.UPSERT); + add(WriteOperationType.UPSERT_PREPPED); + add(WriteOperationType.INSERT_OVERWRITE); + add(WriteOperationType.DELETE); + add(WriteOperationType.COMPACT); + add(WriteOperationType.DELETE_PARTITION); + }}; + + public HoodieBucketLayout(HoodieWriteConfig config) { + super(config); + } + + /** + * Bucketing controls the number of file groups directly. + */ + @Override + public boolean determinesNumFileGroups() { + return true; + } + + public Option layoutPartitionerClass() { + return config.contains(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME) + ? Option.of(config.getString(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME.key())) + : Option.empty(); + } + + @Override + public boolean doesNotSupport(WriteOperationType operationType) { + return !SUPPORTED_OPERATIONS.contains(operationType); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieDefaultLayout.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieDefaultLayout.java new file mode 100644 index 000000000..09d20707a --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieDefaultLayout.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.storage; + +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; + +/** + * Default storage layout with non-constraints. + */ +public class HoodieDefaultLayout extends HoodieStorageLayout { + + public HoodieDefaultLayout(HoodieWriteConfig config) { + super(config); + } + + public boolean determinesNumFileGroups() { + return false; + } + + public Option layoutPartitionerClass() { + return Option.empty(); + } + + public boolean doesNotSupport(WriteOperationType operationType) { + return false; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieLayoutFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieLayoutFactory.java new file mode 100644 index 000000000..e86d253df --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieLayoutFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.storage; + +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieNotSupportedException; + +/** + * A factory to generate layout. + */ +public final class HoodieLayoutFactory { + public static HoodieStorageLayout createLayout(HoodieWriteConfig config) { + switch (config.getLayoutType()) { + case DEFAULT: + return new HoodieDefaultLayout(config); + case BUCKET: + return new HoodieBucketLayout(config); + default: + throw new HoodieNotSupportedException("Unknown layout type, set " + config.getLayoutType()); + } + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieStorageLayout.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieStorageLayout.java new file mode 100644 index 000000000..a0a4eab46 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieStorageLayout.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Storage layout defines how the files are organized within a table.
 * Serializable because layout instances travel with the table to executors.
 */
public abstract class HoodieStorageLayout implements Serializable {

  // Write config the layout consults (e.g. for a configured partitioner class).
  protected final HoodieWriteConfig config;

  public HoodieStorageLayout(HoodieWriteConfig config) {
    this.config = config;
  }

  /**
   * Whether this layout directly controls the total number of file groups.
   * By default, a layout does not; bucket-style layouts do.
   */
  public abstract boolean determinesNumFileGroups();

  /**
   * Return the layout specific partitioner (fully-qualified class name)
   * for writing data, if any.
   */
  public abstract Option<String> layoutPartitionerClass();

  /**
   * Determines if the operation is supported by the layout.
   * Returns true when the operation is NOT supported.
   */
  public abstract boolean doesNotSupport(WriteOperationType operationType);

  /** Known layout implementations selectable via config. */
  public enum LayoutType {
    DEFAULT, BUCKET
  }
}
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertOverwriteTableCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertOverwriteTableCommitActionExecutor.java index a31679b63..f64d8d20d 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertOverwriteTableCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertOverwriteTableCommitActionExecutor.java @@ -45,6 +45,6 @@ public class FlinkInsertOverwriteTableCommitActionExecutor> execute() { return FlinkWriteHelper.newInstance().write(instantTime, inputRecords, context, table, - config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false); + config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, operationType); } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertCommitActionExecutor.java index 5859bb585..c76733db1 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertCommitActionExecutor.java @@ -47,6 +47,6 @@ public class FlinkUpsertCommitActionExecutor> e @Override public HoodieWriteMetadata> execute() { return FlinkWriteHelper.newInstance().write(instantTime, inputRecords, context, table, - config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, true); + config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, operationType); } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java index 0863ad8e4..3914e486f 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.index.HoodieIndex; @@ -64,7 +65,7 @@ public class FlinkWriteHelper extends Abstract @Override public HoodieWriteMetadata> write(String instantTime, List> inputRecords, HoodieEngineContext context, HoodieTable>, List, List> table, boolean shouldCombine, int shuffleParallelism, - BaseCommitActionExecutor>, List, List, R> executor, boolean performTagging) { + BaseCommitActionExecutor>, List, List, R> executor, WriteOperationType operationType) { try { Instant lookupBegin = Instant.now(); Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now()); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/FlinkUpsertDeltaCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/FlinkUpsertDeltaCommitActionExecutor.java index 5fdf46f6e..7053f7a16 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/FlinkUpsertDeltaCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/FlinkUpsertDeltaCommitActionExecutor.java @@ -47,6 +47,6 @@ public class FlinkUpsertDeltaCommitActionExecutor> ex @Override public 
HoodieWriteMetadata> execute() { return JavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table, - config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false); + config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, operationType); } } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteCommitActionExecutor.java index b80191909..a99485e67 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteCommitActionExecutor.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteCommitActionExecutor.java @@ -55,7 +55,7 @@ public class JavaInsertOverwriteCommitActionExecutor> execute() { return JavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table, - config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false); + config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, operationType); } @Override diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertCommitActionExecutor.java index 06fce78a3..ed0af4402 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertCommitActionExecutor.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertCommitActionExecutor.java @@ -45,6 +45,6 @@ public class JavaUpsertCommitActionExecutor> ex @Override public HoodieWriteMetadata> execute() { return JavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table, - config.shouldCombineBeforeUpsert(), 
config.getUpsertShuffleParallelism(), this, true); + config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, operationType); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java index 5e686463b..69e18714c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java @@ -28,6 +28,7 @@ import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.index.bloom.HoodieBloomIndex; import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex; import org.apache.hudi.index.bloom.SparkHoodieBloomIndexHelper; +import org.apache.hudi.index.bucket.HoodieBucketIndex; import org.apache.hudi.index.hbase.SparkHoodieHBaseIndex; import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex; import org.apache.hudi.index.simple.HoodieGlobalSimpleIndex; @@ -55,6 +56,8 @@ public final class SparkHoodieIndexFactory { return new SparkHoodieHBaseIndex<>(config); case INMEMORY: return new HoodieInMemoryHashIndex<>(config); + case BUCKET: + return new HoodieBucketIndex(config); case BLOOM: return new HoodieBloomIndex<>(config, SparkHoodieBloomIndexHelper.getInstance()); case GLOBAL_BLOOM: diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index a44172f37..c551310ba 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -216,7 +216,10 @@ public abstract class 
BaseSparkCommitActionExecutor layoutPartitionerClass = table.getStorageLayout().layoutPartitionerClass(); + if (layoutPartitionerClass.isPresent()) { + return getLayoutPartitioner(profile, layoutPartitionerClass.get()); + } else if (WriteOperationType.isChangingRecords(operationType)) { return getUpsertPartitioner(profile); } else { return getInsertPartitioner(profile); @@ -305,7 +308,7 @@ public abstract class BaseSparkCommitActionExecutor> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr, Partitioner partitioner) { - UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner; + SparkHoodiePartitioner upsertPartitioner = (SparkHoodiePartitioner) partitioner; BucketInfo binfo = upsertPartitioner.getBucketInfo(partition); BucketType btype = binfo.bucketType; try { @@ -394,6 +397,12 @@ public abstract class BaseSparkCommitActionExecutor> writeMetadata) { SparkValidatorUtils.runValidators(config, writeMetadata, context, table, instantTime); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java new file mode 100644 index 000000000..71da2244d --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Packs incoming records to be inserted into buckets (1 bucket = 1 RDD partition).
 *
 * <p>RDD partition id = partition-path offset + bucket id, so every table
 * partition owns a contiguous range of {@code numBuckets} RDD partitions.
 */
public class SparkBucketIndexPartitioner<T extends HoodieRecordPayload<T>> extends
    SparkHoodiePartitioner<T> {

  // Fixed number of buckets (file groups) per partition, from the bucket index.
  private final int numBuckets;
  // Field whose value is hashed to pick a bucket for untagged records.
  private final String indexKeyField;
  private final int totalPartitionPaths;
  private final List<String> partitionPaths;
  /**
   * Helps get the RDD partition id, partition id is partition offset + bucket id.
   * The partition offset is a multiple of the bucket num.
   */
  private final Map<String, Integer> partitionPathOffset;

  /**
   * Partition path and file groups in it pair. Decide the file group an incoming update should go to.
   */
  private Map<String, Set<String>> updatePartitionPathFileIds;

  public SparkBucketIndexPartitioner(WorkloadProfile profile,
                                     HoodieEngineContext context,
                                     HoodieTable table,
                                     HoodieWriteConfig config) {
    super(profile, table);
    // This partitioner is only meaningful when the table uses a bucket index;
    // fail fast otherwise.
    if (!(table.getIndex() instanceof HoodieBucketIndex)) {
      throw new HoodieException(
          " Bucket index partitioner should only be used by BucketIndex other than "
              + table.getIndex().getClass().getSimpleName());
    }
    this.numBuckets = ((HoodieBucketIndex) table.getIndex()).getNumBuckets();
    this.indexKeyField = config.getBucketIndexHashField();
    this.totalPartitionPaths = profile.getPartitionPaths().size();
    partitionPaths = new ArrayList<>(profile.getPartitionPaths());
    partitionPathOffset = new HashMap<>();
    // Assign each partition path a disjoint offset range of size numBuckets.
    int i = 0;
    for (Object partitionPath : profile.getPartitionPaths()) {
      partitionPathOffset.put(partitionPath.toString(), i);
      i += numBuckets;
    }
    assignUpdates(profile);
  }

  // Collect, per partition path, the file ids that receive updates in this
  // workload; updates must be routed back to their existing file group.
  private void assignUpdates(WorkloadProfile profile) {
    updatePartitionPathFileIds = new HashMap<>();
    // each update location gets a partition
    Set<Entry<String, WorkloadStat>> partitionStatEntries = profile.getPartitionPathStatMap()
        .entrySet();
    for (Entry<String, WorkloadStat> partitionStat : partitionStatEntries) {
      if (!updatePartitionPathFileIds.containsKey(partitionStat.getKey())) {
        updatePartitionPathFileIds.put(partitionStat.getKey(), new HashSet<>());
      }
      for (Entry<String, Pair<String, Long>> updateLocEntry :
          partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
        updatePartitionPathFileIds.get(partitionStat.getKey()).add(updateLocEntry.getKey());
      }
    }
  }

  @Override
  public BucketInfo getBucketInfo(int bucketNumber) {
    // Invert the RDD-partition mapping: bucketNumber / numBuckets selects the
    // partition path; bucketNumber % numBuckets is the bucket within it.
    String partitionPath = partitionPaths.get(bucketNumber / numBuckets);
    String bucketId = BucketIdentifier.bucketIdStr(bucketNumber % numBuckets);
    // NOTE(review): assumes file ids begin with a fixed-width bucket id prefix
    // (see BucketIdentifier), so startsWith cannot match another bucket — confirm.
    Option<String> fileIdOption = Option.fromJavaOptional(updatePartitionPathFileIds
        .getOrDefault(partitionPath, Collections.emptySet()).stream()
        .filter(e -> e.startsWith(bucketId))
        .findFirst());
    if (fileIdOption.isPresent()) {
      // Existing file group in this bucket -> route as an update.
      return new BucketInfo(BucketType.UPDATE, fileIdOption.get(), partitionPath);
    } else {
      // No file group yet -> create a new one with the bucket id as prefix.
      return new BucketInfo(BucketType.INSERT, BucketIdentifier.newBucketFileIdPrefix(bucketId), partitionPath);
    }
  }

  @Override
  public int numPartitions() {
    return totalPartitionPaths * numBuckets;
  }

  @Override
  public int getPartition(Object key) {
    Tuple2<HoodieKey, Option<HoodieRecordLocation>> keyLocation =
        (Tuple2<HoodieKey, Option<HoodieRecordLocation>>) key;
    String partitionPath = keyLocation._1.getPartitionPath();
    Option<HoodieRecordLocation> location = keyLocation._2;
    // Tagged records keep their existing bucket (from the file id); untagged
    // records are hashed on the index key field.
    int bucketId = location.isPresent()
        ? BucketIdentifier.bucketIdFromFileId(location.get().getFileId())
        : BucketIdentifier.getBucketId(keyLocation._1, indexKeyField, numBuckets);
    return partitionPathOffset.get(partitionPath) + bucketId;
  }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.WorkloadProfile; +import org.apache.spark.Partitioner; + +/** + * Packs incoming records to be inserted into buckets (1 bucket = 1 RDD partition). + */ +public abstract class SparkHoodiePartitioner> extends Partitioner + implements org.apache.hudi.table.action.commit.Partitioner { + + /** + * Stat for the current workload. Helps in determining inserts, upserts etc. + */ + protected WorkloadProfile profile; + + protected final HoodieTable table; + + public SparkHoodiePartitioner(WorkloadProfile profile, HoodieTable table) { + this.profile = profile; + this.table = table; + } + + @Override + public int getNumPartitions() { + return numPartitions(); + } + + public abstract BucketInfo getBucketInfo(int bucketNumber); +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertCommitActionExecutor.java index 25891e05a..ba91fe160 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertCommitActionExecutor.java @@ -44,6 +44,6 @@ public class SparkInsertCommitActionExecutor> @Override public HoodieWriteMetadata> execute() { return SparkWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table, - config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false); + config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, operationType); } } diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java index bff85e7fe..7a3549c9e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java @@ -58,12 +58,14 @@ public class SparkInsertOverwriteCommitActionExecutor> execute() { return SparkWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table, - config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false); + config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, operationType); } @Override protected Partitioner getPartitioner(WorkloadProfile profile) { - return new SparkInsertOverwritePartitioner(profile, context, table, config); + return table.getStorageLayout().layoutPartitionerClass() + .map(c -> getLayoutPartitioner(profile, c)) + .orElse(new SparkInsertOverwritePartitioner(profile, context, table, config)); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertCommitActionExecutor.java index fe90212b0..c914384cb 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertCommitActionExecutor.java @@ -44,6 +44,6 @@ public class SparkUpsertCommitActionExecutor> @Override public HoodieWriteMetadata> execute() { return SparkWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, 
table, - config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, true); + config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, operationType); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index a30fe302b..6729da72d 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -37,7 +37,6 @@ import org.apache.hudi.table.WorkloadProfile; import org.apache.hudi.table.WorkloadStat; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; @@ -57,7 +56,7 @@ import scala.Tuple2; /** * Packs incoming records to be upserted, into buckets (1 bucket = 1 RDD partition). */ -public class UpsertPartitioner> extends Partitioner { +public class UpsertPartitioner> extends SparkHoodiePartitioner { private static final Logger LOG = LogManager.getLogger(UpsertPartitioner.class); @@ -69,10 +68,6 @@ public class UpsertPartitioner> extends Partiti * Total number of RDD partitions, is determined by total buckets we want to pack the incoming workload into. */ private int totalBuckets = 0; - /** - * Stat for the current workload. Helps in determining inserts, upserts etc. - */ - private WorkloadProfile profile; /** * Helps decide which bucket an incoming update should go to. 
*/ @@ -86,17 +81,14 @@ public class UpsertPartitioner> extends Partiti */ private HashMap bucketInfoMap; - protected final HoodieTable table; - protected final HoodieWriteConfig config; public UpsertPartitioner(WorkloadProfile profile, HoodieEngineContext context, HoodieTable table, HoodieWriteConfig config) { + super(profile, table); updateLocationToBucket = new HashMap<>(); partitionPathToInsertBucketInfos = new HashMap<>(); bucketInfoMap = new HashMap<>(); - this.profile = profile; - this.table = table; this.config = config; assignUpdates(profile); assignInserts(profile, context); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/AbstractSparkDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/AbstractSparkDeltaCommitActionExecutor.java index c92cd9284..3b3edd308 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/AbstractSparkDeltaCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/AbstractSparkDeltaCommitActionExecutor.java @@ -74,8 +74,8 @@ public abstract class AbstractSparkDeltaCommitActionExecutor> handleUpdate(String partitionPath, String fileId, Iterator> recordItr) throws IOException { LOG.info("Merging updates for commit " + instantTime + " for file " + fileId); - - if (!table.getIndex().canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) { + if (!table.getIndex().canIndexLogFiles() && mergeOnReadUpsertPartitioner != null + && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) { LOG.info("Small file corrections for updates for commit " + instantTime + " for file " + fileId); return super.handleUpdate(partitionPath, fileId, recordItr); } else { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkInsertDeltaCommitActionExecutor.java 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkInsertDeltaCommitActionExecutor.java index fcaedee11..7dd91710d 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkInsertDeltaCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkInsertDeltaCommitActionExecutor.java @@ -45,6 +45,6 @@ public class SparkInsertDeltaCommitActionExecutor> execute() { return SparkWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table, - config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(),this, false); + config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(),this, operationType); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java index 82aa08152..c6f3901a3 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java @@ -44,6 +44,6 @@ public class SparkUpsertDeltaCommitActionExecutor writeStatues = writeClient.upsert(writeRecords, newCommitTime); Assertions.assertNoWriteErrors(writeStatues.collect()); - + List fileIds = writeStatues.map(WriteStatus::getFileId).collect(); // commit this upsert writeClient.commit(newCommitTime, writeStatues); HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient); @@ -249,7 +257,6 @@ public class TestHoodieIndex extends HoodieClientTestHarness { assert (javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == totalRecords); // check tagged records are tagged with correct 
fileIds - List fileIds = writeStatues.map(WriteStatus::getFileId).collect(); assert (javaRDD.filter(record -> record.getCurrentLocation().getFileId() == null).collect().size() == 0); List taggedFileIds = javaRDD.map(record -> record.getCurrentLocation().getFileId()).distinct().collect(); @@ -474,7 +481,6 @@ public class TestHoodieIndex extends HoodieClientTestHarness { .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build()) .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build()) .forTable("test-trip-table") - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build()) .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build()); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndexConfigs.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndexConfigs.java index 2fb364187..665e3a6a8 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndexConfigs.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndexConfigs.java @@ -32,6 +32,7 @@ import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.index.HoodieIndex.IndexType; import org.apache.hudi.index.bloom.HoodieBloomIndex; import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex; +import org.apache.hudi.index.bucket.HoodieBucketIndex; import org.apache.hudi.index.hbase.SparkHoodieHBaseIndex; import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex; import org.apache.hudi.index.simple.HoodieSimpleIndex; @@ -60,7 +61,7 @@ public class TestHoodieIndexConfigs { } @ParameterizedTest - @EnumSource(value = IndexType.class, names = {"BLOOM", "GLOBAL_BLOOM", "SIMPLE", "GLOBAL_SIMPLE", "HBASE"}) + 
@EnumSource(value = IndexType.class, names = {"BLOOM", "GLOBAL_BLOOM", "SIMPLE", "GLOBAL_SIMPLE", "HBASE", "BUCKET"}) public void testCreateIndex(IndexType indexType) throws Exception { HoodieWriteConfig config; HoodieWriteConfig.Builder clientConfigBuilder = HoodieWriteConfig.newBuilder(); @@ -93,6 +94,11 @@ public class TestHoodieIndexConfigs { .build(); assertTrue(SparkHoodieIndexFactory.createIndex(config) instanceof SparkHoodieHBaseIndex); break; + case BUCKET: + config = clientConfigBuilder.withPath(basePath) + .withIndexConfig(indexConfigBuilder.withIndexType(IndexType.BUCKET).build()).build(); + assertTrue(SparkHoodieIndexFactory.createIndex(config) instanceof HoodieBucketIndex); + break; default: // no -op. just for checkstyle errors } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java new file mode 100644 index 000000000..6cf29df51 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.index.bucket; + +import java.util.Arrays; +import java.util.List; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.keygen.KeyGenUtils; +import org.apache.hudi.testutils.KeyGeneratorTestUtilities; +import org.junit.jupiter.api.Test; + +public class TestBucketIdentifier { + + @Test + public void testBucketFileId() { + for (int i = 0; i < 1000; i++) { + String bucketId = BucketIdentifier.bucketIdStr(i); + String fileId = BucketIdentifier.newBucketFileIdPrefix(bucketId); + assert BucketIdentifier.bucketIdFromFileId(fileId) == i; + } + } + + @Test + public void testBucketIdWithSimpleRecordKey() { + String recordKeyField = "_row_key"; + String indexKeyField = "_row_key"; + GenericRecord record = KeyGeneratorTestUtilities.getRecord(); + HoodieRecord hoodieRecord = new HoodieRecord( + new HoodieKey(KeyGenUtils.getRecordKey(record, recordKeyField), ""), null); + int bucketId = BucketIdentifier.getBucketId(hoodieRecord, indexKeyField, 8); + assert bucketId == BucketIdentifier.getBucketId( + Arrays.asList(record.get(indexKeyField).toString()), 8); + } + + @Test + public void testBucketIdWithComplexRecordKey() { + List recordKeyField = Arrays.asList("_row_key","ts_ms"); + String indexKeyField = "_row_key"; + GenericRecord record = KeyGeneratorTestUtilities.getRecord(); + HoodieRecord hoodieRecord = new HoodieRecord( + new HoodieKey(KeyGenUtils.getRecordKey(record, recordKeyField), ""), null); + int bucketId = BucketIdentifier.getBucketId(hoodieRecord, indexKeyField, 8); + assert bucketId == BucketIdentifier.getBucketId( + Arrays.asList(record.get(indexKeyField).toString()), 8); + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieBucketIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieBucketIndex.java new file mode 100644 index 
000000000..c79f9aec7 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieBucketIndex.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.avro.Schema; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.testutils.RawTripTestPayload; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.data.HoodieJavaRDD; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.testutils.HoodieClientTestHarness; +import org.apache.hudi.testutils.HoodieSparkWriteableTestTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import 
org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Properties; +import java.util.UUID; + +import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestHoodieBucketIndex extends HoodieClientTestHarness { + + private static final Logger LOG = LogManager.getLogger(TestHoodieBucketIndex.class); + private static final Schema SCHEMA = getSchemaFromResource(TestHoodieBucketIndex.class, "/exampleSchema.avsc", true); + private static final int NUM_BUCKET = 8; + + @BeforeEach + public void setUp() throws Exception { + initSparkContexts(); + initPath(); + initFileSystem(); + // We have some records to be tagged (two different partitions) + initMetaClient(); + } + + @AfterEach + public void tearDown() throws Exception { + cleanupResources(); + } + + @Test + public void testBucketIndexValidityCheck() { + Properties props = new Properties(); + props.setProperty(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key(), "_row_key"); + assertThrows(HoodieIndexException.class, () -> { + HoodieIndexConfig.newBuilder().fromProperties(props) + .withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("8").build(); + }); + props.setProperty(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key(), "uuid"); + HoodieIndexConfig.newBuilder().fromProperties(props) + .withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("8").build(); + } + + @Test + public void testTagLocation() throws Exception { + String rowKey1 = UUID.randomUUID().toString(); + String rowKey2 = UUID.randomUUID().toString(); + String rowKey3 = UUID.randomUUID().toString(); + String recordStr1 = "{\"_row_key\":\"" + rowKey1 + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; + String recordStr2 = "{\"_row_key\":\"" + rowKey2 + 
"\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; + String recordStr3 = "{\"_row_key\":\"" + rowKey3 + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; + String recordStr4 = "{\"_row_key\":\"" + rowKey1 + "\",\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}"; + RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1); + HoodieRecord record1 = new HoodieRecord( + new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1); + RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2); + HoodieRecord record2 = new HoodieRecord( + new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2); + RawTripTestPayload rowChange3 = new RawTripTestPayload(recordStr3); + HoodieRecord record3 = new HoodieRecord( + new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3); + RawTripTestPayload rowChange4 = new RawTripTestPayload(recordStr4); + HoodieRecord record4 = new HoodieRecord( + new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4); + JavaRDD recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record4)); + + HoodieWriteConfig config = makeConfig(); + HoodieTable table = HoodieSparkTable.create(config, context, metaClient); + HoodieBucketIndex bucketIndex = new HoodieBucketIndex(config); + HoodieData taggedRecordRDD = bucketIndex.tagLocation(HoodieJavaRDD.of(recordRDD), context, table); + assertFalse(taggedRecordRDD.collectAsList().stream().anyMatch(r -> r.isCurrentLocationKnown())); + + HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(table, SCHEMA); + testTable.addCommit("001").withInserts("2016/01/31", getRecordFileId(record1), record1); + testTable.addCommit("002").withInserts("2016/01/31", getRecordFileId(record2), record2); + testTable.addCommit("003").withInserts("2016/01/31", getRecordFileId(record3), record3); + taggedRecordRDD = bucketIndex.tagLocation(HoodieJavaRDD.of(recordRDD), 
context, + HoodieSparkTable.create(config, context, metaClient)); + assertFalse(taggedRecordRDD.collectAsList().stream().filter(r -> r.isCurrentLocationKnown()) + .filter(r -> BucketIdentifier.bucketIdFromFileId(r.getCurrentLocation().getFileId()) + != getRecordBucketId(r)).findAny().isPresent()); + assertTrue(taggedRecordRDD.collectAsList().stream().filter(r -> r.getPartitionPath().equals("2015/01/31") + && !r.isCurrentLocationKnown()).count() == 1L); + } + + private HoodieWriteConfig makeConfig() { + Properties props = new Properties(); + props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key"); + return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(SCHEMA.toString()) + .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props) + .withIndexType(HoodieIndex.IndexType.BUCKET) + .withIndexKeyField("_row_key") + .withBucketNum(String.valueOf(NUM_BUCKET)).build()).build(); + } + + private String getRecordFileId(HoodieRecord record) { + return BucketIdentifier.bucketIdStr( + BucketIdentifier.getBucketId(record, "_row_key", NUM_BUCKET)); + } + + private int getRecordBucketId(HoodieRecord record) { + return BucketIdentifier + .getBucketId(record, "_row_key", NUM_BUCKET); + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java index 91836bcad..cba77b0c7 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java @@ -33,14 +33,19 @@ import org.apache.hudi.common.testutils.Transformations; import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; +import 
org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieLayoutConfig; import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.hadoop.HoodieParquetInputFormat; import org.apache.hudi.hadoop.utils.HoodieHiveUtils; +import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.io.HoodieCreateHandle; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.table.HoodieSparkCopyOnWriteTable; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.storage.HoodieStorageLayout; import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.hudi.testutils.MetadataMergeWriteStatus; @@ -58,6 +63,8 @@ import org.apache.spark.TaskContext; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import java.io.File; @@ -67,7 +74,9 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.UUID; +import java.util.stream.Stream; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime; @@ -83,6 +92,13 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { private static final Logger LOG = LogManager.getLogger(TestCopyOnWriteActionExecutor.class); private static final Schema SCHEMA = getSchemaFromResource(TestCopyOnWriteActionExecutor.class, "/exampleSchema.avsc"); + private static final Stream indexType() { + HoodieIndex.IndexType[] data = new HoodieIndex.IndexType[] { + HoodieIndex.IndexType.BLOOM, + HoodieIndex.IndexType.BUCKET + }; + 
return Stream.of(data).map(Arguments::of); + } @Test public void testMakeNewPath() { @@ -118,11 +134,29 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { .withRemoteServerPort(timelineServicePort).build()); } + private Properties makeIndexConfig(HoodieIndex.IndexType indexType) { + Properties props = new Properties(); + HoodieIndexConfig.Builder indexConfig = HoodieIndexConfig.newBuilder() + .withIndexType(indexType); + props.putAll(indexConfig.build().getProps()); + if (indexType.equals(HoodieIndex.IndexType.BUCKET)) { + props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key"); + indexConfig.fromProperties(props).withIndexKeyField("_row_key").withBucketNum("1"); + props.putAll(indexConfig.build().getProps()); + props.putAll(HoodieLayoutConfig.newBuilder().fromProperties(props) + .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name()) + .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build().getProps()); + } + return props; + } + // TODO (weiy): Add testcases for crossing file writing. 
- @Test - public void testUpdateRecords() throws Exception { + @ParameterizedTest + @MethodSource("indexType") + public void testUpdateRecords(HoodieIndex.IndexType indexType) throws Exception { // Prepare the AvroParquetIO - HoodieWriteConfig config = makeHoodieClientConfig(); + HoodieWriteConfig config = makeHoodieClientConfigBuilder() + .withProps(makeIndexConfig(indexType)).build(); String firstCommitTime = makeNewCommitTime(); SparkRDDWriteClient writeClient = getHoodieWriteClient(config); writeClient.startCommitWithTime(firstCommitTime); @@ -168,7 +202,6 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { GenericRecord newRecord; int index = 0; for (GenericRecord record : fileRecords) { - //System.out.println("Got :" + record.get("_row_key").toString() + ", Exp :" + records.get(index).getRecordKey()); assertEquals(records.get(index).getRecordKey(), record.get("_row_key").toString()); index++; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java index 9c142ee3a..997f8a31e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java @@ -139,6 +139,12 @@ public class HoodieConfig implements Serializable { return rawValue.map(v -> Integer.parseInt(v.toString())).orElse(null); } + public Integer getIntOrDefault(ConfigProperty configProperty) { + Option rawValue = getRawValue(configProperty); + return rawValue.map(v -> Integer.parseInt(v.toString())) + .orElse((Integer) configProperty.defaultValue()); + } + public Boolean getBoolean(ConfigProperty configProperty) { Option rawValue = getRawValue(configProperty); return rawValue.map(v -> Boolean.parseBoolean(v.toString())).orElse(null); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java index a2d210e2c..1e076b196 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -44,7 +44,6 @@ import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor; -import org.apache.hudi.index.HoodieIndex.IndexType; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.util.DataTypeUtils; import org.apache.log4j.LogManager; @@ -185,7 +184,6 @@ public class DataSourceUtils { } return builder.forTable(tblName) - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.BLOOM).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withPayloadClass(parameters.get(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key())) .withInlineCompaction(inlineCompact).build()) @@ -309,6 +307,10 @@ public class DataSourceUtils { DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE().defaultValue())); hiveSyncConfig.supportTimestamp = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP_TYPE().key(), DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP_TYPE().defaultValue())); + hiveSyncConfig.bucketSpec = props.getBoolean(DataSourceWriteOptions.HIVE_SYNC_BUCKET_SYNC().key(), + (boolean) DataSourceWriteOptions.HIVE_SYNC_BUCKET_SYNC().defaultValue()) + ? 
HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()), + props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())) : null; return hiveSyncConfig; } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 51bcd882d..dbcc847fc 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -526,6 +526,12 @@ object DataSourceWriteOptions { .noDefaultValue() .withDocumentation("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql.") + val HIVE_SYNC_BUCKET_SYNC: ConfigProperty[Boolean] = ConfigProperty + .key("hoodie.datasource.hive_sync.bucket_sync") + .defaultValue(false) + .withDocumentation("Whether to sync hive metastore bucket specification when using bucket index. " + + "The specification is 'CLUSTERED BY (trace_id) SORTED BY (trace_id ASC) INTO 65536 BUCKETS'") + // Async Compaction - Enabled by default for MOR val ASYNC_COMPACT_ENABLE: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.compaction.async.enable") diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceWithBucket.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceWithBucket.scala new file mode 100644 index 000000000..57ebd038f --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceWithBucket.scala @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional + +import scala.collection.JavaConversions._ + +import org.apache.hudi.common.testutils.HoodieTestDataGenerator +import org.apache.hudi.config.{HoodieIndexConfig, HoodieLayoutConfig, HoodieWriteConfig} +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} +import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings +import org.apache.hudi.index.HoodieIndex.IndexType +import org.apache.hudi.keygen.constant.KeyGeneratorOptions +import org.apache.hudi.testutils.HoodieClientTestBase + +import org.apache.spark.sql._ +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} + +import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner +import org.apache.hudi.table.storage.HoodieStorageLayout + +/** + * Tests for the MOR data source backed by the bucket index storage layout. + */ +class TestMORDataSourceWithBucket extends HoodieClientTestBase { + + var spark: SparkSession = null + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + "hoodie.bulkinsert.shuffle.parallelism" -> "2", + DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition", + DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
HoodieIndexConfig.INDEX_TYPE.key -> IndexType.BUCKET.name, + HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key -> "8", + KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key -> "_row_key", + HoodieLayoutConfig.LAYOUT_TYPE.key -> HoodieStorageLayout.LayoutType.BUCKET.name, + HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME.key -> classOf[SparkBucketIndexPartitioner[_]].getName + ) + + @BeforeEach override def setUp(): Unit = { + initPath() + initSparkContexts() + spark = sqlContext.sparkSession + initTestDataGenerator() + initFileSystem() + } + + @AfterEach override def tearDown(): Unit = { + cleanupSparkContexts() + cleanupTestDataGenerator() + cleanupFileSystem() + } + + @Test def testDoubleInsert(): Unit = { + val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList + val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDF1.write.format("org.apache.hudi") + .options(commonOpts) + .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) + .mode(SaveMode.Append) + .save(basePath) + assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) + val records2 = recordsToStrings(dataGen.generateInserts("002", 100)).toList + val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + inputDF2.write.format("org.apache.hudi") + .options(commonOpts) + .option("hoodie.compact.inline", "false") + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) + .mode(SaveMode.Append) + .save(basePath) + val hudiSnapshotDF1 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .load(basePath + "/*/*/*/*") + assertEquals(200, hudiSnapshotDF1.count()) + } + + @Test def testCountWithBucketIndex(): Unit = { + // First Operation: + // Producing parquet files to three default partitions. + // SNAPSHOT view on MOR table with parquet files only. + val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList + val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDF1.write.format("org.apache.hudi") + .options(commonOpts) + .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OVERWRITE_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) + .mode(SaveMode.Append) + .save(basePath) + assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) + val hudiSnapshotDF1 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .load(basePath + "/*/*/*/*") + assertEquals(100, hudiSnapshotDF1.count()) // 100 records inserted by the first operation + + // Second Operation: + // Upsert the update to the default partitions with duplicate records. Produced a log file for each parquet. + // SNAPSHOT view should read the log files only with the latest commit time.
+ val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList + val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + inputDF2.write.format("org.apache.hudi") + .options(commonOpts) + .mode(SaveMode.Append) + .save(basePath) + val hudiSnapshotDF2 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .load(basePath + "/*/*/*/*") + assertEquals(100, hudiSnapshotDF2.count()) // still 100, since we only updated + val commit1Time = hudiSnapshotDF1.select("_hoodie_commit_time").head().get(0).toString + val commit2Time = hudiSnapshotDF2.select("_hoodie_commit_time").head().get(0).toString + assertEquals(hudiSnapshotDF2.select("_hoodie_commit_time").distinct().count(), 1) + assertTrue(commit2Time > commit1Time) + assertEquals(100, hudiSnapshotDF2.join(hudiSnapshotDF1, Seq("_hoodie_record_key"), "left").count()) + + val partitionPaths = new Array[String](1) + partitionPaths.update(0, "2020/01/10") + val newDataGen = new HoodieTestDataGenerator(partitionPaths) + val records4 = recordsToStrings(newDataGen.generateInserts("004", 100)).toList + val inputDF4: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records4, 2)) + inputDF4.write.format("org.apache.hudi") + .options(commonOpts) + .mode(SaveMode.Append) + .save(basePath) + val hudiSnapshotDF4 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .load(basePath + "/*/*/*/*") + // 200, because we insert 100 records to a new partition + assertEquals(200, hudiSnapshotDF4.count()) + assertEquals(100, + hudiSnapshotDF1.join(hudiSnapshotDF4, Seq("_hoodie_record_key"), "inner").count()) + } +} diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java index 
9b6385120..8dc3c6e0e 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java @@ -70,6 +70,9 @@ public class HiveSyncConfig implements Serializable { + "org.apache.hudi input format.") public Boolean usePreApacheInputFormat = false; + @Parameter(names = {"--bucket-spec"}, description = "bucket spec stored in metastore", required = false) + public String bucketSpec; + @Deprecated @Parameter(names = {"--use-jdbc"}, description = "Hive jdbc connect url") public Boolean useJdbc = true; @@ -135,6 +138,7 @@ public class HiveSyncConfig implements Serializable { newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass; newConfig.jdbcUrl = cfg.jdbcUrl; newConfig.tableName = cfg.tableName; + newConfig.bucketSpec = cfg.bucketSpec; newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat; newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata; newConfig.supportTimestamp = cfg.supportTimestamp; @@ -155,6 +159,7 @@ public class HiveSyncConfig implements Serializable { return "HiveSyncConfig{" + "databaseName='" + databaseName + '\'' + ", tableName='" + tableName + '\'' + + ", bucketSpec='" + bucketSpec + '\'' + ", baseFileFormat='" + baseFileFormat + '\'' + ", hiveUser='" + hiveUser + '\'' + ", hivePass='" + hivePass + '\'' @@ -181,4 +186,8 @@ public class HiveSyncConfig implements Serializable { + ", isConditionalSync=" + isConditionalSync + '}'; } + + public static String getBucketSpec(String bucketCols, int bucketNum) { + return "CLUSTERED BY (" + bucketCols + ") INTO " + bucketNum + " BUCKETS"; + } } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java index 7cef6abf8..2d700596f 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java +++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java @@ -471,6 +471,9 @@ public class HiveSchemaUtil { if (!config.partitionFields.isEmpty()) { sb.append(" PARTITIONED BY (").append(partitionsStr).append(")"); } + if (config.bucketSpec != null) { + sb.append(' ' + config.bucketSpec + ' '); + } sb.append(" ROW FORMAT SERDE '").append(serdeClass).append("'"); if (serdeProperties != null && !serdeProperties.isEmpty()) { sb.append(" WITH SERDEPROPERTIES (").append(propertyToString(serdeProperties)).append(")");