Adding a tool to read/inspect a HoodieLogFile

commit 9dff8c2326 (parent ba7c258c61)
Author:       Nishith Agarwal
Date:         2018-01-21 21:09:52 -08:00
Committed by: vinoth chandar

5 changed files with 275 additions and 26 deletions

File: hoodie-cli launch script (shell)

@@ -1,4 +1,5 @@
#!/usr/bin/env bash
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
HOODIE_JAR=`ls $DIR/target/hoodie-cli-*-SNAPSHOT.jar`
if [ -z "$HADOOP_CONF_DIR" ]; then
@@ -9,4 +10,7 @@ if [ -z "$SPARK_CONF_DIR" ]; then
echo "setting spark conf dir"
SPARK_CONF_DIR="/etc/spark/conf"
fi
java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:$DIR/target/lib/*:$HOODIE_JAR org.springframework.shell.Bootstrap
if [ -z "$CLIENT_JAR" ]; then
echo "client jar location not set"
fi
java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:$DIR/target/lib/*:$HOODIE_JAR:${CLIENT_JAR} org.springframework.shell.Bootstrap
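
A hedged example of the intended usage, with an illustrative jar path and launch script name: setting CLIENT_JAR before starting the CLI puts the extra jar on the classpath assembled above (the script only warns if it is unset).

CLIENT_JAR=/path/to/extra-client-dependencies.jar ./hoodie-cli.sh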

File: HoodieLogFileCommand.java (new file)

@@ -0,0 +1,221 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.cli.commands;
import com.beust.jcommander.internal.Maps;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.uber.hoodie.cli.HoodieCLI;
import com.uber.hoodie.cli.HoodiePrintHelper;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner;
import com.uber.hoodie.common.table.log.HoodieLogFormat;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.log.block.HoodieCorruptBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.hive.util.SchemaUtil;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
import parquet.avro.AvroSchemaConverter;
import scala.Tuple2;
import scala.Tuple3;
@Component
public class HoodieLogFileCommand implements CommandMarker {
@CliAvailabilityIndicator({"show logfiles"})
public boolean isShowArchivedLogFileAvailable() {
return HoodieCLI.tableMetadata != null;
}
@CliCommand(value = "show logfile metadata", help = "Read commit metadata from log files")
public String showLogFileCommits(
@CliOption(key = "logFilePathPattern", mandatory = true, help = "Fully qualified path for the log file")
final String logFilePathPattern) throws IOException {
FileSystem fs = HoodieCLI.tableMetadata.getFs();
List<String> logFilePaths = Arrays.stream(fs.globStatus(new Path(logFilePathPattern)))
.map(status -> status.getPath().toString()).collect(Collectors.toList());
Map<String, List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>>> commitCountAndMetadata = Maps
.newHashMap();
int totalEntries = 0;
int numCorruptBlocks = 0;
for (String logFilePath : logFilePaths) {
FileStatus[] fsStatus = fs.listStatus(
new Path(logFilePath));
Schema writerSchema = new AvroSchemaConverter()
.convert(SchemaUtil
.readSchemaFromLogFile(HoodieCLI.tableMetadata.getFs(), new Path(logFilePath)));
HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(fs,
new HoodieLogFile(fsStatus[0].getPath()), writerSchema);
// read the avro blocks
while (reader.hasNext()) {
HoodieLogBlock n = reader.next();
String instantTime;
int recordCount = 0;
if (n instanceof HoodieCorruptBlock) {
try {
instantTime = n.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME);
} catch (Exception e) {
numCorruptBlocks++;
instantTime = "corrupt_block_" + numCorruptBlocks;
// could not read metadata for corrupt block
}
} else {
instantTime = n.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME);
if (n instanceof HoodieAvroDataBlock) {
recordCount = ((HoodieAvroDataBlock) n).getRecords().size();
}
}
if (commitCountAndMetadata.containsKey(instantTime)) {
commitCountAndMetadata.get(instantTime)
.add(new Tuple3<>(n.getBlockType(),
new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount));
totalEntries++;
} else {
List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>> list
= new ArrayList<>();
list.add(new Tuple3<>(n.getBlockType(),
new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount));
commitCountAndMetadata.put(instantTime, list);
totalEntries++;
}
}
}
String[][] rows = new String[totalEntries + 1][];
int i = 0;
ObjectMapper objectMapper = new ObjectMapper();
for (Map.Entry<String, List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>>> entry : commitCountAndMetadata
.entrySet()) {
String instantTime = entry.getKey().toString();
for (Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer> tuple3 : entry
.getValue()) {
String[] output = new String[5];
output[0] = instantTime;
output[1] = String.valueOf(tuple3._3());
output[2] = tuple3._1().toString();
output[3] = objectMapper.writeValueAsString(tuple3._2()._1());
output[4] = objectMapper.writeValueAsString(tuple3._2()._2());
rows[i] = output;
i++;
}
}
return HoodiePrintHelper.print(
new String[]{"InstantTime", "RecordCount", "BlockType", "HeaderMetadata", "FooterMetadata"},
rows);
}
@CliCommand(value = "show logfile records", help = "Read records from log files")
public String showLogFileRecords(
@CliOption(key = {
"limit"}, mandatory = false, help = "Limit commits", unspecifiedDefaultValue = "10")
final Integer limit,
@CliOption(key = "logFilePathPattern", mandatory = true, help = "Fully qualified paths for the log files")
final String logFilePathPattern,
@CliOption(key = "mergeRecords", mandatory = false, help = "If the records in the log files should be merged",
unspecifiedDefaultValue = "false")
final Boolean shouldMerge) throws IOException {
System.out
.println("===============> Showing only " + limit + " records <===============");
FileSystem fs = HoodieCLI.tableMetadata.getFs();
List<String> logFilePaths = Arrays.stream(fs.globStatus(new Path(logFilePathPattern)))
.map(status -> status.getPath().toString()).collect(Collectors.toList());
// TODO : readerSchema can change across blocks/log files, fix this inside Scanner
AvroSchemaConverter converter = new AvroSchemaConverter();
// get schema from last log file
Schema readerSchema = converter
.convert(SchemaUtil
.readSchemaFromLogFile(fs, new Path(logFilePaths.get(logFilePaths.size() - 1))));
List<IndexedRecord> allRecords = new ArrayList<>();
if (shouldMerge) {
System.out.println("===========================> MERGING RECORDS <===================");
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs,
HoodieCLI.tableMetadata.getBasePath(), logFilePaths, readerSchema,
HoodieCLI.tableMetadata.getActiveTimeline().getCommitTimeline().lastInstant().get()
.getTimestamp(),
Long.valueOf(HoodieCompactionConfig.DEFAULT_MAX_SIZE_IN_MEMORY_PER_COMPACTION_IN_BYTES),
Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED));
for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : scanner) {
Optional<IndexedRecord> record = hoodieRecord.getData().getInsertValue(readerSchema);
if (allRecords.size() >= limit) {
break;
}
allRecords.add(record.get());
}
} else {
for (String logFile : logFilePaths) {
Schema writerSchema = new AvroSchemaConverter()
.convert(SchemaUtil
.readSchemaFromLogFile(HoodieCLI.tableMetadata.getFs(), new Path(logFile)));
HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(fs,
new HoodieLogFile(new Path(logFile)), writerSchema);
// read the avro blocks
while (reader.hasNext()) {
HoodieLogBlock n = reader.next();
if (n instanceof HoodieAvroDataBlock) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) n;
List<IndexedRecord> records = blk.getRecords();
allRecords.addAll(records);
if (allRecords.size() >= limit) {
break;
}
}
}
if (allRecords.size() >= limit) {
break;
}
}
}
String[][] rows = new String[allRecords.size() + 1][];
int i = 0;
for (IndexedRecord record : allRecords) {
String[] data = new String[1];
data[0] = record.toString();
rows[i] = data;
i++;
}
return HoodiePrintHelper.print(
new String[]{"Records"}, rows);
}
}
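
For reference, a hedged sketch of invoking the two new commands from the hoodie-cli shell once a table has been connected; the prompt, table name, and log file paths are illustrative, while the command and option names come from the @CliCommand/@CliOption annotations above.

hoodie:sample_table->show logfile metadata --logFilePathPattern /tmp/hoodie/sample_table/2017/01/01/.f1_20180121.log.*
hoodie:sample_table->show logfile records --logFilePathPattern /tmp/hoodie/sample_table/2017/01/01/.f1_20180121.log.* --limit 5 --mergeRecords true

The first command prints one row per log block (InstantTime, RecordCount, BlockType, header and footer metadata); the second prints up to "limit" records, merging them through HoodieCompactedLogRecordScanner when mergeRecords is true.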

File: HoodieAvroDataBlock.java

@@ -21,17 +21,6 @@ import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.storage.SizeAwareDataInputStream;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.exception.HoodieIOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import javax.annotation.Nonnull;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -43,6 +32,16 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nonnull;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.hadoop.fs.FSDataInputStream;
/**
* DataBlock contains a list of records serialized using Avro.
@@ -161,6 +160,10 @@ public class HoodieAvroDataBlock extends HoodieLogBlock {
}
public Schema getSchema() {
// If getSchema() is invoked before the byte[] content has been converted to records,
// materialize the records first so the schema gets populated.
if (records == null) {
getRecords();
}
return schema;
}
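
To illustrate why the new null check matters, here is a minimal sketch, not part of the commit, assuming an existing FileSystem fs and a log file path (imports as in the files above): a caller that asks a data block for its schema before touching its records could previously observe a null schema, because the records and the schema they carry are deserialized lazily.

static Schema schemaOfFirstDataBlock(FileSystem fs, Path path) throws IOException {
  HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null);
  while (reader.hasNext()) {
    HoodieLogBlock block = reader.next();
    if (block instanceof HoodieAvroDataBlock) {
      // With the guard added above, getSchema() materializes the records (and the
      // schema they were written with) instead of returning null.
      return ((HoodieAvroDataBlock) block).getSchema();
    }
  }
  return null; // no Avro data block in this log file
}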

File: HoodieHiveClient.java

@@ -395,21 +395,13 @@ public class HoodieHiveClient {
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private MessageType readSchemaFromLogFile(Optional<HoodieInstant> lastCompactionCommitOpt,
Path path) throws IOException {
Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null);
HoodieAvroDataBlock lastBlock = null;
while (reader.hasNext()) {
HoodieLogBlock block = reader.next();
if (block instanceof HoodieAvroDataBlock) {
lastBlock = (HoodieAvroDataBlock) block;
}
}
if (lastBlock != null) {
lastBlock.getRecords();
return new parquet.avro.AvroSchemaConverter().convert(lastBlock.getSchema());
}
MessageType messageType = SchemaUtil.readSchemaFromLogFile(fs, path);
// Fall back to read the schema from last compaction
LOG.info("Falling back to read the schema from last compaction " + lastCompactionCommitOpt);
return readSchemaFromLastCompaction(lastCompactionCommitOpt);
if (messageType == null) {
LOG.info("Falling back to read the schema from last compaction " + lastCompactionCommitOpt);
return readSchemaFromLastCompaction(lastCompactionCommitOpt);
}
return messageType;
}
/**

File: SchemaUtil.java

@@ -18,13 +18,22 @@ package com.uber.hoodie.hive.util;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.table.log.HoodieLogFormat;
import com.uber.hoodie.common.table.log.HoodieLogFormat.Reader;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.hive.HiveSyncConfig;
import com.uber.hoodie.hive.HoodieHiveSyncException;
import com.uber.hoodie.hive.SchemaDifference;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import parquet.schema.DecimalMetadata;
@@ -416,4 +425,24 @@ public class SchemaUtil {
// TODO - all partition fields should be part of the schema. datestr is treated as special. Dont do that
return "String";
}
/**
* Read the schema from the log file on path
*/
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public static MessageType readSchemaFromLogFile(FileSystem fs,
Path path) throws IOException {
Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null);
HoodieAvroDataBlock lastBlock = null;
while (reader.hasNext()) {
HoodieLogBlock block = reader.next();
if (block instanceof HoodieAvroDataBlock) {
lastBlock = (HoodieAvroDataBlock) block;
}
}
if (lastBlock != null) {
return new parquet.avro.AvroSchemaConverter().convert(lastBlock.getSchema());
}
return null;
}
}
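
A hedged usage sketch of the new helper, with illustrative variable names and imports matching those already used above: callers should expect a null return when the log file contains no Avro data block, which is the case HoodieHiveClient now handles by falling back to the schema of the last compaction.

MessageType parquetSchema = SchemaUtil.readSchemaFromLogFile(fs, logFilePath);
if (parquetSchema != null) {
  // Convert the Parquet MessageType to an Avro schema, the same way
  // HoodieLogFileCommand does before reading log records.
  Schema avroSchema = new parquet.avro.AvroSchemaConverter().convert(parquetSchema);
}
// A null return means no Avro data block was found; callers must fall back, e.g. to
// the last compaction's schema as HoodieHiveClient.readSchemaFromLogFile does.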