1
0

[HUDI-159] Redesigning bundles for lighter-weight integrations

- Documented principles applied for redesign at packaging/README.md
 - No longer depends on incl commons-codec, commons-io, commons-pool, commons-dbcp, commons-lang, commons-logging, avro-mapred
 - Introduce new FileIOUtils & added checkstyle rule for illegal import of above
 - Parquet, Avro dependencies moved to provided scope to enable being picked up from Hive/Spark/Presto instead
 - Pickup jackson jars for Hive sync tool from HIVE_HOME & unbundling jackson everywhere
 - Remove hive-jdbc standalone jar from being bundled in Spark/Hive/Utilities bundles
 - 6.5x reduced number of classes across bundles
This commit is contained in:
vinoth chandar
2019-09-02 16:15:55 -07:00
committed by Balaji Varadarajan
parent 0e6f078ec4
commit 7a973a6944
60 changed files with 689 additions and 1380 deletions

View File

@@ -19,12 +19,12 @@
package org.apache.hudi.common;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import javax.xml.bind.DatatypeConverter;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;
import org.apache.hudi.exception.HoodieIndexException;

View File

@@ -23,15 +23,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.StringWriter;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.io.IOUtils;
import org.apache.hudi.avro.MercifulJsonConverter;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
@@ -87,9 +86,7 @@ public class HoodieJsonPayload implements HoodieRecordPayload<HoodieJsonPayload>
private String unCompressData(byte[] data) throws IOException {
InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(data));
try {
StringWriter sw = new StringWriter(dataSize);
IOUtils.copy(iis, sw);
return sw.toString();
return FileIOUtils.readAsUTFString(iis, dataSize);
} finally {
iis.close();
}

View File

