1
0

Move HoodieAvroReader to hoodie-common, it will be used for compaction and in the record reader

This commit is contained in:
Prasanna Rajaperumal
2017-04-03 13:52:07 -07:00
committed by prazanna
parent aee136777b
commit 1e802ad4f2
2 changed files with 2 additions and 10 deletions

View File

@@ -1,99 +0,0 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.io;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.util.AvroUtils;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
/**
 * Loads HoodieRecords from a set of avro log files, de-duplicates them by record key
 * (merging duplicates via {@code preCombine}) and maintains the merged state in memory.
 * This is useful for compaction and the record reader.
 */
public class HoodieAvroReader implements Iterable<HoodieRecord<HoodieAvroPayload>> {
private final Collection<HoodieRecord<HoodieAvroPayload>> records;
private AtomicLong totalLogFiles = new AtomicLong(0);
private AtomicLong totalLogRecords = new AtomicLong(0);
private long totalRecordsToUpdate;
/**
 * Eagerly reads every log file and builds the merged, de-duplicated record set.
 *
 * @param fs           filesystem to read the log files from
 * @param logFilePaths paths of the avro log files to load
 * @param readerSchema avro schema used to deserialize the records
 */
public HoodieAvroReader(FileSystem fs, List<String> logFilePaths, Schema readerSchema) {
Map<String, HoodieRecord<HoodieAvroPayload>> merged = Maps.newHashMap();
for (String logFilePath : logFilePaths) {
totalLogFiles.incrementAndGet();
List<HoodieRecord<HoodieAvroPayload>> fileRecords = AvroUtils
    .loadFromFile(fs, logFilePath, readerSchema);
totalLogRecords.addAndGet(fileRecords.size());
for (HoodieRecord<HoodieAvroPayload> record : fileRecords) {
String recordKey = record.getRecordKey();
HoodieRecord<HoodieAvroPayload> existing = merged.get(recordKey);
if (existing == null) {
// First occurrence of this key - keep the record as-is
merged.put(recordKey, record);
} else {
// Duplicate key - merge payloads and store the combined record
HoodieAvroPayload combined = existing.getData().preCombine(record.getData());
merged.put(recordKey,
    new HoodieRecord<>(new HoodieKey(recordKey, record.getPartitionPath()), combined));
}
}
}
this.records = merged.values();
this.totalRecordsToUpdate = merged.size();
}
@Override
public Iterator<HoodieRecord<HoodieAvroPayload>> iterator() {
return records.iterator();
}
@Override
public void forEach(Consumer<? super HoodieRecord<HoodieAvroPayload>> consumer) {
records.forEach(consumer);
}
@Override
public Spliterator<HoodieRecord<HoodieAvroPayload>> spliterator() {
return records.spliterator();
}
/** @return number of log files that were read */
public long getTotalLogFiles() {
return totalLogFiles.get();
}
/** @return total number of records seen across all log files, before de-duplication */
public long getTotalLogRecords() {
return totalLogRecords.get();
}
/** @return number of distinct record keys after merging */
public long getTotalRecordsToUpdate() {
return totalRecordsToUpdate;
}
}

View File

@@ -22,23 +22,19 @@ import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.CompactionWriteStat;
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieCompactionMetadata;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.log.avro.HoodieAvroReader;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.AvroUtils;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieCompactionException;
import com.uber.hoodie.io.HoodieAvroReader;
import com.uber.hoodie.table.HoodieCopyOnWriteTable;
import com.uber.hoodie.table.HoodieTable;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
@@ -46,8 +42,6 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.io.IOException;
import java.nio.charset.StandardCharsets;