From 37838cea6094ddc66191df42e8b2c84f132d1623 Mon Sep 17 00:00:00 2001 From: Gary Li Date: Tue, 9 Jun 2020 06:10:16 -0700 Subject: [PATCH] [HUDI-822] decouple Hudi related logics from HoodieInputFormat (#1592) - Refactoring business logic out of InputFormat into Utils helpers. --- .../table/TestHoodieMergeOnReadTable.java | 12 +- .../commit/TestCopyOnWriteActionExecutor.java | 10 +- .../hudi/hadoop/HoodieParquetInputFormat.java | 236 ++---------- .../hudi/hadoop/HoodieROTablePathFilter.java | 3 +- .../apache/hudi/hadoop/InputPathHandler.java | 4 +- .../hadoop/config/HoodieRealtimeConfig.java | 42 +++ .../AbstractRealtimeRecordReader.java | 271 +------------- .../HoodieCombineRealtimeRecordReader.java | 3 +- .../HoodieParquetRealtimeInputFormat.java | 106 +----- .../RealtimeCompactedRecordReader.java | 35 +- .../RealtimeUnmergedRecordReader.java | 8 +- .../HoodieHiveUtils.java} | 6 +- .../hadoop/utils/HoodieInputFormatUtils.java | 342 ++++++++++++++++++ .../utils/HoodieRealtimeInputFormatUtils.java | 154 ++++++++ .../HoodieRealtimeRecordReaderUtils.java | 271 ++++++++++++++ .../hadoop/TestHoodieParquetInputFormat.java | 27 +- .../TestHoodieRealtimeRecordReader.java | 3 +- .../hadoop/testutils/InputFormatTestUtil.java | 10 +- 18 files changed, 919 insertions(+), 624 deletions(-) create mode 100644 hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HoodieRealtimeConfig.java rename hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/{HoodieHiveUtil.java => utils/HoodieHiveUtils.java} (98%) create mode 100644 hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java create mode 100644 hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java create mode 100644 hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index 57c0d8d27..53de65ec3 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -46,7 +46,7 @@ import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.hadoop.HoodieHiveUtil; +import org.apache.hudi.hadoop.utils.HoodieHiveUtils; import org.apache.hudi.hadoop.HoodieParquetInputFormat; import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; import org.apache.hudi.index.HoodieIndex; @@ -1463,19 +1463,19 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { private void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull, boolean stopAtCompaction) { String modePropertyName = - String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); - jobConf.set(modePropertyName, HoodieHiveUtil.INCREMENTAL_SCAN_MODE); + String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); + jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE); String startCommitTimestampName = - String.format(HoodieHiveUtil.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); + String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); 
jobConf.set(startCommitTimestampName, startCommit); String maxCommitPulls = - String.format(HoodieHiveUtil.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); + String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); jobConf.setInt(maxCommitPulls, numberOfCommitsToPull); String stopAtCompactionPropName = - String.format(HoodieHiveUtil.HOODIE_STOP_AT_COMPACTION_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); + String.format(HoodieHiveUtils.HOODIE_STOP_AT_COMPACTION_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); jobConf.setBoolean(stopAtCompactionPropName, stopAtCompaction); } diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java index 63a04ec48..c95a917d9 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java @@ -32,7 +32,7 @@ import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.hadoop.HoodieHiveUtil; +import org.apache.hudi.hadoop.utils.HoodieHiveUtils; import org.apache.hudi.hadoop.HoodieParquetInputFormat; import org.apache.hudi.io.HoodieCreateHandle; import org.apache.hudi.table.HoodieCopyOnWriteTable; @@ -225,15 +225,15 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { private void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull) { String modePropertyName = - String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); - jobConf.set(modePropertyName, HoodieHiveUtil.INCREMENTAL_SCAN_MODE); + String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); + jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE); String startCommitTimestampName = - String.format(HoodieHiveUtil.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); + String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); jobConf.set(startCommitTimestampName, startCommit); String maxCommitPulls = - String.format(HoodieHiveUtil.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); + String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); jobConf.setInt(maxCommitPulls, numberOfCommitsToPull); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java index 5a10f3c99..4db928cb6 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java @@ -18,10 +18,17 @@ package org.apache.hudi.hadoop; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.hadoop.utils.HoodieHiveUtils; +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; + import 
org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; import org.apache.hadoop.io.ArrayWritable; @@ -31,31 +38,14 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.Job; -import org.apache.hudi.common.model.HoodieBaseFile; -import org.apache.hudi.common.model.HoodieCommitMetadata; -import org.apache.hudi.common.model.HoodiePartitionMetadata; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.view.HoodieTableFileSystemView; -import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.StringUtils; -import org.apache.hudi.exception.HoodieIOException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; /** * HoodieInputFormat which understands the Hoodie File Structure and filters files based on the Hoodie Mode. If paths @@ -69,10 +59,14 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement protected Configuration conf; + protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) { + return HoodieInputFormatUtils.filterInstantsTimeline(timeline); + } + @Override public FileStatus[] listStatus(JobConf job) throws IOException { // Segregate inputPaths[] to incremental, snapshot and non hoodie paths - List incrementalTables = HoodieHiveUtil.getIncrementalTableNames(Job.getInstance(job)); + List incrementalTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(job)); InputPathHandler inputPathHandler = new InputPathHandler(conf, getInputPaths(job), incrementalTables); List returns = new ArrayList<>(); @@ -107,10 +101,10 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement setInputPaths(job, snapshotPaths.toArray(new Path[snapshotPaths.size()])); FileStatus[] fileStatuses = super.listStatus(job); Map> groupedFileStatus = - groupFileStatusForSnapshotPaths(fileStatuses, tableMetaClientMap.values()); + HoodieInputFormatUtils.groupFileStatusForSnapshotPaths(fileStatuses, tableMetaClientMap.values()); LOG.info("Found a total of " + groupedFileStatus.size() + " groups"); for (Map.Entry> entry : groupedFileStatus.entrySet()) { - List result = filterFileStatusForSnapshotMode(entry.getKey(), entry.getValue()); + List result = HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, entry.getKey(), entry.getValue()); if (result != null) { returns.addAll(result); } @@ -119,36 +113,6 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement return returns.toArray(new FileStatus[returns.size()]); } - /** - * Filter any specific instants that we do not want to process. 
- * example timeline: - * - * t0 -> create bucket1.parquet - * t1 -> create and append updates bucket1.log - * t2 -> request compaction - * t3 -> create bucket2.parquet - * - * if compaction at t2 takes a long time, incremental readers on RO tables can move to t3 and would skip updates in t1 - * - * To workaround this problem, we want to stop returning data belonging to commits > t2. - * After compaction is complete, incremental reader would see updates in t2, t3, so on. - */ - protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) { - HoodieDefaultTimeline commitsAndCompactionTimeline = timeline.getCommitsAndCompactionTimeline(); - Option pendingCompactionInstant = commitsAndCompactionTimeline.filterPendingCompactionTimeline().firstInstant(); - if (pendingCompactionInstant.isPresent()) { - HoodieDefaultTimeline instantsTimeline = commitsAndCompactionTimeline.findInstantsBefore(pendingCompactionInstant.get().getTimestamp()); - int numCommitsFilteredByCompaction = commitsAndCompactionTimeline.getCommitsTimeline().countInstants() - - instantsTimeline.getCommitsTimeline().countInstants(); - LOG.info("Earliest pending compaction instant is: " + pendingCompactionInstant.get().getTimestamp() - + " skipping " + numCommitsFilteredByCompaction + " commits"); - - return instantsTimeline; - } else { - return timeline; - } - } - /** * Achieves listStatus functionality for an incrementally queried table. Instead of listing all * partitions and then filtering based on the commits of interest, this logic first extracts the @@ -158,157 +122,22 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement JobConf job, HoodieTableMetaClient tableMetaClient, List inputPaths) throws IOException { String tableName = tableMetaClient.getTableConfig().getTableName(); Job jobContext = Job.getInstance(job); - HoodieDefaultTimeline baseTimeline; - if (HoodieHiveUtil.stopAtCompaction(jobContext, tableName)) { - baseTimeline = filterInstantsTimeline(tableMetaClient.getActiveTimeline()); - } else { - baseTimeline = tableMetaClient.getActiveTimeline(); - } - - HoodieTimeline timeline = baseTimeline.getCommitsTimeline().filterCompletedInstants(); - String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(jobContext, tableName); - // Total number of commits to return in this batch. Set this to -1 to get all the commits. - Integer maxCommits = HoodieHiveUtil.readMaxCommits(jobContext, tableName); - LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs); - List commitsToCheck = timeline.findInstantsAfter(lastIncrementalTs, maxCommits) - .getInstants().collect(Collectors.toList()); - // Extract partitions touched by the commitsToCheck - Set partitionsToList = new HashSet<>(); - for (HoodieInstant commit : commitsToCheck) { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(), - HoodieCommitMetadata.class); - partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet()); - } - if (partitionsToList.isEmpty()) { + Option timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient); + if (!timeline.isPresent()) { return null; } - String incrementalInputPaths = partitionsToList.stream() - .map(s -> StringUtils.isNullOrEmpty(s) ? 
tableMetaClient.getBasePath() : tableMetaClient.getBasePath() + Path.SEPARATOR + s) - .filter(s -> { - /* - * Ensure to return only results from the original input path that has incremental changes - * This check is needed for the following corner case - When the caller invokes - * HoodieInputFormat.listStatus multiple times (with small batches of Hive partitions each - * time. Ex. Hive fetch task calls listStatus for every partition once) we do not want to - * accidentally return all incremental changes for the entire table in every listStatus() - * call. This will create redundant splits. Instead we only want to return the incremental - * changes (if so any) in that batch of input paths. - * - * NOTE on Hive queries that are executed using Fetch task: - * Since Fetch tasks invoke InputFormat.listStatus() per partition, Hoodie metadata can be - * listed in every such listStatus() call. In order to avoid this, it might be useful to - * disable fetch tasks using the hive session property for incremental queries: - * `set hive.fetch.task.conversion=none;` - * This would ensure Map Reduce execution is chosen for a Hive query, which combines - * partitions (comma separated) and calls InputFormat.listStatus() only once with all - * those partitions. - */ - for (Path path : inputPaths) { - if (path.toString().contains(s)) { - return true; - } - } - return false; - }) - .collect(Collectors.joining(",")); - if (StringUtils.isNullOrEmpty(incrementalInputPaths)) { + Option> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get()); + if (!commitsToCheck.isPresent()) { return null; } + Option incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths); // Mutate the JobConf to set the input paths to only partitions touched by incremental pull. - setInputPaths(job, incrementalInputPaths); + if (!incrementalInputPaths.isPresent()) { + return null; + } + setInputPaths(job, incrementalInputPaths.get()); FileStatus[] fileStatuses = super.listStatus(job); - BaseFileOnlyView roView = new HoodieTableFileSystemView(tableMetaClient, timeline, fileStatuses); - List commitsList = commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()); - List filteredFiles = roView.getLatestBaseFilesInRange(commitsList).collect(Collectors.toList()); - List returns = new ArrayList<>(); - for (HoodieBaseFile filteredFile : filteredFiles) { - LOG.debug("Processing incremental hoodie file - " + filteredFile.getPath()); - filteredFile = checkFileStatus(filteredFile); - returns.add(filteredFile.getFileStatus()); - } - LOG.info("Total paths to process after hoodie incremental filter " + filteredFiles.size()); - return returns; - } - - /** - * Takes in a list of filesStatus and a list of table metadatas. Groups the files status list - * based on given table metadata. - * @param fileStatuses - * @param metaClientList - * @return - * @throws IOException - */ - private Map> groupFileStatusForSnapshotPaths( - FileStatus[] fileStatuses, Collection metaClientList) { - // This assumes the paths for different tables are grouped together - Map> grouped = new HashMap<>(); - HoodieTableMetaClient metadata = null; - for (FileStatus status : fileStatuses) { - Path inputPath = status.getPath(); - if (!inputPath.getName().endsWith(".parquet")) { - //FIXME(vc): skip non parquet files for now. This wont be needed once log file name start - // with "." 
- continue; - } - if ((metadata == null) || (!inputPath.toString().contains(metadata.getBasePath()))) { - for (HoodieTableMetaClient metaClient : metaClientList) { - if (inputPath.toString().contains(metaClient.getBasePath())) { - metadata = metaClient; - if (!grouped.containsKey(metadata)) { - grouped.put(metadata, new ArrayList<>()); - } - break; - } - } - } - grouped.get(metadata).add(status); - } - return grouped; - } - - /** - * Filters data files for a snapshot queried table. - */ - private List filterFileStatusForSnapshotMode( - HoodieTableMetaClient metadata, List fileStatuses) { - FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]); - if (LOG.isDebugEnabled()) { - LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metadata); - } - // Get all commits, delta commits, compactions, as all of them produce a base parquet file today - HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - BaseFileOnlyView roView = new HoodieTableFileSystemView(metadata, timeline, statuses); - // filter files on the latest commit found - List filteredFiles = roView.getLatestBaseFiles().collect(Collectors.toList()); - LOG.info("Total paths to process after hoodie filter " + filteredFiles.size()); - List returns = new ArrayList<>(); - for (HoodieBaseFile filteredFile : filteredFiles) { - if (LOG.isDebugEnabled()) { - LOG.debug("Processing latest hoodie file - " + filteredFile.getPath()); - } - filteredFile = checkFileStatus(filteredFile); - returns.add(filteredFile.getFileStatus()); - } - return returns; - } - - /** - * Checks the file status for a race condition which can set the file size to 0. 1. HiveInputFormat does - * super.listStatus() and gets back a FileStatus[] 2. Then it creates the HoodieTableMetaClient for the paths listed. - * 3. Generation of splits looks at FileStatus size to create splits, which skips this file - */ - private HoodieBaseFile checkFileStatus(HoodieBaseFile dataFile) { - Path dataPath = dataFile.getFileStatus().getPath(); - try { - if (dataFile.getFileSize() == 0) { - FileSystem fs = dataPath.getFileSystem(conf); - LOG.info("Refreshing file status " + dataFile.getPath()); - return new HoodieBaseFile(fs.getFileStatus(dataPath)); - } - return dataFile; - } catch (IOException e) { - throw new HoodieIOException("Could not get FileStatus on path " + dataPath); - } + return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get()); } public void setConf(Configuration conf) { @@ -338,19 +167,4 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement return super.getRecordReader(split, job, reporter); } - /** - * Read the table metadata from a data path. 
This assumes certain hierarchy of files which should be changed once a - * better way is figured out to pass in the hoodie meta directory - */ - protected static HoodieTableMetaClient getTableMetaClient(FileSystem fs, Path dataPath) throws IOException { - int levels = HoodieHiveUtil.DEFAULT_LEVELS_TO_BASEPATH; - if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) { - HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath); - metadata.readFromFS(); - levels = metadata.getPartitionDepth(); - } - Path baseDir = HoodieHiveUtil.getNthParent(dataPath, levels); - LOG.info("Reading hoodie metadata from path " + baseDir.toString()); - return new HoodieTableMetaClient(fs.getConf(), baseDir.toString()); - } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java index 405887599..d27d6ad8a 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.TableNotFoundException; +import org.apache.hudi.hadoop.utils.HoodieHiveUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -140,7 +141,7 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable { if (HoodiePartitionMetadata.hasPartitionMetadata(fs, folder)) { HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, folder); metadata.readFromFS(); - baseDir = HoodieHiveUtil.getNthParent(folder, metadata.getPartitionDepth()); + baseDir = HoodieHiveUtils.getNthParent(folder, metadata.getPartitionDepth()); } else { baseDir = safeGetParentsParent(folder); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java index e4a57aabe..1ad381290 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java @@ -33,7 +33,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.apache.hudi.hadoop.HoodieParquetInputFormat.getTableMetaClient; +import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getTableMetaClientForBasePath; /** * InputPathHandler takes in a set of input paths and incremental tables list. Then, classifies the @@ -97,7 +97,7 @@ public class InputPathHandler { // This path is for a table that we dont know about yet. 
HoodieTableMetaClient metaClient; try { - metaClient = getTableMetaClient(inputPath.getFileSystem(conf), inputPath); + metaClient = getTableMetaClientForBasePath(inputPath.getFileSystem(conf), inputPath); String tableName = metaClient.getTableConfig().getTableName(); tableMetaClientMap.put(tableName, metaClient); tagAsIncrementalOrSnapshot(inputPath, tableName, metaClient, incrementalTables); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HoodieRealtimeConfig.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HoodieRealtimeConfig.java new file mode 100644 index 000000000..95945f38b --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HoodieRealtimeConfig.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop.config; + +/** + * Class to hold props related to Hoodie RealtimeInputFormat and RealtimeRecordReader. + */ +public final class HoodieRealtimeConfig { + + // Fraction of mapper/reducer task memory used for compaction of log files + public static final String COMPACTION_MEMORY_FRACTION_PROP = "compaction.memory.fraction"; + public static final String DEFAULT_COMPACTION_MEMORY_FRACTION = "0.75"; + // used to choose a trade off between IO vs Memory when performing compaction process + // Depending on outputfile size and memory provided, choose true to avoid OOM for large file + // size + small memory + public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = "compaction.lazy.block.read.enabled"; + public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = "true"; + + // Property to set the max memory for dfs inputstream buffer size + public static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP = "hoodie.memory.dfs.buffer.max.size"; + // Setting this to lower value of 1 MB since no control over how many RecordReaders will be started in a mapper + public static final int DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE = 1024 * 1024; // 1 MB + // Property to set file path prefix for spillable file + public static final String SPILLABLE_MAP_BASE_PATH_PROP = "hoodie.memory.spillable.map.path"; + // Default file path prefix for spillable file + public static final String DEFAULT_SPILLABLE_MAP_BASE_PATH = "/tmp/"; +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java index 5f675bbe7..b7da7499a 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java @@ -18,74 +18,34 @@ package org.apache.hudi.hadoop.realtime; -import 
org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.LogReaderUtils; -import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; -import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; -import org.apache.avro.generic.GenericArray; -import org.apache.avro.generic.GenericFixed; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; -import org.apache.hadoop.hive.serde2.io.DoubleWritable; -import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.BooleanWritable; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.avro.AvroSchemaConverter; -import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.schema.MessageType; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.TreeMap; import java.util.stream.Collectors; /** * Record Reader implementation to merge fresh avro data with base parquet data, to support real time queries. 
*/ public abstract class AbstractRealtimeRecordReader { - - // Fraction of mapper/reducer task memory used for compaction of log files - public static final String COMPACTION_MEMORY_FRACTION_PROP = "compaction.memory.fraction"; - public static final String DEFAULT_COMPACTION_MEMORY_FRACTION = "0.75"; - // used to choose a trade off between IO vs Memory when performing compaction process - // Depending on outputfile size and memory provided, choose true to avoid OOM for large file - // size + small memory - public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = "compaction.lazy.block.read.enabled"; - public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = "true"; - - // Property to set the max memory for dfs inputstream buffer size - public static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP = "hoodie.memory.dfs.buffer.max.size"; - // Setting this to lower value of 1 MB since no control over how many RecordReaders will be started in a mapper - public static final int DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE = 1024 * 1024; // 1 MB - // Property to set file path prefix for spillable file - public static final String SPILLABLE_MAP_BASE_PATH_PROP = "hoodie.memory.spillable.map.path"; - // Default file path prefix for spillable file - public static final String DEFAULT_SPILLABLE_MAP_BASE_PATH = "/tmp/"; - private static final Logger LOG = LogManager.getLogger(AbstractRealtimeRecordReader.class); protected final HoodieRealtimeFileSplit split; @@ -106,7 +66,7 @@ public abstract class AbstractRealtimeRecordReader { try { this.usesCustomPayload = usesCustomPayload(); LOG.info("usesCustomPayload ==> " + this.usesCustomPayload); - baseFileSchema = readSchema(jobConf, split.getPath()); + baseFileSchema = HoodieRealtimeRecordReaderUtils.readSchema(jobConf, split.getPath()); init(); } catch (IOException e) { throw new HoodieIOException("Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e); @@ -119,220 +79,6 @@ public abstract class AbstractRealtimeRecordReader { || metaClient.getTableConfig().getPayloadClass().contains("org.apache.hudi.OverwriteWithLatestAvroPayload")); } - /** - * Reads the schema from the parquet file. This is different from ParquetUtils as it uses the twitter parquet to - * support hive 1.1.0 - */ - private static MessageType readSchema(Configuration conf, Path parquetFilePath) { - try { - return ParquetFileReader.readFooter(conf, parquetFilePath).getFileMetaData().getSchema(); - } catch (IOException e) { - throw new HoodieIOException("Failed to read footer for parquet " + parquetFilePath, e); - } - } - - /** - * Prints a JSON representation of the ArrayWritable for easier debuggability. 
- */ - protected static String arrayWritableToString(ArrayWritable writable) { - if (writable == null) { - return "null"; - } - StringBuilder builder = new StringBuilder(); - Writable[] values = writable.get(); - builder.append("\"values_" + Math.random() + "_" + values.length + "\": {"); - int i = 0; - for (Writable w : values) { - if (w instanceof ArrayWritable) { - builder.append(arrayWritableToString((ArrayWritable) w)).append(","); - } else { - builder.append("\"value" + i + "\":\"" + w + "\"").append(","); - if (w == null) { - builder.append("\"type" + i + "\":\"unknown\"").append(","); - } else { - builder.append("\"type" + i + "\":\"" + w.getClass().getSimpleName() + "\"").append(","); - } - } - i++; - } - builder.deleteCharAt(builder.length() - 1); - builder.append("}"); - return builder.toString(); - } - - /** - * Given a comma separated list of field names and positions at which they appear on Hive, return - * an ordered list of field names, that can be passed onto storage. - */ - private static List orderFields(String fieldNameCsv, String fieldOrderCsv, List partitioningFields) { - // Need to convert the following to Set first since Hive does not handle duplicate field names correctly but - // handles duplicate fields orders correctly. - // Fields Orders -> {@link https://github - // .com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java - // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L188} - // Field Names -> {@link https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java - // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L229} - String[] fieldOrdersWithDups = fieldOrderCsv.split(","); - Set fieldOrdersSet = new LinkedHashSet<>(Arrays.asList(fieldOrdersWithDups)); - String[] fieldOrders = fieldOrdersSet.toArray(new String[0]); - List fieldNames = Arrays.stream(fieldNameCsv.split(",")) - .filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList()); - Set fieldNamesSet = new LinkedHashSet<>(fieldNames); - // Hive does not provide ids for partitioning fields, so check for lengths excluding that. - if (fieldNamesSet.size() != fieldOrders.length) { - throw new HoodieException(String - .format("Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d", - fieldNames.size(), fieldOrders.length)); - } - TreeMap orderedFieldMap = new TreeMap<>(); - String[] fieldNamesArray = fieldNamesSet.toArray(new String[0]); - for (int ox = 0; ox < fieldOrders.length; ox++) { - orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNamesArray[ox]); - } - return new ArrayList<>(orderedFieldMap.values()); - } - - /** - * Generate a reader schema off the provided writeSchema, to just project out the provided columns. - */ - public static Schema generateProjectionSchema(Schema writeSchema, Map schemaFieldsMap, - List fieldNames) { - /** - * Avro & Presto field names seems to be case sensitive (support fields differing only in case) whereas - * Hive/Impala/SparkSQL(default) are case-insensitive. Spark allows this to be configurable using - * spark.sql.caseSensitive=true - * - * For a RT table setup with no delta-files (for a latest file-slice) -> we translate parquet schema to Avro Here - * the field-name case is dependent on parquet schema. 
Hive (1.x/2.x/CDH) translate column projections to - * lower-cases - * - */ - List projectedFields = new ArrayList<>(); - for (String fn : fieldNames) { - Schema.Field field = schemaFieldsMap.get(fn.toLowerCase()); - if (field == null) { - throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed! " - + "Derived Schema Fields: " + new ArrayList<>(schemaFieldsMap.keySet())); - } else { - projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal())); - } - } - - Schema projectedSchema = Schema.createRecord(writeSchema.getName(), writeSchema.getDoc(), - writeSchema.getNamespace(), writeSchema.isError()); - projectedSchema.setFields(projectedFields); - return projectedSchema; - } - - public static Map getNameToFieldMap(Schema schema) { - return schema.getFields().stream().map(r -> Pair.of(r.name().toLowerCase(), r)) - .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); - } - - /** - * Convert the projected read from delta record into an array writable. - */ - public static Writable avroToArrayWritable(Object value, Schema schema) { - - if (value == null) { - return null; - } - - switch (schema.getType()) { - case STRING: - return new Text(value.toString()); - case BYTES: - return new BytesWritable((byte[]) value); - case INT: - return new IntWritable((Integer) value); - case LONG: - return new LongWritable((Long) value); - case FLOAT: - return new FloatWritable((Float) value); - case DOUBLE: - return new DoubleWritable((Double) value); - case BOOLEAN: - return new BooleanWritable((Boolean) value); - case NULL: - return null; - case RECORD: - GenericRecord record = (GenericRecord) value; - Writable[] recordValues = new Writable[schema.getFields().size()]; - int recordValueIndex = 0; - for (Schema.Field field : schema.getFields()) { - recordValues[recordValueIndex++] = avroToArrayWritable(record.get(field.name()), field.schema()); - } - return new ArrayWritable(Writable.class, recordValues); - case ENUM: - return new Text(value.toString()); - case ARRAY: - GenericArray arrayValue = (GenericArray) value; - Writable[] arrayValues = new Writable[arrayValue.size()]; - int arrayValueIndex = 0; - for (Object obj : arrayValue) { - arrayValues[arrayValueIndex++] = avroToArrayWritable(obj, schema.getElementType()); - } - // Hive 1.x will fail here, it requires values2 to be wrapped into another ArrayWritable - return new ArrayWritable(Writable.class, arrayValues); - case MAP: - Map mapValue = (Map) value; - Writable[] mapValues = new Writable[mapValue.size()]; - int mapValueIndex = 0; - for (Object entry : mapValue.entrySet()) { - Map.Entry mapEntry = (Map.Entry) entry; - Writable[] nestedMapValues = new Writable[2]; - nestedMapValues[0] = new Text(mapEntry.getKey().toString()); - nestedMapValues[1] = avroToArrayWritable(mapEntry.getValue(), schema.getValueType()); - mapValues[mapValueIndex++] = new ArrayWritable(Writable.class, nestedMapValues); - } - // Hive 1.x will fail here, it requires values3 to be wrapped into another ArrayWritable - return new ArrayWritable(Writable.class, mapValues); - case UNION: - List types = schema.getTypes(); - if (types.size() != 2) { - throw new IllegalArgumentException("Only support union with 2 fields"); - } - Schema s1 = types.get(0); - Schema s2 = types.get(1); - if (s1.getType() == Schema.Type.NULL) { - return avroToArrayWritable(value, s2); - } else if (s2.getType() == Schema.Type.NULL) { - return avroToArrayWritable(value, s1); - } else { - throw new IllegalArgumentException("Only 
support union with null"); - } - case FIXED: - if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("decimal")) { - LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) LogicalTypes.fromSchema(schema); - HiveDecimalWritable writable = new HiveDecimalWritable(((GenericFixed) value).bytes(), - decimal.getScale()); - return HiveDecimalWritable.enforcePrecisionScale(writable, - decimal.getPrecision(), - decimal.getScale()); - } - return new BytesWritable(((GenericFixed) value).bytes()); - default: - return null; - } - } - - /** - * Hive implementation of ParquetRecordReader results in partition columns not present in the original parquet file to - * also be part of the projected schema. Hive expects the record reader implementation to return the row in its - * entirety (with un-projected column having null values). As we use writerSchema for this, make sure writer schema - * also includes partition columns - * - * @param schema Schema to be changed - */ - private static Schema addPartitionFields(Schema schema, List partitioningFields) { - final Set firstLevelFieldNames = - schema.getFields().stream().map(Field::name).map(String::toLowerCase).collect(Collectors.toSet()); - List fieldsToAdd = partitioningFields.stream().map(String::toLowerCase) - .filter(x -> !firstLevelFieldNames.contains(x)).collect(Collectors.toList()); - - return HoodieAvroUtils.appendNullSchemaFields(schema, fieldsToAdd); - } - /** * Goes through the log files in reverse order and finds the schema from the last available data block. If not, falls * back to the schema from the latest parquet file. Finally, sets the partition column and projection fields into the @@ -353,16 +99,16 @@ public abstract class AbstractRealtimeRecordReader { List partitioningFields = partitionFields.length() > 0 ? 
Arrays.stream(partitionFields.split("/")).collect(Collectors.toList()) : new ArrayList<>(); - writerSchema = addPartitionFields(writerSchema, partitioningFields); - List projectionFields = orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), + writerSchema = HoodieRealtimeRecordReaderUtils.addPartitionFields(writerSchema, partitioningFields); + List projectionFields = HoodieRealtimeRecordReaderUtils.orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), partitioningFields); - Map schemaFieldsMap = getNameToFieldMap(writerSchema); + Map schemaFieldsMap = HoodieRealtimeRecordReaderUtils.getNameToFieldMap(writerSchema); hiveSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap); // TODO(vc): In the future, the reader schema should be updated based on log files & be able // to null out fields not present before - readerSchema = generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields); + readerSchema = HoodieRealtimeRecordReaderUtils.generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields); LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s", split.getDeltaLogPaths(), split.getPath(), projectionFields)); } @@ -409,7 +155,8 @@ public abstract class AbstractRealtimeRecordReader { public long getMaxCompactionMemoryInBytes() { // jobConf.getMemoryForMapTask() returns in MB return (long) Math - .ceil(Double.parseDouble(jobConf.get(COMPACTION_MEMORY_FRACTION_PROP, DEFAULT_COMPACTION_MEMORY_FRACTION)) + .ceil(Double.parseDouble(jobConf.get(HoodieRealtimeConfig.COMPACTION_MEMORY_FRACTION_PROP, + HoodieRealtimeConfig.DEFAULT_COMPACTION_MEMORY_FRACTION)) * jobConf.getMemoryForMapTask() * 1024 * 1024L); } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java index bdf11ed75..7a0bd37bb 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java @@ -20,6 +20,7 @@ package org.apache.hudi.hadoop.realtime; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.hadoop.hive.HoodieCombineRealtimeFileSplit; +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.NullWritable; @@ -65,7 +66,7 @@ public class HoodieCombineRealtimeRecordReader implements RecordReader 0) { this.currentRecordReader.close(); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java index ae3fb5ccb..11247910e 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java @@ -18,27 +18,17 @@ package org.apache.hudi.hadoop.realtime; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.FileSlice; -import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.table.HoodieTableMetaClient; import 
org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.view.HoodieTableFileSystemView; -import org.apache.hudi.common.util.CollectionUtils; -import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.HoodieParquetInputFormat; import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat; +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; +import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.NullWritable; @@ -52,13 +42,7 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Function; -import java.util.stream.Collectors; import java.util.stream.Stream; /** @@ -70,11 +54,6 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i private static final Logger LOG = LogManager.getLogger(HoodieParquetRealtimeInputFormat.class); - // These positions have to be deterministic across all tables - public static final int HOODIE_COMMIT_TIME_COL_POS = 0; - public static final int HOODIE_RECORD_KEY_COL_POS = 2; - public static final int HOODIE_PARTITION_PATH_COL_POS = 3; - public static final String HOODIE_READ_COLUMNS_PROP = "hoodie.read.columns.set"; // To make Hive on Spark queries work with RT tables. Our theory is that due to // {@link org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher} // not handling empty list correctly, the ParquetRecordReaderWrapper ends up adding the same column ids multiple @@ -85,74 +64,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i Stream fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is); - // obtain all unique parent folders for splits - Map> partitionsToParquetSplits = - fileSplits.collect(Collectors.groupingBy(split -> split.getPath().getParent())); - // TODO(vc): Should we handle also non-hoodie splits here? - Map metaClientMap = new HashMap<>(); - Map partitionsToMetaClient = - partitionsToParquetSplits.keySet().stream().collect(Collectors.toMap(Function.identity(), p -> { - // find if we have a metaclient already for this partition. 
- Option matchingBasePath = Option.fromJavaOptional( - metaClientMap.keySet().stream().filter(basePath -> p.toString().startsWith(basePath)).findFirst()); - if (matchingBasePath.isPresent()) { - return metaClientMap.get(matchingBasePath.get()); - } - - try { - HoodieTableMetaClient metaClient = getTableMetaClient(p.getFileSystem(conf), p); - metaClientMap.put(metaClient.getBasePath(), metaClient); - return metaClient; - } catch (IOException e) { - throw new HoodieIOException("Error creating hoodie meta client against : " + p, e); - } - })); - - // for all unique split parents, obtain all delta files based on delta commit timeline, - // grouped on file id - List rtSplits = new ArrayList<>(); - partitionsToParquetSplits.keySet().forEach(partitionPath -> { - // for each partition path obtain the data & log file groupings, then map back to inputsplits - HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath); - HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline()); - String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath); - - try { - // Both commit and delta-commits are included - pick the latest completed one - Option latestCompletedInstant = - metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant(); - - Stream latestFileSlices = latestCompletedInstant - .map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp())) - .orElse(Stream.empty()); - - // subgroup splits again by file id & match with log files. - Map> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream() - .collect(Collectors.groupingBy(split -> FSUtils.getFileId(split.getPath().getName()))); - latestFileSlices.forEach(fileSlice -> { - List dataFileSplits = groupedInputSplits.get(fileSlice.getFileId()); - dataFileSplits.forEach(split -> { - try { - List logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) - .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()); - // Get the maxCommit from the last delta or compaction or commit - when - // bootstrapped from COW table - String maxCommitTime = metaClient - .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, - HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION)) - .filterCompletedInstants().lastInstant().get().getTimestamp(); - rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime)); - } catch (IOException e) { - throw new HoodieIOException("Error creating hoodie real time split ", e); - } - }); - }); - } catch (Exception e) { - throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e); - } - }); - LOG.info("Returning a total splits of " + rtSplits.size()); - return rtSplits.toArray(new InputSplit[0]); + return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits); } @Override @@ -199,9 +111,9 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i private static void addRequiredProjectionFields(Configuration configuration) { // Need this to do merge records in HoodieRealtimeRecordReader - addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HOODIE_RECORD_KEY_COL_POS); - addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HOODIE_COMMIT_TIME_COL_POS); - addProjectionField(configuration, 
HoodieRecord.PARTITION_PATH_METADATA_FIELD, HOODIE_PARTITION_PATH_COL_POS); + addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS); + addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS); + addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_PARTITION_PATH_COL_POS); } /** @@ -228,12 +140,12 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i // risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible // latency incurred here due to the synchronization since get record reader is called once per spilt before the // actual heavy lifting of reading the parquet files happen. - if (jobConf.get(HOODIE_READ_COLUMNS_PROP) == null) { + if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) { synchronized (jobConf) { LOG.info( "Before adding Hoodie columns, Projections :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); - if (jobConf.get(HOODIE_READ_COLUMNS_PROP) == null) { + if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) { // Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table; // In this case, the projection fields gets removed. Looking at HiveInputFormat implementation, in some cases // hoodie additional projection columns are reset after calling setConf and only natural projections @@ -245,7 +157,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i addRequiredProjectionFields(jobConf); this.conf = jobConf; - this.conf.set(HOODIE_READ_COLUMNS_PROP, "true"); + this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true"); } } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java index 73e0b507d..02bb5eb63 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java @@ -24,6 +24,9 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.util.Option; +import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.io.ArrayWritable; @@ -60,13 +63,17 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader // NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit // but can return records for completed commits > the commit we are trying to read (if using // readCommit() API) - return new HoodieMergedLogRecordScanner(FSUtils.getFs(split.getPath().toString(), jobConf), split.getBasePath(), - split.getDeltaLogPaths(), usesCustomPayload ? 
getWriterSchema() : getReaderSchema(), split.getMaxCommitTime(), + return new HoodieMergedLogRecordScanner( + FSUtils.getFs(split.getPath().toString(), jobConf), + split.getBasePath(), + split.getDeltaLogPaths(), + usesCustomPayload ? getWriterSchema() : getReaderSchema(), + split.getMaxCommitTime(), getMaxCompactionMemoryInBytes(), - Boolean - .parseBoolean(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)), - false, jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP, DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), - jobConf.get(SPILLABLE_MAP_BASE_PATH_PROP, DEFAULT_SPILLABLE_MAP_BASE_PATH)); + Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)), + false, + jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), + jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)); } @Override @@ -81,7 +88,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader // TODO(VC): Right now, we assume all records in log, have a matching base record. (which // would be true until we have a way to index logs too) // return from delta records map if we have some match. - String key = arrayWritable.get()[HoodieParquetRealtimeInputFormat.HOODIE_RECORD_KEY_COL_POS].toString(); + String key = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString(); if (deltaRecordMap.containsKey(key)) { // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the // deltaRecord may not be a full record and needs values of columns from the parquet @@ -105,11 +112,11 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader // we assume, a later safe record in the log, is newer than what we have in the map & // replace it. 
Since we want to return an arrayWritable which is the same length as the elements in the latest // schema, we use writerSchema to create the arrayWritable from the latest generic record - ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(recordToReturn, getHiveSchema()); + ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(recordToReturn, getHiveSchema()); Writable[] replaceValue = aWritable.get(); if (LOG.isDebugEnabled()) { - LOG.debug(String.format("key %s, base values: %s, log values: %s", key, arrayWritableToString(arrayWritable), - arrayWritableToString(aWritable))); + LOG.debug(String.format("key %s, base values: %s, log values: %s", key, HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable), + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable))); } Writable[] originalValue = arrayWritable.get(); try { @@ -117,10 +124,10 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader arrayWritable.set(originalValue); } catch (RuntimeException re) { LOG.error("Got exception when doing array copy", re); - LOG.error("Base record :" + arrayWritableToString(arrayWritable)); - LOG.error("Log record :" + arrayWritableToString(aWritable)); - String errMsg = "Base-record :" + arrayWritableToString(arrayWritable) - + " ,Log-record :" + arrayWritableToString(aWritable) + " ,Error :" + re.getMessage(); + LOG.error("Base record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable)); + LOG.error("Log record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable)); + String errMsg = "Base-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable) + + " ,Log-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable) + " ,Error :" + re.getMessage(); throw new RuntimeException(errMsg, re); } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java index 4c773d458..8bc1cfb41 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java @@ -37,6 +37,8 @@ import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer; import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; import org.apache.hudi.hadoop.RecordReaderValueIterator; import org.apache.hudi.hadoop.SafeParquetRecordReaderWrapper; +import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader implements RecordReader { @@ -76,11 +78,11 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader this.iterator = this.executor.getQueue().iterator(); this.logRecordScanner = new HoodieUnMergedLogRecordScanner(FSUtils.getFs(split.getPath().toString(), jobConf), split.getBasePath(), split.getDeltaLogPaths(), getReaderSchema(), split.getMaxCommitTime(), - Boolean.parseBoolean(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)), - false, jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP, DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), record -> { + Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, 
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)), + false, jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), record -> { // convert Hoodie log record to Hadoop AvroWritable and buffer GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema()).get(); - ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(rec, getHiveSchema()); + ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema()); this.executor.getQueue().insertRecord(aWritable); }); // Start reading and buffering diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java similarity index 98% rename from hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java rename to hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java index 0537cfa09..02fb9d06d 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hudi.hadoop; +package org.apache.hudi.hadoop.utils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.JobContext; @@ -31,9 +31,9 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; -public class HoodieHiveUtil { +public class HoodieHiveUtils { - public static final Logger LOG = LogManager.getLogger(HoodieHiveUtil.class); + public static final Logger LOG = LogManager.getLogger(HoodieHiveUtils.class); public static final String HOODIE_CONSUME_MODE_PATTERN = "hoodie.%s.consume.mode"; public static final String HOODIE_START_COMMIT_PATTERN = "hoodie.%s.consume.start.timestamp"; diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java new file mode 100644 index 000000000..d10b66434 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.hadoop.utils; + +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodiePartitionMetadata; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.table.view.TableFileSystemView; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.exception.HoodieIOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class HoodieInputFormatUtils { + + // These positions have to be deterministic across all tables + public static final int HOODIE_COMMIT_TIME_COL_POS = 0; + public static final int HOODIE_RECORD_KEY_COL_POS = 2; + public static final int HOODIE_PARTITION_PATH_COL_POS = 3; + public static final String HOODIE_READ_COLUMNS_PROP = "hoodie.read.columns.set"; + + private static final Logger LOG = LogManager.getLogger(HoodieInputFormatUtils.class); + + /** + * Filter any specific instants that we do not want to process. + * example timeline: + * + * t0 -> create bucket1.parquet + * t1 -> create and append updates bucket1.log + * t2 -> request compaction + * t3 -> create bucket2.parquet + * + * if compaction at t2 takes a long time, incremental readers on RO tables can move to t3 and would skip updates in t1 + * + * To workaround this problem, we want to stop returning data belonging to commits > t2. + * After compaction is complete, incremental reader would see updates in t2, t3, so on. + * @param timeline + * @return + */ + public static HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) { + HoodieDefaultTimeline commitsAndCompactionTimeline = timeline.getCommitsAndCompactionTimeline(); + Option pendingCompactionInstant = commitsAndCompactionTimeline + .filterPendingCompactionTimeline().firstInstant(); + if (pendingCompactionInstant.isPresent()) { + HoodieDefaultTimeline instantsTimeline = commitsAndCompactionTimeline + .findInstantsBefore(pendingCompactionInstant.get().getTimestamp()); + int numCommitsFilteredByCompaction = commitsAndCompactionTimeline.getCommitsTimeline().countInstants() + - instantsTimeline.getCommitsTimeline().countInstants(); + LOG.info("Earliest pending compaction instant is: " + pendingCompactionInstant.get().getTimestamp() + + " skipping " + numCommitsFilteredByCompaction + " commits"); + + return instantsTimeline; + } else { + return timeline; + } + } + + /** + * Extract partitions touched by the commitsToCheck. 
+ * @param commitsToCheck + * @param tableMetaClient + * @param timeline + * @param inputPaths + * @return + * @throws IOException + */ + public static Option getAffectedPartitions(List commitsToCheck, + HoodieTableMetaClient tableMetaClient, + HoodieTimeline timeline, + List inputPaths) throws IOException { + Set partitionsToList = new HashSet<>(); + for (HoodieInstant commit : commitsToCheck) { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(), + HoodieCommitMetadata.class); + partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet()); + } + if (partitionsToList.isEmpty()) { + return Option.empty(); + } + String incrementalInputPaths = partitionsToList.stream() + .map(s -> StringUtils.isNullOrEmpty(s) ? tableMetaClient.getBasePath() : tableMetaClient.getBasePath() + Path.SEPARATOR + s) + .filter(s -> { + /* + * Ensure to return only results from the original input path that has incremental changes + * This check is needed for the following corner case - When the caller invokes + * HoodieInputFormat.listStatus multiple times (with small batches of Hive partitions each + * time. Ex. Hive fetch task calls listStatus for every partition once) we do not want to + * accidentally return all incremental changes for the entire table in every listStatus() + * call. This will create redundant splits. Instead we only want to return the incremental + * changes (if so any) in that batch of input paths. + * + * NOTE on Hive queries that are executed using Fetch task: + * Since Fetch tasks invoke InputFormat.listStatus() per partition, Hoodie metadata can be + * listed in every such listStatus() call. In order to avoid this, it might be useful to + * disable fetch tasks using the hive session property for incremental queries: + * `set hive.fetch.task.conversion=none;` + * This would ensure Map Reduce execution is chosen for a Hive query, which combines + * partitions (comma separated) and calls InputFormat.listStatus() only once with all + * those partitions. + */ + for (Path path : inputPaths) { + if (path.toString().contains(s)) { + return true; + } + } + return false; + }) + .collect(Collectors.joining(",")); + return Option.of(incrementalInputPaths); + } + + /** + * Extract HoodieTimeline based on HoodieTableMetaClient. + * @param job + * @param tableMetaClient + * @return + */ + public static Option getFilteredCommitsTimeline(Job job, HoodieTableMetaClient tableMetaClient) { + String tableName = tableMetaClient.getTableConfig().getTableName(); + HoodieDefaultTimeline baseTimeline; + if (HoodieHiveUtils.stopAtCompaction(job, tableName)) { + baseTimeline = filterInstantsTimeline(tableMetaClient.getActiveTimeline()); + } else { + baseTimeline = tableMetaClient.getActiveTimeline(); + } + return Option.of(baseTimeline.getCommitsTimeline().filterCompletedInstants()); + } + + /** + * Get commits for incremental query from Hive map reduce configuration. + * @param job + * @param tableName + * @param timeline + * @return + */ + public static Option> getCommitsForIncrementalQuery(Job job, String tableName, HoodieTimeline timeline) { + String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(job, tableName); + // Total number of commits to return in this batch. Set this to -1 to get all the commits. 
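+ // NOTE: readStartCommitTime/readMaxCommits resolve the per-table Hive session properties built from
+ // HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN and HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN
+ // (the same properties the setupIncremental test helpers in this patch set on the JobConf).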
+ Integer maxCommits = HoodieHiveUtils.readMaxCommits(job, tableName); + LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs); + return Option.of(timeline.findInstantsAfter(lastIncrementalTs, maxCommits) + .getInstants().collect(Collectors.toList())); + } + + /** + * Extract HoodieTableMetaClient by base path. + * @param conf + * @param partitions + * @return + */ + public static Map getTableMetaClientByBasePath(Configuration conf, Set partitions) { + Map metaClientMap = new HashMap<>(); + return partitions.stream().collect(Collectors.toMap(Function.identity(), p -> { + // Get meta client if this path is the base path. + Option matchingBasePath = Option.fromJavaOptional( + metaClientMap.keySet().stream().filter(basePath -> p.toString().startsWith(basePath)).findFirst()); + if (matchingBasePath.isPresent()) { + return metaClientMap.get(matchingBasePath.get()); + } + + try { + HoodieTableMetaClient metaClient = getTableMetaClientForBasePath(p.getFileSystem(conf), p); + metaClientMap.put(metaClient.getBasePath(), metaClient); + return metaClient; + } catch (IOException e) { + throw new HoodieIOException("Error creating hoodie meta client against : " + p, e); + } + })); + } + + /** + * Extract HoodieTableMetaClient from a partition path (not the base path). + * @param fs + * @param dataPath + * @return + * @throws IOException + */ + public static HoodieTableMetaClient getTableMetaClientForBasePath(FileSystem fs, Path dataPath) throws IOException { + int levels = HoodieHiveUtils.DEFAULT_LEVELS_TO_BASEPATH; + if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) { + HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath); + metadata.readFromFS(); + levels = metadata.getPartitionDepth(); + } + Path baseDir = HoodieHiveUtils.getNthParent(dataPath, levels); + LOG.info("Reading hoodie metadata from path " + baseDir.toString()); + return new HoodieTableMetaClient(fs.getConf(), baseDir.toString()); + } + + /** + * Filter a list of FileStatus based on commitsToCheck for incremental view. + * @param job + * @param tableMetaClient + * @param timeline + * @param fileStatuses + * @param commitsToCheck + * @return + */ + public static List filterIncrementalFileStatus(Job job, HoodieTableMetaClient tableMetaClient, + HoodieTimeline timeline, FileStatus[] fileStatuses, List commitsToCheck) { + TableFileSystemView.BaseFileOnlyView roView = new HoodieTableFileSystemView(tableMetaClient, timeline, fileStatuses); + List commitsList = commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + List filteredFiles = roView.getLatestBaseFilesInRange(commitsList).collect(Collectors.toList()); + List returns = new ArrayList<>(); + for (HoodieBaseFile filteredFile : filteredFiles) { + LOG.debug("Processing incremental hoodie file - " + filteredFile.getPath()); + filteredFile = refreshFileStatus(job.getConfiguration(), filteredFile); + returns.add(filteredFile.getFileStatus()); + } + LOG.info("Total paths to process after hoodie incremental filter " + filteredFiles.size()); + return returns; + } + + /** + * Takes in a list of FileStatus and a collection of table meta clients, and groups the file statuses + * by the table they belong to.
+ * @param fileStatuses + * @param metaClientList + * @return + * @throws IOException + */ + public static Map> groupFileStatusForSnapshotPaths( + FileStatus[] fileStatuses, Collection metaClientList) { + // This assumes the paths for different tables are grouped together + Map> grouped = new HashMap<>(); + HoodieTableMetaClient metadata = null; + for (FileStatus status : fileStatuses) { + Path inputPath = status.getPath(); + if (!inputPath.getName().endsWith(".parquet")) { + //FIXME(vc): skip non parquet files for now. This won't be needed once log file names start + // with "." + continue; + } + if ((metadata == null) || (!inputPath.toString().contains(metadata.getBasePath()))) { + for (HoodieTableMetaClient metaClient : metaClientList) { + if (inputPath.toString().contains(metaClient.getBasePath())) { + metadata = metaClient; + if (!grouped.containsKey(metadata)) { + grouped.put(metadata, new ArrayList<>()); + } + break; + } + } + } + grouped.get(metadata).add(status); + } + return grouped; + } + + /** + * Filters data files for a snapshot queried table. + * @param job + * @param metadata + * @param fileStatuses + * @return + */ + public static List filterFileStatusForSnapshotMode( + JobConf job, HoodieTableMetaClient metadata, List fileStatuses) { + FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]); + if (LOG.isDebugEnabled()) { + LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metadata); + } + // Get all commits, delta commits, compactions, as all of them produce a base parquet file today + HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + TableFileSystemView.BaseFileOnlyView roView = new HoodieTableFileSystemView(metadata, timeline, statuses); + // filter files on the latest commit found + List filteredFiles = roView.getLatestBaseFiles().collect(Collectors.toList()); + LOG.info("Total paths to process after hoodie filter " + filteredFiles.size()); + List returns = new ArrayList<>(); + for (HoodieBaseFile filteredFile : filteredFiles) { + if (LOG.isDebugEnabled()) { + LOG.debug("Processing latest hoodie file - " + filteredFile.getPath()); + } + filteredFile = refreshFileStatus(job, filteredFile); + returns.add(filteredFile.getFileStatus()); + } + return returns; + } + + /** + * Checks the file status for a race condition which can set the file size to 0. 1. HiveInputFormat does + * super.listStatus() and gets back a FileStatus[] 2. Then it creates the HoodieTableMetaClient for the paths listed. + * 3.
Generation of splits looks at FileStatus size to create splits, which skips this file + * @param conf + * @param dataFile + * @return + */ + private static HoodieBaseFile refreshFileStatus(Configuration conf, HoodieBaseFile dataFile) { + Path dataPath = dataFile.getFileStatus().getPath(); + try { + if (dataFile.getFileSize() == 0) { + FileSystem fs = dataPath.getFileSystem(conf); + LOG.info("Refreshing file status " + dataFile.getPath()); + return new HoodieBaseFile(fs.getFileStatus(dataPath)); + } + return dataFile; + } catch (IOException e) { + throw new HoodieIOException("Could not get FileStatus on path " + dataPath); + } + } + +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java new file mode 100644 index 000000000..7ae4ea0e5 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop.utils; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils { + + private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class); + + public static InputSplit[] getRealtimeSplits(Configuration conf, Stream fileSplits) throws IOException { + Map> partitionsToParquetSplits = + fileSplits.collect(Collectors.groupingBy(split -> split.getPath().getParent())); + // TODO(vc): Should we handle also non-hoodie splits here? 
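+ // Resolve one HoodieTableMetaClient per unique parent path (getTableMetaClientByBasePath caches
+ // clients by table base path), so each partition's file-system view below is built against its table.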
+ Map partitionsToMetaClient = getTableMetaClientByBasePath(conf, partitionsToParquetSplits.keySet()); + + // for all unique split parents, obtain all delta files based on delta commit timeline, + // grouped on file id + List rtSplits = new ArrayList<>(); + partitionsToParquetSplits.keySet().forEach(partitionPath -> { + // for each partition path obtain the data & log file groupings, then map back to inputsplits + HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath); + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline()); + String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath); + + try { + // Both commit and delta-commits are included - pick the latest completed one + Option latestCompletedInstant = + metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant(); + + Stream latestFileSlices = latestCompletedInstant + .map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp())) + .orElse(Stream.empty()); + + // subgroup splits again by file id & match with log files. + Map> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream() + .collect(Collectors.groupingBy(split -> FSUtils.getFileId(split.getPath().getName()))); + latestFileSlices.forEach(fileSlice -> { + List dataFileSplits = groupedInputSplits.get(fileSlice.getFileId()); + dataFileSplits.forEach(split -> { + try { + List logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) + .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()); + // Get the maxCommit from the last delta or compaction or commit - when + // bootstrapped from COW table + String maxCommitTime = metaClient + .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, + HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION)) + .filterCompletedInstants().lastInstant().get().getTimestamp(); + rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime)); + } catch (IOException e) { + throw new HoodieIOException("Error creating hoodie real time split ", e); + } + }); + }); + } catch (Exception e) { + throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e); + } + }); + LOG.info("Returning a total splits of " + rtSplits.size()); + return rtSplits.toArray(new InputSplit[0]); + } + + // Return parquet file with a list of log files in the same file group. + public static Map> groupLogsByBaseFile(Configuration conf, Stream fileStatuses) { + Map> partitionsToParquetSplits = + fileStatuses.collect(Collectors.groupingBy(file -> file.getPath().getParent())); + // TODO(vc): Should we handle also non-hoodie splits here? 
+ Map partitionsToMetaClient = getTableMetaClientByBasePath(conf, partitionsToParquetSplits.keySet()); + + // for all unique split parents, obtain all delta files based on delta commit timeline, + // grouped on file id + Map> resultMap = new HashMap<>(); + partitionsToParquetSplits.keySet().forEach(partitionPath -> { + // for each partition path obtain the data & log file groupings, then map back to inputsplits + HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath); + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline()); + String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath); + + try { + // Both commit and delta-commits are included - pick the latest completed one + Option latestCompletedInstant = + metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant(); + + Stream latestFileSlices = latestCompletedInstant + .map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp())) + .orElse(Stream.empty()); + + // subgroup splits again by file id & match with log files. + Map> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream() + .collect(Collectors.groupingBy(file -> FSUtils.getFileId(file.getPath().getName()))); + latestFileSlices.forEach(fileSlice -> { + List dataFileSplits = groupedInputSplits.get(fileSlice.getFileId()); + dataFileSplits.forEach(split -> { + try { + List logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) + .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()); + resultMap.put(split.getPath().toString(), logFilePaths); + } catch (Exception e) { + throw new HoodieException("Error creating hoodie real time split ", e); + } + }); + }); + } catch (Exception e) { + throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e); + } + }); + return resultMap; + } + +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java new file mode 100644 index 000000000..6af37709c --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.hadoop.utils; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; + +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericArray; +import org.apache.avro.generic.GenericFixed; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.schema.MessageType; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; + +public class HoodieRealtimeRecordReaderUtils { + + /** + * Reads the schema from the parquet file. This is different from ParquetUtils as it uses the twitter parquet to + * support hive 1.1.0 + */ + public static MessageType readSchema(Configuration conf, Path parquetFilePath) { + try { + return ParquetFileReader.readFooter(conf, parquetFilePath).getFileMetaData().getSchema(); + } catch (IOException e) { + throw new HoodieIOException("Failed to read footer for parquet " + parquetFilePath, e); + } + } + + /** + * Prints a JSON representation of the ArrayWritable for easier debuggability. + */ + public static String arrayWritableToString(ArrayWritable writable) { + if (writable == null) { + return "null"; + } + StringBuilder builder = new StringBuilder(); + Writable[] values = writable.get(); + builder.append("\"values_" + Math.random() + "_" + values.length + "\": {"); + int i = 0; + for (Writable w : values) { + if (w instanceof ArrayWritable) { + builder.append(arrayWritableToString((ArrayWritable) w)).append(","); + } else { + builder.append("\"value" + i + "\":\"" + w + "\"").append(","); + if (w == null) { + builder.append("\"type" + i + "\":\"unknown\"").append(","); + } else { + builder.append("\"type" + i + "\":\"" + w.getClass().getSimpleName() + "\"").append(","); + } + } + i++; + } + builder.deleteCharAt(builder.length() - 1); + builder.append("}"); + return builder.toString(); + } + + /** + * Generate a reader schema off the provided writeSchema, to just project out the provided columns. + */ + public static Schema generateProjectionSchema(Schema writeSchema, Map schemaFieldsMap, + List fieldNames) { + /** + * Avro & Presto field names seems to be case sensitive (support fields differing only in case) whereas + * Hive/Impala/SparkSQL(default) are case-insensitive. Spark allows this to be configurable using + * spark.sql.caseSensitive=true + * + * For a RT table setup with no delta-files (for a latest file-slice) -> we translate parquet schema to Avro Here + * the field-name case is dependent on parquet schema. 
Hive (1.x/2.x/CDH) translate column projections to + * lower-cases + * + */ + List projectedFields = new ArrayList<>(); + for (String fn : fieldNames) { + Schema.Field field = schemaFieldsMap.get(fn.toLowerCase()); + if (field == null) { + throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed! " + + "Derived Schema Fields: " + new ArrayList<>(schemaFieldsMap.keySet())); + } else { + projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal())); + } + } + + Schema projectedSchema = Schema.createRecord(writeSchema.getName(), writeSchema.getDoc(), + writeSchema.getNamespace(), writeSchema.isError()); + projectedSchema.setFields(projectedFields); + return projectedSchema; + } + + public static Map getNameToFieldMap(Schema schema) { + return schema.getFields().stream().map(r -> Pair.of(r.name().toLowerCase(), r)) + .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); + } + + /** + * Convert the projected read from delta record into an array writable. + */ + public static Writable avroToArrayWritable(Object value, Schema schema) { + + if (value == null) { + return null; + } + + switch (schema.getType()) { + case STRING: + return new Text(value.toString()); + case BYTES: + return new BytesWritable((byte[]) value); + case INT: + return new IntWritable((Integer) value); + case LONG: + return new LongWritable((Long) value); + case FLOAT: + return new FloatWritable((Float) value); + case DOUBLE: + return new DoubleWritable((Double) value); + case BOOLEAN: + return new BooleanWritable((Boolean) value); + case NULL: + return null; + case RECORD: + GenericRecord record = (GenericRecord) value; + Writable[] recordValues = new Writable[schema.getFields().size()]; + int recordValueIndex = 0; + for (Schema.Field field : schema.getFields()) { + recordValues[recordValueIndex++] = avroToArrayWritable(record.get(field.name()), field.schema()); + } + return new ArrayWritable(Writable.class, recordValues); + case ENUM: + return new Text(value.toString()); + case ARRAY: + GenericArray arrayValue = (GenericArray) value; + Writable[] arrayValues = new Writable[arrayValue.size()]; + int arrayValueIndex = 0; + for (Object obj : arrayValue) { + arrayValues[arrayValueIndex++] = avroToArrayWritable(obj, schema.getElementType()); + } + // Hive 1.x will fail here, it requires values2 to be wrapped into another ArrayWritable + return new ArrayWritable(Writable.class, arrayValues); + case MAP: + Map mapValue = (Map) value; + Writable[] mapValues = new Writable[mapValue.size()]; + int mapValueIndex = 0; + for (Object entry : mapValue.entrySet()) { + Map.Entry mapEntry = (Map.Entry) entry; + Writable[] nestedMapValues = new Writable[2]; + nestedMapValues[0] = new Text(mapEntry.getKey().toString()); + nestedMapValues[1] = avroToArrayWritable(mapEntry.getValue(), schema.getValueType()); + mapValues[mapValueIndex++] = new ArrayWritable(Writable.class, nestedMapValues); + } + // Hive 1.x will fail here, it requires values3 to be wrapped into another ArrayWritable + return new ArrayWritable(Writable.class, mapValues); + case UNION: + List types = schema.getTypes(); + if (types.size() != 2) { + throw new IllegalArgumentException("Only support union with 2 fields"); + } + Schema s1 = types.get(0); + Schema s2 = types.get(1); + if (s1.getType() == Schema.Type.NULL) { + return avroToArrayWritable(value, s2); + } else if (s2.getType() == Schema.Type.NULL) { + return avroToArrayWritable(value, s1); + } else { + throw new IllegalArgumentException("Only 
support union with null"); + } + case FIXED: + if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("decimal")) { + LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) LogicalTypes.fromSchema(schema); + HiveDecimalWritable writable = new HiveDecimalWritable(((GenericFixed) value).bytes(), + decimal.getScale()); + return HiveDecimalWritable.enforcePrecisionScale(writable, + decimal.getPrecision(), + decimal.getScale()); + } + return new BytesWritable(((GenericFixed) value).bytes()); + default: + return null; + } + } + + /** + * Given a comma separated list of field names and positions at which they appear on Hive, return + * an ordered list of field names, that can be passed onto storage. + */ + public static List orderFields(String fieldNameCsv, String fieldOrderCsv, List partitioningFields) { + // Need to convert the following to Set first since Hive does not handle duplicate field names correctly but + // handles duplicate fields orders correctly. + // Fields Orders -> {@link https://github + // .com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java + // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L188} + // Field Names -> {@link https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java + // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L229} + String[] fieldOrdersWithDups = fieldOrderCsv.split(","); + Set fieldOrdersSet = new LinkedHashSet<>(Arrays.asList(fieldOrdersWithDups)); + String[] fieldOrders = fieldOrdersSet.toArray(new String[0]); + List fieldNames = Arrays.stream(fieldNameCsv.split(",")) + .filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList()); + Set fieldNamesSet = new LinkedHashSet<>(fieldNames); + // Hive does not provide ids for partitioning fields, so check for lengths excluding that. + if (fieldNamesSet.size() != fieldOrders.length) { + throw new HoodieException(String + .format("Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d", + fieldNames.size(), fieldOrders.length)); + } + TreeMap orderedFieldMap = new TreeMap<>(); + String[] fieldNamesArray = fieldNamesSet.toArray(new String[0]); + for (int ox = 0; ox < fieldOrders.length; ox++) { + orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNamesArray[ox]); + } + return new ArrayList<>(orderedFieldMap.values()); + } + + /** + * Hive implementation of ParquetRecordReader results in partition columns not present in the original parquet file to + * also be part of the projected schema. Hive expects the record reader implementation to return the row in its + * entirety (with un-projected column having null values). 
As we use writerSchema for this, make sure writer schema + * also includes partition columns + * + * @param schema Schema to be changed + */ + public static Schema addPartitionFields(Schema schema, List partitioningFields) { + final Set firstLevelFieldNames = + schema.getFields().stream().map(Schema.Field::name).map(String::toLowerCase).collect(Collectors.toSet()); + List fieldsToAdd = partitioningFields.stream().map(String::toLowerCase) + .filter(x -> !firstLevelFieldNames.contains(x)).collect(Collectors.toList()); + + return HoodieAvroUtils.appendNullSchemaFields(schema, fieldsToAdd); + } +} diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java index ad38d3341..aa9d828c6 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.hadoop.testutils.InputFormatTestUtil; +import org.apache.hudi.hadoop.utils.HoodieHiveUtils; import org.apache.avro.Schema; import org.apache.hadoop.fs.FileStatus; @@ -103,7 +104,7 @@ public class TestHoodieParquetInputFormat { timeline.setInstants(instants); // Verify getCommitsTimelineBeforePendingCompaction does not return instants after first compaction instant - HoodieTimeline filteredTimeline = new HoodieParquetInputFormat().filterInstantsTimeline(timeline); + HoodieTimeline filteredTimeline = inputFormat.filterInstantsTimeline(timeline); assertTrue(filteredTimeline.containsInstant(t1)); assertTrue(filteredTimeline.containsInstant(t2)); assertFalse(filteredTimeline.containsInstant(t3)); @@ -116,7 +117,7 @@ public class TestHoodieParquetInputFormat { instants.remove(t3); timeline = new HoodieActiveTimeline(metaClient); timeline.setInstants(instants); - filteredTimeline = new HoodieParquetInputFormat().filterInstantsTimeline(timeline); + filteredTimeline = inputFormat.filterInstantsTimeline(timeline); // verify all remaining instants are returned. assertTrue(filteredTimeline.containsInstant(t1)); @@ -130,7 +131,7 @@ public class TestHoodieParquetInputFormat { instants.remove(t5); timeline = new HoodieActiveTimeline(metaClient); timeline.setInstants(instants); - filteredTimeline = new HoodieParquetInputFormat().filterInstantsTimeline(timeline); + filteredTimeline = inputFormat.filterInstantsTimeline(timeline); // verify all remaining instants are returned. 
assertTrue(filteredTimeline.containsInstant(t1)); @@ -267,7 +268,7 @@ public class TestHoodieParquetInputFormat { ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files from 300 commit", files, "300", 1); ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files from 200 commit", files, "200", 1); - InputFormatTestUtil.setupIncremental(jobConf, "100", HoodieHiveUtil.MAX_COMMIT_ALL); + InputFormatTestUtil.setupIncremental(jobConf, "100", HoodieHiveUtils.MAX_COMMIT_ALL); files = inputFormat.listStatus(jobConf); assertEquals(5, files.length, @@ -312,15 +313,15 @@ public class TestHoodieParquetInputFormat { public void testGetIncrementalTableNames() throws IOException { String[] expectedincrTables = {"db1.raw_trips", "db2.model_trips", "db3.model_trips"}; JobConf conf = new JobConf(); - String incrementalMode1 = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[0]); - conf.set(incrementalMode1, HoodieHiveUtil.INCREMENTAL_SCAN_MODE); - String incrementalMode2 = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[1]); - conf.set(incrementalMode2,HoodieHiveUtil.INCREMENTAL_SCAN_MODE); - String incrementalMode3 = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, "db3.model_trips"); - conf.set(incrementalMode3, HoodieHiveUtil.INCREMENTAL_SCAN_MODE.toLowerCase()); - String defaultmode = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips"); - conf.set(defaultmode, HoodieHiveUtil.DEFAULT_SCAN_MODE); - List actualincrTables = HoodieHiveUtil.getIncrementalTableNames(Job.getInstance(conf)); + String incrementalMode1 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[0]); + conf.set(incrementalMode1, HoodieHiveUtils.INCREMENTAL_SCAN_MODE); + String incrementalMode2 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[1]); + conf.set(incrementalMode2,HoodieHiveUtils.INCREMENTAL_SCAN_MODE); + String incrementalMode3 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.model_trips"); + conf.set(incrementalMode3, HoodieHiveUtils.INCREMENTAL_SCAN_MODE.toLowerCase()); + String defaultmode = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips"); + conf.set(defaultmode, HoodieHiveUtils.DEFAULT_SCAN_MODE); + List actualincrTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(conf)); for (String expectedincrTable : expectedincrTables) { assertTrue(actualincrTables.contains(expectedincrTable)); } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java index 3e63fef6c..5a735ce1b 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java @@ -31,6 +31,7 @@ import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.testutils.InputFormatTestUtil; +import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; @@ -80,7 +81,7 @@ public class TestHoodieRealtimeRecordReader { @BeforeEach public void setUp() { jobConf = new JobConf(); - jobConf.set(AbstractRealtimeRecordReader.MAX_DFS_STREAM_BUFFER_SIZE_PROP, 
String.valueOf(1024 * 1024)); + jobConf.set(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(1024 * 1024)); hadoopConf = HoodieTestUtils.getDefaultHadoopConf(); fs = FSUtils.getFs(basePath.toString(), hadoopConf); } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java index 0c6ed7ba5..05669bb35 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java @@ -28,7 +28,7 @@ import org.apache.hudi.common.table.log.block.HoodieCommandBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.SchemaTestUtil; -import org.apache.hudi.hadoop.HoodieHiveUtil; +import org.apache.hudi.hadoop.utils.HoodieHiveUtils; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -104,15 +104,15 @@ public class InputFormatTestUtil { public static void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull) { String modePropertyName = - String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); - jobConf.set(modePropertyName, HoodieHiveUtil.INCREMENTAL_SCAN_MODE); + String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); + jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE); String startCommitTimestampName = - String.format(HoodieHiveUtil.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); + String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); jobConf.set(startCommitTimestampName, startCommit); String maxCommitPulls = - String.format(HoodieHiveUtil.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); + String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); jobConf.setInt(maxCommitPulls, numberOfCommitsToPull); }
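[Editorial aside: for reference, the setupIncremental helper above boils down to the per-table Hive session properties whose patterns ("hoodie.%s.consume.mode", "hoodie.%s.consume.start.timestamp", ...) are defined in the renamed HoodieHiveUtils. The sketch below mirrors that helper; the class name, table name and start commit are illustrative only.]

import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;

public class IncrementalQueryConfExample {
  // Hypothetical helper: configures an incremental pull for a table named "raw_trips".
  public static JobConf incrementalJobConf() {
    JobConf jobConf = new JobConf();
    // Switch this table to incremental scan mode.
    jobConf.set(String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "raw_trips"),
        HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
    // Only commits after this timestamp are picked up.
    jobConf.set(String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, "raw_trips"), "100");
    // Pull all commits after the start commit (MAX_COMMIT_ALL) instead of capping the count.
    jobConf.setInt(String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, "raw_trips"),
        HoodieHiveUtils.MAX_COMMIT_ALL);
    return jobConf;
  }
}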