@@ -30,13 +30,13 @@ import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
@@ -394,7 +394,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
private Option<byte[]> readDataFromPath(Path detailPath) {
try (FSDataInputStream is = metaClient.getFs().open(detailPath)) {
return Option.of(IOUtils.toByteArray(is));
return Option.of(FileIOUtils.readAsByteArray(is));
} catch (IOException e) {
throw new HoodieIOException("Could not read commit details from " + detailPath, e);
}

View File

@@ -27,7 +27,6 @@ import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.codec.binary.Hex;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -68,7 +67,8 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
} catch (NoSuchAlgorithmException nse) {
throw new HoodieException(nse);
}
this.timelineHash = new String(Hex.encodeHex(md.digest()));
this.timelineHash = StringUtils.toHexString(md.digest());
}
/**

View File

@@ -20,28 +20,20 @@ package org.apache.hudi.common.util;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.mapred.FsInput;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
@@ -52,49 +44,9 @@ import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.exception.HoodieIOException;
public class AvroUtils {
public static List<HoodieRecord<HoodieAvroPayload>> loadFromFiles(FileSystem fs,
List<String> deltaFilePaths, Schema expectedSchema) {
List<HoodieRecord<HoodieAvroPayload>> loadedRecords = Lists.newArrayList();
deltaFilePaths.forEach(s -> {
List<HoodieRecord<HoodieAvroPayload>> records = loadFromFile(fs, s, expectedSchema);
loadedRecords.addAll(records);
});
return loadedRecords;
}
public static List<HoodieRecord<HoodieAvroPayload>> loadFromFile(FileSystem fs,
String deltaFilePath, Schema expectedSchema) {
List<HoodieRecord<HoodieAvroPayload>> loadedRecords = Lists.newArrayList();
Path path = new Path(deltaFilePath);
try {
SeekableInput input = new FsInput(path, fs.getConf());
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>();
// Set the expected schema to be the current schema to account for schema evolution
reader.setExpected(expectedSchema);
FileReader<GenericRecord> fileReader = DataFileReader.openReader(input, reader);
for (GenericRecord deltaRecord : fileReader) {
String key = deltaRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
String partitionPath =
deltaRecord.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
loadedRecords.add(new HoodieRecord<>(new HoodieKey(key, partitionPath),
new HoodieAvroPayload(Option.of(deltaRecord))));
}
fileReader.close(); // also closes underlying FsInput
} catch (IOException e) {
throw new HoodieIOException("Could not read avro records from path " + deltaFilePath,
e);
}
return loadedRecords;
}
public static HoodieCleanMetadata convertCleanMetadata(String startCleanTime,
Option<Long> durationInMs, List<HoodieCleanStat> cleanStats) {
ImmutableMap.Builder<String, HoodieCleanPartitionMetadata> partitionMetadataBuilder =

View File

@@ -107,7 +107,7 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
log.warn("Got IOException waiting for file event. Have tried " + retryNum + " time(s)", ioe);
}
return false;
}, "Timed out waiting for filles to become visible");
}, "Timed out waiting for files to become visible");
}
/**

View File

@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.util;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
/**
* Bunch of utility methods for working with files and byte streams
*/
public class FileIOUtils {
public static final long KB = 1024;
public static void deleteDirectory(File directory) throws IOException {
if (directory.exists()) {
Files.walk(directory.toPath())
.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(File::delete);
directory.delete();
if (directory.exists()) {
throw new IOException("Unable to delete directory " + directory);
}
}
}
public static void mkdir(File directory) throws IOException {
if (!directory.exists()) {
directory.mkdirs();
}
if (!directory.isDirectory()) {
throw new IOException("Unable to create :" + directory);
}
}
public static String readAsUTFString(InputStream input) throws IOException {
return readAsUTFString(input, 128);
}
public static String readAsUTFString(InputStream input, int length) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream(length);
copy(input, bos);
return new String(bos.toByteArray(), StandardCharsets.UTF_8);
}
public static void copy(InputStream inputStream, OutputStream outputStream) throws IOException {
byte[] buffer = new byte[1024];
int len;
while ((len = inputStream.read(buffer)) != -1) {
outputStream.write(buffer, 0, len);
}
}
public static byte[] readAsByteArray(InputStream input) throws IOException {
return readAsByteArray(input, 128);
}
public static byte[] readAsByteArray(InputStream input, int outputSize) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream(outputSize);
copy(input, bos);
return bos.toByteArray();
}
public static void writeStringToFile(String str, String filePath) throws IOException {
PrintStream out = new PrintStream(new FileOutputStream(filePath));
out.println(str);
out.flush();
out.close();
}
}

View File

