1
0

[HUDI-4298] When reading the MOR table with QUERY_TYPE_SNAPSHOT, Unabl… (#5937)

* [HUDI-4298] Add test case for reading mor table

Signed-off-by: LinMingQiang <1356469429@qq.com>
This commit is contained in:
HunterXHunter
2022-07-12 14:49:44 +08:00
committed by GitHub
parent a270eeeef9
commit 994c561488
7 changed files with 179 additions and 65 deletions

View File

@@ -66,6 +66,10 @@ public class HoodieTestUtils {
return init(getDefaultHadoopConf(), basePath, tableType);
}
/**
 * Convenience overload of {@code init} that supplies the default Hadoop configuration.
 *
 * <p>Delegates to the four-argument {@code init}, passing the result of
 * {@code getDefaultHadoopConf()} as the first argument and forwarding every other
 * argument unchanged.
 *
 * @param basePath   forwarded as-is; presumably the table base path (confirm against the delegate)
 * @param tableType  forwarded as-is; the table type to initialize with
 * @param properties forwarded as-is; presumably extra table properties (confirm against the delegate)
 * @return whatever the delegated {@code init} overload returns
 * @throws IOException propagated from the delegated {@code init} overload
 */
public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, Properties properties) throws IOException {
return init(getDefaultHadoopConf(), basePath, tableType, properties);
}
public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath, boolean bootstrapIndexEnable) throws IOException {
Properties props = new Properties();
props.setProperty(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key(), bootstrapBasePath);

View File

@@ -263,7 +263,7 @@ public class ITTestDataStreamWrite extends TestLogger {
client.getJobExecutionResult().get();
}
TestData.checkWrittenFullData(tempFile, expected);
TestData.checkWrittenDataCOW(tempFile, expected);
}
private void testWriteToHoodieWithCluster(
@@ -327,7 +327,7 @@ public class ITTestDataStreamWrite extends TestLogger {
// wait for the streaming job to finish
client.getJobExecutionResult().get();
TestData.checkWrittenFullData(tempFile, expected);
TestData.checkWrittenDataCOW(tempFile, expected);
}
public void execute(StreamExecutionEnvironment execEnv, boolean isMor, String jobName) throws Exception {
@@ -449,7 +449,7 @@ public class ITTestDataStreamWrite extends TestLogger {
builder.sink(dataStream, false);
execute(execEnv, true, "Api_Sink_Test");
TestData.checkWrittenFullData(tempFile, EXPECTED);
TestData.checkWrittenDataCOW(tempFile, EXPECTED);
}
}

View File

@@ -250,7 +250,7 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
.checkpoint(2)
.assertNextEvent()
.checkpointComplete(2)
.checkWrittenFullData(EXPECTED5)
.checkWrittenDataCOW(EXPECTED5)
.end();
}
@@ -282,7 +282,7 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
.checkpoint(2)
.handleEvents(2)
.checkpointComplete(2)
.checkWrittenFullData(EXPECTED5)
.checkWrittenDataCOW(EXPECTED5)
.end();
}
@@ -305,7 +305,7 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
.checkpoint(2)
.handleEvents(1)
.checkpointComplete(2)
.checkWrittenFullData(EXPECTED5)
.checkWrittenDataCOW(EXPECTED5)
.end();
}

View File

@@ -18,12 +18,15 @@
package org.apache.hudi.sink;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.utils.TestData;
import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.HashMap;
import java.util.Map;
@@ -84,6 +87,60 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
validateIndexLoaded();
}
/**
 * Writes three batches into a MERGE_ON_READ table that uses {@link EventTimeAvroPayload}
 * with {@code ts} as the pre-combine field, and checks the written data after each checkpoint.
 *
 * <p>After the first batch the table must match {@code EXPECTED1}. After the out-of-order
 * batch ({@code TestData.DATA_SET_DISORDER_INSERT}) partition {@code par1} is expected to hold
 * the {@code id1} row with age 22 and ts 4. The third batch
 * ({@code TestData.DATA_SET_SINGLE_INSERT}) is expected to leave that result unchanged.
 *
 * @throws Exception if the pipeline or any of the data checks fails
 */
@Test
public void testEventTimeAvroPayloadMergeRead() throws Exception {
  conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
  conf.set(FlinkOptions.PATH, tempFile.getAbsolutePath());
  conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
  conf.set(FlinkOptions.OPERATION, "upsert");
  conf.set(FlinkOptions.CHANGELOG_ENABLED, false);
  conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 2);
  conf.set(FlinkOptions.PRE_COMBINE, true);
  conf.set(FlinkOptions.PRECOMBINE_FIELD, "ts");
  conf.set(FlinkOptions.PAYLOAD_CLASS_NAME, EventTimeAvroPayload.class.getName());

  // Start from the first-batch expectation and override only the partition
  // whose content changes once the out-of-order records are merged.
  Map<String, String> mergedExpected = new HashMap<>(EXPECTED1);
  mergedExpected.put("par1", "[id1,par1,id1,Danny,22,4,par1, id2,par1,id2,Stephen,33,2,par1]");

  TestHarness.instance().preparePipeline(tempFile, conf)
      // round 1: plain inserts
      .consume(TestData.DATA_SET_INSERT)
      .emptyEventBuffer()
      .checkpoint(1)
      .assertNextEvent()
      .checkpointComplete(1)
      .checkWrittenData(EXPECTED1, 4)
      // round 2: records for the same key arriving with out-of-order timestamps
      .consume(TestData.DATA_SET_DISORDER_INSERT)
      .emptyEventBuffer()
      .checkpoint(2)
      .assertNextEvent()
      .checkpointComplete(2)
      .checkWrittenData(mergedExpected, 4)
      // round 3: one more batch; the merged result is expected to stay the same
      .consume(TestData.DATA_SET_SINGLE_INSERT)
      .emptyEventBuffer()
      .checkpoint(3)
      .assertNextEvent()
      .checkpointComplete(3)
      .checkWrittenData(mergedExpected, 4)
      .end();
}
/**
 * Writes a single batch into a MERGE_ON_READ table and verifies the written data,
 * once for each configured value of {@code FlinkOptions.COMPACTION_DELTA_COMMITS}.
 *
 * <p>NOTE(review): judging by the test name, the two parameter values are meant to cover a
 * table holding only base files and one holding only log files; confirm which value yields which.
 *
 * @param compactionDeltaCommits value applied to {@code FlinkOptions.COMPACTION_DELTA_COMMITS}
 * @throws Exception if the pipeline or the data check fails
 */
@ParameterizedTest
@ValueSource(ints = {1, 2})
public void testOnlyBaseFileOrOnlyLogFileRead(int compactionDeltaCommits) throws Exception {
  // where and what kind of table to write
  final String tablePath = tempFile.getAbsolutePath();
  conf.set(FlinkOptions.PATH, tablePath);
  conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());

  // how records are written
  conf.set(FlinkOptions.OPERATION, "upsert");
  conf.set(FlinkOptions.CHANGELOG_ENABLED, false);

  // compaction settings under test
  conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
  conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits);

  TestHarness.instance()
      .preparePipeline(tempFile, conf)
      .consume(TestData.DATA_SET_INSERT)
      .emptyEventBuffer()
      .checkpoint(1)
      .assertNextEvent()
      .checkpointComplete(1)
      .checkWrittenData(EXPECTED1, 4)
      .end();
}
@Override
public void testInsertClustering() {
// insert clustering is only valid for cow table.

View File

@@ -161,7 +161,7 @@ public class ITTestHoodieFlinkCompactor {
env.execute("flink_hudi_compaction");
writeClient.close();
TestData.checkWrittenFullData(tempFile, EXPECTED1);
TestData.checkWrittenDataCOW(tempFile, EXPECTED1);
}
@ParameterizedTest
@@ -202,7 +202,7 @@ public class ITTestHoodieFlinkCompactor {
asyncCompactionService.shutDown();
TestData.checkWrittenFullData(tempFile, EXPECTED2);
TestData.checkWrittenDataCOW(tempFile, EXPECTED2);
}
@ParameterizedTest
@@ -281,7 +281,7 @@ public class ITTestHoodieFlinkCompactor {
env.execute("flink_hudi_compaction");
writeClient.close();
TestData.checkWrittenFullData(tempFile, EXPECTED3);
TestData.checkWrittenDataCOW(tempFile, EXPECTED3);
}
private String scheduleCompactionPlan(HoodieTableMetaClient metaClient, HoodieFlinkWriteClient<?> writeClient) {

View File

@@ -22,20 +22,14 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
import org.apache.avro.Schema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.table.data.RowData;
@@ -338,9 +332,7 @@ public class TestWriteBase {
public TestHarness checkWrittenData(
Map<String, String> expected,
int partitions) throws Exception {
if (OptionsResolver.isCowTable(conf)
|| conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)
|| OptionsResolver.isAppendMode(conf)) {
if (OptionsResolver.isCowTable(conf)) {
TestData.checkWrittenData(this.baseFile, expected, partitions);
} else {
checkWrittenDataMor(baseFile, expected, partitions);
@@ -349,15 +341,12 @@ public class TestWriteBase {
}
private void checkWrittenDataMor(File baseFile, Map<String, String> expected, int partitions) throws Exception {
HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(basePath, HadoopConfigurations.getHadoopConf(conf));
Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
String latestInstant = lastCompleteInstant();
FileSystem fs = FSUtils.getFs(basePath, new org.apache.hadoop.conf.Configuration());
TestData.checkWrittenDataMOR(fs, latestInstant, baseFile, expected, partitions, schema);
TestData.checkWrittenDataMOR(fs, baseFile, expected, partitions);
}
public TestHarness checkWrittenFullData(Map<String, List<String>> expected) throws IOException {
TestData.checkWrittenFullData(this.baseFile, expected);
public TestHarness checkWrittenDataCOW(Map<String, List<String>> expected) throws IOException {
TestData.checkWrittenDataCOW(this.baseFile, expected);
return this;
}

View File

@@ -18,13 +18,22 @@
package org.apache.hudi.utils;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
@@ -49,7 +58,6 @@ import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.Strings;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
@@ -60,15 +68,19 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.function.Predicate;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Predicate;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static junit.framework.TestCase.assertEquals;
import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_PROPERTIES_FILE;
import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -278,6 +290,17 @@ public class TestData {
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
/**
 * Four insert rows that all carry the key {@code id1}, the name {@code Danny} and the
 * partition {@code par1}, with timestamps in the non-monotonic order 3, 4, 2, 1.
 *
 * <p>Only the row with the largest timestamp (4) has age 22; the other three have age 23.
 * Presumably this lets a test tell whether records were merged by the timestamp field
 * rather than by arrival order (confirm against the consuming tests).
 */
public static List<RowData> DATA_SET_DISORDER_INSERT = Arrays.asList(
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(3), StringData.fromString("par1")),
// the row with the largest timestamp; note the different age value
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22,
TimestampData.fromEpochMillis(4), StringData.fromString("par1")),
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(1), StringData.fromString("par1"))
);
public static List<RowData> DATA_SET_DISORDER_UPDATE_DELETE = Arrays.asList(
// DISORDER UPDATE
updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21,
@@ -598,14 +621,14 @@ public class TestData {
* @param basePath The file base to check, should be a directory
* @param expected The expected results mapping, the key should be the partition path
*/
public static void checkWrittenFullData(
public static void checkWrittenDataCOW(
File basePath,
Map<String, List<String>> expected) throws IOException {
// 1. init flink table
HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.getAbsolutePath());
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath.getAbsolutePath()).build();
HoodieFlinkTable table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient);
HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient);
// 2. check each partition data
expected.forEach((partition, partitionDataSet) -> {
@@ -638,49 +661,90 @@ public class TestData {
*
* <p>Note: Replace it with the Flink reader when it is supported.
*
* @param fs The file system
* @param latestInstant The latest committed instant of current table
* @param baseFile The file base to check, should be a directory
* @param expected The expected results mapping, the key should be the partition path
* @param partitions The expected partition number
* @param schema The read schema
* @param fs The file system
* @param baseFile The file base to check, should be a directory
* @param expected The expected results mapping, the key should be the partition path
* @param partitions The expected partition number
*/
public static void checkWrittenDataMOR(
FileSystem fs,
String latestInstant,
File baseFile,
Map<String, String> expected,
int partitions,
Schema schema) {
int partitions) throws Exception {
assert baseFile.isDirectory() : "Base path should be a directory";
FileFilter partitionFilter = file -> !file.getName().startsWith(".");
File[] partitionDirs = baseFile.listFiles(partitionFilter);
String basePath = baseFile.getAbsolutePath();
File hoodiePropertiesFile = new File(baseFile + "/" + METAFOLDER_NAME + "/" + HOODIE_PROPERTIES_FILE);
assert hoodiePropertiesFile.exists();
// 1. init flink table
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.fromFile(hoodiePropertiesFile)
.withPath(basePath).build();
HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ, config.getProps());
HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient);
Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
String latestInstant = metaClient.getActiveTimeline().filterCompletedInstants()
.lastInstant().map(HoodieInstant::getTimestamp).orElse(null);
assertNotNull(latestInstant, "No completed commit under table path" + basePath);
File[] partitionDirs = baseFile.listFiles(file -> !file.getName().startsWith(".") && file.isDirectory());
assertNotNull(partitionDirs);
assertThat(partitionDirs.length, is(partitions));
assertThat("The partitions number should be: " + partitions, partitionDirs.length, is(partitions));
// 2. check each partition data
final int[] requiredPos = IntStream.range(0, schema.getFields().size()).toArray();
for (File partitionDir : partitionDirs) {
File[] dataFiles = partitionDir.listFiles(file ->
file.getName().contains(".log.") && !file.getName().startsWith(".."));
assertNotNull(dataFiles);
List<String> logPaths = Arrays.stream(dataFiles)
.sorted((f1, f2) -> HoodieLogFile.getLogFileComparator()
.compare(new HoodieLogFile(f1.getPath()), new HoodieLogFile(f2.getPath())))
.map(File::getAbsolutePath).collect(Collectors.toList());
HoodieMergedLogRecordScanner scanner = getScanner(fs, baseFile.getPath(), logPaths, schema, latestInstant);
List<String> readBuffer = scanner.getRecords().values().stream()
.map(hoodieRecord -> {
try {
// in case it is a delete
GenericRecord record = (GenericRecord) hoodieRecord.getData()
.getInsertValue(schema, new Properties())
.orElse(null);
return record == null ? (String) null : filterOutVariables(record);
} catch (IOException e) {
throw new RuntimeException(e);
List<String> readBuffer = new ArrayList<>();
List<FileSlice> fileSlices = table.getSliceView().getLatestMergedFileSlicesBeforeOrOn(partitionDir.getName(), latestInstant).collect(Collectors.toList());
for (FileSlice fileSlice : fileSlices) {
HoodieMergedLogRecordScanner scanner = null;
List<String> logPaths = fileSlice.getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
.map(logFile -> logFile.getPath().toString())
.collect(Collectors.toList());
if (logPaths.size() > 0) {
scanner = getScanner(fs, basePath, logPaths, schema, latestInstant);
}
String baseFilePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
Set<String> keyToSkip = new HashSet<>();
if (baseFilePath != null) {
// read the base file first
GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema);
ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path(baseFilePath)).build();
GenericRecord currentRecord = reader.read();
while (currentRecord != null) {
String curKey = currentRecord.get(HOODIE_RECORD_KEY_COL_POS).toString();
if (scanner != null && scanner.getRecords().containsKey(curKey)) {
keyToSkip.add(curKey);
// merge row with log.
final HoodieAvroRecord<?> record = (HoodieAvroRecord<?>) scanner.getRecords().get(curKey);
Option<IndexedRecord> combineResult = record.getData().combineAndGetUpdateValue(currentRecord, schema, config.getProps());
if (combineResult.isPresent()) {
GenericRecord avroRecord = buildAvroRecordBySchema(combineResult.get(), schema, requiredPos, recordBuilder);
readBuffer.add(filterOutVariables(avroRecord));
}
} else {
readBuffer.add(filterOutVariables(currentRecord));
}
})
.filter(Objects::nonNull)
.sorted(Comparator.naturalOrder())
.collect(Collectors.toList());
currentRecord = reader.read();
}
}
// read the remaining log data.
if (scanner != null) {
for (String curKey : scanner.getRecords().keySet()) {
if (!keyToSkip.contains(curKey)) {
Option<GenericRecord> record = (Option<GenericRecord>) scanner.getRecords()
.get(curKey).getData()
.getInsertValue(schema, config.getProps());
if (record.isPresent()) {
readBuffer.add(filterOutVariables(record.get()));
}
}
}
}
}
// Sort the records so that the write and read sequences are consistent.
readBuffer.sort(String::compareTo);
assertThat(readBuffer.toString(), is(expected.get(partitionDir.getName())));
}
}
@@ -722,7 +786,7 @@ public class TestData {
fields.add(genericRecord.get("age").toString());
fields.add(genericRecord.get("ts").toString());
fields.add(genericRecord.get("partition").toString());
return Strings.join(fields, ",");
return String.join(",",fields);
}
public static BinaryRowData insertRow(Object... fields) {