1
0

[HUDI-995] Use Transformations, Assertions and SchemaTestUtil (#1884)

- Consolidate transform functions for tests in Transformations.java
- Consolidate assertion functions for tests in Assertions.java
- Make use of SchemaTestUtil for loading schema from resource
This commit is contained in:
Raymond Xu
2020-08-01 05:57:18 -07:00
committed by GitHub
parent e79fbc07fe
commit 10e4268792
23 changed files with 302 additions and 277 deletions

View File

@@ -28,8 +28,6 @@ import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.hive.NonPartitionedExtractor;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.testutils.DataSourceTestUtils;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
@@ -45,6 +43,10 @@ import org.apache.spark.sql.SparkSession;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings;
import static org.apache.hudi.common.testutils.Transformations.randomSelectAsHoodieKeys;
/**
* Sample program that writes & reads hoodie tables via the Spark datasource.
@@ -123,7 +125,7 @@ public class HoodieJavaApp {
*/
// Generate some input..
List<HoodieRecord> recordsSoFar = new ArrayList<>(dataGen.generateInserts("001"/* ignore */, 100));
List<String> records1 = DataSourceTestUtils.convertToStringList(recordsSoFar);
List<String> records1 = recordsToStrings(recordsSoFar);
Dataset<Row> inputDF1 = spark.read().json(jssc.parallelize(records1, 2));
// Save as hoodie dataset (copy on write)
@@ -163,7 +165,7 @@ public class HoodieJavaApp {
*/
List<HoodieRecord> recordsToBeUpdated = dataGen.generateUpdates("002"/* ignore */, 100);
recordsSoFar.addAll(recordsToBeUpdated);
List<String> records2 = DataSourceTestUtils.convertToStringList(recordsToBeUpdated);
List<String> records2 = recordsToStrings(recordsToBeUpdated);
Dataset<Row> inputDF2 = spark.read().json(jssc.parallelize(records2, 2));
writer = inputDF2.write().format("org.apache.hudi").option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
@@ -185,9 +187,9 @@ public class HoodieJavaApp {
/**
* Commit that Deletes some records
*/
List<String> deletes = DataSourceTestUtils.convertKeysToStringList(
HoodieClientTestUtils
.getKeysToDelete(HoodieClientTestUtils.getHoodieKeys(recordsSoFar), 20));
List<String> deletes = randomSelectAsHoodieKeys(recordsSoFar, 20).stream()
.map(hr -> "{\"_row_key\":\"" + hr.getRecordKey() + "\",\"partition\":\"" + hr.getPartitionPath() + "\"}")
.collect(Collectors.toList());
Dataset<Row> inputDF3 = spark.read().json(jssc.parallelize(deletes, 2));
writer = inputDF3.write().format("org.apache.hudi").option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")

View File

@@ -16,21 +16,21 @@
* limitations under the License.
*/
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.hive.NonPartitionedExtractor;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.testutils.DataSourceTestUtils;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
@@ -43,6 +43,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings;
public class HoodieJavaGenerateApp {
@Parameter(names = {"--table-path", "-p"}, description = "Path for Hoodie sample table")
private String tablePath = "file:///tmp/hoodie/sample-table";
@@ -152,7 +154,7 @@ public class HoodieJavaGenerateApp {
// Generate some input..
String instantTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> recordsSoFar = new ArrayList<>(dataGen.generateInserts(instantTime/* ignore */, 100));
List<String> records1 = DataSourceTestUtils.convertToStringList(recordsSoFar);
List<String> records1 = recordsToStrings(recordsSoFar);
Dataset<Row> inputDF1 = spark.read().json(jssc.parallelize(records1, 2));
// Save as hoodie dataset (copy on write)

View File

@@ -23,7 +23,6 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.testutils.DataSourceTestUtils;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
@@ -45,6 +44,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings;
/**
* Sample program that writes & reads hoodie tables via the Spark datasource streaming.
*/
@@ -128,10 +129,10 @@ public class HoodieJavaStreamingApp {
// Generator of some records to be loaded in.
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
List<String> records1 = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("001", 100));
List<String> records1 = recordsToStrings(dataGen.generateInserts("001", 100));
Dataset<Row> inputDF1 = spark.read().json(jssc.parallelize(records1, 2));
List<String> records2 = DataSourceTestUtils.convertToStringList(dataGen.generateUpdates("002", 100));
List<String> records2 = recordsToStrings(dataGen.generateUpdates("002", 100));
Dataset<Row> inputDF2 = spark.read().json(jssc.parallelize(records2, 2));

View File

@@ -20,10 +20,11 @@ package org.apache.hudi;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.testutils.DataSourceTestUtils;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
@@ -127,23 +128,36 @@ public class TestDataSourceUtils {
@Test
public void testDoWriteOperationWithUserDefinedBulkInsertPartitioner() throws HoodieException {
setAndVerifyHoodieWriteClientWith(DataSourceTestUtils.NoOpBulkInsertPartitioner.class.getName());
setAndVerifyHoodieWriteClientWith(NoOpBulkInsertPartitioner.class.getName());
DataSourceUtils.doWriteOperation(hoodieWriteClient, hoodieRecords, "test-time",
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL());
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL());
verify(hoodieWriteClient, times(1)).bulkInsert(any(hoodieRecords.getClass()), anyString(),
optionCaptor.capture());
assertThat(optionCaptor.getValue().get(), is(instanceOf(DataSourceTestUtils.NoOpBulkInsertPartitioner.class)));
optionCaptor.capture());
assertThat(optionCaptor.getValue().get(), is(instanceOf(NoOpBulkInsertPartitioner.class)));
}
private void setAndVerifyHoodieWriteClientWith(final String partitionerClassName) {
config = HoodieWriteConfig.newBuilder().withPath(config.getBasePath())
.withUserDefinedBulkInsertPartitionerClass(partitionerClassName)
.build();
.withUserDefinedBulkInsertPartitionerClass(partitionerClassName)
.build();
when(hoodieWriteClient.getConfig()).thenReturn(config);
assertThat(config.getUserDefinedBulkInsertPartitionerClass(), is(equalTo(partitionerClassName)));
}
public static class NoOpBulkInsertPartitioner<T extends HoodieRecordPayload>
implements BulkInsertPartitioner<T> {
@Override
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
return records;
}
@Override
public boolean arePartitionRecordsSorted() {
return false;
}
}
}

View File

@@ -1,76 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.testutils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.spark.api.java.JavaRDD;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
/**
* Test utils for data source tests.
*/
public class DataSourceTestUtils {
public static Option<String> convertToString(HoodieRecord record) {
try {
String str = ((RawTripTestPayload) record.getData()).getJsonData();
str = "{" + str.substring(str.indexOf("\"timestamp\":"));
// Remove the last } bracket
str = str.substring(0, str.length() - 1);
return Option.of(str + ", \"partition\": \"" + record.getPartitionPath() + "\"}");
} catch (IOException e) {
return Option.empty();
}
}
public static List<String> convertToStringList(List<HoodieRecord> records) {
return records.stream().map(DataSourceTestUtils::convertToString).filter(Option::isPresent).map(Option::get)
.collect(Collectors.toList());
}
public static List<String> convertKeysToStringList(List<HoodieKey> keys) {
return keys.stream()
.map(hr -> "{\"_row_key\":\"" + hr.getRecordKey() + "\",\"partition\":\"" + hr.getPartitionPath() + "\"}")
.collect(Collectors.toList());
}
public static class NoOpBulkInsertPartitioner<T extends HoodieRecordPayload>
implements BulkInsertPartitioner<T> {
@Override
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
return records;
}
@Override
public boolean arePartitionRecordsSorted() {
return false;
}
}
}