1
0

[HUDI-2167] HoodieCompactionConfig get HoodieCleaningPolicy NullPointerException

close apache/hudi#3402
This commit is contained in:
leiqiang
2021-07-13 11:26:12 +08:00
committed by yuzhao.cyz
parent 5ee35a0a92
commit b7a0d76fc9

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -42,8 +43,8 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.schema.FilebasedSchemaProvider; import org.apache.hudi.schema.FilebasedSchemaProvider;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.sink.transform.ChainedTransformer; import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.streamer.FlinkStreamerConfig; import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
@@ -168,6 +169,7 @@ public class StreamerUtil {
// actually Flink cleaning is always with parallelism 1 now // actually Flink cleaning is always with parallelism 1 now
.withCleanerParallelism(20) .withCleanerParallelism(20)
.archiveCommitsWith(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS)) .archiveCommitsWith(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS))
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.build()) .build())
.withMemoryConfig( .withMemoryConfig(
HoodieMemoryConfig.newBuilder() HoodieMemoryConfig.newBuilder()
@@ -372,7 +374,7 @@ public class StreamerUtil {
return fileStatus.getLen() > 0; return fileStatus.getLen() > 0;
} }
public static boolean allowDuplicateInserts(Configuration conf) { public static boolean allowDuplicateInserts(Configuration conf) {
WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)); WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
return operationType == WriteOperationType.INSERT && !conf.getBoolean(FlinkOptions.INSERT_DEDUP); return operationType == WriteOperationType.INSERT && !conf.getBoolean(FlinkOptions.INSERT_DEDUP);