diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroWriteSupport.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroWriteSupport.java
new file mode 100644
index 000000000..434807db8
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroWriteSupport.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+public class TestHoodieAvroWriteSupport {
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  @Test
+  public void testAddKey() throws IOException {
+    List<String> rowKeys = new ArrayList<>();
+    for (int i = 0; i < 1000; i++) {
+      rowKeys.add(UUID.randomUUID().toString());
+    }
+    String filePath = folder.getRoot() + "/test.parquet";
+    Schema schema = HoodieAvroUtils.getRecordKeySchema();
+    BloomFilter filter = BloomFilterFactory.createBloomFilter(
+        1000, 0.0001, 10000,
+        BloomFilterTypeCode.SIMPLE.name());
+    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
+        new AvroSchemaConverter().convert(schema), schema, filter);
+    ParquetWriter writer = new ParquetWriter(new Path(filePath), writeSupport, CompressionCodecName.GZIP,
+        120 * 1024 * 1024, ParquetWriter.DEFAULT_PAGE_SIZE);
+    for (String rowKey : rowKeys) {
+      GenericRecord rec = new GenericData.Record(schema);
+      rec.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, rowKey);
+      writer.write(rec);
+      writeSupport.add(rowKey);
+    }
+    writer.close();
+  }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
index ad9a7f163..58c4580de 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
@@ -140,7 +140,7 @@ public class TestParquetUtils extends HoodieCommonTestHarness {
       GenericRecord rec = new GenericData.Record(schema);
       rec.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, rowKey);
       writer.write(rec);
-      filter.add(rowKey);
+      writeSupport.add(rowKey);
     }
     writer.close();
   }
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index 104577e15..33856ae0a 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -75,6 +75,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.REALTIME_SKIP_MERGE_PROP;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -251,6 +252,9 @@ public class TestHoodieRealtimeRecordReader {
         key = recordReader.createKey();
         value = recordReader.createValue();
       }
+      recordReader.getPos();
+      assertEquals(1.0, recordReader.getProgress(), 0.05);
+      recordReader.close();
     } catch (Exception ioe) {
       throw new HoodieException(ioe.getMessage(), ioe);
     }
@@ -295,10 +299,10 @@ public class TestHoodieRealtimeRecordReader {
     List<Schema.Field> fields = schema.getFields();
     setHiveColumnNameProps(fields, jobConf, true);
     // Enable merge skipping.
-    jobConf.set("hoodie.realtime.merge.skip", "true");
+    jobConf.set(REALTIME_SKIP_MERGE_PROP, "true");
 
     // validate unmerged record reader
-    RealtimeUnmergedRecordReader recordReader = new RealtimeUnmergedRecordReader(split, jobConf, reader);
+    HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
 
     // use reader to read base Parquet File and log file
     // here all records should be present. Also ensure log records are in order.
@@ -333,6 +337,8 @@
     assertEquals(numRecords, numRecordsAtCommit1);
     assertEquals(numRecords, numRecordsAtCommit2);
     assertEquals(2 * numRecords, seenKeys.size());
+    assertEquals(1.0, recordReader.getProgress(), 0.05);
+    recordReader.close();
   }
 
   @Test
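
Note on the new test: testAddKey pushes keys through HoodieAvroWriteSupport, which forwards each key to the wrapped BloomFilter (the same delegation the TestParquetUtils hunk above switches to by replacing filter.add with writeSupport.add). One could tighten the test to assert that delegation instead of only exercising the write path, by probing the filter after the write loop. A minimal sketch, assuming the org.apache.hudi.common.bloom.BloomFilter interface exposes mightContain(String) and that org.junit.Assert.assertTrue/assertFalse are statically imported — verify both against the actual codebase before relying on this:

    // Appended after writer.close() in testAddKey; reuses rowKeys and filter from above.
    for (String rowKey : rowKeys) {
      // writeSupport.add(rowKey) should have delegated each key to the wrapped filter,
      // so every written key must be reported as a probable member.
      assertTrue(filter.mightContain(rowKey));
    }
    // A key that was never added should test negative with high probability
    // (false positives are possible, but bounded by the 0.0001 error rate above).
    assertFalse(filter.mightContain("never-added-key"));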