1
0

Initial impl of HoodieRealtimeInputFormat

- Works end-to-end for flat schemas
- Schema evolution & hardening work remains
- HoodieClientExample can now write MERGE_ON_READ tables as well
This commit is contained in:
Vinoth Chandar
2017-04-24 08:19:02 -07:00
committed by prazanna
parent 9f526396a0
commit 7b1446548f
12 changed files with 745 additions and 27 deletions

View File

@@ -21,6 +21,7 @@ import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.HoodieTableConfig;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieIndexConfig;
@@ -36,6 +37,7 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.List;
import java.util.Properties;
/**
* Driver program that uses the Hoodie client with synthetic workload, and performs basic
@@ -47,14 +49,13 @@ public class HoodieClientExample {
private String tablePath = "file:///tmp/hoodie/sample-table";
@Parameter(names={"--table-name", "-n"}, description = "table name for Hoodie sample table")
private String tableName = "sample-table";
private String tableName = "hoodie_rt";
@Parameter(names={"--table-type", "-t"}, description = "table type")
@Parameter(names={"--table-type", "-t"}, description = "One of COPY_ON_WRITE or MERGE_ON_READ")
private String tableType = HoodieTableType.COPY_ON_WRITE.name();
private static Logger logger = LogManager.getLogger(HoodieClientExample.class);
public static void main(String[] args) throws Exception {
HoodieClientExample cli = new HoodieClientExample();
new JCommander(cli, args);
@@ -81,8 +82,7 @@ public class HoodieClientExample {
}
// Create the write client to write some records in
HoodieWriteConfig cfg =
HoodieWriteConfig.newBuilder().withPath(tablePath)
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable(tableName).withIndexConfig(
HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())

View File

@@ -131,7 +131,11 @@ public class HoodiePartitionMetadata {
}
// methods related to partition meta data
public static boolean hasPartitionMetadata(FileSystem fs, Path partitionPath) throws IOException {
return fs.exists(new Path(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
public static boolean hasPartitionMetadata(FileSystem fs, Path partitionPath) {
try {
return fs.exists(new Path(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
} catch (IOException ioe) {
throw new HoodieException("Error checking Hoodie partition metadata for " + partitionPath, ioe);
}
}
}

View File

@@ -98,7 +98,7 @@ public interface TableFileSystemView {
/**
* Group data files with corresponding delta files
* @param fs
*
* @param partitionPath
* @return
* @throws IOException

View File

@@ -207,7 +207,7 @@ public class HoodieTableFileSystemView implements TableFileSystemView, Serializa
l -> l.stream().sorted(HoodieLogFile.getLogVersionComparator())
.collect(toList())));
// Filter the delta files by the commit time of the latest base fine and collect as a list
// Filter the delta files by the commit time of the latest base file and collect as a list
Optional<HoodieInstant> lastTimestamp = metaClient.getActiveTimeline().lastInstant();
return lastTimestamp.map(hoodieInstant -> getLatestVersionInPartition(partitionPath,
hoodieInstant.getTimestamp()).map(

View File

@@ -135,6 +135,12 @@ public class FSUtils {
return datePartitions;
}
public static String getRelativePartitionPath(Path basePath, Path partitionPath) {
String partitionFullPath = partitionPath.toString();
int partitionStartIndex = partitionFullPath.lastIndexOf(basePath.getName());
return partitionFullPath.substring(partitionStartIndex + basePath.getName().length() + 1);
}
/**
* Obtain all the partition paths, that are present in this table, denoted by presence of {@link
* com.uber.hoodie.common.model.HoodiePartitionMetadata#HOODIE_PARTITION_METAFILE}
@@ -147,9 +153,7 @@ public class FSUtils {
while (allFiles.hasNext()) {
Path filePath = allFiles.next().getPath();
if (filePath.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
String partitionFullPath = filePath.getParent().toString();
int partitionStartIndex = partitionFullPath.lastIndexOf(basePath.getName()) ;
partitions.add(partitionFullPath.substring(partitionStartIndex + basePath.getName().length() + 1));
partitions.add(getRelativePartitionPath(basePath, filePath.getParent()));
}
}
return partitions;

View File

@@ -31,6 +31,7 @@ import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import java.io.*;
import java.util.ArrayList;
@@ -80,17 +81,48 @@ public class ParquetUtils {
return rowKeys;
}
/**
*
* Read the metadata from a parquet file
*
* @param parquetFilePath
* @return
*/
public static ParquetMetadata readMetadata(Path parquetFilePath) {
return readMetadata(new Configuration(), parquetFilePath);
}
public static ParquetMetadata readMetadata(Configuration conf, Path parquetFilePath) {
ParquetMetadata footer;
try {
// TODO(vc): Should we use the parallel reading version here?
footer = ParquetFileReader.readFooter(conf, parquetFilePath);
} catch (IOException e) {
throw new HoodieIOException("Failed to read footer for parquet " + parquetFilePath,
e);
}
return footer;
}
/**
* Get the schema of the given parquet file.
*
* @param parquetFilePath
* @return
*/
public static MessageType readSchema(Path parquetFilePath) {
return readMetadata(parquetFilePath).getFileMetaData().getSchema();
}
/**
* Read out the bloom filter from the parquet file meta data.
*/
public static BloomFilter readBloomFilterFromParquetMetadata(Path parquetFilePath) {
ParquetMetadata footer;
try {
footer = ParquetFileReader.readFooter(new Configuration(), parquetFilePath);
} catch (IOException e) {
throw new HoodieIndexException("Failed to read footer for parquet " + parquetFilePath,
e);
}
ParquetMetadata footer = readMetadata(parquetFilePath);
Map<String, String> metadata = footer.getFileMetaData().getKeyValueMetaData();
if (metadata.containsKey(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY)) {
return new BloomFilter(metadata.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY));

View File

@@ -73,8 +73,10 @@ import static parquet.filter2.predicate.FilterApi.gt;
@UseFileSplitsFromInputFormat
public class HoodieInputFormat extends MapredParquetInputFormat
implements Configurable {
public static final Log LOG = LogFactory.getLog(HoodieInputFormat.class);
private Configuration conf;
protected Configuration conf;
@Override
public FileStatus[] listStatus(JobConf job) throws IOException {
@@ -82,7 +84,7 @@ public class HoodieInputFormat extends MapredParquetInputFormat
FileStatus[] fileStatuses = super.listStatus(job);
Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus = groupFileStatus(fileStatuses);
LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
List<FileStatus> returns = new ArrayList<FileStatus>();
List<FileStatus> returns = new ArrayList<>();
for(Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry:groupedFileStatus.entrySet()) {
HoodieTableMetaClient metadata = entry.getKey();
if(metadata == null) {
@@ -97,7 +99,8 @@ public class HoodieInputFormat extends MapredParquetInputFormat
}
String tableName = metadata.getTableConfig().getTableName();
String mode = HoodieHiveUtil.readMode(Job.getInstance(job), tableName);
HoodieTimeline timeline = metadata.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
// FIXME(VC): This is incorrect and needs to change to include commits, delta commits, compactions, as all of them produce a base parquet file today
HoodieTimeline timeline = metadata.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
TableFileSystemView fsView = new HoodieTableFileSystemView(metadata, timeline);
if (HoodieHiveUtil.INCREMENTAL_SCAN_MODE.equals(mode)) {
@@ -160,12 +163,16 @@ public class HoodieInputFormat extends MapredParquetInputFormat
Map<HoodieTableMetaClient, List<FileStatus>> grouped = new HashMap<>();
HoodieTableMetaClient metadata = null;
String nonHoodieBasePath = null;
for(FileStatus status:fileStatuses) {
for(FileStatus status: fileStatuses) {
if (!status.getPath().getName().endsWith(".parquet")) {
//FIXME(vc): skip non parquet files for now. This wont be needed once log file name start with "."
continue;
}
if ((metadata == null && nonHoodieBasePath == null) || (metadata == null && !status.getPath().toString()
.contains(nonHoodieBasePath)) || (metadata != null && !status.getPath().toString()
.contains(metadata.getBasePath()))) {
try {
metadata = getTableMetaClient(status.getPath().getParent());
metadata = getTableMetaClient(status.getPath().getFileSystem(conf), status.getPath().getParent());
nonHoodieBasePath = null;
} catch (InvalidDatasetException e) {
LOG.info("Handling a non-hoodie path " + status.getPath());
@@ -278,8 +285,7 @@ public class HoodieInputFormat extends MapredParquetInputFormat
* @return
* @throws IOException
*/
private HoodieTableMetaClient getTableMetaClient(Path dataPath) throws IOException {
FileSystem fs = dataPath.getFileSystem(conf);
protected static HoodieTableMetaClient getTableMetaClient(FileSystem fs, Path dataPath) {
int levels = HoodieHiveUtil.DEFAULT_LEVELS_TO_BASEPATH;
if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) {
HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath);
@@ -289,6 +295,5 @@ public class HoodieInputFormat extends MapredParquetInputFormat
Path baseDir = HoodieHiveUtil.getNthParent(dataPath, levels);
LOG.info("Reading hoodie metadata from path " + baseDir.toString());
return new HoodieTableMetaClient(fs, baseDir.toString());
}
}

View File

@@ -0,0 +1,31 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.hadoop.realtime;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
/**
 * Simply extends ParquetHiveSerDe, so that realtime Hoodie tables can be registered in Hive
 * with a distinctly-named SerDe class while reusing the parquet SerDe behavior unchanged.
 */
public class HoodieParquetSerde extends ParquetHiveSerDe {
  // The explicit no-arg constructor that only called super() was redundant:
  // the compiler-generated default constructor is identical.
}

View File

@@ -0,0 +1,92 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.hadoop.realtime;
import org.apache.hadoop.mapred.FileSplit;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
/**
 * Filesplit that wraps the base split and a list of log files to merge deltas from.
 *
 * Serialization wire format (after the superclass fields): the max commit time string,
 * then an int count of log file paths, then each path. Strings are length-prefixed
 * UTF-8 byte sequences.
 */
public class HoodieRealtimeFileSplit extends FileSplit {

  // Paths of the delta log files to merge on top of the base parquet split.
  private List<String> deltaFilePaths;

  // Latest commit time up to which log records should be applied.
  private String maxCommitTime;

  public HoodieRealtimeFileSplit() {
    super();
  }

  public HoodieRealtimeFileSplit(FileSplit baseSplit, List<String> deltaLogFiles, String maxCommitTime) throws IOException {
    super(baseSplit.getPath(), baseSplit.getStart(), baseSplit.getLength(), baseSplit.getLocations());
    this.deltaFilePaths = deltaLogFiles;
    this.maxCommitTime = maxCommitTime;
  }

  public List<String> getDeltaFilePaths() {
    return deltaFilePaths;
  }

  public String getMaxCommitTime() {
    return maxCommitTime;
  }

  // Serializes a string as a length-prefixed UTF-8 byte sequence.
  private static void writeString(String value, DataOutput out) throws IOException {
    final byte[] encoded = value.getBytes(StandardCharsets.UTF_8);
    out.writeInt(encoded.length);
    out.write(encoded);
  }

  // Reads back a string written by writeString().
  private static String readString(DataInput in) throws IOException {
    final byte[] encoded = new byte[in.readInt()];
    in.readFully(encoded);
    return new String(encoded, StandardCharsets.UTF_8);
  }

  @Override
  public void write(DataOutput out) throws IOException {
    super.write(out);
    writeString(maxCommitTime, out);
    out.writeInt(deltaFilePaths.size());
    for (String logFilePath : deltaFilePaths) {
      writeString(logFilePath, out);
    }
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    super.readFields(in);
    // Must mirror the exact field order used in write().
    maxCommitTime = readString(in);
    final int totalLogFiles = in.readInt();
    deltaFilePaths = new ArrayList<>(totalLogFiles);
    for (int i = 0; i < totalLogFiles; i++) {
      deltaFilePaths.add(readString(in));
    }
  }
}

View File

@@ -0,0 +1,188 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.hadoop.realtime;
import com.google.common.base.Preconditions;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.log.HoodieLogFile;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.hadoop.HoodieInputFormat;
import com.uber.hoodie.hadoop.UseFileSplitsFromInputFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * Input Format, that provides a real-time view of data in a Hoodie dataset, by pairing each
 * base parquet split with the delta log files that must be merged into it at read time.
 */
@UseFileSplitsFromInputFormat
public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Configurable {

  public static final Log LOG = LogFactory.getLog(HoodieRealtimeInputFormat.class);

  // These positions have to be deterministic across all tables
  public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
  public static final int HOODIE_RECORD_KEY_COL_POS = 2;

  /**
   * Wraps every base parquet split from the parent input format into a
   * {@link HoodieRealtimeFileSplit} carrying the matching delta log file paths
   * and the max commit time to merge up to.
   */
  @Override
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is);

    // obtain all unique parent folders for splits
    Map<Path, List<FileSplit>> partitionsToParquetSplits =
        fileSplits.collect(Collectors.groupingBy(split -> split.getPath().getParent()));

    // TODO(vc): Should we handle also non-hoodie splits here?
    Map<String, HoodieTableMetaClient> metaClientMap = new HashMap<>();
    Map<Path, HoodieTableMetaClient> partitionsToMetaClient = partitionsToParquetSplits.keySet().stream()
        .collect(Collectors.toMap(Function.identity(), p -> {
          // find if we have a metaclient already for this partition.
          Optional<String> matchingBasePath = metaClientMap.keySet().stream()
              .filter(basePath -> p.toString().startsWith(basePath)).findFirst();
          if (matchingBasePath.isPresent()) {
            return metaClientMap.get(matchingBasePath.get());
          }
          try {
            HoodieTableMetaClient metaClient = getTableMetaClient(p.getFileSystem(conf), p);
            metaClientMap.put(metaClient.getBasePath(), metaClient);
            return metaClient;
          } catch (IOException e) {
            throw new HoodieIOException("Error creating hoodie meta client against : " + p, e);
          }
        }));

    // for all unique split parents, obtain all delta files based on delta commit timeline, grouped on file id
    List<HoodieRealtimeFileSplit> rtSplits = new ArrayList<>();
    partitionsToParquetSplits.keySet().forEach(partitionPath -> {
      // for each partition path obtain the data & log file groupings, then map back to inputsplits
      HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
      HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
      String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);
      try {
        Map<HoodieDataFile, List<HoodieLogFile>> dataLogFileGrouping =
            fsView.groupLatestDataFileWithLogFiles(relPartitionPath);
        // subgroup splits again by file id & match with log files.
        Map<String, List<FileSplit>> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream()
            .collect(Collectors.groupingBy(split -> FSUtils.getFileId(split.getPath().getName())));
        dataLogFileGrouping.forEach((dataFile, logFiles) -> {
          List<FileSplit> dataFileSplits = groupedInputSplits.get(dataFile.getFileId());
          if (dataFileSplits == null || dataFileSplits.isEmpty()) {
            // FIX: file groups without a matching parquet split (e.g. filtered out upstream)
            // previously caused a NullPointerException here.
            return;
          }
          // Hoisted out of the per-split loop; these are invariant across splits of a file group.
          List<String> logFilePaths = logFiles.stream()
              .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
          String maxCommitTime = metaClient.getActiveTimeline().getDeltaCommitTimeline()
              .filterCompletedInstants().lastInstant().get().getTimestamp();
          dataFileSplits.forEach(split -> {
            try {
              rtSplits.add(new HoodieRealtimeFileSplit(split, logFilePaths, maxCommitTime));
            } catch (IOException e) {
              throw new HoodieIOException("Error creating hoodie real time split ", e);
            }
          });
        });
      } catch (IOException e) {
        throw new HoodieIOException("Error obtaining data file/log file grouping: " + partitionPath, e);
      }
    });
    return rtSplits.toArray(new InputSplit[rtSplits.size()]);
  }

  @Override
  public FileStatus[] listStatus(JobConf job) throws IOException {
    // Call the HoodieInputFormat::listStatus to obtain all latest parquet files, based on commit timeline.
    return super.listStatus(job);
  }

  /**
   * Ensures a single metadata column (name + position) is part of the projected read columns.
   * Re-reads the current values from the configuration on every call, so successive
   * additions see each other's updates.
   */
  private static void addProjectionColumn(Configuration configuration, String fieldName, int fieldPos) {
    String readColNames = configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
    String readColIds = configuration.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
    if (!readColNames.contains(fieldName)) {
      configuration.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColNames + "," + fieldName);
      configuration.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, readColIds + "," + fieldPos);
      LOG.info(String.format("Adding extra column %s, to enable log merging cols (%s) ids (%s) ",
          fieldName,
          configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
          configuration.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)));
    }
  }

  /**
   * Adds the record key and commit time metadata columns to the projection; the record
   * reader needs both to merge log records onto base records.
   *
   * BUG FIX: the previous implementation captured the column lists into locals once and
   * reused them for both additions — when BOTH columns were missing, the second set()
   * silently overwrote the first, dropping the record key column from the projection.
   */
  private static Configuration addExtraReadColsIfNeeded(Configuration configuration) {
    addProjectionColumn(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HOODIE_RECORD_KEY_COL_POS);
    addProjectionColumn(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HOODIE_COMMIT_TIME_COL_POS);
    return configuration;
  }

  /**
   * Returns a {@link HoodieRealtimeRecordReader}, wrapping the parquet reader obtained
   * from the parent input format. Only accepts {@link HoodieRealtimeFileSplit}s.
   */
  @Override
  public RecordReader<Void, ArrayWritable> getRecordReader(final InputSplit split,
                                                           final JobConf job,
                                                           final Reporter reporter) throws IOException {
    LOG.info("Creating record reader with readCols :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
    // sanity check
    Preconditions.checkArgument(split instanceof HoodieRealtimeFileSplit,
        "HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit");
    return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) split, job, super.getRecordReader(split, job, reporter));
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = addExtraReadColsIfNeeded(conf);
  }

  @Override
  public Configuration getConf() {
    return addExtraReadColsIfNeeded(conf);
  }
}

View File

@@ -0,0 +1,319 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.hadoop.realtime;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieDefaultTimeline;
import com.uber.hoodie.common.util.ParquetUtils;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.AvroFSInput;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
/**
 * Record Reader implementation to merge fresh avro data with base parquet data, to support real time
 * queries.
 *
 * Reads the delta log files of the split up-front into an in-memory map keyed by record key,
 * then overlays those records onto rows streamed from the underlying parquet reader.
 */
public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWritable> {

  public static final Log LOG = LogFactory.getLog(HoodieRealtimeRecordReader.class);

  // Underlying reader over the base parquet file
  private final RecordReader<Void, ArrayWritable> parquetReader;
  private final HoodieRealtimeFileSplit split;
  private final JobConf jobConf;
  // Latest logged version of each record key, populated by readAndCompactLog()
  private final HashMap<String, ArrayWritable> deltaRecordMap;
  // Schema of the base parquet file; used as the writer schema for log records
  private final MessageType baseFileSchema;

  public HoodieRealtimeRecordReader(HoodieRealtimeFileSplit split,
                                    JobConf job,
                                    RecordReader<Void, ArrayWritable> realReader) {
    this.split = split;
    this.jobConf = job;
    this.parquetReader = realReader;
    this.deltaRecordMap = new HashMap<>();
    LOG.info("cfg ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
    try {
      baseFileSchema = ParquetUtils.readSchema(split.getPath());
      readAndCompactLog();
    } catch (IOException e) {
      throw new HoodieIOException("Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e);
    }
  }

  /**
   * Goes through the log files and populates a map with latest version of each key logged, since the base split was written.
   */
  private void readAndCompactLog() throws IOException {
    Schema writerSchema = new AvroSchemaConverter().convert(baseFileSchema);
    List<String> projectionFields = orderFields(
        jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
        jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR),
        jobConf.get("partition_columns"));
    // TODO(vc): In the future, the reader schema should be updated based on log files & be able to null out fields not present before
    Schema readerSchema = generateProjectionSchema(writerSchema, projectionFields);
    // FIXME(vc): This is ugly.. Need to fix the usage everywhere
    HoodieTimeline defTimeline = new HoodieDefaultTimeline();
    LOG.info(String.format("About to read logs %s for base split %s, projecting cols %s",
        split.getDeltaFilePaths(), split.getPath(), projectionFields));
    for (String logFilePath : split.getDeltaFilePaths()) {
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(writerSchema, readerSchema);
      final AvroFSInput input = new AvroFSInput(FileContext.getFileContext(jobConf), new Path(logFilePath));
      // FIX: try-with-resources — the reader previously leaked if an exception was thrown mid-read
      try (DataFileReader<GenericRecord> reader =
          (DataFileReader<GenericRecord>) DataFileReader.openReader(input, datumReader)) {
        while (reader.hasNext()) {
          GenericRecord rec = reader.next();
          String key = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
          String commitTime = rec.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
          if (defTimeline.compareTimestamps(commitTime, split.getMaxCommitTime(), HoodieTimeline.GREATER)) {
            // stop reading this log file. we hit a record later than max known commit time.
            break;
          }
          // we assume, a later safe record in the log, is newer than what we have in the map & replace it.
          ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(rec, writerSchema);
          deltaRecordMap.put(key, aWritable);
          if (LOG.isDebugEnabled()) {
            LOG.debug("Log record : " + arrayWritableToString(aWritable));
          }
        }
      }
    }
  }

  // Renders an ArrayWritable as "Size: n, v1 v2 ..." for debug logging.
  private static String arrayWritableToString(ArrayWritable writable) {
    if (writable == null) {
      return "null";
    }
    StringBuilder builder = new StringBuilder();
    Writable[] values = writable.get();
    builder.append(String.format("Size: %s,", values.length));
    for (Writable w : values) {
      builder.append(w + " ");
    }
    return builder.toString();
  }

  /**
   * Given a comma separated list of field names and positions at which they appear on Hive,
   * return a ordered list of field names, that can be passed onto storage.
   *
   * @param fieldNameCsv comma separated field names as projected by Hive
   * @param fieldOrderCsv comma separated field positions, parallel to the names
   * @param partitioningFieldsCsv comma separated partitioning field names (have no positions)
   * @return field names ordered by their storage position
   */
  public static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv, String partitioningFieldsCsv) {
    String[] fieldOrders = fieldOrderCsv.split(",");
    Set<String> partitioningFields = Arrays.stream(partitioningFieldsCsv.split(",")).collect(Collectors.toSet());
    List<String> fieldNames = Arrays.stream(fieldNameCsv.split(",")).filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList());
    // Hive does not provide ids for partitioning fields, so check for lengths excluding that.
    if (fieldNames.size() != fieldOrders.length) {
      throw new HoodieException(String.format("Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d",
          fieldNames.size(), fieldOrders.length));
    }
    TreeMap<Integer, String> orderedFieldMap = new TreeMap<>();
    for (int ox = 0; ox < fieldOrders.length; ox++) {
      orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNames.get(ox));
    }
    return orderedFieldMap.values().stream().collect(Collectors.toList());
  }

  /**
   * Generate a reader schema off the provided writeSchema, to just project out
   * the provided columns
   *
   * @param writeSchema the full writer schema
   * @param fieldNames the column names to project
   * @return a record schema containing only the projected fields
   */
  public static Schema generateProjectionSchema(Schema writeSchema, List<String> fieldNames) {
    List<Schema.Field> projectedFields = new ArrayList<>();
    for (String fn : fieldNames) {
      Schema.Field field = writeSchema.getField(fn);
      if (field == null) {
        throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed!");
      }
      projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue()));
    }
    return Schema.createRecord(projectedFields);
  }

  /**
   * Convert the projected read from delta record into an array writable
   *
   * @param value the avro value (may be null)
   * @param schema the avro schema describing the value
   * @return the equivalent Writable, or null for unsupported types
   */
  public static Writable avroToArrayWritable(Object value, Schema schema) {
    // if value is null, make a NullWritable
    if (value == null) {
      return NullWritable.get();
    }
    switch (schema.getType()) {
      case STRING:
        return new Text(value.toString());
      case BYTES:
        return new BytesWritable((byte[]) value);
      case INT:
        return new IntWritable((Integer) value);
      case LONG:
        return new LongWritable((Long) value);
      case FLOAT:
        return new FloatWritable((Float) value);
      case DOUBLE:
        return new DoubleWritable((Double) value);
      case BOOLEAN:
        return new BooleanWritable((Boolean) value);
      case NULL:
        return NullWritable.get();
      case RECORD:
        GenericRecord record = (GenericRecord) value;
        Writable[] recordValues = new Writable[schema.getFields().size()];
        int recordValueIndex = 0;
        for (Schema.Field field : schema.getFields()) {
          recordValues[recordValueIndex++] = avroToArrayWritable(record.get(field.name()), field.schema());
        }
        return new ArrayWritable(Writable.class, recordValues);
      case ENUM:
        return new Text(value.toString());
      case ARRAY:
        // FIX: size the array from the value; schema.getFields() is only valid for RECORD
        // schemas and threw AvroRuntimeException("Not a record") for ARRAY schemas.
        GenericArray arrayValue = (GenericArray) value;
        Writable[] arrayValues = new Writable[arrayValue.size()];
        int arrayValueIndex = 0;
        for (Object obj : arrayValue) {
          arrayValues[arrayValueIndex++] = avroToArrayWritable(obj, schema.getElementType());
        }
        return new ArrayWritable(Writable.class, arrayValues);
      case MAP:
        // TODO(vc): Need to add support for complex types
      case UNION:
        List<Schema> types = schema.getTypes();
        if (types.size() != 2) {
          throw new IllegalArgumentException("Only support union with 2 fields");
        }
        Schema s1 = types.get(0);
        Schema s2 = types.get(1);
        if (s1.getType() == Schema.Type.NULL) {
          return avroToArrayWritable(value, s2);
        } else if (s2.getType() == Schema.Type.NULL) {
          return avroToArrayWritable(value, s1);
        } else {
          throw new IllegalArgumentException("Only support union with null");
        }
      case FIXED:
        return new BytesWritable(((GenericFixed) value).bytes());
    }
    return null;
  }

  @Override
  public boolean next(Void aVoid, ArrayWritable arrayWritable) throws IOException {
    // Call the underlying parquetReader.next - which may replace the passed in ArrayWritable with a new block of values
    boolean result = this.parquetReader.next(aVoid, arrayWritable);
    if (!result) {
      // if the result is false, then there are no more records
      return false;
    }
    // TODO(VC): Right now, we assume all records in log, have a matching base record. (which would be true until we have a way to index logs too)
    // return from delta records map if we have some match.
    String key = arrayWritable.get()[HoodieRealtimeInputFormat.HOODIE_RECORD_KEY_COL_POS].toString();
    // single lookup instead of containsKey + get; map values are never null
    ArrayWritable deltaRecord = deltaRecordMap.get(key);
    if (LOG.isDebugEnabled()) {
      LOG.debug(String.format("key %s, base values: %s, log values: %s",
          key, arrayWritableToString(arrayWritable), arrayWritableToString(deltaRecord)));
    }
    if (deltaRecord != null) {
      arrayWritable.set(deltaRecord.get());
    }
    return true;
  }

  @Override
  public Void createKey() {
    return parquetReader.createKey();
  }

  @Override
  public ArrayWritable createValue() {
    return parquetReader.createValue();
  }

  @Override
  public long getPos() throws IOException {
    return parquetReader.getPos();
  }

  @Override
  public void close() throws IOException {
    parquetReader.close();
  }

  @Override
  public float getProgress() throws IOException {
    return parquetReader.getProgress();
  }
}

