[HUDI-822] decouple Hudi related logics from HoodieInputFormat (#1592)
- Refactoring business logic out of InputFormat into Utils helpers.
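Overview (an illustrative sketch, not itself part of the diff — the class and method names are the ones this commit introduces below): each input format keeps its public entry points and delegates the Hudi-specific logic to the new helper classes, e.g.:

    // HoodieParquetInputFormat after the refactor: a thin delegation
    protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
      return HoodieInputFormatUtils.filterInstantsTimeline(timeline);
    }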
@@ -46,7 +46,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.hadoop.HoodieHiveUtil;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
 import org.apache.hudi.index.HoodieIndex;
@@ -1463,19 +1463,19 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {

   private void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull, boolean stopAtCompaction) {
     String modePropertyName =
-        String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
-    jobConf.set(modePropertyName, HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
+        String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+    jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);

     String startCommitTimestampName =
-        String.format(HoodieHiveUtil.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+        String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
     jobConf.set(startCommitTimestampName, startCommit);

     String maxCommitPulls =
-        String.format(HoodieHiveUtil.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+        String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
     jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);

     String stopAtCompactionPropName =
-        String.format(HoodieHiveUtil.HOODIE_STOP_AT_COMPACTION_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+        String.format(HoodieHiveUtils.HOODIE_STOP_AT_COMPACTION_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
     jobConf.setBoolean(stopAtCompactionPropName, stopAtCompaction);
   }
@@ -32,7 +32,7 @@ import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.hadoop.HoodieHiveUtil;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.table.HoodieCopyOnWriteTable;
@@ -225,15 +225,15 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {

   private void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull) {
     String modePropertyName =
-        String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
-    jobConf.set(modePropertyName, HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
+        String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+    jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);

     String startCommitTimestampName =
-        String.format(HoodieHiveUtil.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+        String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
     jobConf.set(startCommitTimestampName, startCommit);

     String maxCommitPulls =
-        String.format(HoodieHiveUtil.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+        String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
     jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
   }
@@ -18,10 +18,17 @@

 package org.apache.hudi.hadoop;

+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;

 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
 import org.apache.hadoop.io.ArrayWritable;
@@ -31,31 +38,14 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;

 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;

 /**
  * HoodieInputFormat which understands the Hoodie File Structure and filters files based on the Hoodie Mode. If paths
@@ -69,10 +59,14 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement

   protected Configuration conf;

+  protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
+    return HoodieInputFormatUtils.filterInstantsTimeline(timeline);
+  }
+
   @Override
   public FileStatus[] listStatus(JobConf job) throws IOException {
     // Segregate inputPaths[] to incremental, snapshot and non hoodie paths
-    List<String> incrementalTables = HoodieHiveUtil.getIncrementalTableNames(Job.getInstance(job));
+    List<String> incrementalTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(job));
     InputPathHandler inputPathHandler = new InputPathHandler(conf, getInputPaths(job), incrementalTables);
     List<FileStatus> returns = new ArrayList<>();
@@ -107,10 +101,10 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
       setInputPaths(job, snapshotPaths.toArray(new Path[snapshotPaths.size()]));
       FileStatus[] fileStatuses = super.listStatus(job);
       Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus =
-          groupFileStatusForSnapshotPaths(fileStatuses, tableMetaClientMap.values());
+          HoodieInputFormatUtils.groupFileStatusForSnapshotPaths(fileStatuses, tableMetaClientMap.values());
       LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
       for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
-        List<FileStatus> result = filterFileStatusForSnapshotMode(entry.getKey(), entry.getValue());
+        List<FileStatus> result = HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, entry.getKey(), entry.getValue());
         if (result != null) {
           returns.addAll(result);
         }
@@ -119,36 +113,6 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
     return returns.toArray(new FileStatus[returns.size()]);
   }

-  /**
-   * Filter any specific instants that we do not want to process.
-   * example timeline:
-   *
-   * t0 -> create bucket1.parquet
-   * t1 -> create and append updates bucket1.log
-   * t2 -> request compaction
-   * t3 -> create bucket2.parquet
-   *
-   * if compaction at t2 takes a long time, incremental readers on RO tables can move to t3 and would skip updates in t1
-   *
-   * To workaround this problem, we want to stop returning data belonging to commits > t2.
-   * After compaction is complete, incremental reader would see updates in t2, t3, so on.
-   */
-  protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
-    HoodieDefaultTimeline commitsAndCompactionTimeline = timeline.getCommitsAndCompactionTimeline();
-    Option<HoodieInstant> pendingCompactionInstant = commitsAndCompactionTimeline.filterPendingCompactionTimeline().firstInstant();
-    if (pendingCompactionInstant.isPresent()) {
-      HoodieDefaultTimeline instantsTimeline = commitsAndCompactionTimeline.findInstantsBefore(pendingCompactionInstant.get().getTimestamp());
-      int numCommitsFilteredByCompaction = commitsAndCompactionTimeline.getCommitsTimeline().countInstants()
-          - instantsTimeline.getCommitsTimeline().countInstants();
-      LOG.info("Earliest pending compaction instant is: " + pendingCompactionInstant.get().getTimestamp()
-          + " skipping " + numCommitsFilteredByCompaction + " commits");
-
-      return instantsTimeline;
-    } else {
-      return timeline;
-    }
-  }
-
   /**
    * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
    * partitions and then filtering based on the commits of interest, this logic first extracts the
@@ -158,157 +122,22 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
       JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
     String tableName = tableMetaClient.getTableConfig().getTableName();
     Job jobContext = Job.getInstance(job);
-    HoodieDefaultTimeline baseTimeline;
-    if (HoodieHiveUtil.stopAtCompaction(jobContext, tableName)) {
-      baseTimeline = filterInstantsTimeline(tableMetaClient.getActiveTimeline());
-    } else {
-      baseTimeline = tableMetaClient.getActiveTimeline();
-    }
-
-    HoodieTimeline timeline = baseTimeline.getCommitsTimeline().filterCompletedInstants();
-    String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(jobContext, tableName);
-    // Total number of commits to return in this batch. Set this to -1 to get all the commits.
-    Integer maxCommits = HoodieHiveUtil.readMaxCommits(jobContext, tableName);
-    LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
-    List<HoodieInstant> commitsToCheck = timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
-        .getInstants().collect(Collectors.toList());
-    // Extract partitions touched by the commitsToCheck
-    Set<String> partitionsToList = new HashSet<>();
-    for (HoodieInstant commit : commitsToCheck) {
-      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
-          HoodieCommitMetadata.class);
-      partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet());
-    }
-    if (partitionsToList.isEmpty()) {
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
       return null;
     }
-    String incrementalInputPaths = partitionsToList.stream()
-        .map(s -> StringUtils.isNullOrEmpty(s) ? tableMetaClient.getBasePath() : tableMetaClient.getBasePath() + Path.SEPARATOR + s)
-        .filter(s -> {
-          /*
-           * Ensure to return only results from the original input path that has incremental changes
-           * This check is needed for the following corner case - When the caller invokes
-           * HoodieInputFormat.listStatus multiple times (with small batches of Hive partitions each
-           * time. Ex. Hive fetch task calls listStatus for every partition once) we do not want to
-           * accidentally return all incremental changes for the entire table in every listStatus()
-           * call. This will create redundant splits. Instead we only want to return the incremental
-           * changes (if so any) in that batch of input paths.
-           *
-           * NOTE on Hive queries that are executed using Fetch task:
-           * Since Fetch tasks invoke InputFormat.listStatus() per partition, Hoodie metadata can be
-           * listed in every such listStatus() call. In order to avoid this, it might be useful to
-           * disable fetch tasks using the hive session property for incremental queries:
-           * `set hive.fetch.task.conversion=none;`
-           * This would ensure Map Reduce execution is chosen for a Hive query, which combines
-           * partitions (comma separated) and calls InputFormat.listStatus() only once with all
-           * those partitions.
-           */
-          for (Path path : inputPaths) {
-            if (path.toString().contains(s)) {
-              return true;
-            }
-          }
-          return false;
-        })
-        .collect(Collectors.joining(","));
-    if (StringUtils.isNullOrEmpty(incrementalInputPaths)) {
+    Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get());
+    if (!commitsToCheck.isPresent()) {
       return null;
     }
+    Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
     // Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
-    setInputPaths(job, incrementalInputPaths);
+    if (!incrementalInputPaths.isPresent()) {
+      return null;
+    }
+    setInputPaths(job, incrementalInputPaths.get());
     FileStatus[] fileStatuses = super.listStatus(job);
-    BaseFileOnlyView roView = new HoodieTableFileSystemView(tableMetaClient, timeline, fileStatuses);
-    List<String> commitsList = commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
-    List<HoodieBaseFile> filteredFiles = roView.getLatestBaseFilesInRange(commitsList).collect(Collectors.toList());
-    List<FileStatus> returns = new ArrayList<>();
-    for (HoodieBaseFile filteredFile : filteredFiles) {
-      LOG.debug("Processing incremental hoodie file - " + filteredFile.getPath());
-      filteredFile = checkFileStatus(filteredFile);
-      returns.add(filteredFile.getFileStatus());
-    }
-    LOG.info("Total paths to process after hoodie incremental filter " + filteredFiles.size());
-    return returns;
-  }
-
-  /**
-   * Takes in a list of filesStatus and a list of table metadatas. Groups the files status list
-   * based on given table metadata.
-   * @param fileStatuses
-   * @param metaClientList
-   * @return
-   * @throws IOException
-   */
-  private Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatusForSnapshotPaths(
-      FileStatus[] fileStatuses, Collection<HoodieTableMetaClient> metaClientList) {
-    // This assumes the paths for different tables are grouped together
-    Map<HoodieTableMetaClient, List<FileStatus>> grouped = new HashMap<>();
-    HoodieTableMetaClient metadata = null;
-    for (FileStatus status : fileStatuses) {
-      Path inputPath = status.getPath();
-      if (!inputPath.getName().endsWith(".parquet")) {
-        //FIXME(vc): skip non parquet files for now. This wont be needed once log file name start
-        // with "."
-        continue;
-      }
-      if ((metadata == null) || (!inputPath.toString().contains(metadata.getBasePath()))) {
-        for (HoodieTableMetaClient metaClient : metaClientList) {
-          if (inputPath.toString().contains(metaClient.getBasePath())) {
-            metadata = metaClient;
-            if (!grouped.containsKey(metadata)) {
-              grouped.put(metadata, new ArrayList<>());
-            }
-            break;
-          }
-        }
-      }
-      grouped.get(metadata).add(status);
-    }
-    return grouped;
-  }
-
-  /**
-   * Filters data files for a snapshot queried table.
-   */
-  private List<FileStatus> filterFileStatusForSnapshotMode(
-      HoodieTableMetaClient metadata, List<FileStatus> fileStatuses) {
-    FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metadata);
-    }
-    // Get all commits, delta commits, compactions, as all of them produce a base parquet file today
-    HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
-    BaseFileOnlyView roView = new HoodieTableFileSystemView(metadata, timeline, statuses);
-    // filter files on the latest commit found
-    List<HoodieBaseFile> filteredFiles = roView.getLatestBaseFiles().collect(Collectors.toList());
-    LOG.info("Total paths to process after hoodie filter " + filteredFiles.size());
-    List<FileStatus> returns = new ArrayList<>();
-    for (HoodieBaseFile filteredFile : filteredFiles) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
-      }
-      filteredFile = checkFileStatus(filteredFile);
-      returns.add(filteredFile.getFileStatus());
-    }
-    return returns;
-  }
-
-  /**
-   * Checks the file status for a race condition which can set the file size to 0. 1. HiveInputFormat does
-   * super.listStatus() and gets back a FileStatus[] 2. Then it creates the HoodieTableMetaClient for the paths listed.
-   * 3. Generation of splits looks at FileStatus size to create splits, which skips this file
-   */
-  private HoodieBaseFile checkFileStatus(HoodieBaseFile dataFile) {
-    Path dataPath = dataFile.getFileStatus().getPath();
-    try {
-      if (dataFile.getFileSize() == 0) {
-        FileSystem fs = dataPath.getFileSystem(conf);
-        LOG.info("Refreshing file status " + dataFile.getPath());
-        return new HoodieBaseFile(fs.getFileStatus(dataPath));
-      }
-      return dataFile;
-    } catch (IOException e) {
-      throw new HoodieIOException("Could not get FileStatus on path " + dataPath);
-    }
+    return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
   }

   public void setConf(Configuration conf) {
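Illustrative summary of the refactored incremental path above (a sketch using only the helper signatures visible in this hunk; not additional committed code):

    // each step returns an Option, and the method bails out with null when it is empty
    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
    Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get());
    Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
    // ...after which super.listStatus(job) runs only on the affected partitions, and the result
    // is narrowed by HoodieInputFormatUtils.filterIncrementalFileStatus(...)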
@@ -338,19 +167,4 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
     return super.getRecordReader(split, job, reporter);
   }

-  /**
-   * Read the table metadata from a data path. This assumes certain hierarchy of files which should be changed once a
-   * better way is figured out to pass in the hoodie meta directory
-   */
-  protected static HoodieTableMetaClient getTableMetaClient(FileSystem fs, Path dataPath) throws IOException {
-    int levels = HoodieHiveUtil.DEFAULT_LEVELS_TO_BASEPATH;
-    if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) {
-      HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath);
-      metadata.readFromFS();
-      levels = metadata.getPartitionDepth();
-    }
-    Path baseDir = HoodieHiveUtil.getNthParent(dataPath, levels);
-    LOG.info("Reading hoodie metadata from path " + baseDir.toString());
-    return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
-  }
 }
@@ -25,6 +25,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -140,7 +141,7 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
         if (HoodiePartitionMetadata.hasPartitionMetadata(fs, folder)) {
           HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, folder);
           metadata.readFromFS();
-          baseDir = HoodieHiveUtil.getNthParent(folder, metadata.getPartitionDepth());
+          baseDir = HoodieHiveUtils.getNthParent(folder, metadata.getPartitionDepth());
         } else {
           baseDir = safeGetParentsParent(folder);
         }
@@ -33,7 +33,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

-import static org.apache.hudi.hadoop.HoodieParquetInputFormat.getTableMetaClient;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getTableMetaClientForBasePath;

 /**
  * InputPathHandler takes in a set of input paths and incremental tables list. Then, classifies the
@@ -97,7 +97,7 @@ public class InputPathHandler {
         // This path is for a table that we dont know about yet.
         HoodieTableMetaClient metaClient;
         try {
-          metaClient = getTableMetaClient(inputPath.getFileSystem(conf), inputPath);
+          metaClient = getTableMetaClientForBasePath(inputPath.getFileSystem(conf), inputPath);
           String tableName = metaClient.getTableConfig().getTableName();
           tableMetaClientMap.put(tableName, metaClient);
           tagAsIncrementalOrSnapshot(inputPath, tableName, metaClient, incrementalTables);
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.config;
+
+/**
+ * Class to hold props related to Hoodie RealtimeInputFormat and RealtimeRecordReader.
+ */
+public final class HoodieRealtimeConfig {
+
+  // Fraction of mapper/reducer task memory used for compaction of log files
+  public static final String COMPACTION_MEMORY_FRACTION_PROP = "compaction.memory.fraction";
+  public static final String DEFAULT_COMPACTION_MEMORY_FRACTION = "0.75";
+  // used to choose a trade off between IO vs Memory when performing compaction process
+  // Depending on outputfile size and memory provided, choose true to avoid OOM for large file
+  // size + small memory
+  public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = "compaction.lazy.block.read.enabled";
+  public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = "true";
+
+  // Property to set the max memory for dfs inputstream buffer size
+  public static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP = "hoodie.memory.dfs.buffer.max.size";
+  // Setting this to lower value of 1 MB since no control over how many RecordReaders will be started in a mapper
+  public static final int DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE = 1024 * 1024; // 1 MB
+  // Property to set file path prefix for spillable file
+  public static final String SPILLABLE_MAP_BASE_PATH_PROP = "hoodie.memory.spillable.map.path";
+  // Default file path prefix for spillable file
+  public static final String DEFAULT_SPILLABLE_MAP_BASE_PATH = "/tmp/";
+}
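Usage note (a sketch mirroring the call sites updated later in this commit, not new code):

    // readers resolve these props against the JobConf, falling back to the defaults above
    int bufferSize = jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
        HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE);
    String spillPath = jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
        HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);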
@@ -18,74 +18,34 @@

 package org.apache.hudi.hadoop.realtime;

-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.LogReaderUtils;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;

-import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
-import org.apache.avro.generic.GenericArray;
-import org.apache.avro.generic.GenericFixed;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.schema.MessageType;

 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.stream.Collectors;

 /**
  * Record Reader implementation to merge fresh avro data with base parquet data, to support real time queries.
  */
 public abstract class AbstractRealtimeRecordReader {

-  // Fraction of mapper/reducer task memory used for compaction of log files
-  public static final String COMPACTION_MEMORY_FRACTION_PROP = "compaction.memory.fraction";
-  public static final String DEFAULT_COMPACTION_MEMORY_FRACTION = "0.75";
-  // used to choose a trade off between IO vs Memory when performing compaction process
-  // Depending on outputfile size and memory provided, choose true to avoid OOM for large file
-  // size + small memory
-  public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = "compaction.lazy.block.read.enabled";
-  public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = "true";
-
-  // Property to set the max memory for dfs inputstream buffer size
-  public static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP = "hoodie.memory.dfs.buffer.max.size";
-  // Setting this to lower value of 1 MB since no control over how many RecordReaders will be started in a mapper
-  public static final int DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE = 1024 * 1024; // 1 MB
-  // Property to set file path prefix for spillable file
-  public static final String SPILLABLE_MAP_BASE_PATH_PROP = "hoodie.memory.spillable.map.path";
-  // Default file path prefix for spillable file
-  public static final String DEFAULT_SPILLABLE_MAP_BASE_PATH = "/tmp/";
-
   private static final Logger LOG = LogManager.getLogger(AbstractRealtimeRecordReader.class);

   protected final HoodieRealtimeFileSplit split;
@@ -106,7 +66,7 @@ public abstract class AbstractRealtimeRecordReader {
     try {
       this.usesCustomPayload = usesCustomPayload();
       LOG.info("usesCustomPayload ==> " + this.usesCustomPayload);
-      baseFileSchema = readSchema(jobConf, split.getPath());
+      baseFileSchema = HoodieRealtimeRecordReaderUtils.readSchema(jobConf, split.getPath());
       init();
     } catch (IOException e) {
       throw new HoodieIOException("Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e);
@@ -119,220 +79,6 @@ public abstract class AbstractRealtimeRecordReader {
         || metaClient.getTableConfig().getPayloadClass().contains("org.apache.hudi.OverwriteWithLatestAvroPayload"));
   }

-  /**
-   * Reads the schema from the parquet file. This is different from ParquetUtils as it uses the twitter parquet to
-   * support hive 1.1.0
-   */
-  private static MessageType readSchema(Configuration conf, Path parquetFilePath) {
-    try {
-      return ParquetFileReader.readFooter(conf, parquetFilePath).getFileMetaData().getSchema();
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to read footer for parquet " + parquetFilePath, e);
-    }
-  }
-
-  /**
-   * Prints a JSON representation of the ArrayWritable for easier debuggability.
-   */
-  protected static String arrayWritableToString(ArrayWritable writable) {
-    if (writable == null) {
-      return "null";
-    }
-    StringBuilder builder = new StringBuilder();
-    Writable[] values = writable.get();
-    builder.append("\"values_" + Math.random() + "_" + values.length + "\": {");
-    int i = 0;
-    for (Writable w : values) {
-      if (w instanceof ArrayWritable) {
-        builder.append(arrayWritableToString((ArrayWritable) w)).append(",");
-      } else {
-        builder.append("\"value" + i + "\":\"" + w + "\"").append(",");
-        if (w == null) {
-          builder.append("\"type" + i + "\":\"unknown\"").append(",");
-        } else {
-          builder.append("\"type" + i + "\":\"" + w.getClass().getSimpleName() + "\"").append(",");
-        }
-      }
-      i++;
-    }
-    builder.deleteCharAt(builder.length() - 1);
-    builder.append("}");
-    return builder.toString();
-  }
-
-  /**
-   * Given a comma separated list of field names and positions at which they appear on Hive, return
-   * an ordered list of field names, that can be passed onto storage.
-   */
-  private static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv, List<String> partitioningFields) {
-    // Need to convert the following to Set first since Hive does not handle duplicate field names correctly but
-    // handles duplicate fields orders correctly.
-    // Fields Orders -> {@link https://github
-    // .com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
-    // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L188}
-    // Field Names -> {@link https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
-    // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L229}
-    String[] fieldOrdersWithDups = fieldOrderCsv.split(",");
-    Set<String> fieldOrdersSet = new LinkedHashSet<>(Arrays.asList(fieldOrdersWithDups));
-    String[] fieldOrders = fieldOrdersSet.toArray(new String[0]);
-    List<String> fieldNames = Arrays.stream(fieldNameCsv.split(","))
-        .filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList());
-    Set<String> fieldNamesSet = new LinkedHashSet<>(fieldNames);
-    // Hive does not provide ids for partitioning fields, so check for lengths excluding that.
-    if (fieldNamesSet.size() != fieldOrders.length) {
-      throw new HoodieException(String
-          .format("Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d",
-              fieldNames.size(), fieldOrders.length));
-    }
-    TreeMap<Integer, String> orderedFieldMap = new TreeMap<>();
-    String[] fieldNamesArray = fieldNamesSet.toArray(new String[0]);
-    for (int ox = 0; ox < fieldOrders.length; ox++) {
-      orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNamesArray[ox]);
-    }
-    return new ArrayList<>(orderedFieldMap.values());
-  }
-
-  /**
-   * Generate a reader schema off the provided writeSchema, to just project out the provided columns.
-   */
-  public static Schema generateProjectionSchema(Schema writeSchema, Map<String, Field> schemaFieldsMap,
-      List<String> fieldNames) {
-    /**
-     * Avro & Presto field names seems to be case sensitive (support fields differing only in case) whereas
-     * Hive/Impala/SparkSQL(default) are case-insensitive. Spark allows this to be configurable using
-     * spark.sql.caseSensitive=true
-     *
-     * For a RT table setup with no delta-files (for a latest file-slice) -> we translate parquet schema to Avro Here
-     * the field-name case is dependent on parquet schema. Hive (1.x/2.x/CDH) translate column projections to
-     * lower-cases
-     *
-     */
-    List<Schema.Field> projectedFields = new ArrayList<>();
-    for (String fn : fieldNames) {
-      Schema.Field field = schemaFieldsMap.get(fn.toLowerCase());
-      if (field == null) {
-        throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed! "
-            + "Derived Schema Fields: " + new ArrayList<>(schemaFieldsMap.keySet()));
-      } else {
-        projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
-      }
-    }
-
-    Schema projectedSchema = Schema.createRecord(writeSchema.getName(), writeSchema.getDoc(),
-        writeSchema.getNamespace(), writeSchema.isError());
-    projectedSchema.setFields(projectedFields);
-    return projectedSchema;
-  }
-
-  public static Map<String, Field> getNameToFieldMap(Schema schema) {
-    return schema.getFields().stream().map(r -> Pair.of(r.name().toLowerCase(), r))
-        .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
-  }
-
-  /**
-   * Convert the projected read from delta record into an array writable.
-   */
-  public static Writable avroToArrayWritable(Object value, Schema schema) {
-
-    if (value == null) {
-      return null;
-    }
-
-    switch (schema.getType()) {
-      case STRING:
-        return new Text(value.toString());
-      case BYTES:
-        return new BytesWritable((byte[]) value);
-      case INT:
-        return new IntWritable((Integer) value);
-      case LONG:
-        return new LongWritable((Long) value);
-      case FLOAT:
-        return new FloatWritable((Float) value);
-      case DOUBLE:
-        return new DoubleWritable((Double) value);
-      case BOOLEAN:
-        return new BooleanWritable((Boolean) value);
-      case NULL:
-        return null;
-      case RECORD:
-        GenericRecord record = (GenericRecord) value;
-        Writable[] recordValues = new Writable[schema.getFields().size()];
-        int recordValueIndex = 0;
-        for (Schema.Field field : schema.getFields()) {
-          recordValues[recordValueIndex++] = avroToArrayWritable(record.get(field.name()), field.schema());
-        }
-        return new ArrayWritable(Writable.class, recordValues);
-      case ENUM:
-        return new Text(value.toString());
-      case ARRAY:
-        GenericArray arrayValue = (GenericArray) value;
-        Writable[] arrayValues = new Writable[arrayValue.size()];
-        int arrayValueIndex = 0;
-        for (Object obj : arrayValue) {
-          arrayValues[arrayValueIndex++] = avroToArrayWritable(obj, schema.getElementType());
-        }
-        // Hive 1.x will fail here, it requires values2 to be wrapped into another ArrayWritable
-        return new ArrayWritable(Writable.class, arrayValues);
-      case MAP:
-        Map mapValue = (Map) value;
-        Writable[] mapValues = new Writable[mapValue.size()];
-        int mapValueIndex = 0;
-        for (Object entry : mapValue.entrySet()) {
-          Map.Entry mapEntry = (Map.Entry) entry;
-          Writable[] nestedMapValues = new Writable[2];
-          nestedMapValues[0] = new Text(mapEntry.getKey().toString());
-          nestedMapValues[1] = avroToArrayWritable(mapEntry.getValue(), schema.getValueType());
-          mapValues[mapValueIndex++] = new ArrayWritable(Writable.class, nestedMapValues);
-        }
-        // Hive 1.x will fail here, it requires values3 to be wrapped into another ArrayWritable
-        return new ArrayWritable(Writable.class, mapValues);
-      case UNION:
-        List<Schema> types = schema.getTypes();
-        if (types.size() != 2) {
-          throw new IllegalArgumentException("Only support union with 2 fields");
-        }
-        Schema s1 = types.get(0);
-        Schema s2 = types.get(1);
-        if (s1.getType() == Schema.Type.NULL) {
-          return avroToArrayWritable(value, s2);
-        } else if (s2.getType() == Schema.Type.NULL) {
-          return avroToArrayWritable(value, s1);
-        } else {
-          throw new IllegalArgumentException("Only support union with null");
-        }
-      case FIXED:
-        if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("decimal")) {
-          LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) LogicalTypes.fromSchema(schema);
-          HiveDecimalWritable writable = new HiveDecimalWritable(((GenericFixed) value).bytes(),
-              decimal.getScale());
-          return HiveDecimalWritable.enforcePrecisionScale(writable,
-              decimal.getPrecision(),
-              decimal.getScale());
-        }
-        return new BytesWritable(((GenericFixed) value).bytes());
-      default:
-        return null;
-    }
-  }
-
-  /**
-   * Hive implementation of ParquetRecordReader results in partition columns not present in the original parquet file to
-   * also be part of the projected schema. Hive expects the record reader implementation to return the row in its
-   * entirety (with un-projected column having null values). As we use writerSchema for this, make sure writer schema
-   * also includes partition columns
-   *
-   * @param schema Schema to be changed
-   */
-  private static Schema addPartitionFields(Schema schema, List<String> partitioningFields) {
-    final Set<String> firstLevelFieldNames =
-        schema.getFields().stream().map(Field::name).map(String::toLowerCase).collect(Collectors.toSet());
-    List<String> fieldsToAdd = partitioningFields.stream().map(String::toLowerCase)
-        .filter(x -> !firstLevelFieldNames.contains(x)).collect(Collectors.toList());
-
-    return HoodieAvroUtils.appendNullSchemaFields(schema, fieldsToAdd);
-  }
-
   /**
    * Goes through the log files in reverse order and finds the schema from the last available data block. If not, falls
    * back to the schema from the latest parquet file. Finally, sets the partition column and projection fields into the
@@ -353,16 +99,16 @@ public abstract class AbstractRealtimeRecordReader {
     List<String> partitioningFields =
         partitionFields.length() > 0 ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
             : new ArrayList<>();
-    writerSchema = addPartitionFields(writerSchema, partitioningFields);
-    List<String> projectionFields = orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
+    writerSchema = HoodieRealtimeRecordReaderUtils.addPartitionFields(writerSchema, partitioningFields);
+    List<String> projectionFields = HoodieRealtimeRecordReaderUtils.orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
         jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), partitioningFields);

-    Map<String, Field> schemaFieldsMap = getNameToFieldMap(writerSchema);
+    Map<String, Field> schemaFieldsMap = HoodieRealtimeRecordReaderUtils.getNameToFieldMap(writerSchema);
     hiveSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap);
     // TODO(vc): In the future, the reader schema should be updated based on log files & be able
     // to null out fields not present before

-    readerSchema = generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields);
+    readerSchema = HoodieRealtimeRecordReaderUtils.generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields);
     LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
         split.getDeltaLogPaths(), split.getPath(), projectionFields));
   }
@@ -409,7 +155,8 @@ public abstract class AbstractRealtimeRecordReader {
   public long getMaxCompactionMemoryInBytes() {
     // jobConf.getMemoryForMapTask() returns in MB
     return (long) Math
-        .ceil(Double.parseDouble(jobConf.get(COMPACTION_MEMORY_FRACTION_PROP, DEFAULT_COMPACTION_MEMORY_FRACTION))
+        .ceil(Double.parseDouble(jobConf.get(HoodieRealtimeConfig.COMPACTION_MEMORY_FRACTION_PROP,
+            HoodieRealtimeConfig.DEFAULT_COMPACTION_MEMORY_FRACTION))
             * jobConf.getMemoryForMapTask() * 1024 * 1024L);
   }
 }
@@ -20,6 +20,7 @@ package org.apache.hudi.hadoop.realtime;

 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.hadoop.hive.HoodieCombineRealtimeFileSplit;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;

 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -65,7 +66,7 @@ public class HoodieCombineRealtimeRecordReader implements RecordReader<NullWrita
   public boolean next(NullWritable key, ArrayWritable value) throws IOException {
     if (this.currentRecordReader.next(key, value)) {
       LOG.info("Reading from record reader");
-      LOG.info(AbstractRealtimeRecordReader.arrayWritableToString(value));
+      LOG.info(HoodieRealtimeRecordReaderUtils.arrayWritableToString(value));
       return true;
     } else if (recordReaders.size() > 0) {
       this.currentRecordReader.close();
@@ -18,27 +18,17 @@

 package org.apache.hudi.hadoop.realtime;

-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;

 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -52,13 +42,7 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;

 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;

 /**
@@ -70,11 +54,6 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i

   private static final Logger LOG = LogManager.getLogger(HoodieParquetRealtimeInputFormat.class);

-  // These positions have to be deterministic across all tables
-  public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
-  public static final int HOODIE_RECORD_KEY_COL_POS = 2;
-  public static final int HOODIE_PARTITION_PATH_COL_POS = 3;
-  public static final String HOODIE_READ_COLUMNS_PROP = "hoodie.read.columns.set";
   // To make Hive on Spark queries work with RT tables. Our theory is that due to
   // {@link org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher}
   // not handling empty list correctly, the ParquetRecordReaderWrapper ends up adding the same column ids multiple
@@ -85,74 +64,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i

     Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is);

-    // obtain all unique parent folders for splits
-    Map<Path, List<FileSplit>> partitionsToParquetSplits =
-        fileSplits.collect(Collectors.groupingBy(split -> split.getPath().getParent()));
-    // TODO(vc): Should we handle also non-hoodie splits here?
-    Map<String, HoodieTableMetaClient> metaClientMap = new HashMap<>();
-    Map<Path, HoodieTableMetaClient> partitionsToMetaClient =
-        partitionsToParquetSplits.keySet().stream().collect(Collectors.toMap(Function.identity(), p -> {
-          // find if we have a metaclient already for this partition.
-          Option<String> matchingBasePath = Option.fromJavaOptional(
-              metaClientMap.keySet().stream().filter(basePath -> p.toString().startsWith(basePath)).findFirst());
-          if (matchingBasePath.isPresent()) {
-            return metaClientMap.get(matchingBasePath.get());
-          }
-
-          try {
-            HoodieTableMetaClient metaClient = getTableMetaClient(p.getFileSystem(conf), p);
-            metaClientMap.put(metaClient.getBasePath(), metaClient);
-            return metaClient;
-          } catch (IOException e) {
-            throw new HoodieIOException("Error creating hoodie meta client against : " + p, e);
-          }
-        }));
-
-    // for all unique split parents, obtain all delta files based on delta commit timeline,
-    // grouped on file id
-    List<HoodieRealtimeFileSplit> rtSplits = new ArrayList<>();
-    partitionsToParquetSplits.keySet().forEach(partitionPath -> {
-      // for each partition path obtain the data & log file groupings, then map back to inputsplits
-      HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
-      HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
-      String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);
-
-      try {
-        // Both commit and delta-commits are included - pick the latest completed one
-        Option<HoodieInstant> latestCompletedInstant =
-            metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
-
-        Stream<FileSlice> latestFileSlices = latestCompletedInstant
-            .map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp()))
-            .orElse(Stream.empty());
-
-        // subgroup splits again by file id & match with log files.
-        Map<String, List<FileSplit>> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream()
-            .collect(Collectors.groupingBy(split -> FSUtils.getFileId(split.getPath().getName())));
-        latestFileSlices.forEach(fileSlice -> {
-          List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
-          dataFileSplits.forEach(split -> {
-            try {
-              List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
-                  .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
-              // Get the maxCommit from the last delta or compaction or commit - when
-              // bootstrapped from COW table
-              String maxCommitTime = metaClient
-                  .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
-                      HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
-                  .filterCompletedInstants().lastInstant().get().getTimestamp();
-              rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime));
-            } catch (IOException e) {
-              throw new HoodieIOException("Error creating hoodie real time split ", e);
-            }
-          });
-        });
-      } catch (Exception e) {
-        throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e);
-      }
-    });
-    LOG.info("Returning a total splits of " + rtSplits.size());
-    return rtSplits.toArray(new InputSplit[0]);
+    return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }

   @Override
@@ -199,9 +111,9 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i

   private static void addRequiredProjectionFields(Configuration configuration) {
     // Need this to do merge records in HoodieRealtimeRecordReader
-    addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HOODIE_RECORD_KEY_COL_POS);
-    addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HOODIE_COMMIT_TIME_COL_POS);
-    addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HOODIE_PARTITION_PATH_COL_POS);
+    addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS);
+    addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS);
+    addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_PARTITION_PATH_COL_POS);
   }

 /**
@@ -228,12 +140,12 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
     // risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible
     // latency incurred here due to the synchronization since get record reader is called once per spilt before the
     // actual heavy lifting of reading the parquet files happen.
-    if (jobConf.get(HOODIE_READ_COLUMNS_PROP) == null) {
+    if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) {
       synchronized (jobConf) {
         LOG.info(
             "Before adding Hoodie columns, Projections :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
                 + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
-        if (jobConf.get(HOODIE_READ_COLUMNS_PROP) == null) {
+        if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) {
           // Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
           // In this case, the projection fields gets removed. Looking at HiveInputFormat implementation, in some cases
           // hoodie additional projection columns are reset after calling setConf and only natural projections
@@ -245,7 +157,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
           addRequiredProjectionFields(jobConf);

           this.conf = jobConf;
-          this.conf.set(HOODIE_READ_COLUMNS_PROP, "true");
+          this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");
         }
       }
     }
@@ -24,6 +24,9 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;

 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.io.ArrayWritable;
@@ -60,13 +63,17 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
     // NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit
     // but can return records for completed commits > the commit we are trying to read (if using
     // readCommit() API)
-    return new HoodieMergedLogRecordScanner(FSUtils.getFs(split.getPath().toString(), jobConf), split.getBasePath(),
-        split.getDeltaLogPaths(), usesCustomPayload ? getWriterSchema() : getReaderSchema(), split.getMaxCommitTime(),
+    return new HoodieMergedLogRecordScanner(
+        FSUtils.getFs(split.getPath().toString(), jobConf),
+        split.getBasePath(),
+        split.getDeltaLogPaths(),
+        usesCustomPayload ? getWriterSchema() : getReaderSchema(),
+        split.getMaxCommitTime(),
         getMaxCompactionMemoryInBytes(),
-        Boolean
-            .parseBoolean(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
-        false, jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP, DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
-        jobConf.get(SPILLABLE_MAP_BASE_PATH_PROP, DEFAULT_SPILLABLE_MAP_BASE_PATH));
+        Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
+        false,
+        jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
+        jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH));
   }

   @Override
@@ -81,7 +88,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
       // TODO(VC): Right now, we assume all records in log, have a matching base record. (which
       // would be true until we have a way to index logs too)
       // return from delta records map if we have some match.
-      String key = arrayWritable.get()[HoodieParquetRealtimeInputFormat.HOODIE_RECORD_KEY_COL_POS].toString();
+      String key = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
       if (deltaRecordMap.containsKey(key)) {
         // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
         // deltaRecord may not be a full record and needs values of columns from the parquet
@@ -105,11 +112,11 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
         // we assume, a later safe record in the log, is newer than what we have in the map &
         // replace it. Since we want to return an arrayWritable which is the same length as the elements in the latest
         // schema, we use writerSchema to create the arrayWritable from the latest generic record
-        ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(recordToReturn, getHiveSchema());
+        ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(recordToReturn, getHiveSchema());
         Writable[] replaceValue = aWritable.get();
         if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("key %s, base values: %s, log values: %s", key, arrayWritableToString(arrayWritable),
-              arrayWritableToString(aWritable)));
+          LOG.debug(String.format("key %s, base values: %s, log values: %s", key, HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable),
+              HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable)));
         }
         Writable[] originalValue = arrayWritable.get();
         try {
@@ -117,10 +124,10 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
|
||||
arrayWritable.set(originalValue);
|
||||
} catch (RuntimeException re) {
|
||||
LOG.error("Got exception when doing array copy", re);
|
||||
LOG.error("Base record :" + arrayWritableToString(arrayWritable));
|
||||
LOG.error("Log record :" + arrayWritableToString(aWritable));
|
||||
String errMsg = "Base-record :" + arrayWritableToString(arrayWritable)
|
||||
+ " ,Log-record :" + arrayWritableToString(aWritable) + " ,Error :" + re.getMessage();
|
||||
LOG.error("Base record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable));
|
||||
LOG.error("Log record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable));
|
||||
String errMsg = "Base-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable)
|
||||
+ " ,Log-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable) + " ,Error :" + re.getMessage();
|
||||
throw new RuntimeException(errMsg, re);
|
||||
}
|
||||
}
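
The overlay in this hunk is easier to see with concrete values. A minimal, self-contained sketch of the same step, with plain String arrays and a HashMap standing in for ArrayWritable and the delta record map (all names and values here are illustrative, not Hudi API):

import java.util.HashMap;
import java.util.Map;

class MergeOnReadSketch {
  public static void main(String[] args) {
    // base row read from parquet: [commitTime, seqNo, recordKey, partitionPath, value]
    String[] baseRow = {"001", "0", "key1", "2020/05/01", "old"};
    Map<String, String[]> deltaRecordMap = new HashMap<>();
    deltaRecordMap.put("key1", new String[] {"002", "1", "key1", "2020/05/01", "new"});

    String key = baseRow[2]; // HOODIE_RECORD_KEY_COL_POS == 2
    if (deltaRecordMap.containsKey(key)) {
      // a later record in the log is assumed newer; overlay its values onto the base row
      System.arraycopy(deltaRecordMap.get(key), 0, baseRow, 0, baseRow.length);
    }
    System.out.println(String.join(",", baseRow)); // prints 002,1,key1,2020/05/01,new
  }
}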

@@ -37,6 +37,8 @@ import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.hadoop.RecordReaderValueIterator;
import org.apache.hudi.hadoop.SafeParquetRecordReaderWrapper;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;

class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
    implements RecordReader<NullWritable, ArrayWritable> {
@@ -76,11 +78,11 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
    this.iterator = this.executor.getQueue().iterator();
    this.logRecordScanner = new HoodieUnMergedLogRecordScanner(FSUtils.getFs(split.getPath().toString(), jobConf),
        split.getBasePath(), split.getDeltaLogPaths(), getReaderSchema(), split.getMaxCommitTime(),
        Boolean.parseBoolean(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
        false, jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP, DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), record -> {
        Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
        false, jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), record -> {
          // convert Hoodie log record to Hadoop AvroWritable and buffer
          GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema()).get();
          ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(rec, getHiveSchema());
          ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema());
          this.executor.getQueue().insertRecord(aWritable);
        });
    // Start reading and buffering

@@ -16,7 +16,7 @@
 * limitations under the License.
 */

package org.apache.hudi.hadoop;
package org.apache.hudi.hadoop.utils;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
@@ -31,9 +31,9 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class HoodieHiveUtil {
public class HoodieHiveUtils {

  public static final Logger LOG = LogManager.getLogger(HoodieHiveUtil.class);
  public static final Logger LOG = LogManager.getLogger(HoodieHiveUtils.class);

  public static final String HOODIE_CONSUME_MODE_PATTERN = "hoodie.%s.consume.mode";
  public static final String HOODIE_START_COMMIT_PATTERN = "hoodie.%s.consume.start.timestamp";
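
These per-table property patterns drive how Hive-side queries consume the table. A hedged sketch of wiring them up on a JobConf: the table name is illustrative, and the "INCREMENTAL" value and the max-commits pattern string are assumptions based on how the tests in this commit use the HoodieHiveUtils constants.

import org.apache.hadoop.mapred.JobConf;

class IncrementalConsumeConfSketch {
  public static void main(String[] args) {
    JobConf jobConf = new JobConf();
    String table = "raw_trips"; // illustrative table name
    // switch the table to incremental scan mode (HoodieHiveUtils.INCREMENTAL_SCAN_MODE)
    jobConf.set(String.format("hoodie.%s.consume.mode", table), "INCREMENTAL");
    // pull commits strictly after this timestamp (HOODIE_START_COMMIT_PATTERN)
    jobConf.set(String.format("hoodie.%s.consume.start.timestamp", table), "100");
    // cap commits per batch; assumed literal for HOODIE_MAX_COMMIT_PATTERN
    jobConf.setInt(String.format("hoodie.%s.consume.max.commits", table), 3);
  }
}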

@@ -0,0 +1,342 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.hadoop.utils;

import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieIOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

public class HoodieInputFormatUtils {

  // These positions have to be deterministic across all tables
  public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
  public static final int HOODIE_RECORD_KEY_COL_POS = 2;
  public static final int HOODIE_PARTITION_PATH_COL_POS = 3;
  public static final String HOODIE_READ_COLUMNS_PROP = "hoodie.read.columns.set";

  private static final Logger LOG = LogManager.getLogger(HoodieInputFormatUtils.class);

  /**
   * Filter any specific instants that we do not want to process.
   * example timeline:
   *
   * t0 -> create bucket1.parquet
   * t1 -> create and append updates bucket1.log
   * t2 -> request compaction
   * t3 -> create bucket2.parquet
   *
   * if compaction at t2 takes a long time, incremental readers on RO tables can move to t3 and would skip updates in t1
   *
   * To workaround this problem, we want to stop returning data belonging to commits > t2.
   * After compaction is complete, incremental reader would see updates in t2, t3, so on.
   * @param timeline
   * @return
   */
  public static HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
    HoodieDefaultTimeline commitsAndCompactionTimeline = timeline.getCommitsAndCompactionTimeline();
    Option<HoodieInstant> pendingCompactionInstant = commitsAndCompactionTimeline
        .filterPendingCompactionTimeline().firstInstant();
    if (pendingCompactionInstant.isPresent()) {
      HoodieDefaultTimeline instantsTimeline = commitsAndCompactionTimeline
          .findInstantsBefore(pendingCompactionInstant.get().getTimestamp());
      int numCommitsFilteredByCompaction = commitsAndCompactionTimeline.getCommitsTimeline().countInstants()
          - instantsTimeline.getCommitsTimeline().countInstants();
      LOG.info("Earliest pending compaction instant is: " + pendingCompactionInstant.get().getTimestamp()
          + " skipping " + numCommitsFilteredByCompaction + " commits");

      return instantsTimeline;
    } else {
      return timeline;
    }
  }
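
A simplified model of the filtering rule documented above, using plain strings for instants instead of HoodieInstant. Given the example timeline t0..t3 with a compaction requested at t2, everything at or after t2 is held back until it completes:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FilterTimelineSketch {
  static List<String> filter(List<String> instants, String earliestPendingCompaction) {
    if (earliestPendingCompaction == null) {
      return instants; // no pending compaction: expose the whole timeline
    }
    // keep only instants strictly before the first pending compaction
    return instants.stream()
        .filter(ts -> ts.compareTo(earliestPendingCompaction) < 0)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(filter(Arrays.asList("t0", "t1", "t2", "t3"), "t2")); // [t0, t1]
  }
}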

  /**
   * Extract partitions touched by the commitsToCheck.
   * @param commitsToCheck
   * @param tableMetaClient
   * @param timeline
   * @param inputPaths
   * @return
   * @throws IOException
   */
  public static Option<String> getAffectedPartitions(List<HoodieInstant> commitsToCheck,
                                                     HoodieTableMetaClient tableMetaClient,
                                                     HoodieTimeline timeline,
                                                     List<Path> inputPaths) throws IOException {
    Set<String> partitionsToList = new HashSet<>();
    for (HoodieInstant commit : commitsToCheck) {
      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
          HoodieCommitMetadata.class);
      partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet());
    }
    if (partitionsToList.isEmpty()) {
      return Option.empty();
    }
    String incrementalInputPaths = partitionsToList.stream()
        .map(s -> StringUtils.isNullOrEmpty(s) ? tableMetaClient.getBasePath() : tableMetaClient.getBasePath() + Path.SEPARATOR + s)
        .filter(s -> {
          /*
           * Ensure to return only results from the original input path that has incremental changes
           * This check is needed for the following corner case - When the caller invokes
           * HoodieInputFormat.listStatus multiple times (with small batches of Hive partitions each
           * time. Ex. Hive fetch task calls listStatus for every partition once) we do not want to
           * accidentally return all incremental changes for the entire table in every listStatus()
           * call. This will create redundant splits. Instead we only want to return the incremental
           * changes (if so any) in that batch of input paths.
           *
           * NOTE on Hive queries that are executed using Fetch task:
           * Since Fetch tasks invoke InputFormat.listStatus() per partition, Hoodie metadata can be
           * listed in every such listStatus() call. In order to avoid this, it might be useful to
           * disable fetch tasks using the hive session property for incremental queries:
           * `set hive.fetch.task.conversion=none;`
           * This would ensure Map Reduce execution is chosen for a Hive query, which combines
           * partitions (comma separated) and calls InputFormat.listStatus() only once with all
           * those partitions.
           */
          for (Path path : inputPaths) {
            if (path.toString().contains(s)) {
              return true;
            }
          }
          return false;
        })
        .collect(Collectors.joining(","));
    return Option.of(incrementalInputPaths);
  }
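
A self-contained sketch of the path construction above: partition names pulled from commit metadata become absolute paths under the table base path, then get filtered against the input paths of this particular listStatus() batch. All values are illustrative.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class AffectedPartitionsSketch {
  public static void main(String[] args) {
    String basePath = "/data/raw_trips";
    List<String> touchedPartitions = Arrays.asList("2020/05/01", "2020/05/02");
    List<String> inputPaths = Arrays.asList("/data/raw_trips/2020/05/01");

    String incrementalInputPaths = touchedPartitions.stream()
        .map(p -> p.isEmpty() ? basePath : basePath + "/" + p)
        // only keep partitions that were actually requested in this batch
        .filter(p -> inputPaths.stream().anyMatch(in -> in.contains(p)))
        .collect(Collectors.joining(","));
    System.out.println(incrementalInputPaths); // /data/raw_trips/2020/05/01
  }
}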

  /**
   * Extract HoodieTimeline based on HoodieTableMetaClient.
   * @param job
   * @param tableMetaClient
   * @return
   */
  public static Option<HoodieTimeline> getFilteredCommitsTimeline(Job job, HoodieTableMetaClient tableMetaClient) {
    String tableName = tableMetaClient.getTableConfig().getTableName();
    HoodieDefaultTimeline baseTimeline;
    if (HoodieHiveUtils.stopAtCompaction(job, tableName)) {
      baseTimeline = filterInstantsTimeline(tableMetaClient.getActiveTimeline());
    } else {
      baseTimeline = tableMetaClient.getActiveTimeline();
    }
    return Option.of(baseTimeline.getCommitsTimeline().filterCompletedInstants());
  }

  /**
   * Get commits for incremental query from Hive map reduce configuration.
   * @param job
   * @param tableName
   * @param timeline
   * @return
   */
  public static Option<List<HoodieInstant>> getCommitsForIncrementalQuery(Job job, String tableName, HoodieTimeline timeline) {
    String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(job, tableName);
    // Total number of commits to return in this batch. Set this to -1 to get all the commits.
    Integer maxCommits = HoodieHiveUtils.readMaxCommits(job, tableName);
    LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
    return Option.of(timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
        .getInstants().collect(Collectors.toList()));
  }

  /**
   * Extract HoodieTableMetaClient by base path.
   * @param conf
   * @param partitions
   * @return
   */
  public static Map<Path, HoodieTableMetaClient> getTableMetaClientByBasePath(Configuration conf, Set<Path> partitions) {
    Map<String, HoodieTableMetaClient> metaClientMap = new HashMap<>();
    return partitions.stream().collect(Collectors.toMap(Function.identity(), p -> {
      // Get meta client if this path is the base path.
      Option<String> matchingBasePath = Option.fromJavaOptional(
          metaClientMap.keySet().stream().filter(basePath -> p.toString().startsWith(basePath)).findFirst());
      if (matchingBasePath.isPresent()) {
        return metaClientMap.get(matchingBasePath.get());
      }

      try {
        HoodieTableMetaClient metaClient = getTableMetaClientForBasePath(p.getFileSystem(conf), p);
        metaClientMap.put(metaClient.getBasePath(), metaClient);
        return metaClient;
      } catch (IOException e) {
        throw new HoodieIOException("Error creating hoodie meta client against : " + p, e);
      }
    }));
  }

  /**
   * Extract HoodieTableMetaClient from a partition path(not base path).
   * @param fs
   * @param dataPath
   * @return
   * @throws IOException
   */
  public static HoodieTableMetaClient getTableMetaClientForBasePath(FileSystem fs, Path dataPath) throws IOException {
    int levels = HoodieHiveUtils.DEFAULT_LEVELS_TO_BASEPATH;
    if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) {
      HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath);
      metadata.readFromFS();
      levels = metadata.getPartitionDepth();
    }
    Path baseDir = HoodieHiveUtils.getNthParent(dataPath, levels);
    LOG.info("Reading hoodie metadata from path " + baseDir.toString());
    return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
  }
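
HoodieHiveUtils.getNthParent is not shown in this diff; a hedged sketch of the walk it presumably performs is below: climbing `levels` parents from a partition path back to the table base path, using only the Hadoop Path API.

import org.apache.hadoop.fs.Path;

class NthParentSketch {
  static Path getNthParent(Path path, int n) {
    Path parent = path;
    for (int i = 0; i < n; i++) {
      parent = parent.getParent(); // one directory level up per partition-depth level
    }
    return parent;
  }

  public static void main(String[] args) {
    // partition depth 3 (yyyy/mm/dd) leads back to the base path
    System.out.println(getNthParent(new Path("/data/raw_trips/2020/05/01"), 3)); // /data/raw_trips
  }
}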

  /**
   * Filter a list of FileStatus based on commitsToCheck for incremental view.
   * @param job
   * @param tableMetaClient
   * @param timeline
   * @param fileStatuses
   * @param commitsToCheck
   * @return
   */
  public static List<FileStatus> filterIncrementalFileStatus(Job job, HoodieTableMetaClient tableMetaClient,
      HoodieTimeline timeline, FileStatus[] fileStatuses, List<HoodieInstant> commitsToCheck) {
    TableFileSystemView.BaseFileOnlyView roView = new HoodieTableFileSystemView(tableMetaClient, timeline, fileStatuses);
    List<String> commitsList = commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
    List<HoodieBaseFile> filteredFiles = roView.getLatestBaseFilesInRange(commitsList).collect(Collectors.toList());
    List<FileStatus> returns = new ArrayList<>();
    for (HoodieBaseFile filteredFile : filteredFiles) {
      LOG.debug("Processing incremental hoodie file - " + filteredFile.getPath());
      filteredFile = refreshFileStatus(job.getConfiguration(), filteredFile);
      returns.add(filteredFile.getFileStatus());
    }
    LOG.info("Total paths to process after hoodie incremental filter " + filteredFiles.size());
    return returns;
  }

  /**
   * Takes in a list of filesStatus and a list of table metadatas. Groups the files status list
   * based on given table metadata.
   * @param fileStatuses
   * @param metaClientList
   * @return
   * @throws IOException
   */
  public static Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatusForSnapshotPaths(
      FileStatus[] fileStatuses, Collection<HoodieTableMetaClient> metaClientList) {
    // This assumes the paths for different tables are grouped together
    Map<HoodieTableMetaClient, List<FileStatus>> grouped = new HashMap<>();
    HoodieTableMetaClient metadata = null;
    for (FileStatus status : fileStatuses) {
      Path inputPath = status.getPath();
      if (!inputPath.getName().endsWith(".parquet")) {
        //FIXME(vc): skip non parquet files for now. This wont be needed once log file name start
        // with "."
        continue;
      }
      if ((metadata == null) || (!inputPath.toString().contains(metadata.getBasePath()))) {
        for (HoodieTableMetaClient metaClient : metaClientList) {
          if (inputPath.toString().contains(metaClient.getBasePath())) {
            metadata = metaClient;
            if (!grouped.containsKey(metadata)) {
              grouped.put(metadata, new ArrayList<>());
            }
            break;
          }
        }
      }
      grouped.get(metadata).add(status);
    }
    return grouped;
  }

  /**
   * Filters data files for a snapshot queried table.
   * @param job
   * @param metadata
   * @param fileStatuses
   * @return
   */
  public static List<FileStatus> filterFileStatusForSnapshotMode(
      JobConf job, HoodieTableMetaClient metadata, List<FileStatus> fileStatuses) {
    FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metadata);
    }
    // Get all commits, delta commits, compactions, as all of them produce a base parquet file today
    HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
    TableFileSystemView.BaseFileOnlyView roView = new HoodieTableFileSystemView(metadata, timeline, statuses);
    // filter files on the latest commit found
    List<HoodieBaseFile> filteredFiles = roView.getLatestBaseFiles().collect(Collectors.toList());
    LOG.info("Total paths to process after hoodie filter " + filteredFiles.size());
    List<FileStatus> returns = new ArrayList<>();
    for (HoodieBaseFile filteredFile : filteredFiles) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
      }
      filteredFile = refreshFileStatus(job, filteredFile);
      returns.add(filteredFile.getFileStatus());
    }
    return returns;
  }

  /**
   * Checks the file status for a race condition which can set the file size to 0. 1. HiveInputFormat does
   * super.listStatus() and gets back a FileStatus[] 2. Then it creates the HoodieTableMetaClient for the paths listed.
   * 3. Generation of splits looks at FileStatus size to create splits, which skips this file
   * @param conf
   * @param dataFile
   * @return
   */
  private static HoodieBaseFile refreshFileStatus(Configuration conf, HoodieBaseFile dataFile) {
    Path dataPath = dataFile.getFileStatus().getPath();
    try {
      if (dataFile.getFileSize() == 0) {
        FileSystem fs = dataPath.getFileSystem(conf);
        LOG.info("Refreshing file status " + dataFile.getPath());
        return new HoodieBaseFile(fs.getFileStatus(dataPath));
      }
      return dataFile;
    } catch (IOException e) {
      throw new HoodieIOException("Could not get FileStatus on path " + dataPath);
    }
  }

}
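
A plain-Java model of the snapshot filtering this class performs: per file group (file id), only the base file written by the latest completed commit survives. The `<fileId>_<writeToken>_<instantTime>.parquet` naming below is an assumption about Hudi's base-file convention, used here only for illustration.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class SnapshotFilterSketch {
  public static void main(String[] args) {
    List<String> files = Arrays.asList(
        "f1_0_100.parquet", "f1_0_200.parquet", // two versions of file group f1
        "f2_0_100.parquet");                    // one version of file group f2
    Map<String, String> latestPerGroup = files.stream().collect(Collectors.toMap(
        f -> f.split("_")[0],                    // group by file id
        f -> f,
        (a, b) -> a.compareTo(b) >= 0 ? a : b)); // keep the later commit time
    // prints f1_0_200.parquet and f2_0_100.parquet (iteration order unspecified)
    System.out.println(latestPerGroup.values());
  }
}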

@@ -0,0 +1,154 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.hadoop.utils;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {

  private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);

  public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) throws IOException {
    Map<Path, List<FileSplit>> partitionsToParquetSplits =
        fileSplits.collect(Collectors.groupingBy(split -> split.getPath().getParent()));
    // TODO(vc): Should we handle also non-hoodie splits here?
    Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByBasePath(conf, partitionsToParquetSplits.keySet());

    // for all unique split parents, obtain all delta files based on delta commit timeline,
    // grouped on file id
    List<HoodieRealtimeFileSplit> rtSplits = new ArrayList<>();
    partitionsToParquetSplits.keySet().forEach(partitionPath -> {
      // for each partition path obtain the data & log file groupings, then map back to inputsplits
      HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
      HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
      String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);

      try {
        // Both commit and delta-commits are included - pick the latest completed one
        Option<HoodieInstant> latestCompletedInstant =
            metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();

        Stream<FileSlice> latestFileSlices = latestCompletedInstant
            .map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp()))
            .orElse(Stream.empty());

        // subgroup splits again by file id & match with log files.
        Map<String, List<FileSplit>> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream()
            .collect(Collectors.groupingBy(split -> FSUtils.getFileId(split.getPath().getName())));
        latestFileSlices.forEach(fileSlice -> {
          List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
          dataFileSplits.forEach(split -> {
            try {
              List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
                  .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
              // Get the maxCommit from the last delta or compaction or commit - when
              // bootstrapped from COW table
              String maxCommitTime = metaClient
                  .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
                      HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
                  .filterCompletedInstants().lastInstant().get().getTimestamp();
              rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime));
            } catch (IOException e) {
              throw new HoodieIOException("Error creating hoodie real time split ", e);
            }
          });
        });
      } catch (Exception e) {
        throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e);
      }
    });
    LOG.info("Returning a total splits of " + rtSplits.size());
    return rtSplits.toArray(new InputSplit[0]);
  }
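
A minimal model of what a realtime split carries once this method is done: one parquet base file plus the time-ordered log files of the same file slice, and the max commit time to merge up to. Types are plain Strings, and the file names are illustrative stand-ins for Hudi's naming; the real code uses HoodieRealtimeFileSplit.

import java.util.Arrays;
import java.util.List;

class RealtimeSplitSketch {
  final String baseFilePath;
  final List<String> deltaLogPaths; // sorted by log version, oldest first
  final String maxCommitTime;       // newest completed commit/deltacommit to merge

  RealtimeSplitSketch(String baseFilePath, List<String> deltaLogPaths, String maxCommitTime) {
    this.baseFilePath = baseFilePath;
    this.deltaLogPaths = deltaLogPaths;
    this.maxCommitTime = maxCommitTime;
  }

  public static void main(String[] args) {
    RealtimeSplitSketch split = new RealtimeSplitSketch(
        "/data/raw_trips/2020/05/01/f1_0_100.parquet",
        Arrays.asList(".f1_101.log.1", ".f1_102.log.2"), // illustrative log names
        "102");
    System.out.println(split.baseFilePath + " + " + split.deltaLogPaths);
  }
}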

  // Return parquet file with a list of log files in the same file group.
  public static Map<String, List<String>> groupLogsByBaseFile(Configuration conf, Stream<FileStatus> fileStatuses) {
    Map<Path, List<FileStatus>> partitionsToParquetSplits =
        fileStatuses.collect(Collectors.groupingBy(file -> file.getPath().getParent()));
    // TODO(vc): Should we handle also non-hoodie splits here?
    Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByBasePath(conf, partitionsToParquetSplits.keySet());

    // for all unique split parents, obtain all delta files based on delta commit timeline,
    // grouped on file id
    Map<String, List<String>> resultMap = new HashMap<>();
    partitionsToParquetSplits.keySet().forEach(partitionPath -> {
      // for each partition path obtain the data & log file groupings, then map back to inputsplits
      HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
      HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
      String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);

      try {
        // Both commit and delta-commits are included - pick the latest completed one
        Option<HoodieInstant> latestCompletedInstant =
            metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();

        Stream<FileSlice> latestFileSlices = latestCompletedInstant
            .map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp()))
            .orElse(Stream.empty());

        // subgroup splits again by file id & match with log files.
        Map<String, List<FileStatus>> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream()
            .collect(Collectors.groupingBy(file -> FSUtils.getFileId(file.getPath().getName())));
        latestFileSlices.forEach(fileSlice -> {
          List<FileStatus> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
          dataFileSplits.forEach(split -> {
            try {
              List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
                  .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
              resultMap.put(split.getPath().toString(), logFilePaths);
            } catch (Exception e) {
              throw new HoodieException("Error creating hoodie real time split ", e);
            }
          });
        });
      } catch (Exception e) {
        throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e);
      }
    });
    return resultMap;
  }

}
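
Both methods in this class start by bucketing the incoming files by their parent directory, yielding one bucket per partition path. A self-contained sketch of that first step (file names illustrative):

import org.apache.hadoop.fs.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupByPartitionSketch {
  public static void main(String[] args) {
    List<Path> files = Arrays.asList(
        new Path("/data/raw_trips/2020/05/01/f1_0_100.parquet"),
        new Path("/data/raw_trips/2020/05/01/f2_0_100.parquet"),
        new Path("/data/raw_trips/2020/05/02/f3_0_100.parquet"));
    // one entry per partition directory, each holding that partition's files
    Map<Path, List<Path>> byPartition =
        files.stream().collect(Collectors.groupingBy(Path::getParent));
    System.out.println(byPartition.keySet()); // the two partition directories
  }
}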

@@ -0,0 +1,271 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.hadoop.utils;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.schema.MessageType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class HoodieRealtimeRecordReaderUtils {

  /**
   * Reads the schema from the parquet file. This is different from ParquetUtils as it uses the twitter parquet to
   * support hive 1.1.0
   */
  public static MessageType readSchema(Configuration conf, Path parquetFilePath) {
    try {
      return ParquetFileReader.readFooter(conf, parquetFilePath).getFileMetaData().getSchema();
    } catch (IOException e) {
      throw new HoodieIOException("Failed to read footer for parquet " + parquetFilePath, e);
    }
  }

  /**
   * Prints a JSON representation of the ArrayWritable for easier debuggability.
   */
  public static String arrayWritableToString(ArrayWritable writable) {
    if (writable == null) {
      return "null";
    }
    StringBuilder builder = new StringBuilder();
    Writable[] values = writable.get();
    builder.append("\"values_" + Math.random() + "_" + values.length + "\": {");
    int i = 0;
    for (Writable w : values) {
      if (w instanceof ArrayWritable) {
        builder.append(arrayWritableToString((ArrayWritable) w)).append(",");
      } else {
        builder.append("\"value" + i + "\":\"" + w + "\"").append(",");
        if (w == null) {
          builder.append("\"type" + i + "\":\"unknown\"").append(",");
        } else {
          builder.append("\"type" + i + "\":\"" + w.getClass().getSimpleName() + "\"").append(",");
        }
      }
      i++;
    }
    builder.deleteCharAt(builder.length() - 1);
    builder.append("}");
    return builder.toString();
  }

  /**
   * Generate a reader schema off the provided writeSchema, to just project out the provided columns.
   */
  public static Schema generateProjectionSchema(Schema writeSchema, Map<String, Schema.Field> schemaFieldsMap,
      List<String> fieldNames) {
    /**
     * Avro & Presto field names seems to be case sensitive (support fields differing only in case) whereas
     * Hive/Impala/SparkSQL(default) are case-insensitive. Spark allows this to be configurable using
     * spark.sql.caseSensitive=true
     *
     * For a RT table setup with no delta-files (for a latest file-slice) -> we translate parquet schema to Avro Here
     * the field-name case is dependent on parquet schema. Hive (1.x/2.x/CDH) translate column projections to
     * lower-cases
     *
     */
    List<Schema.Field> projectedFields = new ArrayList<>();
    for (String fn : fieldNames) {
      Schema.Field field = schemaFieldsMap.get(fn.toLowerCase());
      if (field == null) {
        throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed! "
            + "Derived Schema Fields: " + new ArrayList<>(schemaFieldsMap.keySet()));
      } else {
        projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
      }
    }

    Schema projectedSchema = Schema.createRecord(writeSchema.getName(), writeSchema.getDoc(),
        writeSchema.getNamespace(), writeSchema.isError());
    projectedSchema.setFields(projectedFields);
    return projectedSchema;
  }

  public static Map<String, Schema.Field> getNameToFieldMap(Schema schema) {
    return schema.getFields().stream().map(r -> Pair.of(r.name().toLowerCase(), r))
        .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
  }
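
A hedged usage sketch for the two helpers above: build the lower-cased name-to-field map, then project a subset of columns out of a writer schema. The schema and field names below are illustrative.

import org.apache.avro.Schema;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import java.util.Arrays;
import java.util.Map;

class ProjectionSketch {
  public static void main(String[] args) {
    Schema writeSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"trip\",\"fields\":["
            + "{\"name\":\"Rider\",\"type\":\"string\"},"
            + "{\"name\":\"fare\",\"type\":\"double\"}]}");
    Map<String, Schema.Field> byName = HoodieRealtimeRecordReaderUtils.getNameToFieldMap(writeSchema);
    // Hive hands over lower-cased column names; the lookup is case-insensitive by design
    Schema projected = HoodieRealtimeRecordReaderUtils.generateProjectionSchema(
        writeSchema, byName, Arrays.asList("rider"));
    System.out.println(projected.getFields()); // only the Rider field survives
  }
}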

  /**
   * Convert the projected read from delta record into an array writable.
   */
  public static Writable avroToArrayWritable(Object value, Schema schema) {

    if (value == null) {
      return null;
    }

    switch (schema.getType()) {
      case STRING:
        return new Text(value.toString());
      case BYTES:
        return new BytesWritable((byte[]) value);
      case INT:
        return new IntWritable((Integer) value);
      case LONG:
        return new LongWritable((Long) value);
      case FLOAT:
        return new FloatWritable((Float) value);
      case DOUBLE:
        return new DoubleWritable((Double) value);
      case BOOLEAN:
        return new BooleanWritable((Boolean) value);
      case NULL:
        return null;
      case RECORD:
        GenericRecord record = (GenericRecord) value;
        Writable[] recordValues = new Writable[schema.getFields().size()];
        int recordValueIndex = 0;
        for (Schema.Field field : schema.getFields()) {
          recordValues[recordValueIndex++] = avroToArrayWritable(record.get(field.name()), field.schema());
        }
        return new ArrayWritable(Writable.class, recordValues);
      case ENUM:
        return new Text(value.toString());
      case ARRAY:
        GenericArray arrayValue = (GenericArray) value;
        Writable[] arrayValues = new Writable[arrayValue.size()];
        int arrayValueIndex = 0;
        for (Object obj : arrayValue) {
          arrayValues[arrayValueIndex++] = avroToArrayWritable(obj, schema.getElementType());
        }
        // Hive 1.x will fail here, it requires values2 to be wrapped into another ArrayWritable
        return new ArrayWritable(Writable.class, arrayValues);
      case MAP:
        Map mapValue = (Map) value;
        Writable[] mapValues = new Writable[mapValue.size()];
        int mapValueIndex = 0;
        for (Object entry : mapValue.entrySet()) {
          Map.Entry mapEntry = (Map.Entry) entry;
          Writable[] nestedMapValues = new Writable[2];
          nestedMapValues[0] = new Text(mapEntry.getKey().toString());
          nestedMapValues[1] = avroToArrayWritable(mapEntry.getValue(), schema.getValueType());
          mapValues[mapValueIndex++] = new ArrayWritable(Writable.class, nestedMapValues);
        }
        // Hive 1.x will fail here, it requires values3 to be wrapped into another ArrayWritable
        return new ArrayWritable(Writable.class, mapValues);
      case UNION:
        List<Schema> types = schema.getTypes();
        if (types.size() != 2) {
          throw new IllegalArgumentException("Only support union with 2 fields");
        }
        Schema s1 = types.get(0);
        Schema s2 = types.get(1);
        if (s1.getType() == Schema.Type.NULL) {
          return avroToArrayWritable(value, s2);
        } else if (s2.getType() == Schema.Type.NULL) {
          return avroToArrayWritable(value, s1);
        } else {
          throw new IllegalArgumentException("Only support union with null");
        }
      case FIXED:
        if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("decimal")) {
          LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) LogicalTypes.fromSchema(schema);
          HiveDecimalWritable writable = new HiveDecimalWritable(((GenericFixed) value).bytes(),
              decimal.getScale());
          return HiveDecimalWritable.enforcePrecisionScale(writable,
              decimal.getPrecision(),
              decimal.getScale());
        }
        return new BytesWritable(((GenericFixed) value).bytes());
      default:
        return null;
    }
  }
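
A hedged round-trip sketch for the converter above: build an Avro GenericRecord and turn it into the Writable tree Hive expects. The schema and values are illustrative; the RECORD branch produces one Writable per field, wrapped in an ArrayWritable.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;

class AvroToWritableSketch {
  public static void main(String[] args) {
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"trip\",\"fields\":["
            + "{\"name\":\"rider\",\"type\":\"string\"},"
            + "{\"name\":\"fare\",\"type\":\"double\"}]}");
    GenericRecord rec = new GenericData.Record(schema);
    rec.put("rider", "rider-1");
    rec.put("fare", 27.5);
    Writable w = HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, schema);
    System.out.println(((ArrayWritable) w).get().length); // 2 fields converted
  }
}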

  /**
   * Given a comma separated list of field names and positions at which they appear on Hive, return
   * an ordered list of field names, that can be passed onto storage.
   */
  public static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv, List<String> partitioningFields) {
    // Need to convert the following to Set first since Hive does not handle duplicate field names correctly but
    // handles duplicate fields orders correctly.
    // Fields Orders -> {@link https://github
    // .com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
    // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L188}
    // Field Names -> {@link https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
    // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L229}
    String[] fieldOrdersWithDups = fieldOrderCsv.split(",");
    Set<String> fieldOrdersSet = new LinkedHashSet<>(Arrays.asList(fieldOrdersWithDups));
    String[] fieldOrders = fieldOrdersSet.toArray(new String[0]);
    List<String> fieldNames = Arrays.stream(fieldNameCsv.split(","))
        .filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList());
    Set<String> fieldNamesSet = new LinkedHashSet<>(fieldNames);
    // Hive does not provide ids for partitioning fields, so check for lengths excluding that.
    if (fieldNamesSet.size() != fieldOrders.length) {
      throw new HoodieException(String
          .format("Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d",
              fieldNames.size(), fieldOrders.length));
    }
    TreeMap<Integer, String> orderedFieldMap = new TreeMap<>();
    String[] fieldNamesArray = fieldNamesSet.toArray(new String[0]);
    for (int ox = 0; ox < fieldOrders.length; ox++) {
      orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNamesArray[ox]);
    }
    return new ArrayList<>(orderedFieldMap.values());
  }
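
A worked example for orderFields: Hive reports the projected column names and the positions they occupy in the row, and the helper re-orders the names by position so storage reads columns in the right order. The inputs below are illustrative.

import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import java.util.Collections;
import java.util.List;

class OrderFieldsSketch {
  public static void main(String[] args) {
    // names as Hive lists them, and their positions in the same listing order:
    // fare sits at position 2, rider at 0, driver at 1
    List<String> ordered = HoodieRealtimeRecordReaderUtils.orderFields(
        "fare,rider,driver", "2,0,1", Collections.emptyList());
    System.out.println(ordered); // [rider, driver, fare]
  }
}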

  /**
   * Hive implementation of ParquetRecordReader results in partition columns not present in the original parquet file to
   * also be part of the projected schema. Hive expects the record reader implementation to return the row in its
   * entirety (with un-projected column having null values). As we use writerSchema for this, make sure writer schema
   * also includes partition columns
   *
   * @param schema Schema to be changed
   */
  public static Schema addPartitionFields(Schema schema, List<String> partitioningFields) {
    final Set<String> firstLevelFieldNames =
        schema.getFields().stream().map(Schema.Field::name).map(String::toLowerCase).collect(Collectors.toSet());
    List<String> fieldsToAdd = partitioningFields.stream().map(String::toLowerCase)
        .filter(x -> !firstLevelFieldNames.contains(x)).collect(Collectors.toList());

    return HoodieAvroUtils.appendNullSchemaFields(schema, fieldsToAdd);
  }
}
@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileStatus;
@@ -103,7 +104,7 @@ public class TestHoodieParquetInputFormat {
    timeline.setInstants(instants);

    // Verify getCommitsTimelineBeforePendingCompaction does not return instants after first compaction instant
    HoodieTimeline filteredTimeline = new HoodieParquetInputFormat().filterInstantsTimeline(timeline);
    HoodieTimeline filteredTimeline = inputFormat.filterInstantsTimeline(timeline);
    assertTrue(filteredTimeline.containsInstant(t1));
    assertTrue(filteredTimeline.containsInstant(t2));
    assertFalse(filteredTimeline.containsInstant(t3));
@@ -116,7 +117,7 @@ public class TestHoodieParquetInputFormat {
    instants.remove(t3);
    timeline = new HoodieActiveTimeline(metaClient);
    timeline.setInstants(instants);
    filteredTimeline = new HoodieParquetInputFormat().filterInstantsTimeline(timeline);
    filteredTimeline = inputFormat.filterInstantsTimeline(timeline);

    // verify all remaining instants are returned.
    assertTrue(filteredTimeline.containsInstant(t1));
@@ -130,7 +131,7 @@ public class TestHoodieParquetInputFormat {
    instants.remove(t5);
    timeline = new HoodieActiveTimeline(metaClient);
    timeline.setInstants(instants);
    filteredTimeline = new HoodieParquetInputFormat().filterInstantsTimeline(timeline);
    filteredTimeline = inputFormat.filterInstantsTimeline(timeline);

    // verify all remaining instants are returned.
    assertTrue(filteredTimeline.containsInstant(t1));
@@ -267,7 +268,7 @@ public class TestHoodieParquetInputFormat {
    ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files from 300 commit", files, "300", 1);
    ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files from 200 commit", files, "200", 1);

    InputFormatTestUtil.setupIncremental(jobConf, "100", HoodieHiveUtil.MAX_COMMIT_ALL);
    InputFormatTestUtil.setupIncremental(jobConf, "100", HoodieHiveUtils.MAX_COMMIT_ALL);
    files = inputFormat.listStatus(jobConf);

    assertEquals(5, files.length,
@@ -312,15 +313,15 @@ public class TestHoodieParquetInputFormat {
  public void testGetIncrementalTableNames() throws IOException {
    String[] expectedincrTables = {"db1.raw_trips", "db2.model_trips", "db3.model_trips"};
    JobConf conf = new JobConf();
    String incrementalMode1 = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[0]);
    conf.set(incrementalMode1, HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
    String incrementalMode2 = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[1]);
    conf.set(incrementalMode2,HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
    String incrementalMode3 = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, "db3.model_trips");
    conf.set(incrementalMode3, HoodieHiveUtil.INCREMENTAL_SCAN_MODE.toLowerCase());
    String defaultmode = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips");
    conf.set(defaultmode, HoodieHiveUtil.DEFAULT_SCAN_MODE);
    List<String> actualincrTables = HoodieHiveUtil.getIncrementalTableNames(Job.getInstance(conf));
    String incrementalMode1 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[0]);
    conf.set(incrementalMode1, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
    String incrementalMode2 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[1]);
    conf.set(incrementalMode2,HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
    String incrementalMode3 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.model_trips");
    conf.set(incrementalMode3, HoodieHiveUtils.INCREMENTAL_SCAN_MODE.toLowerCase());
    String defaultmode = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips");
    conf.set(defaultmode, HoodieHiveUtils.DEFAULT_SCAN_MODE);
    List<String> actualincrTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(conf));
    for (String expectedincrTable : expectedincrTables) {
      assertTrue(actualincrTables.contains(expectedincrTable));
    }

@@ -31,6 +31,7 @@ import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
@@ -80,7 +81,7 @@ public class TestHoodieRealtimeRecordReader {
  @BeforeEach
  public void setUp() {
    jobConf = new JobConf();
    jobConf.set(AbstractRealtimeRecordReader.MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(1024 * 1024));
    jobConf.set(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(1024 * 1024));
    hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
    fs = FSUtils.getFs(basePath.toString(), hadoopConf);
  }

@@ -28,7 +28,7 @@ import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.hadoop.HoodieHiveUtil;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -104,15 +104,15 @@ public class InputFormatTestUtil {

  public static void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull) {
    String modePropertyName =
        String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
    jobConf.set(modePropertyName, HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
        String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
    jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);

    String startCommitTimestampName =
        String.format(HoodieHiveUtil.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
        String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
    jobConf.set(startCommitTimestampName, startCommit);

    String maxCommitPulls =
        String.format(HoodieHiveUtil.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
        String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
    jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
  }