1
0

Allow users to set hoodie configs for Compactor, Cleaner and HDFSParquetImporter utility scripts

This commit is contained in:
Balaji Varadarajan
2019-05-23 16:35:30 -07:00
committed by n3nash
parent 145034c5fa
commit 2fe526d548
4 changed files with 53 additions and 11 deletions

View File

@@ -30,10 +30,12 @@ import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.HoodieTableConfig; import com.uber.hoodie.common.table.HoodieTableConfig;
import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
@@ -57,14 +59,22 @@ import scala.Tuple2;
* Loads data from Parquet Sources * Loads data from Parquet Sources
*/ */
public class HDFSParquetImporter implements Serializable { public class HDFSParquetImporter implements Serializable {
private static volatile Logger log = LogManager.getLogger(HDFSParquetImporter.class);
public static final SimpleDateFormat PARTITION_FORMATTER = new SimpleDateFormat("yyyy/MM/dd"); public static final SimpleDateFormat PARTITION_FORMATTER = new SimpleDateFormat("yyyy/MM/dd");
private static volatile Logger logger = LogManager.getLogger(HDFSParquetImporter.class); private static volatile Logger logger = LogManager.getLogger(HDFSParquetImporter.class);
private final Config cfg; private final Config cfg;
private transient FileSystem fs; private transient FileSystem fs;
/**
* Bag of properties with source, hoodie client, key generator etc.
*/
private TypedProperties props;
public HDFSParquetImporter(Config cfg) throws IOException { public HDFSParquetImporter(Config cfg) throws IOException {
this.cfg = cfg; this.cfg = cfg;
this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs) :
UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
log.info("Creating Cleaner with configs : " + props.toString());
} }
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
@@ -116,7 +126,7 @@ public class HDFSParquetImporter implements Serializable {
.initializePathAsHoodieDataset(jsc.hadoopConfiguration(), cfg.targetPath, properties); .initializePathAsHoodieDataset(jsc.hadoopConfiguration(), cfg.targetPath, properties);
HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.targetPath, schemaStr, HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.targetPath, schemaStr,
cfg.parallelism, Optional.empty()); cfg.parallelism, Optional.empty(), props);
JavaRDD<HoodieRecord<HoodieRecordPayload>> hoodieRecords = buildHoodieRecordsForImport(jsc, schemaStr); JavaRDD<HoodieRecord<HoodieRecordPayload>> hoodieRecords = buildHoodieRecordsForImport(jsc, schemaStr);
// Get instant time. // Get instant time.
@@ -247,6 +257,12 @@ public class HDFSParquetImporter implements Serializable {
public String sparkMemory = null; public String sparkMemory = null;
@Parameter(names = {"--retry", "-rt"}, description = "number of retries", required = false) @Parameter(names = {"--retry", "-rt"}, description = "number of retries", required = false)
public int retry = 0; public int retry = 0;
@Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+ "hoodie client for importing")
public String propsFilePath = null;
@Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+ "(using the CLI parameter \"--propsFilePath\") can also be passed command line using this parameter")
public List<String> configs = new ArrayList<>();
@Parameter(names = {"--help", "-h"}, help = true) @Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false; public Boolean help = false;
} }

View File