View File

@@ -0,0 +1,43 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
import org.apache.spark.sql.SparkSession;
/**
 * Examples to do Spark SQL on Hoodie dataset.
 *
 * Expects a Hive metastore at localhost:10000 exposing a table named hoodie_rt.
 */
public class HoodieSparkSQLExample {

  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder()
        .appName("Hoodie SparkSQL")
        .config("hive.metastore.uris", "thrift://localhost:10000")
        // keep Hoodie's input format in play, instead of Spark's native parquet reader
        .config("spark.sql.hive.convertMetastoreParquet", false)
        .enableHiveSupport()
        .master("local[2]")
        .getOrCreate();
    try {
      spark.sql("describe hoodie_rt").show();
      spark.sql("select * from hoodie_rt").show();
      spark.sql("select end_lon as e1, driver, rider as r1, datestr, driver, datestr, rider, _hoodie_record_key from hoodie_rt").show();
      spark.sql("select fare, begin_lon, begin_lat, timestamp from hoodie_rt where fare > 2.0").show();
      spark.sql("select count(*) as cnt, _hoodie_file_name as file from hoodie_rt group by _hoodie_file_name").show();
    } finally {
      // FIX: stop the session so the underlying SparkContext is released and the JVM can exit cleanly.
      spark.stop();
    }
  }
}