1
0

[HUDI-3795] Fix hudi-examples checkstyle and maven enforcer error (#5221)

Co-authored-by: Raymond Xu <2701446+xushiyan@users.noreply.github.com>
This commit is contained in:
ForwardXu
2022-04-05 16:10:11 +08:00
committed by GitHub
parent 3449e86989
commit 325b3d610a
7 changed files with 111 additions and 97 deletions

View File

@@ -21,37 +21,45 @@ jobs:
- scalaProfile: "scala-2.11"
sparkProfile: "spark2.4"
sparkVersion: "2.4.4"
flinkProfile: "flink1.13"
# Spark 2.4.4, scala 2.12
- scalaProfile: "scala-2.12"
sparkProfile: "spark2.4"
sparkVersion: "2.4.4"
flinkProfile: "flink1.14"
# Spark 3.1.x
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.1"
sparkVersion: "3.1.0"
flinkProfile: "flink1.13"
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.1"
sparkVersion: "3.1.1"
flinkProfile: "flink1.13"
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.1"
sparkVersion: "3.1.2"
flinkProfile: "flink1.14"
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.1"
sparkVersion: "3.1.3"
flinkProfile: "flink1.14"
# Spark 3.2.x
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.2"
sparkVersion: "3.2.0"
flinkProfile: "flink1.13"
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.2"
sparkVersion: "3.2.1"
flinkProfile: "flink1.14"
steps:
- uses: actions/checkout@v2
@@ -66,13 +74,15 @@ jobs:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_VERSION: ${{ matrix.sparkVersion }}
FLINK_PROFILE: ${{ matrix.flinkProfile }}
run:
mvn -T 2.5C clean install -P "$SCALA_PROFILE,$SPARK_PROFILE" -Dspark.version="$SPARK_VERSION" -DskipTests=true -Dmaven.javadoc.skip=true -B -V
mvn -T 2.5C clean install -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -D"$FLINK_PROFILE" -Dspark.version="$SPARK_VERSION" -DskipTests=true -Dmaven.javadoc.skip=true -B -V
- name: Quickstart Test
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_VERSION: ${{ matrix.sparkVersion }}
FLINK_PROFILE: ${{ matrix.flinkProfile }}
if: ${{ !startsWith(env.SPARK_VERSION, '3.2.') }} # skip test spark 3.2 before hadoop upgrade to 3.x
run:
mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dspark.version="$SPARK_VERSION" -DfailIfNoTests=false -pl hudi-examples/hudi-examples-flink,hudi-examples/hudi-examples-java,hudi-examples/hudi-examples-spark
mvn test -P "unit-tests" -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -D"$FLINK_PROFILE" -Dspark.version="$SPARK_VERSION" -DfailIfNoTests=false -pl hudi-examples/hudi-examples-flink,hudi-examples/hudi-examples-java,hudi-examples/hudi-examples-spark

View File

@@ -62,7 +62,7 @@ public class HoodieExampleDataGenerator<T extends HoodieRecordPayload<T>> {
+ "{\"name\":\"fare\",\"type\": \"double\"}]}";
public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
private static final Random rand = new Random(46474747);
private static final Random RAND = new Random(46474747);
private final Map<Integer, KeyPartition> existingKeys;
private final String[] partitionPaths;
@@ -97,11 +97,11 @@ public class HoodieExampleDataGenerator<T extends HoodieRecordPayload<T>> {
rec.put("ts", timestamp);
rec.put("rider", riderName);
rec.put("driver", driverName);
rec.put("begin_lat", rand.nextDouble());
rec.put("begin_lon", rand.nextDouble());
rec.put("end_lat", rand.nextDouble());
rec.put("end_lon", rand.nextDouble());
rec.put("fare", rand.nextDouble() * 100);
rec.put("begin_lat", RAND.nextDouble());
rec.put("begin_lon", RAND.nextDouble());
rec.put("end_lat", RAND.nextDouble());
rec.put("end_lon", RAND.nextDouble());
rec.put("fare", RAND.nextDouble() * 100);
return rec;
}
@@ -119,7 +119,7 @@ public class HoodieExampleDataGenerator<T extends HoodieRecordPayload<T>> {
int currSize = getNumExistingKeys();
return IntStream.range(0, n).boxed().map(i -> {
String partitionPath = partitionPaths[rand.nextInt(partitionPaths.length)];
String partitionPath = partitionPaths[RAND.nextInt(partitionPaths.length)];
HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath);
KeyPartition kp = new KeyPartition();
kp.key = key;
@@ -141,7 +141,7 @@ public class HoodieExampleDataGenerator<T extends HoodieRecordPayload<T>> {
public List<HoodieRecord<T>> generateUpdates(String commitTime, Integer n) {
List<HoodieRecord<T>> updates = new ArrayList<>();
for (int i = 0; i < n; i++) {
KeyPartition kp = existingKeys.get(rand.nextInt(numExistingKeys - 1));
KeyPartition kp = existingKeys.get(RAND.nextInt(numExistingKeys - 1));
HoodieRecord<T> record = generateUpdateRecord(kp.key, commitTime);
updates.add(record);
}

View File

@@ -171,13 +171,13 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
<artifactId>${flink.table.runtime.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<artifactId>${flink.table.planner.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
@@ -320,7 +320,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<artifactId>${flink.runtime.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
@@ -334,7 +334,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
<artifactId>${flink.table.runtime.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>

View File

@@ -18,10 +18,6 @@
package org.apache.hudi.examples.quickstart;
import static org.apache.hudi.examples.quickstart.TestQuickstartData.assertRowsEquals;
import java.io.File;
import java.util.List;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.apache.hudi.common.model.HoodieTableType;
@@ -30,6 +26,11 @@ import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.io.File;
import java.util.List;
import static org.apache.hudi.examples.quickstart.TestQuickstartData.assertRowsEquals;
/**
* IT cases for Hoodie table source and sink.
*/

View File

@@ -18,23 +18,6 @@
package org.apache.hudi.examples.quickstart;
import static junit.framework.TestCase.assertEquals;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.table.data.RowData;
@@ -60,6 +43,24 @@ import org.apache.parquet.Strings;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static junit.framework.TestCase.assertEquals;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
/**
* Data set for testing, also some utilities to check the results.
*/

View File

@@ -18,11 +18,6 @@
package org.apache.hudi.examples.quickstart;
import static org.apache.hudi.config.HoodieWriteConfig.TBL_NAME;
import static org.apache.spark.sql.SaveMode.Append;
import static org.apache.spark.sql.SaveMode.Overwrite;
import java.util.List;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hudi.QuickstartUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
@@ -37,6 +32,12 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.util.List;
import static org.apache.hudi.config.HoodieWriteConfig.TBL_NAME;
import static org.apache.spark.sql.SaveMode.Append;
import static org.apache.spark.sql.SaveMode.Overwrite;
public final class HoodieSparkQuickstart {
private HoodieSparkQuickstart() {
@@ -76,14 +77,14 @@ public final class HoodieSparkQuickstart {
String commitTime = Long.toString(System.currentTimeMillis());
List<String> inserts = dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20));
Dataset<Row> df = spark.read().json(jsc.parallelize(inserts, 1));
df.write().format("org.apache.hudi").
options(QuickstartUtils.getQuickstartWriteConfigs()).
option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts").
option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid").
option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath").
option(TBL_NAME.key(), tableName).
mode(Overwrite).
save(tablePath);
df.write().format("org.apache.hudi")
.options(QuickstartUtils.getQuickstartWriteConfigs())
.option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts")
.option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid")
.option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath")
.option(TBL_NAME.key(), tableName)
.mode(Overwrite)
.save(tablePath);
}
/**
@@ -91,10 +92,10 @@ public final class HoodieSparkQuickstart {
*/
public static void queryData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName,
HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
Dataset<Row> roViewDF = spark.
read().
format("org.apache.hudi").
load(tablePath + "/*/*/*/*");
Dataset<Row> roViewDF = spark
.read()
.format("org.apache.hudi")
.load(tablePath + "/*/*/*/*");
roViewDF.createOrReplaceTempView("hudi_ro_table");
@@ -125,14 +126,14 @@ public final class HoodieSparkQuickstart {
String commitTime = Long.toString(System.currentTimeMillis());
List<String> updates = dataGen.convertToStringList(dataGen.generateUpdates(commitTime, 10));
Dataset<Row> df = spark.read().json(jsc.parallelize(updates, 1));
df.write().format("org.apache.hudi").
options(QuickstartUtils.getQuickstartWriteConfigs()).
option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts").
option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid").
option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath").
option(TBL_NAME.key(), tableName).
mode(Append).
save(tablePath);
df.write().format("org.apache.hudi")
.options(QuickstartUtils.getQuickstartWriteConfigs())
.option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts")
.option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid")
.option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath")
.option(TBL_NAME.key(), tableName)
.mode(Append)
.save(tablePath);
}
/**
@@ -144,15 +145,15 @@ public final class HoodieSparkQuickstart {
roViewDF.createOrReplaceTempView("hudi_ro_table");
Dataset<Row> df = spark.sql("select uuid, partitionpath, ts from hudi_ro_table limit 2");
df.write().format("org.apache.hudi").
options(QuickstartUtils.getQuickstartWriteConfigs()).
option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts").
option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "uuid").
option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath").
option(TBL_NAME.key(), tableName).
option("hoodie.datasource.write.operation", WriteOperationType.DELETE.value()).
mode(Append).
save(tablePath);
df.write().format("org.apache.hudi")
.options(QuickstartUtils.getQuickstartWriteConfigs())
.option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts")
.option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "uuid")
.option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath")
.option(TBL_NAME.key(), tableName)
.option("hoodie.datasource.write.operation", WriteOperationType.DELETE.value())
.mode(Append)
.save(tablePath);
}
/**
@@ -160,17 +161,17 @@ public final class HoodieSparkQuickstart {
*/
public static void deleteByPartition(SparkSession spark, String tablePath, String tableName) {
Dataset<Row> df = spark.emptyDataFrame();
df.write().format("org.apache.hudi").
options(QuickstartUtils.getQuickstartWriteConfigs()).
option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts").
option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid").
option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath").
option(TBL_NAME.key(), tableName).
option("hoodie.datasource.write.operation", WriteOperationType.DELETE.value()).
option("hoodie.datasource.write.partitions.to.delete",
ArrayUtils.toString(HoodieExampleDataGenerator.DEFAULT_PARTITION_PATHS, ",")).
mode(Append).
save(tablePath);
df.write().format("org.apache.hudi")
.options(QuickstartUtils.getQuickstartWriteConfigs())
.option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts")
.option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid")
.option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath")
.option(TBL_NAME.key(), tableName)
.option("hoodie.datasource.write.operation", WriteOperationType.DELETE.value())
.option("hoodie.datasource.write.partitions.to.delete",
String.join(", ", HoodieExampleDataGenerator.DEFAULT_PARTITION_PATHS))
.mode(Append)
.save(tablePath);
}
/**
@@ -188,12 +189,12 @@ public final class HoodieSparkQuickstart {
String beginTime = commits.get(commits.size() - 2); // commit time we are interested in
// incrementally query data
Dataset<Row> incViewDF = spark.
read().
format("org.apache.hudi").
option("hoodie.datasource.query.type", "incremental").
option("hoodie.datasource.read.begin.instanttime", beginTime).
load(tablePath);
Dataset<Row> incViewDF = spark
.read()
.format("org.apache.hudi")
.option("hoodie.datasource.query.type", "incremental")
.option("hoodie.datasource.read.begin.instanttime", beginTime)
.load(tablePath);
incViewDF.createOrReplaceTempView("hudi_incr_table");
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0")
@@ -215,11 +216,11 @@ public final class HoodieSparkQuickstart {
String endTime = commits.get(commits.size() - 2); // commit time we are interested in
//incrementally query data
Dataset<Row> incViewDF = spark.read().format("org.apache.hudi").
option("hoodie.datasource.query.type", "incremental").
option("hoodie.datasource.read.begin.instanttime", beginTime).
option("hoodie.datasource.read.end.instanttime", endTime).
load(tablePath);
Dataset<Row> incViewDF = spark.read().format("org.apache.hudi")
.option("hoodie.datasource.query.type", "incremental")
.option("hoodie.datasource.read.begin.instanttime", beginTime)
.option("hoodie.datasource.read.end.instanttime", endTime)
.load(tablePath);
incViewDF.createOrReplaceTempView("hudi_incr_table");
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0")

View File

@@ -18,8 +18,6 @@
package org.apache.hudi.examples.quickstart;
import java.io.File;
import java.nio.file.Paths;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -35,6 +33,9 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.nio.file.Paths;
public class TestHoodieSparkQuickstart implements SparkProvider {
protected static transient HoodieSparkEngineContext context;
@@ -49,7 +50,7 @@ public class TestHoodieSparkQuickstart implements SparkProvider {
@TempDir
protected java.nio.file.Path tempDir;
private static final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();
private static final HoodieExampleDataGenerator<HoodieAvroPayload> DATA_GEN = new HoodieExampleDataGenerator<>();
@Override
public SparkSession spark() {
@@ -99,10 +100,10 @@ public class TestHoodieSparkQuickstart implements SparkProvider {
String tablePath = tablePath(tableName);
try {
HoodieSparkQuickstart.insertData(spark, jsc, tablePath, tableName, dataGen);
HoodieSparkQuickstart.updateData(spark, jsc, tablePath, tableName, dataGen);
HoodieSparkQuickstart.insertData(spark, jsc, tablePath, tableName, DATA_GEN);
HoodieSparkQuickstart.updateData(spark, jsc, tablePath, tableName, DATA_GEN);
HoodieSparkQuickstart.queryData(spark, jsc, tablePath, tableName, dataGen);
HoodieSparkQuickstart.queryData(spark, jsc, tablePath, tableName, DATA_GEN);
HoodieSparkQuickstart.incrementalQuery(spark, tablePath, tableName);
HoodieSparkQuickstart.pointInTimeQuery(spark, tablePath, tableName);