[MINOR] Deprecate older configs (#3464)
Rename and deprecate props in HoodieWriteConfig Rename and deprecate older props
This commit is contained in:
@@ -52,6 +52,8 @@ object DataSourceReadOptions {
|
||||
.withDocumentation("Whether data needs to be read, in incremental mode (new data since an instantTime) " +
|
||||
"(or) Read Optimized mode (obtain latest view, based on base files) (or) Snapshot mode " +
|
||||
"(obtain latest view, by merging base and (if any) log files)")
|
||||
@Deprecated
|
||||
val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key()
|
||||
|
||||
val REALTIME_SKIP_MERGE_OPT_VAL = "skip_merge"
|
||||
val REALTIME_PAYLOAD_COMBINE_OPT_VAL = "payload_combine"
|
||||
@@ -61,13 +63,17 @@ object DataSourceReadOptions {
|
||||
.withDocumentation("For Snapshot query on merge on read table, control whether we invoke the record " +
|
||||
s"payload implementation to merge (${REALTIME_PAYLOAD_COMBINE_OPT_VAL}) or skip merging altogether" +
|
||||
s"${REALTIME_SKIP_MERGE_OPT_VAL}")
|
||||
@Deprecated
|
||||
val REALTIME_MERGE_OPT_KEY = REALTIME_MERGE.key()
|
||||
|
||||
val READ_PATHS: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.read.paths")
|
||||
.noDefaultValue()
|
||||
.withDocumentation("Comma separated list of file paths to read within a Hudi table.")
|
||||
@Deprecated
|
||||
val READ_PATHS_OPT_KEY = READ_PATHS.key()
|
||||
|
||||
val READ_PRE_COMBINE_FIELD = HoodieWriteConfig.PRECOMBINE_FIELD_PROP
|
||||
val READ_PRE_COMBINE_FIELD = HoodieWriteConfig.PRECOMBINE_FIELD
|
||||
|
||||
val ENABLE_HOODIE_FILE_INDEX: ConfigProperty[Boolean] = ConfigProperty
|
||||
.key("hoodie.file.index.enable")
|
||||
@@ -92,17 +98,23 @@ object DataSourceReadOptions {
|
||||
.withDocumentation("Instant time to start incrementally pulling data from. The instanttime here need not necessarily " +
|
||||
"correspond to an instant on the timeline. New data written with an instant_time > BEGIN_INSTANTTIME are fetched out. " +
|
||||
"For e.g: ‘20170901080000’ will get all new data written after Sep 1, 2017 08:00AM.")
|
||||
@Deprecated
|
||||
val BEGIN_INSTANTTIME_OPT_KEY = BEGIN_INSTANTTIME.key()
|
||||
|
||||
val END_INSTANTTIME: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.read.end.instanttime")
|
||||
.noDefaultValue()
|
||||
.withDocumentation("Instant time to limit incrementally fetched data to. " +
|
||||
"New data written with an instant_time <= END_INSTANTTIME are fetched out.")
|
||||
@Deprecated
|
||||
val END_INSTANTTIME_OPT_KEY = END_INSTANTTIME.key()
|
||||
|
||||
val INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.read.schema.use.end.instanttime")
|
||||
.defaultValue("false")
|
||||
.withDocumentation("Uses end instant schema when incrementally fetched data to. Default: users latest instant schema.")
|
||||
@Deprecated
|
||||
val INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_KEY = INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME.key()
|
||||
|
||||
val PUSH_DOWN_INCR_FILTERS: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.read.incr.filters")
|
||||
@@ -110,12 +122,16 @@ object DataSourceReadOptions {
|
||||
.withDocumentation("For use-cases like DeltaStreamer which reads from Hoodie Incremental table and applies "
|
||||
+ "opaque map functions, filters appearing late in the sequence of transformations cannot be automatically "
|
||||
+ "pushed down. This option allows setting filters directly on Hoodie Source.")
|
||||
@Deprecated
|
||||
val PUSH_DOWN_INCR_FILTERS_OPT_KEY = PUSH_DOWN_INCR_FILTERS.key()
|
||||
|
||||
val INCR_PATH_GLOB: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.read.incr.path.glob")
|
||||
.defaultValue("")
|
||||
.withDocumentation("For the use-cases like users only want to incremental pull from certain partitions "
|
||||
+ "instead of the full table. This option allows using glob pattern to directly filter on path.")
|
||||
@Deprecated
|
||||
val INCR_PATH_GLOB_OPT_KEY = INCR_PATH_GLOB.key()
|
||||
|
||||
val TIME_TRAVEL_AS_OF_INSTANT: ConfigProperty[String] = ConfigProperty
|
||||
.key("as.of.instant")
|
||||
@@ -144,6 +160,8 @@ object DataSourceWriteOptions {
|
||||
.withDocumentation("Whether to do upsert, insert or bulkinsert for the write operation. " +
|
||||
"Use bulkinsert to load new data into a table, and there on use upsert/insert. " +
|
||||
"bulk insert uses a disk based write path to scale to load large inputs without need to cache it.")
|
||||
@Deprecated
|
||||
val OPERATION_OPT_KEY = OPERATION.key()
|
||||
|
||||
val COW_TABLE_TYPE_OPT_VAL = HoodieTableType.COPY_ON_WRITE.name
|
||||
val MOR_TABLE_TYPE_OPT_VAL = HoodieTableType.MERGE_ON_READ.name
|
||||
@@ -152,6 +170,8 @@ object DataSourceWriteOptions {
|
||||
.defaultValue(COW_TABLE_TYPE_OPT_VAL)
|
||||
.withAlternatives("hoodie.datasource.write.storage.type")
|
||||
.withDocumentation("The table type for the underlying data, for this write. This can’t change between writes.")
|
||||
@Deprecated
|
||||
val TABLE_TYPE_OPT_KEY = TABLE_TYPE.key()
|
||||
|
||||
@Deprecated
|
||||
val STORAGE_TYPE_OPT = "hoodie.datasource.write.storage.type"
|
||||
@@ -203,19 +223,25 @@ object DataSourceWriteOptions {
|
||||
.key("hoodie.datasource.write.table.name")
|
||||
.noDefaultValue()
|
||||
.withDocumentation("Table name for the datasource write. Also used to register the table into meta stores.")
|
||||
@Deprecated
|
||||
val TABLE_NAME_OPT_KEY = TABLE_NAME.key()
|
||||
|
||||
/**
|
||||
* Field used in preCombining before actual write. When two records have the same
|
||||
* key value, we will pick the one with the largest value for the precombine field,
|
||||
* determined by Object.compareTo(..)
|
||||
*/
|
||||
val PRECOMBINE_FIELD = HoodieWriteConfig.PRECOMBINE_FIELD_PROP
|
||||
val PRECOMBINE_FIELD = HoodieWriteConfig.PRECOMBINE_FIELD
|
||||
@Deprecated
|
||||
val PRECOMBINE_FIELD_OPT_KEY = HoodieWriteConfig.PRECOMBINE_FIELD.key()
|
||||
|
||||
/**
|
||||
* Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.
|
||||
* This will render any value set for `PRECOMBINE_FIELD_OPT_VAL` in-effective
|
||||
*/
|
||||
val PAYLOAD_CLASS = HoodieWriteConfig.WRITE_PAYLOAD_CLASS
|
||||
@Deprecated
|
||||
val PAYLOAD_CLASS_OPT_KEY = HoodieWriteConfig.WRITE_PAYLOAD_CLASS.key()
|
||||
|
||||
/**
|
||||
* Record key field. Value to be used as the `recordKey` component of `HoodieKey`. Actual value
|
||||
@@ -224,12 +250,16 @@ object DataSourceWriteOptions {
|
||||
*
|
||||
*/
|
||||
val RECORDKEY_FIELD = KeyGeneratorOptions.RECORDKEY_FIELD
|
||||
@Deprecated
|
||||
val RECORDKEY_FIELD_OPT_KEY = KeyGeneratorOptions.RECORDKEY_FIELD.key()
|
||||
|
||||
/**
|
||||
* Partition path field. Value to be used at the `partitionPath` component of `HoodieKey`. Actual
|
||||
* value obtained by invoking .toString()
|
||||
*/
|
||||
val PARTITIONPATH_FIELD = KeyGeneratorOptions.PARTITIONPATH_FIELD
|
||||
@Deprecated
|
||||
val PARTITIONPATH_FIELD_OPT_KEY = KeyGeneratorOptions.PARTITIONPATH_FIELD.key()
|
||||
|
||||
/**
|
||||
* Flag to indicate whether to use Hive style partitioning.
|
||||
@@ -238,13 +268,19 @@ object DataSourceWriteOptions {
|
||||
*/
|
||||
val HIVE_STYLE_PARTITIONING = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING
|
||||
val URL_ENCODE_PARTITIONING = KeyGeneratorOptions.URL_ENCODE_PARTITIONING
|
||||
@Deprecated
|
||||
val HIVE_STYLE_PARTITIONING_OPT_KEY = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING.key()
|
||||
@Deprecated
|
||||
val URL_ENCODE_PARTITIONING_OPT_KEY = KeyGeneratorOptions.URL_ENCODE_PARTITIONING.key()
|
||||
|
||||
/**
|
||||
* Key generator class, that implements will extract the key out of incoming record
|
||||
*
|
||||
*/
|
||||
val KEYGENERATOR_CLASS = HoodieWriteConfig.KEYGENERATOR_CLASS_PROP
|
||||
val KEYGENERATOR_CLASS = HoodieWriteConfig.KEYGENERATOR_CLASS
|
||||
val DEFAULT_KEYGENERATOR_CLASS_OPT_VAL = classOf[SimpleKeyGenerator].getName
|
||||
@Deprecated
|
||||
val KEYGENERATOR_CLASS_OPT_KEY = HoodieWriteConfig.KEYGENERATOR_CLASS.key()
|
||||
|
||||
/**
|
||||
*
|
||||
@@ -255,6 +291,8 @@ object DataSourceWriteOptions {
|
||||
.defaultValue("false")
|
||||
.withDocumentation("When set to true, will perform write operations directly using the spark native " +
|
||||
"`Row` representation, avoiding any additional conversion costs.")
|
||||
@Deprecated
|
||||
val ENABLE_ROW_WRITER_OPT_KEY = ENABLE_ROW_WRITER.key()
|
||||
|
||||
/**
|
||||
* Enable the bulk insert for sql insert statement.
|
||||
@@ -277,21 +315,29 @@ object DataSourceWriteOptions {
|
||||
.defaultValue("_")
|
||||
.withDocumentation("Option keys beginning with this prefix, are automatically added to the commit/deltacommit metadata. " +
|
||||
"This is useful to store checkpointing information, in a consistent way with the hudi timeline")
|
||||
@Deprecated
|
||||
val COMMIT_METADATA_KEYPREFIX_OPT_KEY = COMMIT_METADATA_KEYPREFIX.key()
|
||||
|
||||
val INSERT_DROP_DUPS: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.write.insert.drop.duplicates")
|
||||
.defaultValue("false")
|
||||
.withDocumentation("If set to true, filters out all duplicate records from incoming dataframe, during insert operations.")
|
||||
@Deprecated
|
||||
val INSERT_DROP_DUPS_OPT_KEY = INSERT_DROP_DUPS.key()
|
||||
|
||||
val STREAMING_RETRY_CNT: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.write.streaming.retry.count")
|
||||
.defaultValue("3")
|
||||
.withDocumentation("Config to indicate how many times streaming job should retry for a failed micro batch.")
|
||||
@Deprecated
|
||||
val STREAMING_RETRY_CNT_OPT_KEY = STREAMING_RETRY_CNT.key()
|
||||
|
||||
val STREAMING_RETRY_INTERVAL_MS: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.write.streaming.retry.interval.ms")
|
||||
.defaultValue("2000")
|
||||
.withDocumentation(" Config to indicate how long (by millisecond) before a retry should issued for failed microbatch")
|
||||
@Deprecated
|
||||
val STREAMING_RETRY_INTERVAL_MS_OPT_KEY = STREAMING_RETRY_INTERVAL_MS.key()
|
||||
|
||||
/**
|
||||
*
|
||||
@@ -302,6 +348,8 @@ object DataSourceWriteOptions {
|
||||
.defaultValue("true")
|
||||
.withDocumentation("Config to indicate whether to ignore any non exception error (e.g. writestatus error)"
|
||||
+ " within a streaming microbatch")
|
||||
@Deprecated
|
||||
val STREAMING_IGNORE_FAILED_BATCH_OPT_KEY = STREAMING_IGNORE_FAILED_BATCH.key()
|
||||
|
||||
val META_SYNC_CLIENT_TOOL_CLASS: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.meta.sync.client.tool.class")
|
||||
@@ -322,61 +370,85 @@ object DataSourceWriteOptions {
|
||||
.key("hoodie.datasource.hive_sync.enable")
|
||||
.defaultValue("false")
|
||||
.withDocumentation("When set to true, register/sync the table to Apache Hive metastore")
|
||||
@Deprecated
|
||||
val HIVE_SYNC_ENABLED_OPT_KEY = HIVE_SYNC_ENABLED.key()
|
||||
|
||||
val META_SYNC_ENABLED: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.meta.sync.enable")
|
||||
.defaultValue("false")
|
||||
.withDocumentation("")
|
||||
@Deprecated
|
||||
val META_SYNC_ENABLED_OPT_KEY = META_SYNC_ENABLED.key()
|
||||
|
||||
val HIVE_DATABASE: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.hive_sync.database")
|
||||
.defaultValue("default")
|
||||
.withDocumentation("database to sync to")
|
||||
@Deprecated
|
||||
val HIVE_DATABASE_OPT_KEY = HIVE_DATABASE.key()
|
||||
|
||||
val HIVE_TABLE: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.hive_sync.table")
|
||||
.defaultValue("unknown")
|
||||
.withDocumentation("table to sync to")
|
||||
@Deprecated
|
||||
val HIVE_TABLE_OPT_KEY = HIVE_TABLE.key()
|
||||
|
||||
val HIVE_BASE_FILE_FORMAT: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.hive_sync.base_file_format")
|
||||
.defaultValue("PARQUET")
|
||||
.withDocumentation("Base file format for the sync.")
|
||||
@Deprecated
|
||||
val HIVE_BASE_FILE_FORMAT_OPT_KEY = HIVE_BASE_FILE_FORMAT.key()
|
||||
|
||||
val HIVE_USER: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.hive_sync.username")
|
||||
.defaultValue("hive")
|
||||
.withDocumentation("hive user name to use")
|
||||
@Deprecated
|
||||
val HIVE_USER_OPT_KEY = HIVE_USER.key()
|
||||
|
||||
val HIVE_PASS: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.hive_sync.password")
|
||||
.defaultValue("hive")
|
||||
.withDocumentation("hive password to use")
|
||||
@Deprecated
|
||||
val HIVE_PASS_OPT_KEY = HIVE_PASS.key()
|
||||
|
||||
val HIVE_URL: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.hive_sync.jdbcurl")
|
||||
.defaultValue("jdbc:hive2://localhost:10000")
|
||||
.withDocumentation("Hive metastore url")
|
||||
@Deprecated
|
||||
val HIVE_URL_OPT_KEY = HIVE_URL.key()
|
||||
|
||||
val HIVE_PARTITION_FIELDS: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.hive_sync.partition_fields")
|
||||
.defaultValue("")
|
||||
.withDocumentation("field in the table to use for determining hive partition columns.")
|
||||
@Deprecated
|
||||
val HIVE_PARTITION_FIELDS_OPT_KEY = HIVE_PARTITION_FIELDS.key()
|
||||
|
||||
val HIVE_PARTITION_EXTRACTOR_CLASS: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.hive_sync.partition_extractor_class")
|
||||
.defaultValue(classOf[SlashEncodedDayPartitionValueExtractor].getCanonicalName)
|
||||
.withDocumentation("")
|
||||
@Deprecated
|
||||
val HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY = HIVE_PARTITION_EXTRACTOR_CLASS.key()
|
||||
|
||||
val HIVE_ASSUME_DATE_PARTITION: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.hive_sync.assume_date_partitioning")
|
||||
.defaultValue("false")
|
||||
.withDocumentation("Assume partitioning is yyyy/mm/dd")
|
||||
@Deprecated
|
||||
val HIVE_ASSUME_DATE_PARTITION_OPT_KEY = HIVE_ASSUME_DATE_PARTITION.key()
|
||||
|
||||
val HIVE_USE_PRE_APACHE_INPUT_FORMAT: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.hive_sync.use_pre_apache_input_format")
|
||||
.defaultValue("false")
|
||||
.withDocumentation("")
|
||||
@Deprecated
|
||||
val HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY = HIVE_USE_PRE_APACHE_INPUT_FORMAT.key()
|
||||
|
||||
// We should use HIVE_SYNC_MODE instead of this config from 0.9.0
|
||||
@Deprecated
|
||||
@@ -385,16 +457,22 @@ object DataSourceWriteOptions {
|
||||
.defaultValue("true")
|
||||
.deprecatedAfter("0.9.0")
|
||||
.withDocumentation("Use JDBC when hive synchronization is enabled")
|
||||
@Deprecated
|
||||
val HIVE_USE_JDBC_OPT_KEY = HIVE_USE_JDBC.key()
|
||||
|
||||
val HIVE_AUTO_CREATE_DATABASE: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.hive_sync.auto_create_database")
|
||||
.defaultValue("true")
|
||||
.withDocumentation("Auto create hive database if does not exists")
|
||||
@Deprecated
|
||||
val HIVE_AUTO_CREATE_DATABASE_OPT_KEY = HIVE_AUTO_CREATE_DATABASE.key()
|
||||
|
||||
val HIVE_IGNORE_EXCEPTIONS: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.hive_sync.ignore_exceptions")
|
||||
.defaultValue("false")
|
||||
.withDocumentation("")
|
||||
@Deprecated
|
||||
val HIVE_IGNORE_EXCEPTIONS_OPT_KEY = HIVE_IGNORE_EXCEPTIONS.key()
|
||||
|
||||
val HIVE_SKIP_RO_SUFFIX: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.hive_sync.skip_ro_suffix")
|
||||
|
||||
@@ -33,7 +33,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, T
|
||||
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
|
||||
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
|
||||
import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils}
|
||||
import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP}
|
||||
import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH, BOOTSTRAP_INDEX_CLASS}
|
||||
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
|
||||
import org.apache.hudi.exception.HoodieException
|
||||
import org.apache.hudi.execution.bulkinsert.{BulkInsertInternalPartitionerWithRowsFactory, NonSortPartitionerWithRows}
|
||||
@@ -170,7 +170,7 @@ object HoodieSparkSqlWriter {
|
||||
// Create a HoodieWriteClient & issue the delete.
|
||||
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
|
||||
null, path.get, tblName,
|
||||
mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP.key)))
|
||||
mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT.key)))
|
||||
.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
|
||||
|
||||
if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
|
||||
@@ -196,7 +196,7 @@ object HoodieSparkSqlWriter {
|
||||
// Create a HoodieWriteClient & issue the delete.
|
||||
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
|
||||
null, path.get, tblName,
|
||||
mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP.key)))
|
||||
mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT.key)))
|
||||
.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
|
||||
// Issue delete partitions
|
||||
client.startCommitWithTime(instantTime, commitActionType)
|
||||
@@ -221,8 +221,8 @@ object HoodieSparkSqlWriter {
|
||||
org.apache.hudi.common.util.Option.of(schema))
|
||||
val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
|
||||
operation.equals(WriteOperationType.UPSERT) ||
|
||||
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),
|
||||
HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.defaultValue()).toBoolean
|
||||
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
|
||||
HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
|
||||
val hoodieAllIncomingRecords = genericRecords.map(gr => {
|
||||
val hoodieRecord = if (shouldCombine) {
|
||||
val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false)
|
||||
@@ -238,7 +238,7 @@ object HoodieSparkSqlWriter {
|
||||
|
||||
// Create a HoodieWriteClient & issue the write.
|
||||
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get,
|
||||
tblName, mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP.key)
|
||||
tblName, mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT.key)
|
||||
)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
|
||||
|
||||
if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
|
||||
@@ -312,10 +312,10 @@ object HoodieSparkSqlWriter {
|
||||
val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters)
|
||||
val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TABLE_NAME, s"'${HoodieWriteConfig.TABLE_NAME.key}' must be set.")
|
||||
val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
|
||||
val bootstrapBasePath = hoodieConfig.getStringOrThrow(BOOTSTRAP_BASE_PATH_PROP,
|
||||
s"'${BOOTSTRAP_BASE_PATH_PROP.key}' is required for '${BOOTSTRAP_OPERATION_OPT_VAL}'" +
|
||||
val bootstrapBasePath = hoodieConfig.getStringOrThrow(BOOTSTRAP_BASE_PATH,
|
||||
s"'${BOOTSTRAP_BASE_PATH.key}' is required for '${BOOTSTRAP_OPERATION_OPT_VAL}'" +
|
||||
" operation'")
|
||||
val bootstrapIndexClass = hoodieConfig.getStringOrDefault(BOOTSTRAP_INDEX_CLASS_PROP)
|
||||
val bootstrapIndexClass = hoodieConfig.getStringOrDefault(BOOTSTRAP_INDEX_CLASS)
|
||||
|
||||
var schema: String = null
|
||||
if (df.schema.nonEmpty) {
|
||||
|
||||
@@ -270,7 +270,7 @@ object InsertIntoHoodieTableCommand extends Logging {
|
||||
PARTITIONPATH_FIELD.key -> partitionFields,
|
||||
PAYLOAD_CLASS.key -> payloadClassName,
|
||||
ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
|
||||
HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key -> isPrimaryKeyTable.toString,
|
||||
HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> isPrimaryKeyTable.toString,
|
||||
META_SYNC_ENABLED.key -> enableHive.toString,
|
||||
HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
|
||||
HIVE_USE_JDBC.key -> "false",
|
||||
|
||||
@@ -230,7 +230,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
|
||||
// insert actions.
|
||||
var writeParams = parameters +
|
||||
(OPERATION.key -> UPSERT_OPERATION_OPT_VAL) +
|
||||
(HoodieWriteConfig.WRITE_SCHEMA_PROP.key -> getTableSchema.toString) +
|
||||
(HoodieWriteConfig.WRITE_SCHEMA.key -> getTableSchema.toString) +
|
||||
(DataSourceWriteOptions.TABLE_TYPE.key -> targetTableType)
|
||||
|
||||
// Map of Condition -> Assignments
|
||||
@@ -277,7 +277,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
|
||||
|
||||
var writeParams = parameters +
|
||||
(OPERATION.key -> INSERT_OPERATION_OPT_VAL) +
|
||||
(HoodieWriteConfig.WRITE_SCHEMA_PROP.key -> getTableSchema.toString)
|
||||
(HoodieWriteConfig.WRITE_SCHEMA.key -> getTableSchema.toString)
|
||||
|
||||
writeParams += (PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS ->
|
||||
serializedInsertConditionAndExpressions(insertActions))
|
||||
|
||||
@@ -48,7 +48,7 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
|
||||
val keyGenProps = new TypedProperties()
|
||||
keyGenProps.putAll(props)
|
||||
keyGenProps.remove(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS)
|
||||
keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP.key, beforeKeyGenClassName)
|
||||
keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS.key, beforeKeyGenClassName)
|
||||
Some(KeyGenUtils.createKeyGeneratorByClassName(keyGenProps))
|
||||
} else {
|
||||
None
|
||||
|
||||
@@ -214,9 +214,9 @@ class ExpressionPayload(record: GenericRecord,
|
||||
*/
|
||||
private def initWriteSchemaIfNeed(properties: Properties): Unit = {
|
||||
if (writeSchema == null) {
|
||||
ValidationUtils.checkArgument(properties.containsKey(HoodieWriteConfig.WRITE_SCHEMA_PROP.key),
|
||||
s"Missing ${HoodieWriteConfig.WRITE_SCHEMA_PROP.key}")
|
||||
writeSchema = new Schema.Parser().parse(properties.getProperty(HoodieWriteConfig.WRITE_SCHEMA_PROP.key))
|
||||
ValidationUtils.checkArgument(properties.containsKey(HoodieWriteConfig.WRITE_SCHEMA.key),
|
||||
s"Missing ${HoodieWriteConfig.WRITE_SCHEMA.key}")
|
||||
writeSchema = new Schema.Parser().parse(properties.getProperty(HoodieWriteConfig.WRITE_SCHEMA.key))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -182,7 +182,7 @@ public class HoodieJavaApp {
|
||||
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS().key(),
|
||||
nonPartitionedTable ? NonpartitionedKeyGenerator.class.getCanonicalName()
|
||||
: SimpleKeyGenerator.class.getCanonicalName()) // Add Key Extractor
|
||||
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP.key(), "1")
|
||||
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1")
|
||||
.option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key(), "false")
|
||||
.option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true")
|
||||
.option(HoodieWriteConfig.TABLE_NAME.key(), tableName).mode(SaveMode.Append);
|
||||
@@ -210,7 +210,7 @@ public class HoodieJavaApp {
|
||||
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS().key(),
|
||||
nonPartitionedTable ? NonpartitionedKeyGenerator.class.getCanonicalName()
|
||||
: SimpleKeyGenerator.class.getCanonicalName()) // Add Key Extractor
|
||||
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP.key(), "1")
|
||||
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1")
|
||||
.option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key(), "false")
|
||||
.option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true")
|
||||
.option(HoodieWriteConfig.TABLE_NAME.key(), tableName).mode(SaveMode.Append);
|
||||
|
||||
@@ -360,7 +360,7 @@ public class HoodieJavaStreamingApp {
|
||||
.option(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key")
|
||||
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition")
|
||||
.option(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp")
|
||||
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP.key(), "1")
|
||||
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1")
|
||||
.option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key(), "true")
|
||||
.option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true")
|
||||
.option(HoodieWriteConfig.TABLE_NAME.key(), tableName).option("checkpointLocation", checkpointLocation)
|
||||
|
||||
@@ -428,7 +428,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
|
||||
.save(srcPath.toAbsolutePath.toString)
|
||||
|
||||
val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
|
||||
HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP.key -> srcPath.toAbsolutePath.toString,
|
||||
HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH.key -> srcPath.toAbsolutePath.toString,
|
||||
HoodieWriteConfig.TABLE_NAME.key -> hoodieFooTableName,
|
||||
DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
|
||||
HoodieBootstrapConfig.BOOTSTRAP_PARALLELISM.key -> "4",
|
||||
@@ -652,7 +652,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
|
||||
df.write.mode(SaveMode.Overwrite).save(baseBootStrapPath)
|
||||
spark.emptyDataFrame.write.format("hudi")
|
||||
.options(options)
|
||||
.option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP.key, baseBootStrapPath)
|
||||
.option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH.key, baseBootStrapPath)
|
||||
.option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS.key, classOf[NonpartitionedKeyGenerator].getCanonicalName)
|
||||
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
|
||||
.option(HoodieBootstrapConfig.BOOTSTRAP_PARALLELISM.key, "4")
|
||||
|
||||
@@ -510,7 +510,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
|
||||
inputDF1.write.format("org.apache.hudi")
|
||||
.options(commonOpts)
|
||||
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
|
||||
.option(HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP.key, "true")
|
||||
.option(HoodieWriteConfig.HOODIE_AUTO_COMMIT.key, "true")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.save(basePath)
|
||||
|
||||
|
||||
@@ -297,8 +297,8 @@ class TestDataSourceForBootstrap {
|
||||
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
|
||||
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
|
||||
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
|
||||
.option(HoodieCompactionConfig.INLINE_COMPACT_PROP.key, "true")
|
||||
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP.key, "1")
|
||||
.option(HoodieCompactionConfig.INLINE_COMPACT.key, "true")
|
||||
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key, "1")
|
||||
.mode(SaveMode.Append)
|
||||
.save(basePath)
|
||||
|
||||
@@ -429,7 +429,7 @@ class TestDataSourceForBootstrap {
|
||||
.options(commonOpts)
|
||||
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
|
||||
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
|
||||
.option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP.key, srcPath)
|
||||
.option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH.key, srcPath)
|
||||
.option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS.key, classOf[SimpleKeyGenerator].getName)
|
||||
.option(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR.key, classOf[FullRecordBootstrapModeSelector].getName)
|
||||
.option(HoodieBootstrapConfig.FULL_BOOTSTRAP_INPUT_PROVIDER.key, classOf[SparkParquetBootstrapDataProvider].getName)
|
||||
@@ -480,7 +480,7 @@ class TestDataSourceForBootstrap {
|
||||
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
|
||||
.option(DataSourceWriteOptions.TABLE_TYPE.key, tableType)
|
||||
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, partitionColumns.getOrElse(""))
|
||||
.option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP.key, srcPath)
|
||||
.option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH.key, srcPath)
|
||||
.option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS.key, classOf[SimpleKeyGenerator].getName)
|
||||
.mode(SaveMode.Overwrite)
|
||||
.save(basePath)
|
||||
|
||||
@@ -753,7 +753,7 @@ class TestMORDataSource extends HoodieClientTestBase {
|
||||
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
|
||||
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
|
||||
// Use InMemoryIndex to generate log only mor table.
|
||||
.option(HoodieIndexConfig.INDEX_TYPE_PROP.key, IndexType.INMEMORY.toString)
|
||||
.option(HoodieIndexConfig.INDEX_TYPE.key, IndexType.INMEMORY.toString)
|
||||
.mode(SaveMode.Overwrite)
|
||||
.save(basePath)
|
||||
// There should no base file in the file list.
|
||||
|
||||
@@ -193,8 +193,8 @@ class TestStructuredStreaming extends HoodieClientTestBase {
|
||||
|
||||
def getClusteringOpts(isInlineClustering: String, isAsyncClustering: String, isAsyncCompaction: String,
|
||||
clusteringNumCommit: String, fileMaxRecordNum: Int):Map[String, String] = {
|
||||
commonOpts + (HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key -> isInlineClustering,
|
||||
HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP.key -> clusteringNumCommit,
|
||||
commonOpts + (HoodieClusteringConfig.INLINE_CLUSTERING.key -> isInlineClustering,
|
||||
HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT.key -> clusteringNumCommit,
|
||||
DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering,
|
||||
DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key -> isAsyncCompaction,
|
||||
HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMIT_PROP.key -> clusteringNumCommit,
|
||||
|
||||
Reference in New Issue
Block a user