[HUDI-1951] Add bucket hash index, compatible with the hive bucket (#3173)
* [HUDI-2154] Add index key field to HoodieKey * [HUDI-2157] Add the bucket index and its read/write implemention of Spark engine. * revert HUDI-2154 add index key field to HoodieKey * fix all comments and introduce a new tricky way to get index key at runtime support double insert for bucket index * revert spark read optimizer based on bucket index * add the storage layout * index tag, hash function and add ut * fix ut * address partial comments * Code review feedback * add layout config and docs * fix ut * rename hoodie.layout and rebase master Co-authored-by: Vinoth Chandar <vinoth@apache.org>
This commit is contained in:
@@ -44,7 +44,6 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
|
||||
import org.apache.hudi.exception.TableNotFoundException;
|
||||
import org.apache.hudi.hive.HiveSyncConfig;
|
||||
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
|
||||
import org.apache.hudi.index.HoodieIndex.IndexType;
|
||||
import org.apache.hudi.table.BulkInsertPartitioner;
|
||||
import org.apache.hudi.util.DataTypeUtils;
|
||||
import org.apache.log4j.LogManager;
|
||||
@@ -185,7 +184,6 @@ public class DataSourceUtils {
|
||||
}
|
||||
|
||||
return builder.forTable(tblName)
|
||||
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.BLOOM).build())
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||
.withPayloadClass(parameters.get(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key()))
|
||||
.withInlineCompaction(inlineCompact).build())
|
||||
@@ -309,6 +307,10 @@ public class DataSourceUtils {
|
||||
DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE().defaultValue()));
|
||||
hiveSyncConfig.supportTimestamp = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP_TYPE().key(),
|
||||
DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP_TYPE().defaultValue()));
|
||||
hiveSyncConfig.bucketSpec = props.getBoolean(DataSourceWriteOptions.HIVE_SYNC_BUCKET_SYNC().key(),
|
||||
(boolean) DataSourceWriteOptions.HIVE_SYNC_BUCKET_SYNC().defaultValue())
|
||||
? HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()),
|
||||
props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())) : null;
|
||||
return hiveSyncConfig;
|
||||
}
|
||||
|
||||
|
||||
@@ -526,6 +526,12 @@ object DataSourceWriteOptions {
|
||||
.noDefaultValue()
|
||||
.withDocumentation("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql.")
|
||||
|
||||
val HIVE_SYNC_BUCKET_SYNC: ConfigProperty[Boolean] = ConfigProperty
|
||||
.key("hoodie.datasource.hive_sync.bucket_sync")
|
||||
.defaultValue(false)
|
||||
.withDocumentation("Whether sync hive metastore bucket specification when using bucket index." +
|
||||
"The specification is 'CLUSTERED BY (trace_id) SORTED BY (trace_id ASC) INTO 65536 BUCKETS'")
|
||||
|
||||
// Async Compaction - Enabled by default for MOR
|
||||
val ASYNC_COMPACT_ENABLE: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.compaction.async.enable")
|
||||
|
||||
@@ -0,0 +1,153 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.functional
|
||||
|
||||
import scala.collection.JavaConversions._
|
||||
|
||||
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
|
||||
import org.apache.hudi.config.{HoodieIndexConfig, HoodieLayoutConfig, HoodieWriteConfig}
|
||||
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
|
||||
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
|
||||
import org.apache.hudi.index.HoodieIndex.IndexType
|
||||
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
|
||||
import org.apache.hudi.testutils.HoodieClientTestBase
|
||||
|
||||
import org.apache.spark.sql._
|
||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
|
||||
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
|
||||
|
||||
import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner
|
||||
import org.apache.hudi.table.storage.HoodieStorageLayout
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
class TestDataSourceForBucketIndex extends HoodieClientTestBase {
|
||||
|
||||
var spark: SparkSession = null
|
||||
val commonOpts = Map(
|
||||
"hoodie.insert.shuffle.parallelism" -> "4",
|
||||
"hoodie.upsert.shuffle.parallelism" -> "4",
|
||||
"hoodie.bulkinsert.shuffle.parallelism" -> "2",
|
||||
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
|
||||
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
|
||||
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
|
||||
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
|
||||
HoodieIndexConfig.INDEX_TYPE.key -> IndexType.BUCKET.name,
|
||||
HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key -> "8",
|
||||
KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key -> "_row_key",
|
||||
HoodieLayoutConfig.LAYOUT_TYPE.key -> HoodieStorageLayout.LayoutType.BUCKET.name,
|
||||
HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME.key -> classOf[SparkBucketIndexPartitioner[_]].getName
|
||||
)
|
||||
|
||||
@BeforeEach override def setUp(): Unit = {
|
||||
initPath()
|
||||
initSparkContexts()
|
||||
spark = sqlContext.sparkSession
|
||||
initTestDataGenerator()
|
||||
initFileSystem()
|
||||
}
|
||||
|
||||
@AfterEach override def tearDown(): Unit = {
|
||||
cleanupSparkContexts()
|
||||
cleanupTestDataGenerator()
|
||||
cleanupFileSystem()
|
||||
}
|
||||
|
||||
@Test def testDoubleInsert(): Unit = {
|
||||
val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
|
||||
val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
|
||||
inputDF1.write.format("org.apache.hudi")
|
||||
.options(commonOpts)
|
||||
.option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
|
||||
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
|
||||
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
|
||||
.mode(SaveMode.Append)
|
||||
.save(basePath)
|
||||
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
|
||||
val records2 = recordsToStrings(dataGen.generateInserts("002", 100)).toList
|
||||
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
|
||||
inputDF2.write.format("org.apache.hudi")
|
||||
.options(commonOpts)
|
||||
.option("hoodie.compact.inline", "false")
|
||||
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
|
||||
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
|
||||
.mode(SaveMode.Append)
|
||||
.save(basePath)
|
||||
val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
|
||||
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
|
||||
.load(basePath + "/*/*/*/*")
|
||||
assertEquals(200, hudiSnapshotDF1.count())
|
||||
}
|
||||
|
||||
@Test def testCountWithBucketIndex(): Unit = {
|
||||
// First Operation:
|
||||
// Producing parquet files to three default partitions.
|
||||
// SNAPSHOT view on MOR table with parquet files only.
|
||||
val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
|
||||
val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
|
||||
inputDF1.write.format("org.apache.hudi")
|
||||
.options(commonOpts)
|
||||
.option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
|
||||
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OVERWRITE_OPERATION_OPT_VAL)
|
||||
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
|
||||
.mode(SaveMode.Append)
|
||||
.save(basePath)
|
||||
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
|
||||
val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
|
||||
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
|
||||
.load(basePath + "/*/*/*/*")
|
||||
assertEquals(100, hudiSnapshotDF1.count()) // still 100, since we only updated
|
||||
|
||||
// Second Operation:
|
||||
// Upsert the update to the default partitions with duplicate records. Produced a log file for each parquet.
|
||||
// SNAPSHOT view should read the log files only with the latest commit time.
|
||||
val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList
|
||||
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
|
||||
inputDF2.write.format("org.apache.hudi")
|
||||
.options(commonOpts)
|
||||
.mode(SaveMode.Append)
|
||||
.save(basePath)
|
||||
val hudiSnapshotDF2 = spark.read.format("org.apache.hudi")
|
||||
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
|
||||
.load(basePath + "/*/*/*/*")
|
||||
assertEquals(100, hudiSnapshotDF2.count()) // still 100, since we only updated
|
||||
val commit1Time = hudiSnapshotDF1.select("_hoodie_commit_time").head().get(0).toString
|
||||
val commit2Time = hudiSnapshotDF2.select("_hoodie_commit_time").head().get(0).toString
|
||||
assertEquals(hudiSnapshotDF2.select("_hoodie_commit_time").distinct().count(), 1)
|
||||
assertTrue(commit2Time > commit1Time)
|
||||
assertEquals(100, hudiSnapshotDF2.join(hudiSnapshotDF1, Seq("_hoodie_record_key"), "left").count())
|
||||
|
||||
val partitionPaths = new Array[String](1)
|
||||
partitionPaths.update(0, "2020/01/10")
|
||||
val newDataGen = new HoodieTestDataGenerator(partitionPaths)
|
||||
val records4 = recordsToStrings(newDataGen.generateInserts("004", 100)).toList
|
||||
val inputDF4: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records4, 2))
|
||||
inputDF4.write.format("org.apache.hudi")
|
||||
.options(commonOpts)
|
||||
.mode(SaveMode.Append)
|
||||
.save(basePath)
|
||||
val hudiSnapshotDF4 = spark.read.format("org.apache.hudi")
|
||||
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
|
||||
.load(basePath + "/*/*/*/*")
|
||||
// 200, because we insert 100 records to a new partition
|
||||
assertEquals(200, hudiSnapshotDF4.count())
|
||||
assertEquals(100,
|
||||
hudiSnapshotDF1.join(hudiSnapshotDF4, Seq("_hoodie_record_key"), "inner").count())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user