diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index d89925711..083d780e4 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -54,55 +54,55 @@ import java.util.stream.Collectors;
 public class HoodieWriteConfig extends DefaultHoodieConfig {
 
   public static final String TABLE_NAME = "hoodie.table.name";
-  private static final String TIMELINE_LAYOUT_VERSION = "hoodie.timeline.layout.version";
-  private static final String BASE_PATH_PROP = "hoodie.base.path";
-  private static final String AVRO_SCHEMA = "hoodie.avro.schema";
-  private static final String AVRO_SCHEMA_VALIDATE = "hoodie.avro.schema.validate";
-  private static final String DEFAULT_AVRO_SCHEMA_VALIDATE = "false";
-  private static final String DEFAULT_PARALLELISM = "1500";
-  private static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism";
-  private static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism";
-  private static final String BULKINSERT_USER_DEFINED_PARTITIONER_CLASS = "hoodie.bulkinsert.user.defined.partitioner.class";
-  private static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
-  private static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism";
-  private static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
-  private static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism";
-  private static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
-  private static final String DEFAULT_WRITE_BUFFER_LIMIT_BYTES = String.valueOf(4 * 1024 * 1024);
-  private static final String COMBINE_BEFORE_INSERT_PROP = "hoodie.combine.before.insert";
-  private static final String DEFAULT_COMBINE_BEFORE_INSERT = "false";
-  private static final String COMBINE_BEFORE_UPSERT_PROP = "hoodie.combine.before.upsert";
-  private static final String DEFAULT_COMBINE_BEFORE_UPSERT = "true";
-  private static final String COMBINE_BEFORE_DELETE_PROP = "hoodie.combine.before.delete";
-  private static final String DEFAULT_COMBINE_BEFORE_DELETE = "true";
+  public static final String TIMELINE_LAYOUT_VERSION = "hoodie.timeline.layout.version";
+  public static final String BASE_PATH_PROP = "hoodie.base.path";
+  public static final String AVRO_SCHEMA = "hoodie.avro.schema";
+  public static final String AVRO_SCHEMA_VALIDATE = "hoodie.avro.schema.validate";
+  public static final String DEFAULT_AVRO_SCHEMA_VALIDATE = "false";
+  public static final String DEFAULT_PARALLELISM = "1500";
+  public static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism";
+  public static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism";
+  public static final String BULKINSERT_USER_DEFINED_PARTITIONER_CLASS = "hoodie.bulkinsert.user.defined.partitioner.class";
+  public static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
+  public static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism";
+  public static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
+  public static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism";
+  public static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
+  public static final String DEFAULT_WRITE_BUFFER_LIMIT_BYTES = String.valueOf(4 * 1024 * 1024);
+  public static final String COMBINE_BEFORE_INSERT_PROP = "hoodie.combine.before.insert";
+  public static final String DEFAULT_COMBINE_BEFORE_INSERT = "false";
+  public static final String COMBINE_BEFORE_UPSERT_PROP = "hoodie.combine.before.upsert";
+  public static final String DEFAULT_COMBINE_BEFORE_UPSERT = "true";
+  public static final String COMBINE_BEFORE_DELETE_PROP = "hoodie.combine.before.delete";
+  public static final String DEFAULT_COMBINE_BEFORE_DELETE = "true";
   public static final String WRITE_STATUS_STORAGE_LEVEL = "hoodie.write.status.storage.level";
-  private static final String DEFAULT_WRITE_STATUS_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
-  private static final String HOODIE_AUTO_COMMIT_PROP = "hoodie.auto.commit";
-  private static final String DEFAULT_HOODIE_AUTO_COMMIT = "true";
-  private static final String HOODIE_ASSUME_DATE_PARTITIONING_PROP = "hoodie.assume.date.partitioning";
-  private static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
-  private static final String HOODIE_WRITE_STATUS_CLASS_PROP = "hoodie.writestatus.class";
-  private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
-  private static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
-  private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
+  public static final String DEFAULT_WRITE_STATUS_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
+  public static final String HOODIE_AUTO_COMMIT_PROP = "hoodie.auto.commit";
+  public static final String DEFAULT_HOODIE_AUTO_COMMIT = "true";
+  public static final String HOODIE_ASSUME_DATE_PARTITIONING_PROP = "hoodie.assume.date.partitioning";
+  public static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
+  public static final String HOODIE_WRITE_STATUS_CLASS_PROP = "hoodie.writestatus.class";
+  public static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
+  public static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
+  public static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
 
-  private static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server";
-  private static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true";
+  public static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server";
+  public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true";
 
-  private static final String FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP = "hoodie.fail.on.timeline.archiving";
-  private static final String DEFAULT_FAIL_ON_TIMELINE_ARCHIVING_ENABLED = "true";
+  public static final String FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP = "hoodie.fail.on.timeline.archiving";
+  public static final String DEFAULT_FAIL_ON_TIMELINE_ARCHIVING_ENABLED = "true";
   // time between successive attempts to ensure written data's metadata is consistent on storage
-  private static final String INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP =
+  public static final String INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP =
       "hoodie.consistency.check.initial_interval_ms";
-  private static long DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = 2000L;
+  public static final long DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = 2000L;
 
   // max interval time
-  private static final String MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP = "hoodie.consistency.check.max_interval_ms";
-  private static long DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS = 300000L;
+  public static final String MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP = "hoodie.consistency.check.max_interval_ms";
+  public static final long DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS = 300000L;
 
   // maximum number of checks, for consistency of written data. Will wait upto 256 Secs
-  private static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
-  private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
+  public static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
+  public static final int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
 
   /**
    * HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
@@ -114,9 +114,9 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
    * Given the importance of supporting such cases for the user's migration to 0.5.x, we are proposing a safety flag
    * (disabled by default) which will allow this old behavior.
    */
-  private static final String ALLOW_MULTI_WRITE_ON_SAME_INSTANT =
+  public static final String ALLOW_MULTI_WRITE_ON_SAME_INSTANT =
       "_.hoodie.allow.multi.write.on.same.instant";
-  private static final String DEFAULT_ALLOW_MULTI_WRITE_ON_SAME_INSTANT = "false";
+  public static final String DEFAULT_ALLOW_MULTI_WRITE_ON_SAME_INSTANT = "false";
 
   private ConsistencyGuardConfig consistencyGuardConfig;
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 1a3e43c49..068e592f8 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -33,12 +33,10 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;
-import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
@@ -74,6 +72,10 @@ import java.util.stream.Collectors;
 
 import scala.collection.JavaConversions;
 
+import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_PROP;
+import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP;
+import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT_PROP;
+import static org.apache.hudi.config.HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP;
 import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
 import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
 
@@ -500,14 +502,16 @@ public class DeltaSync implements Serializable {
    * @param schemaProvider Schema Provider
    */
   private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) {
+    // Values passed to the builder here and compared against the built config in the validation below.
+    final boolean combineBeforeUpsert = true;
+    final boolean autoCommit = false;
     HoodieWriteConfig.Builder builder =
-        HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, true)
+        HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, combineBeforeUpsert)
             .withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName)
                 // Inline compaction is disabled for continuous mode. otherwise enabled for MOR
                 .withInlineCompaction(cfg.isInlineCompactionEnabled()).build())
             .forTable(cfg.targetTableName)
