diff --git a/hoodie-cli/hoodie-cli.sh b/hoodie-cli/hoodie-cli.sh
index 70a54a4c7..73ffe333f 100755
--- a/hoodie-cli/hoodie-cli.sh
+++ b/hoodie-cli/hoodie-cli.sh
@@ -1,4 +1,5 @@
 #!/usr/bin/env bash
+
 DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
 HOODIE_JAR=`ls $DIR/target/hoodie-cli-*-SNAPSHOT.jar`
 if [ -z "$HADOOP_CONF_DIR" ]; then
@@ -9,4 +10,7 @@ if [ -z "$SPARK_CONF_DIR" ]; then
   echo "setting spark conf dir"
   SPARK_CONF_DIR="/etc/spark/conf"
 fi
-java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:$DIR/target/lib/*:$HOODIE_JAR org.springframework.shell.Bootstrap
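+# CLIENT_JAR is optional; when set it is appended to the end of the CLI classpath below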
+if [ -z "$CLIENT_JAR" ]; then
+  echo "client jar location not set"
+fi
+java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:$DIR/target/lib/*:$HOODIE_JAR:${CLIENT_JAR} org.springframework.shell.Bootstrap
diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HoodieLogFileCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HoodieLogFileCommand.java
new file mode 100644
index 000000000..e12ed4b46
--- /dev/null
+++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HoodieLogFileCommand.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.cli.commands;
+
+import com.beust.jcommander.internal.Maps;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.uber.hoodie.cli.HoodieCLI;
+import com.uber.hoodie.cli.HoodiePrintHelper;
+import com.uber.hoodie.common.model.HoodieLogFile;
+import com.uber.hoodie.common.model.HoodieRecord;
+import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner;
+import com.uber.hoodie.common.table.log.HoodieLogFormat;
+import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
+import com.uber.hoodie.common.table.log.block.HoodieCorruptBlock;
+import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
+import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
+import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
+import com.uber.hoodie.config.HoodieCompactionConfig;
+import com.uber.hoodie.hive.util.SchemaUtil;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.springframework.shell.core.CommandMarker;
+import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+import org.springframework.stereotype.Component;
+import parquet.avro.AvroSchemaConverter;
+import scala.Tuple2;
+import scala.Tuple3;
+
+@Component
+public class HoodieLogFileCommand implements CommandMarker {
+
+  @CliAvailabilityIndicator({"show logfiles"})
+  public boolean isShowArchivedLogFileAvailable() {
+    return HoodieCLI.tableMetadata != null;
+  }
+
+  @CliCommand(value = "show logfile metadata", help = "Read commit metadata from log files")
+  public String showLogFileCommits(
+      @CliOption(key = "logFilePathPattern", mandatory = true, help = "Fully qualified path for the log file")
+      final String logFilePathPattern) throws IOException {
+
+    FileSystem fs = HoodieCLI.tableMetadata.getFs();
+    List<String> logFilePaths = Arrays.stream(fs.globStatus(new Path(logFilePathPattern)))
+        .map(status -> status.getPath().toString()).collect(Collectors.toList());
+    Map<String, List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>>> commitCountAndMetadata = Maps
+        .newHashMap();
+    int totalEntries = 0;
+    int numCorruptBlocks = 0;
+
+    for (String logFilePath : logFilePaths) {
+      FileStatus[] fsStatus = fs.listStatus(
+          new Path(logFilePath));
+      Schema writerSchema = new AvroSchemaConverter()
+          .convert(SchemaUtil
+              .readSchemaFromLogFile(HoodieCLI.tableMetadata.getFs(), new Path(logFilePath)));
+      HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(fs,
+          new HoodieLogFile(fsStatus[0].getPath()), writerSchema);
+
+      // read the avro blocks
+      while (reader.hasNext()) {
+        HoodieLogBlock n = reader.next();
+        String instantTime;
+        int recordCount = 0;
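+        // a corrupt block may still carry a readable INSTANT_TIME header; fall back to a synthetic key when it does not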
+        if (n instanceof HoodieCorruptBlock) {
+          try {
+            instantTime = n.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME);
+          } catch (Exception e) {
+            numCorruptBlocks++;
+            instantTime = "corrupt_block_" + numCorruptBlocks;
+            // could not read metadata for corrupt block
+          }
+        } else {
+          instantTime = n.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME);
+          if (n instanceof HoodieAvroDataBlock) {
+            recordCount = ((HoodieAvroDataBlock) n).getRecords().size();
+          }
+        }
+        if (commitCountAndMetadata.containsKey(instantTime)) {
+          commitCountAndMetadata.get(instantTime)
+              .add(new Tuple3<>(n.getBlockType(),
+                  new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount));
+          totalEntries++;
+        } else {
+          List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>> list
+              = new ArrayList<>();
+          list.add(new Tuple3<>(n.getBlockType(),
+              new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount));
+          commitCountAndMetadata.put(instantTime, list);
+          totalEntries++;
+        }
+      }
+    }
+    String[][] rows = new String[totalEntries + 1][];
+    int i = 0;
+    ObjectMapper objectMapper = new ObjectMapper();
+    for (Map.Entry<String, List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>>> entry : commitCountAndMetadata
+        .entrySet()) {
+      String instantTime = entry.getKey().toString();
+      for (Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer> tuple3 : entry
+          .getValue()) {
+        String[] output = new String[5];
+        output[0] = instantTime;
+        output[1] = String.valueOf(tuple3._3());
+        output[2] = tuple3._1().toString();
+        output[3] = objectMapper.writeValueAsString(tuple3._2()._1());
+        output[4] = objectMapper.writeValueAsString(tuple3._2()._2());
+        rows[i] = output;
+        i++;
+      }
+    }
+    return HoodiePrintHelper.print(
+        new String[]{"InstantTime", "RecordCount", "BlockType", "HeaderMetadata", "FooterMetadata"},
+        rows);
+  }
+
+  @CliCommand(value = "show logfile records", help = "Read records from log files")
+  public String showLogFileRecords(
+      @CliOption(key = {
+          "limit"}, mandatory = false, help = "Limit number of records to show", unspecifiedDefaultValue = "10")
+      final Integer limit,
+      @CliOption(key = "logFilePathPattern", mandatory = true, help = "Fully qualified paths for the log files")
+      final String logFilePathPattern,
+      @CliOption(key = "mergeRecords", mandatory = false, help = "If the records in the log files should be merged",
+          unspecifiedDefaultValue = "false")
+      final Boolean shouldMerge) throws IOException {
+
+    System.out
+        .println("===============> Showing only " + limit + " records <===============");
+
+    FileSystem fs = HoodieCLI.tableMetadata.getFs();
+    List<String> logFilePaths = Arrays.stream(fs.globStatus(new Path(logFilePathPattern)))
+        .map(status -> status.getPath().toString()).collect(Collectors.toList());
+
+    // TODO : readerSchema can change across blocks/log files, fix this inside Scanner
+    AvroSchemaConverter converter = new AvroSchemaConverter();
+    // get schema from last log file
+    Schema readerSchema = converter
+        .convert(SchemaUtil
+            .readSchemaFromLogFile(fs, new Path(logFilePaths.get(logFilePaths.size() - 1))));
+
+    List<IndexedRecord> allRecords = new ArrayList<>();
+
+    if (shouldMerge) {
+      System.out.println("===========================> MERGING RECORDS <===================");
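+      // scan all the log files together, merging multiple versions of each record the way compaction would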
+      HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs,
+          HoodieCLI.tableMetadata.getBasePath(), logFilePaths, readerSchema,
+          HoodieCLI.tableMetadata.getActiveTimeline().getCommitTimeline().lastInstant().get()
+              .getTimestamp(),
+          Long.valueOf(HoodieCompactionConfig.DEFAULT_MAX_SIZE_IN_MEMORY_PER_COMPACTION_IN_BYTES),
+          Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
+          Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED));
+      for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : scanner) {
+        Optional<IndexedRecord> record = hoodieRecord.getData().getInsertValue(readerSchema);
+        if (allRecords.size() >= limit) {
+          break;
+        }
+        allRecords.add(record.get());
+      }
+    } else {
+      for (String logFile : logFilePaths) {
+        Schema writerSchema = new AvroSchemaConverter()
+            .convert(SchemaUtil
+                .readSchemaFromLogFile(HoodieCLI.tableMetadata.getFs(), new Path(logFile)));
+        HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(fs,
+            new HoodieLogFile(new Path(logFile)), writerSchema);
+        // read the avro blocks
+        while (reader.hasNext()) {
+          HoodieLogBlock n = reader.next();
+          if (n instanceof HoodieAvroDataBlock) {
+            HoodieAvroDataBlock blk = (HoodieAvroDataBlock) n;
+            List<IndexedRecord> records = blk.getRecords();
+            allRecords.addAll(records);
+            if (allRecords.size() >= limit) {
+              break;
+            }
+          }
+        }
+        if (allRecords.size() >= limit) {
+          break;
+        }
+      }
+    }
+    String[][] rows = new String[allRecords.size() + 1][];
+    int i = 0;
+    for (IndexedRecord record : allRecords) {
+      String[] data = new String[1];
+      data[0] = record.toString();
+      rows[i] = data;
+      i++;
+    }
+    return HoodiePrintHelper.print(
+        new String[]{"Records"}, rows);
+  }
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieAvroDataBlock.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieAvroDataBlock.java
index 94e89e793..eb2b5f45f 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieAvroDataBlock.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieAvroDataBlock.java
@@ -21,17 +21,6 @@ import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.storage.SizeAwareDataInputStream;
 import com.uber.hoodie.common.util.HoodieAvroUtils;
 import com.uber.hoodie.exception.HoodieIOException;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.io.Decoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.Encoder;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.hadoop.fs.FSDataInputStream;
-
-import javax.annotation.Nonnull;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -43,6 +32,16 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import javax.annotation.Nonnull;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
 
 /**
  * DataBlock contains a list of records serialized using Avro.
@@ -161,6 +160,10 @@ public class HoodieAvroDataBlock extends HoodieLogBlock {
   }
 
   public Schema getSchema() {
+    // if getSchema was invoked before the byte[] content was converted to records
+    if (records == null) {
+      getRecords();
+    }
     return schema;
   }
 
diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java
index 69ae7aff5..1ac16ad43 100644
--- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java
+++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java
@@ -395,21 +395,13 @@ public class HoodieHiveClient {
   @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
   private MessageType readSchemaFromLogFile(Optional<HoodieInstant> lastCompactionCommitOpt,
       Path path) throws IOException {
-    Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null);
-    HoodieAvroDataBlock lastBlock = null;
-    while (reader.hasNext()) {
-      HoodieLogBlock block = reader.next();
-      if (block instanceof HoodieAvroDataBlock) {
-        lastBlock = (HoodieAvroDataBlock) block;
-      }
-    }
-    if (lastBlock != null) {
-      lastBlock.getRecords();
-      return new parquet.avro.AvroSchemaConverter().convert(lastBlock.getSchema());
-    }
+    MessageType messageType = SchemaUtil.readSchemaFromLogFile(fs, path);
     // Fall back to read the schema from last compaction
-    LOG.info("Falling back to read the schema from last compaction " + lastCompactionCommitOpt);
-    return readSchemaFromLastCompaction(lastCompactionCommitOpt);
+    if (messageType == null) {
+      LOG.info("Falling back to read the schema from last compaction " + lastCompactionCommitOpt);
+      return readSchemaFromLastCompaction(lastCompactionCommitOpt);
+    }
+    return messageType;
   }
 
   /**
diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/util/SchemaUtil.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/util/SchemaUtil.java
index 9f16c777f..9eda294d9 100644
--- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/util/SchemaUtil.java
+++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/util/SchemaUtil.java
@@ -18,13 +18,22 @@ package com.uber.hoodie.hive.util;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.uber.hoodie.common.model.HoodieLogFile;
+import com.uber.hoodie.common.table.log.HoodieLogFormat;
+import com.uber.hoodie.common.table.log.HoodieLogFormat.Reader;
+import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
+import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
+import com.uber.hoodie.common.table.timeline.HoodieInstant;
 import com.uber.hoodie.hive.HiveSyncConfig;
 import com.uber.hoodie.hive.HoodieHiveSyncException;
 import com.uber.hoodie.hive.SchemaDifference;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import parquet.schema.DecimalMetadata;
@@ -416,4 +425,24 @@ public class SchemaUtil {
     // TODO - all partition fields should be part of the schema. datestr is treated as special. Dont do that
     return "String";
   }
+
+  /**
+   * Read the schema from the log file on path
+   */
+  @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+  public static MessageType readSchemaFromLogFile(FileSystem fs,
+      Path path) throws IOException {
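+    // read every block and remember the last data block seen; its schema is the latest writer schema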
+    Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null);
+    HoodieAvroDataBlock lastBlock = null;
+    while (reader.hasNext()) {
+      HoodieLogBlock block = reader.next();
+      if (block instanceof HoodieAvroDataBlock) {
+        lastBlock = (HoodieAvroDataBlock) block;
+      }
+    }
+    if (lastBlock != null) {
+      return new parquet.avro.AvroSchemaConverter().convert(lastBlock.getSchema());
+    }
+    return null;
+  }
 }
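
The two new commands can be exercised from the hoodie-cli shell once a table is connected. A hypothetical session follows (the table path and log-file glob are illustrative only, not part of this change; "connect" is the pre-existing CLI command):

    connect --path /tmp/hoodie/sample-table
    show logfile metadata --logFilePathPattern /tmp/hoodie/sample-table/2016/03/15/.fileid1_log*
    show logfile records --logFilePathPattern /tmp/hoodie/sample-table/2016/03/15/.fileid1_log* --mergeRecords true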