@@ -30,7 +30,6 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -86,7 +85,7 @@ public class RocksDBDAO {
private void init() throws HoodieException {
try {
log.info("DELETING RocksDB persisted at " + rocksDBBasePath);
FileUtils.deleteDirectory(new File(rocksDBBasePath));
FileIOUtils.deleteDirectory(new File(rocksDBBasePath));
managedHandlesMap = new ConcurrentHashMap<>();
managedDescriptorMap = new ConcurrentHashMap<>();
@@ -103,7 +102,7 @@ public class RocksDBDAO {
});
final List<ColumnFamilyDescriptor> managedColumnFamilies = loadManagedColumnFamilies(dbOptions);
final List<ColumnFamilyHandle> managedHandles = new ArrayList<>();
FileUtils.forceMkdir(new File(rocksDBBasePath));
FileIOUtils.mkdir(new File(rocksDBBasePath));
rocksDB = RocksDB.open(dbOptions, rocksDBBasePath, managedColumnFamilies, managedHandles);
Preconditions.checkArgument(managedHandles.size() == managedColumnFamilies.size(),
@@ -450,7 +449,7 @@ public class RocksDBDAO {
managedDescriptorMap.clear();
getRocksDB().close();
try {
FileUtils.deleteDirectory(new File(rocksDBBasePath));
FileIOUtils.deleteDirectory(new File(rocksDBBasePath));
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}

View File

@@ -54,4 +54,15 @@ public class StringUtils {
return org.apache.hadoop.util.StringUtils.join(separator, array);
}
public static String toHexString(byte[] bytes) {
StringBuilder sb = new StringBuilder(bytes.length * 2);
for (byte b: bytes) {
sb.append(String.format("%02x", b));
}
return sb.toString();
}
public static boolean isEmpty(String str) {
return str == null || str.length() == 0;
}
}

View File

@@ -20,7 +20,6 @@ package org.apache.hudi.common.util.collection;
import java.io.Serializable;
import java.util.Map;
import org.apache.commons.lang.builder.CompareToBuilder;
/**
* (NOTE: Adapted from Apache commons-lang3)
@@ -57,7 +56,7 @@ public abstract class Pair<L, R> implements Map.Entry<L, R>, Comparable<Pair<L,
* @return a pair formed from the two parameters, not null
*/
public static <L, R> Pair<L, R> of(final L left, final R right) {
return new ImmutablePair<L, R>(left, right);
return new ImmutablePair<>(left, right);
}
//-----------------------------------------------------------------------
@@ -117,8 +116,20 @@ public abstract class Pair<L, R> implements Map.Entry<L, R>, Comparable<Pair<L,
*/
@Override
public int compareTo(final Pair<L, R> other) {
return new CompareToBuilder().append(getLeft(), other.getLeft())
.append(getRight(), other.getRight()).toComparison();
checkComparable(this);
checkComparable(other);
Comparable thisLeft = (Comparable) getLeft();
Comparable thisRight = (Comparable) getRight();
Comparable otherLeft = (Comparable) other.getLeft();
Comparable otherRight = (Comparable) other.getRight();
if (thisLeft.compareTo(otherLeft) == 0) {
return thisRight.compareTo(otherRight);
} else {
return thisLeft.compareTo(otherLeft);
}
}
/**
@@ -178,4 +189,9 @@ public abstract class Pair<L, R> implements Map.Entry<L, R>, Comparable<Pair<L,
return String.format(format, getLeft(), getRight());
}
private void checkComparable(Pair<L, R> pair) {
if (!(pair.getLeft() instanceof Comparable) || !(pair.getRight() instanceof Comparable)) {
throw new IllegalArgumentException("Elements of Pair must implement Comparable :" + pair);
}
}
}

View File

@@ -19,7 +19,6 @@
package org.apache.hudi.common.util.collection;
import java.io.Serializable;
import org.apache.commons.lang.builder.CompareToBuilder;
/**
* (NOTE: Adapted from Apache commons-lang3)
@@ -96,9 +95,17 @@ public abstract class Triple<L, M, R> implements Comparable<Triple<L, M, R>>, Se
*/
@Override
public int compareTo(final Triple<L, M, R> other) {
return new CompareToBuilder().append(getLeft(), other.getLeft())
.append(getMiddle(), other.getMiddle())
.append(getRight(), other.getRight()).toComparison();
checkComparable(this);
checkComparable(other);
Comparable thisLeft = (Comparable) getLeft();
Comparable otherLeft = (Comparable) other.getLeft();
if (thisLeft.compareTo(otherLeft) == 0) {
return Pair.of(getMiddle(), getRight()).compareTo(Pair.of(other.getMiddle(), other.getRight()));
} else {
return thisLeft.compareTo(otherLeft);
}
}
/**
@@ -160,5 +167,11 @@ public abstract class Triple<L, M, R> implements Comparable<Triple<L, M, R>>, Se
return String.format(format, getLeft(), getMiddle(), getRight());
}
private void checkComparable(Triple<L, M, R> triplet) {
if (!(triplet.getLeft() instanceof Comparable) || !(triplet.getMiddle() instanceof Comparable)
|| !(triplet.getRight() instanceof Comparable)) {
throw new IllegalArgumentException("Elements of Triple must implement Comparable :" + triplet);
}
}
}