[HUDI-681]Remove embeddedTimelineService from HoodieReadClient (#1388)

* [HUDI-681]Remove embeddedTimelineService from HoodieReadClient
Author: hongdd
Date: 2020-03-09 18:31:04 +08:00
Committed by: GitHub
parent 2137ecfa22
commit f93e64fee4
4 changed files with 8 additions and 30 deletions

HoodieReadClient.java

@@ -19,7 +19,6 @@
 package org.apache.hudi.client;

 import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -72,18 +71,10 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializable
   /**
    * @param basePath path to Hoodie table
    */
-  public HoodieReadClient(JavaSparkContext jsc, String basePath, Option<EmbeddedTimelineService> timelineService) {
+  public HoodieReadClient(JavaSparkContext jsc, String basePath) {
     this(jsc, HoodieWriteConfig.newBuilder().withPath(basePath)
         // by default we use HoodieBloomIndex
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(),
-        timelineService);
-  }
-
-  /**
-   * @param basePath path to Hoodie table
-   */
-  public HoodieReadClient(JavaSparkContext jsc, String basePath) {
-    this(jsc, basePath, Option.empty());
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build());
   }

   /**
@@ -100,14 +91,6 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializable
    * @param clientConfig instance of HoodieWriteConfig
    */
   public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) {
-    this(jsc, clientConfig, Option.empty());
-  }
-
-  /**
-   * @param clientConfig instance of HoodieWriteConfig
-   */
-  public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig,
-      Option<EmbeddedTimelineService> timelineService) {
     this.jsc = jsc;
     final String basePath = clientConfig.getBasePath();
     // Create a Hoodie table which encapsulated the commits and files visible

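The constructor surface left after this change, as a caller sees it. This is a minimal sketch rather than code from the commit: the wrapper class ReadClientExample and its method names are hypothetical, and the import paths are assumed from the packages visible in the diff above.

import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaSparkContext;

public class ReadClientExample {

  // Path-only constructor: a default BLOOM-indexed HoodieWriteConfig is built
  // internally, and no Option<EmbeddedTimelineService> is passed any more.
  static HoodieReadClient<HoodieRecordPayload> fromBasePath(JavaSparkContext jsc, String basePath) {
    return new HoodieReadClient<>(jsc, basePath);
  }

  // Config constructor: callers needing non-default settings build the
  // HoodieWriteConfig themselves and pass only the Spark context and config.
  static HoodieReadClient<HoodieRecordPayload> fromConfig(JavaSparkContext jsc, String basePath) {
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
    return new HoodieReadClient<>(jsc, config);
  }
}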
DataSourceUtils.java

@@ -23,11 +23,9 @@ import org.apache.avro.Schema;
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.TypedProperties;
 import org.apache.hudi.config.HoodieCompactionConfig;
@@ -222,9 +220,9 @@ public class DataSourceUtils {
   @SuppressWarnings("unchecked")
   public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
-      HoodieWriteConfig writeConfig, Option<EmbeddedTimelineService> timelineService) {
+      HoodieWriteConfig writeConfig) {
     try {
-      HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig, timelineService);
+      HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig);
       return client.tagLocation(incomingHoodieRecords)
           .filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown());
     } catch (TableNotFoundException e) {
@@ -236,10 +234,10 @@ public class DataSourceUtils {
   @SuppressWarnings("unchecked")
   public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
-      Map<String, String> parameters, Option<EmbeddedTimelineService> timelineService) {
+      Map<String, String> parameters) {
     HoodieWriteConfig writeConfig =
         HoodieWriteConfig.newBuilder().withPath(parameters.get("path")).withProps(parameters).build();
-    return dropDuplicates(jssc, incomingHoodieRecords, writeConfig, timelineService);
+    return dropDuplicates(jssc, incomingHoodieRecords, writeConfig);
   }

   public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String basePath) {

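Both dropDuplicates overloads now take only the Spark context, the incoming records, and either a HoodieWriteConfig or the DataSource parameter map. A minimal, hypothetical calling sketch follows; DropDuplicatesExample and its method names are illustrations only, and the import paths are assumptions based on the surrounding module.

import java.util.HashMap;
import java.util.Map;

import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class DropDuplicatesExample {

  // Overload with an explicit HoodieWriteConfig; records whose location is already
  // known after tagLocation (i.e. rows already in the table) are filtered out.
  static JavaRDD<HoodieRecord> withConfig(JavaSparkContext jssc, JavaRDD<HoodieRecord> incoming, String basePath) {
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(basePath).build();
    return DataSourceUtils.dropDuplicates(jssc, incoming, writeConfig);
  }

  // Overload with the raw DataSource options map; it builds the write config
  // internally from the "path" entry plus the remaining properties.
  static JavaRDD<HoodieRecord> withParameters(JavaSparkContext jssc, JavaRDD<HoodieRecord> incoming, String basePath) {
    Map<String, String> parameters = new HashMap<>();
    parameters.put("path", basePath);
    return DataSourceUtils.dropDuplicates(jssc, incoming, parameters);
  }
}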
HoodieSparkSqlWriter.scala

@@ -131,10 +131,7 @@ private[hudi] object HoodieSparkSqlWriter {
     val hoodieRecords =
       if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
-        DataSourceUtils.dropDuplicates(
-          jsc,
-          hoodieAllIncomingRecords,
-          mapAsJavaMap(parameters), client.getTimelineServer)
+        DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
       } else {
         hoodieAllIncomingRecords
       }

DeltaSync.java

@@ -357,7 +357,7 @@ public class DeltaSync implements Serializable {
     if (cfg.filterDupes) {
       // turn upserts to insert
       cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
-      records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig(), writeClient.getTimelineServer());
+      records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig());
     }
     boolean isEmpty = records.isEmpty();
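In DeltaSync the dedupe call now needs only the write client's config. Below is a condensed, annotated restatement of the filterDupes branch above, not new code; cfg, records, jssc and writeClient refer to DeltaSync's own state.

if (cfg.filterDupes) {
  // dropDuplicates filters out records whose location is already known, i.e. rows
  // that already exist in the table, so the remaining batch can be written as a
  // plain INSERT instead of an UPSERT.
  cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
  // Only the write config is passed now; the embedded timeline service handle is
  // no longer threaded through DataSourceUtils.dropDuplicates.
  records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig());
}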