diff --git a/hoodie-client/src/test/java/HoodieClientExample.java b/hoodie-client/src/test/java/HoodieClientExample.java index 98cf2cba5..1758a9b5e 100644 --- a/hoodie-client/src/test/java/HoodieClientExample.java +++ b/hoodie-client/src/test/java/HoodieClientExample.java @@ -21,6 +21,7 @@ import com.uber.hoodie.HoodieWriteClient; import com.uber.hoodie.common.HoodieTestDataGenerator; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieTableType; +import com.uber.hoodie.common.table.HoodieTableConfig; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieIndexConfig; @@ -36,6 +37,7 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.util.List; +import java.util.Properties; /** * Driver program that uses the Hoodie client with synthetic workload, and performs basic @@ -47,14 +49,13 @@ public class HoodieClientExample { private String tablePath = "file:///tmp/hoodie/sample-table"; @Parameter(names={"--table-name", "-n"}, description = "table name for Hoodie sample table") - private String tableName = "sample-table"; + private String tableName = "hoodie_rt"; - @Parameter(names={"--table-type", "-t"}, description = "table type") + @Parameter(names={"--table-type", "-t"}, description = "One of COPY_ON_WRITE or MERGE_ON_READ") private String tableType = HoodieTableType.COPY_ON_WRITE.name(); private static Logger logger = LogManager.getLogger(HoodieClientExample.class); - public static void main(String[] args) throws Exception { HoodieClientExample cli = new HoodieClientExample(); new JCommander(cli, args); @@ -81,8 +82,7 @@ public class HoodieClientExample { } // Create the write client to write some records in - HoodieWriteConfig cfg = - HoodieWriteConfig.newBuilder().withPath(tablePath) + HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) 
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .forTable(tableName).withIndexConfig( HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodiePartitionMetadata.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodiePartitionMetadata.java index 546253f3d..d0ddf16ca 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodiePartitionMetadata.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodiePartitionMetadata.java @@ -131,7 +131,11 @@ public class HoodiePartitionMetadata { } // methods related to partition meta data - public static boolean hasPartitionMetadata(FileSystem fs, Path partitionPath) throws IOException { - return fs.exists(new Path(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)); + public static boolean hasPartitionMetadata(FileSystem fs, Path partitionPath) { + try { + return fs.exists(new Path(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)); + } catch (IOException ioe) { + throw new HoodieException("Error checking Hoodie partition metadata for " + partitionPath, ioe); + } } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java index d1af640fe..c3f6fc66f 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java @@ -98,7 +98,7 @@ public interface TableFileSystemView { /** * Group data files with corresponding delta files - * @param fs + * * @param partitionPath * @return * @throws IOException diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java index b65f3dba4..4b10808b3 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java @@ -207,7 +207,7 @@ public class HoodieTableFileSystemView implements TableFileSystemView, Serializa l -> l.stream().sorted(HoodieLogFile.getLogVersionComparator()) .collect(toList()))); - // Filter the delta files by the commit time of the latest base fine and collect as a list + // Filter the delta files by the commit time of the latest base file and collect as a list Optional lastTimestamp = metaClient.getActiveTimeline().lastInstant(); return lastTimestamp.map(hoodieInstant -> getLatestVersionInPartition(partitionPath, hoodieInstant.getTimestamp()).map( diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java index 78ad5035f..9ce683292 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java @@ -135,6 +135,12 @@ public class FSUtils { return datePartitions; } + public static String getRelativePartitionPath(Path basePath, Path partitionPath) { + String partitionFullPath = partitionPath.toString(); + int partitionStartIndex = partitionFullPath.lastIndexOf(basePath.getName()); + return partitionFullPath.substring(partitionStartIndex + basePath.getName().length() + 1); + } + /** * Obtain all the partition paths, that are present in this table, denoted by presence of {@link * com.uber.hoodie.common.model.HoodiePartitionMetadata#HOODIE_PARTITION_METAFILE} @@ -147,9 +153,7 @@ public class FSUtils { while (allFiles.hasNext()) { Path filePath = allFiles.next().getPath(); if (filePath.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) { - 
String partitionFullPath = filePath.getParent().toString(); - int partitionStartIndex = partitionFullPath.lastIndexOf(basePath.getName()) ; - partitions.add(partitionFullPath.substring(partitionStartIndex + basePath.getName().length() + 1)); + partitions.add(getRelativePartitionPath(basePath, filePath.getParent())); } } return partitions; diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java index 31f5ab3b1..cdb6baa55 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java @@ -31,6 +31,7 @@ import org.apache.parquet.avro.AvroReadSupport; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; import java.io.*; import java.util.ArrayList; @@ -80,17 +81,48 @@ public class ParquetUtils { return rowKeys; } + + /** + * + * Read the metadata from a parquet file + * + * @param parquetFilePath + * @return + */ + public static ParquetMetadata readMetadata(Path parquetFilePath) { + return readMetadata(new Configuration(), parquetFilePath); + } + + public static ParquetMetadata readMetadata(Configuration conf, Path parquetFilePath) { + ParquetMetadata footer; + try { + // TODO(vc): Should we use the parallel reading version here? + footer = ParquetFileReader.readFooter(conf, parquetFilePath); + } catch (IOException e) { + throw new HoodieIOException("Failed to read footer for parquet " + parquetFilePath, + e); + } + return footer; + } + + + /** + * Get the schema of the given parquet file. 
+ * + * @param parquetFilePath + * @return + */ + public static MessageType readSchema(Path parquetFilePath) { + return readMetadata(parquetFilePath).getFileMetaData().getSchema(); + } + + + /** * Read out the bloom filter from the parquet file meta data. */ public static BloomFilter readBloomFilterFromParquetMetadata(Path parquetFilePath) { - ParquetMetadata footer; - try { - footer = ParquetFileReader.readFooter(new Configuration(), parquetFilePath); - } catch (IOException e) { - throw new HoodieIndexException("Failed to read footer for parquet " + parquetFilePath, - e); - } + ParquetMetadata footer = readMetadata(parquetFilePath); Map metadata = footer.getFileMetaData().getKeyValueMetaData(); if (metadata.containsKey(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY)) { return new BloomFilter(metadata.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY)); diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java index e0fe295e6..1e827103f 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java @@ -73,8 +73,10 @@ import static parquet.filter2.predicate.FilterApi.gt; @UseFileSplitsFromInputFormat public class HoodieInputFormat extends MapredParquetInputFormat implements Configurable { + public static final Log LOG = LogFactory.getLog(HoodieInputFormat.class); - private Configuration conf; + + protected Configuration conf; @Override public FileStatus[] listStatus(JobConf job) throws IOException { @@ -82,7 +84,7 @@ public class HoodieInputFormat extends MapredParquetInputFormat FileStatus[] fileStatuses = super.listStatus(job); Map> groupedFileStatus = groupFileStatus(fileStatuses); LOG.info("Found a total of " + groupedFileStatus.size() + " groups"); - List returns = new ArrayList(); + List returns = new 
ArrayList<>(); for(Map.Entry> entry:groupedFileStatus.entrySet()) { HoodieTableMetaClient metadata = entry.getKey(); if(metadata == null) { @@ -97,7 +99,8 @@ public class HoodieInputFormat extends MapredParquetInputFormat } String tableName = metadata.getTableConfig().getTableName(); String mode = HoodieHiveUtil.readMode(Job.getInstance(job), tableName); - HoodieTimeline timeline = metadata.getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + // FIXME(VC): This is incorrect and needs to change to include commits, delta commits, compactions, as all of them produce a base parquet file today + HoodieTimeline timeline = metadata.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants(); TableFileSystemView fsView = new HoodieTableFileSystemView(metadata, timeline); if (HoodieHiveUtil.INCREMENTAL_SCAN_MODE.equals(mode)) { @@ -160,12 +163,16 @@ public class HoodieInputFormat extends MapredParquetInputFormat Map> grouped = new HashMap<>(); HoodieTableMetaClient metadata = null; String nonHoodieBasePath = null; - for(FileStatus status:fileStatuses) { + for(FileStatus status: fileStatuses) { + if (!status.getPath().getName().endsWith(".parquet")) { + //FIXME(vc): skip non-parquet files for now. This won't be needed once log file names start with "." 
+ continue; + } if ((metadata == null && nonHoodieBasePath == null) || (metadata == null && !status.getPath().toString() .contains(nonHoodieBasePath)) || (metadata != null && !status.getPath().toString() .contains(metadata.getBasePath()))) { try { - metadata = getTableMetaClient(status.getPath().getParent()); + metadata = getTableMetaClient(status.getPath().getFileSystem(conf), status.getPath().getParent()); nonHoodieBasePath = null; } catch (InvalidDatasetException e) { LOG.info("Handling a non-hoodie path " + status.getPath()); @@ -278,8 +285,7 @@ public class HoodieInputFormat extends MapredParquetInputFormat * @return * @throws IOException */ - private HoodieTableMetaClient getTableMetaClient(Path dataPath) throws IOException { - FileSystem fs = dataPath.getFileSystem(conf); + protected static HoodieTableMetaClient getTableMetaClient(FileSystem fs, Path dataPath) { int levels = HoodieHiveUtil.DEFAULT_LEVELS_TO_BASEPATH; if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) { HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath); @@ -289,6 +295,5 @@ public class HoodieInputFormat extends MapredParquetInputFormat Path baseDir = HoodieHiveUtil.getNthParent(dataPath, levels); LOG.info("Reading hoodie metadata from path " + baseDir.toString()); return new HoodieTableMetaClient(fs, baseDir.toString()); - } } diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieParquetSerde.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieParquetSerde.java new file mode 100644 index 000000000..701ab90a9 --- /dev/null +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieParquetSerde.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.hadoop.realtime; + +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; + +/** + * Simply extends ParquetHiveSerDe + */ +public class HoodieParquetSerde extends ParquetHiveSerDe { + + public HoodieParquetSerde() { + super(); + } +} diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeFileSplit.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeFileSplit.java new file mode 100644 index 000000000..a352f48a5 --- /dev/null +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeFileSplit.java @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + +package com.uber.hoodie.hadoop.realtime; + +import org.apache.hadoop.mapred.FileSplit; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +/** + * Filesplit that wraps the base split and a list of log files to merge deltas from. + */ +public class HoodieRealtimeFileSplit extends FileSplit { + + private List deltaFilePaths; + + private String maxCommitTime; + + + public HoodieRealtimeFileSplit() { + super(); + } + + public HoodieRealtimeFileSplit(FileSplit baseSplit, List deltaLogFiles, String maxCommitTime) throws IOException { + super(baseSplit.getPath(), baseSplit.getStart(), baseSplit.getLength(), baseSplit.getLocations()); + this.deltaFilePaths = deltaLogFiles; + this.maxCommitTime = maxCommitTime; + } + + public List getDeltaFilePaths() { + return deltaFilePaths; + } + + public String getMaxCommitTime() { + return maxCommitTime; + } + + private static void writeString(String str, DataOutput out) throws IOException { + byte[] pathBytes = str.getBytes(StandardCharsets.UTF_8); + out.writeInt(pathBytes.length); + out.write(pathBytes); + } + + private static String readString(DataInput in) throws IOException { + byte[] pathBytes = new byte[in.readInt()]; + in.readFully(pathBytes); + return new String(pathBytes, StandardCharsets.UTF_8); + } + + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + + writeString(maxCommitTime, out); + out.writeInt(deltaFilePaths.size()); + for (String logFilePath: deltaFilePaths) { + writeString(logFilePath, out); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + + maxCommitTime = readString(in); + int totalLogFiles = in.readInt(); + deltaFilePaths = new ArrayList<>(totalLogFiles); + for (int i=0; i < totalLogFiles; i++) { + deltaFilePaths.add(readString(in)); + } + } +} diff --git 
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java new file mode 100644 index 000000000..3a60c2187 --- /dev/null +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java @@ -0,0 +1,188 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + +package com.uber.hoodie.hadoop.realtime; + +import com.google.common.base.Preconditions; + +import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.log.HoodieLogFile; +import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; +import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.exception.HoodieIOException; +import com.uber.hoodie.hadoop.HoodieInputFormat; +import com.uber.hoodie.hadoop.UseFileSplitsFromInputFormat; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Input Format, that provides a real-time view of data in a Hoodie dataset + */ +@UseFileSplitsFromInputFormat +public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Configurable { + + public static final Log LOG = LogFactory.getLog(HoodieRealtimeInputFormat.class); + + // These positions have to be deterministic across all tables + public static final int HOODIE_COMMIT_TIME_COL_POS = 0; + public static final int HOODIE_RECORD_KEY_COL_POS = 2; + + @Override + public InputSplit[] 
getSplits(JobConf job, int numSplits) throws IOException { + + Stream fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is); + + // obtain all unique parent folders for splits + Map> partitionsToParquetSplits = fileSplits.collect(Collectors.groupingBy(split -> split.getPath().getParent())); + // TODO(vc): Should we handle also non-hoodie splits here? + Map metaClientMap = new HashMap<>(); + Map partitionsToMetaClient = partitionsToParquetSplits.keySet().stream() + .collect(Collectors.toMap(Function.identity(), p -> { + // find if we have a metaclient already for this partition. + Optional matchingBasePath = metaClientMap.keySet().stream() + .filter(basePath -> p.toString().startsWith(basePath)).findFirst(); + if (matchingBasePath.isPresent()) { + return metaClientMap.get(matchingBasePath.get()); + } + + try { + HoodieTableMetaClient metaClient = getTableMetaClient(p.getFileSystem(conf), p); + metaClientMap.put(metaClient.getBasePath(), metaClient); + return metaClient; + } catch (IOException e) { + throw new HoodieIOException("Error creating hoodie meta client against : " + p, e); + } + })); + + // for all unique split parents, obtain all delta files based on delta commit timeline, grouped on file id + List rtSplits = new ArrayList<>(); + partitionsToParquetSplits.keySet().stream().forEach(partitionPath -> { + // for each partition path obtain the data & log file groupings, then map back to inputsplits + HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath); + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline()); + String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath); + + try { + Map> dataLogFileGrouping = fsView.groupLatestDataFileWithLogFiles(relPartitionPath); + + // subgroup splits again by file id & match with log files. 
+ Map> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream() + .collect(Collectors.groupingBy(split -> FSUtils.getFileId(split.getPath().getName()))); + dataLogFileGrouping.forEach((dataFile, logFiles) -> { + List dataFileSplits = groupedInputSplits.get(dataFile.getFileId()); + dataFileSplits.forEach(split -> { + try { + List logFilePaths = logFiles.stream().map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()); + String maxCommitTime = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant().get().getTimestamp(); + rtSplits.add(new HoodieRealtimeFileSplit(split, logFilePaths, maxCommitTime)); + } catch (IOException e) { + throw new HoodieIOException("Error creating hoodie real time split ", e); + } + }); + }); + } catch (IOException e) { + throw new HoodieIOException("Error obtaining data file/log file grouping: " + partitionPath, e); + } + }); + + return rtSplits.toArray(new InputSplit[rtSplits.size()]); + } + + + @Override + public FileStatus[] listStatus(JobConf job) throws IOException { + // Call the HoodieInputFormat::listStatus to obtain all latest parquet files, based on commit timeline. 
+ return super.listStatus(job); + } + + + private static Configuration addExtraReadColsIfNeeded(Configuration configuration) { + String readColNames = configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR); + String readColIds = configuration.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR); + + if (!readColNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)) { + configuration.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, + readColNames + "," + HoodieRecord.RECORD_KEY_METADATA_FIELD); + configuration.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, + readColIds + "," + HOODIE_RECORD_KEY_COL_POS); + LOG.info(String.format("Adding extra _hoodie_record_key column, to enable log merging cols (%s) ids (%s) ", + configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), + configuration.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR))); + } + + if (!readColNames.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)) { + configuration.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, + readColNames + "," + HoodieRecord.COMMIT_TIME_METADATA_FIELD); + configuration.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, + readColIds + "," + HOODIE_COMMIT_TIME_COL_POS); + LOG.info(String.format("Adding extra _hoodie_commit_time column, to enable log merging cols (%s) ids (%s) ", + configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), + configuration.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR))); + } + + return configuration; + } + + + + @Override + public RecordReader getRecordReader(final InputSplit split, + final JobConf job, + final Reporter reporter) throws IOException { + LOG.info("Creating record reader with readCols :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)); + // sanity check + Preconditions.checkArgument(split instanceof HoodieRealtimeFileSplit, + "HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit"); + return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) 
split, job, super.getRecordReader(split, job, reporter)); + } + + @Override + public void setConf(Configuration conf) { + this.conf = addExtraReadColsIfNeeded(conf); + } + + @Override + public Configuration getConf() { + return addExtraReadColsIfNeeded(conf); + } +} diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java new file mode 100644 index 000000000..3d619ec82 --- /dev/null +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java @@ -0,0 +1,319 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + +package com.uber.hoodie.hadoop.realtime; + +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.timeline.HoodieDefaultTimeline; +import com.uber.hoodie.common.util.ParquetUtils; +import com.uber.hoodie.exception.HoodieException; +import com.uber.hoodie.exception.HoodieIOException; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.generic.GenericArray; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericFixed; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.AvroFSInput; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.schema.MessageType; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; + +/** + * Record Reader implementation to merge fresh avro data with base parquet data, to support real time + * queries. 
+ */ +public class HoodieRealtimeRecordReader implements RecordReader { + + private final RecordReader parquetReader; + private final HoodieRealtimeFileSplit split; + private final JobConf jobConf; + + public static final Log LOG = LogFactory.getLog(HoodieRealtimeRecordReader.class); + + private final HashMap deltaRecordMap; + private final MessageType baseFileSchema; + + + public HoodieRealtimeRecordReader(HoodieRealtimeFileSplit split, + JobConf job, + RecordReader realReader) { + this.split = split; + this.jobConf = job; + this.parquetReader = realReader; + this.deltaRecordMap = new HashMap<>(); + + LOG.info("cfg ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)); + try { + baseFileSchema = ParquetUtils.readSchema(split.getPath()); + readAndCompactLog(); + } catch (IOException e) { + throw new HoodieIOException("Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e); + } + } + + /** + * Goes through the log files and populates a map with latest version of each key logged, since the base split was written. + */ + private void readAndCompactLog() throws IOException { + Schema writerSchema = new AvroSchemaConverter().convert(baseFileSchema); + List projectionFields = orderFields( + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), + jobConf.get("partition_columns")); + // TODO(vc): In the future, the reader schema should be updated based on log files & be able to null out fields not present before + Schema readerSchema = generateProjectionSchema(writerSchema, projectionFields); + + // FIXME(vc): This is ugly.. 
Need to fix the usage everywhere + HoodieTimeline defTimeline = new HoodieDefaultTimeline(); + + LOG.info(String.format("About to read logs %s for base split %s, projecting cols %s", + split.getDeltaFilePaths(), split.getPath(), projectionFields)); + for (String logFilePath: split.getDeltaFilePaths()) { + GenericDatumReader datumReader = new GenericDatumReader<>(writerSchema, readerSchema); + final AvroFSInput input = new AvroFSInput(FileContext.getFileContext(jobConf), new Path(logFilePath)); + DataFileReader reader = (DataFileReader) DataFileReader.openReader(input, datumReader); + while (reader.hasNext()) { + GenericRecord rec = reader.next(); + String key = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); + String commitTime = rec.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); + if (defTimeline.compareTimestamps(commitTime, split.getMaxCommitTime(), HoodieTimeline.GREATER)) { + // stop reading this log file. we hit a record later than max known commit time. + break; + } + + // we assume a later safe record in the log is newer than what we have in the map & replace it. + ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(rec, writerSchema); + deltaRecordMap.put(key, aWritable); + if (LOG.isDebugEnabled()) { + LOG.debug("Log record : " + arrayWritableToString(aWritable)); + } + } + reader.close(); + } + } + + private static String arrayWritableToString(ArrayWritable writable) { + if (writable == null) { + return "null"; + } + + StringBuilder builder = new StringBuilder(); + Writable[] values = writable.get(); + builder.append(String.format("Size: %s,", values.length)); + for (Writable w: values) { + builder.append(w + " "); + } + return builder.toString(); + } + + + /** + * Given a comma separated list of field names and positions at which they appear on Hive, + * return an ordered list of field names, that can be passed onto storage. 
+ * + * @param fieldNameCsv + * @param fieldOrderCsv + * @return + */ + public static List orderFields(String fieldNameCsv, String fieldOrderCsv, String partitioningFieldsCsv) { + + String[] fieldOrders = fieldOrderCsv.split(","); + Set partitioningFields = Arrays.stream(partitioningFieldsCsv.split(",")).collect(Collectors.toSet()); + List fieldNames = Arrays.stream(fieldNameCsv.split(",")).filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList()); + + // Hive does not provide ids for partitioning fields, so check for lengths excluding that. + if (fieldNames.size() != fieldOrders.length) { + throw new HoodieException(String.format("Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d", + fieldNames.size(), fieldOrders.length)); + } + TreeMap orderedFieldMap = new TreeMap<>(); + for (int ox=0; ox < fieldOrders.length; ox++) { + orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNames.get(ox)); + } + return orderedFieldMap.values().stream().collect(Collectors.toList()); + } + + + + /** + * Generate a reader schema off the provided writeSchema, to just project out + * the provided columns + * + * @param writeSchema + * @param fieldNames + * @return + */ + public static Schema generateProjectionSchema(Schema writeSchema, List fieldNames) { + List projectedFields = new ArrayList<>(); + for (String fn: fieldNames) { + Schema.Field field = writeSchema.getField(fn); + if (field == null) { + throw new HoodieException("Field "+ fn + " not found log schema. 
Query cannot proceed!"); + } + projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue())); + } + + return Schema.createRecord(projectedFields); + } + + /** + * Convert the projected read from delta record into an array writable + * + * @param value + * @param schema + * @return + */ + public static Writable avroToArrayWritable(Object value, Schema schema) { + + // if value is null, make a NullWritable + if (value == null) { + return NullWritable.get(); + } + + switch (schema.getType()) { + case STRING: + return new Text(value.toString()); + case BYTES: + return new BytesWritable((byte[]) value); + case INT: + return new IntWritable((Integer) value); + case LONG: + return new LongWritable((Long) value); + case FLOAT: + return new FloatWritable((Float) value); + case DOUBLE: + return new DoubleWritable((Double) value); + case BOOLEAN: + return new BooleanWritable((Boolean) value); + case NULL: + return NullWritable.get(); + case RECORD: + GenericRecord record = (GenericRecord) value; + Writable[] values1 = new Writable[schema.getFields().size()]; + int index1 = 0; + for (Schema.Field field : schema.getFields()) { + values1[index1++] = avroToArrayWritable(record.get(field.name()), field.schema()); + } + return new ArrayWritable(Writable.class, values1); + case ENUM: + return new Text(value.toString()); + case ARRAY: + Writable[] values2 = new Writable[schema.getFields().size()]; + int index2 = 0; + for (Object obj : (GenericArray) value) { + values2[index2++] = avroToArrayWritable(obj, schema.getElementType()); + } + return new ArrayWritable(Writable.class, values2); + case MAP: + // TODO(vc): Need to add support for complex types + case UNION: + List types = schema.getTypes(); + if (types.size() != 2) { + throw new IllegalArgumentException("Only support union with 2 fields"); + } + Schema s1 = types.get(0); + Schema s2 = types.get(1); + if (s1.getType() == Schema.Type.NULL) { + return avroToArrayWritable(value, s2); + } 
else if (s2.getType() == Schema.Type.NULL) { + return avroToArrayWritable(value, s1); + } else { + throw new IllegalArgumentException("Only support union with null"); + } + case FIXED: + return new BytesWritable(((GenericFixed) value).bytes()); + } + return null; + } + + + @Override + public boolean next(Void aVoid, ArrayWritable arrayWritable) throws IOException { + // Call the underlying parquetReader.next - which may replace the passed in ArrayWritable with a new block of values + boolean result = this.parquetReader.next(aVoid, arrayWritable); + if(!result) { + // if the result is false, then there are no more records + return false; + } else { + // TODO(VC): Right now, we assume all records in log, have a matching base record. (which would be true until we have a way to index logs too) + // return from delta records map if we have some match. + String key = arrayWritable.get()[HoodieRealtimeInputFormat.HOODIE_RECORD_KEY_COL_POS].toString(); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("key %s, base values: %s, log values: %s", + key, arrayWritableToString(arrayWritable), arrayWritableToString(deltaRecordMap.get(key)))); + } + if (deltaRecordMap.containsKey(key)) { + arrayWritable.set(deltaRecordMap.get(key).get()); + } + return true; + } + } + + @Override + public Void createKey() { + return parquetReader.createKey(); + } + + @Override + public ArrayWritable createValue() { + return parquetReader.createValue(); + } + + @Override + public long getPos() throws IOException { + return parquetReader.getPos(); + } + + @Override + public void close() throws IOException { + parquetReader.close(); + } + + @Override + public float getProgress() throws IOException { + return parquetReader.getProgress(); + } +} diff --git a/hoodie-utilities/src/test/java/HoodieSparkSQLExample.java b/hoodie-utilities/src/test/java/HoodieSparkSQLExample.java new file mode 100644 index 000000000..134fc76ea --- /dev/null +++ 
b/hoodie-utilities/src/test/java/HoodieSparkSQLExample.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + + +import org.apache.spark.sql.SparkSession; + +/** + * Examples to do Spark SQL on Hoodie dataset. + */ +public class HoodieSparkSQLExample { + + public static void main(String[] args) throws Exception { + + SparkSession spark = SparkSession.builder() + .appName("Hoodie SparkSQL") + .config("hive.metastore.uris","thrift://localhost:10000") + .config("spark.sql.hive.convertMetastoreParquet", false) + .enableHiveSupport() + .master("local[2]") + .getOrCreate(); + + spark.sql("describe hoodie_rt").show(); + spark.sql("select * from hoodie_rt").show(); + spark.sql("select end_lon as e1, driver, rider as r1, datestr, driver, datestr, rider, _hoodie_record_key from hoodie_rt").show(); + spark.sql("select fare, begin_lon, begin_lat, timestamp from hoodie_rt where fare > 2.0").show(); + spark.sql("select count(*) as cnt, _hoodie_file_name as file from hoodie_rt group by _hoodie_file_name").show(); + } +}