Reformatting code per Google Code Style all over

Author: Vinoth Chandar, 2017-11-12 22:54:56 -08:00
Committed by: vinoth chandar
Parent: 5a62480a92
Commit: e45679f5e2
254 changed files with 21580 additions and 21108 deletions

File: HoodieParquetSerde.java

@@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
*/
public class HoodieParquetSerde extends ParquetHiveSerDe {

  public HoodieParquetSerde() {
    super();
  }
}

File: com/uber/hoodie/hadoop/realtime/HoodieRealtimeFileSplit.java

@@ -18,81 +18,83 @@
package com.uber.hoodie.hadoop.realtime;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.mapred.FileSplit;

/**
 * Filesplit that wraps the base split and a list of log files to merge deltas from.
 */
public class HoodieRealtimeFileSplit extends FileSplit {

  private List<String> deltaFilePaths;
  private String maxCommitTime;
  private String basePath;

  public HoodieRealtimeFileSplit() {
    super();
  }

  public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List<String> deltaLogFiles,
      String maxCommitTime) throws IOException {
    super(baseSplit.getPath(), baseSplit.getStart(), baseSplit.getLength(),
        baseSplit.getLocations());
    this.deltaFilePaths = deltaLogFiles;
    this.maxCommitTime = maxCommitTime;
    this.basePath = basePath;
  }

  public List<String> getDeltaFilePaths() {
    return deltaFilePaths;
  }

  public String getMaxCommitTime() {
    return maxCommitTime;
  }

  public String getBasePath() {
    return basePath;
  }

  private static void writeString(String str, DataOutput out) throws IOException {
    byte[] pathBytes = str.getBytes(StandardCharsets.UTF_8);
    out.writeInt(pathBytes.length);
    out.write(pathBytes);
  }

  private static String readString(DataInput in) throws IOException {
    byte[] pathBytes = new byte[in.readInt()];
    in.readFully(pathBytes);
    return new String(pathBytes, StandardCharsets.UTF_8);
  }

  @Override
  public void write(DataOutput out) throws IOException {
    super.write(out);
    writeString(maxCommitTime, out);
    out.writeInt(deltaFilePaths.size());
    for (String logFilePath : deltaFilePaths) {
      writeString(logFilePath, out);
    }
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    super.readFields(in);
    maxCommitTime = readString(in);
    int totalLogFiles = in.readInt();
    deltaFilePaths = new ArrayList<>(totalLogFiles);
    for (int i = 0; i < totalLogFiles; i++) {
      deltaFilePaths.add(readString(in));
    }
  }
}
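Since the split is shipped to tasks through Hadoop's Writable machinery, a quick way to see what write()/readFields() preserve is a serialization round trip. The sketch below is illustrative only and not part of the commit: the paths, commit time, and host name are invented, and note that basePath is not written out by this version, so only the delta file paths and max commit time survive the round trip.

import com.uber.hoodie.hadoop.realtime.HoodieRealtimeFileSplit;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.Arrays;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;

public class RealtimeFileSplitRoundTrip {

  public static void main(String[] args) throws Exception {
    // Wrap a plain parquet FileSplit together with one (made-up) delta log file and a commit time.
    FileSplit base = new FileSplit(new Path("/tmp/2017/11/12/file1_1_20171112.parquet"),
        0, 1024, new String[] {"localhost"});
    HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(base, "/tmp",
        Arrays.asList("/tmp/2017/11/12/.file1_20171113.log.1"), "20171113");

    // write() serializes the base split fields, then maxCommitTime and each delta file path.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    split.write(new DataOutputStream(bytes));

    // readFields() restores them on the task side; basePath is not part of the wire format here.
    HoodieRealtimeFileSplit copy = new HoodieRealtimeFileSplit();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(copy.getMaxCommitTime() + " -> " + copy.getDeltaFilePaths());
  }
}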

File: com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java

@@ -19,9 +19,7 @@
package com.uber.hoodie.hadoop.realtime;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
@@ -32,7 +30,16 @@ import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.hadoop.HoodieInputFormat;
import com.uber.hoodie.hadoop.UseFileSplitsFromInputFormat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
@@ -47,168 +54,168 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
/**
 * Input Format, that provides a real-time view of data in a Hoodie dataset
 */
@UseFileSplitsFromInputFormat
public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Configurable {

  public static final Log LOG = LogFactory.getLog(HoodieRealtimeInputFormat.class);

  // These positions have to be deterministic across all tables
  public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
  public static final int HOODIE_RECORD_KEY_COL_POS = 2;
  public static final int HOODIE_PARTITION_PATH_COL_POS = 3;

  @Override
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {

    Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits))
        .map(is -> (FileSplit) is);

    // obtain all unique parent folders for splits
    Map<Path, List<FileSplit>> partitionsToParquetSplits = fileSplits
        .collect(Collectors.groupingBy(split -> split.getPath().getParent()));

    // TODO(vc): Should we handle also non-hoodie splits here?
    Map<String, HoodieTableMetaClient> metaClientMap = new HashMap<>();
    Map<Path, HoodieTableMetaClient> partitionsToMetaClient = partitionsToParquetSplits.keySet()
        .stream()
        .collect(Collectors.toMap(Function.identity(), p -> {
          // find if we have a metaclient already for this partition.
          Optional<String> matchingBasePath = metaClientMap.keySet().stream()
              .filter(basePath -> p.toString().startsWith(basePath)).findFirst();
          if (matchingBasePath.isPresent()) {
            return metaClientMap.get(matchingBasePath.get());
          }

          try {
            HoodieTableMetaClient metaClient = getTableMetaClient(p.getFileSystem(conf), p);
            metaClientMap.put(metaClient.getBasePath(), metaClient);
            return metaClient;
          } catch (IOException e) {
            throw new HoodieIOException("Error creating hoodie meta client against : " + p, e);
          }
        }));

    // for all unique split parents, obtain all delta files based on delta commit timeline,
    // grouped on file id
    List<HoodieRealtimeFileSplit> rtSplits = new ArrayList<>();
    partitionsToParquetSplits.keySet().stream().forEach(partitionPath -> {
      // for each partition path obtain the data & log file groupings, then map back to inputsplits
      HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
      HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
          metaClient.getActiveTimeline());
      String relPartitionPath = FSUtils
          .getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);

      try {
        Stream<FileSlice> latestFileSlices = fsView.getLatestFileSlices(relPartitionPath);
        // subgroup splits again by file id & match with log files.
        Map<String, List<FileSplit>> groupedInputSplits = partitionsToParquetSplits
            .get(partitionPath).stream()
            .collect(Collectors.groupingBy(split -> FSUtils.getFileId(split.getPath().getName())));
        latestFileSlices.forEach(fileSlice -> {
          List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
          dataFileSplits.forEach(split -> {
            try {
              List<String> logFilePaths = fileSlice.getLogFiles()
                  .map(logFile -> logFile.getPath().toString())
                  .collect(Collectors.toList());
              // Get the maxCommit from the last delta or compaction or commit - when
              // bootstrapped from COW table
              String maxCommitTime = metaClient.getActiveTimeline()
                  .getTimelineOfActions(
                      Sets.newHashSet(HoodieTimeline.COMMIT_ACTION,
                          HoodieTimeline.COMPACTION_ACTION,
                          HoodieTimeline.DELTA_COMMIT_ACTION))
                  .filterCompletedInstants().lastInstant().get().getTimestamp();
              rtSplits.add(
                  new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths,
                      maxCommitTime));
            } catch (IOException e) {
              throw new HoodieIOException("Error creating hoodie real time split ", e);
            }
          });
        });
      } catch (Exception e) {
        throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath,
            e);
      }
    });
    LOG.info("Returning a total splits of " + rtSplits.size());
    return rtSplits.toArray(new InputSplit[rtSplits.size()]);
  }

  @Override
  public FileStatus[] listStatus(JobConf job) throws IOException {
    // Call the HoodieInputFormat::listStatus to obtain all latest parquet files, based on
    // commit timeline.
    return super.listStatus(job);
  }

  /**
   * Add a field to the existing fields projected
   */
  private static Configuration addProjectionField(Configuration conf, String fieldName,
      int fieldIndex) {
    String readColNames = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "");
    String readColIds = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "");

    String readColNamesPrefix = readColNames + ",";
    if (readColNames == null || readColNames.isEmpty()) {
      readColNamesPrefix = "";
    }
    String readColIdsPrefix = readColIds + ",";
    if (readColIds == null || readColIds.isEmpty()) {
      readColIdsPrefix = "";
    }

    if (!readColNames.contains(fieldName)) {
      // If not already in the list - then add it
      conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
          readColNamesPrefix + fieldName);
      conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, readColIdsPrefix + fieldIndex);
      if (LOG.isDebugEnabled()) {
        LOG.debug(String.format("Adding extra column " + fieldName
                + ", to enable log merging cols (%s) ids (%s) ",
            conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
            conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)));
      }
    }
    return conf;
  }

  private static Configuration addRequiredProjectionFields(Configuration configuration) {
    // Need this to do merge records in HoodieRealtimeRecordReader
    configuration = addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD,
        HOODIE_RECORD_KEY_COL_POS);
    configuration = addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD,
        HOODIE_COMMIT_TIME_COL_POS);
    configuration = addProjectionField(configuration,
        HoodieRecord.PARTITION_PATH_METADATA_FIELD, HOODIE_PARTITION_PATH_COL_POS);
    return configuration;
  }

  @Override
  public RecordReader<Void, ArrayWritable> getRecordReader(final InputSplit split,
      final JobConf job,
      final Reporter reporter) throws IOException {

    LOG.info("Creating record reader with readCols :" + job
        .get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
    // sanity check
    Preconditions.checkArgument(split instanceof HoodieRealtimeFileSplit,
        "HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit and not with "
            + split);

    return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) split, job,
        super.getRecordReader(split, job, reporter));
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = addRequiredProjectionFields(conf);
  }

  @Override
  public Configuration getConf() {
    return conf;
  }
}
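To make the projection handling concrete, here is a rough usage sketch of setConf(): it appends the Hoodie metadata columns to whatever Hive already asked to read. It is not part of the commit; the initial column names and ids are invented, and the expected output assumes the usual Hoodie meta column names (_hoodie_record_key, _hoodie_commit_time, _hoodie_partition_path) behind the HoodieRecord constants.

import com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.JobConf;

public class ProjectionFieldsExample {

  public static void main(String[] args) {
    // A job that initially projects two ordinary data columns (names/ids are made up).
    JobConf job = new JobConf();
    job.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "rider,driver");
    job.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "5,6");

    // setConf() runs addRequiredProjectionFields(), which appends the record key (pos 2),
    // commit time (pos 0) and partition path (pos 3) meta columns needed for log merging.
    HoodieRealtimeInputFormat inputFormat = new HoodieRealtimeInputFormat();
    inputFormat.setConf(job);

    // Assuming the standard Hoodie meta column names, this should now print something like:
    //   rider,driver,_hoodie_record_key,_hoodie_commit_time,_hoodie_partition_path
    //   5,6,2,0,3
    System.out.println(inputFormat.getConf().get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
    System.out.println(inputFormat.getConf().get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
  }
}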

File: com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java

@@ -18,14 +18,21 @@
package com.uber.hoodie.hadoop.realtime;
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericFixed;
@@ -51,291 +58,274 @@ import parquet.avro.AvroSchemaConverter;
import parquet.hadoop.ParquetFileReader;
import parquet.schema.MessageType;
/**
 * Record Reader implementation to merge fresh avro data with base parquet data, to support real
 * time queries.
 */
public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWritable> {

  private final RecordReader<Void, ArrayWritable> parquetReader;
  private final HoodieRealtimeFileSplit split;
  private final JobConf jobConf;

  public static final Log LOG = LogFactory.getLog(HoodieRealtimeRecordReader.class);

  private final HashMap<String, ArrayWritable> deltaRecordMap;
  private final MessageType baseFileSchema;

  public HoodieRealtimeRecordReader(HoodieRealtimeFileSplit split,
      JobConf job,
      RecordReader<Void, ArrayWritable> realReader) {
    this.split = split;
    this.jobConf = job;
    this.parquetReader = realReader;
    this.deltaRecordMap = new HashMap<>();

    LOG.info("cfg ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
    try {
      baseFileSchema = readSchema(jobConf, split.getPath());
      readAndCompactLog();
    } catch (IOException e) {
      throw new HoodieIOException(
          "Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e);
    }
  }

  /**
   * Reads the schema from the parquet file. This is different from ParquetUtils as it uses the
   * twitter parquet to support hive 1.1.0
   */
  private static MessageType readSchema(Configuration conf, Path parquetFilePath) {
    try {
      return ParquetFileReader.readFooter(conf, parquetFilePath).getFileMetaData()
          .getSchema();
    } catch (IOException e) {
      throw new HoodieIOException("Failed to read footer for parquet " + parquetFilePath,
          e);
    }
  }

  /**
   * Goes through the log files and populates a map with latest version of each key logged, since
   * the base split was written.
   */
  private void readAndCompactLog() throws IOException {
    Schema writerSchema = new AvroSchemaConverter().convert(baseFileSchema);
    List<String> projectionFields = orderFields(
        jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
        jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR),
        jobConf.get("partition_columns", ""));
    // TODO(vc): In the future, the reader schema should be updated based on log files & be able
    // to null out fields not present before
    Schema readerSchema = generateProjectionSchema(writerSchema, projectionFields);

    LOG.info(
        String.format("About to read compacted logs %s for base split %s, projecting cols %s",
            split.getDeltaFilePaths(), split.getPath(), projectionFields));
    HoodieCompactedLogRecordScanner compactedLogRecordScanner =
        new HoodieCompactedLogRecordScanner(FSUtils.getFs(), split.getBasePath(),
            split.getDeltaFilePaths(),
            readerSchema, split.getMaxCommitTime());

    // NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit
    // but can return records for completed commits > the commit we are trying to read (if using
    // readCommit() API)
    for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : compactedLogRecordScanner) {
      GenericRecord rec = (GenericRecord) hoodieRecord.getData().getInsertValue(readerSchema)
          .get();
      String key = hoodieRecord.getRecordKey();
      // we assume, a later safe record in the log, is newer than what we have in the map &
      // replace it.
      ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(rec, writerSchema);
      deltaRecordMap.put(key, aWritable);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Log record : " + arrayWritableToString(aWritable));
      }
    }
  }

  private static String arrayWritableToString(ArrayWritable writable) {
    if (writable == null) {
      return "null";
    }

    StringBuilder builder = new StringBuilder();
    Writable[] values = writable.get();
    builder.append(String.format("Size: %s,", values.length));
    for (Writable w : values) {
      builder.append(w + " ");
    }
    return builder.toString();
  }

  /**
   * Given a comma separated list of field names and positions at which they appear on Hive,
   * return an ordered list of field names, that can be passed onto storage.
   */
  public static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv,
      String partitioningFieldsCsv) {

    String[] fieldOrders = fieldOrderCsv.split(",");
    Set<String> partitioningFields = Arrays.stream(partitioningFieldsCsv.split(","))
        .collect(Collectors.toSet());
    List<String> fieldNames = Arrays.stream(fieldNameCsv.split(","))
        .filter(fn -> !partitioningFields.contains(fn)).collect(
            Collectors.toList());

    // Hive does not provide ids for partitioning fields, so check for lengths excluding that.
    if (fieldNames.size() != fieldOrders.length) {
      throw new HoodieException(String.format(
          "Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d",
          fieldNames.size(), fieldOrders.length));
    }
    TreeMap<Integer, String> orderedFieldMap = new TreeMap<>();
    for (int ox = 0; ox < fieldOrders.length; ox++) {
      orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNames.get(ox));
    }
    return new ArrayList<>(orderedFieldMap.values());
  }

  /**
   * Generate a reader schema off the provided writeSchema, to just project out the provided
   * columns
   */
  public static Schema generateProjectionSchema(Schema writeSchema, List<String> fieldNames) {
    List<Schema.Field> projectedFields = new ArrayList<>();
    for (String fn : fieldNames) {
      Schema.Field field = writeSchema.getField(fn);
      if (field == null) {
        throw new HoodieException("Field " + fn + " not found log schema. Query cannot proceed!");
      }
      projectedFields
          .add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue()));
    }
    return Schema.createRecord(projectedFields);
  }

  /**
   * Convert the projected read from delta record into an array writable
   */
  public static Writable avroToArrayWritable(Object value, Schema schema) {

    // if value is null, make a NullWritable
    if (value == null) {
      return NullWritable.get();
    }

    switch (schema.getType()) {
      case STRING:
        return new Text(value.toString());
      case BYTES:
        return new BytesWritable((byte[]) value);
      case INT:
        return new IntWritable((Integer) value);
      case LONG:
        return new LongWritable((Long) value);
      case FLOAT:
        return new FloatWritable((Float) value);
      case DOUBLE:
        return new DoubleWritable((Double) value);
      case BOOLEAN:
        return new BooleanWritable((Boolean) value);
      case NULL:
        return NullWritable.get();
      case RECORD:
        GenericRecord record = (GenericRecord) value;
        Writable[] values1 = new Writable[schema.getFields().size()];
        int index1 = 0;
        for (Schema.Field field : schema.getFields()) {
          values1[index1++] = avroToArrayWritable(record.get(field.name()), field.schema());
        }
        return new ArrayWritable(Writable.class, values1);
      case ENUM:
        return new Text(value.toString());
      case ARRAY:
        GenericArray arrayValue = (GenericArray) value;
        Writable[] values2 = new Writable[arrayValue.size()];
        int index2 = 0;
        for (Object obj : arrayValue) {
          values2[index2++] = avroToArrayWritable(obj, schema.getElementType());
        }
        return new ArrayWritable(Writable.class, values2);
      case MAP:
        Map mapValue = (Map) value;
        Writable[] values3 = new Writable[mapValue.size()];
        int index3 = 0;
        for (Object entry : mapValue.entrySet()) {
          Map.Entry mapEntry = (Map.Entry) entry;
          Writable[] mapValues = new Writable[2];
          mapValues[0] = new Text(mapEntry.getKey().toString());
          mapValues[1] = avroToArrayWritable(mapEntry.getValue(), schema.getValueType());
          values3[index3++] = new ArrayWritable(Writable.class, mapValues);
        }
        return new ArrayWritable(Writable.class, values3);
      case UNION:
        List<Schema> types = schema.getTypes();
        if (types.size() != 2) {
          throw new IllegalArgumentException("Only support union with 2 fields");
        }
        Schema s1 = types.get(0);
        Schema s2 = types.get(1);
        if (s1.getType() == Schema.Type.NULL) {
          return avroToArrayWritable(value, s2);
        } else if (s2.getType() == Schema.Type.NULL) {
          return avroToArrayWritable(value, s1);
        } else {
          throw new IllegalArgumentException("Only support union with null");
        }
      case FIXED:
        return new BytesWritable(((GenericFixed) value).bytes());
    }
    return null;
  }

  @Override
  public boolean next(Void aVoid, ArrayWritable arrayWritable) throws IOException {
    // Call the underlying parquetReader.next - which may replace the passed in ArrayWritable
    // with a new block of values
    boolean result = this.parquetReader.next(aVoid, arrayWritable);
    if (!result) {
      // if the result is false, then there are no more records
      return false;
    } else {
      // TODO(VC): Right now, we assume all records in log, have a matching base record. (which
      // would be true until we have a way to index logs too)
      // return from delta records map if we have some match.
      String key = arrayWritable.get()[HoodieRealtimeInputFormat.HOODIE_RECORD_KEY_COL_POS]
          .toString();
      if (LOG.isDebugEnabled()) {
        LOG.debug(String.format("key %s, base values: %s, log values: %s",
            key, arrayWritableToString(arrayWritable),
            arrayWritableToString(deltaRecordMap.get(key))));
      }
      if (deltaRecordMap.containsKey(key)) {
        Writable[] replaceValue = deltaRecordMap.get(key).get();
        Writable[] originalValue = arrayWritable.get();
        System.arraycopy(replaceValue, 0, originalValue, 0, originalValue.length);
        arrayWritable.set(originalValue);
      }
      return true;
    }
  }

  @Override
  public Void createKey() {
    return parquetReader.createKey();
  }

  @Override
  public ArrayWritable createValue() {
    return parquetReader.createValue();
  }

  @Override
  public long getPos() throws IOException {
    return parquetReader.getPos();
  }

  @Override
  public void close() throws IOException {
    parquetReader.close();
  }

  @Override
  public float getProgress() throws IOException {
    return parquetReader.getProgress();
  }
}
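A small example of orderFields() in isolation (the column names and positions below are made up for illustration): Hive supplies the projected names, their storage positions, and the partition columns separately; the method drops the partition columns and returns the remaining names sorted by their storage position.

import com.uber.hoodie.hadoop.realtime.HoodieRealtimeRecordReader;
import java.util.List;

public class OrderFieldsExample {

  public static void main(String[] args) {
    // "datestr" is a partition column, so Hive gives it no storage position id.
    List<String> ordered = HoodieRealtimeRecordReader.orderFields(
        "timestamp,_hoodie_record_key,rider,datestr", // projected column names from Hive
        "4,2,5",                                      // storage positions of the non-partition columns
        "datestr");                                   // partition columns
    // Prints [_hoodie_record_key, timestamp, rider] - i.e. ordered by positions 2, 4, 5.
    System.out.println(ordered);
  }
}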