diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
index c9955bdb1..6d8769b7f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
@@ -144,7 +144,7 @@ public class HDFSParquetImporter implements Serializable {
       // Get schema.
       String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
 
-      SparkRDDWriteClient client =
+      SparkRDDWriteClient<HoodieRecordPayload> client =
           UtilHelpers.createHoodieClient(jsc, cfg.targetPath, schemaStr, cfg.parallelism, Option.empty(), props);
 
       JavaRDD<HoodieRecord<HoodieRecordPayload>> hoodieRecords = buildHoodieRecordsForImport(jsc, schemaStr);
@@ -206,7 +206,7 @@ public class HDFSParquetImporter implements Serializable {
    * @param hoodieRecords Hoodie Records
    * @param <T> Type
    */
-  protected <T extends HoodieRecordPayload> JavaRDD<WriteStatus> load(SparkRDDWriteClient client, String instantTime,
+  protected <T extends HoodieRecordPayload> JavaRDD<WriteStatus> load(SparkRDDWriteClient<T> client, String instantTime,
       JavaRDD<HoodieRecord<T>> hoodieRecords) {
     switch (cfg.command.toLowerCase()) {
       case "upsert": {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index 1e10eef24..65cb4048a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -27,6 +27,7 @@ import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.Option;
@@ -148,10 +149,10 @@ public class HoodieClusteringJob {
 
   private int doCluster(JavaSparkContext jsc) throws Exception {
     String schemaStr = getSchemaFromLatestInstant();
-    SparkRDDWriteClient client =
+    SparkRDDWriteClient<HoodieRecordPayload> client =
         UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
     JavaRDD<WriteStatus> writeResponse =
-        (JavaRDD<WriteStatus>) client.cluster(cfg.clusteringInstantTime, true).getWriteStatuses();
+        client.cluster(cfg.clusteringInstantTime, true).getWriteStatuses();
     return UtilHelpers.handleErrors(jsc, cfg.clusteringInstantTime, writeResponse);
   }
 
@@ -162,7 +163,7 @@ public class HoodieClusteringJob {
 
   private Option<String> doSchedule(JavaSparkContext jsc) throws Exception {
     String schemaStr = getSchemaFromLatestInstant();
-    SparkRDDWriteClient client =
+    SparkRDDWriteClient<HoodieRecordPayload> client =
         UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
     if (cfg.clusteringInstantTime != null) {
       client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, Option.empty());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
index c1493e6a4..1996fb8b5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
 
 import com.beust.jcommander.JCommander;
@@ -131,9 +132,9 @@ public class HoodieCompactor {
 
   private int doCompact(JavaSparkContext jsc) throws Exception {
     // Get schema.
     String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
-    SparkRDDWriteClient client =
+    SparkRDDWriteClient<HoodieRecordPayload> client =
         UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
-    JavaRDD<WriteStatus> writeResponse = (JavaRDD<WriteStatus>) client.compact(cfg.compactionInstantTime);
+    JavaRDD<WriteStatus> writeResponse = client.compact(cfg.compactionInstantTime);
     return UtilHelpers.handleErrors(jsc, cfg.compactionInstantTime, writeResponse);
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index e7a8b3c37..79a495e55 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -24,6 +24,7 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -274,8 +275,8 @@ public class UtilHelpers {
    * @param schemaStr   Schema
    * @param parallelism Parallelism
    */
-  public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
-      int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) {
+  public static SparkRDDWriteClient<HoodieRecordPayload> createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
+      int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) {
     HoodieCompactionConfig compactionConfig = compactionStrategyClass
         .map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
             .withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build())
@@ -288,7 +289,7 @@
         .withSchema(schemaStr).combineInput(true, true).withCompactionConfig(compactionConfig)
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
         .withProps(properties).build();
-    return new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), config);
+    return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), config);
   }
 
   public static int handleErrors(JavaSparkContext jsc, String instantTime, JavaRDD<WriteStatus> writeResponse) {
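
For context, a minimal before/after sketch of the caller-side effect of this patch, modeled on the doCompact hunk above. The wrapper class and method names (RawTypeSketch, beforePatch, afterPatch) and the parameter plumbing are illustrative only, not code from the patch; both methods compile against the patched UtilHelpers, where createHoodieClient returns SparkRDDWriteClient<HoodieRecordPayload>.

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.UtilHelpers;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

class RawTypeSketch {

  // Before this patch: callers held a raw SparkRDDWriteClient, so compact(...)
  // erased to a raw JavaRDD and needed the unchecked cast the hunks above delete.
  @SuppressWarnings("unchecked")
  static JavaRDD<WriteStatus> beforePatch(JavaSparkContext jsc, String basePath, String schemaStr,
      int parallelism, TypedProperties props, String instantTime) {
    SparkRDDWriteClient client =
        UtilHelpers.createHoodieClient(jsc, basePath, schemaStr, parallelism, Option.empty(), props);
    return (JavaRDD<WriteStatus>) client.compact(instantTime); // unchecked cast on a raw type
  }

  // After this patch: the client is parameterized with HoodieRecordPayload, so
  // compact(...) returns JavaRDD<WriteStatus> directly and the cast disappears.
  static JavaRDD<WriteStatus> afterPatch(JavaSparkContext jsc, String basePath, String schemaStr,
      int parallelism, TypedProperties props, String instantTime) {
    SparkRDDWriteClient<HoodieRecordPayload> client =
        UtilHelpers.createHoodieClient(jsc, basePath, schemaStr, parallelism, Option.empty(), props);
    return client.compact(instantTime);
  }
}

Design-wise, the patch pins the payload type to HoodieRecordPayload inside createHoodieClient rather than threading a type parameter through every utility job, which keeps the call sites simple; only HDFSParquetImporter.load, which already declared <T extends HoodieRecordPayload>, propagates the parameter to its client argument.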