[HUDI-1174] Changes for bootstrapped tables to work with presto (#1944)
This pull request implements the changes required on the Hudi side to integrate bootstrapped tables with Presto. Testing was done against Presto 0.232, and the following changes were needed to make it work:
- The UseRecordReaderFromInputFormat annotation is now also required on HoodieParquetInputFormat, because reads of bootstrapped tables must go through the record reader so that the merge can be performed. Presto already handles this annotation (a reflection sketch follows below this list).
- Hudi now maintains its own VIRTUAL_COLUMN_NAMES set, because Presto's internal Hive fork (hive-apache-1.2.2) defines VirtualColumn as a class, whereas the Hive version Hudi depends on defines it as an enum.
- Dependency changes in hudi-presto-bundle to avoid runtime exceptions.
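To illustrate the first point, here is a minimal, self-contained sketch of how a caller can detect the annotations via reflection, along the same lines as the test updated below. AnnotationProbe and hasAnnotation are illustrative names, not part of this PR; the sketch assumes hudi-hadoop-mr is on the classpath.

import java.lang.annotation.Annotation;

public class AnnotationProbe {

  // Scans an input format class for an annotation with the given simple name.
  // Matching by simple name (rather than Class identity) sidesteps the kind of
  // dependency-version mismatch this PR works around.
  static boolean hasAnnotation(Class<?> inputFormatClass, String simpleName) {
    for (Annotation annotation : inputFormatClass.getAnnotations()) {
      if (simpleName.equals(annotation.annotationType().getSimpleName())) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) throws Exception {
    Class<?> fmt = Class.forName("org.apache.hudi.hadoop.HoodieParquetInputFormat");
    System.out.println(hasAnnotation(fmt, "UseRecordReaderFromInputFormat")); // expected: true
    System.out.println(hasAnnotation(fmt, "UseFileSplitsFromInputFormat"));   // expected: true
  }
}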
@@ -37,7 +37,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
-import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -63,6 +62,7 @@ import java.util.stream.IntStream;
  * that does not correspond to a hoodie table then they are passed in as is (as what FileInputFormat.listStatus()
  * would do). The JobConf could have paths from multiple Hoodie/Non-Hoodie tables
  */
+@UseRecordReaderFromInputFormat
 @UseFileSplitsFromInputFormat
 public class HoodieParquetInputFormat extends MapredParquetInputFormat implements Configurable {

@@ -179,7 +179,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
     // clearOutExistingPredicate(job);
     // }
     if (split instanceof BootstrapBaseFileSplit) {
-      BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit)split;
+      BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit) split;
       String[] rawColNames = HoodieColumnProjectionUtils.getReadColumnNames(job);
       List<Integer> rawColIds = HoodieColumnProjectionUtils.getReadColumnIDs(job);
       List<Pair<Integer, String>> projectedColsWithIndex =
@@ -191,7 +191,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
           .collect(Collectors.toList());
       List<Pair<Integer, String>> externalColsProjected = projectedColsWithIndex.stream()
           .filter(idxWithName -> !HoodieRecord.HOODIE_META_COLUMNS.contains(idxWithName.getValue())
-              && !VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(idxWithName.getValue()))
+              && !HoodieHiveUtils.VIRTUAL_COLUMN_NAMES.contains(idxWithName.getValue()))
           .collect(Collectors.toList());

       // This always matches hive table description
@@ -20,6 +20,7 @@ package org.apache.hudi.hadoop.utils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -27,6 +28,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -38,6 +40,9 @@ public class HoodieHiveUtils {
   public static final String HOODIE_CONSUME_MODE_PATTERN = "hoodie.%s.consume.mode";
   public static final String HOODIE_START_COMMIT_PATTERN = "hoodie.%s.consume.start.timestamp";
   public static final String HOODIE_MAX_COMMIT_PATTERN = "hoodie.%s.consume.max.commits";
+  public static final Set<String> VIRTUAL_COLUMN_NAMES = CollectionUtils.createImmutableSet(
+      "INPUT__FILE__NAME", "BLOCK__OFFSET__INSIDE__FILE", "ROW__OFFSET__INSIDE__BLOCK", "RAW__DATA__SIZE",
+      "ROW__ID", "GROUPING__ID");
   /*
    * Boolean property to stop incremental reader when there is a pending compaction.
    * This is needed to prevent certain race conditions with RO views of MOR tables. only applicable for RO views.
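For context, here is a minimal usage sketch of the new constant: column names can be screened without ever referencing org.apache.hadoop.hive.ql.metadata.VirtualColumn, which differs between stock Hive and Presto's hive-apache fork. VirtualColumnFilterExample and the column names are illustrative; the sketch assumes hudi-hadoop-mr is on the classpath.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hudi.hadoop.utils.HoodieHiveUtils;

public class VirtualColumnFilterExample {
  public static void main(String[] args) {
    // Drop Hive virtual columns from a projection using the engine-neutral set,
    // keeping only names that map to real data in the base file.
    List<String> projected = Arrays.asList("_hoodie_commit_time", "ROW__ID", "price");
    List<String> dataColumns = projected.stream()
        .filter(col -> !HoodieHiveUtils.VIRTUAL_COLUMN_NAMES.contains(col))
        .collect(Collectors.toList());
    System.out.println(dataColumns); // prints [_hoodie_commit_time, price]
  }
}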
@@ -29,14 +29,19 @@ public class TestAnnotation {
   @Test
   public void testHoodieParquetInputFormatAnnotation() {
     assertTrue(HoodieParquetInputFormat.class.isAnnotationPresent(UseFileSplitsFromInputFormat.class));
+    assertTrue(HoodieParquetInputFormat.class.isAnnotationPresent(UseRecordReaderFromInputFormat.class));
     Annotation[] annotations = HoodieParquetInputFormat.class.getAnnotations();
-    boolean found = false;
+    boolean foundFileSplitsAnnotation = false;
+    boolean foundRecordReaderAnnotation = false;
     for (Annotation annotation : annotations) {
-      if ("UseFileSplitsFromInputFormat".equals(annotation.annotationType().getSimpleName())) {
-        found = true;
+      if (UseFileSplitsFromInputFormat.class.getSimpleName().equals(annotation.annotationType().getSimpleName())) {
+        foundFileSplitsAnnotation = true;
+      } else if (UseRecordReaderFromInputFormat.class.getSimpleName().equals(annotation.annotationType().getSimpleName())) {
+        foundRecordReaderAnnotation = true;
       }
     }
-    assertTrue(found);
+    assertTrue(foundFileSplitsAnnotation);
+    assertTrue(foundRecordReaderAnnotation);
   }

   @Test
@@ -47,10 +52,9 @@ public class TestAnnotation {
     boolean foundFileSplitsAnnotation = false;
     boolean foundRecordReaderAnnotation = false;
     for (Annotation annotation : annotations) {
-      if ("UseFileSplitsFromInputFormat".equals(annotation.annotationType().getSimpleName())) {
+      if (UseFileSplitsFromInputFormat.class.getSimpleName().equals(annotation.annotationType().getSimpleName())) {
         foundFileSplitsAnnotation = true;
-      }
-      if ("UseRecordReaderFromInputFormat".equals(annotation.annotationType().getSimpleName())) {
+      } else if (UseRecordReaderFromInputFormat.class.getSimpleName().equals(annotation.annotationType().getSimpleName())) {
         foundRecordReaderAnnotation = true;
       }
     }

@@ -76,7 +76,12 @@
                   <include>org.apache.hbase:hbase-common</include>
                   <include>org.apache.hbase:hbase-protocol</include>
                   <include>org.apache.hbase:hbase-server</include>
+                  <include>org.apache.hbase:hbase-annotations</include>
                   <include>org.apache.htrace:htrace-core</include>
+                  <include>com.yammer.metrics:metrics-core</include>
+                  <include>com.google.guava:guava</include>
+                  <include>commons-lang:commons-lang</include>
+                  <include>com.google.protobuf:protobuf-java</include>
                 </includes>
               </artifactSet>
               <relocations>
@@ -105,6 +110,22 @@
                   <pattern>org.apache.htrace.</pattern>
                   <shadedPattern>org.apache.hudi.org.apache.htrace.</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>com.yammer.metrics.</pattern>
+                  <shadedPattern>org.apache.hudi.com.yammer.metrics.</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.google.common.</pattern>
+                  <shadedPattern>${presto.bundle.bootstrap.shade.prefix}com.google.common.</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.commons.lang.</pattern>
+                  <shadedPattern>${presto.bundle.bootstrap.shade.prefix}org.apache.commons.lang.</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.google.protobuf.</pattern>
+                  <shadedPattern>${presto.bundle.bootstrap.shade.prefix}com.google.protobuf.</shadedPattern>
+                </relocation>
               </relocations>
               <createDependencyReducedPom>false</createDependencyReducedPom>
               <filters>
@@ -159,5 +180,42 @@
       <artifactId>avro</artifactId>
       <scope>compile</scope>
     </dependency>
+
+    <!--Guava needs to be shaded because HBase 1.2.3 depends on an earlier guava version i.e 12.0.1 and hits runtime
+    issues with the guava version present in Presto runtime-->
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>12.0.1</version>
+      <scope>${presto.bundle.bootstrap.scope}</scope>
+    </dependency>
+
+    <!--commons-lang needs to be shaded because HBase 1.2.3 needs it at runtime, but Presto runtime does not have this
+    dependency-->
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <version>2.6</version>
+      <scope>${presto.bundle.bootstrap.scope}</scope>
+    </dependency>
+
+    <!--protobuf needs to be shaded because HBase 1.2.3 needs it at runtime, but Presto runtime does not have this
+    dependency-->
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>2.5.0</version>
+      <scope>${presto.bundle.bootstrap.scope}</scope>
+    </dependency>
+
   </dependencies>
+
+  <profiles>
+    <profile>
+      <id>presto-shade-unbundle-bootstrap</id>
+      <properties>
+        <presto.bundle.bootstrap.scope>provided</presto.bundle.bootstrap.scope>
+        <presto.bundle.bootstrap.shade.prefix></presto.bundle.bootstrap.shade.prefix>
+      </properties>
+    </profile>
+  </profiles>
 </project>
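As a usage note: with the default properties in the root pom (next hunk), the bundle includes and relocates guava, commons-lang, and protobuf; activating the presto-shade-unbundle-bootstrap profile flips the scope to provided and empties the shade prefix, so those classes are neither bundled nor relocated. Below is a minimal sketch for sanity-checking which flavor of the bundle is on the classpath; ShadeCheck is an illustrative name, and the relocated package name simply follows from the shade prefix above.

public class ShadeCheck {
  public static void main(String[] args) {
    // Under the default org.apache.hudi. shade prefix, guava classes are
    // relocated, so this lookup succeeds only against the bundled jar.
    try {
      Class.forName("org.apache.hudi.com.google.common.collect.ImmutableList");
      System.out.println("shaded guava present (bundled build)");
    } catch (ClassNotFoundException e) {
      System.out.println("shaded guava absent (unbundled profile build)");
    }
  }
}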
pom.xml
@@ -126,6 +126,8 @@
     <utilities.bundle.hive.shade.prefix></utilities.bundle.hive.shade.prefix>
     <argLine>-Xmx2g</argLine>
     <jacoco.version>0.8.5</jacoco.version>
+    <presto.bundle.bootstrap.scope>compile</presto.bundle.bootstrap.scope>
+    <presto.bundle.bootstrap.shade.prefix>org.apache.hudi.</presto.bundle.bootstrap.shade.prefix>
   </properties>

   <scm>