diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CompactionCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CompactionCommand.java new file mode 100644 index 000000000..b5ed2ada5 --- /dev/null +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CompactionCommand.java @@ -0,0 +1,229 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.cli.commands; + +import com.uber.hoodie.avro.model.HoodieCompactionOperation; +import com.uber.hoodie.avro.model.HoodieCompactionPlan; +import com.uber.hoodie.cli.HoodieCLI; +import com.uber.hoodie.cli.HoodiePrintHelper; +import com.uber.hoodie.cli.TableHeader; +import com.uber.hoodie.cli.commands.SparkMain.SparkCommand; +import com.uber.hoodie.cli.utils.InputStreamConsumer; +import com.uber.hoodie.cli.utils.SparkUtil; +import com.uber.hoodie.common.model.HoodieTableType; +import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; +import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.table.timeline.HoodieInstant.State; +import com.uber.hoodie.common.util.AvroUtils; +import com.uber.hoodie.exception.HoodieIOException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; 
+import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.launcher.SparkLauncher; +import org.apache.spark.util.Utils; +import org.springframework.shell.core.CommandMarker; +import org.springframework.shell.core.annotation.CliCommand; +import org.springframework.shell.core.annotation.CliOption; +import org.springframework.stereotype.Component; + +@Component +public class CompactionCommand implements CommandMarker { + + private static Logger log = LogManager.getLogger(HDFSParquetImportCommand.class); + + @CliCommand(value = "compactions show all", help = "Shows all compactions that are in active timeline") + public String compactionsAll( + @CliOption(key = { + "includeExtraMetadata"}, help = "Include extra metadata", unspecifiedDefaultValue = "false") final + boolean includeExtraMetadata, + @CliOption(key = { + "limit"}, mandatory = false, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit, + @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField, + @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending, + @CliOption(key = { + "headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") final + boolean headerOnly) throws IOException { + HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); + HoodieTimeline timeline = activeTimeline.getCommitsAndCompactionTimeline(); + HoodieTimeline commitTimeline = activeTimeline.getCommitTimeline().filterCompletedInstants(); + Set committed = commitTimeline.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()); + + List instants = timeline.getInstants().collect(Collectors.toList()); + List rows = new ArrayList<>(); + Collections.reverse(instants); + for (int i = 0; i < instants.size(); i++) { + 
HoodieInstant instant = instants.get(i); + HoodieCompactionPlan workload = null; + if (!instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)) { + try { + // This could be a completed compaction. Assume a compaction request file is present but skip if fails + workload = AvroUtils.deserializeCompactionPlan( + activeTimeline.getInstantAuxiliaryDetails( + HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get()); + } catch (HoodieIOException ioe) { + // SKIP + } + } else { + workload = AvroUtils.deserializeCompactionPlan(activeTimeline.getInstantAuxiliaryDetails( + HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get()); + } + + if (null != workload) { + HoodieInstant.State state = instant.getState(); + if (committed.contains(instant.getTimestamp())) { + state = State.COMPLETED; + } + if (includeExtraMetadata) { + rows.add(new Comparable[]{instant.getTimestamp(), + state.toString(), + workload.getOperations() == null ? 0 : workload.getOperations().size(), + workload.getExtraMetadata().toString()}); + } else { + rows.add(new Comparable[]{instant.getTimestamp(), + state.toString(), + workload.getOperations() == null ? 
0 : workload.getOperations().size()}); + } + } + } + + Map> fieldNameToConverterMap = new HashMap<>(); + TableHeader header = new TableHeader() + .addTableHeaderField("Compaction Instant Time") + .addTableHeaderField("State") + .addTableHeaderField("Total FileIds to be Compacted"); + if (includeExtraMetadata) { + header = header.addTableHeaderField("Extra Metadata"); + } + return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows); + } + + @CliCommand(value = "compaction show", help = "Shows compaction details for a specific compaction instant") + public String compactionShow( + @CliOption(key = "instant", mandatory = true, help = "Base path for the target hoodie dataset") final + String compactionInstantTime, + @CliOption(key = { + "limit"}, mandatory = false, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit, + @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField, + @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending, + @CliOption(key = { + "headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") final boolean headerOnly) + throws Exception { + HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); + HoodieCompactionPlan workload = AvroUtils.deserializeCompactionPlan( + activeTimeline.getInstantAuxiliaryDetails( + HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get()); + + List rows = new ArrayList<>(); + if ((null != workload) && (null != workload.getOperations())) { + for (HoodieCompactionOperation op : workload.getOperations()) { + rows.add(new Comparable[]{op.getPartitionPath(), + op.getFileId(), + op.getBaseInstantTime(), + op.getDataFilePath(), + op.getDeltaFilePaths().size(), + op.getMetrics().toString() + }); + } + } + + Map> fieldNameToConverterMap = new HashMap<>(); + TableHeader header = new 
TableHeader() + .addTableHeaderField("Partition Path") + .addTableHeaderField("File Id") + .addTableHeaderField("Base Instant") + .addTableHeaderField("Data File Path") + .addTableHeaderField("Total Delta Files") + .addTableHeaderField("getMetrics"); + return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows); + } + + @CliCommand(value = "compaction schedule", help = "Schedule Compaction") + public String scheduleCompact( + @CliOption(key = "tableName", mandatory = true, help = "Table name") final String tableName, + @CliOption(key = "rowKeyField", mandatory = true, help = "Row key field name") final String rowKeyField, + @CliOption(key = { + "parallelism"}, mandatory = true, help = "Parallelism for hoodie compaction") final String parallelism, + @CliOption(key = "schemaFilePath", mandatory = true, help = "Path for Avro schema file") final String + schemaFilePath, + @CliOption(key = "sparkMemory", mandatory = true, help = "Spark executor memory") final String sparkMemory, + @CliOption(key = "retry", mandatory = true, help = "Number of retries") final String retry) throws Exception { + boolean initialized = HoodieCLI.initConf(); + HoodieCLI.initFS(initialized); + + // First get a compaction instant time and pass it to spark launcher for scheduling compaction + String compactionInstantTime = HoodieActiveTimeline.createNewCommitTime(); + + if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) { + String sparkPropertiesPath = Utils.getDefaultPropertiesFile( + scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); + SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); + sparkLauncher.addAppArgs(SparkCommand.COMPACT_SCHEDULE.toString(), HoodieCLI.tableMetadata.getBasePath(), + tableName, compactionInstantTime, rowKeyField, parallelism, schemaFilePath, sparkMemory, retry); + Process process = sparkLauncher.launch(); + 
InputStreamConsumer.captureOutput(process); + int exitCode = process.waitFor(); + if (exitCode != 0) { + return "Failed to run compaction for " + compactionInstantTime; + } + return "Compaction successfully completed for " + compactionInstantTime; + } else { + throw new Exception("Compactions can only be run for table type : MERGE_ON_READ"); + } + } + + @CliCommand(value = "compaction run", help = "Run Compaction for given instant time") + public String compact( + @CliOption(key = "tableName", mandatory = true, help = "Table name") final String tableName, + @CliOption(key = "rowKeyField", mandatory = true, help = "Row key field name") final String rowKeyField, + @CliOption(key = { + "parallelism"}, mandatory = true, help = "Parallelism for hoodie compaction") final String parallelism, + @CliOption(key = "schemaFilePath", mandatory = true, help = "Path for Avro schema file") final String + schemaFilePath, + @CliOption(key = "sparkMemory", mandatory = true, help = "Spark executor memory") final String sparkMemory, + @CliOption(key = "retry", mandatory = true, help = "Number of retries") final String retry, + @CliOption(key = "compactionInstant", mandatory = true, help = "Base path for the target hoodie dataset") final + String compactionInstantTime) throws Exception { + boolean initialized = HoodieCLI.initConf(); + HoodieCLI.initFS(initialized); + + if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) { + String sparkPropertiesPath = Utils.getDefaultPropertiesFile( + scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); + SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); + sparkLauncher.addAppArgs(SparkCommand.COMPACT_RUN.toString(), HoodieCLI.tableMetadata.getBasePath(), + tableName, compactionInstantTime, rowKeyField, parallelism, schemaFilePath, sparkMemory, retry); + Process process = sparkLauncher.launch(); + InputStreamConsumer.captureOutput(process); + int exitCode = process.waitFor(); 
+ if (exitCode != 0) { + return "Failed to run compaction for " + compactionInstantTime; + } + return "Compaction successfully completed for " + compactionInstantTime; + } else { + throw new Exception("Compactions can only be run for table type : MERGE_ON_READ"); + } + } +} \ No newline at end of file diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HDFSParquetImportCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HDFSParquetImportCommand.java index c3298c9ae..6750bafe9 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HDFSParquetImportCommand.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HDFSParquetImportCommand.java @@ -29,14 +29,17 @@ import org.springframework.shell.core.CommandMarker; import org.springframework.shell.core.annotation.CliCommand; import org.springframework.shell.core.annotation.CliOption; import org.springframework.stereotype.Component; +import scala.collection.JavaConverters; @Component public class HDFSParquetImportCommand implements CommandMarker { private static Logger log = LogManager.getLogger(HDFSParquetImportCommand.class); - @CliCommand(value = "hdfsparquetimport", help = "Imports hdfs dataset to a hoodie dataset") + @CliCommand(value = "hdfsparquetimport", help = "Imports Parquet dataset to a hoodie dataset") public String convert( + @CliOption(key = "upsert", mandatory = false, unspecifiedDefaultValue = "false", + help = "Uses upsert API instead of the default insert API of WriteClient") boolean useUpsert, @CliOption(key = "srcPath", mandatory = true, help = "Base path for the input dataset") final String srcPath, @CliOption(key = "srcType", mandatory = true, help = "Source type for the input dataset") final String srcType, @CliOption(key = "targetPath", mandatory = true, help = "Base path for the target hoodie dataset") final String @@ -59,10 +62,16 @@ public class HDFSParquetImportCommand implements CommandMarker { boolean initialized = HoodieCLI.initConf(); 
HoodieCLI.initFS(initialized); String sparkPropertiesPath = Utils.getDefaultPropertiesFile( - scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); + JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala()); + SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); - sparkLauncher.addAppArgs(SparkCommand.IMPORT.toString(), srcPath, targetPath, tableName, tableType, rowKeyField, + String cmd = SparkCommand.IMPORT.toString(); + if (useUpsert) { + cmd = SparkCommand.UPSERT.toString(); + } + + sparkLauncher.addAppArgs(cmd, srcPath, targetPath, tableName, tableType, rowKeyField, partitionPathField, parallelism, schemaFilePath, sparkMemory, retry); Process process = sparkLauncher.launch(); InputStreamConsumer.captureOutput(process); diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SparkMain.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SparkMain.java index 32e2a0091..526e64620 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SparkMain.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SparkMain.java @@ -24,6 +24,7 @@ import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.utilities.HDFSParquetImporter; +import com.uber.hoodie.utilities.HoodieCompactor; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; @@ -32,12 +33,11 @@ public class SparkMain { protected static final Logger LOG = Logger.getLogger(SparkMain.class); - /** * Commands */ enum SparkCommand { - ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT + ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, } public static void main(String[] args) throws Exception { @@ -62,9 +62,20 @@ public class SparkMain { returnCode = rollbackToSavepoint(jsc, args[1], 
args[2]); break; case IMPORT: + case UPSERT: assert (args.length == 11); - returnCode = dataImport(jsc, args[1], args[2], args[3], args[4], args[5], args[6], Integer.parseInt(args[7]), - args[8], SparkUtil.DEFUALT_SPARK_MASTER, args[9], Integer.parseInt(args[10])); + returnCode = dataLoad(jsc, command, args[1], args[2], args[3], args[4], args[5], args[6], + Integer.parseInt(args[7]), args[8], SparkUtil.DEFUALT_SPARK_MASTER, args[9], Integer.parseInt(args[10])); + break; + case COMPACT_RUN: + assert (args.length == 9); + returnCode = compact(jsc, args[1], args[2], args[3], args[4], args[5], Integer.parseInt(args[6]), + args[7], args[8], Integer.parseInt(args[9]), false); + break; + case COMPACT_SCHEDULE: + assert (args.length == 10); + returnCode = compact(jsc, args[1], args[2], args[3], args[4], args[5], Integer.parseInt(args[6]), + args[7], args[8], Integer.parseInt(args[9]), true); break; default: break; @@ -73,10 +84,12 @@ public class SparkMain { System.exit(returnCode); } - private static int dataImport(JavaSparkContext jsc, String srcPath, String targetPath, String tableName, + private static int dataLoad(JavaSparkContext jsc, String command, + String srcPath, String targetPath, String tableName, String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile, String sparkMaster, String sparkMemory, int retry) throws Exception { HDFSParquetImporter.Config cfg = new HDFSParquetImporter.Config(); + cfg.command = command; cfg.srcPath = srcPath; cfg.targetPath = targetPath; cfg.tableName = tableName; @@ -89,6 +102,22 @@ public class SparkMain { return new HDFSParquetImporter(cfg).dataImport(jsc, retry); } + private static int compact(JavaSparkContext jsc, String basePath, String tableName, String compactionInstant, + String rowKey, String partitionKey, int parallelism, String schemaFile, + String sparkMemory, int retry, boolean schedule) throws Exception { + HoodieCompactor.Config cfg = new HoodieCompactor.Config(); + cfg.basePath = 
basePath; + cfg.tableName = tableName; + cfg.compactionInstantTime = compactionInstant; + cfg.rowKey = rowKey; + cfg.partitionKey = partitionKey; + cfg.parallelism = parallelism; + cfg.schemaFile = schemaFile; + cfg.runSchedule = schedule; + jsc.getConf().set("spark.executor.memory", sparkMemory); + return new HoodieCompactor(cfg).compact(jsc, retry); + } + private static int deduplicatePartitionPath(JavaSparkContext jsc, String duplicatedPartitionPath, String repairedOutputPath, String basePath) throws Exception { DedupeSparkJob job = new DedupeSparkJob(basePath, duplicatedPartitionPath, repairedOutputPath, new SQLContext(jsc), diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java index a21683f0a..164f4486c 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java @@ -149,6 +149,10 @@ public class HoodieInstant implements Serializable { && Objects.equals(timestamp, that.timestamp); } + public State getState() { + return state; + } + @Override public int hashCode() { return Objects.hash(state, action, timestamp); diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java index 70871b30a..7ec989c95 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java @@ -26,24 +26,21 @@ import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.HoodieJsonPayload; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.table.HoodieTableConfig; 
import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.util.FSUtils; -import com.uber.hoodie.config.HoodieIndexConfig; -import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieIOException; -import com.uber.hoodie.index.HoodieIndex; import java.io.IOException; import java.io.Serializable; -import java.nio.ByteBuffer; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import java.util.List; +import java.util.Optional; import java.util.Properties; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; @@ -52,23 +49,150 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.avro.AvroReadSupport; import org.apache.parquet.hadoop.ParquetInputFormat; -import org.apache.spark.Accumulator; -import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; +/** + * Loads data from Parquet Sources + */ public class HDFSParquetImporter implements Serializable { + public static final SimpleDateFormat PARTITION_FORMATTER = new SimpleDateFormat("yyyy/MM/dd"); private static volatile Logger logger = LogManager.getLogger(HDFSParquetImporter.class); private final Config cfg; private transient FileSystem fs; - public static final SimpleDateFormat PARTITION_FORMATTER = new SimpleDateFormat("yyyy/MM/dd"); public HDFSParquetImporter(Config cfg) throws IOException { this.cfg = cfg; } + public static void main(String[] args) throws Exception { + final Config cfg = new Config(); + JCommander cmd = new JCommander(cfg, args); + if (cfg.help || args.length == 0) { + cmd.usage(); + System.exit(1); + } + HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg); + 
dataImporter.dataImport(UtilHelpers.buildSparkContext(cfg.tableName, cfg.sparkMaster, cfg.sparkMemory), cfg.retry); + } + + public int dataImport(JavaSparkContext jsc, int retry) throws Exception { + this.fs = FSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration()); + int ret = -1; + try { + // Verify that targetPath is not present. + if (fs.exists(new Path(cfg.targetPath))) { + throw new HoodieIOException(String.format("Make sure %s is not present.", cfg.targetPath)); + } + do { + ret = dataImport(jsc); + } while (ret != 0 && retry-- > 0); + } catch (Throwable t) { + logger.error(t); + } + return ret; + } + + @VisibleForTesting + protected int dataImport(JavaSparkContext jsc) throws IOException { + try { + if (fs.exists(new Path(cfg.targetPath))) { + // cleanup target directory. + fs.delete(new Path(cfg.targetPath), true); + } + + //Get schema. + String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile); + + // Initialize target hoodie table. + Properties properties = new Properties(); + properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, cfg.tableName); + properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, cfg.tableType); + HoodieTableMetaClient + .initializePathAsHoodieDataset(jsc.hadoopConfiguration(), cfg.targetPath, properties); + + HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.targetPath, schemaStr, + cfg.parallelism, Optional.empty()); + + JavaRDD> hoodieRecords = buildHoodieRecordsForImport(jsc, schemaStr); + // Get instant time. 
+ String instantTime = client.startCommit(); + JavaRDD writeResponse = load(client, instantTime, hoodieRecords); + return UtilHelpers.handleErrors(jsc, instantTime, writeResponse); + } catch (Throwable t) { + logger.error("Error occurred.", t); + } + return -1; + } + + protected JavaRDD> buildHoodieRecordsForImport( + JavaSparkContext jsc, String schemaStr) throws IOException { + Job job = Job.getInstance(jsc.hadoopConfiguration()); + // Allow recursive directories to be found + job.getConfiguration().set(FileInputFormat.INPUT_DIR_RECURSIVE, "true"); + // To parallelize reading file status. + job.getConfiguration().set(FileInputFormat.LIST_STATUS_NUM_THREADS, "1024"); + AvroReadSupport + .setAvroReadSchema(jsc.hadoopConfiguration(), (new Schema.Parser().parse(schemaStr))); + ParquetInputFormat.setReadSupportClass(job, (AvroReadSupport.class)); + + return jsc.newAPIHadoopFile(cfg.srcPath, + ParquetInputFormat.class, Void.class, GenericRecord.class, job.getConfiguration()) + // To reduce large number of + // tasks. + .coalesce(16 * cfg.parallelism) + .map(entry -> { + GenericRecord genericRecord + = ((Tuple2) entry)._2(); + Object partitionField = + genericRecord.get(cfg.partitionKey); + if (partitionField == null) { + throw new HoodieIOException( + "partition key is missing. :" + + cfg.partitionKey); + } + Object rowField = genericRecord.get(cfg.rowKey); + if (rowField == null) { + throw new HoodieIOException( + "row field is missing. :" + cfg.rowKey); + } + String partitionPath = partitionField.toString(); + logger.info("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")"); + if (partitionField instanceof Number) { + try { + long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L); + partitionPath = + PARTITION_FORMATTER.format(new Date(ts)); + } catch (NumberFormatException nfe) { + logger.warn("Unable to parse date from partition field. 
Assuming partition as (" + partitionField + ")"); + } + } + return new HoodieRecord<>( + new HoodieKey( + (String) rowField, partitionPath), + new HoodieJsonPayload( + genericRecord.toString())); + }); + } + + /** + * Imports records to Hoodie dataset + * + * @param client Hoodie Client + * @param instantTime Instant Time + * @param hoodieRecords Hoodie Records + * @param Type + */ + protected JavaRDD load(HoodieWriteClient client, + String instantTime, JavaRDD> hoodieRecords) { + if (cfg.command.toLowerCase().equals("insert")) { + return client.insert(hoodieRecords, instantTime); + } + return client.upsert(hoodieRecords, instantTime); + } + public static class FormatValidator implements IValueValidator { List validFormats = Arrays.asList("parquet"); @@ -97,6 +221,10 @@ public class HDFSParquetImporter implements Serializable { public static class Config implements Serializable { + @Parameter(names = {"--command", "-c"}, + description = "Write command Valid values are insert(default)/upsert", + required = false) + public String command = "INSERT"; @Parameter(names = {"--src-path", "-sp"}, description = "Base path for the input dataset", required = true) public String srcPath = null; @@ -137,167 +265,4 @@ public class HDFSParquetImporter implements Serializable { @Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false; } - - public static void main(String[] args) throws Exception { - final HDFSParquetImporter.Config cfg = new HDFSParquetImporter.Config(); - JCommander cmd = new JCommander(cfg, args); - if (cfg.help || args.length == 0) { - cmd.usage(); - System.exit(1); - } - HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg); - dataImporter.dataImport(dataImporter.getSparkContext(), cfg.retry); - } - - private JavaSparkContext getSparkContext() { - SparkConf sparkConf = new SparkConf().setAppName("hoodie-data-importer-" + cfg.tableName); - sparkConf.setMaster(cfg.sparkMaster); - - if (cfg.sparkMaster.startsWith("yarn")) { - 
sparkConf.set("spark.eventLog.overwrite", "true"); - sparkConf.set("spark.eventLog.enabled", "true"); - } - - sparkConf.set("spark.driver.maxResultSize", "2g"); - sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - sparkConf.set("spark.executor.memory", cfg.sparkMemory); - - // Configure hadoop conf - sparkConf.set("spark.hadoop.mapred.output.compress", "true"); - sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true"); - sparkConf.set("spark.hadoop.mapred.output.compression.codec", - "org.apache.hadoop.io.compress.GzipCodec"); - sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK"); - - sparkConf = HoodieWriteClient.registerClasses(sparkConf); - return new JavaSparkContext(sparkConf); - } - - private String getSchema() throws Exception { - // Read schema file. - Path p = new Path(cfg.schemaFile); - if (!fs.exists(p)) { - throw new Exception(String.format("Could not find - %s - schema file.", cfg.schemaFile)); - } - long len = fs.getFileStatus(p).getLen(); - ByteBuffer buf = ByteBuffer.allocate((int) len); - FSDataInputStream inputStream = null; - try { - inputStream = fs.open(p); - inputStream.readFully(0, buf.array(), 0, buf.array().length); - } finally { - if (inputStream != null) { - inputStream.close(); - } - } - return new String(buf.array()); - } - - public int dataImport(JavaSparkContext jsc, int retry) throws Exception { - this.fs = FSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration()); - int ret = -1; - try { - // Verify that targetPath is not present. 
- if (fs.exists(new Path(cfg.targetPath))) { - throw new HoodieIOException(String.format("Make sure %s is not present.", cfg.targetPath)); - } - do { - ret = dataImport(jsc); - } while (ret != 0 && retry-- > 0); - } catch (Throwable t) { - logger.error(t); - } - return ret; - } - - @VisibleForTesting - protected int dataImport(JavaSparkContext jsc) throws IOException { - try { - if (fs.exists(new Path(cfg.targetPath))) { - // cleanup target directory. - fs.delete(new Path(cfg.targetPath), true); - } - - //Get schema. - String schemaStr = getSchema(); - - // Initialize target hoodie table. - Properties properties = new Properties(); - properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, cfg.tableName); - properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, cfg.tableType); - HoodieTableMetaClient - .initializePathAsHoodieDataset(jsc.hadoopConfiguration(), cfg.targetPath, properties); - - HoodieWriteClient client = createHoodieClient(jsc, cfg.targetPath, schemaStr, - cfg.parallelism); - - Job job = Job.getInstance(jsc.hadoopConfiguration()); - // To parallelize reading file status. - job.getConfiguration().set(FileInputFormat.LIST_STATUS_NUM_THREADS, "1024"); - AvroReadSupport - .setAvroReadSchema(jsc.hadoopConfiguration(), (new Schema.Parser().parse(schemaStr))); - ParquetInputFormat.setReadSupportClass(job, (AvroReadSupport.class)); - - JavaRDD> hoodieRecords = jsc.newAPIHadoopFile(cfg.srcPath, - ParquetInputFormat.class, Void.class, GenericRecord.class, job.getConfiguration()) - // To reduce large number of - // tasks. - .coalesce(16 * cfg.parallelism) - .map(entry -> { - GenericRecord genericRecord - = ((Tuple2) entry)._2(); - Object partitionField = - genericRecord.get(cfg.partitionKey); - if (partitionField == null) { - throw new HoodieIOException( - "partition key is missing. :" - + cfg.partitionKey); - } - Object rowField = genericRecord.get(cfg.rowKey); - if (rowField == null) { - throw new HoodieIOException( - "row field is missing. 
:" + cfg.rowKey); - } - long ts = (long) ((Double) partitionField * 1000L); - String partitionPath = - PARTITION_FORMATTER.format(new Date(ts)); - return new HoodieRecord<>( - new HoodieKey( - (String) rowField, partitionPath), - new HoodieJsonPayload( - genericRecord.toString())); - }); - // Get commit time. - String commitTime = client.startCommit(); - - JavaRDD writeResponse = client.bulkInsert(hoodieRecords, commitTime); - Accumulator errors = jsc.accumulator(0); - writeResponse.foreach(writeStatus -> { - if (writeStatus.hasErrors()) { - errors.add(1); - logger.error(String.format("Error processing records :writeStatus:%s", - writeStatus.getStat().toString())); - } - }); - if (errors.value() == 0) { - logger.info( - String.format("Dataset imported into hoodie dataset with %s commit time.", commitTime)); - return 0; - } - logger.error(String.format("Import failed with %d errors.", errors.value())); - } catch (Throwable t) { - logger.error("Error occurred.", t); - } - return -1; - } - - private static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, - String schemaStr, int parallelism) throws Exception { - HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) - .withParallelism(parallelism, parallelism).withSchema(schemaStr) - .combineInput(true, true).withIndexConfig( - HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) - .build(); - return new HoodieWriteClient(jsc, config); - } } diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactor.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactor.java new file mode 100644 index 000000000..d986a6ac4 --- /dev/null +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactor.java @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. 
package com.uber.hoodie.utilities;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.util.FSUtils;
import java.io.Serializable;
import java.util.Optional;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * Standalone tool to either schedule a compaction (when {@code --schedule} is passed) or execute
 * an already-scheduled compaction at a given instant time on a hoodie dataset.
 *
 * <p>Exit semantics: {@link #compact(JavaSparkContext, int)} returns 0 on success and -1 on
 * failure after exhausting retries.
 */
public class HoodieCompactor {

  // FIX: logger was obtained for HDFSParquetImporter.class (copy-paste bug);
  // log under this class so messages are attributed correctly.
  private static volatile Logger logger = LogManager.getLogger(HoodieCompactor.class);
  private final Config cfg;
  private transient FileSystem fs;

  public HoodieCompactor(Config cfg) {
    this.cfg = cfg;
  }

  /**
   * Command line configuration, parsed by JCommander.
   */
  public static class Config implements Serializable {
    @Parameter(names = {"--base-path",
        "-sp"}, description = "Base path for the dataset", required = true)
    public String basePath = null;
    @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
    public String tableName = null;
    // FIX: short name was "-sp", duplicating --base-path's short name, which makes
    // JCommander fail at parse time. Use a distinct short flag.
    @Parameter(names = {"--instant-time",
        "-it"}, description = "Compaction Instant time", required = true)
    public String compactionInstantTime = null;
    @Parameter(names = {"--row-key-field",
        "-rk"}, description = "Row key field name", required = true)
    public String rowKey = null;
    @Parameter(names = {"--partition-key-field",
        "-pk"}, description = "Partition key field name", required = true)
    public String partitionKey = null;
    @Parameter(names = {"--parallelism",
        "-pl"}, description = "Parallelism for hoodie insert", required = true)
    public int parallelism = 1;
    @Parameter(names = {"--schema-file",
        "-sf"}, description = "path for Avro schema file", required = true)
    public String schemaFile = null;
    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false)
    public String sparkMaster = null;
    @Parameter(names = {"--spark-memory",
        "-sm"}, description = "spark memory to use", required = true)
    public String sparkMemory = null;
    @Parameter(names = {"--retry", "-rt"}, description = "number of retries", required = false)
    public int retry = 0;
    @Parameter(names = {"--schedule", "-sc"}, description = "Schedule compaction", required = false)
    public Boolean runSchedule = false;
    // FIX: typo "Stratgey" -> "Strategy" in the help description.
    @Parameter(names = {"--strategy", "-st"}, description = "Strategy Class", required = false)
    public String strategyClassName = null;
    @Parameter(names = {"--help", "-h"}, help = true)
    public Boolean help = false;
  }

  public static void main(String[] args) throws Exception {
    final Config cfg = new Config();
    JCommander cmd = new JCommander(cfg, args);
    if (cfg.help || args.length == 0) {
      cmd.usage();
      System.exit(1);
    }
    HoodieCompactor compactor = new HoodieCompactor(cfg);
    compactor.compact(UtilHelpers.buildSparkContext(cfg.tableName, cfg.sparkMaster, cfg.sparkMemory), cfg.retry);
  }

  /**
   * Runs either scheduling or execution of compaction, retrying on failure.
   *
   * @param jsc   spark context to use
   * @param retry number of additional attempts after the first failure
   * @return 0 on success, -1 on failure
   */
  public int compact(JavaSparkContext jsc, int retry) {
    this.fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
    int ret = -1;
    try {
      do {
        if (cfg.runSchedule) {
          // A strategy class is only needed when scheduling a new compaction.
          if (null == cfg.strategyClassName) {
            throw new IllegalArgumentException("Missing Strategy class name for running compaction");
          }
          ret = doSchedule(jsc);
        } else {
          ret = doCompact(jsc);
        }
      } while (ret != 0 && retry-- > 0);
    } catch (Throwable t) {
      // FIX: attach a message so the stack trace carries context in the logs.
      logger.error("Compaction failed", t);
    }
    return ret;
  }

  /**
   * Executes the compaction previously scheduled at {@code cfg.compactionInstantTime}.
   *
   * @return 0 if no write errors occurred, -1 otherwise
   */
  private int doCompact(JavaSparkContext jsc) throws Exception {
    // Get schema.
    String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
    HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism,
        Optional.empty());
    JavaRDD<WriteStatus> writeResponse = client.compact(cfg.compactionInstantTime);
    return UtilHelpers.handleErrors(jsc, cfg.compactionInstantTime, writeResponse);
  }

  /**
   * Schedules a compaction at {@code cfg.compactionInstantTime} using the configured strategy.
   *
   * @return 0 always (scheduling either succeeds or throws)
   */
  private int doSchedule(JavaSparkContext jsc) throws Exception {
    // Get schema.
    String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
    HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism,
        Optional.of(cfg.strategyClassName));
    client.scheduleCompactionAtInstant(cfg.compactionInstantTime, Optional.empty());
    return 0;
  }
}
java.nio.ByteBuffer; +import java.util.Optional; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.lang3.reflect.ConstructorUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.Accumulator; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; /** * Bunch of helper methods */ public class UtilHelpers { + private static Logger logger = LogManager.getLogger(UtilHelpers.class); public static Source createSource(String sourceClass, PropertiesConfiguration cfg, JavaSparkContext jssc, SourceDataFormat dataFormat, SchemaProvider schemaProvider) @@ -76,4 +91,98 @@ public class UtilHelpers { } } + /** + * Parse Schema from file + * + * @param fs File System + * @param schemaFile Schema File + */ + public static String parseSchema(FileSystem fs, String schemaFile) throws Exception { + // Read schema file. 
+ Path p = new Path(schemaFile); + if (!fs.exists(p)) { + throw new Exception(String.format("Could not find - %s - schema file.", schemaFile)); + } + long len = fs.getFileStatus(p).getLen(); + ByteBuffer buf = ByteBuffer.allocate((int) len); + FSDataInputStream inputStream = null; + try { + inputStream = fs.open(p); + inputStream.readFully(0, buf.array(), 0, buf.array().length); + } finally { + if (inputStream != null) { + inputStream.close(); + } + } + return new String(buf.array()); + } + + /** + * Build Spark Context for ingestion/compaction + * @return + */ + public static JavaSparkContext buildSparkContext(String tableName, String sparkMaster, String sparkMemory) { + SparkConf sparkConf = new SparkConf().setAppName("hoodie-data-importer-" + tableName); + sparkConf.setMaster(sparkMaster); + + if (sparkMaster.startsWith("yarn")) { + sparkConf.set("spark.eventLog.overwrite", "true"); + sparkConf.set("spark.eventLog.enabled", "true"); + } + + sparkConf.set("spark.driver.maxResultSize", "2g"); + sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + sparkConf.set("spark.executor.memory", sparkMemory); + + // Configure hadoop conf + sparkConf.set("spark.hadoop.mapred.output.compress", "true"); + sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true"); + sparkConf.set("spark.hadoop.mapred.output.compression.codec", + "org.apache.hadoop.io.compress.GzipCodec"); + sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK"); + + sparkConf = HoodieWriteClient.registerClasses(sparkConf); + return new JavaSparkContext(sparkConf); + } + + /** + * Build Hoodie write client + * + * @param jsc Java Spark Context + * @param basePath Base Path + * @param schemaStr Schema + * @param parallelism Parallelism + */ + public static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, + String schemaStr, int parallelism, Optional compactionStrategyClass) throws Exception { + HoodieCompactionConfig 
compactionConfig = + compactionStrategyClass.map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false) + .withCompactionStrategy(ReflectionUtils.loadClass(strategy)) + .build()).orElse(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build()); + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withParallelism(parallelism, parallelism).withSchema(schemaStr) + .combineInput(true, true) + .withCompactionConfig(compactionConfig) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) + .build(); + return new HoodieWriteClient(jsc, config); + } + + public static int handleErrors(JavaSparkContext jsc, String instantTime, JavaRDD writeResponse) { + Accumulator errors = jsc.accumulator(0); + writeResponse.foreach(writeStatus -> { + if (writeStatus.hasErrors()) { + errors.add(1); + logger.error(String.format("Error processing records :writeStatus:%s", + writeStatus.getStat().toString())); + } + }); + if (errors.value() == 0) { + logger.info( + String.format("Dataset imported into hoodie dataset with %s instant time.", instantTime)); + return 0; + } + logger.error(String.format("Import failed with %d errors.", errors.value())); + return -1; + } }