1
0

[HUDI-3130] Fixing Hive getSchema for RT tables, addressing different partitions having different schemas (#4468)

* Fixing Hive getSchema for RT tables

* Addressing feedback

* temp diff

* fixing tests after spark datasource read support for metadata table is merged to master

* Adding multi-partition schema evolution tests to HoodieRealTimeRecordReader

Co-authored-by: Aditya Tiwari <aditya.tiwari@flipkart.com>
Co-authored-by: sivabalan <n.siva.b@gmail.com>
This commit is contained in:
Aditya Tiwari
2022-03-06 07:51:35 +05:30
committed by GitHub
parent 6a46130037
commit 051ad0b033
9 changed files with 174 additions and 56 deletions

View File

@@ -264,7 +264,7 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
.map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
.collect(Collectors.toList());
List<GenericRecord> recordsRead =
HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), inputPaths, basePath(), new JobConf(hadoopConf()), true, false);
HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), inputPaths, basePath(), new JobConf(hadoopConf()), true, populateMetaFields);
// Wrote 20 records and deleted 20 records, so remaining 20-20 = 0
assertEquals(0, recordsRead.size(), "Must contain 0 records");
}

View File

@@ -143,7 +143,7 @@ public class TableSchemaResolver {
* @throws Exception
*/
public Schema getTableAvroSchema() throws Exception {
return getTableAvroSchema(metaClient.getTableConfig().populateMetaFields());
return getTableAvroSchema(true);
}
/**

View File

@@ -194,6 +194,10 @@ public final class SchemaTestUtil {
return new Schema.Parser().parse(SchemaTestUtil.class.getResourceAsStream("/simple-test-evolved.avsc"));
}
public static Schema getEvolvedCompatibleSchema() throws IOException {
return new Schema.Parser().parse(SchemaTestUtil.class.getResourceAsStream("/simple-test-evolved-compatible.avsc"));
}
public static List<IndexedRecord> generateEvolvedTestRecords(int from, int limit)
throws IOException, URISyntaxException {
return toRecords(getSimpleSchema(), getEvolvedSchema(), from, limit);

View File

@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": "int"},
{"name": "favorite_color", "type": "string"},
{"name": "field1", "type": ["null", "string"], "default": null},
{"name": "field2", "type": ["null", "string"], "default": null}
]
}

View File

@@ -21,21 +21,18 @@ package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodiePayloadProps;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.LogReaderUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.InputSplitUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -57,6 +54,7 @@ public abstract class AbstractRealtimeRecordReader {
private Schema readerSchema;
private Schema writerSchema;
private Schema hiveSchema;
private HoodieTableMetaClient metaClient;
public AbstractRealtimeRecordReader(RealtimeSplit split, JobConf job) {
this.split = split;
@@ -65,15 +63,15 @@ public abstract class AbstractRealtimeRecordReader {
LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
LOG.info("partitioningColumns ==> " + job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""));
try {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(split.getBasePath()).build();
metaClient = HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(split.getBasePath()).build();
if (metaClient.getTableConfig().getPreCombineField() != null) {
this.payloadProps.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, metaClient.getTableConfig().getPreCombineField());
}
this.usesCustomPayload = usesCustomPayload(metaClient);
LOG.info("usesCustomPayload ==> " + this.usesCustomPayload);
init();
} catch (IOException e) {
throw new HoodieIOException("Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e);
} catch (Exception e) {
throw new HoodieException("Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e);
}
}
@@ -83,19 +81,14 @@ public abstract class AbstractRealtimeRecordReader {
}
/**
* Goes through the log files in reverse order and finds the schema from the last available data block. If not, falls
* Gets schema from HoodieTableMetaClient. If the schema is unavailable there, falls
* back to the schema from the latest parquet file. Finally, sets the partition column and projection fields into the
* job conf.
*/
private void init() throws IOException {
Schema schemaFromLogFile = LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogFiles(), jobConf);
if (schemaFromLogFile == null) {
writerSchema = InputSplitUtils.getBaseFileSchema((FileSplit)split, jobConf);
LOG.info("Writer Schema From Parquet => " + writerSchema.getFields());
} else {
writerSchema = schemaFromLogFile;
LOG.info("Writer Schema From Log => " + writerSchema.toString(true));
}
private void init() throws Exception {
LOG.info("Getting writer schema from table avro schema ");
writerSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
// Add partitioning fields to writer schema for resulting row to contain null values for these fields
String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
List<String> partitioningFields =

View File

@@ -21,12 +21,18 @@ package org.apache.hudi.hadoop.functional;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.io.IOContextMap;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
import org.apache.hudi.hadoop.hive.HoodieCombineRealtimeFileSplit;
import org.apache.hudi.hadoop.hive.HoodieCombineRealtimeHiveSplit;
@@ -58,6 +64,7 @@ import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.stream.Collectors;
@@ -105,7 +112,9 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
// Create 3 partitions, each partition holds one parquet file and 1000 records
List<File> partitionDirs = InputFormatTestUtil
.prepareMultiPartitionedParquetTable(tempDir, schema, 3, numRecords, commitTime, HoodieTableType.MERGE_ON_READ);
InputFormatTestUtil.commit(tempDir, commitTime);
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
schema.toString(), HoodieTimeline.COMMIT_ACTION);
FileCreateUtils.createCommit(tempDir.toString(), commitTime, Option.of(commitMetadata));
TableDesc tblDesc = Utilities.defaultTd;
// Set the input format
@@ -185,7 +194,9 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
final int numRecords = 1000;
// Create 3 parquet files with 1000 records each
File partitionDir = InputFormatTestUtil.prepareParquetTable(tempDir, schema, 3, numRecords, commitTime);
InputFormatTestUtil.commit(tempDir, commitTime);
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
schema.toString(), HoodieTimeline.COMMIT_ACTION);
FileCreateUtils.createCommit(tempDir.toString(), commitTime, Option.of(commitMetadata));
TableDesc tblDesc = Utilities.defaultTd;
// Set the input format
@@ -255,7 +266,9 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
final int numRecords = 1000;
// Create 3 parquet files with 1000 records each
File partitionDir = InputFormatTestUtil.prepareParquetTable(tempDir, schema, 3, numRecords, commitTime);
InputFormatTestUtil.commit(tempDir, commitTime);
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
schema.toString(), HoodieTimeline.COMMIT_ACTION);
FileCreateUtils.createCommit(tempDir.toString(), commitTime, Option.of(commitMetadata));
String newCommitTime = "101";
// to trigger the bug of HUDI-1772, only update fileid2
@@ -323,7 +336,9 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
final int numRecords = 1000;
// Create 3 parquet files with 1000 records each
File partitionDir = InputFormatTestUtil.prepareParquetTable(tempDir, schema, 3, numRecords, commitTime);
InputFormatTestUtil.commit(tempDir, commitTime);
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
schema.toString(), HoodieTimeline.COMMIT_ACTION);
FileCreateUtils.createCommit(tempDir.toString(), commitTime, Option.of(commitMetadata));
// insert 1000 update records to log file 0
String newCommitTime = "101";

