From a5e496fe23c3cd89ceff0e4c49d27df325ba5bd8 Mon Sep 17 00:00:00 2001 From: Shawy Geng Date: Wed, 11 Aug 2021 12:17:39 +0800 Subject: [PATCH] [HUDI-2292] MOR should not predicate pushdown when reading with payload_combine type (#3443) --- .../testutils/HoodieTestDataGenerator.java | 17 ++++++++++++++++- .../hudi/MergeOnReadSnapshotRelation.scala | 2 +- .../hudi/functional/TestMORDataSource.scala | 14 +++++++++++++- 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java index e4ea18678..68d1f2dd3 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java @@ -208,8 +208,13 @@ public class HoodieTestDataGenerator { */ public static RawTripTestPayload generateRandomValue( HoodieKey key, String instantTime, boolean isFlattened) throws IOException { + return generateRandomValue(key, instantTime, isFlattened, 0); + } + + public static RawTripTestPayload generateRandomValue( + HoodieKey key, String instantTime, boolean isFlattened, int ts) throws IOException { GenericRecord rec = generateGenericRecord( - key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime, "driver-" + instantTime, 0, + key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime, "driver-" + instantTime, ts, false, isFlattened); return new RawTripTestPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA); } @@ -591,6 +596,16 @@ public class HoodieTestDataGenerator { return updates; } + public List generateUpdatesWithTS(String instantTime, List baseRecords, int ts) throws IOException { + List updates = new ArrayList<>(); + for (HoodieRecord baseRecord : baseRecords) { + HoodieRecord record = new HoodieRecord(baseRecord.getKey(), + generateRandomValue(baseRecord.getKey(), instantTime, false, ts)); + updates.add(record); + } + return updates; + } + public List generateUpdatesWithDiffPartition(String instantTime, List baseRecords) throws IOException { List updates = new ArrayList<>(); diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index cf8296c9f..1d140304c 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -122,7 +122,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, dataSchema = tableStructSchema, partitionSchema = StructType(Nil), requiredSchema = tableStructSchema, - filters = filters, + filters = Seq.empty, options = optParams, hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf() ) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index 8855fb01c..82f6cf919 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -412,13 +412,15 @@ class TestMORDataSource extends HoodieClientTestBase { // First Operation: // Producing parquet files to three default partitions. // SNAPSHOT view on MOR table with parquet files only. - val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList + val hoodieRecords1 = dataGen.generateInserts("001", 100) + val records1 = recordsToStrings(hoodieRecords1).toList val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) inputDF1.write.format("org.apache.hudi") .options(commonOpts) .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) + .option(DataSourceWriteOptions.PAYLOAD_CLASS.key, classOf[DefaultHoodieRecordPayload].getName) .mode(SaveMode.Overwrite) .save(basePath) val hudiSnapshotDF1 = spark.read.format("org.apache.hudi") @@ -484,6 +486,16 @@ class TestMORDataSource extends HoodieClientTestBase { verifyShow(hudiIncDF1) verifyShow(hudiIncDF2) verifyShow(hudiIncDF1Skipmerge) + + val record3 = recordsToStrings(dataGen.generateUpdatesWithTS("003", hoodieRecords1, -1)) + spark.read.json(spark.sparkContext.parallelize(record3, 2)) + .write.format("org.apache.hudi").options(commonOpts) + .mode(SaveMode.Append).save(basePath) + val hudiSnapshotDF3 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .load(basePath + "/*/*/*/*") + assertEquals(100, hudiSnapshotDF3.count()) + assertEquals(0, hudiSnapshotDF3.filter("rider = 'rider-003'").count()) } @Test