
[HUDI-701] Add unit test for HDFSParquetImportCommand (#1574)

Author: hongdd
Date: 2020-05-14 19:15:49 +08:00
Committed by: GitHub
Parent: 83796b3189
Commit: 3a2fe13fcb
5 changed files with 209 additions and 19 deletions

File: org/apache/hudi/cli/commands/HDFSParquetImportCommand.java

@@ -18,7 +18,6 @@
package org.apache.hudi.cli.commands;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
@@ -57,6 +56,7 @@ public class HDFSParquetImportCommand implements CommandMarker {
@CliOption(key = "schemaFilePath", mandatory = true,
help = "Path for Avro schema file") final String schemaFilePath,
@CliOption(key = "format", mandatory = true, help = "Format for the input data") final String format,
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
@CliOption(key = "sparkMemory", mandatory = true, help = "Spark executor memory") final String sparkMemory,
@CliOption(key = "retry", mandatory = true, help = "Number of retries") final String retry,
@CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for importing",
@@ -66,8 +66,6 @@ public class HDFSParquetImportCommand implements CommandMarker {
(new FormatValidator()).validate("format", format);
boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized);
-String sparkPropertiesPath =
-    Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
@@ -78,8 +76,8 @@ public class HDFSParquetImportCommand implements CommandMarker {
cmd = SparkCommand.UPSERT.toString();
}
-sparkLauncher.addAppArgs(cmd, srcPath, targetPath, tableName, tableType, rowKeyField, partitionPathField,
-    parallelism, schemaFilePath, sparkMemory, retry, propsFilePath);
+sparkLauncher.addAppArgs(cmd, master, sparkMemory, srcPath, targetPath, tableName, tableType, rowKeyField,
+    partitionPathField, parallelism, schemaFilePath, retry, propsFilePath);
UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
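
For reference, a hedged example of invoking the updated command from the hudi-cli shell with the new --sparkMaster option (values are illustrative; they mirror the integration test added below):

hdfsparquetimport --srcPath /tmp/source --targetPath /tmp/test_table --tableName test_table
    --tableType COPY_ON_WRITE --rowKeyField _row_key --partitionPathField timestamp
    --parallelism 1 --schemaFilePath /tmp/file.schema --format parquet
    --sparkMemory 2G --retry 1 --sparkMaster local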

File: org/apache/hudi/cli/commands/SparkMain.java

@@ -82,17 +82,17 @@ public class SparkMain {
break;
case IMPORT:
case UPSERT:
-assert (args.length >= 12);
+assert (args.length >= 13);
String propsFilePath = null;
-if (!StringUtils.isNullOrEmpty(args[11])) {
-  propsFilePath = args[11];
+if (!StringUtils.isNullOrEmpty(args[12])) {
+  propsFilePath = args[12];
}
List<String> configs = new ArrayList<>();
-if (args.length > 12) {
-  configs.addAll(Arrays.asList(args).subList(12, args.length));
+if (args.length > 13) {
+  configs.addAll(Arrays.asList(args).subList(13, args.length));
}
-returnCode = dataLoad(jsc, command, args[1], args[2], args[3], args[4], args[5], args[6],
-    Integer.parseInt(args[7]), args[8], args[9], Integer.parseInt(args[10]), propsFilePath, configs);
+returnCode = dataLoad(jsc, command, args[3], args[4], args[5], args[6], args[7], args[8],
+    Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]), propsFilePath, configs);
break;
case COMPACT_RUN:
assert (args.length >= 9);
@@ -163,7 +163,7 @@ public class SparkMain {
private static boolean sparkMasterContained(SparkCommand command) {
List<SparkCommand> masterContained = Arrays.asList(SparkCommand.COMPACT_VALIDATE, SparkCommand.COMPACT_REPAIR,
SparkCommand.COMPACT_UNSCHEDULE_PLAN, SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN,
-    SparkCommand.DEDUPLICATE);
+    SparkCommand.IMPORT, SparkCommand.UPSERT, SparkCommand.DEDUPLICATE);
return masterContained.contains(command);
}
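
With IMPORT and UPSERT added to this list, the launcher now receives the Spark master explicitly instead of inheriting it. A minimal sketch of applying a master string through the standard SparkLauncher API (the helper below is illustrative, not SparkUtil's actual code):

import org.apache.spark.launcher.SparkLauncher;

class LauncherSketch {
  // Apply a master only when the CLI supplied one; otherwise defer to Spark defaults.
  static SparkLauncher withMaster(SparkLauncher launcher, String master) {
    if (master != null && !master.isEmpty()) {
      launcher.setMaster(master); // SparkLauncher#setMaster is the real API
    }
    return launcher;
  }
}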
@@ -177,7 +177,7 @@ public class SparkMain {
}
private static int dataLoad(JavaSparkContext jsc, String command, String srcPath, String targetPath, String tableName,
-    String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile, String sparkMemory,
+    String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile,
int retry, String propsFilePath, List<String> configs) {
Config cfg = new Config();
cfg.command = command;
@@ -191,7 +191,6 @@ public class SparkMain {
cfg.schemaFile = schemaFile;
cfg.propsFilePath = propsFilePath;
cfg.configs = configs;
-jsc.getConf().set("spark.executor.memory", sparkMemory);
return new HDFSParquetImporter(cfg).dataImport(jsc, retry);
}
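
To make the index shift concrete, here is a minimal, runnable sketch of the argument layout that IMPORT/UPSERT now expect; the indices are taken from the hunks above, while the values are illustrative:

public class ImportArgLayoutDemo {
  public static void main(String[] ignored) {
    // sparkMaster and sparkMemory now sit at args[1] and args[2],
    // shifting the data-load arguments right by two positions.
    String[] args = {
        "IMPORT",           // args[0]  command
        "local",            // args[1]  sparkMaster (new)
        "2G",               // args[2]  sparkMemory (moved out of dataLoad)
        "/tmp/source",      // args[3]  srcPath
        "/tmp/test_table",  // args[4]  targetPath
        "test_table",       // args[5]  tableName
        "COPY_ON_WRITE",    // args[6]  tableType
        "_row_key",         // args[7]  rowKeyField
        "timestamp",        // args[8]  partitionPathField
        "1",                // args[9]  parallelism
        "/tmp/file.schema", // args[10] schemaFilePath
        "1",                // args[11] retry
        ""                  // args[12] propsFilePath (empty means none)
    };
    assert args.length >= 13;
    String propsFilePath = args[12].isEmpty() ? null : args[12];
    System.out.println("parallelism=" + Integer.parseInt(args[9])
        + " retry=" + Integer.parseInt(args[11]) + " props=" + propsFilePath);
  }
}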

File: org/apache/hudi/cli/integ/ITTestHDFSParquetImportCommand.java (new file)

@@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.cli.integ;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.cli.AbstractShellIntegrationTest;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.commands.TableCommand;
import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.utilities.HDFSParquetImporter;
import org.apache.hudi.utilities.functional.TestHDFSParquetImporter;
import org.apache.hudi.utilities.functional.TestHDFSParquetImporter.HoodieTripModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.shell.core.CommandResult;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.text.ParseException;
import java.util.List;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test class for {@link org.apache.hudi.cli.commands.HDFSParquetImportCommand}.
*/
public class ITTestHDFSParquetImportCommand extends AbstractShellIntegrationTest {
private Path sourcePath;
private Path targetPath;
private String tableName;
private String schemaFile;
private String tablePath;
private List<GenericRecord> insertData;
private TestHDFSParquetImporter importer;
@BeforeEach
public void init() throws IOException, ParseException {
tableName = "test_table";
tablePath = basePath + File.separator + tableName;
sourcePath = new Path(basePath, "source");
targetPath = new Path(tablePath);
schemaFile = new Path(basePath, "file.schema").toString();
// create schema file
try (FSDataOutputStream schemaFileOS = fs.create(new Path(schemaFile))) {
schemaFileOS.write(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA.getBytes());
}
importer = new TestHDFSParquetImporter();
insertData = importer.createInsertRecords(sourcePath);
}
/**
* Test case for 'hdfsparquetimport' with insert.
*/
@Test
public void testConvertWithInsert() throws IOException {
String command = String.format("hdfsparquetimport --srcPath %s --targetPath %s --tableName %s "
+ "--tableType %s --rowKeyField %s" + " --partitionPathField %s --parallelism %s "
+ "--schemaFilePath %s --format %s --sparkMemory %s --retry %s --sparkMaster %s",
sourcePath.toString(), targetPath.toString(), tableName, HoodieTableType.COPY_ON_WRITE.name(),
"_row_key", "timestamp", "1", schemaFile, "parquet", "2G", "1", "local");
CommandResult cr = getShell().executeCommand(command);
assertAll("Command run success",
() -> assertTrue(cr.isSuccess()),
() -> assertEquals("Table imported to hoodie format", cr.getResult().toString()));
// Check that the hudi table exists
String metaPath = targetPath + File.separator + HoodieTableMetaClient.METAFOLDER_NAME;
assertTrue(Files.exists(Paths.get(metaPath)), "Hoodie table does not exist.");
// Load meta data
new TableCommand().connect(targetPath.toString(), TimelineLayoutVersion.VERSION_1, false, 2000, 300000, 7);
metaClient = HoodieCLI.getTableMetaClient();
assertEquals(1, metaClient.getActiveTimeline().getCommitsTimeline().countInstants(), "Should have only 1 commit.");
verifyResultData(insertData);
}
/**
* Test case for 'hdfsparquetimport' with upsert.
*/
@Test
public void testConvertWithUpsert() throws IOException, ParseException {
Path upsertFolder = new Path(basePath, "testUpsertSrc");
List<GenericRecord> upsertData = importer.createUpsertRecords(upsertFolder);
// first insert records
HDFSParquetImporter.Config cfg = importer.getHDFSParquetImporterConfig(sourcePath.toString(), tablePath,
tableName, HoodieTableType.COPY_ON_WRITE.name(), "_row_key", "timestamp", 1, schemaFile);
HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
dataImporter.dataImport(jsc, 0);
// Load meta data
new TableCommand().connect(targetPath.toString(), TimelineLayoutVersion.VERSION_1, false, 2000, 300000, 7);
metaClient = HoodieCLI.getTableMetaClient();
// check that the insert instant exists
assertEquals(1, metaClient.getActiveTimeline().getCommitsTimeline().countInstants(), "Should have only 1 commit.");
String command = String.format("hdfsparquetimport --srcPath %s --targetPath %s --tableName %s "
+ "--tableType %s --rowKeyField %s" + " --partitionPathField %s --parallelism %s "
+ "--schemaFilePath %s --format %s --sparkMemory %s --retry %s --sparkMaster %s --upsert %s",
upsertFolder.toString(), targetPath.toString(), tableName, HoodieTableType.COPY_ON_WRITE.name(),
"_row_key", "timestamp", "1", schemaFile, "parquet", "2G", "1", "local", "true");
CommandResult cr = getShell().executeCommand(command);
assertAll("Command run success",
() -> assertTrue(cr.isSuccess()),
() -> assertEquals("Table imported to hoodie format", cr.getResult().toString()));
// reload meta client
metaClient = HoodieTableMetaClient.reload(metaClient);
assertEquals(2, metaClient.getActiveTimeline().getCommitsTimeline().countInstants(), "Should have 2 commits.");
// construct expected data: drop the first 11 insert records, then add the upsert data.
List<GenericRecord> expectData = insertData.subList(11, 96);
expectData.addAll(upsertData);
verifyResultData(expectData);
}
/**
* Verify that the actual table data matches the expected records.
*/
private void verifyResultData(List<GenericRecord> expectData) {
Dataset<Row> ds = HoodieClientTestUtils.read(jsc, tablePath, sqlContext, fs, tablePath + "/*/*/*/*");
List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
List<HoodieTripModel> result = readData.stream().map(row ->
new HoodieTripModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
row.getDouble(5), row.getDouble(6), row.getDouble(7)))
.collect(Collectors.toList());
List<HoodieTripModel> expected = expectData.stream().map(g ->
new HoodieTripModel(Double.parseDouble(g.get("timestamp").toString()),
g.get("_row_key").toString(),
g.get("rider").toString(),
g.get("driver").toString(),
Double.parseDouble(g.get("begin_lat").toString()),
Double.parseDouble(g.get("begin_lon").toString()),
Double.parseDouble(g.get("end_lat").toString()),
Double.parseDouble(g.get("end_lon").toString())))
.collect(Collectors.toList());
assertAll("Result list equals",
() -> assertEquals(expected.size(), result.size()),
() -> assertTrue(result.containsAll(expected) && expected.containsAll(result)));
}
}
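
Assuming this test lives in the hudi-cli module (as the org.apache.hudi.cli.integ package suggests), it can presumably be run on its own with Maven's surefire test filter; additional build profiles may be needed depending on the project setup:

mvn test -pl hudi-cli -Dtest=ITTestHDFSParquetImportCommand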