1
0

[HUDI-583] Code Cleanup, remove redundant code, and other changes (#1237)

This commit is contained in:
Suneel Marthi
2020-02-02 11:03:44 +01:00
committed by GitHub
parent f27c7a16c6
commit 5b7bb142dc
69 changed files with 447 additions and 582 deletions

View File

@@ -56,6 +56,7 @@ import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
@@ -81,7 +82,7 @@ public class HDFSParquetImporter implements Serializable {
this.cfg = cfg;
}
public static void main(String[] args) throws Exception {
public static void main(String[] args) {
final Config cfg = new Config();
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help || args.length == 0) {
@@ -160,8 +161,7 @@ public class HDFSParquetImporter implements Serializable {
AvroReadSupport.setAvroReadSchema(jsc.hadoopConfiguration(), (new Schema.Parser().parse(schemaStr)));
ParquetInputFormat.setReadSupportClass(job, (AvroReadSupport.class));
return jsc
.newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class,
return jsc.newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class,
job.getConfiguration())
// To reduce large number of tasks.
.coalesce(16 * cfg.parallelism).map(entry -> {
@@ -198,7 +198,7 @@ public class HDFSParquetImporter implements Serializable {
* @param <T> Type
*/
protected <T extends HoodieRecordPayload> JavaRDD<WriteStatus> load(HoodieWriteClient client, String instantTime,
JavaRDD<HoodieRecord<T>> hoodieRecords) throws Exception {
JavaRDD<HoodieRecord<T>> hoodieRecords) {
switch (cfg.command.toLowerCase()) {
case "upsert": {
return client.upsert(hoodieRecords, instantTime);
@@ -227,7 +227,7 @@ public class HDFSParquetImporter implements Serializable {
public static class FormatValidator implements IValueValidator<String> {
List<String> validFormats = Arrays.asList("parquet");
List<String> validFormats = Collections.singletonList("parquet");
@Override
public void validate(String name, String value) throws ParameterException {
@@ -241,7 +241,7 @@ public class HDFSParquetImporter implements Serializable {
public static class Config implements Serializable {
@Parameter(names = {"--command", "-c"}, description = "Write command Valid values are insert(default)/upsert/bulkinsert",
required = false, validateValueWith = CommandValidator.class)
validateValueWith = CommandValidator.class)
public String command = "INSERT";
@Parameter(names = {"--src-path", "-sp"}, description = "Base path for the input table", required = true)
public String srcPath = null;

View File

@@ -62,7 +62,6 @@ import java.util.stream.Collectors;
public class HiveIncrementalPuller {
private static final Logger LOG = LogManager.getLogger(HiveIncrementalPuller.class);
private static String driverName = "org.apache.hive.jdbc.HiveDriver";
public static class Config implements Serializable {
@@ -97,6 +96,7 @@ public class HiveIncrementalPuller {
}
static {
String driverName = "org.apache.hive.jdbc.HiveDriver";
try {
Class.forName(driverName);
} catch (ClassNotFoundException e) {
@@ -219,8 +219,7 @@ public class HiveIncrementalPuller {
// Set the from commit time
executeStatement("set hoodie." + config.sourceTable + ".consume.start.timestamp=" + config.fromCommitTime, stmt);
// Set number of commits to pull
executeStatement("set hoodie." + config.sourceTable + ".consume.max.commits=" + String.valueOf(config.maxCommits),
stmt);
executeStatement("set hoodie." + config.sourceTable + ".consume.max.commits=" + config.maxCommits, stmt);
}
private boolean deleteHDFSPath(FileSystem fs, String path) throws IOException {
@@ -233,14 +232,14 @@ public class HiveIncrementalPuller {
stmt.execute(sql);
}
private String inferCommitTime(FileSystem fs) throws SQLException, IOException {
private String inferCommitTime(FileSystem fs) throws IOException {
LOG.info("FromCommitTime not specified. Trying to infer it from Hoodie table " + config.targetDb + "."
+ config.targetTable);
String targetDataLocation = getTableLocation(config.targetDb, config.targetTable);
return scanForCommitTime(fs, targetDataLocation);
}
private String getTableLocation(String db, String table) throws SQLException {
private String getTableLocation(String db, String table) {
ResultSet resultSet = null;
Statement stmt = null;
try {
@@ -309,7 +308,7 @@ public class HiveIncrementalPuller {
return FileSystem.mkdirs(fs, targetBaseDirPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
}
private String getLastCommitTimePulled(FileSystem fs, String sourceTableLocation) throws IOException {
private String getLastCommitTimePulled(FileSystem fs, String sourceTableLocation) {
HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs.getConf(), sourceTableLocation);
List<String> commitsToSync = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
.findInstantsAfter(config.fromCommitTime, config.maxCommits).getInstants().map(HoodieInstant::getTimestamp)

View File

@@ -31,7 +31,6 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -45,11 +44,6 @@ public class HoodieCleaner {
*/
private final Config cfg;
/**
* Filesystem used.
*/
private transient FileSystem fs;
/**
* Spark context.
*/
@@ -60,22 +54,25 @@ public class HoodieCleaner {
*/
private TypedProperties props;
public HoodieCleaner(Config cfg, JavaSparkContext jssc) throws IOException {
public HoodieCleaner(Config cfg, JavaSparkContext jssc) {
this.cfg = cfg;
this.jssc = jssc;
this.fs = FSUtils.getFs(cfg.basePath, jssc.hadoopConfiguration());
/*
* Filesystem used.
*/
FileSystem fs = FSUtils.getFs(cfg.basePath, jssc.hadoopConfiguration());
this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
: UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
LOG.info("Creating Cleaner with configs : " + props.toString());
}
public void run() throws Exception {
public void run() {
HoodieWriteConfig hoodieCfg = getHoodieClientConfig();
HoodieWriteClient client = new HoodieWriteClient<>(jssc, hoodieCfg, false);
client.clean();
}
private HoodieWriteConfig getHoodieClientConfig() throws Exception {
private HoodieWriteConfig getHoodieClientConfig() {
return HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.basePath).withAutoCommit(false)
.withProps(props).build();
}
@@ -101,7 +98,7 @@ public class HoodieCleaner {
public Boolean help = false;
}
public static void main(String[] args) throws Exception {
public static void main(String[] args) {
final Config cfg = new Config();
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help || args.length == 0) {

View File

@@ -60,8 +60,7 @@ public class HoodieCompactionAdminTool {
*/
public void run(JavaSparkContext jsc) throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.basePath);
final CompactionAdminClient admin = new CompactionAdminClient(jsc, cfg.basePath);
try {
try (CompactionAdminClient admin = new CompactionAdminClient(jsc, cfg.basePath)) {
final FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
if (cfg.outputPath != null && fs.exists(new Path(cfg.outputPath))) {
throw new IllegalStateException("Output File Path already exists");
@@ -101,8 +100,6 @@ public class HoodieCompactionAdminTool {
default:
throw new IllegalStateException("Not yet implemented !!");
}
} finally {
admin.close();
}
}

View File

@@ -83,7 +83,7 @@ public class HoodieCompactor {
public List<String> configs = new ArrayList<>();
}
public static void main(String[] args) throws Exception {
public static void main(String[] args) {
final Config cfg = new Config();
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help || args.length == 0) {

View File

@@ -42,30 +42,28 @@ public class HoodieWithTimelineServer implements Serializable {
private final Config cfg;
private transient Javalin app = null;
public HoodieWithTimelineServer(Config cfg) {
this.cfg = cfg;
}
public static class Config implements Serializable {
@Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false)
@Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
public String sparkMaster = null;
@Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
public String sparkMemory = null;
@Parameter(names = {"--num-partitions", "-n"}, description = "Num Partitions", required = false)
@Parameter(names = {"--num-partitions", "-n"}, description = "Num Partitions")
public Integer numPartitions = 100;
@Parameter(names = {"--server-port", "-p"}, description = " Server Port", required = false)
@Parameter(names = {"--server-port", "-p"}, description = " Server Port")
public Integer serverPort = 26754;
@Parameter(names = {"--delay-secs", "-d"}, description = "Delay(sec) before client connects", required = false)
@Parameter(names = {"--delay-secs", "-d"}, description = "Delay(sec) before client connects")
public Integer delaySecs = 30;
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
}
public void startService() {
app = Javalin.create().start(cfg.serverPort);
Javalin app = Javalin.create().start(cfg.serverPort);
app.get("/", ctx -> ctx.result("Hello World"));
}
@@ -107,7 +105,7 @@ public class HoodieWithTimelineServer implements Serializable {
System.out.println("Response Code from(" + url + ") : " + response.getStatusLine().getStatusCode());
try (BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity().getContent()))) {
StringBuffer result = new StringBuffer();
StringBuilder result = new StringBuilder();
String line;
while ((line = rd.readLine()) != null) {
result.append(line);

View File

@@ -167,9 +167,8 @@ public class UtilHelpers {
sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
additionalConfigs.entrySet().forEach(e -> sparkConf.set(e.getKey(), e.getValue()));
SparkConf newSparkConf = HoodieWriteClient.registerClasses(sparkConf);
return newSparkConf;
additionalConfigs.forEach(sparkConf::set);
return HoodieWriteClient.registerClasses(sparkConf);
}
public static JavaSparkContext buildSparkContext(String appName, String defaultMaster, Map<String, String> configs) {
@@ -200,7 +199,7 @@ public class UtilHelpers {
* @param parallelism Parallelism
*/
public static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) throws Exception {
int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) {
HoodieCompactionConfig compactionConfig = compactionStrategyClass
.map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
.withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build())

View File

@@ -50,7 +50,7 @@ public class Compactor implements Serializable {
public void compact(HoodieInstant instant) throws IOException {
LOG.info("Compactor executing compaction " + instant);
JavaRDD<WriteStatus> res = compactionClient.compact(instant.getTimestamp());
long numWriteErrors = res.collect().stream().filter(r -> r.hasErrors()).count();
long numWriteErrors = res.collect().stream().filter(WriteStatus::hasErrors).count();
if (numWriteErrors != 0) {
// We treat even a single error in compaction as fatal
LOG.error("Compaction for instant (" + instant + ") failed with write errors. Errors :" + numWriteErrors);

View File

@@ -376,8 +376,8 @@ public class DeltaSync implements Serializable {
throw new HoodieDeltaStreamerException("Unknown operation :" + cfg.operation);
}
long totalErrorRecords = writeStatusRDD.mapToDouble(ws -> ws.getTotalErrorRecords()).sum().longValue();
long totalRecords = writeStatusRDD.mapToDouble(ws -> ws.getTotalRecords()).sum().longValue();
long totalErrorRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue();
long totalRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue();
boolean hasErrors = totalErrorRecords > 0;
long hiveSyncTimeMs = 0;
if (!hasErrors || cfg.commitOnErrors) {
@@ -414,10 +414,10 @@ public class DeltaSync implements Serializable {
} else {
LOG.error("Delta Sync found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
LOG.error("Printing out the top 100 errors");
writeStatusRDD.filter(ws -> ws.hasErrors()).take(100).forEach(ws -> {
writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
LOG.error("Global error :", ws.getGlobalError());
if (ws.getErrors().size() > 0) {
ws.getErrors().entrySet().forEach(r -> LOG.trace("Error for key:" + r.getKey() + " is " + r.getValue()));
ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " is " + value));
}
});
// Rolling back instant
@@ -456,7 +456,7 @@ public class DeltaSync implements Serializable {
/**
* Sync to Hive.
*/
private void syncHive() throws ClassNotFoundException {
private void syncHive() {
if (cfg.enableHiveSync) {
HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath);
LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :"

View File

@@ -64,7 +64,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
@@ -440,7 +439,7 @@ public class HoodieDeltaStreamer implements Serializable {
HoodieTableMetaClient meta =
new HoodieTableMetaClient(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, true);
List<HoodieInstant> pending = CompactionUtils.getPendingCompactionInstantTimes(meta);
pending.stream().forEach(hoodieInstant -> asyncCompactService.enqueuePendingCompaction(hoodieInstant));
pending.forEach(hoodieInstant -> asyncCompactService.enqueuePendingCompaction(hoodieInstant));
asyncCompactService.start((error) -> {
// Shutdown DeltaSync
shutdown(false);
@@ -554,29 +553,27 @@ public class HoodieDeltaStreamer implements Serializable {
@Override
protected Pair<CompletableFuture, ExecutorService> startService() {
ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentCompaction);
List<CompletableFuture<Boolean>> compactionFutures =
IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
try {
// Set Compactor Pool Name for allowing users to prioritize compaction
LOG.info("Setting Spark Pool name for compaction to " + SchedulerConfGenerator.COMPACT_POOL_NAME);
jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.COMPACT_POOL_NAME);
return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
try {
// Set Compactor Pool Name for allowing users to prioritize compaction
LOG.info("Setting Spark Pool name for compaction to " + SchedulerConfGenerator.COMPACT_POOL_NAME);
jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.COMPACT_POOL_NAME);
while (!isShutdownRequested()) {
final HoodieInstant instant = fetchNextCompactionInstant();
if (null != instant) {
compactor.compact(instant);
}
}
LOG.info("Compactor shutting down properly!!");
} catch (InterruptedException ie) {
LOG.warn("Compactor executor thread got interrupted exception. Stopping", ie);
} catch (IOException e) {
LOG.error("Compactor executor failed", e);
throw new HoodieIOException(e.getMessage(), e);
while (!isShutdownRequested()) {
final HoodieInstant instant = fetchNextCompactionInstant();
if (null != instant) {
compactor.compact(instant);
}
return true;
}, executor)).collect(Collectors.toList());
return Pair.of(CompletableFuture.allOf(compactionFutures.stream().toArray(CompletableFuture[]::new)), executor);
}
LOG.info("Compactor shutting down properly!!");
} catch (InterruptedException ie) {
LOG.warn("Compactor executor thread got interrupted exception. Stopping", ie);
} catch (IOException e) {
LOG.error("Compactor executor failed", e);
throw new HoodieIOException(e.getMessage(), e);
}
return true;
}, executor)).toArray(CompletableFuture[]::new)), executor);
}
}

View File

@@ -25,8 +25,8 @@ import com.codahale.metrics.Timer;
public class HoodieDeltaStreamerMetrics {
private HoodieWriteConfig config = null;
private String tableName = null;
private HoodieWriteConfig config;
private String tableName;
public String overallTimerName = null;
public String hiveSyncTimerName = null;

View File

@@ -46,7 +46,7 @@ public class SchedulerConfGenerator {
public static final String SPARK_SCHEDULER_MODE_KEY = "spark.scheduler.mode";
public static final String SPARK_SCHEDULER_ALLOCATION_FILE_KEY = "spark.scheduler.allocation.file";
private static String SPARK_SCHEDULING_PATTERN =
private static final String SPARK_SCHEDULING_PATTERN =
"<?xml version=\"1.0\"?>\n<allocations>\n <pool name=\"%s\">\n"
+ " <schedulingMode>%s</schedulingMode>\n <weight>%s</weight>\n <minShare>%s</minShare>\n"
+ " </pool>\n <pool name=\"%s\">\n <schedulingMode>%s</schedulingMode>\n"

View File

@@ -36,7 +36,7 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
import java.util.Collections;
public class HoodieIncrSource extends RowSource {
@@ -87,9 +87,9 @@ public class HoodieIncrSource extends RowSource {
@Override
public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.HOODIE_SRC_BASE_PATH));
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.HOODIE_SRC_BASE_PATH));
/**
/*
* DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.HOODIE_SRC_BASE_PATH,
* Config.HOODIE_SRC_PARTITION_FIELDS)); List<String> partitionFields =
* props.getStringList(Config.HOODIE_SRC_PARTITION_FIELDS, ",", new ArrayList<>()); PartitionValueExtractor
@@ -121,7 +121,7 @@ public class HoodieIncrSource extends RowSource {
Dataset<Row> source = reader.load(srcPath);
/**
/*
* log.info("Partition Fields are : (" + partitionFields + "). Initial Source Schema :" + source.schema());
*
* StructType newSchema = new StructType(source.schema().fields()); for (String field : partitionFields) { newSchema

View File

@@ -25,7 +25,6 @@ import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import java.io.IOException;
import java.io.Serializable;
/**
@@ -80,7 +79,7 @@ public class AvroConvertor implements Serializable {
}
}
public GenericRecord fromJson(String json) throws IOException {
public GenericRecord fromJson(String json) {
initSchema();
initJsonConvertor();
return jsonConverter.convert(json, schema);

View File

@@ -33,13 +33,12 @@ import java.io.IOException;
import static org.junit.Assert.assertEquals;
public class TestTimestampBasedKeyGenerator {
private Schema schema;
private GenericRecord baseRecord;
private TypedProperties properties = new TypedProperties();
@Before
public void initialize() throws IOException {
schema = SchemaTestUtil.getTimestampEvolvedSchema();
Schema schema = SchemaTestUtil.getTimestampEvolvedSchema();
baseRecord = SchemaTestUtil
.generateAvroRecordFromJson(schema, 1, "001", "f1");