1
0

[HUDI-1023] Add validation error messages in delta sync (#1710)

- Remove the explicit BLOOM_INDEX setting, since that's the default anyway
This commit is contained in:
Raymond Xu
2020-06-19 12:12:35 -07:00
committed by GitHub
parent ab724af5c4
commit 8a9fdd603e
2 changed files with 58 additions and 51 deletions

View File

@@ -33,12 +33,10 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
@@ -74,6 +72,10 @@ import java.util.stream.Collectors;
import scala.collection.JavaConversions;
import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_PROP;
import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP;
import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT_PROP;
import static org.apache.hudi.config.HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
@@ -500,14 +502,15 @@ public class DeltaSync implements Serializable {
* @param schemaProvider Schema Provider
*/
private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) {
final boolean combineBeforeUpsert = true;
final boolean autoCommit = false;
HoodieWriteConfig.Builder builder =
HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, true)
HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, combineBeforeUpsert)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName)
// Inline compaction is disabled for continuous mode. otherwise enabled for MOR
.withInlineCompaction(cfg.isInlineCompactionEnabled()).build())
.forTable(cfg.targetTableName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withAutoCommit(false).withProps(props);
.withAutoCommit(autoCommit).withProps(props);
if (null != schemaProvider && null != schemaProvider.getTargetSchema()) {
builder = builder.withSchema(schemaProvider.getTargetSchema().toString());
@@ -515,10 +518,14 @@ public class DeltaSync implements Serializable {
HoodieWriteConfig config = builder.build();
// Validate what deltastreamer assumes of write-config to be really safe
ValidationUtils.checkArgument(config.isInlineCompaction() == cfg.isInlineCompactionEnabled());
ValidationUtils.checkArgument(!config.shouldAutoCommit());
ValidationUtils.checkArgument(config.shouldCombineBeforeInsert() == cfg.filterDupes);
ValidationUtils.checkArgument(config.shouldCombineBeforeUpsert());
ValidationUtils.checkArgument(config.isInlineCompaction() == cfg.isInlineCompactionEnabled(),
String.format("%s should be set to %s", INLINE_COMPACT_PROP, cfg.isInlineCompactionEnabled()));
ValidationUtils.checkArgument(!config.shouldAutoCommit(),
String.format("%s should be set to %s", HOODIE_AUTO_COMMIT_PROP, autoCommit));
ValidationUtils.checkArgument(config.shouldCombineBeforeInsert() == cfg.filterDupes,
String.format("%s should be set to %s", COMBINE_BEFORE_INSERT_PROP, cfg.filterDupes));
ValidationUtils.checkArgument(config.shouldCombineBeforeUpsert(),
String.format("%s should be set to %s", COMBINE_BEFORE_UPSERT_PROP, combineBeforeUpsert));
return config;
}