From 2fe526d5485963493e44d9c6cb2a80cb84244c1f Mon Sep 17 00:00:00 2001 From: Balaji Varadarajan Date: Thu, 23 May 2019 16:35:30 -0700 Subject: [PATCH] Allow users to set hoodie configs for Compactor, Cleaner and HDFSParquetImporter utility scripts --- .../hoodie/utilities/HDFSParquetImporter.java | 18 +++++++++++++++++- .../uber/hoodie/utilities/HoodieCleaner.java | 12 +++++------- .../hoodie/utilities/HoodieCompactor.java | 19 +++++++++++++++++-- .../uber/hoodie/utilities/UtilHelpers.java | 15 ++++++++++++++- 4 files changed, 53 insertions(+), 11 deletions(-) diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java index 9e6f45512..7cda533fe 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java @@ -30,10 +30,12 @@ import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.table.HoodieTableConfig; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.common.util.TypedProperties; import com.uber.hoodie.exception.HoodieIOException; import java.io.IOException; import java.io.Serializable; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.List; @@ -57,14 +59,22 @@ import scala.Tuple2; * Loads data from Parquet Sources */ public class HDFSParquetImporter implements Serializable { + private static volatile Logger log = LogManager.getLogger(HDFSParquetImporter.class); public static final SimpleDateFormat PARTITION_FORMATTER = new SimpleDateFormat("yyyy/MM/dd"); private static volatile Logger logger = LogManager.getLogger(HDFSParquetImporter.class); private final Config cfg; private transient FileSystem fs; + /** + * Bag of properties with 
source, hoodie client, key generator etc. + */ + private TypedProperties props; public HDFSParquetImporter(Config cfg) throws IOException { this.cfg = cfg; + this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs) : + UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig(); + log.info("Creating Cleaner with configs : " + props.toString()); } public static void main(String[] args) throws Exception { @@ -116,7 +126,7 @@ public class HDFSParquetImporter implements Serializable { .initializePathAsHoodieDataset(jsc.hadoopConfiguration(), cfg.targetPath, properties); HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.targetPath, schemaStr, - cfg.parallelism, Optional.empty()); + cfg.parallelism, Optional.empty(), props); JavaRDD> hoodieRecords = buildHoodieRecordsForImport(jsc, schemaStr); // Get instant time. @@ -247,6 +257,12 @@ public class HDFSParquetImporter implements Serializable { public String sparkMemory = null; @Parameter(names = {"--retry", "-rt"}, description = "number of retries", required = false) public int retry = 0; + @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for " + + "hoodie client for importing") + public String propsFilePath = null; + @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file " + + "(using the CLI parameter \"--propsFilePath\") can also be passed command line using this parameter") + public List configs = new ArrayList<>(); @Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false; } diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCleaner.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCleaner.java index 7ebca042c..3c25b6bcd 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCleaner.java +++ 
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCleaner.java @@ -22,7 +22,6 @@ import com.uber.hoodie.HoodieWriteClient; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.TypedProperties; import com.uber.hoodie.config.HoodieWriteConfig; -import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -35,7 +34,7 @@ import org.apache.spark.api.java.JavaSparkContext; public class HoodieCleaner { - private static volatile Logger log = LogManager.getLogger(HoodieDeltaStreamer.class); + private static volatile Logger log = LogManager.getLogger(HoodieCleaner.class); /** * Config for Cleaner @@ -55,14 +54,14 @@ public class HoodieCleaner { /** * Bag of properties with source, hoodie client, key generator etc. */ - TypedProperties props; + private TypedProperties props; public HoodieCleaner(Config cfg, JavaSparkContext jssc) throws IOException { this.cfg = cfg; this.jssc = jssc; this.fs = FSUtils.getFs(cfg.basePath, jssc.hadoopConfiguration()); - - this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig(); + this.props = cfg.propsFilePath == null ? 
UtilHelpers.buildProperties(cfg.configs) : + UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig(); log.info("Creating Cleaner with configs : " + props.toString()); } @@ -86,8 +85,7 @@ public class HoodieCleaner { @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for " + "hoodie client for cleaning") - public String propsFilePath = - "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties"; + public String propsFilePath = null; @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file " + "(using the CLI parameter \"--propsFilePath\") can also be passed command line using this parameter") diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactor.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactor.java index 8aa596793..f7e8de485 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactor.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactor.java @@ -23,9 +23,13 @@ import com.beust.jcommander.Parameter; import com.uber.hoodie.HoodieWriteClient; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.common.util.TypedProperties; import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; @@ -37,9 +41,12 @@ public class HoodieCompactor { private static volatile Logger logger = LogManager.getLogger(HoodieCompactor.class); private final Config cfg; private transient FileSystem fs; + private TypedProperties props; public HoodieCompactor(Config cfg) { this.cfg = cfg; + this.props = 
cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs) : + UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig(); } public static class Config implements Serializable { @@ -70,6 +77,14 @@ public class HoodieCompactor { public String strategyClassName = null; @Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false; + + @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for " + + "hoodie client for compacting") + public String propsFilePath = null; + + @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file " + + "(using the CLI parameter \"--propsFilePath\") can also be passed command line using this parameter") + public List configs = new ArrayList<>(); } public static void main(String[] args) throws Exception { @@ -108,7 +123,7 @@ public class HoodieCompactor { //Get schema. String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile); HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, - Optional.empty()); + Optional.empty(), props); JavaRDD writeResponse = client.compact(cfg.compactionInstantTime); return UtilHelpers.handleErrors(jsc, cfg.compactionInstantTime, writeResponse); } @@ -116,7 +131,7 @@ public class HoodieCompactor { private int doSchedule(JavaSparkContext jsc) throws Exception { //Get schema. 
HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, - Optional.of(cfg.strategyClassName)); + Optional.of(cfg.strategyClassName), props); client.scheduleCompactionAtInstant(cfg.compactionInstantTime, Optional.empty()); return 0; } diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java index b24e3a277..0449c403f 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java @@ -18,6 +18,7 @@ package com.uber.hoodie.utilities; +import com.google.common.base.Preconditions; import com.uber.hoodie.HoodieWriteClient; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.util.DFSPropertiesConfiguration; @@ -100,6 +101,16 @@ public class UtilHelpers { } } + public static TypedProperties buildProperties(List props) { + TypedProperties properties = new TypedProperties(); + props.stream().forEach(x -> { + String[] kv = x.split("="); + Preconditions.checkArgument(kv.length == 2); + properties.setProperty(kv[0], kv[1]); + }); + return properties; + } + /** * Parse Schema from file * @@ -163,7 +174,8 @@ public class UtilHelpers { * @param parallelism Parallelism */ public static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, - String schemaStr, int parallelism, Optional compactionStrategyClass) throws Exception { + String schemaStr, int parallelism, Optional compactionStrategyClass, TypedProperties properties) + throws Exception { HoodieCompactionConfig compactionConfig = compactionStrategyClass.map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false) .withCompactionStrategy(ReflectionUtils.loadClass(strategy)) @@ -173,6 +185,7 @@ public class UtilHelpers { .combineInput(true, true) .withCompactionConfig(compactionConfig) 
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) + .withProps(properties) .build(); return new HoodieWriteClient(jsc, config); }