
[HUDI-624]: Split some of the code from PR for HUDI-479 (#1344)

Suneel Marthi
2020-02-21 01:22:21 -05:00
committed by GitHub
parent 185ff646ad
commit 078d4825d9
31 changed files with 130 additions and 141 deletions

View File

@@ -29,7 +29,6 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.compact.strategy.CompactionStrategy;
import org.apache.hudi.metrics.MetricsReporterType;
-import com.google.common.base.Preconditions;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.spark.storage.StorageLevel;
@@ -40,6 +39,7 @@ import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
/**
@@ -764,7 +764,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
// Build WriteConfig at the end
HoodieWriteConfig config = new HoodieWriteConfig(props);
-Preconditions.checkArgument(config.getBasePath() != null);
+Objects.requireNonNull(config.getBasePath());
return config;
}
}
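
Side note on the HoodieWriteConfig change above: the two calls are not exact equivalents. Preconditions.checkArgument(x != null) throws IllegalArgumentException, while Objects.requireNonNull(x) throws NullPointerException, so the exception type changes with this swap. A minimal self-contained sketch of the new pattern (ConfigSketch and basePath are illustrative names, not Hudi classes):

import java.util.Objects;

public class ConfigSketch {
  private final String basePath;

  public ConfigSketch(String basePath) {
    // requireNonNull fails fast with a NullPointerException (not IllegalArgumentException) when basePath is null
    this.basePath = Objects.requireNonNull(basePath);
  }

  public String getBasePath() {
    return basePath;
  }
}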

View File

@@ -33,7 +33,7 @@ import java.util.Iterator;
*/
public abstract class LazyIterableIterator<I, O> implements Iterable<O>, Iterator<O> {
-protected Iterator<I> inputItr = null;
+protected Iterator<I> inputItr;
private boolean consumed = false;
private boolean startCalled = false;
private boolean endCalled = false;

View File

@@ -18,9 +18,8 @@
package org.apache.hudi.index.bloom;
-import com.google.common.base.Objects;
import java.io.Serializable;
+import java.util.Objects;
/**
* Metadata about a given file group, useful for index lookup.
@@ -80,14 +79,14 @@ public class BloomIndexFileInfo implements Serializable {
}
BloomIndexFileInfo that = (BloomIndexFileInfo) o;
-return Objects.equal(that.fileId, fileId) && Objects.equal(that.minRecordKey, minRecordKey)
-&& Objects.equal(that.maxRecordKey, maxRecordKey);
+return Objects.equals(that.fileId, fileId) && Objects.equals(that.minRecordKey, minRecordKey)
+&& Objects.equals(that.maxRecordKey, maxRecordKey);
}
@Override
public int hashCode() {
-return Objects.hashCode(fileId, minRecordKey, maxRecordKey);
+return Objects.hash(fileId, minRecordKey, maxRecordKey);
}
@Override
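
For reference, Guava's com.google.common.base.Objects.equal/hashCode map directly onto the JDK's java.util.Objects.equals/hash, which is what the hunk above switches to. A minimal sketch of the same equals/hashCode pattern (FileInfoSketch and its fields are illustrative stand-ins for the Hudi class):

import java.util.Objects;

public class FileInfoSketch {
  private final String fileId;
  private final String minRecordKey;
  private final String maxRecordKey;

  public FileInfoSketch(String fileId, String minRecordKey, String maxRecordKey) {
    this.fileId = fileId;
    this.minRecordKey = minRecordKey;
    this.maxRecordKey = maxRecordKey;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof FileInfoSketch)) {
      return false;
    }
    FileInfoSketch that = (FileInfoSketch) o;
    // Objects.equals is null-safe, matching Guava's Objects.equal
    return Objects.equals(fileId, that.fileId)
        && Objects.equals(minRecordKey, that.minRecordKey)
        && Objects.equals(maxRecordKey, that.maxRecordKey);
  }

  @Override
  public int hashCode() {
    // Objects.hash is the varargs JDK counterpart of Guava's Objects.hashCode
    return Objects.hash(fileId, minRecordKey, maxRecordKey);
  }
}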

View File

@@ -44,7 +44,6 @@ import org.apache.hudi.exception.HoodieAppendException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.table.HoodieTable;
-import com.google.common.collect.Maps;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
@@ -56,6 +55,7 @@ import org.apache.spark.util.SizeEstimator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
@@ -97,7 +97,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
// Max block size to limit to for a log block
private int maxBlockSize = config.getLogFileDataBlockMaxSize();
// Header metadata for a log block
-private Map<HeaderMetadataType, String> header = Maps.newHashMap();
+private Map<HeaderMetadataType, String> header = new HashMap<>();
// Total number of new records inserted into the delta file
private long insertRecordsWritten = 0;

View File

@@ -49,8 +49,6 @@ import org.apache.hudi.table.HoodieTable;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
@@ -61,7 +59,9 @@ import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -145,7 +145,7 @@ public class HoodieCommitArchiveLog {
// TODO: Handle ROLLBACK_ACTION in future
// ROLLBACK_ACTION is currently not defined in HoodieActiveTimeline
HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline()
-.getTimelineOfActions(Sets.newHashSet(HoodieTimeline.CLEAN_ACTION)).filterCompletedInstants();
+.getTimelineOfActions(Collections.singleton(HoodieTimeline.CLEAN_ACTION)).filterCompletedInstants();
Stream<HoodieInstant> instants = cleanAndRollbackTimeline.getInstants()
.collect(Collectors.groupingBy(HoodieInstant::getAction)).values().stream().map(hoodieInstants -> {
if (hoodieInstants.size() > maxCommitsToKeep) {
@@ -270,7 +270,7 @@ public class HoodieCommitArchiveLog {
private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records) throws Exception {
if (records.size() > 0) {
-Map<HeaderMetadataType, String> header = Maps.newHashMap();
+Map<HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header);
this.writer = writer.appendBlock(block);
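
Worth noting for the timeline change above: Collections.singleton returns an immutable one-element Set, so it is a safe replacement for Sets.newHashSet only because the set is passed in for reading, not mutation. A quick standalone illustration (the action string is a stand-in):

import java.util.Collections;
import java.util.Set;

public class SingletonSketch {
  public static void main(String[] args) {
    Set<String> actions = Collections.singleton("clean");
    System.out.println(actions.contains("clean")); // true
    // actions.add("commit"); // would throw UnsupportedOperationException: singleton sets are immutable
  }
}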

View File

@@ -22,8 +22,7 @@ import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.config.HoodieWriteConfig;
-import com.google.common.collect.Lists;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -40,7 +39,7 @@ public class BoundedIOCompactionStrategy extends CompactionStrategy {
// Iterate through the operations in order and accept operations as long as we are within the
// IO limit
// Preserves the original ordering of compactions
-List<HoodieCompactionOperation> finalOperations = Lists.newArrayList();
+List<HoodieCompactionOperation> finalOperations = new ArrayList<>();
long targetIORemaining = writeConfig.getTargetIOPerCompactionInMB();
for (HoodieCompactionOperation op : operations) {
long opIo = op.getMetrics().get(TOTAL_IO_MB).longValue();
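
More generally, Guava's Maps.newHashMap and Lists.newArrayList factories were largely a pre-Java-7 convenience for type inference; with the diamond operator the plain constructors are equally concise, which is what these hunks switch to. A small sketch (names are arbitrary):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CollectionSketch {
  // Before (Guava): Map<String, String> header = Maps.newHashMap();
  private final Map<String, String> header = new HashMap<>();

  // Before (Guava): List<String> operations = Lists.newArrayList();
  private final List<String> operations = new ArrayList<>();
}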

View File

@@ -28,9 +28,8 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.compact.HoodieMergeOnReadTableCompactor;
-import com.google.common.collect.Maps;
import java.io.Serializable;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -61,7 +60,7 @@ public abstract class CompactionStrategy implements Serializable {
*/
public Map<String, Double> captureMetrics(HoodieWriteConfig writeConfig, Option<HoodieBaseFile> dataFile,
String partitionPath, List<HoodieLogFile> logFiles) {
-Map<String, Double> metrics = Maps.newHashMap();
+Map<String, Double> metrics = new HashMap<>();
long defaultMaxParquetFileSize = writeConfig.getParquetMaxFileSize();
// Total size of all the log files
Long totalLogFileSize = logFiles.stream().map(HoodieLogFile::getFileSize).filter(size -> size >= 0)

View File

@@ -21,7 +21,6 @@ package org.apache.hudi.metrics;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
-import com.google.common.base.Preconditions;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -32,6 +31,7 @@ import javax.management.remote.JMXServiceURL;
import java.io.Closeable;
import java.lang.management.ManagementFactory;
import java.rmi.registry.LocateRegistry;
+import java.util.Objects;
/**
* Implementation of Jmx reporter, which used to report jmx metric.
@@ -67,7 +67,7 @@ public class JmxMetricsReporter extends MetricsReporter {
@Override
public void start() {
try {
-Preconditions.checkNotNull(connector, "Cannot start as the jmxReporter is null.");
+Objects.requireNonNull(connector, "Cannot start as the jmxReporter is null.");
connector.start();
} catch (Exception e) {
throw new HoodieException(e);
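
Unlike the checkArgument swap in HoodieWriteConfig above, this change is behavior-preserving: both Preconditions.checkNotNull and Objects.requireNonNull throw NullPointerException carrying the supplied message. A minimal sketch (ReporterSketch and the connector field are stand-ins, not the Hudi class):

import java.util.Objects;

public class ReporterSketch {
  private Object connector; // stand-in for the JMX reporter handle

  public void start() {
    // Throws NullPointerException with this message if connector was never initialized
    Objects.requireNonNull(connector, "Cannot start as the jmxReporter is null.");
  }
}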

View File

@@ -32,7 +32,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.PathFilter;
@@ -47,6 +46,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import scala.Tuple2;
@@ -129,7 +129,7 @@ public class RollbackExecutor implements Serializable {
// getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
// cloud-storage : HUDI-168
Map<FileStatus, Long> filesToNumBlocksRollback = new HashMap<>();
-filesToNumBlocksRollback.put(metaClient.getFs().getFileStatus(Preconditions.checkNotNull(writer).getLogFile().getPath()), 1L);
+filesToNumBlocksRollback.put(metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()), 1L);
return new Tuple2<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
@@ -215,7 +215,7 @@ public class RollbackExecutor implements Serializable {
private Map<HeaderMetadataType, String> generateHeader(String commit) {
// generate metadata
-Map<HeaderMetadataType, String> header = Maps.newHashMap();
+Map<HeaderMetadataType, String> header = new HashMap<>();
header.put(HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
header.put(HeaderMetadataType.TARGET_INSTANT_TIME, commit);
header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE,