1
0

[HUDI-2798] Fix flink query operation fields (#4041)

This commit is contained in:
Danny Chan
2021-11-19 23:39:37 +08:00
committed by GitHub
parent 7a00f867ae
commit bf008762df
5 changed files with 77 additions and 39 deletions

View File

@@ -44,6 +44,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -156,28 +157,52 @@ public class IncrementalInputSplits implements Serializable {
}
String tableName = conf.getString(FlinkOptions.TABLE_NAME);
List<HoodieCommitMetadata> activeMetadataList = instants.stream()
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
if (archivedMetadataList.size() > 0) {
LOG.warn("\n"
+ "--------------------------------------------------------------------------------\n"
+ "---------- caution: the reader has fall behind too much from the writer,\n"
+ "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
+ "--------------------------------------------------------------------------------");
}
List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
// IMPORTANT: the merged metadata list must be in ascending order by instant time
? mergeList(archivedMetadataList, activeMetadataList)
: activeMetadataList;
Set<String> writePartitions = HoodieInputFormatUtils.getWritePartitionPaths(metadataList);
// apply partition push down
if (this.requiredPartitions != null) {
writePartitions = writePartitions.stream()
.filter(this.requiredPartitions::contains).collect(Collectors.toSet());
Set<String> writePartitions;
final FileStatus[] fileStatuses;
if (instantRange == null) {
// reading from the earliest, scans the partitions and files directly.
FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(path.toUri()), conf);
if (this.requiredPartitions != null) {
// apply partition push down
fileIndex.setPartitionPaths(this.requiredPartitions);
}
writePartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
if (writePartitions.size() == 0) {
LOG.warn("No partitions found for reading in user provided path.");
return Result.EMPTY;
}
fileStatuses = fileIndex.getFilesInPartitions();
} else {
List<HoodieCommitMetadata> activeMetadataList = instants.stream()
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
if (archivedMetadataList.size() > 0) {
LOG.warn("\n"
+ "--------------------------------------------------------------------------------\n"
+ "---------- caution: the reader has fall behind too much from the writer,\n"
+ "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
+ "--------------------------------------------------------------------------------");
}
List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
// IMPORTANT: the merged metadata list must be in ascending order by instant time
? mergeList(archivedMetadataList, activeMetadataList)
: activeMetadataList;
writePartitions = HoodieInputFormatUtils.getWritePartitionPaths(metadataList);
// apply partition push down
if (this.requiredPartitions != null) {
writePartitions = writePartitions.stream()
.filter(this.requiredPartitions::contains).collect(Collectors.toSet());
}
if (writePartitions.size() == 0) {
LOG.warn("No partitions found for reading in user provided path.");
return Result.EMPTY;
}
fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());
}
FileStatus[] fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());
if (fileStatuses.length == 0) {
LOG.warn("No files found for reading in user provided path.");
return Result.EMPTY;

View File

@@ -683,13 +683,18 @@ public class MergeOnReadInputFormat
// deleted
continue;
} else {
final RowKind rowKind = FormatUtils.getRowKindSafely(mergedAvroRecord.get(), this.operationPos);
if (!emitDelete && rowKind == RowKind.DELETE) {
// deleted
continue;
}
GenericRecord avroRecord = buildAvroRecordBySchema(
mergedAvroRecord.get(),
requiredSchema,
requiredPos,
recordBuilder);
this.currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord);
FormatUtils.setRowKind(this.currentRecord, mergedAvroRecord.get(), this.operationPos);
this.currentRecord.setRowKind(rowKind);
return false;
}
}
@@ -746,9 +751,6 @@ public class MergeOnReadInputFormat
RowData curRow,
String curKey) throws IOException {
final HoodieRecord<?> record = scanner.getRecords().get(curKey);
if (!emitDelete && HoodieOperation.isDelete(record.getOperation())) {
return Option.empty();
}
GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow);
return record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
}

View File

@@ -39,6 +39,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.io.IOException;
@@ -221,8 +222,9 @@ public class TestInputFormat {
assertThat(actual2, is(expected2));
}
@Test
void testReadBaseAndLogFilesWithDisorderUpdateDelete() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testReadBaseAndLogFilesWithDisorderUpdateDelete(boolean compact) throws Exception {
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
beforeEach(HoodieTableType.MERGE_ON_READ, options);
@@ -233,7 +235,7 @@ public class TestInputFormat {
TestData.writeData(TestData.DATA_SET_SINGLE_INSERT, conf);
// write another commit using logs and read again.
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, compact);
TestData.writeData(TestData.DATA_SET_DISORDER_UPDATE_DELETE, conf);
InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
@@ -242,9 +244,11 @@ public class TestInputFormat {
// when isEmitDelete is false.
List<RowData> result1 = readData(inputFormat);
final String rowKind = compact ? "I" : "U";
final String expected = "[+" + rowKind + "[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]";
final String actual1 = TestData.rowDataToString(result1);
final String expected1 = "[+U[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]";
assertThat(actual1, is(expected1));
assertThat(actual1, is(expected));
// refresh the input format and set isEmitDelete to true.
this.tableSource.reset();
@@ -254,8 +258,7 @@ public class TestInputFormat {
List<RowData> result2 = readData(inputFormat);
final String actual2 = TestData.rowDataToString(result2);
final String expected2 = "[+U[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]";
assertThat(actual2, is(expected2));
assertThat(actual2, is(expected));
}
@Test