1
0

[HUDI-2008] Avoid the raw type usage in some classes under hudi-utilities module (#3076)

This commit is contained in:
Wei
2021-06-16 22:37:29 +08:00
committed by GitHub
parent 8b0a502c4f
commit d519c74626
4 changed files with 13 additions and 10 deletions

View File

@@ -144,7 +144,7 @@ public class HDFSParquetImporter implements Serializable {
       // Get schema.
       String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
-      SparkRDDWriteClient client =
+      SparkRDDWriteClient<HoodieRecordPayload> client =
           UtilHelpers.createHoodieClient(jsc, cfg.targetPath, schemaStr, cfg.parallelism, Option.empty(), props);
       JavaRDD<HoodieRecord<HoodieRecordPayload>> hoodieRecords = buildHoodieRecordsForImport(jsc, schemaStr);
@@ -206,7 +206,7 @@ public class HDFSParquetImporter implements Serializable {
    * @param hoodieRecords Hoodie Records
    * @param <T> Type
    */
-  protected <T extends HoodieRecordPayload> JavaRDD<WriteStatus> load(SparkRDDWriteClient client, String instantTime,
+  protected <T extends HoodieRecordPayload> JavaRDD<WriteStatus> load(SparkRDDWriteClient<T> client, String instantTime,
       JavaRDD<HoodieRecord<T>> hoodieRecords) {
     switch (cfg.command.toLowerCase()) {
       case "upsert": {

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.Option;
@@ -148,10 +149,10 @@ public class HoodieClusteringJob {
   private int doCluster(JavaSparkContext jsc) throws Exception {
     String schemaStr = getSchemaFromLatestInstant();
-    SparkRDDWriteClient client =
+    SparkRDDWriteClient<HoodieRecordPayload> client =
         UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
     JavaRDD<WriteStatus> writeResponse =
-        (JavaRDD<WriteStatus>) client.cluster(cfg.clusteringInstantTime, true).getWriteStatuses();
+        client.cluster(cfg.clusteringInstantTime, true).getWriteStatuses();
     return UtilHelpers.handleErrors(jsc, cfg.clusteringInstantTime, writeResponse);
   }
@@ -162,7 +163,7 @@ public class HoodieClusteringJob {
   private Option<String> doSchedule(JavaSparkContext jsc) throws Exception {
     String schemaStr = getSchemaFromLatestInstant();
-    SparkRDDWriteClient client =
+    SparkRDDWriteClient<HoodieRecordPayload> client =
         UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
     if (cfg.clusteringInstantTime != null) {
       client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, Option.empty());

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
 import com.beust.jcommander.JCommander;
@@ -131,9 +132,9 @@ public class HoodieCompactor {
   private int doCompact(JavaSparkContext jsc) throws Exception {
     // Get schema.
     String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
-    SparkRDDWriteClient client =
+    SparkRDDWriteClient<HoodieRecordPayload> client =
         UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
-    JavaRDD<WriteStatus> writeResponse = (JavaRDD<WriteStatus>) client.compact(cfg.compactionInstantTime);
+    JavaRDD<WriteStatus> writeResponse = client.compact(cfg.compactionInstantTime);
     return UtilHelpers.handleErrors(jsc, cfg.compactionInstantTime, writeResponse);
   }

View File

@@ -24,6 +24,7 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -274,8 +275,8 @@ public class UtilHelpers {
    * @param schemaStr Schema
    * @param parallelism Parallelism
    */
-  public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
+  public static SparkRDDWriteClient<HoodieRecordPayload> createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
       int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) {
     HoodieCompactionConfig compactionConfig = compactionStrategyClass
         .map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
             .withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build())
@@ -288,7 +289,7 @@ public class UtilHelpers {
         .withSchema(schemaStr).combineInput(true, true).withCompactionConfig(compactionConfig)
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
         .withProps(properties).build();
-    return new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), config);
+    return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), config);
   }
 public static int handleErrors(JavaSparkContext jsc, String instantTime, JavaRDD<WriteStatus> writeResponse) {