diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java index 3eddba07e..a73138440 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java @@ -18,6 +18,8 @@ package org.apache.hudi.client; +import org.apache.hudi.ApiMaturityLevel; +import org.apache.hudi.PublicAPIClass; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieWriteStat; @@ -40,6 +42,7 @@ import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.METADATA_E /** * Status of a write operation. */ +@PublicAPIClass(maturity = ApiMaturityLevel.STABLE) public class WriteStatus implements Serializable { private static final Logger LOG = LogManager.getLogger(WriteStatus.class); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java index 9252b9283..b9a7ac2e7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java @@ -97,7 +97,7 @@ public class HoodieBootstrapConfig extends HoodieConfig { .key("hoodie.bootstrap.index.class") .defaultValue(HFileBootstrapIndex.class.getName()) .sinceVersion("0.6.0") - .withDocumentation(""); + .withDocumentation("Implementation to use, for mapping a skeleton base file to a bootstrap base file."); private HoodieBootstrapConfig() { super(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java index a750fa23c..3dbd5d2a4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java @@ -31,36 +31,6 @@ import java.util.Properties; */ public class HoodieClusteringConfig extends HoodieConfig { - public static final ConfigProperty CLUSTERING_PLAN_STRATEGY_CLASS = ConfigProperty - .key("hoodie.clustering.plan.strategy.class") - .defaultValue("org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy") - .sinceVersion("0.7.0") - .withDocumentation("Config to provide a strategy class to create ClusteringPlan. Class has to be subclass of ClusteringPlanStrategy"); - - public static final ConfigProperty CLUSTERING_EXECUTION_STRATEGY_CLASS = ConfigProperty - .key("hoodie.clustering.execution.strategy.class") - .defaultValue("org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy") - .sinceVersion("0.7.0") - .withDocumentation("Config to provide a strategy class to execute a ClusteringPlan. 
Class has to be subclass of RunClusteringStrategy"); - - public static final ConfigProperty INLINE_CLUSTERING_PROP = ConfigProperty - .key("hoodie.clustering.inline") - .defaultValue("false") - .sinceVersion("0.7.0") - .withDocumentation("Turn on inline clustering - clustering will be run after write operation is complete"); - - public static final ConfigProperty INLINE_CLUSTERING_MAX_COMMIT_PROP = ConfigProperty - .key("hoodie.clustering.inline.max.commits") - .defaultValue("4") - .sinceVersion("0.7.0") - .withDocumentation("Config to control frequency of inline clustering"); - - public static final ConfigProperty ASYNC_CLUSTERING_MAX_COMMIT_PROP = ConfigProperty - .key("hoodie.clustering.async.max.commits") - .defaultValue("4") - .sinceVersion("0.9.0") - .withDocumentation("Config to control frequency of async clustering"); - // Any strategy specific params can be saved with this prefix public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = "hoodie.clustering.plan.strategy."; @@ -70,6 +40,40 @@ public class HoodieClusteringConfig extends HoodieConfig { .sinceVersion("0.7.0") .withDocumentation("Number of partitions to list to create ClusteringPlan"); + public static final ConfigProperty CLUSTERING_PLAN_STRATEGY_CLASS = ConfigProperty + .key("hoodie.clustering.plan.strategy.class") + .defaultValue("org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy") + .sinceVersion("0.7.0") + .withDocumentation("Config to provide a strategy class (subclass of ClusteringPlanStrategy) to create clustering plan " + + "i.e select what file groups are being clustered. 
Default strategy, looks at the last N (determined by " + CLUSTERING_TARGET_PARTITIONS.key() + ") day based partitions picks the small file slices within those partitions."); + + public static final ConfigProperty CLUSTERING_EXECUTION_STRATEGY_CLASS = ConfigProperty + .key("hoodie.clustering.execution.strategy.class") + .defaultValue("org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy") + .sinceVersion("0.7.0") + .withDocumentation("Config to provide a strategy class (subclass of RunClusteringStrategy) to define how the " + + " clustering plan is executed. By default, we sort the file groups in the plan by the specified columns, while " + + " meeting the configured target file sizes."); + + public static final ConfigProperty INLINE_CLUSTERING_PROP = ConfigProperty + .key("hoodie.clustering.inline") + .defaultValue("false") + .sinceVersion("0.7.0") + .withDocumentation("Turn on inline clustering - clustering will be run after each write operation is complete"); + + public static final ConfigProperty INLINE_CLUSTERING_MAX_COMMIT_PROP = ConfigProperty + .key("hoodie.clustering.inline.max.commits") + .defaultValue("4") + .sinceVersion("0.7.0") + .withDocumentation("Config to control frequency of clustering planning"); + + public static final ConfigProperty ASYNC_CLUSTERING_MAX_COMMIT_PROP = ConfigProperty + .key("hoodie.clustering.async.max.commits") + .defaultValue("4") + .sinceVersion("0.9.0") + .withDocumentation("Config to control frequency of async clustering"); + public static final ConfigProperty CLUSTERING_PLAN_SMALL_FILE_LIMIT = ConfigProperty .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "small.file.limit") .defaultValue(String.valueOf(600 * 1024 * 1024L)) @@ -80,7 +84,7 @@ public class HoodieClusteringConfig extends HoodieConfig { .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "max.bytes.per.group") .defaultValue(String.valueOf(2 * 1024 * 1024 * 1024L)) .sinceVersion("0.7.0") - .withDocumentation("Each clustering operation can create 
multiple groups. Total amount of data processed by clustering operation" + .withDocumentation("Each clustering operation can create multiple output file groups. Total amount of data processed by clustering operation" + " is defined by below two properties (CLUSTERING_MAX_BYTES_PER_GROUP * CLUSTERING_MAX_NUM_GROUPS)." + " Max amount of data to be included in one group"); @@ -92,7 +96,7 @@ public class HoodieClusteringConfig extends HoodieConfig { public static final ConfigProperty CLUSTERING_TARGET_FILE_MAX_BYTES = ConfigProperty .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "target.file.max.bytes") - .defaultValue(String.valueOf(1 * 1024 * 1024 * 1024L)) + .defaultValue(String.valueOf(1024 * 1024 * 1024L)) .sinceVersion("0.7.0") .withDocumentation("Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups"); @@ -106,13 +110,14 @@ public class HoodieClusteringConfig extends HoodieConfig { .key("hoodie.clustering.updates.strategy") .defaultValue("org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy") .sinceVersion("0.7.0") - .withDocumentation("When file groups is in clustering, need to handle the update to these file groups. Default strategy just reject the update"); + .withDocumentation("Determines how to handle updates, deletes to file groups that are under clustering." 
+ + " Default strategy just rejects the update"); public static final ConfigProperty ASYNC_CLUSTERING_ENABLE_OPT_KEY = ConfigProperty .key("hoodie.clustering.async.enabled") .defaultValue("false") .sinceVersion("0.7.0") - .withDocumentation("Async clustering"); + .withDocumentation("Enable running of clustering service, asynchronously as inserts happen on the table."); private HoodieClusteringConfig() { super(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java index aa9e75ca5..e8d55934b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java @@ -33,7 +33,9 @@ import javax.annotation.concurrent.Immutable; import java.io.File; import java.io.FileReader; import java.io.IOException; +import java.util.Arrays; import java.util.Properties; +import java.util.stream.Collectors; /** * Compaction related config. @@ -41,101 +43,114 @@ import java.util.Properties; @Immutable public class HoodieCompactionConfig extends HoodieConfig { - public static final ConfigProperty CLEANER_POLICY_PROP = ConfigProperty - .key("hoodie.cleaner.policy") - .defaultValue(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()) - .withDocumentation("Cleaning policy to be used. Hudi will delete older versions of parquet files to re-claim space." - + " Any Query/Computation referring to this version of the file will fail. 
" - + "It is good to make sure that the data is retained for more than the maximum query execution time."); - public static final ConfigProperty AUTO_CLEAN_PROP = ConfigProperty .key("hoodie.clean.automatic") .defaultValue("true") - .withDocumentation("Should cleanup if there is anything to cleanup immediately after the commit"); + .withDocumentation("When enabled, the cleaner table service is invoked immediately after each commit," + + " to delete older file slices. It's recommended to enable this, to ensure metadata and data storage" + + " growth is bounded."); public static final ConfigProperty ASYNC_CLEAN_PROP = ConfigProperty .key("hoodie.clean.async") .defaultValue("false") - .withDocumentation("Only applies when #withAutoClean is turned on. When turned on runs cleaner async with writing."); - - public static final ConfigProperty INLINE_COMPACT_PROP = ConfigProperty - .key("hoodie.compact.inline") - .defaultValue("false") - .withDocumentation("When set to true, compaction is triggered by the ingestion itself, " - + "right after a commit/deltacommit action as part of insert/upsert/bulk_insert"); - - public static final ConfigProperty INLINE_COMPACT_NUM_DELTA_COMMITS_PROP = ConfigProperty - .key("hoodie.compact.inline.max.delta.commits") - .defaultValue("5") - .withDocumentation("Number of max delta commits to keep before triggering an inline compaction"); - - public static final ConfigProperty INLINE_COMPACT_TIME_DELTA_SECONDS_PROP = ConfigProperty - .key("hoodie.compact.inline.max.delta.seconds") - .defaultValue(String.valueOf(60 * 60)) - .withDocumentation("Run a compaction when time elapsed > N seconds since last compaction"); - - public static final ConfigProperty INLINE_COMPACT_TRIGGER_STRATEGY_PROP = ConfigProperty - .key("hoodie.compact.inline.trigger.strategy") - .defaultValue(CompactionTriggerStrategy.NUM_COMMITS.name()) - .withDocumentation(""); - - public static final ConfigProperty CLEANER_FILE_VERSIONS_RETAINED_PROP = ConfigProperty - 
.key("hoodie.cleaner.fileversions.retained") - .defaultValue("3") - .withDocumentation(""); + .withDocumentation("Only applies when " + AUTO_CLEAN_PROP.key() + " is turned on. " + + "When turned on runs cleaner async with writing, which can speed up overall write performance."); public static final ConfigProperty CLEANER_COMMITS_RETAINED_PROP = ConfigProperty .key("hoodie.cleaner.commits.retained") .defaultValue("10") - .withDocumentation("Number of commits to retain. So data will be retained for num_of_commits * time_between_commits " - + "(scheduled). This also directly translates into how much you can incrementally pull on this table"); + .withDocumentation("Number of commits to retain, without cleaning. This will be retained for num_of_commits * time_between_commits " + + "(scheduled). This also directly translates into how much data retention the table supports for incremental queries."); + + public static final ConfigProperty CLEANER_POLICY_PROP = ConfigProperty + .key("hoodie.cleaner.policy") + .defaultValue(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()) + .withDocumentation("Cleaning policy to be used. The cleaner service deletes older file slices files to re-claim space." + + " By default, cleaner spares the file slices written by the last N commits, determined by " + CLEANER_COMMITS_RETAINED_PROP.key() + + " Long running query plans may often refer to older file slices and will break if those are cleaned, before the query has had" + + " a chance to run. So, it is good to make sure that the data is retained for more than the maximum query execution time"); + + public static final ConfigProperty INLINE_COMPACT_PROP = ConfigProperty + .key("hoodie.compact.inline") + .defaultValue("false") + .withDocumentation("When set to true, compaction service is triggered after each write. 
While being " + + " simpler operationally, this adds extra latency on the write path."); + + public static final ConfigProperty INLINE_COMPACT_NUM_DELTA_COMMITS_PROP = ConfigProperty + .key("hoodie.compact.inline.max.delta.commits") + .defaultValue("5") + .withDocumentation("Number of delta commits after the last compaction, before scheduling of a new compaction is attempted."); + + public static final ConfigProperty INLINE_COMPACT_TIME_DELTA_SECONDS_PROP = ConfigProperty + .key("hoodie.compact.inline.max.delta.seconds") + .defaultValue(String.valueOf(60 * 60)) + .withDocumentation("Number of elapsed seconds after the last compaction, before scheduling a new one."); + + public static final ConfigProperty INLINE_COMPACT_TRIGGER_STRATEGY_PROP = ConfigProperty + .key("hoodie.compact.inline.trigger.strategy") + .defaultValue(CompactionTriggerStrategy.NUM_COMMITS.name()) + .withDocumentation("Controls how compaction scheduling is triggered, by time or num delta commits or combination of both. " + + "Valid options: " + Arrays.stream(CompactionTriggerStrategy.values()).map(Enum::name).collect(Collectors.joining(","))); + + public static final ConfigProperty CLEANER_FILE_VERSIONS_RETAINED_PROP = ConfigProperty + .key("hoodie.cleaner.fileversions.retained") + .defaultValue("3") + .withDocumentation("When " + HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name() + " cleaning policy is used, " + + " the minimum number of file slices to retain in each file group, during cleaning."); public static final ConfigProperty CLEANER_INCREMENTAL_MODE = ConfigProperty .key("hoodie.cleaner.incremental.mode") .defaultValue("true") - .withDocumentation(""); + .withDocumentation("When enabled, the plans for each cleaner service run is computed incrementally off the events " + + " in the timeline, since the last cleaner run. 
This is much more efficient than obtaining listings for the full" + " table for each planning (even with a metadata table)."); public static final ConfigProperty MAX_COMMITS_TO_KEEP_PROP = ConfigProperty .key("hoodie.keep.max.commits") .defaultValue("30") - .withDocumentation("Each commit is a small file in the .hoodie directory. Since DFS typically does not favor lots of " - + "small files, Hudi archives older commits into a sequential log. A commit is published atomically " - + "by a rename of the commit file."); + .withDocumentation("Archiving service moves older entries from timeline into an archived log after each write, to " + + " keep the metadata overhead constant, even as the table size grows. " + + "This config controls the maximum number of instants to retain in the active timeline. "); public static final ConfigProperty MIN_COMMITS_TO_KEEP_PROP = ConfigProperty .key("hoodie.keep.min.commits") .defaultValue("20") - .withDocumentation("Each commit is a small file in the .hoodie directory. Since DFS typically does not favor lots of " - + "small files, Hudi archives older commits into a sequential log. A commit is published atomically " - + "by a rename of the commit file."); + .withDocumentation("Similar to " + MAX_COMMITS_TO_KEEP_PROP.key() + ", but controls the minimum number of " + + "instants to retain in the active timeline."); public static final ConfigProperty COMMITS_ARCHIVAL_BATCH_SIZE_PROP = ConfigProperty .key("hoodie.commits.archival.batch") .defaultValue(String.valueOf(10)) - .withDocumentation("This controls the number of commit instants read in memory as a batch and archived together."); + .withDocumentation("Archiving of instants is batched in best-effort manner, to pack more instants into a single" + + " archive log. 
This config controls such archival batch size."); public static final ConfigProperty CLEANER_BOOTSTRAP_BASE_FILE_ENABLED = ConfigProperty .key("hoodie.cleaner.delete.bootstrap.base.file") .defaultValue("false") - .withDocumentation("Set true to clean bootstrap source files when necessary"); + .withDocumentation("When set to true, cleaner also deletes the bootstrap base file when its skeleton base file is " + + " cleaned. Turn this to true, if you want to ensure the bootstrap dataset storage is reclaimed over time, as the" + + " table receives updates/deletes. Another reason to turn this on, would be to ensure data residing in bootstrap " + + " base files are also physically deleted, to comply with data privacy enforcement processes."); public static final ConfigProperty PARQUET_SMALL_FILE_LIMIT_BYTES = ConfigProperty .key("hoodie.parquet.small.file.limit") .defaultValue(String.valueOf(104857600)) - .withDocumentation("Upsert uses this file size to compact new data onto existing files. " - + "By default, treat any file <= 100MB as a small file."); + .withDocumentation("During upsert operation, we opportunistically expand existing small files on storage, instead of writing" + + " new files, to keep number of files to an optimum. This config sets the file size limit below which a file on storage " + + " becomes a candidate to be selected as such a `small file`. By default, treat any file <= 100MB as a small file."); public static final ConfigProperty RECORD_SIZE_ESTIMATION_THRESHOLD_PROP = ConfigProperty .key("hoodie.record.size.estimation.threshold") .defaultValue("1.0") - .withDocumentation("Hudi will use the previous commit to calculate the estimated record size by totalBytesWritten/totalRecordsWritten. 
" - + "If the previous commit is too small to make an accurate estimation, Hudi will search commits in the reverse order, " - + "until find a commit has totalBytesWritten larger than (PARQUET_SMALL_FILE_LIMIT_BYTES * RECORD_SIZE_ESTIMATION_THRESHOLD)"); + .withDocumentation("We use the previous commits' metadata to calculate the estimated record size and use it " + + " to bin pack records into partitions. If the previous commit is too small to make an accurate estimation, " + + " Hudi will search commits in the reverse order, until we find a commit that has totalBytesWritten " + + " larger than (PARQUET_SMALL_FILE_LIMIT_BYTES * this_threshold)"); public static final ConfigProperty CLEANER_PARALLELISM = ConfigProperty .key("hoodie.cleaner.parallelism") .defaultValue("200") - .withDocumentation("Increase this if cleaning becomes slow."); + .withDocumentation("Parallelism for the cleaning operation. Increase this if cleaning becomes slow."); // 500GB of target IO per compaction (both read and write public static final ConfigProperty TARGET_IO_PER_COMPACTION_IN_MB_PROP = ConfigProperty @@ -161,15 +176,15 @@ public class HoodieCompactionConfig extends HoodieConfig { public static final ConfigProperty COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = ConfigProperty .key("hoodie.compaction.lazy.block.read") .defaultValue("false") - .withDocumentation("When a CompactedLogScanner merges all log files, this config helps to choose whether the logblocks " - + "should be read lazily or not. Choose true to use I/O intensive lazy block reading (low memory usage) or false " - + "for Memory intensive immediate block read (high memory usage)"); + .withDocumentation("When merging the delta log files, this config helps to choose whether the log blocks " + + "should be read lazily or not. 
Choose true to use lazy block reading (low memory usage, but incurs seeks to each block" + + " header) or false for immediate block read (higher memory usage)"); public static final ConfigProperty COMPACTION_REVERSE_LOG_READ_ENABLED_PROP = ConfigProperty .key("hoodie.compaction.reverse.log.read") .defaultValue("false") .withDocumentation("HoodieLogFormatReader reads a logfile in the forward direction starting from pos=0 to pos=file_length. " - + "If this config is set to true, the Reader reads the logfile in reverse direction, from pos=file_length to pos=0"); + + "If this config is set to true, the reader reads the logfile in reverse direction, from pos=file_length to pos=0"); public static final ConfigProperty FAILED_WRITES_CLEANER_POLICY_PROP = ConfigProperty .key("hoodie.cleaner.policy.failed.writes") @@ -190,22 +205,24 @@ public class HoodieCompactionConfig extends HoodieConfig { public static final ConfigProperty COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE = ConfigProperty .key("hoodie.copyonwrite.insert.split.size") .defaultValue(String.valueOf(500000)) - .withDocumentation("Number of inserts, that will be put each partition/bucket for writing. " - + "The rationale to pick the insert parallelism is the following. Writing out 100MB files, " - + "with at least 1kb records, means 100K records per file. we just over provision to 500K."); + .withDocumentation("Number of inserts assigned for each partition/bucket for writing. " + + "We based the default on writing out 100MB files, with at least 1kb records (100K records per file), and " + + " over provision to 500K. 
As long as auto-tuning of splits is turned on, this only affects the first " + + " write, where there is no history to learn record sizes from."); public static final ConfigProperty COPY_ON_WRITE_TABLE_AUTO_SPLIT_INSERTS = ConfigProperty .key("hoodie.copyonwrite.insert.auto.split") .defaultValue("true") .withDocumentation("Config to control whether we control insert split sizes automatically based on average" - + " record sizes."); + + " record sizes. It's recommended to keep this turned on, since hand tuning is otherwise extremely" + + " cumbersome."); public static final ConfigProperty COPY_ON_WRITE_TABLE_RECORD_SIZE_ESTIMATE = ConfigProperty .key("hoodie.copyonwrite.record.size.estimate") .defaultValue(String.valueOf(1024)) - .withDocumentation("The average record size. If specified, hudi will use this and not compute dynamically " - + "based on the last 24 commit’s metadata. No value set as default. This is critical in computing " - + "the insert parallelism and bin-packing inserts into small files. See above."); + .withDocumentation("The average record size. If not explicitly specified, hudi will compute the " + + "record size estimate compute dynamically based on commit metadata. 
" + + " This is critical in computing the insert parallelism and bin-packing inserts into small files."); private HoodieCompactionConfig() { super(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java index 9a47ff8b3..95219d049 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java @@ -48,7 +48,8 @@ public class HoodieHBaseIndexConfig extends HoodieConfig { public static final ConfigProperty HBASE_GET_BATCH_SIZE_PROP = ConfigProperty .key("hoodie.index.hbase.get.batch.size") .defaultValue(100) - .withDocumentation(""); + .withDocumentation("Controls the batch size for performing gets against HBase. " + + "Batching improves throughput, by saving round trips."); public static final ConfigProperty HBASE_ZK_ZNODEPARENT = ConfigProperty .key("hoodie.index.hbase.zknode.path") @@ -59,12 +60,14 @@ public class HoodieHBaseIndexConfig extends HoodieConfig { public static final ConfigProperty HBASE_PUT_BATCH_SIZE_PROP = ConfigProperty .key("hoodie.index.hbase.put.batch.size") .defaultValue(100) - .withDocumentation(""); + .withDocumentation("Controls the batch size for performing puts against HBase. 
" + + "Batching improves throughput, by saving round trips."); public static final ConfigProperty HBASE_INDEX_QPS_ALLOCATOR_CLASS = ConfigProperty .key("hoodie.index.hbase.qps.allocator.class") .defaultValue(DefaultHBaseQPSResourceAllocator.class.getName()) - .withDocumentation("Property to set which implementation of HBase QPS resource allocator to be used"); + .withDocumentation("Property to set which implementation of HBase QPS resource allocator to be used, which" + + "controls the batching rate dynamically."); public static final ConfigProperty HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP = ConfigProperty .key("hoodie.index.hbase.put.batch.size.autocompute") @@ -90,17 +93,17 @@ public class HoodieHBaseIndexConfig extends HoodieConfig { public static final ConfigProperty HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY = ConfigProperty .key("hoodie.index.hbase.dynamic_qps") .defaultValue(false) - .withDocumentation("Property to decide if HBASE_QPS_FRACTION_PROP is dynamically calculated based on volume"); + .withDocumentation("Property to decide if HBASE_QPS_FRACTION_PROP is dynamically calculated based on write volume."); public static final ConfigProperty HBASE_MIN_QPS_FRACTION_PROP = ConfigProperty .key("hoodie.index.hbase.min.qps.fraction") .noDefaultValue() - .withDocumentation("Min for HBASE_QPS_FRACTION_PROP to stabilize skewed volume workloads"); + .withDocumentation("Minimum for HBASE_QPS_FRACTION_PROP to stabilize skewed write workloads"); public static final ConfigProperty HBASE_MAX_QPS_FRACTION_PROP = ConfigProperty .key("hoodie.index.hbase.max.qps.fraction") .noDefaultValue() - .withDocumentation("Max for HBASE_QPS_FRACTION_PROP to stabilize skewed volume workloads"); + .withDocumentation("Maximum for HBASE_QPS_FRACTION_PROP to stabilize skewed write workloads"); public static final ConfigProperty HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS = ConfigProperty .key("hoodie.index.hbase.desired_puts_time_in_secs") @@ -120,17 +123,18 @@ public class HoodieHBaseIndexConfig 
extends HoodieConfig { public static final ConfigProperty HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS = ConfigProperty .key("hoodie.index.hbase.zk.session_timeout_ms") .defaultValue(60 * 1000) - .withDocumentation(""); + .withDocumentation("Session timeout value to use for Zookeeper failure detection, for the HBase client. " + + "Lower this value, if you want to fail faster."); public static final ConfigProperty HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS = ConfigProperty .key("hoodie.index.hbase.zk.connection_timeout_ms") .defaultValue(15 * 1000) - .withDocumentation(""); + .withDocumentation("Timeout to use for establishing connection with zookeeper, from HBase client."); public static final ConfigProperty HBASE_ZK_PATH_QPS_ROOT = ConfigProperty .key("hoodie.index.hbase.zkpath.qps_root") .defaultValue("/QPS_ROOT") - .withDocumentation(""); + .withDocumentation("chroot in zookeeper, to use for all qps allocation co-ordination."); public static final ConfigProperty HBASE_INDEX_UPDATE_PARTITION_PATH = ConfigProperty .key("hoodie.hbase.index.update.partition.path") diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java index ba9cb18df..1c5f2b629 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java @@ -58,13 +58,12 @@ public class HoodieIndexConfig extends HoodieConfig { .defaultValue("60000") .withDocumentation("Only applies if index type is BLOOM. " + "This is the number of entries to be stored in the bloom filter. " - + "We assume the maxParquetFileSize is 128MB and averageRecordSize is 1024B and " + + "The rationale for the default: Assume the maxParquetFileSize is 128MB and averageRecordSize is 1kb and " + "hence we approx a total of 130K records in a file. 
The default (60000) is roughly half of this approximation. " - + "HUDI-56 tracks computing this dynamically. Warning: Setting this very low, " - + "will generate a lot of false positives and index lookup will have to scan a lot more files " - + "than it has to and Setting this to a very high number will increase the size every data file linearly " - + "(roughly 4KB for every 50000 entries). " - + "This config is also used with DYNNAMIC bloom filter which determines the initial size for the bloom."); + + "Warning: Setting this very low, will generate a lot of false positives and index lookup " + + "will have to scan a lot more files than it has to and setting this to a very high number will " + + "increase the size of every base file linearly (roughly 4KB for every 50000 entries). " + + "This config is also used with DYNAMIC bloom filter which determines the initial size for the bloom."); public static final ConfigProperty BLOOM_FILTER_FPP = ConfigProperty .key("hoodie.index.bloom.fpp") @@ -73,16 +72,15 @@ public class HoodieIndexConfig extends HoodieConfig { + "Error rate allowed given the number of entries. This is used to calculate how many bits should be " + "assigned for the bloom filter and the number of hash functions. This is usually set very low (default: 0.000000001), " + "we like to tradeoff disk space for lower false positives. " - + "If the number of entries added to bloom filter exceeds the congfigured value (hoodie.index.bloom.num_entries), " + + "If the number of entries added to bloom filter exceeds the configured value (hoodie.index.bloom.num_entries), " + "then this fpp may not be honored."); public static final ConfigProperty BLOOM_INDEX_PARALLELISM_PROP = ConfigProperty .key("hoodie.bloom.index.parallelism") .defaultValue("0") .withDocumentation("Only applies if index type is BLOOM. " - + "This is the amount of parallelism for index lookup, which involves a Spark Shuffle. 
" - + "By default, this is auto computed based on input workload characteristics. " - + "Disable explicit bloom index parallelism setting by default - hoodie auto computes"); + + "This is the amount of parallelism for index lookup, which involves a shuffle. " + + "By default, this is auto computed based on input workload characteristics."); public static final ConfigProperty BLOOM_INDEX_PRUNE_BY_RANGES_PROP = ConfigProperty .key("hoodie.bloom.index.prune.by.ranges") @@ -90,7 +88,8 @@ public class HoodieIndexConfig extends HoodieConfig { .withDocumentation("Only applies if index type is BLOOM. " + "When true, range information from files to leveraged speed up index lookups. Particularly helpful, " + "if the key has a monotonously increasing prefix, such as timestamp. " - + "If the record key is completely random, it is better to turn this off."); + + "If the record key is completely random, it is better to turn this off, since range pruning will only " + + " add extra overhead to the index lookup."); public static final ConfigProperty BLOOM_INDEX_USE_CACHING_PROP = ConfigProperty .key("hoodie.bloom.index.use.caching") @@ -131,7 +130,7 @@ public class HoodieIndexConfig extends HoodieConfig { .key("hoodie.simple.index.use.caching") .defaultValue("true") .withDocumentation("Only applies if index type is SIMPLE. 
" - + "When true, the input RDD will cached to speed up index lookup by reducing IO " + + "When true, the incoming writes will cached to speed up index lookup by reducing IO " + "for computing parallelism or affected partitions"); public static final ConfigProperty SIMPLE_INDEX_PARALLELISM_PROP = ConfigProperty @@ -187,7 +186,7 @@ public class HoodieIndexConfig extends HoodieConfig { public static final ConfigProperty SIMPLE_INDEX_UPDATE_PARTITION_PATH = ConfigProperty .key("hoodie.simple.index.update.partition.path") .defaultValue("false") - .withDocumentation(""); + .withDocumentation("Similar to " + BLOOM_INDEX_UPDATE_PARTITION_PATH + ", but for simple index."); private EngineType engineType; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java index 28e2f7c95..f6e1e6ec9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java @@ -18,6 +18,7 @@ package org.apache.hudi.config; +import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; /** @@ -30,13 +31,18 @@ public class HoodieInternalConfig extends HoodieConfig { public static final String BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED = "hoodie.bulkinsert.are.partitioner.records.sorted"; public static final Boolean DEFAULT_BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED = false; + public static final ConfigProperty BULKINSERT_INPUT_DATA_SCHEMA_DDL = ConfigProperty + .key("hoodie.bulkinsert.schema.ddl") + .noDefaultValue() + .withDocumentation("Schema set for row writer/bulk insert."); + /** * Returns if partition records are sorted or not. + * * @param propertyValue value for property BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED. * @return the property value. 
*/ public static Boolean getBulkInsertIsPartitionRecordsSorted(String propertyValue) { return propertyValue != null ? Boolean.parseBoolean(propertyValue) : DEFAULT_BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED; } - } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java index 06094190d..846437ee5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java @@ -17,8 +17,8 @@ package org.apache.hudi.config; -import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy; import org.apache.hudi.client.transaction.ConflictResolutionStrategy; +import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy; import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; @@ -61,94 +61,94 @@ public class HoodieLockConfig extends HoodieConfig { .key(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY) .defaultValue(DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS) .sinceVersion("0.8.0") - .withDocumentation("Parameter used in the exponential backoff retry policy. Stands for the Initial amount " - + "of time to wait between retries by lock provider client"); + .withDocumentation("Initial amount of time to wait between retries to acquire locks, " + + " subsequent retries will exponentially backoff."); public static final ConfigProperty LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP = ConfigProperty .key(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY) .defaultValue(String.valueOf(5000L)) .sinceVersion("0.8.0") - .withDocumentation("Parameter used in the exponential backoff retry policy. 
Stands for the maximum amount " - + "of time to wait between retries by lock provider client"); + .withDocumentation("Maximum amount of time to wait between retries by lock provider client. This bounds" + + " the maximum delay from the exponential backoff. Currently used by ZK based lock provider only."); public static final ConfigProperty LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP = ConfigProperty .key(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY) .defaultValue(String.valueOf(10000L)) .sinceVersion("0.8.0") - .withDocumentation("Amount of time to wait between retries from the hudi client"); + .withDocumentation("Amount of time to wait between retries on the lock provider by the lock manager"); public static final ConfigProperty LOCK_ACQUIRE_NUM_RETRIES_PROP = ConfigProperty .key(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY) .defaultValue(DEFAULT_LOCK_ACQUIRE_NUM_RETRIES) .sinceVersion("0.8.0") - .withDocumentation("Maximum number of times to retry by lock provider client"); + .withDocumentation("Maximum number of times to retry lock acquire, at each lock provider"); public static final ConfigProperty LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP = ConfigProperty .key(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY) .defaultValue(String.valueOf(0)) .sinceVersion("0.8.0") - .withDocumentation("Maximum number of times to retry to acquire lock additionally from the hudi client"); + .withDocumentation("Maximum number of times to retry to acquire lock additionally from the lock manager."); public static final ConfigProperty LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP = ConfigProperty .key(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY) .defaultValue(60 * 1000) .sinceVersion("0.8.0") - .withDocumentation(""); + .withDocumentation("Timeout in ms, to wait on an individual lock acquire() call, at the lock provider."); public static final ConfigProperty FILESYSTEM_LOCK_PATH_PROP = ConfigProperty .key(FILESYSTEM_LOCK_PATH_PROP_KEY) .noDefaultValue() .sinceVersion("0.8.0") - .withDocumentation(""); + 
.withDocumentation("For DFS based lock providers, path to store the locks under."); public static final ConfigProperty HIVE_DATABASE_NAME_PROP = ConfigProperty .key(HIVE_DATABASE_NAME_PROP_KEY) .noDefaultValue() .sinceVersion("0.8.0") - .withDocumentation("The Hive database to acquire lock against"); + .withDocumentation("For Hive based lock provider, the Hive database to acquire lock against"); public static final ConfigProperty HIVE_TABLE_NAME_PROP = ConfigProperty .key(HIVE_TABLE_NAME_PROP_KEY) .noDefaultValue() .sinceVersion("0.8.0") - .withDocumentation("The Hive table under the hive database to acquire lock against"); + .withDocumentation("For Hive based lock provider, the Hive table to acquire lock against"); public static final ConfigProperty HIVE_METASTORE_URI_PROP = ConfigProperty .key(HIVE_METASTORE_URI_PROP_KEY) .noDefaultValue() .sinceVersion("0.8.0") - .withDocumentation(""); + .withDocumentation("For Hive based lock provider, the Hive metastore URI to acquire locks against."); public static final ConfigProperty ZK_BASE_PATH_PROP = ConfigProperty .key(ZK_BASE_PATH_PROP_KEY) .noDefaultValue() .sinceVersion("0.8.0") - .withDocumentation("The base path on Zookeeper under which to create a ZNode to acquire the lock. " - + "This should be common for all jobs writing to the same table"); + .withDocumentation("The base path on Zookeeper under which to create lock related ZNodes. 
" + + "This should be same for all concurrent writers to the same table"); public static final ConfigProperty ZK_SESSION_TIMEOUT_MS_PROP = ConfigProperty .key(ZK_SESSION_TIMEOUT_MS_PROP_KEY) .defaultValue(DEFAULT_ZK_SESSION_TIMEOUT_MS) .sinceVersion("0.8.0") - .withDocumentation("How long to wait after losing a connection to ZooKeeper before the session is expired"); + .withDocumentation("Timeout in ms, to wait after losing connection to ZooKeeper, before the session is expired"); public static final ConfigProperty ZK_CONNECTION_TIMEOUT_MS_PROP = ConfigProperty .key(ZK_CONNECTION_TIMEOUT_MS_PROP_KEY) .defaultValue(DEFAULT_ZK_CONNECTION_TIMEOUT_MS) .sinceVersion("0.8.0") - .withDocumentation("How long to wait when connecting to ZooKeeper before considering the connection a failure"); + .withDocumentation("Timeout in ms, to wait for establishing connection with Zookeeper."); public static final ConfigProperty ZK_CONNECT_URL_PROP = ConfigProperty .key(ZK_CONNECT_URL_PROP_KEY) .noDefaultValue() .sinceVersion("0.8.0") - .withDocumentation("Set the list of comma separated servers to connect to"); + .withDocumentation("Zookeeper URL to connect to."); public static final ConfigProperty ZK_PORT_PROP = ConfigProperty .key(ZK_PORT_PROP_KEY) .noDefaultValue() .sinceVersion("0.8.0") - .withDocumentation("The connection port to be used for Zookeeper"); + .withDocumentation("Zookeeper port to connect to."); public static final ConfigProperty ZK_LOCK_KEY_PROP = ConfigProperty .key(ZK_LOCK_KEY_PROP_KEY) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java index fb4cb04cd..2303f235b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java @@ -59,22 +59,22 @@ public class HoodieMemoryConfig 
extends HoodieConfig { public static final ConfigProperty MAX_MEMORY_FOR_MERGE_PROP = ConfigProperty .key("hoodie.memory.merge.max.size") .defaultValue(DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES) - .withDocumentation("Property to set the max memory for merge"); + .withDocumentation("Maximum amount of memory used for merge operations, before spilling to local storage."); public static final ConfigProperty MAX_MEMORY_FOR_COMPACTION_PROP = ConfigProperty .key("hoodie.memory.compaction.max.size") .noDefaultValue() - .withDocumentation("Property to set the max memory for compaction"); + .withDocumentation("Maximum amount of memory used for compaction operations, before spilling to local storage."); public static final ConfigProperty MAX_DFS_STREAM_BUFFER_SIZE_PROP = ConfigProperty .key("hoodie.memory.dfs.buffer.max.size") .defaultValue(16 * 1024 * 1024) - .withDocumentation("Property to set the max memory for dfs inputstream buffer size"); + .withDocumentation("Property to control the max memory for dfs input stream buffer size"); public static final ConfigProperty SPILLABLE_MAP_BASE_PATH_PROP = ConfigProperty .key("hoodie.memory.spillable.map.path") .defaultValue("/tmp/") - .withDocumentation("Default file path prefix for spillable file"); + .withDocumentation("Default file path prefix for spillable map"); public static final ConfigProperty WRITESTATUS_FAILURE_FRACTION_PROP = ConfigProperty .key("hoodie.memory.writestatus.failure.fraction") diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetricsDatadogConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetricsDatadogConfig.java index 2be028dfc..c86232b80 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetricsDatadogConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetricsDatadogConfig.java @@ -41,7 +41,7 @@ public class HoodieMetricsDatadogConfig extends 
HoodieConfig { .key(DATADOG_PREFIX + ".report.period.seconds") .defaultValue(30) .sinceVersion("0.6.0") - .withDocumentation("Datadog report period in seconds. Default to 30."); + .withDocumentation("Datadog reporting period in seconds. Default to 30."); public static final ConfigProperty DATADOG_API_SITE = ConfigProperty .key(DATADOG_PREFIX + ".api.site") diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetricsPrometheusConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetricsPrometheusConfig.java index bc33b39ae..504073720 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetricsPrometheusConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetricsPrometheusConfig.java @@ -34,19 +34,19 @@ public class HoodieMetricsPrometheusConfig extends HoodieConfig { .key(PUSHGATEWAY_PREFIX + ".host") .defaultValue("localhost") .sinceVersion("0.6.0") - .withDocumentation(""); + .withDocumentation("Hostname of the prometheus push gateway"); public static final ConfigProperty PUSHGATEWAY_PORT = ConfigProperty .key(PUSHGATEWAY_PREFIX + ".port") .defaultValue(9091) .sinceVersion("0.6.0") - .withDocumentation(""); + .withDocumentation("Port for the push gateway."); public static final ConfigProperty PUSHGATEWAY_REPORT_PERIOD_SECONDS = ConfigProperty .key(PUSHGATEWAY_PREFIX + ".report.period.seconds") .defaultValue(30) .sinceVersion("0.6.0") - .withDocumentation(""); + .withDocumentation("Reporting interval in seconds."); public static final ConfigProperty PUSHGATEWAY_DELETE_ON_SHUTDOWN = ConfigProperty .key(PUSHGATEWAY_PREFIX + ".delete.on.shutdown") @@ -58,7 +58,7 @@ public class HoodieMetricsPrometheusConfig extends HoodieConfig { .key(PUSHGATEWAY_PREFIX + ".job.name") .defaultValue("") .sinceVersion("0.6.0") - .withDocumentation(""); + .withDocumentation("Name of the push gateway job."); public static final ConfigProperty 
PUSHGATEWAY_RANDOM_JOB_NAME_SUFFIX = ConfigProperty .key(PUSHGATEWAY_PREFIX + ".random.job.name.suffix") @@ -73,7 +73,7 @@ public class HoodieMetricsPrometheusConfig extends HoodieConfig { .key(PROMETHEUS_PREFIX + ".port") .defaultValue(9090) .sinceVersion("0.6.0") - .withDocumentation(""); + .withDocumentation("Port for prometheus server."); private HoodieMetricsPrometheusConfig() { super(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java index 086c3387d..b91f1b68b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java @@ -37,12 +37,14 @@ public class HoodiePayloadConfig extends HoodieConfig { public static final ConfigProperty PAYLOAD_ORDERING_FIELD_PROP = ConfigProperty .key(PAYLOAD_ORDERING_FIELD_PROP_KEY) .defaultValue("ts") - .withDocumentation("Property to hold the payload ordering field name"); + .withDocumentation("Table column/field name to order records that have the same key, before " + + "merging and writing to storage."); public static final ConfigProperty PAYLOAD_EVENT_TIME_FIELD_PROP = ConfigProperty .key(PAYLOAD_EVENT_TIME_FIELD_PROP_KEY) .defaultValue("ts") - .withDocumentation("Property for payload event time field"); + .withDocumentation("Table column/field name to derive timestamp associated with the records. 
This can" + + "be useful for e.g, determining the freshness of the table."); private HoodiePayloadConfig() { super(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java index 9aff8c929..ec1e15599 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java @@ -43,19 +43,19 @@ public class HoodieStorageConfig extends HoodieConfig { public static final ConfigProperty PARQUET_BLOCK_SIZE_BYTES = ConfigProperty .key("hoodie.parquet.block.size") .defaultValue(String.valueOf(120 * 1024 * 1024)) - .withDocumentation("Parquet RowGroup size. Its better this is same as the file size, so that a single column " - + "within a file is stored continuously on disk"); + .withDocumentation("Parquet RowGroup size. It's recommended to make this large enough that scan costs can be" + + " amortized by packing enough column values into a single row group."); public static final ConfigProperty PARQUET_PAGE_SIZE_BYTES = ConfigProperty .key("hoodie.parquet.page.size") .defaultValue(String.valueOf(1 * 1024 * 1024)) .withDocumentation("Parquet page size. Page is the unit of read within a parquet file. 
" - + "Within a block, pages are compressed seperately."); + + "Within a block, pages are compressed separately."); public static final ConfigProperty ORC_FILE_MAX_BYTES = ConfigProperty .key("hoodie.orc.max.file.size") .defaultValue(String.valueOf(120 * 1024 * 1024)) - .withDocumentation(""); + .withDocumentation("Target file size for ORC base files."); public static final ConfigProperty ORC_STRIPE_SIZE = ConfigProperty .key("hoodie.orc.stripe.size") @@ -65,17 +65,18 @@ public class HoodieStorageConfig extends HoodieConfig { public static final ConfigProperty ORC_BLOCK_SIZE = ConfigProperty .key("hoodie.orc.block.size") .defaultValue(ORC_FILE_MAX_BYTES.defaultValue()) - .withDocumentation("File system block size"); + .withDocumentation("ORC block size, recommended to be aligned with the target file size."); public static final ConfigProperty HFILE_FILE_MAX_BYTES = ConfigProperty .key("hoodie.hfile.max.file.size") .defaultValue(String.valueOf(120 * 1024 * 1024)) - .withDocumentation(""); + .withDocumentation("Target file size for HFile base files."); public static final ConfigProperty HFILE_BLOCK_SIZE_BYTES = ConfigProperty .key("hoodie.hfile.block.size") - .defaultValue(String.valueOf(1 * 1024 * 1024)) - .withDocumentation(""); + .defaultValue(String.valueOf(1024 * 1024)) + .withDocumentation("Lower values increase the size of metadata tracked within HFile, but can offer potentially " + + "faster lookup times."); // used to size log files public static final ConfigProperty LOGFILE_SIZE_MAX_BYTES = ConfigProperty @@ -107,12 +108,12 @@ public class HoodieStorageConfig extends HoodieConfig { public static final ConfigProperty HFILE_COMPRESSION_ALGORITHM = ConfigProperty .key("hoodie.hfile.compression.algorithm") .defaultValue("GZ") - .withDocumentation(""); + .withDocumentation("Compression codec to use for hfile base files."); public static final ConfigProperty ORC_COMPRESSION_CODEC = ConfigProperty .key("hoodie.orc.compression.codec") .defaultValue("ZLIB") - 
.withDocumentation(""); + .withDocumentation("Compression codec to use for ORC base files."); // Default compression ratio for log file to parquet, general 3x public static final ConfigProperty LOGFILE_TO_PARQUET_COMPRESSION_RATIO = ConfigProperty diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java index 3e126914d..7e7d34eb4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java @@ -36,7 +36,7 @@ public class HoodieWriteCommitCallbackConfig extends HoodieConfig { .key(CALLBACK_PREFIX + "on") .defaultValue(false) .sinceVersion("0.6.0") - .withDocumentation("Turn callback on/off. off by default."); + .withDocumentation("Turn commit callback on/off. 
off by default."); public static final ConfigProperty CALLBACK_CLASS_PROP = ConfigProperty .key(CALLBACK_PREFIX + "class") diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index e2e295d3e..357e3f574 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -18,7 +18,6 @@ package org.apache.hudi.config; -import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.bootstrap.BootstrapMode; import org.apache.hudi.client.transaction.ConflictResolutionStrategy; @@ -27,8 +26,8 @@ import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.fs.ConsistencyGuardConfig; -import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieCleaningPolicy; +import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; @@ -43,10 +42,13 @@ import org.apache.hudi.metrics.MetricsReporterType; import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite; import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; + +import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.orc.CompressionKind; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import javax.annotation.concurrent.Immutable; + import java.io.File; import 
java.io.FileReader; import java.io.IOException; @@ -71,7 +73,7 @@ public class HoodieWriteConfig extends HoodieConfig { public static final ConfigProperty TABLE_NAME = ConfigProperty .key("hoodie.table.name") .noDefaultValue() - .withDocumentation("Table name that will be used for registering with Hive. Needs to be same across runs."); + .withDocumentation("Table name that will be used for registering with metastores like HMS. Needs to be same across runs."); public static final ConfigProperty PRECOMBINE_FIELD_PROP = ConfigProperty .key("hoodie.datasource.write.precombine.field") @@ -88,12 +90,14 @@ public class HoodieWriteConfig extends HoodieConfig { public static final ConfigProperty KEYGENERATOR_CLASS_PROP = ConfigProperty .key("hoodie.datasource.write.keygenerator.class") .noDefaultValue() - .withDocumentation("Key generator class, that implements will extract the key out of incoming Row object"); + .withDocumentation("Key generator class, that implements `org.apache.hudi.keygen.KeyGenerator` " + + "extract a key out of incoming records."); public static final ConfigProperty KEYGENERATOR_TYPE_PROP = ConfigProperty .key("hoodie.datasource.write.keygenerator.type") .defaultValue(KeyGeneratorType.SIMPLE.name()) - .withDocumentation("Type of build-in key generator, currently support SIMPLE, COMPLEX, TIMESTAMP, CUSTOM, NON_PARTITION, GLOBAL_DELETE"); + .withDocumentation("Easily configure one the built-in key generators, instead of specifying the key generator class." 
+ + "Currently supports SIMPLE, COMPLEX, TIMESTAMP, CUSTOM, NON_PARTITION, GLOBAL_DELETE"); public static final ConfigProperty ROLLBACK_USING_MARKERS = ConfigProperty .key("hoodie.rollback.using.markers") @@ -104,206 +108,220 @@ public class HoodieWriteConfig extends HoodieConfig { public static final ConfigProperty TIMELINE_LAYOUT_VERSION = ConfigProperty .key("hoodie.timeline.layout.version") .noDefaultValue() - .withDocumentation(""); + .sinceVersion("0.5.1") + .withDocumentation("Controls the layout of the timeline. Version 0 relied on renames, Version 1 (default) models " + + "the timeline as an immutable log relying only on atomic writes for object storage."); public static final ConfigProperty BASE_PATH_PROP = ConfigProperty .key("hoodie.base.path") .noDefaultValue() - .withDocumentation("Base DFS path under which all the data partitions are created. " + .withDocumentation("Base path on lake storage, under which all the table data is stored. " + "Always prefix it explicitly with the storage scheme (e.g hdfs://, s3:// etc). " + "Hudi stores all the main meta-data about commits, savepoints, cleaning audit logs " - + "etc in .hoodie directory under the base directory."); + + "etc in .hoodie directory under this base path directory."); public static final ConfigProperty AVRO_SCHEMA = ConfigProperty .key("hoodie.avro.schema") .noDefaultValue() - .withDocumentation("This is the current reader avro schema for the table. This is a string of the entire schema. " - + "HoodieWriteClient uses this schema to pass on to implementations of HoodieRecordPayload to convert " - + "from the source format to avro record. This is also used when re-writing records during an update."); + .withDocumentation("Schema string representing the current write schema of the table. Hudi passes this to " + + "implementations of HoodieRecordPayload to convert incoming records to avro. 
This is also used as the write schema " + + "evolving records during an update."); public static final ConfigProperty AVRO_SCHEMA_VALIDATE = ConfigProperty .key("hoodie.avro.schema.validate") .defaultValue("false") - .withDocumentation(""); + .withDocumentation("Validate the schema used for the write against the latest schema, for backwards compatibility."); public static final ConfigProperty INSERT_PARALLELISM = ConfigProperty .key("hoodie.insert.shuffle.parallelism") .defaultValue("1500") - .withDocumentation("Once data has been initially imported, this parallelism controls initial parallelism for reading input records. " - + "Ensure this value is high enough say: 1 partition for 1 GB of input data"); + .withDocumentation("Parallelism for inserting records into the table. Inserts can shuffle data before writing to tune file sizes and optimize the storage layout."); public static final ConfigProperty BULKINSERT_PARALLELISM = ConfigProperty .key("hoodie.bulkinsert.shuffle.parallelism") .defaultValue("1500") - .withDocumentation("Bulk insert is meant to be used for large initial imports and this parallelism determines " - + "the initial number of files in your table. 
Tune this to achieve a desired optimal size during initial import."); + .withDocumentation("For large initial imports using bulk_insert operation, controls the parallelism to use for sort modes or custom partitioning done" + + "before writing records to the table."); public static final ConfigProperty BULKINSERT_USER_DEFINED_PARTITIONER_CLASS = ConfigProperty .key("hoodie.bulkinsert.user.defined.partitioner.class") .noDefaultValue() - .withDocumentation("If specified, this class will be used to re-partition input records before they are inserted."); - - public static final ConfigProperty BULKINSERT_INPUT_DATA_SCHEMA_DDL = ConfigProperty - .key("hoodie.bulkinsert.schema.ddl") - .noDefaultValue() - .withDocumentation(""); + .withDocumentation("If specified, this class will be used to re-partition records before they are bulk inserted. This can be used to sort, pack, cluster data" + + " optimally for common query patterns."); public static final ConfigProperty UPSERT_PARALLELISM = ConfigProperty .key("hoodie.upsert.shuffle.parallelism") .defaultValue("1500") - .withDocumentation("Once data has been initially imported, this parallelism controls initial parallelism for reading input records. " - + "Ensure this value is high enough say: 1 partition for 1 GB of input data"); + .withDocumentation("Parallelism to use for upsert operation on the table. Upserts can shuffle data to perform index lookups, file sizing, bin packing records optimally" + + "into file groups."); public static final ConfigProperty DELETE_PARALLELISM = ConfigProperty .key("hoodie.delete.shuffle.parallelism") .defaultValue("1500") - .withDocumentation("This parallelism is Used for “delete” operation while deduping or repartioning."); + .withDocumentation("Parallelism used for “delete” operation. 
Delete operations also performs shuffles, similar to upsert operation."); public static final ConfigProperty ROLLBACK_PARALLELISM = ConfigProperty .key("hoodie.rollback.parallelism") .defaultValue("100") - .withDocumentation("Determines the parallelism for rollback of commits."); + .withDocumentation("Parallelism for rollback of commits. Rollbacks perform delete of files or logging delete blocks to file groups on storage in parallel."); public static final ConfigProperty WRITE_BUFFER_LIMIT_BYTES = ConfigProperty .key("hoodie.write.buffer.limit.bytes") .defaultValue(String.valueOf(4 * 1024 * 1024)) - .withDocumentation(""); + .withDocumentation("Size of in-memory buffer used for parallelizing network reads and lake storage writes."); public static final ConfigProperty COMBINE_BEFORE_INSERT_PROP = ConfigProperty .key("hoodie.combine.before.insert") .defaultValue("false") - .withDocumentation("Flag which first combines the input RDD and merges multiple partial records into a single record " - + "before inserting or updating in DFS"); + .withDocumentation("When inserted records share same key, controls whether they should be first combined (i.e de-duplicated) before" + + " writing to storage."); public static final ConfigProperty COMBINE_BEFORE_UPSERT_PROP = ConfigProperty .key("hoodie.combine.before.upsert") .defaultValue("true") - .withDocumentation("Flag which first combines the input RDD and merges multiple partial records into a single record " - + "before inserting or updating in DFS"); + .withDocumentation("When upserted records share same key, controls whether they should be first combined (i.e de-duplicated) before" + + " writing to storage. 
This should be turned off only if you are absolutely certain that there are no duplicates incoming, " + + " otherwise it can lead to duplicate keys and violate the uniqueness guarantees."); public static final ConfigProperty COMBINE_BEFORE_DELETE_PROP = ConfigProperty .key("hoodie.combine.before.delete") .defaultValue("true") - .withDocumentation("Flag which first combines the input RDD and merges multiple partial records into a single record " - + "before deleting in DFS"); + .withDocumentation("During delete operations, controls whether we should combine deletes (and potentially also upserts) before " + + " writing to storage."); public static final ConfigProperty WRITE_STATUS_STORAGE_LEVEL = ConfigProperty .key("hoodie.write.status.storage.level") .defaultValue("MEMORY_AND_DISK_SER") - .withDocumentation("HoodieWriteClient.insert and HoodieWriteClient.upsert returns a persisted RDD[WriteStatus], " - + "this is because the Client can choose to inspect the WriteStatus and choose and commit or not based on the failures. " - + "This is a configuration for the storage level for this RDD"); + .withDocumentation("Write status objects hold metadata about a write (stats, errors), that is not yet committed to storage. " + + "This controls the how that information is cached for inspection by clients. We rarely expect this to be changed."); public static final ConfigProperty HOODIE_AUTO_COMMIT_PROP = ConfigProperty .key("hoodie.auto.commit") .defaultValue("true") - .withDocumentation("Should HoodieWriteClient autoCommit after insert and upsert. " - + "The client can choose to turn off auto-commit and commit on a “defined success condition”"); + .withDocumentation("Controls whether a write operation should auto commit. 
This can be turned off to perform inspection" + + " of the uncommitted write before deciding to commit."); public static final ConfigProperty HOODIE_WRITE_STATUS_CLASS_PROP = ConfigProperty .key("hoodie.writestatus.class") .defaultValue(WriteStatus.class.getName()) - .withDocumentation(""); + .withDocumentation("Subclass of " + WriteStatus.class.getName() + " to be used to collect information about a write. Can be " + + "overridden to collection additional metrics/statistics about the data if needed."); public static final ConfigProperty FINALIZE_WRITE_PARALLELISM = ConfigProperty .key("hoodie.finalize.write.parallelism") .defaultValue("1500") - .withDocumentation(""); + .withDocumentation("Parallelism for the write finalization internal operation, which involves removing any partially written " + + "files from lake storage, before committing the write. Reduce this value, if the high number of tasks incur delays for smaller tables " + + "or low latency writes."); public static final ConfigProperty MARKERS_DELETE_PARALLELISM = ConfigProperty .key("hoodie.markers.delete.parallelism") .defaultValue("100") - .withDocumentation("Determines the parallelism for deleting marker files."); + .withDocumentation("Determines the parallelism for deleting marker files, which are used to track all files (valid or invalid/partial) written during " + + "a write operation. Increase this value if delays are observed, with large batch writes."); public static final ConfigProperty BULKINSERT_SORT_MODE = ConfigProperty .key("hoodie.bulkinsert.sort.mode") .defaultValue(BulkInsertSortMode.GLOBAL_SORT.toString()) - .withDocumentation("Sorting modes to use for sorting records for bulk insert. This is leveraged when user " - + "defined partitioner is not configured. Default is GLOBAL_SORT. Available values are - GLOBAL_SORT: " - + "this ensures best file sizes, with lowest memory overhead at cost of sorting. 
PARTITION_SORT: " - + "Strikes a balance by only sorting within a partition, still keeping the memory overhead of writing " - + "lowest and best effort file sizing. NONE: No sorting. Fastest and matches spark.write.parquet() " - + "in terms of number of files, overheads"); + .withDocumentation("Sorting modes to use for sorting records for bulk insert. This is used when " + + BULKINSERT_USER_DEFINED_PARTITIONER_CLASS.key() + " is not configured. Available values are - " + + "GLOBAL_SORT: this ensures best file sizes, with lowest memory overhead at cost of sorting. " + + "PARTITION_SORT: Strikes a balance by only sorting within a partition, still keeping the memory overhead of writing " + + "lowest and best effort file sizing. " + + "NONE: No sorting. Fastest and matches `spark.write.parquet()` in terms of number of files, overheads"); public static final ConfigProperty EMBEDDED_TIMELINE_SERVER_ENABLED = ConfigProperty .key("hoodie.embed.timeline.server") .defaultValue("true") - .withDocumentation(""); + .withDocumentation("When true, spins up an instance of the timeline server (meta server that serves cached file listings, statistics), " + + "running on each writer's driver process, accepting requests during the write from executors."); public static final ConfigProperty EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED = ConfigProperty .key("hoodie.embed.timeline.server.reuse.enabled") .defaultValue("false") - .withDocumentation(""); + .withDocumentation("Controls whether the timeline server instance should be cached and reused across the JVM (across task lifecycles) " + + "to avoid startup costs. This should rarely be changed."); public static final ConfigProperty EMBEDDED_TIMELINE_SERVER_PORT = ConfigProperty .key("hoodie.embed.timeline.server.port") .defaultValue("0") - .withDocumentation(""); + .withDocumentation("Port at which the timeline server listens for requests. 
When running embedded in each writer, it picks " + + "a free port and communicates to all the executors. This should rarely be changed."); public static final ConfigProperty EMBEDDED_TIMELINE_SERVER_THREADS = ConfigProperty .key("hoodie.embed.timeline.server.threads") .defaultValue("-1") - .withDocumentation(""); + .withDocumentation("Number of threads to serve requests in the timeline server. By default, auto configured based on the number of underlying cores."); public static final ConfigProperty EMBEDDED_TIMELINE_SERVER_COMPRESS_OUTPUT = ConfigProperty .key("hoodie.embed.timeline.server.gzip") .defaultValue("true") - .withDocumentation(""); + .withDocumentation("Controls whether gzip compression is used, for large responses from the timeline server, to improve latency."); public static final ConfigProperty EMBEDDED_TIMELINE_SERVER_USE_ASYNC = ConfigProperty .key("hoodie.embed.timeline.server.async") .defaultValue("false") - .withDocumentation(""); + .withDocumentation("Controls whether or not, the requests to the timeline server are processed in asynchronous fashion, " + + "potentially improving throughput."); public static final ConfigProperty FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP = ConfigProperty .key("hoodie.fail.on.timeline.archiving") .defaultValue("true") - .withDocumentation(""); + .withDocumentation("Timeline archiving removes older instants from the timeline, after each write operation, to minimize metadata overhead. " + + "Controls whether or not, the write should be failed as well, if such archiving fails."); public static final ConfigProperty INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP = ConfigProperty .key("hoodie.consistency.check.initial_interval_ms") .defaultValue(2000L) - .withDocumentation("Time between successive attempts to ensure written data's metadata is consistent on storage"); + .withDocumentation("Initial time between successive attempts to ensure written data's metadata is consistent on storage. 
Grows with exponential" + + " backoff after the initial value."); public static final ConfigProperty MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP = ConfigProperty .key("hoodie.consistency.check.max_interval_ms") .defaultValue(300000L) - .withDocumentation("Max interval time for consistency check"); + .withDocumentation("Max time to wait between successive attempts at performing consistency checks"); public static final ConfigProperty MAX_CONSISTENCY_CHECKS_PROP = ConfigProperty .key("hoodie.consistency.check.max_checks") .defaultValue(7) - .withDocumentation("Maximum number of checks, for consistency of written data. Will wait upto 256 Secs"); + .withDocumentation("Maximum number of checks, for consistency of written data."); public static final ConfigProperty MERGE_DATA_VALIDATION_CHECK_ENABLED = ConfigProperty .key("hoodie.merge.data.validation.enabled") .defaultValue("false") - .withDocumentation("Data validation check performed during merges before actual commits"); + .withDocumentation("When enabled, data validation checks are performed during merges to ensure expected " + + "number of records after merge operation."); public static final ConfigProperty MERGE_ALLOW_DUPLICATE_ON_INSERTS = ConfigProperty .key("hoodie.merge.allow.duplicate.on.inserts") .defaultValue("false") - .withDocumentation("Allow duplicates with inserts while merging with existing records"); + .withDocumentation("When enabled, we allow duplicate keys even if inserts are routed to merge with an existing file (for ensuring file sizing)." 
+ + " This is only relevant for insert operation, since upsert, delete operations will ensure unique key constraints are maintained."); public static final ConfigProperty SPILLABLE_DISK_MAP_TYPE = ConfigProperty .key("hoodie.spillable.diskmap.type") .defaultValue(ExternalSpillableMap.DiskMapType.BITCASK) - .withDocumentation("Enable usage of either BITCASK or ROCKS_DB as disk map for External Spillable Map"); + .withDocumentation("When handling input data that cannot be held in memory, to merge with a file on storage, a spillable diskmap is employed. " + + "By default, we use a persistent hashmap based loosely on bitcask, that offers O(1) inserts, lookups. " + + "Change this to `ROCKS_DB` to prefer using rocksDB, for handling the spill."); public static final ConfigProperty CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP = ConfigProperty .key("hoodie.client.heartbeat.interval_in_ms") .defaultValue(60 * 1000) - .withDocumentation(""); + .withDocumentation("Writers perform heartbeats to indicate liveness. Controls how often (in ms), such heartbeats are registered to lake storage."); public static final ConfigProperty CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES_PROP = ConfigProperty .key("hoodie.client.heartbeat.tolerable.misses") .defaultValue(2) - .withDocumentation(""); + .withDocumentation("Number of heartbeat misses, before a writer is deemed not alive and all pending writes are aborted."); public static final ConfigProperty WRITE_CONCURRENCY_MODE_PROP = ConfigProperty .key("hoodie.write.concurrency.mode") .defaultValue(WriteConcurrencyMode.SINGLE_WRITER.name()) - .withDocumentation("Enable different concurrency support"); + .withDocumentation("Enable different concurrency modes. Options are " + + "SINGLE_WRITER: Only one active writer to the table. 
Maximizes throughput. " + + "OPTIMISTIC_CONCURRENCY_CONTROL: Multiple writers can operate on the table and exactly one of them succeeds " + + "if a conflict (writes affect the same file group) is detected."); public static final ConfigProperty WRITE_META_KEY_PREFIXES_PROP = ConfigProperty .key("hoodie.write.meta.key.prefixes") @@ -312,16 +330,14 @@ public class HoodieWriteConfig extends HoodieConfig { + "during overlapping commits via multi writing"); /** - * The specified write schema. In most case, we do not need set this parameter, - * but for the case the write schema is not equal to the specified table schema, we can - * specify the write schema by this parameter. - * - * Currently the MergeIntoHoodieTableCommand use this to specify the write schema. + * Currently, MergeIntoHoodieTableCommand uses this to specify the write schema. */ public static final ConfigProperty WRITE_SCHEMA_PROP = ConfigProperty .key("hoodie.write.schema") .noDefaultValue() - .withDocumentation(""); + .withDocumentation("The specified write schema. In most case, we do not need set this parameter," + + " but for the case the write schema is not equal to the specified table schema, we can" + + " specify the write schema by this parameter. 
Used by MergeIntoHoodieTableCommand"); /** * HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow @@ -342,7 +358,8 @@ public class HoodieWriteConfig extends HoodieConfig { .key(AVRO_SCHEMA.key() + ".external.transformation") .defaultValue("false") .withAlternatives(AVRO_SCHEMA.key() + ".externalTransformation") - .withDocumentation(""); + .withDocumentation("When enabled, records in older schema are rewritten into newer schema during upsert,delete and background" + + " compaction,clustering operations."); private ConsistencyGuardConfig consistencyGuardConfig; @@ -352,7 +369,6 @@ public class HoodieWriteConfig extends HoodieConfig { private FileSystemViewStorageConfig viewStorageConfig; private HoodiePayloadConfig hoodiePayloadConfig; private HoodieMetadataConfig metadataConfig; - private EngineType engineType; /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java index ad6756235..0ddede128 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java @@ -19,13 +19,14 @@ package org.apache.hudi.keygen.constant; import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; -public class KeyGeneratorOptions { +public class KeyGeneratorOptions extends HoodieConfig { public static final ConfigProperty URL_ENCODE_PARTITIONING_OPT_KEY = ConfigProperty .key("hoodie.datasource.write.partitionpath.urlencode") .defaultValue("false") - .withDocumentation(""); + .withDocumentation("Should we url encode the partition path value, before creating the folder structure."); public static final ConfigProperty HIVE_STYLE_PARTITIONING_OPT_KEY = ConfigProperty 
.key("hoodie.datasource.write.hive_style_partitioning") diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java index 47718990d..393d26e8f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java @@ -60,7 +60,7 @@ public class HoodieAvroKeyGeneratorFactory { props.getString(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key(), null); if (StringUtils.isNullOrEmpty(keyGeneratorType)) { - LOG.info("The value of {} is empty, use SIMPLE", HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key()); + LOG.info("The value of {} is empty, using SIMPLE", HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key()); keyGeneratorType = KeyGeneratorType.SIMPLE.name(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index c5f7ff15f..205fbd7e3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -37,14 +37,14 @@ public final class HoodieMetadataConfig extends HoodieConfig { .key(METADATA_PREFIX + ".enable") .defaultValue(false) .sinceVersion("0.7.0") - .withDocumentation("Enable the internal Metadata Table which stores table level file listings"); + .withDocumentation("Enable the internal metadata table which serves table metadata like level file listings"); // Validate contents of Metadata Table on each access against the actual filesystem public static final ConfigProperty METADATA_VALIDATE_PROP = ConfigProperty .key(METADATA_PREFIX + ".validate") .defaultValue(false) 
.sinceVersion("0.7.0") - .withDocumentation("Validate contents of Metadata Table on each access against the actual listings from DFS"); + .withDocumentation("Validate contents of metadata table on each access; e.g against the actual listings from lake storage"); public static final boolean DEFAULT_METADATA_ENABLE_FOR_READERS = false; @@ -53,14 +53,14 @@ public final class HoodieMetadataConfig extends HoodieConfig { .key(METADATA_PREFIX + ".metrics.enable") .defaultValue(false) .sinceVersion("0.7.0") - .withDocumentation(""); + .withDocumentation("Enable publishing of metrics around metadata table."); // Parallelism for inserts public static final ConfigProperty METADATA_INSERT_PARALLELISM_PROP = ConfigProperty .key(METADATA_PREFIX + ".insert.parallelism") .defaultValue(1) .sinceVersion("0.7.0") - .withDocumentation("Parallelism to use when writing to the metadata table"); + .withDocumentation("Parallelism to use when inserting to the metadata table"); // Async clean public static final ConfigProperty METADATA_ASYNC_CLEAN_PROP = ConfigProperty @@ -81,32 +81,32 @@ public final class HoodieMetadataConfig extends HoodieConfig { .key(METADATA_PREFIX + ".keep.min.commits") .defaultValue(20) .sinceVersion("0.7.0") - .withDocumentation("Controls the archival of the metadata table’s timeline"); + .withDocumentation("Controls the archival of the metadata table’s timeline."); public static final ConfigProperty MAX_COMMITS_TO_KEEP_PROP = ConfigProperty .key(METADATA_PREFIX + ".keep.max.commits") .defaultValue(30) .sinceVersion("0.7.0") - .withDocumentation("Controls the archival of the metadata table’s timeline"); + .withDocumentation("Controls the archival of the metadata table’s timeline."); // Cleaner commits retained public static final ConfigProperty CLEANER_COMMITS_RETAINED_PROP = ConfigProperty .key(METADATA_PREFIX + ".cleaner.commits.retained") .defaultValue(3) .sinceVersion("0.7.0") - .withDocumentation(""); + .withDocumentation("Controls retention/history for 
metadata table."); // Regex to filter out matching directories during bootstrap public static final ConfigProperty DIRECTORY_FILTER_REGEX = ConfigProperty .key(METADATA_PREFIX + ".dir.filter.regex") .defaultValue("") .sinceVersion("0.7.0") - .withDocumentation(""); + .withDocumentation("Directories matching this regex, will be filtered out when initializing metadata table from lake storage for the first time."); public static final ConfigProperty HOODIE_ASSUME_DATE_PARTITIONING_PROP = ConfigProperty .key("hoodie.assume.date.partitioning") .defaultValue("false") - .sinceVersion("0.7.0") + .sinceVersion("0.3.0") .withDocumentation("Should HoodieWriteClient assume the data is partitioned by dates, i.e three levels from base path. " + "This is a stop-gap to support tables created by versions < 0.3.1. Will be removed eventually"); @@ -114,7 +114,7 @@ public final class HoodieMetadataConfig extends HoodieConfig { .key("hoodie.file.listing.parallelism") .defaultValue(1500) .sinceVersion("0.7.0") - .withDocumentation(""); + .withDocumentation("Parallelism to use, when listing the table on lake storage."); private HoodieMetadataConfig() { super(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/ConsistencyGuardConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/ConsistencyGuardConfig.java index d7acf61b8..aa91af45e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/ConsistencyGuardConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/ConsistencyGuardConfig.java @@ -31,12 +31,11 @@ import java.util.Properties; */ public class ConsistencyGuardConfig extends HoodieConfig { - // time between successive attempts to ensure written data's metadata is consistent on storage - @Deprecated public static final ConfigProperty CONSISTENCY_CHECK_ENABLED_PROP = ConfigProperty .key("hoodie.consistency.check.enabled") .defaultValue("false") .sinceVersion("0.5.0") + .deprecatedAfter("0.7.0") .withDocumentation("Enabled to handle S3 
eventual consistency issue. This property is no longer required " + "since S3 is now strongly consistent. Will be removed in the future releases."); @@ -44,35 +43,37 @@ public class ConsistencyGuardConfig extends HoodieConfig { .key("hoodie.consistency.check.initial_interval_ms") .defaultValue(400L) .sinceVersion("0.5.0") - .withDocumentation(""); + .deprecatedAfter("0.7.0") + .withDocumentation("Amount of time (in ms) to wait, before checking for consistency after an operation on storage."); - // max interval time public static final ConfigProperty MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP = ConfigProperty .key("hoodie.consistency.check.max_interval_ms") .defaultValue(20000L) .sinceVersion("0.5.0") - .withDocumentation(""); + .deprecatedAfter("0.7.0") + .withDocumentation("Maximum amount of time (in ms), to wait for consistency checking."); // maximum number of checks, for consistency of written data. Will wait upto 140 Secs public static final ConfigProperty MAX_CONSISTENCY_CHECKS_PROP = ConfigProperty .key("hoodie.consistency.check.max_checks") .defaultValue(6) .sinceVersion("0.5.0") - .withDocumentation(""); + .deprecatedAfter("0.7.0") + .withDocumentation("Maximum number of consistency checks to perform, with exponential backoff."); // sleep time for OptimisticConsistencyGuard public static final ConfigProperty OPTIMISTIC_CONSISTENCY_GUARD_SLEEP_TIME_MS_PROP = ConfigProperty .key("hoodie.optimistic.consistency.guard.sleep_time_ms") .defaultValue(500L) .sinceVersion("0.6.0") - .withDocumentation(""); + .withDocumentation("Amount of time (in ms), to wait after which we assume storage is consistent."); // config to enable OptimisticConsistencyGuard in finalizeWrite instead of FailSafeConsistencyGuard public static final ConfigProperty ENABLE_OPTIMISTIC_CONSISTENCY_GUARD_PROP = ConfigProperty .key("_hoodie.optimistic.consistency.guard.enable") .defaultValue(true) .sinceVersion("0.6.0") - .withDocumentation(""); + .withDocumentation("Enable consistency guard, which 
optimistically assumes consistency is achieved after a certain time period."); private ConsistencyGuardConfig() { super(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/NoOpConsistencyGuard.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/NoOpConsistencyGuard.java index 058b3a104..ef4d7a403 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/NoOpConsistencyGuard.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/NoOpConsistencyGuard.java @@ -23,7 +23,8 @@ import org.apache.hadoop.fs.Path; import java.util.List; /** - * Default Consistency guard that does nothing. Used for HDFS deployments + * Default Consistency guard that does nothing. Used for lake storage which provides read-after-write + * guarantees. */ public class NoOpConsistencyGuard implements ConsistencyGuard { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index 3a48c923c..fd5335a38 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -72,66 +72,69 @@ public class HoodieTableConfig extends HoodieConfig implements Serializable { public static final ConfigProperty HOODIE_TABLE_VERSION_PROP = ConfigProperty .key("hoodie.table.version") .defaultValue(HoodieTableVersion.ZERO) - .withDocumentation(""); + .withDocumentation("Version of table, used for running upgrade/downgrade steps between releases with potentially " + + "breaking/backwards compatible changes."); public static final ConfigProperty HOODIE_TABLE_PRECOMBINE_FIELD_PROP = ConfigProperty .key("hoodie.table.precombine.field") .noDefaultValue() - .withDocumentation("Field used in preCombining before actual write. 
When two records have the same key value, " - + "we will pick the one with the largest value for the precombine field, determined by Object.compareTo(..)"); + .withDocumentation("Field used in preCombining before actual write. By default, when two records have the same key value, " + + "the largest value for the precombine field, determined by Object.compareTo(..), is picked."); public static final ConfigProperty HOODIE_TABLE_PARTITION_COLUMNS_PROP = ConfigProperty .key("hoodie.table.partition.columns") .noDefaultValue() - .withDocumentation("Partition path field. Value to be used at the partitionPath component of HoodieKey. " - + "Actual value ontained by invoking .toString()"); + .withDocumentation("Columns used to partition the table. Concatenated values of these fields are used as " + + "the partition path, by invoking toString()"); public static final ConfigProperty HOODIE_TABLE_RECORDKEY_FIELDS = ConfigProperty .key("hoodie.table.recordkey.fields") .noDefaultValue() - .withDocumentation(""); + .withDocumentation("Columns used to uniquely identify a record in the table. 
Concatenated values of these fields are used as " + + " the record key component of HoodieKey."); public static final ConfigProperty HOODIE_TABLE_CREATE_SCHEMA = ConfigProperty .key("hoodie.table.create.schema") .noDefaultValue() - .withDocumentation(""); + .withDocumentation("Schema used when creating the table, for the first time."); public static final ConfigProperty HOODIE_BASE_FILE_FORMAT_PROP = ConfigProperty .key("hoodie.table.base.file.format") .defaultValue(HoodieFileFormat.PARQUET) .withAlternatives("hoodie.table.ro.file.format") - .withDocumentation(""); + .withDocumentation("Base file format to store all the base file data."); public static final ConfigProperty HOODIE_LOG_FILE_FORMAT_PROP = ConfigProperty .key("hoodie.table.log.file.format") .defaultValue(HoodieFileFormat.HOODIE_LOG) .withAlternatives("hoodie.table.rt.file.format") - .withDocumentation(""); + .withDocumentation("Log format used for the delta logs."); public static final ConfigProperty HOODIE_TIMELINE_LAYOUT_VERSION_PROP = ConfigProperty .key("hoodie.timeline.layout.version") .noDefaultValue() - .withDocumentation(""); + .withDocumentation("Version of timeline used, by the table."); public static final ConfigProperty HOODIE_PAYLOAD_CLASS_PROP = ConfigProperty .key("hoodie.compaction.payload.class") .defaultValue(OverwriteWithLatestAvroPayload.class.getName()) - .withDocumentation(""); + .withDocumentation("Payload class to use for performing compactions, i.e merge delta logs with current base file and then " + + " produce a new base file."); public static final ConfigProperty HOODIE_ARCHIVELOG_FOLDER_PROP = ConfigProperty .key("hoodie.archivelog.folder") .defaultValue("archived") - .withDocumentation(""); + .withDocumentation("path under the meta folder, to store archived timeline instants at."); public static final ConfigProperty HOODIE_BOOTSTRAP_INDEX_ENABLE_PROP = ConfigProperty .key("hoodie.bootstrap.index.enable") .noDefaultValue() - .withDocumentation(""); + 
.withDocumentation("Whether or not, this is a bootstrapped table, with bootstrap base data and an mapping index defined."); public static final ConfigProperty HOODIE_BOOTSTRAP_INDEX_CLASS_PROP = ConfigProperty .key("hoodie.bootstrap.index.class") .defaultValue(HFileBootstrapIndex.class.getName()) - .withDocumentation(""); + .withDocumentation("Implementation to use, for mapping base files to bootstrap base file, that contain actual data."); public static final ConfigProperty HOODIE_BOOTSTRAP_BASE_PATH_PROP = ConfigProperty .key("hoodie.bootstrap.base.path") diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/TimelineLayoutVersion.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/TimelineLayoutVersion.java index 994c86778..0ed83ab59 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/TimelineLayoutVersion.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/TimelineLayoutVersion.java @@ -34,7 +34,7 @@ public class TimelineLayoutVersion implements Serializable, Comparable FILESYSTEM_VIEW_STORAGE_TYPE = ConfigProperty .key("hoodie.filesystem.view.type") .defaultValue(FileSystemViewStorageType.MEMORY) - .withDocumentation(""); + .withDocumentation("File system view provides APIs for viewing the files on the underlying lake storage, " + + " as file groups and file slices. This config controls how such a view is held. 
Options include " + + Arrays.stream(FileSystemViewStorageType.values()).map(Enum::name).collect(Collectors.joining(",")) + + " which provide different trade offs for memory usage and API request performance."); public static final ConfigProperty FILESYSTEM_VIEW_INCREMENTAL_SYNC_MODE = ConfigProperty .key("hoodie.filesystem.view.incr.timeline.sync.enable") .defaultValue("false") - .withDocumentation(""); + .withDocumentation("Controls whether or not, the file system view is incrementally updated as " + + "new actions are performed on the timeline."); public static final ConfigProperty FILESYSTEM_SECONDARY_VIEW_STORAGE_TYPE = ConfigProperty .key("hoodie.filesystem.view.secondary.type") .defaultValue(FileSystemViewStorageType.MEMORY) - .withDocumentation(""); + .withDocumentation("Specifies the secondary form of storage for file system view, if the primary (e.g timeline server) " + + " is unavailable."); public static final ConfigProperty FILESYSTEM_VIEW_REMOTE_HOST = ConfigProperty .key("hoodie.filesystem.view.remote.host") .defaultValue("localhost") - .withDocumentation(""); + .withDocumentation("We expect this to be rarely hand configured."); public static final ConfigProperty FILESYSTEM_VIEW_REMOTE_PORT = ConfigProperty .key("hoodie.filesystem.view.remote.port") .defaultValue(26754) - .withDocumentation(""); + .withDocumentation("Port to serve file system view queries, when remote. 
We expect this to be rarely hand configured."); public static final ConfigProperty FILESYSTEM_VIEW_SPILLABLE_DIR = ConfigProperty .key("hoodie.filesystem.view.spillable.dir") .defaultValue("/tmp/view_map/") - .withDocumentation(""); + .withDocumentation("Path on local storage to use, when file system view is held in a spillable map."); public static final ConfigProperty FILESYSTEM_VIEW_SPILLABLE_MEM = ConfigProperty .key("hoodie.filesystem.view.spillable.mem") .defaultValue(100 * 1024 * 1024L) // 100 MB - .withDocumentation(""); + .withDocumentation("Amount of memory to be used for holding file system view, before spilling to disk."); public static final ConfigProperty FILESYSTEM_VIEW_PENDING_COMPACTION_MEM_FRACTION = ConfigProperty .key("hoodie.filesystem.view.spillable.compaction.mem.fraction") .defaultValue(0.8) - .withDocumentation(""); + .withDocumentation("Fraction of the file system view memory, to be used for holding compaction related metadata."); public static final ConfigProperty FILESYSTEM_VIEW_BOOTSTRAP_BASE_FILE_FRACTION = ConfigProperty .key("hoodie.filesystem.view.spillable.bootstrap.base.file.mem.fraction") .defaultValue(0.05) - .withDocumentation(""); + .withDocumentation("Fraction of the file system view memory, to be used for holding mapping to bootstrap base files."); public static final ConfigProperty FILESYSTEM_VIEW_REPLACED_MEM_FRACTION = ConfigProperty .key("hoodie.filesystem.view.spillable.replaced.mem.fraction") .defaultValue(0.01) - .withDocumentation(""); + .withDocumentation("Fraction of the file system view memory, to be used for holding replace commit related metadata."); public static final ConfigProperty FILESYSTEM_VIEW_PENDING_CLUSTERING_MEM_FRACTION = ConfigProperty .key("hoodie.filesystem.view.spillable.clustering.mem.fraction") .defaultValue(0.01) - .withDocumentation(""); + .withDocumentation("Fraction of the file system view memory, to be used for holding clustering related metadata."); public static final ConfigProperty 
ROCKSDB_BASE_PATH_PROP = ConfigProperty .key("hoodie.filesystem.view.rocksdb.base.path") .defaultValue("/tmp/hoodie_timeline_rocksdb") - .withDocumentation(""); + .withDocumentation("Path on local storage to use, when storing file system view in embedded kv store/rocksdb."); - public static final ConfigProperty FILESTYSTEM_REMOTE_TIMELINE_CLIENT_TIMEOUT_SECS = ConfigProperty + public static final ConfigProperty FILESYSTEM_REMOTE_TIMELINE_CLIENT_TIMEOUT_SECS = ConfigProperty .key("hoodie.filesystem.view.remote.timeout.secs") .defaultValue(5 * 60) // 5 min - .withDocumentation(""); + .withDocumentation("Timeout in seconds, to wait for API requests against a remote file system view. e.g timeline server."); - /** - * Configs to control whether backup needs to be configured if clients were not able to reach - * timeline service. - */ public static final ConfigProperty REMOTE_BACKUP_VIEW_HANDLER_ENABLE = ConfigProperty .key("hoodie.filesystem.remote.backup.view.enable") .defaultValue("true") // Need to be disabled only for tests. 
- .withDocumentation(""); + .withDocumentation("Config to control whether backup needs to be configured if clients were not able to reach" + + " timeline service."); public static FileSystemViewStorageConfig.Builder newBuilder() { return new Builder(); @@ -132,7 +136,7 @@ public class FileSystemViewStorageConfig extends HoodieConfig { } public Integer getRemoteTimelineClientTimeoutSecs() { - return getInt(FILESTYSTEM_REMOTE_TIMELINE_CLIENT_TIMEOUT_SECS); + return getInt(FILESYSTEM_REMOTE_TIMELINE_CLIENT_TIMEOUT_SECS); } public long getMaxMemoryForFileGroupMap() { @@ -232,7 +236,7 @@ public class FileSystemViewStorageConfig extends HoodieConfig { } public Builder withRemoteTimelineClientTimeoutSecs(Long timelineClientTimeoutSecs) { - fileSystemViewStorageConfig.setValue(FILESTYSTEM_REMOTE_TIMELINE_CLIENT_TIMEOUT_SECS, timelineClientTimeoutSecs.toString()); + fileSystemViewStorageConfig.setValue(FILESYSTEM_REMOTE_TIMELINE_CLIENT_TIMEOUT_SECS, timelineClientTimeoutSecs.toString()); return this; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 7254f3c56..96e038241 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -18,6 +18,7 @@ package org.apache.hudi.configuration; +import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.config.HoodieWriteConfig; @@ -43,7 +44,7 @@ import java.util.Set; * *

It has the options for Hoodie table read and write. It also defines some utilities. */ -public class FlinkOptions { +public class FlinkOptions extends HoodieConfig { private FlinkOptions() { } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 95b883abf..710db943b 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -25,7 +25,6 @@ import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.{HiveSyncTool, SlashEncodedDayPartitionValueExtractor} import org.apache.hudi.keygen.constant.KeyGeneratorOptions import org.apache.hudi.keygen.{CustomKeyGenerator, SimpleKeyGenerator} - import org.apache.log4j.LogManager import org.apache.spark.sql.execution.datasources.{DataSourceUtils => SparkDataSourceUtils} @@ -41,15 +40,6 @@ object DataSourceReadOptions { private val log = LogManager.getLogger(DataSourceReadOptions.getClass) - /** - * Whether data needs to be read, in - * - * 1) Snapshot mode (obtain latest view, based on row & columnar data) - * 2) incremental mode (new data since an instantTime) - * 3) Read Optimized mode (obtain latest view, based on columnar data) - * - * Default: snapshot - */ val QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot" val QUERY_TYPE_READ_OPTIMIZED_OPT_VAL = "read_optimized" val QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental" @@ -58,30 +48,30 @@ object DataSourceReadOptions { .defaultValue(QUERY_TYPE_SNAPSHOT_OPT_VAL) .withAlternatives("hoodie.datasource.view.type") .withDocumentation("Whether data needs to be read, in incremental mode (new data since an instantTime) " + - "(or) Read Optimized mode (obtain latest view, based on columnar data) (or) Snapshot mode " + - "(obtain latest view, based on row & columnar data)") + 
"(or) Read Optimized mode (obtain latest view, based on base files) (or) Snapshot mode " + + "(obtain latest view, by merging base and (if any) log files)") - /** - * For Snapshot query on merge on read table. Use this key to define the payload class. - */ val REALTIME_SKIP_MERGE_OPT_VAL = "skip_merge" val REALTIME_PAYLOAD_COMBINE_OPT_VAL = "payload_combine" val REALTIME_MERGE_OPT_KEY: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.merge.type") .defaultValue(REALTIME_PAYLOAD_COMBINE_OPT_VAL) - .withDocumentation("") + .withDocumentation("For Snapshot query on merge on read table, control whether we invoke the record " + + s"payload implementation to merge (${REALTIME_PAYLOAD_COMBINE_OPT_VAL}) or skip merging altogether " + + s"(${REALTIME_SKIP_MERGE_OPT_VAL})") val READ_PATHS_OPT_KEY: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.read.paths") .noDefaultValue() - .withDocumentation("") + .withDocumentation("Comma separated list of file paths to read within a Hudi table.") val READ_PRE_COMBINE_FIELD = HoodieWriteConfig.PRECOMBINE_FIELD_PROP val ENABLE_HOODIE_FILE_INDEX: ConfigProperty[Boolean] = ConfigProperty .key("hoodie.file.index.enable") .defaultValue(true) - .withDocumentation("") + .withDocumentation("Enables use of the spark file index implementation for Hudi, " + + "that speeds up listing of large tables.") @Deprecated val VIEW_TYPE_OPT_KEY = "hoodie.datasource.view.type" @@ -94,14 +84,6 @@ object DataSourceReadOptions { @Deprecated val DEFAULT_VIEW_TYPE_OPT_VAL = VIEW_TYPE_READ_OPTIMIZED_OPT_VAL - /** - * Instant time to start incrementally pulling data from. The instanttime here need not - * necessarily correspond to an instant on the timeline. New data written with an - * `instant_time > BEGIN_INSTANTTIME` are fetched out. For e.g: '20170901080000' will get - * all new data written after Sep 1, 2017 08:00AM.
- * - * Default: None (Mandatory in incremental mode) - */ val BEGIN_INSTANTTIME_OPT_KEY: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.read.begin.instanttime") .noDefaultValue() @@ -109,48 +91,29 @@ object DataSourceReadOptions { "correspond to an instant on the timeline. New data written with an instant_time > BEGIN_INSTANTTIME are fetched out. " + "For e.g: ‘20170901080000’ will get all new data written after Sep 1, 2017 08:00AM.") - /** - * Instant time to limit incrementally fetched data to. New data written with an - * `instant_time <= END_INSTANTTIME` are fetched out. - * - * Default: latest instant (i.e fetches all new data since begin instant time) - * - */ val END_INSTANTTIME_OPT_KEY: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.read.end.instanttime") .noDefaultValue() .withDocumentation("Instant time to limit incrementally fetched data to. " + "New data written with an instant_time <= END_INSTANTTIME are fetched out.") - /** - * If use the end instant schema when incrementally fetched data to. - * - * Default: false (use latest instant schema) - * - */ val INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_KEY: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.read.schema.use.end.instanttime") .defaultValue("false") .withDocumentation("Uses end instant schema when incrementally fetched data to. Default: users latest instant schema.") - /** - * For use-cases like DeltaStreamer which reads from Hoodie Incremental table and applies opaque map functions, - * filters appearing late in the sequence of transformations cannot be automatically pushed down. 
- * This option allows setting filters directly on Hoodie Source - */ val PUSH_DOWN_INCR_FILTERS_OPT_KEY: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.read.incr.filters") .defaultValue("") - .withDocumentation("") + .withDocumentation("For use-cases like DeltaStreamer which reads from Hoodie Incremental table and applies " + + "opaque map functions, filters appearing late in the sequence of transformations cannot be automatically " + + "pushed down. This option allows setting filters directly on Hoodie Source.") - /** - * For the use-cases like users only want to incremental pull from certain partitions instead of the full table. - * This option allows using glob pattern to directly filter on path. - */ val INCR_PATH_GLOB_OPT_KEY: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.read.incr.path.glob") .defaultValue("") - .withDocumentation("") + .withDocumentation("For the use-cases like users only want to incremental pull from certain partitions " + + "instead of the full table. This option allows using glob pattern to directly filter on path.") } /** @@ -160,11 +123,6 @@ object DataSourceWriteOptions { private val log = LogManager.getLogger(DataSourceWriteOptions.getClass) - /** - * The write operation, that this write should do - * - * Default: upsert() - */ val BULK_INSERT_OPERATION_OPT_VAL = WriteOperationType.BULK_INSERT.value val INSERT_OPERATION_OPT_VAL = WriteOperationType.INSERT.value val UPSERT_OPERATION_OPT_VAL = WriteOperationType.UPSERT.value @@ -179,12 +137,6 @@ object DataSourceWriteOptions { "Use bulkinsert to load new data into a table, and there on use upsert/insert. " + "bulk insert uses a disk based write path to scale to load large inputs without need to cache it.") - /** - * The table type for the underlying data, for this write. - * Note that this can't change across writes. 
- * - * Default: COPY_ON_WRITE - */ val COW_TABLE_TYPE_OPT_VAL = HoodieTableType.COPY_ON_WRITE.name val MOR_TABLE_TYPE_OPT_VAL = HoodieTableType.MERGE_ON_READ.name val TABLE_TYPE_OPT_KEY: ConfigProperty[String] = ConfigProperty @@ -239,15 +191,10 @@ object DataSourceWriteOptions { translatedOptParams } - /** - * Hive table name, to register the table into. - * - * Default: None (mandatory) - */ val TABLE_NAME_OPT_KEY: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.write.table.name") .noDefaultValue() - .withDocumentation("Hive table name, to register the table into.") + .withDocumentation("Table name for the datasource write. Also used to register the table into meta stores.") /** * Field used in preCombining before actual write. When two records have the same @@ -292,65 +239,50 @@ object DataSourceWriteOptions { val DEFAULT_KEYGENERATOR_CLASS_OPT_VAL = classOf[SimpleKeyGenerator].getName /** - * When set to true, will perform write operations directly using the spark native `Row` representation. + * * By default, false (will be enabled as default in a future release) */ val ENABLE_ROW_WRITER_OPT_KEY: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.write.row.writer.enable") .defaultValue("false") - .withDocumentation("") + .withDocumentation("When set to true, will perform write operations directly using the spark native " + + "`Row` representation, avoiding any additional conversion costs.") - /** - * Option keys beginning with this prefix, are automatically added to the commit/deltacommit metadata. - * This is useful to store checkpointing information, in a consistent way with the hoodie timeline - */ val COMMIT_METADATA_KEYPREFIX_OPT_KEY: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.write.commitmeta.key.prefix") .defaultValue("_") .withDocumentation("Option keys beginning with this prefix, are automatically added to the commit/deltacommit metadata. 
" + "This is useful to store checkpointing information, in a consistent way with the hudi timeline") - /** - * Flag to indicate whether to drop duplicates upon insert. - * By default insert will accept duplicates, to gain extra performance. - */ val INSERT_DROP_DUPS_OPT_KEY: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.write.insert.drop.duplicates") .defaultValue("false") .withDocumentation("If set to true, filters out all duplicate records from incoming dataframe, during insert operations.") - /** - * Flag to indicate how many times streaming job should retry for a failed microbatch - * By default 3 - */ val STREAMING_RETRY_CNT_OPT_KEY: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.write.streaming.retry.count") .defaultValue("3") - .withDocumentation("") + .withDocumentation("Config to indicate how many times streaming job should retry for a failed micro batch.") - /** - * Flag to indicate how long (by millisecond) before a retry should issued for failed microbatch - * By default 2000 and it will be doubled by every retry - */ val STREAMING_RETRY_INTERVAL_MS_OPT_KEY: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.write.streaming.retry.interval.ms") .defaultValue("2000") - .withDocumentation("") + .withDocumentation("Config to indicate how long (in milliseconds) before a retry should be issued for a failed microbatch.") /** - * Flag to indicate whether to ignore any non exception error (e.g. writestatus error) - * within a streaming microbatch + * * By default true (in favor of streaming progressing over data integrity) */ val STREAMING_IGNORE_FAILED_BATCH_OPT_KEY: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.write.streaming.ignore.failed.batch") .defaultValue("true") - .withDocumentation("") + .withDocumentation("Config to indicate whether to ignore any non exception error (e.g.
writestatus error)" + + " within a streaming microbatch") val META_SYNC_CLIENT_TOOL_CLASS: ConfigProperty[String] = ConfigProperty .key("hoodie.meta.sync.client.tool.class") .defaultValue(classOf[HiveSyncTool].getName) - .withDocumentation("") + .withDocumentation("Sync tool class name used to sync to metastore. Defaults to Hive.") // HIVE SYNC SPECIFIC CONFIGS // NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes @@ -378,7 +310,7 @@ object DataSourceWriteOptions { val HIVE_BASE_FILE_FORMAT_OPT_KEY: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.hive_sync.base_file_format") .defaultValue("PARQUET") - .withDocumentation("") + .withDocumentation("Base file format for the sync.") val HIVE_USER_OPT_KEY: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.hive_sync.username") @@ -441,6 +373,16 @@ object DataSourceWriteOptions { .withDocumentation("‘INT64’ with original type TIMESTAMP_MICROS is converted to hive ‘timestamp’ type. 
" + "Disabled by default for backward compatibility.") + val HIVE_TABLE_PROPERTIES: ConfigProperty[String] = ConfigProperty + .key("hoodie.datasource.hive_sync.table_properties") + .noDefaultValue() + .withDocumentation("Additional properties to store with table.") + + val HIVE_TABLE_SERDE_PROPERTIES: ConfigProperty[String] = ConfigProperty + .key("hoodie.datasource.hive_sync.serde_properties") + .noDefaultValue() + .withDocumentation("Serde properties to be used when syncing the table to the metastore.") + val HIVE_SYNC_AS_DATA_SOURCE_TABLE: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.hive_sync.sync_as_datasource") .defaultValue("true") @@ -461,7 +403,7 @@ object DataSourceWriteOptions { val ASYNC_COMPACT_ENABLE_OPT_KEY: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.compaction.async.enable") .defaultValue("true") - .withDocumentation("") + .withDocumentation("Controls whether async compaction should be turned on for MOR table writing.") val INLINE_CLUSTERING_ENABLE_OPT_KEY: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.clustering.inline.enable") diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index e62a56957..858d0284e 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -17,20 +17,17 @@ package org.apache.hudi -import java.util -import java.util.Properties import org.apache.avro.generic.GenericRecord import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.conf.HiveConf import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.avro.HoodieAvroUtils -import org.apache.hudi.client.HoodieWriteResult -import org.apache.hudi.client.SparkRDDWriteClient +import org.apache.hudi.client.{HoodieWriteResult,
SparkRDDWriteClient} import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties} import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType} -import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.table.timeline.HoodieActiveTimeline +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils} import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP} import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig} @@ -43,14 +40,15 @@ import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory import org.apache.hudi.sync.common.AbstractSyncTool import org.apache.hudi.table.BulkInsertPartitioner import org.apache.log4j.LogManager -import org.apache.spark.SPARK_VERSION -import org.apache.spark.SparkContext +import org.apache.spark.{SPARK_VERSION, SparkContext} import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, Dataset,Row, SQLContext, SaveMode, SparkSession} import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} +import java.util +import java.util.Properties import scala.collection.JavaConversions._ import scala.collection.mutable.ListBuffer @@ -365,7 +363,7 @@ object HoodieSparkSqlWriter { } else if (SPARK_VERSION.startsWith("3.")) { hoodieDF.write.format("org.apache.hudi.spark3.internal") .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime) - .option(HoodieWriteConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key, hoodieDF.schema.toDDL) + .option(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key, hoodieDF.schema.toDDL) .options(params) .mode(SaveMode.Append) .save() diff --git 
a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java index 6860b7e76..e368e8834 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java +++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java @@ -40,7 +40,7 @@ public class DefaultSource extends BaseDefaultSource implements TableProvider { @Override public StructType inferSchema(CaseInsensitiveStringMap options) { - return StructType.fromDDL(options.get(HoodieWriteConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key())); + return StructType.fromDDL(options.get(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key())); } @Override diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/kafka/HoodieWriteCommitKafkaCallbackConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/kafka/HoodieWriteCommitKafkaCallbackConfig.java index 6336c5363..a198ddb1f 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/kafka/HoodieWriteCommitKafkaCallbackConfig.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/kafka/HoodieWriteCommitKafkaCallbackConfig.java @@ -31,31 +31,33 @@ public class HoodieWriteCommitKafkaCallbackConfig extends HoodieConfig { .key(CALLBACK_PREFIX + "kafka.bootstrap.servers") .noDefaultValue() .sinceVersion("0.7.0") - .withDocumentation("Bootstrap servers of kafka callback cluster"); + .withDocumentation("Bootstrap servers of kafka cluster, to be used for publishing commit metadata."); public static final ConfigProperty CALLBACK_KAFKA_TOPIC = ConfigProperty .key(CALLBACK_PREFIX + "kafka.topic") .noDefaultValue() .sinceVersion("0.7.0") - .withDocumentation("Kafka topic to be sent along with callback messages"); + .withDocumentation("Kafka topic name to publish timeline activity 
into."); public static final ConfigProperty CALLBACK_KAFKA_PARTITION = ConfigProperty .key(CALLBACK_PREFIX + "kafka.partition") .noDefaultValue() .sinceVersion("0.7.0") - .withDocumentation("partition of CALLBACK_KAFKA_TOPIC, 0 by default"); + .withDocumentation("It may be desirable to serialize all changes into a single Kafka partition " + + " for providing strict ordering. By default, Kafka messages are keyed by table name, which " + + " guarantees ordering at the table level, but not globally (or when new partitions are added)"); public static final ConfigProperty CALLBACK_KAFKA_ACKS = ConfigProperty .key(CALLBACK_PREFIX + "kafka.acks") .defaultValue("all") .sinceVersion("0.7.0") - .withDocumentation("kafka acks level, all by default"); + .withDocumentation("kafka acks level, all by default to ensure strong durability."); public static final ConfigProperty CALLBACK_KAFKA_RETRIES = ConfigProperty .key(CALLBACK_PREFIX + "kafka.retries") .defaultValue(3) .sinceVersion("0.7.0") - .withDocumentation("Times to retry. 3 by default"); + .withDocumentation("Times to retry the produce. 3 by default"); /** * Set default value for {@link HoodieWriteCommitKafkaCallbackConfig} if needed.