[HUDI-789] Adjust logic of upsert in HDFSParquetImporter (#1511)
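Summary: HDFSParquetImporter previously required the target path to be absent, so every import started from an empty table. With this change, when cfg.command is "upsert" an existing target table is left in place, and the table is only initialized when it does not yet exist, so a Parquet import can be applied on top of earlier commits. The sketch below is only a condensation of the new testImportWithUpsert test added in this diff (all identifiers are that test's own helpers and fixture values, not public API), showing the call pattern the change enables:

    // Condensed from testImportWithUpsert below; helpers and paths are test fixtures.
    try (JavaSparkContext jsc = getJavaSparkContext()) {
      insert(jsc);                                        // first import bootstraps the table
      Path upsertFolder = new Path(basePath, "testUpsertSrc");
      createUpsertRecords(upsertFolder);                  // 11 updated keys + 4 new keys
      HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(upsertFolder.toString(),
          hoodieFolder.toString(), "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1,
          new Path(basePath, "file.schema").toString());
      cfg.command = "upsert";                             // keeps the existing target instead of rejecting it
      new HDFSParquetImporter(cfg).dataImport(jsc, 0);    // second commit upserts into the same table
    }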
HDFSParquetImporter.java
@@ -100,6 +100,10 @@ public class HDFSParquetImporter implements Serializable {
 
   }
 
+  private boolean isUpsert() {
+    return "upsert".equals(cfg.command.toLowerCase());
+  }
+
   public int dataImport(JavaSparkContext jsc, int retry) {
     this.fs = FSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration());
     this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
@@ -108,7 +112,7 @@ public class HDFSParquetImporter implements Serializable {
     int ret = -1;
     try {
       // Verify that targetPath is not present.
-      if (fs.exists(new Path(cfg.targetPath))) {
+      if (fs.exists(new Path(cfg.targetPath)) && !isUpsert()) {
         throw new HoodieIOException(String.format("Make sure %s is not present.", cfg.targetPath));
       }
       do {
@@ -122,20 +126,22 @@ public class HDFSParquetImporter implements Serializable {
 
   protected int dataImport(JavaSparkContext jsc) throws IOException {
     try {
-      if (fs.exists(new Path(cfg.targetPath))) {
+      if (fs.exists(new Path(cfg.targetPath)) && !isUpsert()) {
         // cleanup target directory.
         fs.delete(new Path(cfg.targetPath), true);
       }
 
+      if (!fs.exists(new Path(cfg.targetPath))) {
+        // Initialize target hoodie table.
+        Properties properties = new Properties();
+        properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, cfg.tableName);
+        properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, cfg.tableType);
+        HoodieTableMetaClient.initTableAndGetMetaClient(jsc.hadoopConfiguration(), cfg.targetPath, properties);
+      }
+
       // Get schema.
       String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
 
-      // Initialize target hoodie table.
-      Properties properties = new Properties();
-      properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, cfg.tableName);
-      properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, cfg.tableType);
-      HoodieTableMetaClient.initTableAndGetMetaClient(jsc.hadoopConfiguration(), cfg.targetPath, properties);
-
       HoodieWriteClient client =
           UtilHelpers.createHoodieClient(jsc, cfg.targetPath, schemaStr, cfg.parallelism, Option.empty(), props);
 
TestHDFSParquetImporter.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities;
 
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.minicluster.HdfsTestService;
 import org.apache.hudi.common.model.HoodieTestUtils;
@@ -37,8 +38,13 @@ import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
+
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -50,8 +56,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -75,34 +83,43 @@ public class TestHDFSParquetImporter implements Serializable {
   }
 
   @AfterClass
-  public static void cleanupClass() throws Exception {
+  public static void cleanupClass() {
     if (hdfsTestService != null) {
       hdfsTestService.stop();
     }
   }
 
+  private String basePath;
+  private transient Path hoodieFolder;
+  private transient Path srcFolder;
+  private transient List<GenericRecord> insertData;
+
+  @Before
+  public void init() throws IOException, ParseException {
+    basePath = (new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
+
+    // Hoodie root folder.
+    hoodieFolder = new Path(basePath, "testTarget");
+
+    // Create generic records.
+    srcFolder = new Path(basePath, "testSrc");
+    insertData = createInsertRecords(srcFolder);
+  }
+
+  @After
+  public void clean() throws IOException {
+    dfs.delete(new Path(basePath), true);
+  }
+
   /**
    * Test successful data import with retries.
    */
   @Test
   public void testImportWithRetries() throws Exception {
-    JavaSparkContext jsc = null;
-    try {
-      jsc = getJavaSparkContext();
-
-      // Test root folder.
-      String basePath = (new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
-
-      // Hoodie root folder
-      Path hoodieFolder = new Path(basePath, "testTarget");
-
+    try (JavaSparkContext jsc = getJavaSparkContext()) {
       // Create schema file.
       String schemaFile = new Path(basePath, "file.schema").toString();
 
-      // Create generic records.
-      Path srcFolder = new Path(basePath, "testSrc");
-      createRecords(srcFolder);
-
       HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
           "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
       AtomicInteger retry = new AtomicInteger(3);
@@ -150,14 +167,104 @@ public class TestHDFSParquetImporter implements Serializable {
       for (Entry<String, Long> e : recordCounts.entrySet()) {
         assertEquals("missing records", 24, e.getValue().longValue());
       }
-    } finally {
-      if (jsc != null) {
-        jsc.stop();
-      }
     }
   }
 
-  private void createRecords(Path srcFolder) throws ParseException, IOException {
+  private void insert(JavaSparkContext jsc) throws IOException {
+    // Create schema file.
+    String schemaFile = new Path(basePath, "file.schema").toString();
+    createSchemaFile(schemaFile);
+
+    HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+        "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+    HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+    dataImporter.dataImport(jsc, 0);
+  }
+
+  /**
+   * Test successful insert and verify data consistency.
+   */
+  @Test
+  public void testImportWithInsert() throws IOException, ParseException {
+    try (JavaSparkContext jsc = getJavaSparkContext()) {
+      insert(jsc);
+      SQLContext sqlContext = new SQLContext(jsc);
+      Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+      List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+      List<HoodieTripModel> result = readData.stream().map(row ->
+          new HoodieTripModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+              row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+          .collect(Collectors.toList());
+
+      List<HoodieTripModel> expected = insertData.stream().map(g ->
+          new HoodieTripModel(Double.parseDouble(g.get("timestamp").toString()),
+              g.get("_row_key").toString(),
+              g.get("rider").toString(),
+              g.get("driver").toString(),
+              Double.parseDouble(g.get("begin_lat").toString()),
+              Double.parseDouble(g.get("begin_lon").toString()),
+              Double.parseDouble(g.get("end_lat").toString()),
+              Double.parseDouble(g.get("end_lon").toString())))
+          .collect(Collectors.toList());
+
+      assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+    }
+  }
+
+  /**
+   * Test upsert data and verify data consistency.
+   */
+  @Test
+  public void testImportWithUpsert() throws IOException, ParseException {
+    try (JavaSparkContext jsc = getJavaSparkContext()) {
+      insert(jsc);
+
+      // Create schema file.
+      String schemaFile = new Path(basePath, "file.schema").toString();
+
+      Path upsertFolder = new Path(basePath, "testUpsertSrc");
+      List<GenericRecord> upsertData = createUpsertRecords(upsertFolder);
+
+      HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(),
+          "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+      cfg.command = "upsert";
+      HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+      dataImporter.dataImport(jsc, 0);
+
+      // construct result, remove top 10 and add upsert data.
+      List<GenericRecord> expectData = insertData.subList(11, 96);
+      expectData.addAll(upsertData);
+
+      // read latest data
+      SQLContext sqlContext = new SQLContext(jsc);
+      Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+      List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+      List<HoodieTripModel> result = readData.stream().map(row ->
+          new HoodieTripModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+              row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+          .collect(Collectors.toList());
+
+      // get expected result.
+      List<HoodieTripModel> expected = expectData.stream().map(g ->
+          new HoodieTripModel(Double.parseDouble(g.get("timestamp").toString()),
+              g.get("_row_key").toString(),
+              g.get("rider").toString(),
+              g.get("driver").toString(),
+              Double.parseDouble(g.get("begin_lat").toString()),
+              Double.parseDouble(g.get("begin_lon").toString()),
+              Double.parseDouble(g.get("end_lat").toString()),
+              Double.parseDouble(g.get("end_lon").toString())))
+          .collect(Collectors.toList());
+
+      assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+    }
+  }
+
+  private List<GenericRecord> createInsertRecords(Path srcFolder) throws ParseException, IOException {
     Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
     long startTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
     List<GenericRecord> records = new ArrayList<GenericRecord>();
@@ -165,12 +272,36 @@ public class TestHDFSParquetImporter implements Serializable {
       records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "rider-" + recordNum,
           "driver-" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
     }
-    ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)
-        .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build();
+    try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)
+        .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {
       for (GenericRecord record : records) {
         writer.write(record);
+      }
     }
-    writer.close();
+    return records;
+  }
+
+  private List<GenericRecord> createUpsertRecords(Path srcFolder) throws ParseException, IOException {
+    Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
+    long startTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
+    List<GenericRecord> records = new ArrayList<GenericRecord>();
+    // 10 for update
+    for (long recordNum = 0; recordNum < 11; recordNum++) {
+      records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "rider-upsert-" + recordNum,
+          "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
+    }
+    // 4 for insert
+    for (long recordNum = 96; recordNum < 100; recordNum++) {
+      records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "rider-upsert-" + recordNum,
+          "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
+    }
+    try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)
+        .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {
+      for (GenericRecord record : records) {
+        writer.write(record);
+      }
+    }
+    return records;
   }
 
   private void createSchemaFile(String schemaFile) throws IOException {
@@ -184,12 +315,7 @@ public class TestHDFSParquetImporter implements Serializable {
    */
   @Test
   public void testSchemaFile() throws Exception {
-    JavaSparkContext jsc = null;
-    try {
-      jsc = getJavaSparkContext();
-
-      // Test root folder.
-      String basePath = (new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
+    try (JavaSparkContext jsc = getJavaSparkContext()) {
       // Hoodie root folder
       Path hoodieFolder = new Path(basePath, "testTarget");
       Path srcFolder = new Path(basePath.toString(), "srcTest");
@@ -204,10 +330,6 @@ public class TestHDFSParquetImporter implements Serializable {
       // Should fail - return : -1.
       assertEquals(-1, dataImporter.dataImport(jsc, 0));
-
-    } finally {
-      if (jsc != null) {
-        jsc.stop();
-      }
     }
   }
 
@@ -216,19 +338,7 @@ public class TestHDFSParquetImporter implements Serializable {
    */
   @Test
   public void testRowAndPartitionKey() throws Exception {
-    JavaSparkContext jsc = null;
-    try {
-      jsc = getJavaSparkContext();
-
-      // Test root folder.
-      String basePath = (new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
-      // Hoodie root folder
-      Path hoodieFolder = new Path(basePath, "testTarget");
-
-      // Create generic records.
-      Path srcFolder = new Path(basePath, "testSrc");
-      createRecords(srcFolder);
-
+    try (JavaSparkContext jsc = getJavaSparkContext()) {
       // Create schema file.
       Path schemaFile = new Path(basePath.toString(), "missingFile.schema");
       createSchemaFile(schemaFile.toString());
@@ -248,10 +358,6 @@ public class TestHDFSParquetImporter implements Serializable {
       dataImporter = new HDFSParquetImporter(cfg);
       assertEquals(-1, dataImporter.dataImport(jsc, 0));
-
-    } finally {
-      if (jsc != null) {
-        jsc.stop();
-      }
     }
   }
 
@@ -275,4 +381,49 @@ public class TestHDFSParquetImporter implements Serializable {
     sparkConf = HoodieWriteClient.registerClasses(sparkConf);
     return new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf));
   }
+
+  /**
+   * Class used for compare result and expected.
+   */
+  public static class HoodieTripModel {
+    double timestamp;
+    String rowKey;
+    String rider;
+    String driver;
+    double beginLat;
+    double beginLon;
+    double endLat;
+    double endLon;
+
+    private HoodieTripModel(double timestamp, String rowKey, String rider, String driver, double beginLat,
+        double beginLon, double endLat, double endLon) {
+      this.timestamp = timestamp;
+      this.rowKey = rowKey;
+      this.rider = rider;
+      this.driver = driver;
+      this.beginLat = beginLat;
+      this.beginLon = beginLon;
+      this.endLat = endLat;
+      this.endLon = endLon;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      HoodieTripModel other = (HoodieTripModel) o;
+      return timestamp == other.timestamp && rowKey.equals(other.rowKey) && rider.equals(other.rider)
+          && driver.equals(other.driver) && beginLat == other.beginLat && beginLon == other.beginLon
+          && endLat == other.endLat && endLon == other.endLon;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(timestamp, rowKey, rider, driver, beginLat, beginLon, endLat, endLon);
+    }
+  }
 }