diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml
index fed2bf972..dbb44639f 100644
--- a/hudi-cli/pom.xml
+++ b/hudi-cli/pom.xml
@@ -194,6 +194,13 @@
      <scope>test</scope>
      <type>test-jar</type>
    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-utilities_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
index 0f1db5036..a31f31012 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
@@ -18,7 +18,6 @@
package org.apache.hudi.cli.commands;
-import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
@@ -57,6 +56,7 @@ public class HDFSParquetImportCommand implements CommandMarker {
@CliOption(key = "schemaFilePath", mandatory = true,
help = "Path for Avro schema file") final String schemaFilePath,
@CliOption(key = "format", mandatory = true, help = "Format for the input data") final String format,
+ @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
@CliOption(key = "sparkMemory", mandatory = true, help = "Spark executor memory") final String sparkMemory,
@CliOption(key = "retry", mandatory = true, help = "Number of retries") final String retry,
@CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for importing",
@@ -66,8 +66,6 @@ public class HDFSParquetImportCommand implements CommandMarker {
(new FormatValidator()).validate("format", format);
- boolean initialized = HoodieCLI.initConf();
- HoodieCLI.initFS(initialized);
String sparkPropertiesPath =
Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
@@ -78,8 +76,8 @@ public class HDFSParquetImportCommand implements CommandMarker {
cmd = SparkCommand.UPSERT.toString();
}
- sparkLauncher.addAppArgs(cmd, srcPath, targetPath, tableName, tableType, rowKeyField, partitionPathField,
- parallelism, schemaFilePath, sparkMemory, retry, propsFilePath);
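+ // Master and memory now lead the arg list; keep this order in sync with the IMPORT/UPSERT parsing in SparkMain.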
+ sparkLauncher.addAppArgs(cmd, master, sparkMemory, srcPath, targetPath, tableName, tableType, rowKeyField,
+ partitionPathField, parallelism, schemaFilePath, retry, propsFilePath);
UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 5d8972d0b..be9d7ddf8 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -82,17 +82,17 @@ public class SparkMain {
break;
case IMPORT:
case UPSERT:
- assert (args.length >= 12);
+ assert (args.length >= 13);
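+ // args layout after this change: [0] command, [1] master, [2] sparkMemory, [3] srcPath, [4] targetPath,
+ // [5] tableName, [6] tableType, [7] rowKeyField, [8] partitionPathField, [9] parallelism,
+ // [10] schemaFilePath, [11] retry, [12] propsFilePath, [13+] optional hoodie client configs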
String propsFilePath = null;
- if (!StringUtils.isNullOrEmpty(args[11])) {
- propsFilePath = args[11];
+ if (!StringUtils.isNullOrEmpty(args[12])) {
+ propsFilePath = args[12];
}
List<String> configs = new ArrayList<>();
- if (args.length > 12) {
- configs.addAll(Arrays.asList(args).subList(12, args.length));
+ if (args.length > 13) {
+ configs.addAll(Arrays.asList(args).subList(13, args.length));
}
- returnCode = dataLoad(jsc, command, args[1], args[2], args[3], args[4], args[5], args[6],
- Integer.parseInt(args[7]), args[8], args[9], Integer.parseInt(args[10]), propsFilePath, configs);
+ returnCode = dataLoad(jsc, command, args[3], args[4], args[5], args[6], args[7], args[8],
+ Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]), propsFilePath, configs);
break;
case COMPACT_RUN:
assert (args.length >= 9);
@@ -163,7 +163,7 @@ public class SparkMain {
private static boolean sparkMasterContained(SparkCommand command) {
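+ // IMPORT and UPSERT now carry the spark master at args[1], so they are treated like the other master-aware commands here.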
List<SparkCommand> masterContained = Arrays.asList(SparkCommand.COMPACT_VALIDATE, SparkCommand.COMPACT_REPAIR,
SparkCommand.COMPACT_UNSCHEDULE_PLAN, SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN,
- SparkCommand.DEDUPLICATE);
+ SparkCommand.IMPORT, SparkCommand.UPSERT, SparkCommand.DEDUPLICATE);
return masterContained.contains(command);
}
@@ -177,7 +177,7 @@ public class SparkMain {
}
private static int dataLoad(JavaSparkContext jsc, String command, String srcPath, String targetPath, String tableName,
- String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile, String sparkMemory,
+ String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile,
int retry, String propsFilePath, List<String> configs) {
Config cfg = new Config();
cfg.command = command;
@@ -191,7 +191,6 @@ public class SparkMain {
cfg.schemaFile = schemaFile;
cfg.propsFilePath = propsFilePath;
cfg.configs = configs;
- jsc.getConf().set("spark.executor.memory", sparkMemory);
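+ // sparkMemory is no longer set on a live context here; it is now passed up front as args[2] (see addAppArgs in the CLI command).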
return new HDFSParquetImporter(cfg).dataImport(jsc, retry);
}
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestHDFSParquetImportCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestHDFSParquetImportCommand.java
new file mode 100644
index 000000000..347396b23
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestHDFSParquetImportCommand.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.utilities.HDFSParquetImporter;
+import org.apache.hudi.utilities.functional.TestHDFSParquetImporter;
+import org.apache.hudi.utilities.functional.TestHDFSParquetImporter.HoodieTripModel;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.text.ParseException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test class for {@link org.apache.hudi.cli.commands.HDFSParquetImportCommand}.
+ */
+public class ITTestHDFSParquetImportCommand extends AbstractShellIntegrationTest {
+
+ private Path sourcePath;
+ private Path targetPath;
+ private String tableName;
+ private String schemaFile;
+ private String tablePath;
+
+ private List<GenericRecord> insertData;
+ private TestHDFSParquetImporter importer;
+
+ @BeforeEach
+ public void init() throws IOException, ParseException {
+ tableName = "test_table";
+ tablePath = basePath + File.separator + tableName;
+ sourcePath = new Path(basePath, "source");
+ targetPath = new Path(tablePath);
+ schemaFile = new Path(basePath, "file.schema").toString();
+
+ // create schema file
+ try (FSDataOutputStream schemaFileOS = fs.create(new Path(schemaFile))) {
+ schemaFileOS.write(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA.getBytes());
+ }
+
+ importer = new TestHDFSParquetImporter();
+ insertData = importer.createInsertRecords(sourcePath);
+ }
+
+ /**
+ * Test case for 'hdfsparquetimport' with insert.
+ */
+ @Test
+ public void testConvertWithInsert() throws IOException {
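+ // Build the shell command, exercising the new --sparkMaster option with a local master.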
+ String command = String.format("hdfsparquetimport --srcPath %s --targetPath %s --tableName %s "
+     + "--tableType %s --rowKeyField %s --partitionPathField %s --parallelism %s "
+     + "--schemaFilePath %s --format %s --sparkMemory %s --retry %s --sparkMaster %s",
+ sourcePath.toString(), targetPath.toString(), tableName, HoodieTableType.COPY_ON_WRITE.name(),
+ "_row_key", "timestamp", "1", schemaFile, "parquet", "2G", "1", "local");
+ CommandResult cr = getShell().executeCommand(command);
+
+ assertAll("Command runs successfully",
+ () -> assertTrue(cr.isSuccess()),
+ () -> assertEquals("Table imported to hoodie format", cr.getResult().toString()));
+
+ // Check that the hudi table exists
+ String metaPath = targetPath + File.separator + HoodieTableMetaClient.METAFOLDER_NAME;
+ assertTrue(Files.exists(Paths.get(metaPath)), "Hoodie table does not exist.");
+
+ // Load meta data
+ new TableCommand().connect(targetPath.toString(), TimelineLayoutVersion.VERSION_1, false, 2000, 300000, 7);
+ metaClient = HoodieCLI.getTableMetaClient();
+
+ assertEquals(1, metaClient.getActiveTimeline().getCommitsTimeline().countInstants(), "Should have only 1 commit.");
+
+ verifyResultData(insertData);
+ }
+
+ /**
+ * Test case for 'hdfsparquetimport' with upsert.
+ */
+ @Test
+ public void testConvertWithUpsert() throws IOException, ParseException {
+ Path upsertFolder = new Path(basePath, "testUpsertSrc");
+ List<GenericRecord> upsertData = importer.createUpsertRecords(upsertFolder);
+
+ // first insert records
+ HDFSParquetImporter.Config cfg = importer.getHDFSParquetImporterConfig(sourcePath.toString(), tablePath,
+ tableName, HoodieTableType.COPY_ON_WRITE.name(), "_row_key", "timestamp", 1, schemaFile);
+ HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+ dataImporter.dataImport(jsc, 0);
+
+ // Load meta data
+ new TableCommand().connect(targetPath.toString(), TimelineLayoutVersion.VERSION_1, false, 2000, 300000, 7);
+ metaClient = HoodieCLI.getTableMetaClient();
+
+ // check that the insert instant exists
+ assertEquals(1, metaClient.getActiveTimeline().getCommitsTimeline().countInstants(), "Should have only 1 commit.");
+
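+ // Re-run the import in upsert mode; --upsert true makes the CLI dispatch SparkCommand.UPSERT.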
+ String command = String.format("hdfsparquetimport --srcPath %s --targetPath %s --tableName %s "
+     + "--tableType %s --rowKeyField %s --partitionPathField %s --parallelism %s "
+     + "--schemaFilePath %s --format %s --sparkMemory %s --retry %s --sparkMaster %s --upsert %s",
+ upsertFolder.toString(), targetPath.toString(), tableName, HoodieTableType.COPY_ON_WRITE.name(),
+ "_row_key", "timestamp", "1", schemaFile, "parquet", "2G", "1", "local", "true");
+ CommandResult cr = getShell().executeCommand(command);
+
+ assertAll("Command runs successfully",
+ () -> assertTrue(cr.isSuccess()),
+ () -> assertEquals("Table imported to hoodie format", cr.getResult().toString()));
+
+ // reload meta client
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ assertEquals(2, metaClient.getActiveTimeline().getCommitsTimeline().countInstants(), "Should have 2 commits.");
+
+ // construct result, remove top 10 and add upsert data.
+ List<GenericRecord> expectData = insertData.subList(11, 96);
+ expectData.addAll(upsertData);
+
+ verifyResultData(expectData);
+ }
+
+ /**
+ * Method to verify that the result matches the expected data.
+ */
+ private void verifyResultData(List<GenericRecord> expectData) {
+ Dataset<Row> ds = HoodieClientTestUtils.read(jsc, tablePath, sqlContext, fs, tablePath + "/*/*/*/*");
+
+ List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+ List<HoodieTripModel> result = readData.stream().map(row ->
+ new HoodieTripModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+ row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+ .collect(Collectors.toList());
+
+ List<HoodieTripModel> expected = expectData.stream().map(g ->
+ new HoodieTripModel(Double.parseDouble(g.get("timestamp").toString()),
+ g.get("_row_key").toString(),
+ g.get("rider").toString(),
+ g.get("driver").toString(),
+ Double.parseDouble(g.get("begin_lat").toString()),
+ Double.parseDouble(g.get("begin_lon").toString()),
+ Double.parseDouble(g.get("end_lat").toString()),
+ Double.parseDouble(g.get("end_lon").toString())))
+ .collect(Collectors.toList());
+
+ assertAll("Result list equals",
+ () -> assertEquals(expected.size(), result.size()),
+ () -> assertTrue(result.containsAll(expected) && expected.containsAll(result)));
+ }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
index f738cdfde..a8474f670 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
@@ -264,7 +264,7 @@ public class TestHDFSParquetImporter implements Serializable {
}
}
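+ // NOTE: the helpers below are made public so the new hudi-cli integration test (ITTestHDFSParquetImportCommand) can reuse them.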
- private List<GenericRecord> createInsertRecords(Path srcFolder) throws ParseException, IOException {
+ public List<GenericRecord> createInsertRecords(Path srcFolder) throws ParseException, IOException {
Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
long startTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
List<GenericRecord> records = new ArrayList<GenericRecord>();
@@ -281,7 +281,7 @@ public class TestHDFSParquetImporter implements Serializable {
return records;
}
- private List<GenericRecord> createUpsertRecords(Path srcFolder) throws ParseException, IOException {
+ public List<GenericRecord> createUpsertRecords(Path srcFolder) throws ParseException, IOException {
Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
long startTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
List<GenericRecord> records = new ArrayList<GenericRecord>();
@@ -361,7 +361,7 @@ public class TestHDFSParquetImporter implements Serializable {
}
}
- private HDFSParquetImporter.Config getHDFSParquetImporterConfig(String srcPath, String targetPath, String tableName,
+ public HDFSParquetImporter.Config getHDFSParquetImporterConfig(String srcPath, String targetPath, String tableName,
String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile) {
HDFSParquetImporter.Config cfg = new HDFSParquetImporter.Config();
cfg.srcPath = srcPath;
@@ -395,7 +395,7 @@ public class TestHDFSParquetImporter implements Serializable {
double endLat;
double endLon;
- private HoodieTripModel(double timestamp, String rowKey, String rider, String driver, double beginLat,
+ public HoodieTripModel(double timestamp, String rowKey, String rider, String driver, double beginLat,
double beginLon, double endLat, double endLon) {
this.timestamp = timestamp;
this.rowKey = rowKey;