From 4bea7587384842ac82590b0a9e785b8f73690885 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 18 Jan 2022 14:54:51 -0800 Subject: [PATCH] [HUDI-3191] Rebasing Hive's FileInputFormat onto `AbstractHoodieTableFileIndex` (#4531) --- docker/demo/config/log4j.properties | 2 + docker/hoodie/hadoop/hive_base/Dockerfile | 2 + .../apache/hudi/io/HoodieAppendHandle.java | 2 +- hudi-common/pom.xml | 70 +++++++++- .../org/apache/hudi/common/fs/FSUtils.java | 6 + .../common/model/HoodieTableQueryType.java | 36 +++++ .../hudi/HoodieTableFileIndexBase.scala | 47 ++++--- .../apache/hudi/common/fs/TestFSUtils.java | 3 + hudi-hadoop-mr/pom.xml | 7 + .../hudi/hadoop/HiveHoodieTableFileIndex.java | 100 +++++++++++++ .../hadoop/HoodieFileInputFormatBase.java | 131 ++++++++++++++++-- .../hudi/hadoop/utils/HoodieHiveUtils.java | 25 ++-- .../hadoop/testutils/InputFormatTestUtil.java | 41 +++++- .../org/apache/hudi/integ/ITTestBase.java | 41 ++++-- .../hudi-spark-common/pom.xml | 1 + .../org/apache/hudi/HoodieFileIndex.scala | 9 +- .../hudi/SparkHoodieTableFileIndex.scala | 25 +++- packaging/hudi-hadoop-mr-bundle/pom.xml | 17 ++- packaging/hudi-hive-sync-bundle/pom.xml | 18 +++ packaging/hudi-presto-bundle/pom.xml | 90 +++++++++++- packaging/hudi-trino-bundle/pom.xml | 28 +++- 21 files changed, 623 insertions(+), 78 deletions(-) create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/model/HoodieTableQueryType.java rename hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala => hudi-common/src/main/scala/org/apache/hudi/HoodieTableFileIndexBase.scala (88%) create mode 100644 hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java diff --git a/docker/demo/config/log4j.properties b/docker/demo/config/log4j.properties index 225e62e47..df8ad3d15 100644 --- a/docker/demo/config/log4j.properties +++ b/docker/demo/config/log4j.properties @@ -35,6 +35,8 @@ 
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO log4j.logger.org.apache.parquet=ERROR log4j.logger.parquet=ERROR log4j.logger.org.apache.spark=WARN +# Disabling Jetty logs +log4j.logger.org.apache.hudi.org.eclipse.jetty=ERROR # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR \ No newline at end of file diff --git a/docker/hoodie/hadoop/hive_base/Dockerfile b/docker/hoodie/hadoop/hive_base/Dockerfile index 8d85fd5b5..7d04d94fc 100644 --- a/docker/hoodie/hadoop/hive_base/Dockerfile +++ b/docker/hoodie/hadoop/hive_base/Dockerfile @@ -64,6 +64,8 @@ COPY entrypoint.sh /usr/local/bin/ RUN chmod +x /usr/local/bin/entrypoint.sh ENV PATH $HIVE_HOME/bin/:$PATH +# NOTE: This is the only battle-proven method to inject jars into Hive CLI +ENV AUX_CLASSPATH=file://${HUDI_HADOOP_BUNDLE} ENTRYPOINT ["entrypoint.sh"] CMD startup.sh diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 7cc0c5dfa..6df05a7c6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -162,7 +162,7 @@ public class HoodieAppendHandle extends deltaWriteStat.setLogFiles(logFiles); try { - //save hoodie partition meta in the partition path + // Save hoodie partition meta in the partition path HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, baseInstantTime, new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath)); partitionMetadata.trySave(getPartitionId()); diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml index c20ff22f8..e19070a6f 100644 
--- a/hudi-common/pom.xml +++ b/hudi-common/pom.xml @@ -35,10 +35,61 @@ src/main/resources + + + + + net.alchim31.maven + scala-maven-plugin + ${scala-maven-plugin.version} + + + -nobootcp + -target:jvm-1.8 + + false + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + - org.jacoco - jacoco-maven-plugin + net.alchim31.maven + scala-maven-plugin + + + scala-compile-first + process-resources + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + compile + + compile + + + org.apache.maven.plugins @@ -60,6 +111,14 @@ org.apache.rat apache-rat-plugin + + org.scalastyle + scalastyle-maven-plugin + + + org.jacoco + jacoco-maven-plugin + org.apache.avro avro-maven-plugin @@ -97,6 +156,13 @@ + + + org.scala-lang + scala-library + ${scala.version} + + com.fasterxml.jackson.core diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index ceec282f1..d7af8a7d4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -202,7 +202,13 @@ public class FSUtils { public static String getRelativePartitionPath(Path basePath, Path fullPartitionPath) { basePath = Path.getPathWithoutSchemeAndAuthority(basePath); fullPartitionPath = Path.getPathWithoutSchemeAndAuthority(fullPartitionPath); + String fullPartitionPathStr = fullPartitionPath.toString(); + + if (!fullPartitionPathStr.startsWith(basePath.toString())) { + throw new IllegalArgumentException("Partition path does not belong to base-path"); + } + int partitionStartIndex = fullPartitionPathStr.indexOf(basePath.getName(), basePath.getParent() == null ? 
0 : basePath.getParent().toString().length()); // Partition-Path could be empty for non-partitioned tables diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieTableQueryType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieTableQueryType.java new file mode 100644 index 000000000..15449b329 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieTableQueryType.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +/** + * Hudi table could be queried in one of the 3 following ways: + * + *
+ * <ol>
+ *   <li>Snapshot: snapshot of the table at the given (latest if not provided) instant is queried</li>
+ *   <li>Read Optimized (MOR only): snapshot of the table at the given (latest if not provided)
+ *   instant is queried, but w/o reading any of the delta-log files (only reading base-files)</li>
+ *   <li>Incremental: only records added w/in the given time-window (defined by beginning and ending instant)
+ *   are queried</li>
+ * </ol>
+ */ +public enum HoodieTableQueryType { + QUERY_TYPE_SNAPSHOT, + QUERY_TYPE_INCREMENTAL, + QUERY_TYPE_READ_OPTIMIZED +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala b/hudi-common/src/main/scala/org/apache/hudi/HoodieTableFileIndexBase.scala similarity index 88% rename from hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala rename to hudi-common/src/main/scala/org/apache/hudi/HoodieTableFileIndexBase.scala index 462d88e02..f25c7d99d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AbstractHoodieTableFileIndex.scala +++ b/hudi-common/src/main/scala/org/apache/hudi/HoodieTableFileIndexBase.scala @@ -18,17 +18,16 @@ package org.apache.hudi import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL} import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} import org.apache.hudi.common.engine.HoodieEngineContext import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.model.FileSlice import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ +import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView} -import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ +import scala.collection.JavaConversions._ import scala.collection.mutable /** @@ -50,11 +49,14 @@ import scala.collection.mutable * @param shouldIncludePendingCommits flags whether file-index should exclude any pending operations * @param fileStatusCache transient cache of fetched [[FileStatus]]es */ -abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, - metaClient: HoodieTableMetaClient, - configProperties: 
TypedProperties, - specifiedQueryInstant: Option[String] = None, - @transient fileStatusCache: FileStatusCacheTrait) { +abstract class HoodieTableFileIndexBase(engineContext: HoodieEngineContext, + metaClient: HoodieTableMetaClient, + configProperties: TypedProperties, + queryType: HoodieTableQueryType, + protected val queryPaths: Seq[Path], + specifiedQueryInstant: Option[String] = None, + shouldIncludePendingCommits: Boolean = false, + @transient fileStatusCache: FileStatusCacheTrait) { /** * Get all completeCommits. */ @@ -70,12 +72,11 @@ abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, private lazy val metadataConfig = HoodieMetadataConfig.newBuilder .fromProperties(configProperties) .build() - protected val basePath: String = metaClient.getBasePath - private val queryType = configProperties(QUERY_TYPE.key()) private val tableType = metaClient.getTableType - @transient private val queryPath = new Path(configProperties.getOrElse("path", "'path' option required")) + protected val basePath: String = metaClient.getBasePath + @transient @volatile protected var cachedFileSize: Long = 0L @transient @@ -109,12 +110,13 @@ abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, .getOrElse(Array.empty[FileStatus]) metaClient.reloadActiveTimeline() - val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants - val latestInstant = activeInstants.lastInstant() + + val activeTimeline = getActiveTimeline + val latestInstant = activeTimeline.lastInstant() // TODO we can optimize the flow by: // - First fetch list of files from instants of interest // - Load FileStatus's - fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants, allFiles) + fileSystemView = new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles) val queryInstant = if (specifiedQueryInstant.isDefined) { specifiedQueryInstant } else if (latestInstant.isPresent) { @@ -124,7 +126,7 @@ abstract class 
AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, } (tableType, queryType) match { - case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) => + case (MERGE_ON_READ, HoodieTableQueryType.QUERY_TYPE_SNAPSHOT) => // Fetch and store latest base and log files, and their sizes cachedAllInputFileSlices = partitionFiles.map(p => { val latestSlices = if (queryInstant.isDefined) { @@ -168,6 +170,15 @@ abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, refresh0() } + private def getActiveTimeline = { + val timeline = metaClient.getActiveTimeline.getCommitsTimeline + if (shouldIncludePendingCommits) { + timeline + } else { + timeline.filterCompletedInstants() + } + } + private def fileSliceSize(fileSlice: FileSlice): Long = { val logFileSize = fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).filter(_ > 0).sum if (fileSlice.getBaseFile.isPresent) { @@ -216,11 +227,11 @@ abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext, } def getAllQueryPartitionPaths: Seq[PartitionPath] = { - val queryPartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), queryPath) + val queryRelativePartitionPaths = queryPaths.map(FSUtils.getRelativePartitionPath(new Path(basePath), _)) // Load all the partition path from the basePath, and filter by the query partition path. - // TODO load files from the queryPartitionPath directly. + // TODO load files from the queryRelativePartitionPaths directly. 
val partitionPaths = FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath).asScala - .filter(_.startsWith(queryPartitionPath)) + .filter(path => queryRelativePartitionPaths.exists(path.startsWith)) val partitionSchema = _partitionColumns diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java index 3e8ad220b..e4460ce62 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java @@ -190,6 +190,9 @@ public class TestFSUtils extends HoodieCommonTestHarness { Path basePath = new Path("/test/apache"); Path partitionPath = new Path("/test/apache/hudi/sub"); assertEquals("hudi/sub", FSUtils.getRelativePartitionPath(basePath, partitionPath)); + + Path nonPartitionPath = new Path("/test/something/else"); + assertThrows(IllegalArgumentException.class, () -> FSUtils.getRelativePartitionPath(basePath, nonPartitionPath)); } @Test diff --git a/hudi-hadoop-mr/pom.xml b/hudi-hadoop-mr/pom.xml index bf87bfaa3..57fbdb7b8 100644 --- a/hudi-hadoop-mr/pom.xml +++ b/hudi-hadoop-mr/pom.xml @@ -30,6 +30,13 @@ + + + org.scala-lang + scala-library + ${scala.version} + + org.apache.hudi diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java new file mode 100644 index 000000000..585728d1e --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.HoodieTableFileIndexBase; +import org.apache.hudi.FileStatusCacheTrait; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieTableQueryType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Function0; +import scala.collection.JavaConverters; + +import java.util.List; + +/** + * Implementation of {@link HoodieTableFileIndexBase} for Hive-based query engines + */ +public class HiveHoodieTableFileIndex extends HoodieTableFileIndexBase { + + public static final Logger LOG = LoggerFactory.getLogger(HiveHoodieTableFileIndex.class); + + public HiveHoodieTableFileIndex(HoodieEngineContext engineContext, + HoodieTableMetaClient metaClient, + TypedProperties configProperties, + HoodieTableQueryType queryType, + List queryPaths, + Option specifiedQueryInstant, + boolean shouldIncludePendingCommits + ) { + super(engineContext, + metaClient, + configProperties, + queryType, + JavaConverters.asScalaBufferConverter(queryPaths).asScala(), + toScalaOption(specifiedQueryInstant), + shouldIncludePendingCommits, + new NoopCache()); + } + + 
private static scala.Option toScalaOption(Option opt) { + return scala.Option.apply(opt.orElse(null)); + } + + @Override + public Object[] parsePartitionColumnValues(String[] partitionColumns, String partitionPath) { + // NOTE: Parsing partition path into partition column values isn't required on Hive, + // since Hive does partition pruning in a different way (based on the input-path being + // fetched by the query engine) + return new Object[0]; + } + + @Override + public void logInfo(Function0 lazyStr) { + LOG.info(lazyStr.apply()); + } + + @Override + public void logWarning(Function0 lazyStr) { + LOG.info(lazyStr.apply()); + } + + static class NoopCache implements FileStatusCacheTrait { + @Override + public scala.Option get(Path path) { + return scala.Option.empty(); + } + + @Override + public void put(Path path, FileStatus[] leafFiles) { + // no-op + } + + @Override + public void invalidate() { + // no-op + } + } +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java index 821e467e9..9597256eb 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java @@ -27,18 +27,33 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieLocalEngineContext; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieTableQueryType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import 
org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.hadoop.utils.HoodieHiveUtils; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; +import scala.collection.JavaConverters; +import scala.collection.Seq; +import javax.annotation.Nonnull; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.common.util.ValidationUtils.checkState; /** * Base implementation of the Hive's {@link FileInputFormat} allowing for reading of Hudi's @@ -67,6 +82,27 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat logFiles) { + List sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); + try { + RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(latestLogFile.getFileStatus()); + rtFileStatus.setDeltaLogFiles(sortedLogFiles); + return rtFileStatus; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Nonnull + private static FileStatus getFileStatusUnchecked(Option baseFileOpt) { + try { + return HoodieInputFormatUtils.getFileStatus(baseFileOpt.get()); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + @Override public FileStatus[] listStatus(JobConf job) throws IOException { // Segregate inputPaths[] to incremental, snapshot and non hoodie paths @@ -102,18 +138,98 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat snapshotPaths = inputPathHandler.getSnapshotPaths(); if (snapshotPaths.size() > 0) { - returns.addAll(HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, includeLogFilesForSnapShotView())); + returns.addAll(listStatusForSnapshotMode(job, tableMetaClientMap, 
snapshotPaths)); } return returns.toArray(new FileStatus[0]); } + @Nonnull + private List listStatusForSnapshotMode(JobConf job, + Map tableMetaClientMap, + List snapshotPaths) throws IOException { + HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(job); + List targetFiles = new ArrayList<>(); + + TypedProperties props = new TypedProperties(new Properties()); + + Map> groupedPaths = + HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths); + + for (Map.Entry> entry : groupedPaths.entrySet()) { + HoodieTableMetaClient tableMetaClient = entry.getKey(); + List partitionPaths = entry.getValue(); + + // Hive job might specify a max commit instant up to which table's state + // should be examined. We simply pass it as query's instant to the file-index + Option queryCommitInstant = + HoodieHiveUtils.getMaxCommit(job, tableMetaClient.getTableConfig().getTableName()); + + boolean shouldIncludePendingCommits = + HoodieHiveUtils.shouldIncludePendingCommits(job, tableMetaClient.getTableConfig().getTableName()); + + HiveHoodieTableFileIndex fileIndex = + new HiveHoodieTableFileIndex( + engineContext, + tableMetaClient, + props, + HoodieTableQueryType.QUERY_TYPE_SNAPSHOT, + partitionPaths, + queryCommitInstant, + shouldIncludePendingCommits); + + Map> partitionedFileSlices = + JavaConverters.mapAsJavaMapConverter(fileIndex.listFileSlices()).asJava(); + + targetFiles.addAll( + partitionedFileSlices.values() + .stream() + .flatMap(seq -> JavaConverters.seqAsJavaListConverter(seq).asJava().stream()) + .map(fileSlice -> { + Option baseFileOpt = fileSlice.getBaseFile(); + Option latestLogFileOpt = fileSlice.getLatestLogFile(); + if (baseFileOpt.isPresent()) { + return getFileStatusUnchecked(baseFileOpt); + } else if (includeLogFilesForSnapShotView() && latestLogFileOpt.isPresent()) { + return createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), fileSlice.getLogFiles()); + } else { + throw new 
IllegalStateException("Invalid state: either base-file or log-file should be present"); + } + }) + .collect(Collectors.toList()) + ); + } + + // TODO cleanup + validate(targetFiles, listStatusForSnapshotModeLegacy(job, tableMetaClientMap, snapshotPaths)); + + return targetFiles; + } + + private void validate(List targetFiles, List legacyFileStatuses) { + List diff = CollectionUtils.diff(targetFiles, legacyFileStatuses); + checkState(diff.isEmpty(), "Should be empty"); + } + + @Nonnull + private List listStatusForSnapshotModeLegacy(JobConf job, Map tableMetaClientMap, List snapshotPaths) throws IOException { + return HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, includeLogFilesForSnapShotView()); + } + + /** + * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} operation to subclasses that + * lists files (returning an array of {@link FileStatus}) corresponding to the input paths specified + * as part of provided {@link JobConf} + */ + protected final FileStatus[] doListStatus(JobConf job) throws IOException { + return super.listStatus(job); + } + /** * Achieves listStatus functionality for an incrementally queried table. Instead of listing all * partitions and then filtering based on the commits of interest, this logic first extracts the * partitions touched by the desired commits and then lists only those partitions. 
*/ - protected List listStatusForIncrementalMode( - JobConf job, HoodieTableMetaClient tableMetaClient, List inputPaths) throws IOException { + protected List listStatusForIncrementalMode(JobConf job, HoodieTableMetaClient tableMetaClient, List inputPaths) throws IOException { String tableName = tableMetaClient.getTableConfig().getTableName(); Job jobContext = Job.getInstance(job); Option timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient); @@ -133,13 +249,4 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat getMaxCommit(JobConf job, String tableName) { + return Option.ofNullable(job.get(String.format(HOODIE_CONSUME_COMMIT, tableName))); + } + public static boolean stopAtCompaction(JobContext job, String tableName) { String compactionPropName = String.format(HOODIE_STOP_AT_COMPACTION_PATTERN, tableName); boolean stopAtCompaction = job.getConfiguration().getBoolean(compactionPropName, true); @@ -149,23 +158,17 @@ public class HoodieHiveUtils { * (false or notSet, notSet) -> returns completedTimeline unfiltered * * validCommit is one which exists in the timeline being checked and vice versa - * - * @param tableName - * @param job - * @param metaClient - * @return */ public static HoodieTimeline getTableTimeline(final String tableName, final JobConf job, final HoodieTableMetaClient metaClient) { HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline(); - boolean includePendingCommits = job.getBoolean(String.format(HOODIE_CONSUME_PENDING_COMMITS, tableName), false); - String maxCommit = job.get(String.format(HOODIE_CONSUME_COMMIT, tableName)); + boolean includePendingCommits = shouldIncludePendingCommits(job, tableName); + Option maxCommit = getMaxCommit(job, tableName); - if (!includePendingCommits && maxCommit == null) { - return timeline.filterCompletedInstants(); - } + HoodieTimeline finalizedTimeline = includePendingCommits ? 
timeline : timeline.filterCompletedInstants(); + + return !maxCommit.isPresent() ? finalizedTimeline : filterIfInstantExists(tableName, finalizedTimeline, maxCommit.get()); - return filterIfInstantExists(tableName, includePendingCommits ? timeline : timeline.filterCompletedInstants(), maxCommit); } private static HoodieTimeline filterIfInstantExists(String tableName, HoodieTimeline timeline, String maxCommit) { diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java index 13d921979..b71652b67 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java @@ -18,10 +18,13 @@ package org.apache.hudi.hadoop.testutils; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.log.HoodieLogFormat; @@ -68,8 +71,10 @@ public class InputFormatTestUtil { throws IOException { HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE, baseFileFormat); + java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01")); - Files.createDirectories(partitionPath); + setupPartition(basePath, partitionPath); + return simulateInserts(partitionPath.toFile(), baseFileFormat.getFileExtension(), "fileId1", numberOfFiles, commitNumber); } @@ -79,8 +84,10 @@ public class InputFormatTestUtil { throws IOException { 
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE, baseFileFormat); + java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", finalLevelPartitionName)); - Files.createDirectories(partitionPath); + setupPartition(basePath, partitionPath); + return simulateInserts(partitionPath.toFile(), baseFileFormat.getFileExtension(), "fileId1" + finalLevelPartitionName, numberOfFiles, commitNumber); } @@ -175,8 +182,12 @@ public class InputFormatTestUtil { public static File prepareParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles, int numberOfRecords, String commitNumber, HoodieTableType tableType) throws IOException { HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), tableType, HoodieFileFormat.PARQUET); + java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01")); + setupPartition(basePath, partitionPath); + createData(schema, partitionPath, numberOfFiles, numberOfRecords, commitNumber); + return partitionPath.toFile(); } @@ -188,8 +199,12 @@ public class InputFormatTestUtil { public static File prepareSimpleParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles, int numberOfRecords, String commitNumber, HoodieTableType tableType) throws Exception { HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), tableType, HoodieFileFormat.PARQUET); + java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01")); + setupPartition(basePath, partitionPath); + createSimpleData(schema, partitionPath, numberOfFiles, numberOfRecords, commitNumber); + return partitionPath.toFile(); } @@ -211,7 +226,10 @@ public class InputFormatTestUtil { HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString()); for (int i = 0; i < numberPartitions; i++) { java.nio.file.Path partitionPath = basePath.resolve(Paths.get(2016 + i + "", "05", "01")); + 
setupPartition(basePath, partitionPath); + createData(schema, partitionPath, 1, numberOfRecordsPerPartition, commitNumber); + result.add(partitionPath.toFile()); } return result; @@ -400,10 +418,27 @@ public class InputFormatTestUtil { jobConf.addResource(conf); } + private static void setupPartition(java.nio.file.Path basePath, java.nio.file.Path partitionPath) throws IOException { + Files.createDirectories(partitionPath); + + // Create partition metadata to properly setup table's partition + RawLocalFileSystem lfs = new RawLocalFileSystem(); + lfs.setConf(HoodieTestUtils.getDefaultHadoopConf()); + + HoodiePartitionMetadata partitionMetadata = + new HoodiePartitionMetadata( + new LocalFileSystem(lfs), + "0", + new Path(basePath.toAbsolutePath().toString()), + new Path(partitionPath.toAbsolutePath().toString()) + ); + + partitionMetadata.trySave((int) (Math.random() * 1000)); + } + public static void setInputPath(JobConf jobConf, String inputPath) { jobConf.set("mapreduce.input.fileinputformat.inputdir", inputPath); jobConf.set("mapreduce.input.fileinputformat.inputdir", inputPath); jobConf.set("map.input.dir", inputPath); } - } diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java index ccdf4679b..3c7a6034b 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java @@ -40,6 +40,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.TimeoutException; @@ -83,10 +84,7 @@ public abstract class ITTestBase { protected DockerClient dockerClient; protected Map runningContainers; - static String[] getHiveConsoleCommand(String rawCommand) { - String jarCommand = "add jar " + HUDI_HADOOP_BUNDLE + ";"; - String 
fullCommand = jarCommand + rawCommand; - + static String[] getHiveConsoleCommand(String hiveExpr) { List cmd = new ArrayList<>(); cmd.add("hive"); cmd.add("--hiveconf"); @@ -94,7 +92,7 @@ public abstract class ITTestBase { cmd.add("--hiveconf"); cmd.add("hive.stats.autogather=false"); cmd.add("-e"); - cmd.add("\"" + fullCommand + "\""); + cmd.add("\"" + hiveExpr + "\""); return cmd.toArray(new String[0]); } @@ -181,13 +179,32 @@ public abstract class ITTestBase { private TestExecStartResultCallback executeCommandInDocker( String containerName, String[] command, boolean expectedToSucceed) throws Exception { - return executeCommandInDocker(containerName, command, true, expectedToSucceed); + return executeCommandInDocker(containerName, command, true, expectedToSucceed, Collections.emptyMap()); } - private TestExecStartResultCallback executeCommandInDocker( - String containerName, String[] command, boolean checkIfSucceed, boolean expectedToSucceed) throws Exception { - Container sparkWorkerContainer = runningContainers.get(containerName); - ExecCreateCmd cmd = dockerClient.execCreateCmd(sparkWorkerContainer.getId()).withCmd(command).withAttachStdout(true) + private TestExecStartResultCallback executeCommandInDocker(String containerName, + String[] command, + boolean checkIfSucceed, + boolean expectedToSucceed) throws Exception { + return executeCommandInDocker(containerName, command, checkIfSucceed, expectedToSucceed, Collections.emptyMap()); + } + + private TestExecStartResultCallback executeCommandInDocker(String containerName, + String[] command, + boolean checkIfSucceed, + boolean expectedToSucceed, + Map env) throws Exception { + Container targetContainer = runningContainers.get(containerName); + + List dockerEnv = env.entrySet() + .stream() + .map(e -> String.format("%s=%s", e.getKey(), e.getValue())) + .collect(Collectors.toList()); + + ExecCreateCmd cmd = dockerClient.execCreateCmd(targetContainer.getId()) + .withEnv(dockerEnv) + .withCmd(command) + 
.withAttachStdout(true) .withAttachStderr(true); ExecCreateCmdResponse createCmdResponse = cmd.exec(); @@ -255,7 +272,9 @@ public abstract class ITTestBase { LOG.info("\n#################################################################################################"); String[] hiveCmd = getHiveConsoleCommand(hiveCommand); - TestExecStartResultCallback callback = executeCommandInDocker(HIVESERVER, hiveCmd, true); + Map env = Collections.singletonMap("AUX_CLASSPATH", "file://" + HUDI_HADOOP_BUNDLE); + TestExecStartResultCallback callback = + executeCommandInDocker(HIVESERVER, hiveCmd, true, true, env); return Pair.of(callback.getStdout().toString().trim(), callback.getStderr().toString().trim()); } diff --git a/hudi-spark-datasource/hudi-spark-common/pom.xml b/hudi-spark-datasource/hudi-spark-common/pom.xml index 790fd50b6..31ac48025 100644 --- a/hudi-spark-datasource/hudi-spark-common/pom.xml +++ b/hudi-spark-datasource/hudi-spark-common/pom.xml @@ -46,6 +46,7 @@ -nobootcp + -target:jvm-1.8 false diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index d7aa94f48..f9a7620b9 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -66,14 +66,13 @@ case class HoodieFileIndex(spark: SparkSession, metaClient = metaClient, schemaSpec = schemaSpec, configProperties = getConfigProperties(spark, options), + queryPaths = Seq(HoodieFileIndex.getQueryPath(options)), specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant), fileStatusCache = fileStatusCache ) with FileIndex { - @transient private val queryPath = new Path(options.getOrElse("path", "'path' option required")) - - override def rootPaths: 
Seq[Path] = queryPath :: Nil + override def rootPaths: Seq[Path] = queryPaths def enableDataSkipping(): Boolean = { options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), @@ -282,4 +281,8 @@ object HoodieFileIndex { properties.putAll(options.asJava) properties } + + private def getQueryPath(options: Map[String, String]) = { + new Path(options.getOrElse("path", "'path' option required")) + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index e4f7b82e9..8586300e6 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -18,10 +18,11 @@ package org.apache.hudi import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hudi.SparkHoodieTableFileIndex.generateFieldMap +import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_INCREMENTAL_OPT_VAL, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL} +import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, generateFieldMap} import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.config.TypedProperties -import org.apache.hudi.common.model.FileSlice +import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType} import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} import org.apache.spark.api.java.JavaSparkContext @@ -35,8 +36,10 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.unsafe.types.UTF8String +import scala.collection.JavaConversions._ + /** - * Implementation of the 
[[AbstractHoodieTableFileIndex]] for Spark + * Implementation of the [[HoodieTableFileIndexBase]] for Spark * * @param spark spark session * @param metaClient Hudi table's meta-client @@ -49,14 +52,17 @@ class SparkHoodieTableFileIndex(spark: SparkSession, metaClient: HoodieTableMetaClient, schemaSpec: Option[StructType], configProperties: TypedProperties, + queryPaths: Seq[Path], specifiedQueryInstant: Option[String] = None, @transient fileStatusCache: FileStatusCache = NoopCache) - extends AbstractHoodieTableFileIndex( + extends HoodieTableFileIndexBase( engineContext = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)), metaClient, configProperties, + queryType = deduceQueryType(configProperties), + queryPaths, specifiedQueryInstant, - SparkHoodieTableFileIndex.adapt(fileStatusCache) + fileStatusCache = SparkHoodieTableFileIndex.adapt(fileStatusCache) ) with SparkAdapterSupport with Logging { @@ -278,6 +284,15 @@ object SparkHoodieTableFileIndex { traverse(Right(structType)) } + private def deduceQueryType(configProperties: TypedProperties): HoodieTableQueryType = { + configProperties(QUERY_TYPE.key()) match { + case QUERY_TYPE_SNAPSHOT_OPT_VAL => HoodieTableQueryType.QUERY_TYPE_SNAPSHOT + case QUERY_TYPE_INCREMENTAL_OPT_VAL => HoodieTableQueryType.QUERY_TYPE_INCREMENTAL + case QUERY_TYPE_READ_OPTIMIZED_OPT_VAL => HoodieTableQueryType.QUERY_TYPE_READ_OPTIMIZED + case _ @ qt => throw new IllegalArgumentException(s"query-type ($qt) not supported") + } + } + private def adapt(cache: FileStatusCache): FileStatusCacheTrait = { new FileStatusCacheTrait { override def get(path: Path): Option[Array[FileStatus]] = cache.getLeafFiles(path) diff --git a/packaging/hudi-hadoop-mr-bundle/pom.xml b/packaging/hudi-hadoop-mr-bundle/pom.xml index 683949051..23399233e 100644 --- a/packaging/hudi-hadoop-mr-bundle/pom.xml +++ b/packaging/hudi-hadoop-mr-bundle/pom.xml @@ -67,11 +67,14 @@ org.apache.hudi:hudi-common org.apache.hudi:hudi-hadoop-mr + + 
org.scala-lang:scala-library + org.apache.parquet:parquet-avro + org.apache.avro:avro com.esotericsoftware:kryo-shaded org.objenesis:objenesis com.esotericsoftware:minlog - org.apache.avro:avro org.apache.hbase:hbase-common org.apache.hbase:hbase-client org.apache.hbase:hbase-protocol @@ -152,16 +155,27 @@ ${project.version} + + + + org.scala-lang + scala-library + ${scala.version} + + org.apache.parquet parquet-avro + ${parquet.version} compile + org.apache.avro avro + ${avro.version} compile @@ -172,6 +186,7 @@ compile
+ org.apache.hbase hbase-common diff --git a/packaging/hudi-hive-sync-bundle/pom.xml b/packaging/hudi-hive-sync-bundle/pom.xml index 967eab469..9b775e76c 100644 --- a/packaging/hudi-hive-sync-bundle/pom.xml +++ b/packaging/hudi-hive-sync-bundle/pom.xml @@ -69,6 +69,9 @@ org.apache.hudi:hudi-sync-common org.apache.hudi:hudi-hive-sync + + org.scala-lang:scala-library + com.beust:jcommander org.apache.avro:avro org.apache.parquet:parquet-avro @@ -118,27 +121,42 @@ hudi-common ${project.version} + org.apache.hudi hudi-hadoop-mr-bundle ${project.version} + org.apache.hudi hudi-hive-sync ${project.version} + + + + org.scala-lang + scala-library + ${scala.version} + + + org.apache.parquet parquet-avro + ${parquet.version} compile + org.apache.avro avro + ${avro.version} compile +
diff --git a/packaging/hudi-presto-bundle/pom.xml b/packaging/hudi-presto-bundle/pom.xml index 9d14a36c2..f085c30b4 100644 --- a/packaging/hudi-presto-bundle/pom.xml +++ b/packaging/hudi-presto-bundle/pom.xml @@ -66,15 +66,20 @@ org.apache.hudi:hudi-common org.apache.hudi:hudi-hadoop-mr + + + org.scala-lang:scala-library + + org.apache.parquet:parquet-avro + org.apache.avro:avro org.codehaus.jackson:* com.esotericsoftware:kryo-shaded org.objenesis:objenesis com.esotericsoftware:minlog - org.apache.hbase:hbase-client org.apache.hbase:hbase-common + org.apache.hbase:hbase-client org.apache.hbase:hbase-protocol org.apache.hbase:hbase-server - org.apache.hbase:hbase-annotations org.apache.htrace:htrace-core com.yammer.metrics:metrics-core com.google.guava:guava @@ -83,6 +88,10 @@ + + org.apache.avro. + org.apache.hudi.org.apache.avro. + org.codehaus.jackson. org.apache.hudi.org.codehaus.jackson. @@ -120,8 +129,8 @@ ${presto.bundle.bootstrap.shade.prefix}org.apache.htrace. - org.apache.hadoop.hbase. - ${presto.bundle.bootstrap.shade.prefix}org.apache.hadoop.hbase. + org.apache.parquet.avro. + ${presto.bundle.bootstrap.shade.prefix}org.apache.parquet.avro. 
false @@ -134,7 +143,6 @@ META-INF/*.RSA META-INF/services/javax.* com/esotericsoftware/reflectasm/** - avro/shaded/com/google/common/** stringBehavior.avsc @@ -166,6 +174,78 @@ org.apache.hudi hudi-hadoop-mr-bundle ${project.version} + + + org.apache.hbase + hbase-common + + + org.apache.hbase + hbase-server + + + org.apache.hbase + hbase-client + + + + + + + + org.scala-lang + scala-library + ${scala.version} + + + + + org.apache.parquet + parquet-avro + compile + + + + + org.apache.avro + avro + compile + + + + + org.apache.hbase + hbase-common + ${hbase.version} + + + + org.apache.hbase + hbase-server + ${hbase.version} + compile + + + org.apache.hbase + hbase-common + + + javax.servlet + * + + + org.codehaus.jackson + * + + + org.mortbay.jetty + * + + + tomcat + * + + + org.scala-lang:scala-library + org.apache.parquet:parquet-avro org.apache.avro:avro org.codehaus.jackson:* com.esotericsoftware:kryo-shaded org.objenesis:objenesis com.esotericsoftware:minlog - org.apache.hbase:hbase-client org.apache.hbase:hbase-common + org.apache.hbase:hbase-client org.apache.hbase:hbase-protocol org.apache.hbase:hbase-server org.apache.hbase:hbase-annotations @@ -186,21 +189,31 @@ - + + + + org.scala-lang + scala-library + ${scala.version} + + + org.apache.hbase - hbase-shaded-client + hbase-common ${hbase.version} - test org.apache.hbase - hbase-shaded-server + hbase-server ${hbase.version} - compile + + org.apache.hbase + hbase-common + javax.servlet * @@ -224,12 +237,15 @@ org.apache.parquet parquet-avro + ${parquet.version} compile + org.apache.avro avro + ${avro.version} compile