[HUDI-888] fix NullPointerException in HoodieCompactor (#1622)
This commit is contained in:
@@ -270,7 +270,7 @@ public class SparkMain {
|
|||||||
cfg.propsFilePath = propsFilePath;
|
cfg.propsFilePath = propsFilePath;
|
||||||
cfg.configs = configs;
|
cfg.configs = configs;
|
||||||
jsc.getConf().set("spark.executor.memory", sparkMemory);
|
jsc.getConf().set("spark.executor.memory", sparkMemory);
|
||||||
return new HoodieCompactor(cfg).compact(jsc, retry);
|
return new HoodieCompactor(jsc, cfg).compact(retry);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static int deduplicatePartitionPath(JavaSparkContext jsc, String duplicatedPartitionPath,
|
private static int deduplicatePartitionPath(JavaSparkContext jsc, String duplicatedPartitionPath,
|
||||||
|
|||||||
@@ -43,11 +43,22 @@ public class HoodieCompactor {
|
|||||||
private final Config cfg;
|
private final Config cfg;
|
||||||
private transient FileSystem fs;
|
private transient FileSystem fs;
|
||||||
private TypedProperties props;
|
private TypedProperties props;
|
||||||
|
private final JavaSparkContext jsc;
|
||||||
|
|
||||||
public HoodieCompactor(Config cfg) {
|
public HoodieCompactor(JavaSparkContext jsc, Config cfg) {
|
||||||
this.cfg = cfg;
|
this.cfg = cfg;
|
||||||
this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
|
this.jsc = jsc;
|
||||||
: UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
|
this.props = cfg.propsFilePath == null
|
||||||
|
? UtilHelpers.buildProperties(cfg.configs)
|
||||||
|
: readConfigFromFileSystem(jsc, cfg);
|
||||||
|
}
|
||||||
|
|
||||||
|
private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
|
||||||
|
final FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
|
||||||
|
|
||||||
|
return UtilHelpers
|
||||||
|
.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs)
|
||||||
|
.getConfig();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Config implements Serializable {
|
public static class Config implements Serializable {
|
||||||
@@ -90,12 +101,12 @@ public class HoodieCompactor {
|
|||||||
cmd.usage();
|
cmd.usage();
|
||||||
System.exit(1);
|
System.exit(1);
|
||||||
}
|
}
|
||||||
HoodieCompactor compactor = new HoodieCompactor(cfg);
|
final JavaSparkContext jsc = UtilHelpers.buildSparkContext("compactor-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
|
||||||
compactor.compact(UtilHelpers.buildSparkContext("compactor-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory),
|
HoodieCompactor compactor = new HoodieCompactor(jsc, cfg);
|
||||||
cfg.retry);
|
compactor.compact(cfg.retry);
|
||||||
}
|
}
|
||||||
|
|
||||||
public int compact(JavaSparkContext jsc, int retry) {
|
public int compact(int retry) {
|
||||||
this.fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
|
this.fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
|
||||||
int ret = -1;
|
int ret = -1;
|
||||||
try {
|
try {
|
||||||
|
|||||||
Reference in New Issue
Block a user