-            .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
-            .withAutoCommit(false).withProps(props);
+            .withAutoCommit(autoCommit).withProps(props);
 
     if (null != schemaProvider && null != schemaProvider.getTargetSchema()) {
       builder = builder.withSchema(schemaProvider.getTargetSchema().toString());
@@ -515,10 +519,14 @@ public class DeltaSync implements Serializable {
     HoodieWriteConfig config = builder.build();
 
     // Validate what deltastreamer assumes of write-config to be really safe
-    ValidationUtils.checkArgument(config.isInlineCompaction() == cfg.isInlineCompactionEnabled());
-    ValidationUtils.checkArgument(!config.shouldAutoCommit());
-    ValidationUtils.checkArgument(config.shouldCombineBeforeInsert() == cfg.filterDupes);
-    ValidationUtils.checkArgument(config.shouldCombineBeforeUpsert());
+    ValidationUtils.checkArgument(config.isInlineCompaction() == cfg.isInlineCompactionEnabled(),
+        String.format("%s should be set to %s", INLINE_COMPACT_PROP, cfg.isInlineCompactionEnabled()));
+    ValidationUtils.checkArgument(config.shouldAutoCommit() == autoCommit,
+        String.format("%s should be set to %s", HOODIE_AUTO_COMMIT_PROP, autoCommit));
+    ValidationUtils.checkArgument(config.shouldCombineBeforeInsert() == cfg.filterDupes,
+        String.format("%s should be set to %s", COMBINE_BEFORE_INSERT_PROP, cfg.filterDupes));
+    ValidationUtils.checkArgument(config.shouldCombineBeforeUpsert() == combineBeforeUpsert,
+        String.format("%s should be set to %s", COMBINE_BEFORE_UPSERT_PROP, combineBeforeUpsert));
 
     return config;
   }