@@ -22,7 +22,6 @@ import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.TypedProperties; import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
@@ -35,7 +34,7 @@ import org.apache.spark.api.java.JavaSparkContext;
public class HoodieCleaner { public class HoodieCleaner {
private static volatile Logger log = LogManager.getLogger(HoodieDeltaStreamer.class); private static volatile Logger log = LogManager.getLogger(HoodieCleaner.class);
/** /**
* Config for Cleaner * Config for Cleaner
@@ -55,14 +54,14 @@ public class HoodieCleaner {
/** /**
* Bag of properties with source, hoodie client, key generator etc. * Bag of properties with source, hoodie client, key generator etc.
*/ */
TypedProperties props; private TypedProperties props;
public HoodieCleaner(Config cfg, JavaSparkContext jssc) throws IOException { public HoodieCleaner(Config cfg, JavaSparkContext jssc) throws IOException {
this.cfg = cfg; this.cfg = cfg;
this.jssc = jssc; this.jssc = jssc;
this.fs = FSUtils.getFs(cfg.basePath, jssc.hadoopConfiguration()); this.fs = FSUtils.getFs(cfg.basePath, jssc.hadoopConfiguration());
this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs) :
this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig(); UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
log.info("Creating Cleaner with configs : " + props.toString()); log.info("Creating Cleaner with configs : " + props.toString());
} }
@@ -86,8 +85,7 @@ public class HoodieCleaner {
@Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for " @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+ "hoodie client for cleaning") + "hoodie client for cleaning")
public String propsFilePath = public String propsFilePath = null;
"file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";
@Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file " @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+ "(using the CLI parameter \"--propsFilePath\") can also be passed command line using this parameter") + "(using the CLI parameter \"--propsFilePath\") can also be passed command line using this parameter")

View File

@@ -23,9 +23,13 @@ import com.beust.jcommander.Parameter;
import com.uber.hoodie.HoodieWriteClient; import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.WriteStatus; import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.TypedProperties;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional; import java.util.Optional;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
@@ -37,9 +41,12 @@ public class HoodieCompactor {
private static volatile Logger logger = LogManager.getLogger(HoodieCompactor.class); private static volatile Logger logger = LogManager.getLogger(HoodieCompactor.class);
private final Config cfg; private final Config cfg;
private transient FileSystem fs; private transient FileSystem fs;
private TypedProperties props;
public HoodieCompactor(Config cfg) { public HoodieCompactor(Config cfg) {
this.cfg = cfg; this.cfg = cfg;
this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs) :
UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
} }
public static class Config implements Serializable { public static class Config implements Serializable {
@@ -70,6 +77,14 @@ public class HoodieCompactor {
public String strategyClassName = null; public String strategyClassName = null;
@Parameter(names = {"--help", "-h"}, help = true) @Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false; public Boolean help = false;
@Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+ "hoodie client for compacting")
public String propsFilePath = null;
@Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+ "(using the CLI parameter \"--propsFilePath\") can also be passed command line using this parameter")
public List<String> configs = new ArrayList<>();
} }
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
@@ -108,7 +123,7 @@ public class HoodieCompactor {
//Get schema. //Get schema.
String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile); String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism,
Optional.empty()); Optional.empty(), props);
JavaRDD<WriteStatus> writeResponse = client.compact(cfg.compactionInstantTime); JavaRDD<WriteStatus> writeResponse = client.compact(cfg.compactionInstantTime);
return UtilHelpers.handleErrors(jsc, cfg.compactionInstantTime, writeResponse); return UtilHelpers.handleErrors(jsc, cfg.compactionInstantTime, writeResponse);
} }
@@ -116,7 +131,7 @@ public class HoodieCompactor {
private int doSchedule(JavaSparkContext jsc) throws Exception { private int doSchedule(JavaSparkContext jsc) throws Exception {
//Get schema. //Get schema.
HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism,
Optional.of(cfg.strategyClassName)); Optional.of(cfg.strategyClassName), props);
client.scheduleCompactionAtInstant(cfg.compactionInstantTime, Optional.empty()); client.scheduleCompactionAtInstant(cfg.compactionInstantTime, Optional.empty());
return 0; return 0;
} }

View File

@@ -18,6 +18,7 @@
package com.uber.hoodie.utilities; package com.uber.hoodie.utilities;
import com.google.common.base.Preconditions;
import com.uber.hoodie.HoodieWriteClient; import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.WriteStatus; import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.util.DFSPropertiesConfiguration; import com.uber.hoodie.common.util.DFSPropertiesConfiguration;
@@ -100,6 +101,16 @@ public class UtilHelpers {
} }
} }
public static TypedProperties buildProperties(List<String> props) {
TypedProperties properties = new TypedProperties();
props.stream().forEach(x -> {
String[] kv = x.split("=");
Preconditions.checkArgument(kv.length == 2);
properties.setProperty(kv[0], kv[1]);
});
return properties;
}
/** /**
* Parse Schema from file * Parse Schema from file
* *
@@ -163,7 +174,8 @@ public class UtilHelpers {
* @param parallelism Parallelism * @param parallelism Parallelism
*/ */
public static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, public static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath,
String schemaStr, int parallelism, Optional<String> compactionStrategyClass) throws Exception { String schemaStr, int parallelism, Optional<String> compactionStrategyClass, TypedProperties properties)
throws Exception {
HoodieCompactionConfig compactionConfig = HoodieCompactionConfig compactionConfig =
compactionStrategyClass.map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false) compactionStrategyClass.map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
.withCompactionStrategy(ReflectionUtils.loadClass(strategy)) .withCompactionStrategy(ReflectionUtils.loadClass(strategy))
@@ -173,6 +185,7 @@ public class UtilHelpers {
.combineInput(true, true) .combineInput(true, true)
.withCompactionConfig(compactionConfig) .withCompactionConfig(compactionConfig)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withProps(properties)
.build(); .build();
return new HoodieWriteClient(jsc, config); return new HoodieWriteClient(jsc, config);
} }