View File

@@ -51,7 +51,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
@@ -60,6 +60,7 @@ import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
@@ -85,7 +86,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -136,16 +136,6 @@ public class TestHoodieRealtimeRecordReader {
jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveOrderedColumnNames);
}
protected Properties getPropertiesForKeyGen() {
Properties properties = new Properties();
properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false");
properties.put("hoodie.datasource.write.recordkey.field", "_row_key");
properties.put("hoodie.datasource.write.partitionpath.field", "partition_path");
properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key");
properties.put(HoodieTableConfig.PARTITION_FIELDS.key(), "partition_path");
return properties;
}
@ParameterizedTest
@MethodSource("testArguments")
public void testReader(ExternalSpillableMap.DiskMapType diskMapType,
@@ -183,7 +173,10 @@ public class TestHoodieRealtimeRecordReader {
HoodieTableType.MERGE_ON_READ)
: InputFormatTestUtil.prepareNonPartitionedParquetTable(basePath, schema, 1, 100, baseInstant,
HoodieTableType.MERGE_ON_READ);
FileCreateUtils.createDeltaCommit(basePath.toString(), baseInstant);
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
schema.toString(), HoodieTimeline.DELTA_COMMIT_ACTION);
FileCreateUtils.createDeltaCommit(basePath.toString(), baseInstant, commitMetadata);
// Add the paths
FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
@@ -218,7 +211,7 @@ public class TestHoodieRealtimeRecordReader {
long size = writer.getCurrentSize();
writer.close();
assertTrue(size > 0, "block - size should be > 0");
FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime);
FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime, commitMetadata);
// create a split with baseFile (parquet file written earlier) and new log file(s)
fileSlice.addLogFile(writer.getLogFile());
@@ -291,7 +284,9 @@ public class TestHoodieRealtimeRecordReader {
final int secondBatchLastRecordKey = 2 * numRecords - 1;
File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, numRecords, instantTime,
HoodieTableType.MERGE_ON_READ);
FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime);
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
schema.toString(), HoodieTimeline.DELTA_COMMIT_ACTION);
FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime, commitMetadata);
// Add the paths
FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
@@ -303,7 +298,7 @@ public class TestHoodieRealtimeRecordReader {
long size = writer.getCurrentSize();
writer.close();
assertTrue(size > 0, "block - size should be > 0");
FileCreateUtils.createDeltaCommit(basePath.toString(), newCommitTime);
FileCreateUtils.createDeltaCommit(basePath.toString(), newCommitTime, commitMetadata);
// create a split with baseFile (parquet file written earlier) and new log file(s)
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
@@ -371,7 +366,10 @@ public class TestHoodieRealtimeRecordReader {
int numberOfLogRecords = numberOfRecords / 2;
File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, numberOfRecords,
instantTime, HoodieTableType.MERGE_ON_READ);
InputFormatTestUtil.commit(basePath, instantTime);
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
schema.toString(), HoodieTimeline.COMMIT_ACTION);
FileCreateUtils.createCommit(basePath.toString(), instantTime, Option.of(commitMetadata));
// Add the paths
FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
@@ -382,7 +380,9 @@ public class TestHoodieRealtimeRecordReader {
long size = writer.getCurrentSize();
writer.close();
assertTrue(size > 0, "block - size should be > 0");
InputFormatTestUtil.deltaCommit(basePath, newCommitTime);
commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
schema.toString(), HoodieTimeline.DELTA_COMMIT_ACTION);
FileCreateUtils.createDeltaCommit(basePath.toString(), newCommitTime, commitMetadata);
// create a split with baseFile (parquet file written earlier) and new log file(s)
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
@@ -507,7 +507,9 @@ public class TestHoodieRealtimeRecordReader {
File partitionDir =
InputFormatTestUtil.prepareSimpleParquetTable(basePath, schema, 1, numberOfRecords,
instantTime, HoodieTableType.MERGE_ON_READ);
InputFormatTestUtil.commit(basePath, instantTime);
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
schema.toString(), HoodieTimeline.COMMIT_ACTION);
FileCreateUtils.createCommit(basePath.toString(), instantTime, Option.of(commitMetadata));
// Add the paths
FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
List<Field> firstSchemaFields = schema.getFields();
@@ -529,7 +531,10 @@ public class TestHoodieRealtimeRecordReader {
newCommitTime, "101", 1);
logFiles.add(writer.getLogFile());
writer.close();
InputFormatTestUtil.deltaCommit(basePath, newCommitTime);
commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
schema.toString(), HoodieTimeline.DELTA_COMMIT_ACTION);
FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime, commitMetadata);
// create a split with baseFile (parquet file written earlier) and new log file(s)
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
@@ -572,6 +577,63 @@ public class TestHoodieRealtimeRecordReader {
reader.close();
}
@Test
public void testSchemaEvolution() throws Exception {
ExternalSpillableMap.DiskMapType diskMapType = ExternalSpillableMap.DiskMapType.BITCASK;
boolean isCompressionEnabled = true;
// initial commit
List<HoodieLogFile> logFiles = new ArrayList<>();
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ);
String instantTime = "100";
int numberOfRecords = 100;
int numberOfLogRecords = numberOfRecords / 2;
File partitionDir =
InputFormatTestUtil.prepareSimpleParquetTable(basePath, schema, 1, numberOfRecords,
instantTime, HoodieTableType.MERGE_ON_READ);
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
schema.toString(), HoodieTimeline.COMMIT_ACTION);
FileCreateUtils.createCommit(basePath.toString(), instantTime, Option.of(commitMetadata));
// Add the paths
FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
List<Field> firstSchemaFields = schema.getFields();
// 2nd commit w/ evolved schema
Schema evolvedSchema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedCompatibleSchema());
List<Field> secondSchemaFields = evolvedSchema.getFields();
String newCommitTime = "101";
File partitionDir1 =
InputFormatTestUtil.prepareSimpleParquetTable(basePath, evolvedSchema, 1, numberOfRecords,
instantTime, HoodieTableType.MERGE_ON_READ,"2017","05","01");
HoodieCommitMetadata commitMetadata1 = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
evolvedSchema.toString(), HoodieTimeline.COMMIT_ACTION);
FileCreateUtils.createCommit(basePath.toString(), newCommitTime, Option.of(commitMetadata1));
// Add the paths
FileInputFormat.setInputPaths(baseJobConf, partitionDir1.getPath());
// create a split with baseFile from 1st commit.
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
new FileSplit(new Path(partitionDir + "/fileid0_1_" + instantTime + ".parquet"), 0, 1, baseJobConf),
basePath.toUri().toString(), logFiles, newCommitTime, false, Option.empty());
// create a RecordReader to be used by HoodieRealtimeRecordReader
RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader(
new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), baseJobConf, null);
JobConf jobConf = new JobConf(baseJobConf);
// Try to read all the fields passed by the new schema
setHiveColumnNameProps(secondSchemaFields, jobConf, true);
// This time read only the fields which are part of parquet
HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
// use reader to read base Parquet File and log file
NullWritable key = recordReader.createKey();
ArrayWritable value = recordReader.createValue();
while (recordReader.next(key, value)) {
// keep reading
}
reader.close();
}
private static Stream<Arguments> testArguments() {
// Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, Arg3: partitioned
return Stream.of(
@@ -595,8 +657,7 @@ public class TestHoodieRealtimeRecordReader {
final int numRecords = 1000;
File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, numRecords, instantTime,
HoodieTableType.MERGE_ON_READ);
//FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime);
createDeltaCommitFile(basePath, instantTime,"2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0");
createDeltaCommitFile(basePath, instantTime,"2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString());
// Add the paths
FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
@@ -607,7 +668,7 @@ public class TestHoodieRealtimeRecordReader {
InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime, newCommitTime,
numRecords, numRecords, 0);
writer.close();
createDeltaCommitFile(basePath, newCommitTime,"2016/05/01", "2016/05/01/.fileid0_100.log.1_1-0-1", "fileid0");
createDeltaCommitFile(basePath, newCommitTime,"2016/05/01", "2016/05/01/.fileid0_100.log.1_1-0-1", "fileid0", schema.toString());
InputFormatTestUtil.setupIncremental(baseJobConf, "101", 1);
@@ -644,8 +705,7 @@ public class TestHoodieRealtimeRecordReader {
String baseInstant = "100";
File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, 100, baseInstant,
HoodieTableType.MERGE_ON_READ);
//FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime);
createDeltaCommitFile(basePath, baseInstant,"2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0");
createDeltaCommitFile(basePath, baseInstant,"2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString());
// Add the paths
FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
@@ -727,13 +787,17 @@ public class TestHoodieRealtimeRecordReader {
String commitNumber,
String partitionPath,
String filePath,
String fileId) throws IOException {
String fileId,
String schemaStr) throws IOException {
List<HoodieWriteStat> writeStats = new ArrayList<>();
HoodieWriteStat writeStat = createHoodieWriteStat(basePath, commitNumber, partitionPath, filePath, fileId);
writeStats.add(writeStat);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
writeStats.forEach(stat -> commitMetadata.addWriteStat(partitionPath, stat));
if (schemaStr != null) {
commitMetadata.getExtraMetadata().put(HoodieCommitMetadata.SCHEMA_KEY, schemaStr);
}
File file = basePath.resolve(".hoodie").resolve(commitNumber + ".deltacommit").toFile();
file.createNewFile();
FileOutputStream fileOutputStream = new FileOutputStream(file);
@@ -765,7 +829,9 @@ public class TestHoodieRealtimeRecordReader {
long size = writer.getCurrentSize();
writer.close();
assertTrue(size > 0, "block - size should be > 0");
FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime);
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
schema.toString(), HoodieTimeline.COMMIT_ACTION);
FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime, commitMetadata);
// create a split with new log file(s)
fileSlice.addLogFile(new HoodieLogFile(writer.getLogFile().getPath(), size));
RealtimeFileStatus realtimeFileStatus = new RealtimeFileStatus(
@@ -807,7 +873,7 @@ public class TestHoodieRealtimeRecordReader {
String baseInstant = "100";
File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, 100, baseInstant,
HoodieTableType.MERGE_ON_READ);
createDeltaCommitFile(basePath, baseInstant,"2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0");
createDeltaCommitFile(basePath, baseInstant,"2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString());
// Add the paths
FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());

View File

@@ -223,9 +223,14 @@ public class InputFormatTestUtil {
public static File prepareSimpleParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles,
int numberOfRecords, String commitNumber, HoodieTableType tableType) throws Exception {
return prepareSimpleParquetTable(basePath, schema, numberOfFiles, numberOfRecords, commitNumber, tableType, "2016","05","01");
}
public static File prepareSimpleParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles,
int numberOfRecords, String commitNumber, HoodieTableType tableType, String year, String month, String date) throws Exception {
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), tableType, HoodieFileFormat.PARQUET);
java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01"));
java.nio.file.Path partitionPath = basePath.resolve(Paths.get(year, month, date));
setupPartition(basePath, partitionPath);
createSimpleData(schema, partitionPath, numberOfFiles, numberOfRecords, commitNumber);

View File

@@ -19,12 +19,13 @@
package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.HoodieDataSourceHelper._
import org.apache.hudi.HoodieMergeOnReadRDD.resolveAvroSchemaNullability
import org.apache.hudi.MergeOnReadSnapshotRelation.getFilePath
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.engine.HoodieLocalEngineContext
import org.apache.hudi.common.fs.FSUtils
@@ -309,10 +310,15 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
}
}
private def mergeRowWithLog(curRow: InternalRow, curKey: String) = {
private def mergeRowWithLog(curRow: InternalRow, curKey: String) : org.apache.hudi.common.util.Option[IndexedRecord] = {
val historyAvroRecord = serializer.serialize(curRow).asInstanceOf[GenericRecord]
logRecords.get(curKey).getData
val mergedRec = logRecords.get(curKey).getData
.combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema, payloadProps)
if (mergedRec.isPresent && mergedRec.get().getSchema != tableAvroSchema) {
org.apache.hudi.common.util.Option.of(HoodieAvroUtils.rewriteRecord(mergedRec.get().asInstanceOf[GenericRecord], tableAvroSchema).asInstanceOf[IndexedRecord])
} else {
mergedRec
}
}
}
}