1
0

[HUDI-2292] MOR should not predicate pushdown when reading with payload_combine type (#3443)

This commit is contained in:
Shawy Geng
2021-08-11 12:17:39 +08:00
committed by GitHub
parent 8255a86cb4
commit a5e496fe23
3 changed files with 30 additions and 3 deletions

View File

@@ -208,8 +208,13 @@ public class HoodieTestDataGenerator {
   */
  public static RawTripTestPayload generateRandomValue(
      HoodieKey key, String instantTime, boolean isFlattened) throws IOException {
return generateRandomValue(key, instantTime, isFlattened, 0);
}
public static RawTripTestPayload generateRandomValue(
HoodieKey key, String instantTime, boolean isFlattened, int ts) throws IOException {
    GenericRecord rec = generateGenericRecord(
-       key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime, "driver-" + instantTime, 0,
+       key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime, "driver-" + instantTime, ts,
        false, isFlattened);
    return new RawTripTestPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
  }
@@ -591,6 +596,16 @@ public class HoodieTestDataGenerator {
    return updates;
  }
public List<HoodieRecord> generateUpdatesWithTS(String instantTime, List<HoodieRecord> baseRecords, int ts) throws IOException {
List<HoodieRecord> updates = new ArrayList<>();
for (HoodieRecord baseRecord : baseRecords) {
HoodieRecord record = new HoodieRecord(baseRecord.getKey(),
generateRandomValue(baseRecord.getKey(), instantTime, false, ts));
updates.add(record);
}
return updates;
}
  public List<HoodieRecord> generateUpdatesWithDiffPartition(String instantTime, List<HoodieRecord> baseRecords)
      throws IOException {
    List<HoodieRecord> updates = new ArrayList<>();

View File

@@ -122,7 +122,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
      dataSchema = tableStructSchema,
      partitionSchema = StructType(Nil),
      requiredSchema = tableStructSchema,
-     filters = filters,
+     filters = Seq.empty,
      options = optParams,
      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
    )

View File

@@ -412,13 +412,15 @@ class TestMORDataSource extends HoodieClientTestBase {
    // First Operation:
    // Producing parquet files to three default partitions.
    // SNAPSHOT view on MOR table with parquet files only.
-   val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+   val hoodieRecords1 = dataGen.generateInserts("001", 100)
+   val records1 = recordsToStrings(hoodieRecords1).toList
    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
    inputDF1.write.format("org.apache.hudi")
      .options(commonOpts)
      .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+     .option(DataSourceWriteOptions.PAYLOAD_CLASS.key, classOf[DefaultHoodieRecordPayload].getName)
      .mode(SaveMode.Overwrite)
      .save(basePath)
    val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
@@ -484,6 +486,16 @@ class TestMORDataSource extends HoodieClientTestBase {
    verifyShow(hudiIncDF1)
    verifyShow(hudiIncDF2)
    verifyShow(hudiIncDF1Skipmerge)
val record3 = recordsToStrings(dataGen.generateUpdatesWithTS("003", hoodieRecords1, -1))
spark.read.json(spark.sparkContext.parallelize(record3, 2))
.write.format("org.apache.hudi").options(commonOpts)
.mode(SaveMode.Append).save(basePath)
val hudiSnapshotDF3 = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
.load(basePath + "/*/*/*/*")
assertEquals(100, hudiSnapshotDF3.count())
assertEquals(0, hudiSnapshotDF3.filter("rider = 'rider-003'").count())
  }

  @Test