From f93e64fee413ed1b774156e688794ee7937cc01a Mon Sep 17 00:00:00 2001 From: hongdd Date: Mon, 9 Mar 2020 18:31:04 +0800 Subject: [PATCH] [HUDI-681]Remove embeddedTimelineService from HoodieReadClient (#1388) * [HUDI-681]Remove embeddedTimelineService from HoodieReadClient --- .../apache/hudi/client/HoodieReadClient.java | 21 ++----------------- .../java/org/apache/hudi/DataSourceUtils.java | 10 ++++----- .../apache/hudi/HoodieSparkSqlWriter.scala | 5 +---- .../utilities/deltastreamer/DeltaSync.java | 2 +- 4 files changed, 8 insertions(+), 30 deletions(-) diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java index 33d661ba5..d1e92b5c6 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java @@ -19,7 +19,6 @@ package org.apache.hudi.client; import org.apache.hudi.avro.model.HoodieCompactionPlan; -import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; @@ -72,18 +71,10 @@ public class HoodieReadClient implements Serializ /** * @param basePath path to Hoodie table */ - public HoodieReadClient(JavaSparkContext jsc, String basePath, Option timelineService) { + public HoodieReadClient(JavaSparkContext jsc, String basePath) { this(jsc, HoodieWriteConfig.newBuilder().withPath(basePath) // by default we use HoodieBloomIndex - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(), - timelineService); - } - - /** - * @param basePath path to Hoodie table - */ - public HoodieReadClient(JavaSparkContext jsc, String basePath) { - this(jsc, basePath, Option.empty()); + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build()); } /** @@ -100,14 +91,6 @@ public class HoodieReadClient implements Serializ * @param clientConfig instance of HoodieWriteConfig */ public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) { - this(jsc, clientConfig, Option.empty()); - } - - /** - * @param clientConfig instance of HoodieWriteConfig - */ - public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, - Option timelineService) { this.jsc = jsc; final String basePath = clientConfig.getBasePath(); // Create a Hoodie table which encapsulated the commits and files visible diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java index 6a4ad03b1..99a795d88 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -23,11 +23,9 @@ import org.apache.avro.Schema; import org.apache.hudi.client.HoodieReadClient; import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.TypedProperties; import org.apache.hudi.config.HoodieCompactionConfig; @@ -222,9 +220,9 @@ public class DataSourceUtils { @SuppressWarnings("unchecked") public static JavaRDD dropDuplicates(JavaSparkContext jssc, JavaRDD incomingHoodieRecords, - HoodieWriteConfig writeConfig, Option timelineService) { + HoodieWriteConfig writeConfig) { try { - HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig, timelineService); + HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig); return client.tagLocation(incomingHoodieRecords) .filter(r -> !((HoodieRecord) r).isCurrentLocationKnown()); } catch (TableNotFoundException e) { @@ -236,10 +234,10 @@ public class DataSourceUtils { @SuppressWarnings("unchecked") public static JavaRDD dropDuplicates(JavaSparkContext jssc, JavaRDD incomingHoodieRecords, - Map parameters, Option timelineService) { + Map parameters) { HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(parameters.get("path")).withProps(parameters).build(); - return dropDuplicates(jssc, incomingHoodieRecords, writeConfig, timelineService); + return dropDuplicates(jssc, incomingHoodieRecords, writeConfig); } public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String basePath) { diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 80a01d337..326595f5e 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -131,10 +131,7 @@ private[hudi] object HoodieSparkSqlWriter { val hoodieRecords = if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) { - DataSourceUtils.dropDuplicates( - jsc, - hoodieAllIncomingRecords, - mapAsJavaMap(parameters), client.getTimelineServer) + DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters)) } else { hoodieAllIncomingRecords } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 97d3d4282..4b69d2234 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -357,7 +357,7 @@ public class DeltaSync implements Serializable { if (cfg.filterDupes) { // turn upserts to insert cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation; - records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig(), writeClient.getTimelineServer()); + records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig()); } boolean isEmpty = records.isEmpty();