[HUDI-2509] OverwriteNonDefaultsWithLatestAvroPayload doesn`t work when upsert data with some null value column (#3761)
Co-authored-by: 502395931@qq.com <lzyadam315>
This commit is contained in:
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.common.model;
|
package org.apache.hudi.common.model;
|
||||||
|
|
||||||
|
import org.apache.avro.JsonProperties;
|
||||||
import org.apache.hudi.avro.HoodieAvroUtils;
|
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
|
|
||||||
@@ -99,6 +100,9 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
|
|||||||
* Return true if value equals defaultValue otherwise false.
|
* Return true if value equals defaultValue otherwise false.
|
||||||
*/
|
*/
|
||||||
public Boolean overwriteField(Object value, Object defaultValue) {
|
public Boolean overwriteField(Object value, Object defaultValue) {
|
||||||
|
if (JsonProperties.NULL_VALUE.equals(defaultValue)) {
|
||||||
|
return value == null;
|
||||||
|
}
|
||||||
return Objects.equals(value, defaultValue);
|
return Objects.equals(value, defaultValue);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.common.model;
|
package org.apache.hudi.common.model;
|
||||||
|
|
||||||
|
import org.apache.avro.JsonProperties;
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.avro.generic.GenericData;
|
import org.apache.avro.generic.GenericData;
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
@@ -126,4 +127,34 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload {
|
|||||||
assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema).get(), record2);
|
assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema).get(), record2);
|
||||||
assertFalse(payload2.combineAndGetUpdateValue(record1, schema).isPresent());
|
assertFalse(payload2.combineAndGetUpdateValue(record1, schema).isPresent());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNullColumn() throws IOException {
|
||||||
|
Schema avroSchema = Schema.createRecord(Arrays.asList(
|
||||||
|
new Schema.Field("id", Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE),
|
||||||
|
new Schema.Field("name", Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE),
|
||||||
|
new Schema.Field("age", Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE),
|
||||||
|
new Schema.Field("job", Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE)
|
||||||
|
));
|
||||||
|
GenericRecord record1 = new GenericData.Record(avroSchema);
|
||||||
|
record1.put("id", "1");
|
||||||
|
record1.put("name", "aa");
|
||||||
|
record1.put("age", "1");
|
||||||
|
record1.put("job", "1");
|
||||||
|
|
||||||
|
GenericRecord record2 = new GenericData.Record(avroSchema);
|
||||||
|
record2.put("id", "1");
|
||||||
|
record2.put("name", "bb");
|
||||||
|
record2.put("age", "2");
|
||||||
|
record2.put("job", null);
|
||||||
|
|
||||||
|
GenericRecord record3 = new GenericData.Record(avroSchema);
|
||||||
|
record3.put("id", "1");
|
||||||
|
record3.put("name", "bb");
|
||||||
|
record3.put("age", "2");
|
||||||
|
record3.put("job", "1");
|
||||||
|
|
||||||
|
OverwriteNonDefaultsWithLatestAvroPayload payload2 = new OverwriteNonDefaultsWithLatestAvroPayload(record2, 1);
|
||||||
|
assertEquals(payload2.combineAndGetUpdateValue(record1, avroSchema).get(), record3);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user