1
0

[HUDI-2149] Ensure and Audit docs for every configuration class in the codebase (#3272)

- Added docs when missing
- Rewrote, reworded as needed
- Made a couple more classes extend HoodieConfig
This commit is contained in:
vinoth chandar
2021-07-14 10:56:08 -07:00
committed by GitHub
parent c1810f210e
commit 75040ee9e5
28 changed files with 406 additions and 400 deletions

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.client;
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -40,6 +42,7 @@ import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.METADATA_E
/**
* Status of a write operation.
*/
@PublicAPIClass(maturity = ApiMaturityLevel.STABLE)
public class WriteStatus implements Serializable {
private static final Logger LOG = LogManager.getLogger(WriteStatus.class);

View File

@@ -97,7 +97,7 @@ public class HoodieBootstrapConfig extends HoodieConfig {
.key("hoodie.bootstrap.index.class")
.defaultValue(HFileBootstrapIndex.class.getName())
.sinceVersion("0.6.0")
.withDocumentation("");
.withDocumentation("Implementation to use, for mapping a skeleton base file to a boostrap base file.");
private HoodieBootstrapConfig() {
super();

View File

@@ -31,36 +31,6 @@ import java.util.Properties;
*/
public class HoodieClusteringConfig extends HoodieConfig {
public static final ConfigProperty<String> CLUSTERING_PLAN_STRATEGY_CLASS = ConfigProperty
.key("hoodie.clustering.plan.strategy.class")
.defaultValue("org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy")
.sinceVersion("0.7.0")
.withDocumentation("Config to provide a strategy class to create ClusteringPlan. Class has to be subclass of ClusteringPlanStrategy");
public static final ConfigProperty<String> CLUSTERING_EXECUTION_STRATEGY_CLASS = ConfigProperty
.key("hoodie.clustering.execution.strategy.class")
.defaultValue("org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy")
.sinceVersion("0.7.0")
.withDocumentation("Config to provide a strategy class to execute a ClusteringPlan. Class has to be subclass of RunClusteringStrategy");
public static final ConfigProperty<String> INLINE_CLUSTERING_PROP = ConfigProperty
.key("hoodie.clustering.inline")
.defaultValue("false")
.sinceVersion("0.7.0")
.withDocumentation("Turn on inline clustering - clustering will be run after write operation is complete");
public static final ConfigProperty<String> INLINE_CLUSTERING_MAX_COMMIT_PROP = ConfigProperty
.key("hoodie.clustering.inline.max.commits")
.defaultValue("4")
.sinceVersion("0.7.0")
.withDocumentation("Config to control frequency of inline clustering");
public static final ConfigProperty<String> ASYNC_CLUSTERING_MAX_COMMIT_PROP = ConfigProperty
.key("hoodie.clustering.async.max.commits")
.defaultValue("4")
.sinceVersion("0.9.0")
.withDocumentation("Config to control frequency of async clustering");
// Any strategy specific params can be saved with this prefix
public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = "hoodie.clustering.plan.strategy.";
@@ -70,6 +40,40 @@ public class HoodieClusteringConfig extends HoodieConfig {
.sinceVersion("0.7.0")
.withDocumentation("Number of partitions to list to create ClusteringPlan");
public static final ConfigProperty<String> CLUSTERING_PLAN_STRATEGY_CLASS = ConfigProperty
.key("hoodie.clustering.plan.strategy.class")
.defaultValue("org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy")
.sinceVersion("0.7.0")
.withDocumentation("Config to provide a strategy class (subclass of ClusteringPlanStrategy) to create clustering plan "
+ "i.e select what file groups are being clustered. Default strategy, looks at the last N (determined by "
+ CLUSTERING_TARGET_PARTITIONS.key() + ") day based partitions picks the small file slices within those partitions.");
public static final ConfigProperty<String> CLUSTERING_EXECUTION_STRATEGY_CLASS = ConfigProperty
.key("hoodie.clustering.execution.strategy.class")
.defaultValue("org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy")
.sinceVersion("0.7.0")
.withDocumentation("Config to provide a strategy class (subclass of RunClusteringStrategy) to define how the "
+ " clustering plan is executed. By default, we sort the file groups in th plan by the specified columns, while "
+ " meeting the configured target file sizes.");
public static final ConfigProperty<String> INLINE_CLUSTERING_PROP = ConfigProperty
.key("hoodie.clustering.inline")
.defaultValue("false")
.sinceVersion("0.7.0")
.withDocumentation("Turn on inline clustering - clustering will be run after each write operation is complete");
public static final ConfigProperty<String> INLINE_CLUSTERING_MAX_COMMIT_PROP = ConfigProperty
.key("hoodie.clustering.inline.max.commits")
.defaultValue("4")
.sinceVersion("0.7.0")
.withDocumentation("Config to control frequency of clustering planning");
public static final ConfigProperty<String> ASYNC_CLUSTERING_MAX_COMMIT_PROP = ConfigProperty
.key("hoodie.clustering.async.max.commits")
.defaultValue("4")
.sinceVersion("0.9.0")
.withDocumentation("Config to control frequency of async clustering");
public static final ConfigProperty<String> CLUSTERING_PLAN_SMALL_FILE_LIMIT = ConfigProperty
.key(CLUSTERING_STRATEGY_PARAM_PREFIX + "small.file.limit")
.defaultValue(String.valueOf(600 * 1024 * 1024L))
@@ -80,7 +84,7 @@ public class HoodieClusteringConfig extends HoodieConfig {
.key(CLUSTERING_STRATEGY_PARAM_PREFIX + "max.bytes.per.group")
.defaultValue(String.valueOf(2 * 1024 * 1024 * 1024L))
.sinceVersion("0.7.0")
.withDocumentation("Each clustering operation can create multiple groups. Total amount of data processed by clustering operation"
.withDocumentation("Each clustering operation can create multiple output file groups. Total amount of data processed by clustering operation"
+ " is defined by below two properties (CLUSTERING_MAX_BYTES_PER_GROUP * CLUSTERING_MAX_NUM_GROUPS)."
+ " Max amount of data to be included in one group");
@@ -92,7 +96,7 @@ public class HoodieClusteringConfig extends HoodieConfig {
public static final ConfigProperty<String> CLUSTERING_TARGET_FILE_MAX_BYTES = ConfigProperty
.key(CLUSTERING_STRATEGY_PARAM_PREFIX + "target.file.max.bytes")
.defaultValue(String.valueOf(1 * 1024 * 1024 * 1024L))
.defaultValue(String.valueOf(1024 * 1024 * 1024L))
.sinceVersion("0.7.0")
.withDocumentation("Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups");
@@ -106,13 +110,14 @@ public class HoodieClusteringConfig extends HoodieConfig {
.key("hoodie.clustering.updates.strategy")
.defaultValue("org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy")
.sinceVersion("0.7.0")
.withDocumentation("When file groups is in clustering, need to handle the update to these file groups. Default strategy just reject the update");
.withDocumentation("Determines how to handle updates, deletes to file groups that are under clustering."
+ " Default strategy just rejects the update");
public static final ConfigProperty<String> ASYNC_CLUSTERING_ENABLE_OPT_KEY = ConfigProperty
.key("hoodie.clustering.async.enabled")
.defaultValue("false")
.sinceVersion("0.7.0")
.withDocumentation("Async clustering");
.withDocumentation("Enable running of clustering service, asynchronously as inserts happen on the table.");
private HoodieClusteringConfig() {
super();

View File

@@ -33,7 +33,9 @@ import javax.annotation.concurrent.Immutable;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;
import java.util.stream.Collectors;
/**
* Compaction related config.
@@ -41,101 +43,114 @@ import java.util.Properties;
@Immutable
public class HoodieCompactionConfig extends HoodieConfig {
public static final ConfigProperty<String> CLEANER_POLICY_PROP = ConfigProperty
.key("hoodie.cleaner.policy")
.defaultValue(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
.withDocumentation("Cleaning policy to be used. Hudi will delete older versions of parquet files to re-claim space."
+ " Any Query/Computation referring to this version of the file will fail. "
+ "It is good to make sure that the data is retained for more than the maximum query execution time.");
public static final ConfigProperty<String> AUTO_CLEAN_PROP = ConfigProperty
.key("hoodie.clean.automatic")
.defaultValue("true")
.withDocumentation("Should cleanup if there is anything to cleanup immediately after the commit");
.withDocumentation("When enabled, the cleaner table service is invoked immediately after each commit,"
+ " to delete older file slices. It's recommended to enable this, to ensure metadata and data storage"
+ " growth is bounded.");
public static final ConfigProperty<String> ASYNC_CLEAN_PROP = ConfigProperty
.key("hoodie.clean.async")
.defaultValue("false")
.withDocumentation("Only applies when #withAutoClean is turned on. When turned on runs cleaner async with writing.");
public static final ConfigProperty<String> INLINE_COMPACT_PROP = ConfigProperty
.key("hoodie.compact.inline")
.defaultValue("false")
.withDocumentation("When set to true, compaction is triggered by the ingestion itself, "
+ "right after a commit/deltacommit action as part of insert/upsert/bulk_insert");
public static final ConfigProperty<String> INLINE_COMPACT_NUM_DELTA_COMMITS_PROP = ConfigProperty
.key("hoodie.compact.inline.max.delta.commits")
.defaultValue("5")
.withDocumentation("Number of max delta commits to keep before triggering an inline compaction");
public static final ConfigProperty<String> INLINE_COMPACT_TIME_DELTA_SECONDS_PROP = ConfigProperty
.key("hoodie.compact.inline.max.delta.seconds")
.defaultValue(String.valueOf(60 * 60))
.withDocumentation("Run a compaction when time elapsed > N seconds since last compaction");
public static final ConfigProperty<String> INLINE_COMPACT_TRIGGER_STRATEGY_PROP = ConfigProperty
.key("hoodie.compact.inline.trigger.strategy")
.defaultValue(CompactionTriggerStrategy.NUM_COMMITS.name())
.withDocumentation("");
public static final ConfigProperty<String> CLEANER_FILE_VERSIONS_RETAINED_PROP = ConfigProperty
.key("hoodie.cleaner.fileversions.retained")
.defaultValue("3")
.withDocumentation("");
.withDocumentation("Only applies when " + AUTO_CLEAN_PROP.key() + " is turned on. "
+ "When turned on runs cleaner async with writing, which can speed up overall write performance.");
public static final ConfigProperty<String> CLEANER_COMMITS_RETAINED_PROP = ConfigProperty
.key("hoodie.cleaner.commits.retained")
.defaultValue("10")
.withDocumentation("Number of commits to retain. So data will be retained for num_of_commits * time_between_commits "
+ "(scheduled). This also directly translates into how much you can incrementally pull on this table");
.withDocumentation("Number of commits to retain, without cleaning. This will be retained for num_of_commits * time_between_commits "
+ "(scheduled). This also directly translates into how much data retention the table supports for incremental queries.");
public static final ConfigProperty<String> CLEANER_POLICY_PROP = ConfigProperty
.key("hoodie.cleaner.policy")
.defaultValue(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
.withDocumentation("Cleaning policy to be used. The cleaner service deletes older file slices files to re-claim space."
+ " By default, cleaner spares the file slices written by the last N commits, determined by " + CLEANER_COMMITS_RETAINED_PROP.key()
+ " Long running query plans may often refer to older file slices and will break if those are cleaned, before the query has had"
+ " a chance to run. So, it is good to make sure that the data is retained for more than the maximum query execution time");
public static final ConfigProperty<String> INLINE_COMPACT_PROP = ConfigProperty
.key("hoodie.compact.inline")
.defaultValue("false")
.withDocumentation("When set to true, compaction service is triggered after each write. While being "
+ " simpler operationally, this adds extra latency on the write path.");
public static final ConfigProperty<String> INLINE_COMPACT_NUM_DELTA_COMMITS_PROP = ConfigProperty
.key("hoodie.compact.inline.max.delta.commits")
.defaultValue("5")
.withDocumentation("Number of delta commits after the last compaction, before scheduling of a new compaction is attempted.");
public static final ConfigProperty<String> INLINE_COMPACT_TIME_DELTA_SECONDS_PROP = ConfigProperty
.key("hoodie.compact.inline.max.delta.seconds")
.defaultValue(String.valueOf(60 * 60))
.withDocumentation("Number of elapsed seconds after the last compaction, before scheduling a new one.");
public static final ConfigProperty<String> INLINE_COMPACT_TRIGGER_STRATEGY_PROP = ConfigProperty
.key("hoodie.compact.inline.trigger.strategy")
.defaultValue(CompactionTriggerStrategy.NUM_COMMITS.name())
.withDocumentation("Controls how compaction scheduling is triggered, by time or num delta commits or combination of both. "
+ "Valid options: " + Arrays.stream(CompactionTriggerStrategy.values()).map(Enum::name).collect(Collectors.joining(",")));
public static final ConfigProperty<String> CLEANER_FILE_VERSIONS_RETAINED_PROP = ConfigProperty
.key("hoodie.cleaner.fileversions.retained")
.defaultValue("3")
.withDocumentation("When " + HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name() + " cleaning policy is used, "
+ " the minimum number of file slices to retain in each file group, during cleaning.");
public static final ConfigProperty<String> CLEANER_INCREMENTAL_MODE = ConfigProperty
.key("hoodie.cleaner.incremental.mode")
.defaultValue("true")
.withDocumentation("");
.withDocumentation("When enabled, the plans for each cleaner service run is computed incrementally off the events "
+ " in the timeline, since the last cleaner run. This is much more efficient than obtaining listings for the full"
+ " table for each planning (even with a metadata table).");
public static final ConfigProperty<String> MAX_COMMITS_TO_KEEP_PROP = ConfigProperty
.key("hoodie.keep.max.commits")
.defaultValue("30")
.withDocumentation("Each commit is a small file in the .hoodie directory. Since DFS typically does not favor lots of "
+ "small files, Hudi archives older commits into a sequential log. A commit is published atomically "
+ "by a rename of the commit file.");
.withDocumentation("Archiving service moves older entries from timeline into an archived log after each write, to "
+ " keep the metadata overhead constant, even as the table size grows."
+ "This config controls the maximum number of instants to retain in the active timeline. ");
public static final ConfigProperty<String> MIN_COMMITS_TO_KEEP_PROP = ConfigProperty
.key("hoodie.keep.min.commits")
.defaultValue("20")
.withDocumentation("Each commit is a small file in the .hoodie directory. Since DFS typically does not favor lots of "
+ "small files, Hudi archives older commits into a sequential log. A commit is published atomically "
+ "by a rename of the commit file.");
.withDocumentation("Similar to " + MAX_COMMITS_TO_KEEP_PROP.key() + ", but controls the minimum number of"
+ "instants to retain in the active timeline.");
public static final ConfigProperty<String> COMMITS_ARCHIVAL_BATCH_SIZE_PROP = ConfigProperty
.key("hoodie.commits.archival.batch")
.defaultValue(String.valueOf(10))
.withDocumentation("This controls the number of commit instants read in memory as a batch and archived together.");
.withDocumentation("Archiving of instants is batched in best-effort manner, to pack more instants into a single"
+ " archive log. This config controls such archival batch size.");
public static final ConfigProperty<String> CLEANER_BOOTSTRAP_BASE_FILE_ENABLED = ConfigProperty
.key("hoodie.cleaner.delete.bootstrap.base.file")
.defaultValue("false")
.withDocumentation("Set true to clean bootstrap source files when necessary");
.withDocumentation("When set to true, cleaner also deletes the bootstrap base file when it's skeleton base file is "
+ " cleaned. Turn this to true, if you want to ensure the bootstrap dataset storage is reclaimed over time, as the"
+ " table receives updates/deletes. Another reason to turn this on, would be to ensure data residing in bootstrap "
+ " base files are also physically deleted, to comply with data privacy enforcement processes.");
public static final ConfigProperty<String> PARQUET_SMALL_FILE_LIMIT_BYTES = ConfigProperty
.key("hoodie.parquet.small.file.limit")
.defaultValue(String.valueOf(104857600))
.withDocumentation("Upsert uses this file size to compact new data onto existing files. "
+ "By default, treat any file <= 100MB as a small file.");
.withDocumentation("During upsert operation, we opportunistically expand existing small files on storage, instead of writing"
+ " new files, to keep number of files to an optimum. This config sets the file size limit below which a file on storage "
+ " becomes a candidate to be selected as such a `small file`. By default, treat any file <= 100MB as a small file.");
public static final ConfigProperty<String> RECORD_SIZE_ESTIMATION_THRESHOLD_PROP = ConfigProperty
.key("hoodie.record.size.estimation.threshold")
.defaultValue("1.0")
.withDocumentation("Hudi will use the previous commit to calculate the estimated record size by totalBytesWritten/totalRecordsWritten. "
+ "If the previous commit is too small to make an accurate estimation, Hudi will search commits in the reverse order, "
+ "until find a commit has totalBytesWritten larger than (PARQUET_SMALL_FILE_LIMIT_BYTES * RECORD_SIZE_ESTIMATION_THRESHOLD)");
.withDocumentation("We use the previous commits' metadata to calculate the estimated record size and use it "
+ " to bin pack records into partitions. If the previous commit is too small to make an accurate estimation, "
+ " Hudi will search commits in the reverse order, until we find a commit that has totalBytesWritten "
+ " larger than (PARQUET_SMALL_FILE_LIMIT_BYTES * this_threshold)");
public static final ConfigProperty<String> CLEANER_PARALLELISM = ConfigProperty
.key("hoodie.cleaner.parallelism")
.defaultValue("200")
.withDocumentation("Increase this if cleaning becomes slow.");
.withDocumentation("Parallelism for the cleaning operation. Increase this if cleaning becomes slow.");
// 500GB of target IO per compaction (both read and write
public static final ConfigProperty<String> TARGET_IO_PER_COMPACTION_IN_MB_PROP = ConfigProperty
@@ -161,15 +176,15 @@ public class HoodieCompactionConfig extends HoodieConfig {
public static final ConfigProperty<String> COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = ConfigProperty
.key("hoodie.compaction.lazy.block.read")
.defaultValue("false")
.withDocumentation("When a CompactedLogScanner merges all log files, this config helps to choose whether the logblocks "
+ "should be read lazily or not. Choose true to use I/O intensive lazy block reading (low memory usage) or false "
+ "for Memory intensive immediate block read (high memory usage)");
.withDocumentation("When merging the delta log files, this config helps to choose whether the log blocks "
+ "should be read lazily or not. Choose true to use lazy block reading (low memory usage, but incurs seeks to each block"
+ " header) or false for immediate block read (higher memory usage)");
public static final ConfigProperty<String> COMPACTION_REVERSE_LOG_READ_ENABLED_PROP = ConfigProperty
.key("hoodie.compaction.reverse.log.read")
.defaultValue("false")
.withDocumentation("HoodieLogFormatReader reads a logfile in the forward direction starting from pos=0 to pos=file_length. "
+ "If this config is set to true, the Reader reads the logfile in reverse direction, from pos=file_length to pos=0");
+ "If this config is set to true, the reader reads the logfile in reverse direction, from pos=file_length to pos=0");
public static final ConfigProperty<String> FAILED_WRITES_CLEANER_POLICY_PROP = ConfigProperty
.key("hoodie.cleaner.policy.failed.writes")
@@ -190,22 +205,24 @@ public class HoodieCompactionConfig extends HoodieConfig {
public static final ConfigProperty<String> COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE = ConfigProperty
.key("hoodie.copyonwrite.insert.split.size")
.defaultValue(String.valueOf(500000))
.withDocumentation("Number of inserts, that will be put each partition/bucket for writing. "
+ "The rationale to pick the insert parallelism is the following. Writing out 100MB files, "
+ "with at least 1kb records, means 100K records per file. we just over provision to 500K.");
.withDocumentation("Number of inserts assigned for each partition/bucket for writing. "
+ "We based the default on writing out 100MB files, with at least 1kb records (100K records per file), and "
+ " over provision to 500K. As long as auto-tuning of splits is turned on, this only affects the first "
+ " write, where there is no history to learn record sizes from.");
public static final ConfigProperty<String> COPY_ON_WRITE_TABLE_AUTO_SPLIT_INSERTS = ConfigProperty
.key("hoodie.copyonwrite.insert.auto.split")
.defaultValue("true")
.withDocumentation("Config to control whether we control insert split sizes automatically based on average"
+ " record sizes.");
+ " record sizes. It's recommended to keep this turned on, since hand tuning is otherwise extremely"
+ " cumbersome.");
public static final ConfigProperty<String> COPY_ON_WRITE_TABLE_RECORD_SIZE_ESTIMATE = ConfigProperty
.key("hoodie.copyonwrite.record.size.estimate")
.defaultValue(String.valueOf(1024))
.withDocumentation("The average record size. If specified, hudi will use this and not compute dynamically "
+ "based on the last 24 commits metadata. No value set as default. This is critical in computing "
+ "the insert parallelism and bin-packing inserts into small files. See above.");
.withDocumentation("The average record size. If not explicitly specified, hudi will compute the "
+ "record size estimate compute dynamically based on commit metadata. "
+ " This is critical in computing the insert parallelism and bin-packing inserts into small files.");
private HoodieCompactionConfig() {
super();

View File

@@ -48,7 +48,8 @@ public class HoodieHBaseIndexConfig extends HoodieConfig {
public static final ConfigProperty<Integer> HBASE_GET_BATCH_SIZE_PROP = ConfigProperty
.key("hoodie.index.hbase.get.batch.size")
.defaultValue(100)
.withDocumentation("");
.withDocumentation("Controls the batch size for performing gets against HBase. "
+ "Batching improves throughput, by saving round trips.");
public static final ConfigProperty<String> HBASE_ZK_ZNODEPARENT = ConfigProperty
.key("hoodie.index.hbase.zknode.path")
@@ -59,12 +60,14 @@ public class HoodieHBaseIndexConfig extends HoodieConfig {
public static final ConfigProperty<Integer> HBASE_PUT_BATCH_SIZE_PROP = ConfigProperty
.key("hoodie.index.hbase.put.batch.size")
.defaultValue(100)
.withDocumentation("");
.withDocumentation("Controls the batch size for performing puts against HBase. "
+ "Batching improves throughput, by saving round trips.");
public static final ConfigProperty<String> HBASE_INDEX_QPS_ALLOCATOR_CLASS = ConfigProperty
.key("hoodie.index.hbase.qps.allocator.class")
.defaultValue(DefaultHBaseQPSResourceAllocator.class.getName())
.withDocumentation("Property to set which implementation of HBase QPS resource allocator to be used");
.withDocumentation("Property to set which implementation of HBase QPS resource allocator to be used, which"
+ "controls the batching rate dynamically.");
public static final ConfigProperty<String> HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP = ConfigProperty
.key("hoodie.index.hbase.put.batch.size.autocompute")
@@ -90,17 +93,17 @@ public class HoodieHBaseIndexConfig extends HoodieConfig {
public static final ConfigProperty<Boolean> HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY = ConfigProperty
.key("hoodie.index.hbase.dynamic_qps")
.defaultValue(false)
.withDocumentation("Property to decide if HBASE_QPS_FRACTION_PROP is dynamically calculated based on volume");
.withDocumentation("Property to decide if HBASE_QPS_FRACTION_PROP is dynamically calculated based on write volume.");
public static final ConfigProperty<String> HBASE_MIN_QPS_FRACTION_PROP = ConfigProperty
.key("hoodie.index.hbase.min.qps.fraction")
.noDefaultValue()
.withDocumentation("Min for HBASE_QPS_FRACTION_PROP to stabilize skewed volume workloads");
.withDocumentation("Minimum for HBASE_QPS_FRACTION_PROP to stabilize skewed write workloads");
public static final ConfigProperty<String> HBASE_MAX_QPS_FRACTION_PROP = ConfigProperty
.key("hoodie.index.hbase.max.qps.fraction")
.noDefaultValue()
.withDocumentation("Max for HBASE_QPS_FRACTION_PROP to stabilize skewed volume workloads");
.withDocumentation("Maximum for HBASE_QPS_FRACTION_PROP to stabilize skewed write workloads");
public static final ConfigProperty<Integer> HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS = ConfigProperty
.key("hoodie.index.hbase.desired_puts_time_in_secs")
@@ -120,17 +123,18 @@ public class HoodieHBaseIndexConfig extends HoodieConfig {
public static final ConfigProperty<Integer> HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS = ConfigProperty
.key("hoodie.index.hbase.zk.session_timeout_ms")
.defaultValue(60 * 1000)
.withDocumentation("");
.withDocumentation("Session timeout value to use for Zookeeper failure detection, for the HBase client."
+ "Lower this value, if you want to fail faster.");
public static final ConfigProperty<Integer> HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS = ConfigProperty
.key("hoodie.index.hbase.zk.connection_timeout_ms")
.defaultValue(15 * 1000)
.withDocumentation("");
.withDocumentation("Timeout to use for establishing connection with zookeeper, from HBase client.");
public static final ConfigProperty<String> HBASE_ZK_PATH_QPS_ROOT = ConfigProperty
.key("hoodie.index.hbase.zkpath.qps_root")
.defaultValue("/QPS_ROOT")
.withDocumentation("");
.withDocumentation("chroot in zookeeper, to use for all qps allocation co-ordination.");
public static final ConfigProperty<Boolean> HBASE_INDEX_UPDATE_PARTITION_PATH = ConfigProperty
.key("hoodie.hbase.index.update.partition.path")

View File

@@ -58,13 +58,12 @@ public class HoodieIndexConfig extends HoodieConfig {
.defaultValue("60000")
.withDocumentation("Only applies if index type is BLOOM. "
+ "This is the number of entries to be stored in the bloom filter. "
+ "We assume the maxParquetFileSize is 128MB and averageRecordSize is 1024B and "
+ "The rationale for the default: Assume the maxParquetFileSize is 128MB and averageRecordSize is 1kb and "
+ "hence we approx a total of 130K records in a file. The default (60000) is roughly half of this approximation. "
+ "HUDI-56 tracks computing this dynamically. Warning: Setting this very low, "
+ "will generate a lot of false positives and index lookup will have to scan a lot more files "
+ "than it has to and Setting this to a very high number will increase the size every data file linearly "
+ "(roughly 4KB for every 50000 entries). "
+ "This config is also used with DYNNAMIC bloom filter which determines the initial size for the bloom.");
+ "Warning: Setting this very low, will generate a lot of false positives and index lookup "
+ "will have to scan a lot more files than it has to and setting this to a very high number will "
+ "increase the size every base file linearly (roughly 4KB for every 50000 entries). "
+ "This config is also used with DYNAMIC bloom filter which determines the initial size for the bloom.");
public static final ConfigProperty<String> BLOOM_FILTER_FPP = ConfigProperty
.key("hoodie.index.bloom.fpp")
@@ -73,16 +72,15 @@ public class HoodieIndexConfig extends HoodieConfig {
+ "Error rate allowed given the number of entries. This is used to calculate how many bits should be "
+ "assigned for the bloom filter and the number of hash functions. This is usually set very low (default: 0.000000001), "
+ "we like to tradeoff disk space for lower false positives. "
+ "If the number of entries added to bloom filter exceeds the congfigured value (hoodie.index.bloom.num_entries), "
+ "If the number of entries added to bloom filter exceeds the configured value (hoodie.index.bloom.num_entries), "
+ "then this fpp may not be honored.");
public static final ConfigProperty<String> BLOOM_INDEX_PARALLELISM_PROP = ConfigProperty
.key("hoodie.bloom.index.parallelism")
.defaultValue("0")
.withDocumentation("Only applies if index type is BLOOM. "
+ "This is the amount of parallelism for index lookup, which involves a Spark Shuffle. "
+ "By default, this is auto computed based on input workload characteristics. "
+ "Disable explicit bloom index parallelism setting by default - hoodie auto computes");
+ "This is the amount of parallelism for index lookup, which involves a shuffle. "
+ "By default, this is auto computed based on input workload characteristics.");
public static final ConfigProperty<String> BLOOM_INDEX_PRUNE_BY_RANGES_PROP = ConfigProperty
.key("hoodie.bloom.index.prune.by.ranges")
@@ -90,7 +88,8 @@ public class HoodieIndexConfig extends HoodieConfig {
.withDocumentation("Only applies if index type is BLOOM. "
+ "When true, range information from files to leveraged speed up index lookups. Particularly helpful, "
+ "if the key has a monotonously increasing prefix, such as timestamp. "
+ "If the record key is completely random, it is better to turn this off.");
+ "If the record key is completely random, it is better to turn this off, since range pruning will only "
+ " add extra overhead to the index lookup.");
public static final ConfigProperty<String> BLOOM_INDEX_USE_CACHING_PROP = ConfigProperty
.key("hoodie.bloom.index.use.caching")
@@ -131,7 +130,7 @@ public class HoodieIndexConfig extends HoodieConfig {
.key("hoodie.simple.index.use.caching")
.defaultValue("true")
.withDocumentation("Only applies if index type is SIMPLE. "
+ "When true, the input RDD will cached to speed up index lookup by reducing IO "
+ "When true, the incoming writes will cached to speed up index lookup by reducing IO "
+ "for computing parallelism or affected partitions");
public static final ConfigProperty<String> SIMPLE_INDEX_PARALLELISM_PROP = ConfigProperty
@@ -187,7 +186,7 @@ public class HoodieIndexConfig extends HoodieConfig {
public static final ConfigProperty<String> SIMPLE_INDEX_UPDATE_PARTITION_PATH = ConfigProperty
.key("hoodie.simple.index.update.partition.path")
.defaultValue("false")
.withDocumentation("");
.withDocumentation("Similar to " + BLOOM_INDEX_UPDATE_PARTITION_PATH + ", but for simple index.");
private EngineType engineType;

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.config;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
/**
@@ -30,13 +31,18 @@ public class HoodieInternalConfig extends HoodieConfig {
public static final String BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED = "hoodie.bulkinsert.are.partitioner.records.sorted";
public static final Boolean DEFAULT_BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED = false;
public static final ConfigProperty<String> BULKINSERT_INPUT_DATA_SCHEMA_DDL = ConfigProperty
.key("hoodie.bulkinsert.schema.ddl")
.noDefaultValue()
.withDocumentation("Schema set for row writer/bulk insert.");
/**
* Returns if partition records are sorted or not.
*
* @param propertyValue value for property BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED.
* @return the property value.
*/
public static Boolean getBulkInsertIsPartitionRecordsSorted(String propertyValue) {
return propertyValue != null ? Boolean.parseBoolean(propertyValue) : DEFAULT_BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED;
}
}

View File

@@ -17,8 +17,8 @@
package org.apache.hudi.config;
import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy;
import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy;
import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
@@ -61,94 +61,94 @@ public class HoodieLockConfig extends HoodieConfig {
.key(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY)
.defaultValue(DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS)
.sinceVersion("0.8.0")
.withDocumentation("Parameter used in the exponential backoff retry policy. Stands for the Initial amount "
+ "of time to wait between retries by lock provider client");
.withDocumentation("Initial amount of time to wait between retries to acquire locks, "
+ " subsequent retries will exponentially backoff.");
public static final ConfigProperty<String> LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP = ConfigProperty
.key(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY)
.defaultValue(String.valueOf(5000L))
.sinceVersion("0.8.0")
.withDocumentation("Parameter used in the exponential backoff retry policy. Stands for the maximum amount "
+ "of time to wait between retries by lock provider client");
.withDocumentation("Maximum amount of time to wait between retries by lock provider client. This bounds"
+ " the maximum delay from the exponential backoff. Currently used by ZK based lock provider only.");
public static final ConfigProperty<String> LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP = ConfigProperty
.key(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY)
.defaultValue(String.valueOf(10000L))
.sinceVersion("0.8.0")
.withDocumentation("Amount of time to wait between retries from the hudi client");
.withDocumentation("Amount of time to wait between retries on the lock provider by the lock manager");
public static final ConfigProperty<String> LOCK_ACQUIRE_NUM_RETRIES_PROP = ConfigProperty
.key(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY)
.defaultValue(DEFAULT_LOCK_ACQUIRE_NUM_RETRIES)
.sinceVersion("0.8.0")
.withDocumentation("Maximum number of times to retry by lock provider client");
.withDocumentation("Maximum number of times to retry lock acquire, at each lock provider");
public static final ConfigProperty<String> LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP = ConfigProperty
.key(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY)
.defaultValue(String.valueOf(0))
.sinceVersion("0.8.0")
.withDocumentation("Maximum number of times to retry to acquire lock additionally from the hudi client");
.withDocumentation("Maximum number of times to retry to acquire lock additionally from the lock manager.");
public static final ConfigProperty<Integer> LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP = ConfigProperty
.key(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY)
.defaultValue(60 * 1000)
.sinceVersion("0.8.0")
.withDocumentation("");
.withDocumentation("Timeout in ms, to wait on an individual lock acquire() call, at the lock provider.");
public static final ConfigProperty<String> FILESYSTEM_LOCK_PATH_PROP = ConfigProperty
.key(FILESYSTEM_LOCK_PATH_PROP_KEY)
.noDefaultValue()
.sinceVersion("0.8.0")
.withDocumentation("");
.withDocumentation("For DFS based lock providers, path to store the locks under.");
public static final ConfigProperty<String> HIVE_DATABASE_NAME_PROP = ConfigProperty
.key(HIVE_DATABASE_NAME_PROP_KEY)
.noDefaultValue()
.sinceVersion("0.8.0")
.withDocumentation("The Hive database to acquire lock against");
.withDocumentation("For Hive based lock provider, the Hive database to acquire lock against");
public static final ConfigProperty<String> HIVE_TABLE_NAME_PROP = ConfigProperty
.key(HIVE_TABLE_NAME_PROP_KEY)
.noDefaultValue()
.sinceVersion("0.8.0")
.withDocumentation("The Hive table under the hive database to acquire lock against");
.withDocumentation("For Hive based lock provider, the Hive table to acquire lock against");
public static final ConfigProperty<String> HIVE_METASTORE_URI_PROP = ConfigProperty
.key(HIVE_METASTORE_URI_PROP_KEY)
.noDefaultValue()
.sinceVersion("0.8.0")
.withDocumentation("");
.withDocumentation("For Hive based lock provider, the Hive metastore URI to acquire locks against.");
public static final ConfigProperty<String> ZK_BASE_PATH_PROP = ConfigProperty
.key(ZK_BASE_PATH_PROP_KEY)
.noDefaultValue()
.sinceVersion("0.8.0")
.withDocumentation("The base path on Zookeeper under which to create a ZNode to acquire the lock. "
+ "This should be common for all jobs writing to the same table");
.withDocumentation("The base path on Zookeeper under which to create lock related ZNodes. "
+ "This should be same for all concurrent writers to the same table");
public static final ConfigProperty<Integer> ZK_SESSION_TIMEOUT_MS_PROP = ConfigProperty
.key(ZK_SESSION_TIMEOUT_MS_PROP_KEY)
.defaultValue(DEFAULT_ZK_SESSION_TIMEOUT_MS)
.sinceVersion("0.8.0")
.withDocumentation("How long to wait after losing a connection to ZooKeeper before the session is expired");
.withDocumentation("Timeout in ms, to wait after losing connection to ZooKeeper, before the session is expired");
public static final ConfigProperty<Integer> ZK_CONNECTION_TIMEOUT_MS_PROP = ConfigProperty
.key(ZK_CONNECTION_TIMEOUT_MS_PROP_KEY)
.defaultValue(DEFAULT_ZK_CONNECTION_TIMEOUT_MS)
.sinceVersion("0.8.0")
.withDocumentation("How long to wait when connecting to ZooKeeper before considering the connection a failure");
.withDocumentation("Timeout in ms, to wait for establishing connection with Zookeeper.");
public static final ConfigProperty<String> ZK_CONNECT_URL_PROP = ConfigProperty
.key(ZK_CONNECT_URL_PROP_KEY)
.noDefaultValue()
.sinceVersion("0.8.0")
.withDocumentation("Set the list of comma separated servers to connect to");
.withDocumentation("Zookeeper URL to connect to.");
public static final ConfigProperty<String> ZK_PORT_PROP = ConfigProperty
.key(ZK_PORT_PROP_KEY)
.noDefaultValue()
.sinceVersion("0.8.0")
.withDocumentation("The connection port to be used for Zookeeper");
.withDocumentation("Zookeeper port to connect to.");
public static final ConfigProperty<String> ZK_LOCK_KEY_PROP = ConfigProperty
.key(ZK_LOCK_KEY_PROP_KEY)

View File

@@ -59,22 +59,22 @@ public class HoodieMemoryConfig extends HoodieConfig {
public static final ConfigProperty<Long> MAX_MEMORY_FOR_MERGE_PROP = ConfigProperty
.key("hoodie.memory.merge.max.size")
.defaultValue(DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
.withDocumentation("Property to set the max memory for merge");
.withDocumentation("Maximum amount of memory used for merge operations, before spilling to local storage.");
public static final ConfigProperty<String> MAX_MEMORY_FOR_COMPACTION_PROP = ConfigProperty
.key("hoodie.memory.compaction.max.size")
.noDefaultValue()
.withDocumentation("Property to set the max memory for compaction");
.withDocumentation("Maximum amount of memory used for compaction operations, before spilling to local storage.");
public static final ConfigProperty<Integer> MAX_DFS_STREAM_BUFFER_SIZE_PROP = ConfigProperty
.key("hoodie.memory.dfs.buffer.max.size")
.defaultValue(16 * 1024 * 1024)
.withDocumentation("Property to set the max memory for dfs inputstream buffer size");
.withDocumentation("Property to control the max memory for dfs input stream buffer size");
public static final ConfigProperty<String> SPILLABLE_MAP_BASE_PATH_PROP = ConfigProperty
.key("hoodie.memory.spillable.map.path")
.defaultValue("/tmp/")
.withDocumentation("Default file path prefix for spillable file");
.withDocumentation("Default file path prefix for spillable map");
public static final ConfigProperty<Double> WRITESTATUS_FAILURE_FRACTION_PROP = ConfigProperty
.key("hoodie.memory.writestatus.failure.fraction")

View File

@@ -41,7 +41,7 @@ public class HoodieMetricsDatadogConfig extends HoodieConfig {
.key(DATADOG_PREFIX + ".report.period.seconds")
.defaultValue(30)
.sinceVersion("0.6.0")
.withDocumentation("Datadog report period in seconds. Default to 30.");
.withDocumentation("Datadog reporting period in seconds. Default to 30.");
public static final ConfigProperty<String> DATADOG_API_SITE = ConfigProperty
.key(DATADOG_PREFIX + ".api.site")

View File

@@ -34,19 +34,19 @@ public class HoodieMetricsPrometheusConfig extends HoodieConfig {
.key(PUSHGATEWAY_PREFIX + ".host")
.defaultValue("localhost")
.sinceVersion("0.6.0")
.withDocumentation("");
.withDocumentation("Hostname of the prometheus push gateway");
public static final ConfigProperty<Integer> PUSHGATEWAY_PORT = ConfigProperty
.key(PUSHGATEWAY_PREFIX + ".port")
.defaultValue(9091)
.sinceVersion("0.6.0")
.withDocumentation("");
.withDocumentation("Port for the push gateway.");
public static final ConfigProperty<Integer> PUSHGATEWAY_REPORT_PERIOD_SECONDS = ConfigProperty
.key(PUSHGATEWAY_PREFIX + ".report.period.seconds")
.defaultValue(30)
.sinceVersion("0.6.0")
.withDocumentation("");
.withDocumentation("Reporting interval in seconds.");
public static final ConfigProperty<Boolean> PUSHGATEWAY_DELETE_ON_SHUTDOWN = ConfigProperty
.key(PUSHGATEWAY_PREFIX + ".delete.on.shutdown")
@@ -58,7 +58,7 @@ public class HoodieMetricsPrometheusConfig extends HoodieConfig {
.key(PUSHGATEWAY_PREFIX + ".job.name")
.defaultValue("")
.sinceVersion("0.6.0")
.withDocumentation("");
.withDocumentation("Name of the push gateway job.");
public static final ConfigProperty<Boolean> PUSHGATEWAY_RANDOM_JOB_NAME_SUFFIX = ConfigProperty
.key(PUSHGATEWAY_PREFIX + ".random.job.name.suffix")
@@ -73,7 +73,7 @@ public class HoodieMetricsPrometheusConfig extends HoodieConfig {
.key(PROMETHEUS_PREFIX + ".port")
.defaultValue(9090)
.sinceVersion("0.6.0")
.withDocumentation("");
.withDocumentation("Port for prometheus server.");
private HoodieMetricsPrometheusConfig() {
super();

View File

@@ -37,12 +37,14 @@ public class HoodiePayloadConfig extends HoodieConfig {
public static final ConfigProperty<String> PAYLOAD_ORDERING_FIELD_PROP = ConfigProperty
.key(PAYLOAD_ORDERING_FIELD_PROP_KEY)
.defaultValue("ts")
.withDocumentation("Property to hold the payload ordering field name");
.withDocumentation("Table column/field name to order records that have the same key, before "
+ "merging and writing to storage.");
public static final ConfigProperty<String> PAYLOAD_EVENT_TIME_FIELD_PROP = ConfigProperty
.key(PAYLOAD_EVENT_TIME_FIELD_PROP_KEY)
.defaultValue("ts")
.withDocumentation("Property for payload event time field");
.withDocumentation("Table column/field name to derive timestamp associated with the records. This can "
+ "be useful for e.g., determining the freshness of the table.");
private HoodiePayloadConfig() {
super();

View File

@@ -43,19 +43,19 @@ public class HoodieStorageConfig extends HoodieConfig {
public static final ConfigProperty<String> PARQUET_BLOCK_SIZE_BYTES = ConfigProperty
.key("hoodie.parquet.block.size")
.defaultValue(String.valueOf(120 * 1024 * 1024))
.withDocumentation("Parquet RowGroup size. Its better this is same as the file size, so that a single column "
+ "within a file is stored continuously on disk");
.withDocumentation("Parquet RowGroup size. It's recommended to make this large enough that scan costs can be"
+ " amortized by packing enough column values into a single row group.");
public static final ConfigProperty<String> PARQUET_PAGE_SIZE_BYTES = ConfigProperty
.key("hoodie.parquet.page.size")
.defaultValue(String.valueOf(1 * 1024 * 1024))
.withDocumentation("Parquet page size. Page is the unit of read within a parquet file. "
+ "Within a block, pages are compressed seperately.");
+ "Within a block, pages are compressed separately.");
public static final ConfigProperty<String> ORC_FILE_MAX_BYTES = ConfigProperty
.key("hoodie.orc.max.file.size")
.defaultValue(String.valueOf(120 * 1024 * 1024))
.withDocumentation("");
.withDocumentation("Target file size for ORC base files.");
public static final ConfigProperty<String> ORC_STRIPE_SIZE = ConfigProperty
.key("hoodie.orc.stripe.size")
@@ -65,17 +65,18 @@ public class HoodieStorageConfig extends HoodieConfig {
public static final ConfigProperty<String> ORC_BLOCK_SIZE = ConfigProperty
.key("hoodie.orc.block.size")
.defaultValue(ORC_FILE_MAX_BYTES.defaultValue())
.withDocumentation("File system block size");
.withDocumentation("ORC block size, recommended to be aligned with the target file size.");
public static final ConfigProperty<String> HFILE_FILE_MAX_BYTES = ConfigProperty
.key("hoodie.hfile.max.file.size")
.defaultValue(String.valueOf(120 * 1024 * 1024))
.withDocumentation("");
.withDocumentation("Target file size for HFile base files.");
public static final ConfigProperty<String> HFILE_BLOCK_SIZE_BYTES = ConfigProperty
.key("hoodie.hfile.block.size")
.defaultValue(String.valueOf(1 * 1024 * 1024))
.withDocumentation("");
.defaultValue(String.valueOf(1024 * 1024))
.withDocumentation("Lower values increase the size of metadata tracked within HFile, but can offer potentially "
+ "faster lookup times.");
// used to size log files
public static final ConfigProperty<String> LOGFILE_SIZE_MAX_BYTES = ConfigProperty
@@ -107,12 +108,12 @@ public class HoodieStorageConfig extends HoodieConfig {
public static final ConfigProperty<String> HFILE_COMPRESSION_ALGORITHM = ConfigProperty
.key("hoodie.hfile.compression.algorithm")
.defaultValue("GZ")
.withDocumentation("");
.withDocumentation("Compression codec to use for hfile base files.");
public static final ConfigProperty<String> ORC_COMPRESSION_CODEC = ConfigProperty
.key("hoodie.orc.compression.codec")
.defaultValue("ZLIB")
.withDocumentation("");
.withDocumentation("Compression codec to use for ORC base files.");
// Default compression ratio for log file to parquet, general 3x
public static final ConfigProperty<String> LOGFILE_TO_PARQUET_COMPRESSION_RATIO = ConfigProperty

View File

@@ -36,7 +36,7 @@ public class HoodieWriteCommitCallbackConfig extends HoodieConfig {
.key(CALLBACK_PREFIX + "on")
.defaultValue(false)
.sinceVersion("0.6.0")
.withDocumentation("Turn callback on/off. off by default.");
.withDocumentation("Turn commit callback on/off. off by default.");
public static final ConfigProperty<String> CALLBACK_CLASS_PROP = ConfigProperty
.key(CALLBACK_PREFIX + "class")

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.config;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
@@ -27,8 +26,8 @@ import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -43,10 +42,13 @@ import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.orc.CompressionKind;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import javax.annotation.concurrent.Immutable;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
@@ -71,7 +73,7 @@ public class HoodieWriteConfig extends HoodieConfig {
public static final ConfigProperty<String> TABLE_NAME = ConfigProperty
.key("hoodie.table.name")
.noDefaultValue()
.withDocumentation("Table name that will be used for registering with Hive. Needs to be same across runs.");
.withDocumentation("Table name that will be used for registering with metastores like HMS. Needs to be same across runs.");
public static final ConfigProperty<String> PRECOMBINE_FIELD_PROP = ConfigProperty
.key("hoodie.datasource.write.precombine.field")
@@ -88,12 +90,14 @@ public class HoodieWriteConfig extends HoodieConfig {
public static final ConfigProperty<String> KEYGENERATOR_CLASS_PROP = ConfigProperty
.key("hoodie.datasource.write.keygenerator.class")
.noDefaultValue()
.withDocumentation("Key generator class, that implements will extract the key out of incoming Row object");
.withDocumentation("Key generator class, that implements `org.apache.hudi.keygen.KeyGenerator` "
+ "to extract a key out of incoming records.");
public static final ConfigProperty<String> KEYGENERATOR_TYPE_PROP = ConfigProperty
.key("hoodie.datasource.write.keygenerator.type")
.defaultValue(KeyGeneratorType.SIMPLE.name())
.withDocumentation("Type of build-in key generator, currently support SIMPLE, COMPLEX, TIMESTAMP, CUSTOM, NON_PARTITION, GLOBAL_DELETE");
.withDocumentation("Easily configure one of the built-in key generators, instead of specifying the key generator class. "
+ "Currently supports SIMPLE, COMPLEX, TIMESTAMP, CUSTOM, NON_PARTITION, GLOBAL_DELETE");
public static final ConfigProperty<String> ROLLBACK_USING_MARKERS = ConfigProperty
.key("hoodie.rollback.using.markers")
@@ -104,206 +108,220 @@ public class HoodieWriteConfig extends HoodieConfig {
public static final ConfigProperty<String> TIMELINE_LAYOUT_VERSION = ConfigProperty
.key("hoodie.timeline.layout.version")
.noDefaultValue()
.withDocumentation("");
.sinceVersion("0.5.1")
.withDocumentation("Controls the layout of the timeline. Version 0 relied on renames, Version 1 (default) models "
+ "the timeline as an immutable log relying only on atomic writes for object storage.");
public static final ConfigProperty<String> BASE_PATH_PROP = ConfigProperty
.key("hoodie.base.path")
.noDefaultValue()
.withDocumentation("Base DFS path under which all the data partitions are created. "
.withDocumentation("Base path on lake storage, under which all the table data is stored. "
+ "Always prefix it explicitly with the storage scheme (e.g hdfs://, s3:// etc). "
+ "Hudi stores all the main meta-data about commits, savepoints, cleaning audit logs "
+ "etc in .hoodie directory under the base directory.");
+ "etc in .hoodie directory under this base path directory.");
public static final ConfigProperty<String> AVRO_SCHEMA = ConfigProperty
.key("hoodie.avro.schema")
.noDefaultValue()
.withDocumentation("This is the current reader avro schema for the table. This is a string of the entire schema. "
+ "HoodieWriteClient uses this schema to pass on to implementations of HoodieRecordPayload to convert "
+ "from the source format to avro record. This is also used when re-writing records during an update.");
.withDocumentation("Schema string representing the current write schema of the table. Hudi passes this to "
+ "implementations of HoodieRecordPayload to convert incoming records to avro. This is also used as the write schema "
+ "for evolving records during an update.");
public static final ConfigProperty<String> AVRO_SCHEMA_VALIDATE = ConfigProperty
.key("hoodie.avro.schema.validate")
.defaultValue("false")
.withDocumentation("");
.withDocumentation("Validate the schema used for the write against the latest schema, for backwards compatibility.");
public static final ConfigProperty<String> INSERT_PARALLELISM = ConfigProperty
.key("hoodie.insert.shuffle.parallelism")
.defaultValue("1500")
.withDocumentation("Once data has been initially imported, this parallelism controls initial parallelism for reading input records. "
+ "Ensure this value is high enough say: 1 partition for 1 GB of input data");
.withDocumentation("Parallelism for inserting records into the table. Inserts can shuffle data before writing to tune file sizes and optimize the storage layout.");
public static final ConfigProperty<String> BULKINSERT_PARALLELISM = ConfigProperty
.key("hoodie.bulkinsert.shuffle.parallelism")
.defaultValue("1500")
.withDocumentation("Bulk insert is meant to be used for large initial imports and this parallelism determines "
+ "the initial number of files in your table. Tune this to achieve a desired optimal size during initial import.");
.withDocumentation("For large initial imports using bulk_insert operation, controls the parallelism to use for sort modes or custom partitioning done "
+ "before writing records to the table.");
public static final ConfigProperty<String> BULKINSERT_USER_DEFINED_PARTITIONER_CLASS = ConfigProperty
.key("hoodie.bulkinsert.user.defined.partitioner.class")
.noDefaultValue()
.withDocumentation("If specified, this class will be used to re-partition input records before they are inserted.");
public static final ConfigProperty<String> BULKINSERT_INPUT_DATA_SCHEMA_DDL = ConfigProperty
.key("hoodie.bulkinsert.schema.ddl")
.noDefaultValue()
.withDocumentation("");
.withDocumentation("If specified, this class will be used to re-partition records before they are bulk inserted. This can be used to sort, pack, cluster data"
+ " optimally for common query patterns.");
public static final ConfigProperty<String> UPSERT_PARALLELISM = ConfigProperty
.key("hoodie.upsert.shuffle.parallelism")
.defaultValue("1500")
.withDocumentation("Once data has been initially imported, this parallelism controls initial parallelism for reading input records. "
+ "Ensure this value is high enough say: 1 partition for 1 GB of input data");
.withDocumentation("Parallelism to use for upsert operation on the table. Upserts can shuffle data to perform index lookups, file sizing, bin packing records optimally "
+ "into file groups.");
public static final ConfigProperty<String> DELETE_PARALLELISM = ConfigProperty
.key("hoodie.delete.shuffle.parallelism")
.defaultValue("1500")
.withDocumentation("This parallelism is Used for “delete” operation while deduping or repartioning.");
.withDocumentation("Parallelism used for “delete” operation. Delete operations also perform shuffles, similar to upsert operation.");
public static final ConfigProperty<String> ROLLBACK_PARALLELISM = ConfigProperty
.key("hoodie.rollback.parallelism")
.defaultValue("100")
.withDocumentation("Determines the parallelism for rollback of commits.");
.withDocumentation("Parallelism for rollback of commits. Rollbacks perform delete of files or logging delete blocks to file groups on storage in parallel.");
public static final ConfigProperty<String> WRITE_BUFFER_LIMIT_BYTES = ConfigProperty
.key("hoodie.write.buffer.limit.bytes")
.defaultValue(String.valueOf(4 * 1024 * 1024))
.withDocumentation("");
.withDocumentation("Size of in-memory buffer used for parallelizing network reads and lake storage writes.");
public static final ConfigProperty<String> COMBINE_BEFORE_INSERT_PROP = ConfigProperty
.key("hoodie.combine.before.insert")
.defaultValue("false")
.withDocumentation("Flag which first combines the input RDD and merges multiple partial records into a single record "
+ "before inserting or updating in DFS");
.withDocumentation("When inserted records share same key, controls whether they should be first combined (i.e. de-duplicated) before"
+ " writing to storage.");
public static final ConfigProperty<String> COMBINE_BEFORE_UPSERT_PROP = ConfigProperty
.key("hoodie.combine.before.upsert")
.defaultValue("true")
.withDocumentation("Flag which first combines the input RDD and merges multiple partial records into a single record "
+ "before inserting or updating in DFS");
.withDocumentation("When upserted records share same key, controls whether they should be first combined (i.e. de-duplicated) before"
+ " writing to storage. This should be turned off only if you are absolutely certain that there are no duplicates incoming, "
+ " otherwise it can lead to duplicate keys and violate the uniqueness guarantees.");
public static final ConfigProperty<String> COMBINE_BEFORE_DELETE_PROP = ConfigProperty
.key("hoodie.combine.before.delete")
.defaultValue("true")
.withDocumentation("Flag which first combines the input RDD and merges multiple partial records into a single record "
+ "before deleting in DFS");
.withDocumentation("During delete operations, controls whether we should combine deletes (and potentially also upserts) before "
+ " writing to storage.");
public static final ConfigProperty<String> WRITE_STATUS_STORAGE_LEVEL = ConfigProperty
.key("hoodie.write.status.storage.level")
.defaultValue("MEMORY_AND_DISK_SER")
.withDocumentation("HoodieWriteClient.insert and HoodieWriteClient.upsert returns a persisted RDD[WriteStatus], "
+ "this is because the Client can choose to inspect the WriteStatus and choose and commit or not based on the failures. "
+ "This is a configuration for the storage level for this RDD");
.withDocumentation("Write status objects hold metadata about a write (stats, errors), that is not yet committed to storage. "
+ "This controls how that information is cached for inspection by clients. We rarely expect this to be changed.");
public static final ConfigProperty<String> HOODIE_AUTO_COMMIT_PROP = ConfigProperty
.key("hoodie.auto.commit")
.defaultValue("true")
.withDocumentation("Should HoodieWriteClient autoCommit after insert and upsert. "
+ "The client can choose to turn off auto-commit and commit on a “defined success condition”");
.withDocumentation("Controls whether a write operation should auto commit. This can be turned off to perform inspection"
+ " of the uncommitted write before deciding to commit.");
public static final ConfigProperty<String> HOODIE_WRITE_STATUS_CLASS_PROP = ConfigProperty
.key("hoodie.writestatus.class")
.defaultValue(WriteStatus.class.getName())
.withDocumentation("");
.withDocumentation("Subclass of " + WriteStatus.class.getName() + " to be used to collect information about a write. Can be "
+ "overridden to collect additional metrics/statistics about the data if needed.");
public static final ConfigProperty<String> FINALIZE_WRITE_PARALLELISM = ConfigProperty
.key("hoodie.finalize.write.parallelism")
.defaultValue("1500")
.withDocumentation("");
.withDocumentation("Parallelism for the write finalization internal operation, which involves removing any partially written "
+ "files from lake storage, before committing the write. Reduce this value, if a high number of tasks incurs delays for smaller tables "
+ "or low latency writes.");
public static final ConfigProperty<String> MARKERS_DELETE_PARALLELISM = ConfigProperty
.key("hoodie.markers.delete.parallelism")
.defaultValue("100")
.withDocumentation("Determines the parallelism for deleting marker files.");
.withDocumentation("Determines the parallelism for deleting marker files, which are used to track all files (valid or invalid/partial) written during "
+ "a write operation. Increase this value if delays are observed, with large batch writes.");
public static final ConfigProperty<String> BULKINSERT_SORT_MODE = ConfigProperty
.key("hoodie.bulkinsert.sort.mode")
.defaultValue(BulkInsertSortMode.GLOBAL_SORT.toString())
.withDocumentation("Sorting modes to use for sorting records for bulk insert. This is leveraged when user "
+ "defined partitioner is not configured. Default is GLOBAL_SORT. Available values are - GLOBAL_SORT: "
+ "this ensures best file sizes, with lowest memory overhead at cost of sorting. PARTITION_SORT: "
+ "Strikes a balance by only sorting within a partition, still keeping the memory overhead of writing "
+ "lowest and best effort file sizing. NONE: No sorting. Fastest and matches spark.write.parquet() "
+ "in terms of number of files, overheads");
.withDocumentation("Sorting modes to use for sorting records for bulk insert. This is used when "
+ BULKINSERT_USER_DEFINED_PARTITIONER_CLASS.key() + " is not configured. Available values are - "
+ "GLOBAL_SORT: this ensures best file sizes, with lowest memory overhead at cost of sorting. "
+ "PARTITION_SORT: Strikes a balance by only sorting within a partition, still keeping the memory overhead of writing "
+ "lowest and best effort file sizing. "
+ "NONE: No sorting. Fastest and matches `spark.write.parquet()` in terms of number of files, overheads");
public static final ConfigProperty<String> EMBEDDED_TIMELINE_SERVER_ENABLED = ConfigProperty
.key("hoodie.embed.timeline.server")
.defaultValue("true")
.withDocumentation("");
.withDocumentation("When true, spins up an instance of the timeline server (meta server that serves cached file listings, statistics), "
+ "running on each writer's driver process, accepting requests during the write from executors.");
public static final ConfigProperty<String> EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED = ConfigProperty
.key("hoodie.embed.timeline.server.reuse.enabled")
.defaultValue("false")
.withDocumentation("");
.withDocumentation("Controls whether the timeline server instance should be cached and reused across the JVM (across task lifecycles) "
+ "to avoid startup costs. This should rarely be changed.");
public static final ConfigProperty<String> EMBEDDED_TIMELINE_SERVER_PORT = ConfigProperty
.key("hoodie.embed.timeline.server.port")
.defaultValue("0")
.withDocumentation("");
.withDocumentation("Port at which the timeline server listens for requests. When running embedded in each writer, it picks "
+ "a free port and communicates to all the executors. This should rarely be changed.");
public static final ConfigProperty<String> EMBEDDED_TIMELINE_SERVER_THREADS = ConfigProperty
.key("hoodie.embed.timeline.server.threads")
.defaultValue("-1")
.withDocumentation("");
.withDocumentation("Number of threads to serve requests in the timeline server. By default, auto configured based on the number of underlying cores.");
public static final ConfigProperty<String> EMBEDDED_TIMELINE_SERVER_COMPRESS_OUTPUT = ConfigProperty
.key("hoodie.embed.timeline.server.gzip")
.defaultValue("true")
.withDocumentation("");
.withDocumentation("Controls whether gzip compression is used, for large responses from the timeline server, to improve latency.");
public static final ConfigProperty<String> EMBEDDED_TIMELINE_SERVER_USE_ASYNC = ConfigProperty
.key("hoodie.embed.timeline.server.async")
.defaultValue("false")
.withDocumentation("");
.withDocumentation("Controls whether or not, the requests to the timeline server are processed in asynchronous fashion, "
+ "potentially improving throughput.");
public static final ConfigProperty<String> FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP = ConfigProperty
.key("hoodie.fail.on.timeline.archiving")
.defaultValue("true")
.withDocumentation("");
.withDocumentation("Timeline archiving removes older instants from the timeline, after each write operation, to minimize metadata overhead. "
+ "Controls whether or not, the write should be failed as well, if such archiving fails.");
public static final ConfigProperty<Long> INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP = ConfigProperty
.key("hoodie.consistency.check.initial_interval_ms")
.defaultValue(2000L)
.withDocumentation("Time between successive attempts to ensure written data's metadata is consistent on storage");
.withDocumentation("Initial time between successive attempts to ensure written data's metadata is consistent on storage. Grows with exponential"
+ " backoff after the initial value.");
public static final ConfigProperty<Long> MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP = ConfigProperty
.key("hoodie.consistency.check.max_interval_ms")
.defaultValue(300000L)
.withDocumentation("Max interval time for consistency check");
.withDocumentation("Max time to wait between successive attempts at performing consistency checks");
public static final ConfigProperty<Integer> MAX_CONSISTENCY_CHECKS_PROP = ConfigProperty
.key("hoodie.consistency.check.max_checks")
.defaultValue(7)
.withDocumentation("Maximum number of checks, for consistency of written data. Will wait upto 256 Secs");
.withDocumentation("Maximum number of checks, for consistency of written data.");
public static final ConfigProperty<String> MERGE_DATA_VALIDATION_CHECK_ENABLED = ConfigProperty
.key("hoodie.merge.data.validation.enabled")
.defaultValue("false")
.withDocumentation("Data validation check performed during merges before actual commits");
.withDocumentation("When enabled, data validation checks are performed during merges to ensure expected "
+ "number of records after merge operation.");
public static final ConfigProperty<String> MERGE_ALLOW_DUPLICATE_ON_INSERTS = ConfigProperty
.key("hoodie.merge.allow.duplicate.on.inserts")
.defaultValue("false")
.withDocumentation("Allow duplicates with inserts while merging with existing records");
.withDocumentation("When enabled, we allow duplicate keys even if inserts are routed to merge with an existing file (for ensuring file sizing)."
+ " This is only relevant for insert operation, since upsert, delete operations will ensure unique key constraints are maintained.");
public static final ConfigProperty<ExternalSpillableMap.DiskMapType> SPILLABLE_DISK_MAP_TYPE = ConfigProperty
.key("hoodie.spillable.diskmap.type")
.defaultValue(ExternalSpillableMap.DiskMapType.BITCASK)
.withDocumentation("Enable usage of either BITCASK or ROCKS_DB as disk map for External Spillable Map");
.withDocumentation("When handling input data that cannot be held in memory, to merge with a file on storage, a spillable diskmap is employed. "
+ "By default, we use a persistent hashmap based loosely on bitcask, that offers O(1) inserts, lookups. "
+ "Change this to `ROCKS_DB` to prefer using rocksDB, for handling the spill.");
public static final ConfigProperty<Integer> CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP = ConfigProperty
.key("hoodie.client.heartbeat.interval_in_ms")
.defaultValue(60 * 1000)
.withDocumentation("");
.withDocumentation("Writers perform heartbeats to indicate liveness. Controls how often (in ms), such heartbeats are registered to lake storage.");
public static final ConfigProperty<Integer> CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES_PROP = ConfigProperty
.key("hoodie.client.heartbeat.tolerable.misses")
.defaultValue(2)
.withDocumentation("");
.withDocumentation("Number of heartbeat misses, before a writer is deemed not alive and all pending writes are aborted.");
public static final ConfigProperty<String> WRITE_CONCURRENCY_MODE_PROP = ConfigProperty
.key("hoodie.write.concurrency.mode")
.defaultValue(WriteConcurrencyMode.SINGLE_WRITER.name())
.withDocumentation("Enable different concurrency support");
.withDocumentation("Enable different concurrency modes. Options are "
+ "SINGLE_WRITER: Only one active writer to the table. Maximizes throughput. "
+ "OPTIMISTIC_CONCURRENCY_CONTROL: Multiple writers can operate on the table and exactly one of them succeed "
+ "if a conflict (writes affect the same file group) is detected.");
public static final ConfigProperty<String> WRITE_META_KEY_PREFIXES_PROP = ConfigProperty
.key("hoodie.write.meta.key.prefixes")
@@ -312,16 +330,14 @@ public class HoodieWriteConfig extends HoodieConfig {
+ "during overlapping commits via multi writing");
/**
* The specified write schema. In most case, we do not need set this parameter,
* but for the case the write schema is not equal to the specified table schema, we can
* specify the write schema by this parameter.
*
* Currently the MergeIntoHoodieTableCommand use this to specify the write schema.
* Currently, MergeIntoHoodieTableCommand uses this to specify the write schema.
*/
public static final ConfigProperty<String> WRITE_SCHEMA_PROP = ConfigProperty
.key("hoodie.write.schema")
.noDefaultValue()
.withDocumentation("");
.withDocumentation("The specified write schema. In most cases, we do not need to set this parameter,"
+ " but for the case the write schema is not equal to the specified table schema, we can"
+ " specify the write schema by this parameter. Used by MergeIntoHoodieTableCommand");
/**
* HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
@@ -342,7 +358,8 @@ public class HoodieWriteConfig extends HoodieConfig {
.key(AVRO_SCHEMA.key() + ".external.transformation")
.defaultValue("false")
.withAlternatives(AVRO_SCHEMA.key() + ".externalTransformation")
.withDocumentation("");
.withDocumentation("When enabled, records in older schema are rewritten into newer schema during upsert, delete and background"
+ " compaction, clustering operations.");
private ConsistencyGuardConfig consistencyGuardConfig;
@@ -352,7 +369,6 @@ public class HoodieWriteConfig extends HoodieConfig {
private FileSystemViewStorageConfig viewStorageConfig;
private HoodiePayloadConfig hoodiePayloadConfig;
private HoodieMetadataConfig metadataConfig;
private EngineType engineType;
/**

View File

@@ -19,13 +19,14 @@
package org.apache.hudi.keygen.constant;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
public class KeyGeneratorOptions {
public class KeyGeneratorOptions extends HoodieConfig {
public static final ConfigProperty<String> URL_ENCODE_PARTITIONING_OPT_KEY = ConfigProperty
.key("hoodie.datasource.write.partitionpath.urlencode")
.defaultValue("false")
.withDocumentation("");
.withDocumentation("Should we url encode the partition path value, before creating the folder structure.");
public static final ConfigProperty<String> HIVE_STYLE_PARTITIONING_OPT_KEY = ConfigProperty
.key("hoodie.datasource.write.hive_style_partitioning")

View File

@@ -60,7 +60,7 @@ public class HoodieAvroKeyGeneratorFactory {
props.getString(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key(), null);
if (StringUtils.isNullOrEmpty(keyGeneratorType)) {
LOG.info("The value of {} is empty, use SIMPLE", HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key());
LOG.info("The value of {} is empty, using SIMPLE", HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key());
keyGeneratorType = KeyGeneratorType.SIMPLE.name();
}

View File

@@ -37,14 +37,14 @@ public final class HoodieMetadataConfig extends HoodieConfig {
.key(METADATA_PREFIX + ".enable")
.defaultValue(false)
.sinceVersion("0.7.0")
.withDocumentation("Enable the internal Metadata Table which stores table level file listings");
.withDocumentation("Enable the internal metadata table which serves table metadata like table level file listings");
// Validate contents of Metadata Table on each access against the actual filesystem
public static final ConfigProperty<Boolean> METADATA_VALIDATE_PROP = ConfigProperty
.key(METADATA_PREFIX + ".validate")
.defaultValue(false)
.sinceVersion("0.7.0")
.withDocumentation("Validate contents of Metadata Table on each access against the actual listings from DFS");
.withDocumentation("Validate contents of metadata table on each access; e.g against the actual listings from lake storage");
public static final boolean DEFAULT_METADATA_ENABLE_FOR_READERS = false;
@@ -53,14 +53,14 @@ public final class HoodieMetadataConfig extends HoodieConfig {
.key(METADATA_PREFIX + ".metrics.enable")
.defaultValue(false)
.sinceVersion("0.7.0")
.withDocumentation("");
.withDocumentation("Enable publishing of metrics around metadata table.");
// Parallelism for inserts
public static final ConfigProperty<Integer> METADATA_INSERT_PARALLELISM_PROP = ConfigProperty
.key(METADATA_PREFIX + ".insert.parallelism")
.defaultValue(1)
.sinceVersion("0.7.0")
.withDocumentation("Parallelism to use when writing to the metadata table");
.withDocumentation("Parallelism to use when inserting to the metadata table");
// Async clean
public static final ConfigProperty<Boolean> METADATA_ASYNC_CLEAN_PROP = ConfigProperty
@@ -81,32 +81,32 @@ public final class HoodieMetadataConfig extends HoodieConfig {
.key(METADATA_PREFIX + ".keep.min.commits")
.defaultValue(20)
.sinceVersion("0.7.0")
.withDocumentation("Controls the archival of the metadata tables timeline");
.withDocumentation("Controls the archival of the metadata tables timeline.");
public static final ConfigProperty<Integer> MAX_COMMITS_TO_KEEP_PROP = ConfigProperty
.key(METADATA_PREFIX + ".keep.max.commits")
.defaultValue(30)
.sinceVersion("0.7.0")
.withDocumentation("Controls the archival of the metadata tables timeline");
.withDocumentation("Controls the archival of the metadata tables timeline.");
// Cleaner commits retained
public static final ConfigProperty<Integer> CLEANER_COMMITS_RETAINED_PROP = ConfigProperty
.key(METADATA_PREFIX + ".cleaner.commits.retained")
.defaultValue(3)
.sinceVersion("0.7.0")
.withDocumentation("");
.withDocumentation("Controls retention/history for metadata table.");
// Regex to filter out matching directories during bootstrap
public static final ConfigProperty<String> DIRECTORY_FILTER_REGEX = ConfigProperty
.key(METADATA_PREFIX + ".dir.filter.regex")
.defaultValue("")
.sinceVersion("0.7.0")
.withDocumentation("");
.withDocumentation("Directories matching this regex, will be filtered out when initializing metadata table from lake storage for the first time.");
public static final ConfigProperty<String> HOODIE_ASSUME_DATE_PARTITIONING_PROP = ConfigProperty
.key("hoodie.assume.date.partitioning")
.defaultValue("false")
.sinceVersion("0.7.0")
.sinceVersion("0.3.0")
.withDocumentation("Should HoodieWriteClient assume the data is partitioned by dates, i.e three levels from base path. "
+ "This is a stop-gap to support tables created by versions < 0.3.1. Will be removed eventually");
@@ -114,7 +114,7 @@ public final class HoodieMetadataConfig extends HoodieConfig {
.key("hoodie.file.listing.parallelism")
.defaultValue(1500)
.sinceVersion("0.7.0")
.withDocumentation("");
.withDocumentation("Parallelism to use, when listing the table on lake storage.");
private HoodieMetadataConfig() {
super();

View File

@@ -31,12 +31,11 @@ import java.util.Properties;
*/
public class ConsistencyGuardConfig extends HoodieConfig {
// time between successive attempts to ensure written data's metadata is consistent on storage
@Deprecated
public static final ConfigProperty<String> CONSISTENCY_CHECK_ENABLED_PROP = ConfigProperty
.key("hoodie.consistency.check.enabled")
.defaultValue("false")
.sinceVersion("0.5.0")
.deprecatedAfter("0.7.0")
.withDocumentation("Enabled to handle S3 eventual consistency issue. This property is no longer required "
+ "since S3 is now strongly consistent. Will be removed in the future releases.");
@@ -44,35 +43,37 @@ public class ConsistencyGuardConfig extends HoodieConfig {
.key("hoodie.consistency.check.initial_interval_ms")
.defaultValue(400L)
.sinceVersion("0.5.0")
.withDocumentation("");
.deprecatedAfter("0.7.0")
.withDocumentation("Amount of time (in ms) to wait, before checking for consistency after an operation on storage.");
// max interval time
public static final ConfigProperty<Long> MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP = ConfigProperty
.key("hoodie.consistency.check.max_interval_ms")
.defaultValue(20000L)
.sinceVersion("0.5.0")
.withDocumentation("");
.deprecatedAfter("0.7.0")
.withDocumentation("Maximum amount of time (in ms), to wait for consistency checking.");
// maximum number of checks, for consistency of written data. Will wait upto 140 Secs
public static final ConfigProperty<Integer> MAX_CONSISTENCY_CHECKS_PROP = ConfigProperty
.key("hoodie.consistency.check.max_checks")
.defaultValue(6)
.sinceVersion("0.5.0")
.withDocumentation("");
.deprecatedAfter("0.7.0")
.withDocumentation("Maximum number of consistency checks to perform, with exponential backoff.");
// sleep time for OptimisticConsistencyGuard
public static final ConfigProperty<Long> OPTIMISTIC_CONSISTENCY_GUARD_SLEEP_TIME_MS_PROP = ConfigProperty
.key("hoodie.optimistic.consistency.guard.sleep_time_ms")
.defaultValue(500L)
.sinceVersion("0.6.0")
.withDocumentation("");
.withDocumentation("Amount of time (in ms), to wait after which we assume storage is consistent.");
// config to enable OptimisticConsistencyGuard in finalizeWrite instead of FailSafeConsistencyGuard
public static final ConfigProperty<Boolean> ENABLE_OPTIMISTIC_CONSISTENCY_GUARD_PROP = ConfigProperty
.key("_hoodie.optimistic.consistency.guard.enable")
.defaultValue(true)
.sinceVersion("0.6.0")
.withDocumentation("");
.withDocumentation("Enable consistency guard, which optimistically assumes consistency is achieved after a certain time period.");
private ConsistencyGuardConfig() {
super();

View File

@@ -23,7 +23,8 @@ import org.apache.hadoop.fs.Path;
import java.util.List;
/**
* Default Consistency guard that does nothing. Used for HDFS deployments
* Default Consistency guard that does nothing. Used for lake storage which provides read-after-write
* guarantees.
*/
public class NoOpConsistencyGuard implements ConsistencyGuard {

View File

@@ -72,66 +72,69 @@ public class HoodieTableConfig extends HoodieConfig implements Serializable {
public static final ConfigProperty<HoodieTableVersion> HOODIE_TABLE_VERSION_PROP = ConfigProperty
.key("hoodie.table.version")
.defaultValue(HoodieTableVersion.ZERO)
.withDocumentation("");
.withDocumentation("Version of table, used for running upgrade/downgrade steps between releases with potentially "
+ "breaking/backwards compatible changes.");
public static final ConfigProperty<String> HOODIE_TABLE_PRECOMBINE_FIELD_PROP = ConfigProperty
.key("hoodie.table.precombine.field")
.noDefaultValue()
.withDocumentation("Field used in preCombining before actual write. When two records have the same key value, "
+ "we will pick the one with the largest value for the precombine field, determined by Object.compareTo(..)");
.withDocumentation("Field used in preCombining before actual write. By default, when two records have the same key value, "
+ "the largest value for the precombine field determined by Object.compareTo(..), is picked.");
public static final ConfigProperty<String> HOODIE_TABLE_PARTITION_COLUMNS_PROP = ConfigProperty
.key("hoodie.table.partition.columns")
.noDefaultValue()
.withDocumentation("Partition path field. Value to be used at the partitionPath component of HoodieKey. "
+ "Actual value ontained by invoking .toString()");
.withDocumentation("Columns used to partition the table. Concatenated values of these fields are used as "
+ "the partition path, by invoking toString()");
public static final ConfigProperty<String> HOODIE_TABLE_RECORDKEY_FIELDS = ConfigProperty
.key("hoodie.table.recordkey.fields")
.noDefaultValue()
.withDocumentation("");
.withDocumentation("Columns used to uniquely identify the table. Concatenated values of these fields are used as "
+ " the record key component of HoodieKey.");
public static final ConfigProperty<String> HOODIE_TABLE_CREATE_SCHEMA = ConfigProperty
.key("hoodie.table.create.schema")
.noDefaultValue()
.withDocumentation("");
.withDocumentation("Schema used when creating the table, for the first time.");
public static final ConfigProperty<HoodieFileFormat> HOODIE_BASE_FILE_FORMAT_PROP = ConfigProperty
.key("hoodie.table.base.file.format")
.defaultValue(HoodieFileFormat.PARQUET)
.withAlternatives("hoodie.table.ro.file.format")
.withDocumentation("");
.withDocumentation("Base file format to store all the base file data.");
public static final ConfigProperty<HoodieFileFormat> HOODIE_LOG_FILE_FORMAT_PROP = ConfigProperty
.key("hoodie.table.log.file.format")
.defaultValue(HoodieFileFormat.HOODIE_LOG)
.withAlternatives("hoodie.table.rt.file.format")
.withDocumentation("");
.withDocumentation("Log format used for the delta logs.");
public static final ConfigProperty<String> HOODIE_TIMELINE_LAYOUT_VERSION_PROP = ConfigProperty
.key("hoodie.timeline.layout.version")
.noDefaultValue()
.withDocumentation("");
.withDocumentation("Version of timeline used, by the table.");
public static final ConfigProperty<String> HOODIE_PAYLOAD_CLASS_PROP = ConfigProperty
.key("hoodie.compaction.payload.class")
.defaultValue(OverwriteWithLatestAvroPayload.class.getName())
.withDocumentation("");
.withDocumentation("Payload class to use for performing compactions, i.e merge delta logs with current base file and then "
+ " produce a new base file.");
public static final ConfigProperty<String> HOODIE_ARCHIVELOG_FOLDER_PROP = ConfigProperty
.key("hoodie.archivelog.folder")
.defaultValue("archived")
.withDocumentation("");
.withDocumentation("path under the meta folder, to store archived timeline instants at.");
public static final ConfigProperty<String> HOODIE_BOOTSTRAP_INDEX_ENABLE_PROP = ConfigProperty
.key("hoodie.bootstrap.index.enable")
.noDefaultValue()
.withDocumentation("");
.withDocumentation("Whether or not, this is a bootstrapped table, with bootstrap base data and a mapping index defined.");
public static final ConfigProperty<String> HOODIE_BOOTSTRAP_INDEX_CLASS_PROP = ConfigProperty
.key("hoodie.bootstrap.index.class")
.defaultValue(HFileBootstrapIndex.class.getName())
.withDocumentation("");
.withDocumentation("Implementation to use, for mapping base files to bootstrap base files, that contain actual data.");
public static final ConfigProperty<String> HOODIE_BOOTSTRAP_BASE_PATH_PROP = ConfigProperty
.key("hoodie.bootstrap.base.path")

View File

@@ -34,7 +34,7 @@ public class TimelineLayoutVersion implements Serializable, Comparable<TimelineL
public static final Integer CURR_VERSION = VERSION_1;
public static final TimelineLayoutVersion CURR_LAYOUT_VERSION = new TimelineLayoutVersion(CURR_VERSION);
private Integer version;
private final Integer version;
public TimelineLayoutVersion(Integer version) {
ValidationUtils.checkArgument(version <= CURR_VERSION);

View File

@@ -25,7 +25,9 @@ import org.apache.hudi.common.util.ValidationUtils;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;
import java.util.stream.Collectors;
/**
* File System View Storage Configurations.
@@ -36,76 +38,78 @@ public class FileSystemViewStorageConfig extends HoodieConfig {
public static final ConfigProperty<FileSystemViewStorageType> FILESYSTEM_VIEW_STORAGE_TYPE = ConfigProperty
.key("hoodie.filesystem.view.type")
.defaultValue(FileSystemViewStorageType.MEMORY)
.withDocumentation("");
.withDocumentation("File system view provides APIs for viewing the files on the underlying lake storage, "
+ " as file groups and file slices. This config controls how such a view is held. Options include "
+ Arrays.stream(FileSystemViewStorageType.values()).map(Enum::name).collect(Collectors.joining(","))
+ " which provide different trade offs for memory usage and API request performance.");
public static final ConfigProperty<String> FILESYSTEM_VIEW_INCREMENTAL_SYNC_MODE = ConfigProperty
.key("hoodie.filesystem.view.incr.timeline.sync.enable")
.defaultValue("false")
.withDocumentation("");
.withDocumentation("Controls whether or not, the file system view is incrementally updated as "
+ "new actions are performed on the timeline.");
public static final ConfigProperty<FileSystemViewStorageType> FILESYSTEM_SECONDARY_VIEW_STORAGE_TYPE = ConfigProperty
.key("hoodie.filesystem.view.secondary.type")
.defaultValue(FileSystemViewStorageType.MEMORY)
.withDocumentation("");
.withDocumentation("Specifies the secondary form of storage for file system view, if the primary (e.g timeline server) "
+ " is unavailable.");
public static final ConfigProperty<String> FILESYSTEM_VIEW_REMOTE_HOST = ConfigProperty
.key("hoodie.filesystem.view.remote.host")
.defaultValue("localhost")
.withDocumentation("");
.withDocumentation("Host to serve file system view queries, when remote. We expect this to be rarely hand configured.");
public static final ConfigProperty<Integer> FILESYSTEM_VIEW_REMOTE_PORT = ConfigProperty
.key("hoodie.filesystem.view.remote.port")
.defaultValue(26754)
.withDocumentation("");
.withDocumentation("Port to serve file system view queries, when remote. We expect this to be rarely hand configured.");
public static final ConfigProperty<String> FILESYSTEM_VIEW_SPILLABLE_DIR = ConfigProperty
.key("hoodie.filesystem.view.spillable.dir")
.defaultValue("/tmp/view_map/")
.withDocumentation("");
.withDocumentation("Path on local storage to use, when file system view is held in a spillable map.");
public static final ConfigProperty<Long> FILESYSTEM_VIEW_SPILLABLE_MEM = ConfigProperty
.key("hoodie.filesystem.view.spillable.mem")
.defaultValue(100 * 1024 * 1024L) // 100 MB
.withDocumentation("");
.withDocumentation("Amount of memory to be used for holding file system view, before spilling to disk.");
public static final ConfigProperty<Double> FILESYSTEM_VIEW_PENDING_COMPACTION_MEM_FRACTION = ConfigProperty
.key("hoodie.filesystem.view.spillable.compaction.mem.fraction")
.defaultValue(0.8)
.withDocumentation("");
.withDocumentation("Fraction of the file system view memory, to be used for holding compaction related metadata.");
public static final ConfigProperty<Double> FILESYSTEM_VIEW_BOOTSTRAP_BASE_FILE_FRACTION = ConfigProperty
.key("hoodie.filesystem.view.spillable.bootstrap.base.file.mem.fraction")
.defaultValue(0.05)
.withDocumentation("");
.withDocumentation("Fraction of the file system view memory, to be used for holding mapping to bootstrap base files.");
public static final ConfigProperty<Double> FILESYSTEM_VIEW_REPLACED_MEM_FRACTION = ConfigProperty
.key("hoodie.filesystem.view.spillable.replaced.mem.fraction")
.defaultValue(0.01)
.withDocumentation("");
.withDocumentation("Fraction of the file system view memory, to be used for holding replace commit related metadata.");
public static final ConfigProperty<Double> FILESYSTEM_VIEW_PENDING_CLUSTERING_MEM_FRACTION = ConfigProperty
.key("hoodie.filesystem.view.spillable.clustering.mem.fraction")
.defaultValue(0.01)
.withDocumentation("");
.withDocumentation("Fraction of the file system view memory, to be used for holding clustering related metadata.");
public static final ConfigProperty<String> ROCKSDB_BASE_PATH_PROP = ConfigProperty
.key("hoodie.filesystem.view.rocksdb.base.path")
.defaultValue("/tmp/hoodie_timeline_rocksdb")
.withDocumentation("");
.withDocumentation("Path on local storage to use, when storing file system view in embedded kv store/rocksdb.");
public static final ConfigProperty<Integer> FILESTYSTEM_REMOTE_TIMELINE_CLIENT_TIMEOUT_SECS = ConfigProperty
public static final ConfigProperty<Integer> FILESYSTEM_REMOTE_TIMELINE_CLIENT_TIMEOUT_SECS = ConfigProperty
.key("hoodie.filesystem.view.remote.timeout.secs")
.defaultValue(5 * 60) // 5 min
.withDocumentation("");
.withDocumentation("Timeout in seconds, to wait for API requests against a remote file system view. e.g timeline server.");
/**
* Configs to control whether backup needs to be configured if clients were not able to reach
* timeline service.
*/
public static final ConfigProperty<String> REMOTE_BACKUP_VIEW_HANDLER_ENABLE = ConfigProperty
.key("hoodie.filesystem.remote.backup.view.enable")
.defaultValue("true") // Need to be disabled only for tests.
.withDocumentation("");
.withDocumentation("Config to control whether backup needs to be configured if clients were not able to reach"
+ " timeline service.");
public static FileSystemViewStorageConfig.Builder newBuilder() {
return new Builder();
@@ -132,7 +136,7 @@ public class FileSystemViewStorageConfig extends HoodieConfig {
}
public Integer getRemoteTimelineClientTimeoutSecs() {
return getInt(FILESTYSTEM_REMOTE_TIMELINE_CLIENT_TIMEOUT_SECS);
return getInt(FILESYSTEM_REMOTE_TIMELINE_CLIENT_TIMEOUT_SECS);
}
public long getMaxMemoryForFileGroupMap() {
@@ -232,7 +236,7 @@ public class FileSystemViewStorageConfig extends HoodieConfig {
}
public Builder withRemoteTimelineClientTimeoutSecs(Long timelineClientTimeoutSecs) {
fileSystemViewStorageConfig.setValue(FILESTYSTEM_REMOTE_TIMELINE_CLIENT_TIMEOUT_SECS, timelineClientTimeoutSecs.toString());
fileSystemViewStorageConfig.setValue(FILESYSTEM_REMOTE_TIMELINE_CLIENT_TIMEOUT_SECS, timelineClientTimeoutSecs.toString());
return this;
}

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.configuration;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -43,7 +44,7 @@ import java.util.Set;
*
* <p>It has the options for Hoodie table read and write. It also defines some utilities.
*/
public class FlinkOptions {
public class FlinkOptions extends HoodieConfig {
private FlinkOptions() {
}

View File

@@ -25,7 +25,6 @@ import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.{HiveSyncTool, SlashEncodedDayPartitionValueExtractor}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{CustomKeyGenerator, SimpleKeyGenerator}
import org.apache.log4j.LogManager
import org.apache.spark.sql.execution.datasources.{DataSourceUtils => SparkDataSourceUtils}
@@ -41,15 +40,6 @@ object DataSourceReadOptions {
private val log = LogManager.getLogger(DataSourceReadOptions.getClass)
/**
* Whether data needs to be read, in
*
* 1) Snapshot mode (obtain latest view, based on row & columnar data)
* 2) incremental mode (new data since an instantTime)
* 3) Read Optimized mode (obtain latest view, based on columnar data)
*
* Default: snapshot
*/
val QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot"
val QUERY_TYPE_READ_OPTIMIZED_OPT_VAL = "read_optimized"
val QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental"
@@ -58,30 +48,30 @@ object DataSourceReadOptions {
.defaultValue(QUERY_TYPE_SNAPSHOT_OPT_VAL)
.withAlternatives("hoodie.datasource.view.type")
.withDocumentation("Whether data needs to be read, in incremental mode (new data since an instantTime) " +
"(or) Read Optimized mode (obtain latest view, based on columnar data) (or) Snapshot mode " +
"(obtain latest view, based on row & columnar data)")
"(or) Read Optimized mode (obtain latest view, based on base files) (or) Snapshot mode " +
"(obtain latest view, by merging base and (if any) log files)")
/**
* For Snapshot query on merge on read table. Use this key to define the payload class.
*/
val REALTIME_SKIP_MERGE_OPT_VAL = "skip_merge"
val REALTIME_PAYLOAD_COMBINE_OPT_VAL = "payload_combine"
val REALTIME_MERGE_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.merge.type")
.defaultValue(REALTIME_PAYLOAD_COMBINE_OPT_VAL)
.withDocumentation("")
.withDocumentation("For Snapshot query on merge on read table, control whether we invoke the record " +
s"payload implementation to merge (${REALTIME_PAYLOAD_COMBINE_OPT_VAL}) or skip merging altogether" +
s"${REALTIME_SKIP_MERGE_OPT_VAL}")
val READ_PATHS_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.paths")
.noDefaultValue()
.withDocumentation("")
.withDocumentation("Comma separated list of file paths to read within a Hudi table.")
val READ_PRE_COMBINE_FIELD = HoodieWriteConfig.PRECOMBINE_FIELD_PROP
val ENABLE_HOODIE_FILE_INDEX: ConfigProperty[Boolean] = ConfigProperty
.key("hoodie.file.index.enable")
.defaultValue(true)
.withDocumentation("")
.withDocumentation("Enables use of the spark file index implementation for Hudi, "
+ "that speeds up listing of large tables.")
@Deprecated
val VIEW_TYPE_OPT_KEY = "hoodie.datasource.view.type"
@@ -94,14 +84,6 @@ object DataSourceReadOptions {
@Deprecated
val DEFAULT_VIEW_TYPE_OPT_VAL = VIEW_TYPE_READ_OPTIMIZED_OPT_VAL
/**
* Instant time to start incrementally pulling data from. The instanttime here need not
* necessarily correspond to an instant on the timeline. New data written with an
* `instant_time > BEGIN_INSTANTTIME` are fetched out. For e.g: '20170901080000' will get
* all new data written after Sep 1, 2017 08:00AM.
*
* Default: None (Mandatory in incremental mode)
*/
val BEGIN_INSTANTTIME_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.begin.instanttime")
.noDefaultValue()
@@ -109,48 +91,29 @@ object DataSourceReadOptions {
"correspond to an instant on the timeline. New data written with an instant_time > BEGIN_INSTANTTIME are fetched out. " +
"For e.g: 20170901080000 will get all new data written after Sep 1, 2017 08:00AM.")
/**
* Instant time to limit incrementally fetched data to. New data written with an
* `instant_time <= END_INSTANTTIME` are fetched out.
*
* Default: latest instant (i.e fetches all new data since begin instant time)
*
*/
val END_INSTANTTIME_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.end.instanttime")
.noDefaultValue()
.withDocumentation("Instant time to limit incrementally fetched data to. " +
"New data written with an instant_time <= END_INSTANTTIME are fetched out.")
/**
* If use the end instant schema when incrementally fetched data to.
*
* Default: false (use latest instant schema)
*
*/
val INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.schema.use.end.instanttime")
.defaultValue("false")
.withDocumentation("Uses end instant schema when incrementally fetching data. Default: uses latest instant schema.")
/**
* For use-cases like DeltaStreamer which reads from Hoodie Incremental table and applies opaque map functions,
* filters appearing late in the sequence of transformations cannot be automatically pushed down.
* This option allows setting filters directly on Hoodie Source
*/
val PUSH_DOWN_INCR_FILTERS_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.incr.filters")
.defaultValue("")
.withDocumentation("")
.withDocumentation("For use-cases like DeltaStreamer which reads from Hoodie Incremental table and applies "
+ "opaque map functions, filters appearing late in the sequence of transformations cannot be automatically "
+ "pushed down. This option allows setting filters directly on Hoodie Source.")
/**
* For the use-cases like users only want to incremental pull from certain partitions instead of the full table.
* This option allows using glob pattern to directly filter on path.
*/
val INCR_PATH_GLOB_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.incr.path.glob")
.defaultValue("")
.withDocumentation("")
.withDocumentation("For the use-cases like users only want to incremental pull from certain partitions "
+ "instead of the full table. This option allows using glob pattern to directly filter on path.")
}
/**
@@ -160,11 +123,6 @@ object DataSourceWriteOptions {
private val log = LogManager.getLogger(DataSourceWriteOptions.getClass)
/**
* The write operation, that this write should do
*
* Default: upsert()
*/
val BULK_INSERT_OPERATION_OPT_VAL = WriteOperationType.BULK_INSERT.value
val INSERT_OPERATION_OPT_VAL = WriteOperationType.INSERT.value
val UPSERT_OPERATION_OPT_VAL = WriteOperationType.UPSERT.value
@@ -179,12 +137,6 @@ object DataSourceWriteOptions {
"Use bulkinsert to load new data into a table, and there on use upsert/insert. " +
"bulk insert uses a disk based write path to scale to load large inputs without need to cache it.")
/**
* The table type for the underlying data, for this write.
* Note that this can't change across writes.
*
* Default: COPY_ON_WRITE
*/
val COW_TABLE_TYPE_OPT_VAL = HoodieTableType.COPY_ON_WRITE.name
val MOR_TABLE_TYPE_OPT_VAL = HoodieTableType.MERGE_ON_READ.name
val TABLE_TYPE_OPT_KEY: ConfigProperty[String] = ConfigProperty
@@ -239,15 +191,10 @@ object DataSourceWriteOptions {
translatedOptParams
}
/**
* Hive table name, to register the table into.
*
* Default: None (mandatory)
*/
val TABLE_NAME_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.table.name")
.noDefaultValue()
.withDocumentation("Hive table name, to register the table into.")
.withDocumentation("Table name for the datasource write. Also used to register the table into meta stores.")
/**
* Field used in preCombining before actual write. When two records have the same
@@ -292,65 +239,50 @@ object DataSourceWriteOptions {
val DEFAULT_KEYGENERATOR_CLASS_OPT_VAL = classOf[SimpleKeyGenerator].getName
/**
* When set to true, will perform write operations directly using the spark native `Row` representation.
*
* By default, false (will be enabled as default in a future release)
*/
val ENABLE_ROW_WRITER_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.row.writer.enable")
.defaultValue("false")
.withDocumentation("")
.withDocumentation("When set to true, will perform write operations directly using the spark native " +
"`Row` representation, avoiding any additional conversion costs.")
/**
* Option keys beginning with this prefix, are automatically added to the commit/deltacommit metadata.
* This is useful to store checkpointing information, in a consistent way with the hoodie timeline
*/
val COMMIT_METADATA_KEYPREFIX_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.commitmeta.key.prefix")
.defaultValue("_")
.withDocumentation("Option keys beginning with this prefix, are automatically added to the commit/deltacommit metadata. " +
"This is useful to store checkpointing information, in a consistent way with the hudi timeline")
/**
* Flag to indicate whether to drop duplicates upon insert.
* By default insert will accept duplicates, to gain extra performance.
*/
val INSERT_DROP_DUPS_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.insert.drop.duplicates")
.defaultValue("false")
.withDocumentation("If set to true, filters out all duplicate records from incoming dataframe, during insert operations.")
/**
* Flag to indicate how many times streaming job should retry for a failed microbatch
* By default 3
*/
val STREAMING_RETRY_CNT_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.streaming.retry.count")
.defaultValue("3")
.withDocumentation("")
.withDocumentation("Config to indicate how many times streaming job should retry for a failed micro batch.")
/**
* Flag to indicate how long (by millisecond) before a retry should issued for failed microbatch
* By default 2000 and it will be doubled by every retry
*/
val STREAMING_RETRY_INTERVAL_MS_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.streaming.retry.interval.ms")
.defaultValue("2000")
.withDocumentation("")
.withDocumentation(" Config to indicate how long (by millisecond) before a retry should issued for failed microbatch")
/**
* Flag to indicate whether to ignore any non exception error (e.g. writestatus error)
* within a streaming microbatch
*
* By default true (in favor of streaming progressing over data integrity)
*/
val STREAMING_IGNORE_FAILED_BATCH_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.streaming.ignore.failed.batch")
.defaultValue("true")
.withDocumentation("")
.withDocumentation("Config to indicate whether to ignore any non exception error (e.g. writestatus error)"
+ " within a streaming microbatch")
val META_SYNC_CLIENT_TOOL_CLASS: ConfigProperty[String] = ConfigProperty
.key("hoodie.meta.sync.client.tool.class")
.defaultValue(classOf[HiveSyncTool].getName)
.withDocumentation("")
.withDocumentation("Sync tool class name used to sync to metastore. Defaults to Hive.")
// HIVE SYNC SPECIFIC CONFIGS
// NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes
@@ -378,7 +310,7 @@ object DataSourceWriteOptions {
val HIVE_BASE_FILE_FORMAT_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.hive_sync.base_file_format")
.defaultValue("PARQUET")
.withDocumentation("")
.withDocumentation("Base file format for the sync.")
val HIVE_USER_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.hive_sync.username")
@@ -441,6 +373,16 @@ object DataSourceWriteOptions {
.withDocumentation("INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type. " +
"Disabled by default for backward compatibility.")
val HIVE_TABLE_PROPERTIES: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.hive_sync.table_properties")
.noDefaultValue()
.withDocumentation("Additional properties to store with table.")
val HIVE_TABLE_SERDE_PROPERTIES: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.hive_sync.serde_properties")
.noDefaultValue()
.withDocumentation("")
val HIVE_SYNC_AS_DATA_SOURCE_TABLE: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.hive_sync.sync_as_datasource")
.defaultValue("true")
@@ -461,7 +403,7 @@ object DataSourceWriteOptions {
val ASYNC_COMPACT_ENABLE_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.compaction.async.enable")
.defaultValue("true")
.withDocumentation("")
.withDocumentation("Controls whether async compaction should be turned on for MOR table writing.")
val INLINE_CLUSTERING_ENABLE_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.clustering.inline.enable")

View File

@@ -17,20 +17,17 @@
package org.apache.hudi
import java.util
import java.util.Properties
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.HoodieWriteResult
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP}
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
@@ -43,14 +40,15 @@ import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.sync.common.AbstractSyncTool
import org.apache.hudi.table.BulkInsertPartitioner
import org.apache.log4j.LogManager
import org.apache.spark.SPARK_VERSION
import org.apache.spark.SparkContext
import org.apache.spark.{SPARK_VERSION, SparkContext}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset,Row, SQLContext, SaveMode, SparkSession}
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import java.util
import java.util.Properties
import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer
@@ -365,7 +363,7 @@ object HoodieSparkSqlWriter {
} else if (SPARK_VERSION.startsWith("3.")) {
hoodieDF.write.format("org.apache.hudi.spark3.internal")
.option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
.option(HoodieWriteConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key, hoodieDF.schema.toDDL)
.option(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key, hoodieDF.schema.toDDL)
.options(params)
.mode(SaveMode.Append)
.save()

View File

@@ -40,7 +40,7 @@ public class DefaultSource extends BaseDefaultSource implements TableProvider {
@Override
public StructType inferSchema(CaseInsensitiveStringMap options) {
return StructType.fromDDL(options.get(HoodieWriteConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key()));
return StructType.fromDDL(options.get(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key()));
}
@Override

View File

@@ -31,31 +31,33 @@ public class HoodieWriteCommitKafkaCallbackConfig extends HoodieConfig {
.key(CALLBACK_PREFIX + "kafka.bootstrap.servers")
.noDefaultValue()
.sinceVersion("0.7.0")
.withDocumentation("Bootstrap servers of kafka callback cluster");
.withDocumentation("Bootstrap servers of kafka cluster, to be used for publishing commit metadata.");
public static final ConfigProperty<String> CALLBACK_KAFKA_TOPIC = ConfigProperty
.key(CALLBACK_PREFIX + "kafka.topic")
.noDefaultValue()
.sinceVersion("0.7.0")
.withDocumentation("Kafka topic to be sent along with callback messages");
.withDocumentation("Kafka topic name to publish timeline activity into.");
public static final ConfigProperty<String> CALLBACK_KAFKA_PARTITION = ConfigProperty
.key(CALLBACK_PREFIX + "kafka.partition")
.noDefaultValue()
.sinceVersion("0.7.0")
.withDocumentation("partition of CALLBACK_KAFKA_TOPIC, 0 by default");
.withDocumentation("It may be desirable to serialize all changes into a single Kafka partition "
+ " for providing strict ordering. By default, Kafka messages are keyed by table name, which "
+ " guarantees ordering at the table level, but not globally (or when new partitions are added)");
public static final ConfigProperty<String> CALLBACK_KAFKA_ACKS = ConfigProperty
.key(CALLBACK_PREFIX + "kafka.acks")
.defaultValue("all")
.sinceVersion("0.7.0")
.withDocumentation("kafka acks level, all by default");
.withDocumentation("kafka acks level, all by default to ensure strong durability.");
public static final ConfigProperty<Integer> CALLBACK_KAFKA_RETRIES = ConfigProperty
.key(CALLBACK_PREFIX + "kafka.retries")
.defaultValue(3)
.sinceVersion("0.7.0")
.withDocumentation("Times to retry. 3 by default");
.withDocumentation("Times to retry the produce. 3 by default");
/**
* Set default value for {@link HoodieWriteCommitKafkaCallbackConfig} if needed.