diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index c8e226f47..74f3a6f2d 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -21,37 +21,45 @@ jobs:
           - scalaProfile: "scala-2.11"
             sparkProfile: "spark2.4"
             sparkVersion: "2.4.4"
+            flinkProfile: "flink1.13"

           # Spark 2.4.4, scala 2.12
           - scalaProfile: "scala-2.12"
             sparkProfile: "spark2.4"
             sparkVersion: "2.4.4"
+            flinkProfile: "flink1.14"

           # Spark 3.1.x
           - scalaProfile: "scala-2.12"
             sparkProfile: "spark3.1"
             sparkVersion: "3.1.0"
+            flinkProfile: "flink1.13"

           - scalaProfile: "scala-2.12"
             sparkProfile: "spark3.1"
             sparkVersion: "3.1.1"
+            flinkProfile: "flink1.13"

           - scalaProfile: "scala-2.12"
             sparkProfile: "spark3.1"
             sparkVersion: "3.1.2"
+            flinkProfile: "flink1.14"

           - scalaProfile: "scala-2.12"
             sparkProfile: "spark3.1"
             sparkVersion: "3.1.3"
+            flinkProfile: "flink1.14"

           # Spark 3.2.x
           - scalaProfile: "scala-2.12"
             sparkProfile: "spark3.2"
             sparkVersion: "3.2.0"
+            flinkProfile: "flink1.13"

           - scalaProfile: "scala-2.12"
             sparkProfile: "spark3.2"
             sparkVersion: "3.2.1"
+            flinkProfile: "flink1.14"

     steps:
       - uses: actions/checkout@v2
@@ -66,13 +74,15 @@ jobs:
           SCALA_PROFILE: ${{ matrix.scalaProfile }}
           SPARK_PROFILE: ${{ matrix.sparkProfile }}
           SPARK_VERSION: ${{ matrix.sparkVersion }}
+          FLINK_PROFILE: ${{ matrix.flinkProfile }}
        run:
-          mvn -T 2.5C clean install -P "$SCALA_PROFILE,$SPARK_PROFILE" -Dspark.version="$SPARK_VERSION" -DskipTests=true -Dmaven.javadoc.skip=true -B -V
+          mvn -T 2.5C clean install -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -D"$FLINK_PROFILE" -Dspark.version="$SPARK_VERSION" -DskipTests=true -Dmaven.javadoc.skip=true -B -V
      - name: Quickstart Test
        env:
          SCALA_PROFILE: ${{ matrix.scalaProfile }}
          SPARK_PROFILE: ${{ matrix.sparkProfile }}
          SPARK_VERSION: ${{ matrix.sparkVersion }}
+          FLINK_PROFILE: ${{ matrix.flinkProfile }}
        if: ${{ !startsWith(env.SPARK_VERSION, '3.2.') }} # skip test spark 3.2 before hadoop upgrade to 3.x
        run:
-          mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dspark.version="$SPARK_VERSION" -DfailIfNoTests=false -pl hudi-examples/hudi-examples-flink,hudi-examples/hudi-examples-java,hudi-examples/hudi-examples-spark
+          mvn test -P "unit-tests" -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -D"$FLINK_PROFILE" -Dspark.version="$SPARK_VERSION" -DfailIfNoTests=false -pl hudi-examples/hudi-examples-flink,hudi-examples/hudi-examples-java,hudi-examples/hudi-examples-spark
diff --git a/hudi-examples/hudi-examples-common/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java b/hudi-examples/hudi-examples-common/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
index 2c94c8bc4..4ce11acfa 100644
--- a/hudi-examples/hudi-examples-common/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
+++ b/hudi-examples/hudi-examples-common/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
@@ -62,7 +62,7 @@ public class HoodieExampleDataGenerator<T extends HoodieRecordPayload<T>> {
       + "{\"name\":\"fare\",\"type\": \"double\"}]}";
   public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);

-  private static final Random rand = new Random(46474747);
+  private static final Random RAND = new Random(46474747);

   private final Map<Integer, KeyPartition> existingKeys;
   private final String[] partitionPaths;
@@ -97,11 +97,11 @@ public class HoodieExampleDataGenerator<T extends HoodieRecordPayload<T>> {
     rec.put("ts", timestamp);
     rec.put("rider", riderName);
     rec.put("driver", driverName);
-    rec.put("begin_lat", rand.nextDouble());
-    rec.put("begin_lon", rand.nextDouble());
-    rec.put("end_lat", rand.nextDouble());
-    rec.put("end_lon", rand.nextDouble());
-    rec.put("fare", rand.nextDouble() * 100);
+    rec.put("begin_lat", RAND.nextDouble());
+    rec.put("begin_lon", RAND.nextDouble());
+    rec.put("end_lat", RAND.nextDouble());
+    rec.put("end_lon", RAND.nextDouble());
+    rec.put("fare", RAND.nextDouble() * 100);
     return rec;
   }

@@ -119,7 +119,7 @@ public class HoodieExampleDataGenerator<T extends HoodieRecordPayload<T>> {
     int currSize = getNumExistingKeys();
     return IntStream.range(0, n).boxed().map(i -> {
-      String partitionPath = partitionPaths[rand.nextInt(partitionPaths.length)];
+      String partitionPath = partitionPaths[RAND.nextInt(partitionPaths.length)];
       HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath);
       KeyPartition kp = new KeyPartition();
       kp.key = key;
@@ -141,7 +141,7 @@ public class HoodieExampleDataGenerator<T extends HoodieRecordPayload<T>> {
   public List<HoodieRecord<T>> generateUpdates(String commitTime, Integer n) {
     List<HoodieRecord<T>> updates = new ArrayList<>();
     for (int i = 0; i < n; i++) {
-      KeyPartition kp = existingKeys.get(rand.nextInt(numExistingKeys - 1));
+      KeyPartition kp = existingKeys.get(RAND.nextInt(numExistingKeys - 1));
       HoodieRecord<T> record = generateUpdateRecord(kp.key, commitTime);
       updates.add(record);
     }
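Note on the generator change above: renaming `rand` to `RAND` is a pure checkstyle fix (constants in upper snake case); behavior is unchanged, and the hard-coded seed `46474747` keeps the pseudo-random trip data reproducible from run to run, assuming the same call sequence. A minimal sketch of how the generator is driven, mirroring the quickstart usage further down in this diff (`HoodieAvroPayload` is the payload type used there):

```java
import java.util.List;

import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.examples.common.HoodieExampleDataGenerator;

public class DataGenSketch {
  public static void main(String[] args) {
    // The fixed seed inside the generator yields identical trips on every run.
    HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();
    String commitTime = Long.toString(System.currentTimeMillis());
    // 20 insert records, spread over the generator's default partition paths.
    List<String> inserts = dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20));
    inserts.forEach(System.out::println); // each element is a JSON-encoded trip record
  }
}
```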
diff --git a/hudi-examples/hudi-examples-flink/pom.xml b/hudi-examples/hudi-examples-flink/pom.xml
index 7aa6c05d5..d48841b20 100644
--- a/hudi-examples/hudi-examples-flink/pom.xml
+++ b/hudi-examples/hudi-examples-flink/pom.xml
@@ -171,13 +171,13 @@
    <dependency>
      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
+      <artifactId>${flink.table.runtime.artifactId}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+      <artifactId>${flink.table.planner.artifactId}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
@@ -320,7 +320,7 @@
    <dependency>
      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-runtime</artifactId>
+      <artifactId>${flink.runtime.artifactId}</artifactId>
      <version>${flink.version}</version>
      <scope>test</scope>
      <type>test-jar</type>
    </dependency>
@@ -334,7 +334,7 @@
    <dependency>
      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
+      <artifactId>${flink.table.runtime.artifactId}</artifactId>
      <version>${flink.version}</version>
      <scope>test</scope>
      <type>test-jar</type>
    </dependency>
diff --git a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieFlinkQuickstart.java b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieFlinkQuickstart.java
index 3ac5f43c4..4a2768119 100644
--- a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieFlinkQuickstart.java
+++ b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieFlinkQuickstart.java
@@ -18,10 +18,6 @@

 package org.apache.hudi.examples.quickstart;

-import static org.apache.hudi.examples.quickstart.TestQuickstartData.assertRowsEquals;
-import java.io.File;
-import java.util.List;
-import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -30,6 +26,11 @@ import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;

+import java.io.File;
+import java.util.List;
+
+import static org.apache.hudi.examples.quickstart.TestQuickstartData.assertRowsEquals;
+
 /**
  * IT cases for Hoodie table source and sink.
  */
diff --git a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
index b0d7e7981..67691a3ec 100644
--- a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
+++ b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
@@ -18,23 +18,6 @@

 package org.apache.hudi.examples.quickstart;

-import static junit.framework.TestCase.assertEquals;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.flink.table.data.RowData;
@@ -60,6 +43,24 @@ import org.apache.parquet.Strings;
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.hadoop.ParquetReader;

+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
 /**
  * Data set for testing, also some utilities to check the results.
  */
diff --git a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java
index 5b14a5f44..9f8e29d68 100644
--- a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java
+++ b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java
@@ -18,11 +18,6 @@

 package org.apache.hudi.examples.quickstart;

-import static org.apache.hudi.config.HoodieWriteConfig.TBL_NAME;
-import static org.apache.spark.sql.SaveMode.Append;
-import static org.apache.spark.sql.SaveMode.Overwrite;
-import java.util.List;
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.hudi.QuickstartUtils;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -37,6 +32,12 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;

+import java.util.List;
+
+import static org.apache.hudi.config.HoodieWriteConfig.TBL_NAME;
+import static org.apache.spark.sql.SaveMode.Append;
+import static org.apache.spark.sql.SaveMode.Overwrite;
+
 public final class HoodieSparkQuickstart {

   private HoodieSparkQuickstart() {
@@ -76,14 +77,14 @@ public final class HoodieSparkQuickstart {
     String commitTime = Long.toString(System.currentTimeMillis());
     List<String> inserts = dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20));
     Dataset<Row> df = spark.read().json(jsc.parallelize(inserts, 1));
-    df.write().format("org.apache.hudi").
-        options(QuickstartUtils.getQuickstartWriteConfigs()).
-        option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts").
-        option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid").
-        option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath").
-        option(TBL_NAME.key(), tableName).
-        mode(Overwrite).
-        save(tablePath);
+    df.write().format("org.apache.hudi")
+        .options(QuickstartUtils.getQuickstartWriteConfigs())
+        .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts")
+        .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid")
+        .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath")
+        .option(TBL_NAME.key(), tableName)
+        .mode(Overwrite)
+        .save(tablePath);
   }

   /**
@@ -91,10 +92,10 @@ public final class HoodieSparkQuickstart {
    */
   public static void queryData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName,
                                HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
-    Dataset<Row> roViewDF = spark.
-        read().
-        format("org.apache.hudi").
-        load(tablePath + "/*/*/*/*");
+    Dataset<Row> roViewDF = spark
+        .read()
+        .format("org.apache.hudi")
+        .load(tablePath + "/*/*/*/*");

     roViewDF.createOrReplaceTempView("hudi_ro_table");

@@ -125,14 +126,14 @@ public final class HoodieSparkQuickstart {
     String commitTime = Long.toString(System.currentTimeMillis());
     List<String> updates = dataGen.convertToStringList(dataGen.generateUpdates(commitTime, 10));
     Dataset<Row> df = spark.read().json(jsc.parallelize(updates, 1));
-    df.write().format("org.apache.hudi").
-        options(QuickstartUtils.getQuickstartWriteConfigs()).
-        option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts").
-        option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid").
-        option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath").
-        option(TBL_NAME.key(), tableName).
-        mode(Append).
-        save(tablePath);
+    df.write().format("org.apache.hudi")
+        .options(QuickstartUtils.getQuickstartWriteConfigs())
+        .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts")
+        .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid")
+        .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath")
+        .option(TBL_NAME.key(), tableName)
+        .mode(Append)
+        .save(tablePath);
   }

   /**
@@ -144,15 +145,15 @@ public final class HoodieSparkQuickstart {
     roViewDF.createOrReplaceTempView("hudi_ro_table");

     Dataset<Row> df = spark.sql("select uuid, partitionpath, ts from hudi_ro_table limit 2");
-    df.write().format("org.apache.hudi").
-        options(QuickstartUtils.getQuickstartWriteConfigs()).
-        option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts").
-        option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "uuid").
-        option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath").
-        option(TBL_NAME.key(), tableName).
-        option("hoodie.datasource.write.operation", WriteOperationType.DELETE.value()).
-        mode(Append).
-        save(tablePath);
+    df.write().format("org.apache.hudi")
+        .options(QuickstartUtils.getQuickstartWriteConfigs())
+        .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts")
+        .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "uuid")
+        .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath")
+        .option(TBL_NAME.key(), tableName)
+        .option("hoodie.datasource.write.operation", WriteOperationType.DELETE.value())
+        .mode(Append)
+        .save(tablePath);
   }

   /**
@@ -160,17 +161,17 @@ public final class HoodieSparkQuickstart {
    */
   public static void deleteByPartition(SparkSession spark, String tablePath, String tableName) {
     Dataset<Row> df = spark.emptyDataFrame();
-    df.write().format("org.apache.hudi").
-        options(QuickstartUtils.getQuickstartWriteConfigs()).
-        option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts").
-        option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid").
-        option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath").
-        option(TBL_NAME.key(), tableName).
-        option("hoodie.datasource.write.operation", WriteOperationType.DELETE.value()).
-        option("hoodie.datasource.write.partitions.to.delete",
-            ArrayUtils.toString(HoodieExampleDataGenerator.DEFAULT_PARTITION_PATHS, ",")).
-        mode(Append).
-        save(tablePath);
+    df.write().format("org.apache.hudi")
+        .options(QuickstartUtils.getQuickstartWriteConfigs())
+        .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts")
+        .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid")
+        .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath")
+        .option(TBL_NAME.key(), tableName)
+        .option("hoodie.datasource.write.operation", WriteOperationType.DELETE.value())
+        .option("hoodie.datasource.write.partitions.to.delete",
+            String.join(", ", HoodieExampleDataGenerator.DEFAULT_PARTITION_PATHS))
+        .mode(Append)
+        .save(tablePath);
   }

   /**
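Two things worth flagging in the `delete`/`deleteByPartition` hunks above. First, `delete()` sets `PARTITIONPATH_FIELD_NAME` twice ("uuid", then "partitionpath") on both sides of the diff; the first call looks like it was meant to be `RECORDKEY_FIELD_NAME`, and as written the second `.option()` with the same key simply wins. Second, the switch from `ArrayUtils.toString` to `String.join` is a behavioral fix, not just import hygiene: in commons-lang 2.x, the second argument of `ArrayUtils.toString(Object, String)` is the fallback returned when the array is null, not a separator, and the rendered value is wrapped in braces. A small comparison sketch (the partition values are hypothetical stand-ins for `HoodieExampleDataGenerator.DEFAULT_PARTITION_PATHS`):

```java
import org.apache.commons.lang.ArrayUtils;

public class JoinSketch {
  public static void main(String[] args) {
    String[] partitions = {"2020/01/01", "2020/01/02", "2020/01/03"}; // hypothetical values
    // The "," here is the string returned if the array were null -- NOT a separator:
    System.out.println(ArrayUtils.toString(partitions, ","));
    // prints something like: {2020/01/01,2020/01/02,2020/01/03}  (braces included)
    System.out.println(String.join(", ", partitions));
    // prints: 2020/01/01, 2020/01/02, 2020/01/03
  }
}
```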
@@ -188,12 +189,12 @@ public final class HoodieSparkQuickstart {
     String beginTime = commits.get(commits.size() - 2); // commit time we are interested in

     // incrementally query data
-    Dataset<Row> incViewDF = spark.
-        read().
-        format("org.apache.hudi").
-        option("hoodie.datasource.query.type", "incremental").
-        option("hoodie.datasource.read.begin.instanttime", beginTime).
-        load(tablePath);
+    Dataset<Row> incViewDF = spark
+        .read()
+        .format("org.apache.hudi")
+        .option("hoodie.datasource.query.type", "incremental")
+        .option("hoodie.datasource.read.begin.instanttime", beginTime)
+        .load(tablePath);

     incViewDF.createOrReplaceTempView("hudi_incr_table");
     spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0")
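The hunk above references a `commits` list whose construction sits outside the shown context lines. For orientation, a sketch of how the quickstart typically assembles it (not part of this patch): collect the distinct `_hoodie_commit_time` values from the registered view, oldest first, then use the second-to-last as the incremental lower bound.

```java
import java.util.List;

import org.apache.spark.sql.SparkSession;

class IncrementalReadSketch {
  // Assumes a SparkSession with the "hudi_ro_table" temp view registered,
  // as queryData() does earlier in this diff.
  static String beginTime(SparkSession spark) {
    List<String> commits = spark
        .sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime")
        .toJavaRDD()
        .map(row -> row.getString(0))
        .take(50);
    return commits.get(commits.size() - 2); // the "commit time we are interested in"
  }
}
```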
@@ -215,11 +216,11 @@ public final class HoodieSparkQuickstart {
     String endTime = commits.get(commits.size() - 2); // commit time we are interested in

     //incrementally query data
-    Dataset<Row> incViewDF = spark.read().format("org.apache.hudi").
-        option("hoodie.datasource.query.type", "incremental").
-        option("hoodie.datasource.read.begin.instanttime", beginTime).
-        option("hoodie.datasource.read.end.instanttime", endTime).
-        load(tablePath);
+    Dataset<Row> incViewDF = spark.read().format("org.apache.hudi")
+        .option("hoodie.datasource.query.type", "incremental")
+        .option("hoodie.datasource.read.begin.instanttime", beginTime)
+        .option("hoodie.datasource.read.end.instanttime", endTime)
+        .load(tablePath);

     incViewDF.createOrReplaceTempView("hudi_incr_table");
     spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0")
diff --git a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
index 426f4317d..212dcc440 100644
--- a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
+++ b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
@@ -18,8 +18,6 @@

 package org.apache.hudi.examples.quickstart;

-import java.io.File;
-import java.nio.file.Paths;
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -35,6 +33,9 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;

+import java.io.File;
+import java.nio.file.Paths;
+
 public class TestHoodieSparkQuickstart implements SparkProvider {
   protected static transient HoodieSparkEngineContext context;

@@ -49,7 +50,7 @@ public class TestHoodieSparkQuickstart implements SparkProvider {
   @TempDir
   protected java.nio.file.Path tempDir;

-  private static final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();
+  private static final HoodieExampleDataGenerator<HoodieAvroPayload> DATA_GEN = new HoodieExampleDataGenerator<>();

   @Override
   public SparkSession spark() {
@@ -99,10 +100,10 @@ public class TestHoodieSparkQuickstart implements SparkProvider {
     String tablePath = tablePath(tableName);
     try {
-      HoodieSparkQuickstart.insertData(spark, jsc, tablePath, tableName, dataGen);
-      HoodieSparkQuickstart.updateData(spark, jsc, tablePath, tableName, dataGen);
+      HoodieSparkQuickstart.insertData(spark, jsc, tablePath, tableName, DATA_GEN);
+      HoodieSparkQuickstart.updateData(spark, jsc, tablePath, tableName, DATA_GEN);

-      HoodieSparkQuickstart.queryData(spark, jsc, tablePath, tableName, dataGen);
+      HoodieSparkQuickstart.queryData(spark, jsc, tablePath, tableName, DATA_GEN);
       HoodieSparkQuickstart.incrementalQuery(spark, tablePath, tableName);
       HoodieSparkQuickstart.pointInTimeQuery(spark, tablePath, tableName);