
Reworking the deltastreamer tool

- Standardize version of Jackson
- DFSPropertiesConfiguration replaces usage of commons PropertiesConfiguration
- Remove dependency on ConstructorUtils
- Throw an error during key generation if the ordering value is not present
- Switch to shade plugin for hoodie-utilities
- Added support for consuming Confluent Avro Kafka serdes
- Added support for Confluent schema registry
- KafkaSource now handles partition skew by allocating the source limit across partitions in round-robin fashion (see the sketch after this list)
- Added support for BULK_INSERT operations as well
- Pass the payload class config properly into HoodieWriteClient
- Fix documentation based on new usage
- Added tests for deltastreamer, sources and all new util classes
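
To make the skew handling concrete, below is a small self-contained Java sketch of the round-robin budget allocation that KafkaSource.CheckpointUtils.computeOffsetRanges (further down in this diff) performs over Kafka offset ranges. The class name, the plain long[] bookkeeping and the sample numbers are illustrative only; the real method works on TopicAndPartition/OffsetRange objects.

import java.util.Arrays;

// Simplified sketch of the round-robin budget allocation idea used by
// KafkaSource.CheckpointUtils.computeOffsetRanges in this commit. The class name,
// the long[] bookkeeping and the sample numbers are illustrative, not part of the patch.
public class RoundRobinAllocationSketch {

  // Given the number of new events available per partition, decide how many events to
  // read from each partition so that at most 'limit' events are read in total.
  static long[] allocate(long[] available, long limit) {
    long[] alloc = new long[available.length];
    long total = 0;
    int exhausted = 0;
    for (long a : available) {
      if (a == 0) {
        exhausted++; // empty partitions are exhausted from the start
      }
    }
    // Keep distributing the remaining budget evenly over the non-exhausted partitions.
    while (total < limit && exhausted < available.length) {
      long perPartition = (long) Math.ceil((1.0 * (limit - total)) / (available.length - exhausted));
      for (int i = 0; i < available.length && total < limit; i++) {
        if (alloc[i] < available[i]) {
          long take = Math.min(perPartition, Math.min(available[i] - alloc[i], limit - total));
          alloc[i] += take;
          total += take;
          if (alloc[i] == available[i]) {
            exhausted++;
          }
        }
      }
    }
    return alloc;
  }

  public static void main(String[] args) {
    // One heavily skewed partition and two small ones, with a source limit of 3000 events.
    long[] available = {1_000_000L, 100L, 200L};
    System.out.println(Arrays.toString(allocate(available, 3000)));
    // Prints [2700, 100, 200]: the skewed partition cannot starve the small ones.
  }
}
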
Author: Vinoth Chandar, 2018-08-04 03:35:30 -07:00
Committed by: vinoth chandar
Parent: fb95dbdedb
Commit: d58ddbd999
49 changed files with 1919 additions and 754 deletions

View File

@@ -1,50 +0,0 @@
<!--
~ Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
<id>bin</id>
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<outputDirectory>/</outputDirectory>
<unpack>true</unpack>
<scope>runtime</scope>
<excludes>
<exclude>junit:junit</exclude>
<exclude>com.google.code.findbugs:*</exclude>
<exclude>org.apache.hadoop:*</exclude>
<exclude>org.apache.hbase:*</exclude>
<!--<exclude>log4j:*</exclude>-->
<!--<exclude>org.slf4j:*</exclude>-->
<!--<exclude>commons-dbcp:*</exclude>-->
<!--<exclude>org.apache.httpcomponents:*</exclude>-->
</excludes>
<unpackOptions>
<!--<excludes>-->
<!--<exclude>-->
<!--**/slf4j/**-->
<!--</exclude>-->
<!--</excludes>-->
</unpackOptions>
</dependencySet>
</dependencySets>
</assembly>

View File

@@ -75,7 +75,9 @@ public class HDFSParquetImporter implements Serializable {
System.exit(1);
}
HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
dataImporter.dataImport(UtilHelpers.buildSparkContext(cfg.tableName, cfg.sparkMaster, cfg.sparkMemory), cfg.retry);
dataImporter
.dataImport(UtilHelpers.buildSparkContext("data-importer-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory),
cfg.retry);
}
public int dataImport(JavaSparkContext jsc, int retry) throws Exception {
@@ -206,19 +208,6 @@ public class HDFSParquetImporter implements Serializable {
}
}
public static class SourceTypeValidator implements IValueValidator<String> {
List<String> validSourceTypes = Arrays.asList("hdfs");
@Override
public void validate(String name, String value) throws ParameterException {
if (value == null || !validSourceTypes.contains(value)) {
throw new ParameterException(String.format(
"Invalid source type: value:%s: supported source types:%s", value, validSourceTypes));
}
}
}
public static class Config implements Serializable {
@Parameter(names = {"--command", "-c"},
@@ -228,10 +217,6 @@ public class HDFSParquetImporter implements Serializable {
@Parameter(names = {"--src-path",
"-sp"}, description = "Base path for the input dataset", required = true)
public String srcPath = null;
@Parameter(names = {"--src-type",
"-st"}, description = "Source type for the input dataset", required = true,
validateValueWith = SourceTypeValidator.class)
public String srcType = null;
@Parameter(names = {"--target-path",
"-tp"}, description = "Base path for the target hoodie dataset", required = true)
public String targetPath = null;

View File

@@ -93,8 +93,6 @@ public class HiveIncrementalPuller {
public int maxCommits = 3;
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
@Parameter(names = {"--storageFormat"})
public String tempTableStorageFormat = "AVRO";
}
static {
@@ -207,12 +205,7 @@ public class HiveIncrementalPuller {
}
private String getStoredAsClause() {
if (config.tempTableStorageFormat.equalsIgnoreCase("json")) {
// Special case for json
// default json serde does not support having same key even if its under multiple depths
return "ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS TEXTFILE";
}
return "STORED AS " + config.tempTableStorageFormat;
return "STORED AS AVRO";
}
private void initHiveBeelineProperties(Statement stmt) throws SQLException {

View File

@@ -80,7 +80,8 @@ public class HoodieCompactor {
System.exit(1);
}
HoodieCompactor compactor = new HoodieCompactor(cfg);
compactor.compact(UtilHelpers.buildSparkContext(cfg.tableName, cfg.sparkMaster, cfg.sparkMemory), cfg.retry);
compactor.compact(UtilHelpers.buildSparkContext("compactor-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory),
cfg.retry);
}
public int compact(JavaSparkContext jsc, int retry) {
@@ -119,4 +120,4 @@ public class HoodieCompactor {
client.scheduleCompactionAtInstant(cfg.compactionInstantTime, Optional.empty());
return 0;
}
}
}

View File

@@ -20,22 +20,20 @@ package com.uber.hoodie.utilities;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.util.DFSPropertiesConfiguration;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import com.uber.hoodie.utilities.sources.Source;
import com.uber.hoodie.utilities.sources.SourceDataFormat;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Optional;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -52,42 +50,34 @@ import org.apache.spark.api.java.JavaSparkContext;
public class UtilHelpers {
private static Logger logger = LogManager.getLogger(UtilHelpers.class);
public static Source createSource(String sourceClass, PropertiesConfiguration cfg,
JavaSparkContext jssc, SourceDataFormat dataFormat, SchemaProvider schemaProvider)
public static Source createSource(String sourceClass, TypedProperties cfg,
JavaSparkContext jssc, SchemaProvider schemaProvider)
throws IOException {
try {
return (Source) ConstructorUtils.invokeConstructor(Class.forName(sourceClass), (Object) cfg,
(Object) jssc, (Object) dataFormat, (Object) schemaProvider);
return (Source) ReflectionUtils.loadClass(sourceClass,
new Class<?>[]{TypedProperties.class, JavaSparkContext.class, SchemaProvider.class},
cfg, jssc, schemaProvider);
} catch (Throwable e) {
throw new IOException("Could not load source class " + sourceClass, e);
}
}
public static SchemaProvider createSchemaProvider(String schemaProviderClass,
PropertiesConfiguration cfg) throws IOException {
TypedProperties cfg, JavaSparkContext jssc) throws IOException {
try {
return (SchemaProvider) ConstructorUtils.invokeConstructor(Class.forName(schemaProviderClass),
(Object) cfg);
return (SchemaProvider) ReflectionUtils.loadClass(schemaProviderClass, cfg, jssc);
} catch (Throwable e) {
throw new IOException("Could not load schema provider class " + schemaProviderClass, e);
}
}
/**
* TODO: Support hierarchical config files (see CONFIGURATION-609 for sample)
*/
public static PropertiesConfiguration readConfig(FileSystem fs, Path cfgPath) {
public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath) {
try {
FSDataInputStream in = fs.open(cfgPath);
PropertiesConfiguration config = new PropertiesConfiguration();
config.load(in);
in.close();
return config;
} catch (IOException e) {
throw new HoodieIOException("Unable to read config file at :" + cfgPath, e);
} catch (ConfigurationException e) {
throw new HoodieDeltaStreamerException("Invalid configs found in config file at :" + cfgPath,
e);
return new DFSPropertiesConfiguration(fs, cfgPath);
} catch (Exception e) {
throw new HoodieException("Unable to read props file at :" + cfgPath, e);
}
}
@@ -117,24 +107,16 @@ public class UtilHelpers {
return new String(buf.array());
}
/**
* Build Spark Context for ingestion/compaction
* @return
*/
public static JavaSparkContext buildSparkContext(String tableName, String sparkMaster, String sparkMemory) {
SparkConf sparkConf = new SparkConf().setAppName("hoodie-data-importer-" + tableName);
sparkConf.setMaster(sparkMaster);
if (sparkMaster.startsWith("yarn")) {
private static SparkConf buildSparkConf(String appName, String defaultMaster) {
SparkConf sparkConf = new SparkConf().setAppName(appName);
String master = sparkConf.get("spark.master", defaultMaster);
sparkConf.setMaster(master);
if (master.startsWith("yarn")) {
sparkConf.set("spark.eventLog.overwrite", "true");
sparkConf.set("spark.eventLog.enabled", "true");
}
sparkConf.set("spark.driver.maxResultSize", "2g");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.executor.memory", sparkMemory);
// Configure hadoop conf
sparkConf.set("spark.hadoop.mapred.output.compress", "true");
sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
sparkConf.set("spark.hadoop.mapred.output.compression.codec",
@@ -142,6 +124,20 @@ public class UtilHelpers {
sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
sparkConf = HoodieWriteClient.registerClasses(sparkConf);
return sparkConf;
}
public static JavaSparkContext buildSparkContext(String appName, String defaultMaster) {
return new JavaSparkContext(buildSparkConf(appName, defaultMaster));
}
/**
* Build Spark Context for ingestion/compaction
* @return
*/
public static JavaSparkContext buildSparkContext(String appName, String sparkMaster, String sparkMemory) {
SparkConf sparkConf = buildSparkConf(appName, sparkMaster);
sparkConf.set("spark.executor.memory", sparkMemory);
return new JavaSparkContext(sparkConf);
}
@@ -185,4 +181,10 @@ public class UtilHelpers {
logger.error(String.format("Import failed with %d errors.", errors.value()));
return -1;
}
public static TypedProperties readConfig(InputStream in) throws IOException {
TypedProperties defaults = new TypedProperties();
defaults.load(in);
return defaults;
}
}

View File

@@ -35,6 +35,8 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
@@ -44,9 +46,8 @@ import com.uber.hoodie.utilities.UtilHelpers;
import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException;
import com.uber.hoodie.utilities.schema.FilebasedSchemaProvider;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import com.uber.hoodie.utilities.sources.DFSSource;
import com.uber.hoodie.utilities.sources.JsonDFSSource;
import com.uber.hoodie.utilities.sources.Source;
import com.uber.hoodie.utilities.sources.SourceDataFormat;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
@@ -56,13 +57,10 @@ import java.util.Optional;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.collection.JavaConversions;
@@ -77,7 +75,7 @@ public class HoodieDeltaStreamer implements Serializable {
private static volatile Logger log = LogManager.getLogger(HoodieDeltaStreamer.class);
private static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
private final Config cfg;
@@ -113,9 +111,15 @@ public class HoodieDeltaStreamer implements Serializable {
private transient JavaSparkContext jssc;
public HoodieDeltaStreamer(Config cfg) throws IOException {
/**
* Bag of properties with source, hoodie client, key generator etc.
*/
TypedProperties props;
public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc) throws IOException {
this.cfg = cfg;
this.jssc = getSparkContext();
this.jssc = jssc;
this.fs = FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration());
if (fs.exists(new Path(cfg.targetBasePath))) {
@@ -126,61 +130,19 @@ public class HoodieDeltaStreamer implements Serializable {
this.commitTimelineOpt = Optional.empty();
}
//TODO(vc) Should these be passed from outside?
initSchemaProvider();
initKeyGenerator();
initSource();
}
this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath)).getConfig();
log.info("Creating delta streamer with configs : " + props.toString());
this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc);
this.keyGenerator = DataSourceUtils.createKeyGenerator(cfg.keyGeneratorClass, props);
this.source = UtilHelpers.createSource(cfg.sourceClassName, props, jssc, schemaProvider);
private void initSource() throws IOException {
// Create the source & schema providers
PropertiesConfiguration sourceCfg = UtilHelpers.readConfig(fs, new Path(cfg.sourceConfigProps));
log.info("Creating source " + cfg.sourceClassName + " with configs : " + sourceCfg.toString());
this.source = UtilHelpers.createSource(cfg.sourceClassName, sourceCfg, jssc, cfg.sourceFormat,
schemaProvider);
}
private void initSchemaProvider() throws IOException {
PropertiesConfiguration schemaCfg = UtilHelpers.readConfig(fs,
new Path(cfg.schemaProviderConfigProps));
log.info(
"Creating schema provider " + cfg.schemaProviderClassName + " with configs : " + schemaCfg
.toString());
this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, schemaCfg);
}
private void initKeyGenerator() throws IOException {
PropertiesConfiguration keygenCfg = UtilHelpers.readConfig(fs, new Path(cfg.keyGeneratorProps));
log.info("Creating key generator " + cfg.keyGeneratorClass + " with configs : " + keygenCfg
.toString());
this.keyGenerator = DataSourceUtils.createKeyGenerator(cfg.keyGeneratorClass, keygenCfg);
}
private JavaSparkContext getSparkContext() {
SparkConf sparkConf = new SparkConf()
.setAppName("hoodie-delta-streamer-" + cfg.targetTableName);
//sparkConf.setMaster(cfg.sparkMaster);
sparkConf.setMaster("local[2]");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.driver.maxResultSize", "2g");
// Configure hadoop conf
sparkConf.set("spark.hadoop.mapred.output.compress", "true");
sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
sparkConf.set("spark.hadoop.mapred.output.compression.codec",
"org.apache.hadoop.io.compress.GzipCodec");
sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
sparkConf = HoodieWriteClient.registerClasses(sparkConf);
// register the schemas, so that shuffle does not serialize the full schemas
List<Schema> schemas = Arrays.asList(schemaProvider.getSourceSchema(),
schemaProvider.getTargetSchema());
sparkConf.registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList());
return new JavaSparkContext(sparkConf);
jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList());
}
private void sync() throws Exception {
public void sync() throws Exception {
// Retrieve the previous round checkpoints, if any
Optional<String> resumeCheckpointStr = Optional.empty();
if (commitTimelineOpt.isPresent()) {
@@ -207,7 +169,7 @@ public class HoodieDeltaStreamer implements Serializable {
// Pull the data from the source & prepare the write
Pair<Optional<JavaRDD<GenericRecord>>, String> dataAndCheckpoint = source.fetchNewData(
resumeCheckpointStr, cfg.maxInputBytes);
resumeCheckpointStr, cfg.sourceLimit);
if (!dataAndCheckpoint.getKey().isPresent()) {
log.info("No new data, nothing to commit.. ");
@@ -222,7 +184,7 @@ public class HoodieDeltaStreamer implements Serializable {
});
// Perform the write
HoodieWriteConfig hoodieCfg = getHoodieClientConfig(cfg.hoodieClientProps);
HoodieWriteConfig hoodieCfg = getHoodieClientConfig();
HoodieWriteClient client = new HoodieWriteClient<>(jssc, hoodieCfg);
String commitTime = client.startCommit();
log.info("Starting commit : " + commitTime);
@@ -232,6 +194,8 @@ public class HoodieDeltaStreamer implements Serializable {
writeStatusRDD = client.insert(records, commitTime);
} else if (cfg.operation == Operation.UPSERT) {
writeStatusRDD = client.upsert(records, commitTime);
} else if (cfg.operation == Operation.BULK_INSERT) {
writeStatusRDD = client.bulkInsert(records, commitTime);
} else {
throw new HoodieDeltaStreamerException("Unknown operation :" + cfg.operation);
}
@@ -245,157 +209,84 @@ public class HoodieDeltaStreamer implements Serializable {
if (success) {
log.info("Commit " + commitTime + " successful!");
// TODO(vc): Kick off hive sync from here.
} else {
log.info("Commit " + commitTime + " failed!");
}
client.close();
}
private HoodieWriteConfig getHoodieClientConfig(String hoodieClientCfgPath) throws Exception {
private HoodieWriteConfig getHoodieClientConfig() throws Exception {
return HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath)
.withAutoCommit(false).withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withPayloadClass(
OverwriteWithLatestAvroPayload
.class
.getName()).build())
.withAutoCommit(false)
.withSchema(schemaProvider.getTargetSchema().toString())
.forTable(cfg.targetTableName).withIndexConfig(
HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.fromInputStream(fs.open(new Path(hoodieClientCfgPath))).build();
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build())
.forTable(cfg.targetTableName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withProps(props).build();
}
private enum Operation {
UPSERT, INSERT
public enum Operation {
UPSERT, INSERT, BULK_INSERT
}
private class OperationConvertor implements IStringConverter<Operation> {
@Override
public Operation convert(String value) throws ParameterException {
return Operation.valueOf(value);
}
}
private class SourceFormatConvertor implements IStringConverter<SourceDataFormat> {
@Override
public SourceDataFormat convert(String value) throws ParameterException {
return SourceDataFormat.valueOf(value);
}
}
public static class Config implements Serializable {
/**
* TARGET CONFIGS
**/
@Parameter(names = {
"--target-base-path"}, description = "base path for the target hoodie dataset", required = true)
@Parameter(names = {"--target-base-path"}, description = "base path for the target hoodie dataset. "
+ "(Will be created if did not exist first time around. If exists, expected to be a hoodie dataset)",
required = true)
public String targetBasePath;
// TODO: How to obtain hive configs to register?
@Parameter(names = {
"--target-table"}, description = "name of the target table in Hive", required = true)
@Parameter(names = {"--target-table"}, description = "name of the target table in Hive", required = true)
public String targetTableName;
@Parameter(names = {"--hoodie-client-config"}, description =
"path to properties file on localfs or "
+ "dfs, with hoodie client config. "
+ "Sane defaults"
+ "are used, but recommend use to "
+ "provide basic things like metrics "
+ "endpoints, hive configs etc")
public String hoodieClientProps = null;
@Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+ "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
+ "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
+ "to individual classes, for supported properties.")
public String propsFilePath =
"file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";
/**
* SOURCE CONFIGS
**/
@Parameter(names = {"--source-class"}, description =
"subclass of com.uber.hoodie.utilities.sources"
+ ".Source to use to read data. "
+ "built-in options: com.uber.hoodie.utilities"
+ ".common.{DFSSource (default), KafkaSource, "
+ "HiveIncrPullSource}")
public String sourceClassName = DFSSource.class.getName();
@Parameter(names = {"--source-class"}, description = "Subclass of com.uber.hoodie.utilities.sources to read data. "
+ "Built-in options: com.uber.hoodie.utilities.sources.{JsonDFSSource (default), AvroDFSSource, "
+ "JsonKafkaSource, AvroKafkaSource, HiveIncrPullSource}")
public String sourceClassName = JsonDFSSource.class.getName();
@Parameter(names = {"--source-config"}, description =
"path to properties file on localfs or dfs, with "
+ "source configs. "
+ "For list of acceptable properties, refer "
+ "the source class", required = true)
public String sourceConfigProps = null;
@Parameter(names = {"--source-format"}, description =
"Format of data in source, JSON (default), Avro. "
+ "All source data is "
+ "converted to Avro using the provided "
+ "schema in any case", converter = SourceFormatConvertor.class)
public SourceDataFormat sourceFormat = SourceDataFormat.JSON;
@Parameter(names = {"--source-ordering-field"}, description =
"Field within source record to decide how"
+ " to break ties between "
+ " records with same key in input "
+ "data. Default: 'ts' holding unix "
+ "timestamp of record")
@Parameter(names = {"--source-ordering-field"}, description = "Field within source record to decide how"
+ " to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record")
public String sourceOrderingField = "ts";
@Parameter(names = {"--key-generator-class"}, description =
"Subclass of com.uber.hoodie.utilities"
+ ".common.KeyExtractor to generate"
+ "a HoodieKey from the given avro "
+ "record. Built in: SimpleKeyGenerator"
+ " (Uses provided field names as "
+ "recordkey & partitionpath. "
+ "Nested fields specified via dot "
+ "notation, e.g: a.b.c)")
@Parameter(names = {"--key-generator-class"}, description = "Subclass of com.uber.hoodie.KeyGenerator "
+ "to generate a HoodieKey from the given avro record. Built in: SimpleKeyGenerator (uses "
+ "provided field names as recordkey & partitionpath. Nested fields specified via dot notation, e.g: a.b.c)")
public String keyGeneratorClass = SimpleKeyGenerator.class.getName();
@Parameter(names = {"--key-generator-config"}, description =
"Path to properties file on localfs or "
+ "dfs, with KeyGenerator configs. "
+ "For list of acceptable properites, "
+ "refer the KeyGenerator class",
required = true)
public String keyGeneratorProps = null;
@Parameter(names = {"--payload-class"}, description =
"subclass of HoodieRecordPayload, that works off "
+ "a GenericRecord. "
+ "Default: SourceWrapperPayload. Implement "
+ "your own, if you want to do something "
+ "other than overwriting existing value")
@Parameter(names = {"--payload-class"}, description = "subclass of HoodieRecordPayload, that works off "
+ "a GenericRecord. Implement your own, if you want to do something other than overwriting existing value")
public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName();
@Parameter(names = {"--schemaprovider-class"}, description =
"subclass of com.uber.hoodie.utilities"
+ ".schema.SchemaProvider "
+ "to attach schemas to input & target"
+ " table data, built in options: "
+ "FilebasedSchemaProvider")
@Parameter(names = {"--schemaprovider-class"}, description = "subclass of com.uber.hoodie.utilities.schema"
+ ".SchemaProvider to attach schemas to input & target table data, built in options: FilebasedSchemaProvider")
public String schemaProviderClassName = FilebasedSchemaProvider.class.getName();
@Parameter(names = {"--schemaprovider-config"}, description =
"path to properties file on localfs or dfs, with schema "
+ "configs. For list of acceptable properties, refer "
+ "the schema provider class", required = true)
public String schemaProviderConfigProps = null;
@Parameter(names = {"--source-limit"}, description = "Maximum amount of data to read from source. "
+ "Default: No limit For e.g: DFSSource => max bytes to read, KafkaSource => max events to read")
public long sourceLimit = Long.MAX_VALUE;
/**
* Other configs
**/
@Parameter(names = {
"--max-input-bytes"}, description = "Maximum number of bytes to read from source. Default: 1TB")
public long maxInputBytes = 1L * 1024 * 1024 * 1024 * 1024;
@Parameter(names = {"--op"}, description =
"Takes one of these values : UPSERT (default), INSERT (use when input "
+ "is purely new data/inserts to gain speed)",
@Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
+ "is purely new data/inserts to gain speed)",
converter = OperationConvertor.class)
public Operation operation = Operation.UPSERT;
@Parameter(names = {"--spark-master"}, description = "spark master to use.")
public String sparkMaster = "local[2]";
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
@@ -408,6 +299,8 @@ public class HoodieDeltaStreamer implements Serializable {
cmd.usage();
System.exit(1);
}
new HoodieDeltaStreamer(cfg).sync();
JavaSparkContext jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster);
new HoodieDeltaStreamer(cfg, jssc).sync();
}
}
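
For reference, a hypothetical programmatic launch of the reworked tool, equivalent to the command-line main() above. The Config fields, the Operation enum and the two-argument UtilHelpers.buildSparkContext are taken from this diff; the package of HoodieDeltaStreamer, the concrete paths and the source class choice are assumptions made for illustration.

import com.uber.hoodie.utilities.UtilHelpers;
import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer; // package path assumed
import org.apache.spark.api.java.JavaSparkContext;

public class DeltaStreamerLaunchSketch {
  public static void main(String[] args) throws Exception {
    HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
    cfg.targetBasePath = "hdfs:///tmp/hoodie/trips";             // placeholder dataset path
    cfg.targetTableName = "trips";
    cfg.propsFilePath = "hdfs:///tmp/delta-streamer.properties"; // single props file for client/source/schema/keygen configs
    cfg.sourceClassName = "com.uber.hoodie.utilities.sources.JsonKafkaSource";
    cfg.sourceOrderingField = "ts";
    cfg.operation = HoodieDeltaStreamer.Operation.BULK_INSERT;

    // The Spark context is now built outside and handed in, mirroring main() above.
    JavaSparkContext jssc =
        UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster);
    new HoodieDeltaStreamer(cfg, jssc).sync();
  }
}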

View File

@@ -21,6 +21,7 @@ package com.uber.hoodie.utilities.keygen;
import com.uber.hoodie.DataSourceUtils;
import com.uber.hoodie.SimpleKeyGenerator;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.exception.HoodieNotSupportedException;
import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException;
import java.io.Serializable;
@@ -30,7 +31,6 @@ import java.util.Arrays;
import java.util.Date;
import java.util.TimeZone;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.configuration.PropertiesConfiguration;
/**
* Key generator, that relies on timestamps for partitioning field. Still picks record key by name.
@@ -64,7 +64,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
+ ".dateformat";
}
public TimestampBasedKeyGenerator(PropertiesConfiguration config) {
public TimestampBasedKeyGenerator(TypedProperties config) {
super(config);
DataSourceUtils.checkRequiredProperties(config,
Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP));

View File

@@ -20,14 +20,14 @@ package com.uber.hoodie.utilities.schema;
import com.uber.hoodie.DataSourceUtils;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
/**
* A simple schema provider, that reads off files on DFS
@@ -37,33 +37,30 @@ public class FilebasedSchemaProvider extends SchemaProvider {
/**
* Configs supported
*/
static class Config {
private static final String SOURCE_SCHEMA_FILE_PROP = "hoodie.deltastreamer.filebased"
+ ".schemaprovider.source.schema"
+ ".file";
private static final String TARGET_SCHEMA_FILE_PROP = "hoodie.deltastreamer.filebased"
+ ".schemaprovider.target.schema"
+ ".file";
public static class Config {
private static final String SOURCE_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider"
+ ".source.schema.file";
private static final String TARGET_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider"
+ ".target.schema.file";
}
private final FileSystem fs;
private final Schema sourceSchema;
private final Schema targetSchema;
private Schema targetSchema;
public FilebasedSchemaProvider(PropertiesConfiguration config) {
super(config);
this.fs = FSUtils.getFs(config.getBasePath(), new Configuration());
DataSourceUtils.checkRequiredProperties(config,
Arrays.asList(Config.SOURCE_SCHEMA_FILE_PROP, Config.TARGET_SCHEMA_FILE_PROP));
public FilebasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
super(props, jssc);
DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.SOURCE_SCHEMA_FILE_PROP));
this.fs = FSUtils.getFs(props.getString(Config.SOURCE_SCHEMA_FILE_PROP), jssc.hadoopConfiguration());
try {
this.sourceSchema = new Schema.Parser().parse(
fs.open(new Path(config.getString(Config.SOURCE_SCHEMA_FILE_PROP))));
this.targetSchema = new Schema.Parser().parse(
fs.open(new Path(config.getString(Config.TARGET_SCHEMA_FILE_PROP))));
fs.open(new Path(props.getString(Config.SOURCE_SCHEMA_FILE_PROP))));
if (props.containsKey(Config.TARGET_SCHEMA_FILE_PROP)) {
this.targetSchema = new Schema.Parser().parse(
fs.open(new Path(props.getString(Config.TARGET_SCHEMA_FILE_PROP))));
}
} catch (IOException ioe) {
throw new HoodieIOException("Error reading schema", ioe);
}
@@ -76,6 +73,10 @@ public class FilebasedSchemaProvider extends SchemaProvider {
@Override
public Schema getTargetSchema() {
return targetSchema;
if (targetSchema != null) {
return targetSchema;
} else {
return super.getTargetSchema();
}
}
}

View File

@@ -18,22 +18,29 @@
package com.uber.hoodie.utilities.schema;
import com.uber.hoodie.common.util.TypedProperties;
import java.io.Serializable;
import org.apache.avro.Schema;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.spark.api.java.JavaSparkContext;
/**
* Class to provide schema for reading data and also writing into a Hoodie table
*/
public abstract class SchemaProvider implements Serializable {
protected PropertiesConfiguration config;
protected TypedProperties config;
protected SchemaProvider(PropertiesConfiguration config) {
this.config = config;
protected JavaSparkContext jssc;
protected SchemaProvider(TypedProperties props, JavaSparkContext jssc) {
this.config = props;
this.jssc = jssc;
}
public abstract Schema getSourceSchema();
public abstract Schema getTargetSchema();
public Schema getTargetSchema() {
// by default, use source schema as target for hoodie dataset as well
return getSourceSchema();
}
}

View File

@@ -0,0 +1,71 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.schema;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.uber.hoodie.DataSourceUtils;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.net.URL;
import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaSparkContext;
/**
* Obtains latest schema from the Confluent/Kafka schema-registry
*
* https://github.com/confluentinc/schema-registry
*/
public class SchemaRegistryProvider extends SchemaProvider {
/**
* Configs supported
*/
public static class Config {
private static final String SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url";
}
private final Schema schema;
private String fetchSchemaFromRegistry(String registryUrl) throws IOException {
URL registry = new URL(registryUrl);
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.readTree(registry.openStream());
return node.get("schema").asText();
}
public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
super(props, jssc);
DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.SCHEMA_REGISTRY_URL_PROP));
String registryUrl = props.getString(Config.SCHEMA_REGISTRY_URL_PROP);
try {
this.schema = new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
} catch (IOException ioe) {
throw new HoodieIOException("Error reading schema from registry :" + registryUrl, ioe);
}
}
@Override
public Schema getSourceSchema() {
return schema;
}
}
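
A minimal usage sketch for the new provider: the property key and the (TypedProperties, JavaSparkContext) constructor are as defined above, while the registry endpoint (a Confluent "latest version" URL), the local Spark context and the assumption that TypedProperties exposes the standard java.util.Properties setters are placeholders for illustration.

import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.utilities.schema.SchemaRegistryProvider;
import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaSparkContext;

public class SchemaRegistryProviderSketch {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // Placeholder endpoint; the provider reads the "schema" field of the JSON response.
    props.setProperty("hoodie.deltastreamer.schemaprovider.registry.url",
        "http://localhost:8081/subjects/trips-value/versions/latest");

    JavaSparkContext jssc = new JavaSparkContext("local[2]", "schema-registry-sketch");
    Schema sourceSchema = new SchemaRegistryProvider(props, jssc).getSourceSchema();
    System.out.println(sourceSchema.toString(true));
    jssc.stop();
  }
}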

View File

@@ -27,7 +27,7 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
/**
* Convert a variety of {@link SourceDataFormat} into Avro GenericRecords. Has a bunch of lazy
* Convert a variety of datum into Avro GenericRecords. Has a bunch of lazy
* fields to circumvent issues around serializing these objects from driver to executors
*/
public class AvroConvertor implements Serializable {
@@ -82,6 +82,10 @@ public class AvroConvertor implements Serializable {
return jsonConverter.convert(json);
}
public Schema getSchema() {
return new Schema.Parser().parse(schemaStr);
}
public GenericRecord fromAvroBinary(byte[] avroBinary) throws IOException {
initSchema();

View File

@@ -0,0 +1,47 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
/**
* DFS Source that reads avro data
*/
public class AvroDFSSource extends DFSSource {
public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext, SchemaProvider schemaProvider) {
super(props, sparkContext, schemaProvider);
}
@Override
protected JavaRDD<GenericRecord> fromFiles(AvroConvertor convertor, String pathStr) {
JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr,
AvroKeyInputFormat.class, AvroKey.class, NullWritable.class,
sparkContext.hadoopConfiguration());
return avroRDD.keys().map(r -> ((GenericRecord) r.datum()));
}
}

View File

@@ -0,0 +1,47 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import io.confluent.kafka.serializers.KafkaAvroDecoder;
import kafka.serializer.StringDecoder;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
/**
* Reads avro serialized Kafka data, based on the confluent schema-registry
*/
public class AvroKafkaSource extends KafkaSource {
public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SchemaProvider schemaProvider) {
super(props, sparkContext, schemaProvider);
}
@Override
protected JavaRDD<GenericRecord> toAvroRDD(OffsetRange[] offsetRanges, AvroConvertor avroConvertor) {
JavaRDD<GenericRecord> recordRDD = KafkaUtils
.createRDD(sparkContext, String.class, Object.class, StringDecoder.class, KafkaAvroDecoder.class, kafkaParams,
offsetRanges).values().map(obj -> (GenericRecord) obj);
return recordRDD;
}
}

View File

@@ -20,8 +20,10 @@ package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.DataSourceUtils;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.common.util.collection.ImmutablePair;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.HoodieNotSupportedException;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import java.io.IOException;
import java.util.ArrayList;
@@ -30,31 +32,23 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.NullWritable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
/**
* Source to read data from a given DFS directory structure, incrementally
*/
public class DFSSource extends Source {
public abstract class DFSSource extends Source {
/**
* Configs supported
*/
static class Config {
private static final String ROOT_INPUT_PATH_PROP = "hoodie.deltastreamer.source.dfs.root";
}
@@ -62,50 +56,23 @@ public class DFSSource extends Source {
private final transient FileSystem fs;
public DFSSource(PropertiesConfiguration config, JavaSparkContext sparkContext,
SourceDataFormat dataFormat, SchemaProvider schemaProvider) {
super(config, sparkContext, dataFormat, schemaProvider);
this.fs = FSUtils.getFs(config.getBasePath(), sparkContext.hadoopConfiguration());
DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.ROOT_INPUT_PATH_PROP));
}
public static JavaRDD<GenericRecord> fromAvroFiles(final AvroConvertor convertor, String pathStr,
JavaSparkContext sparkContext) {
JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr,
AvroKeyInputFormat.class, AvroKey.class, NullWritable.class,
sparkContext.hadoopConfiguration());
return avroRDD.keys().map(r -> ((GenericRecord) r.datum()));
}
public static JavaRDD<GenericRecord> fromJsonFiles(final AvroConvertor convertor, String pathStr,
JavaSparkContext sparkContext) {
return sparkContext.textFile(pathStr).map((String j) -> {
return convertor.fromJson(j);
});
}
public static JavaRDD<GenericRecord> fromFiles(SourceDataFormat dataFormat,
final AvroConvertor convertor, String pathStr, JavaSparkContext sparkContext) {
if (dataFormat == SourceDataFormat.AVRO) {
return DFSSource.fromAvroFiles(convertor, pathStr, sparkContext);
} else if (dataFormat == SourceDataFormat.JSON) {
return DFSSource.fromJsonFiles(convertor, pathStr, sparkContext);
} else {
throw new HoodieNotSupportedException("Unsupported data format :" + dataFormat);
}
public DFSSource(TypedProperties props, JavaSparkContext sparkContext, SchemaProvider schemaProvider) {
super(props, sparkContext, schemaProvider);
DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.ROOT_INPUT_PATH_PROP));
this.fs = FSUtils.getFs(props.getString(Config.ROOT_INPUT_PATH_PROP), sparkContext.hadoopConfiguration());
}
protected abstract JavaRDD<GenericRecord> fromFiles(final AvroConvertor convertor, String pathStr);
@Override
public Pair<Optional<JavaRDD<GenericRecord>>, String> fetchNewData(
Optional<String> lastCheckpointStr, long maxInputBytes) {
Optional<String> lastCheckpointStr, long sourceLimit) {
try {
// obtain all eligible files under root folder.
List<FileStatus> eligibleFiles = new ArrayList<>();
RemoteIterator<LocatedFileStatus> fitr = fs.listFiles(
new Path(config.getString(Config.ROOT_INPUT_PATH_PROP)), true);
new Path(props.getString(Config.ROOT_INPUT_PATH_PROP)), true);
while (fitr.hasNext()) {
LocatedFileStatus fileStatus = fitr.next();
if (fileStatus.isDirectory() || IGNORE_FILEPREFIX_LIST.stream().filter(
@@ -130,13 +97,14 @@ public class DFSSource extends Source {
continue;
}
maxModificationTime = f.getModificationTime();
currentBytes += f.getLen();
filteredFiles.add(f);
if (currentBytes >= maxInputBytes) {
if (currentBytes + f.getLen() >= sourceLimit) {
// we have enough data, we are done
break;
}
maxModificationTime = f.getModificationTime();
currentBytes += f.getLen();
filteredFiles.add(f);
}
// no data to read
@@ -153,7 +121,7 @@ public class DFSSource extends Source {
final AvroConvertor avroConvertor = new AvroConvertor(schemaStr);
return new ImmutablePair<>(
Optional.of(DFSSource.fromFiles(dataFormat, avroConvertor, pathStr, sparkContext)),
Optional.of(fromFiles(avroConvertor, pathStr)),
String.valueOf(maxModificationTime));
} catch (IOException ioe) {
throw new HoodieIOException(

View File

@@ -20,6 +20,9 @@ package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.DataSourceUtils;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.common.util.collection.ImmutablePair;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import java.io.IOException;
@@ -30,14 +33,15 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -69,12 +73,12 @@ public class HiveIncrPullSource extends Source {
private static final String ROOT_INPUT_PATH_PROP = "hoodie.deltastreamer.source.incrpull.root";
}
public HiveIncrPullSource(PropertiesConfiguration config, JavaSparkContext sparkContext,
SourceDataFormat dataFormat, SchemaProvider schemaProvider) {
super(config, sparkContext, dataFormat, schemaProvider);
this.fs = FSUtils.getFs(config.getBasePath(), sparkContext.hadoopConfiguration());
DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.ROOT_INPUT_PATH_PROP));
this.incrPullRootPath = config.getString(Config.ROOT_INPUT_PATH_PROP);
public HiveIncrPullSource(TypedProperties props, JavaSparkContext sparkContext,
SchemaProvider schemaProvider) {
super(props, sparkContext, schemaProvider);
DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.ROOT_INPUT_PATH_PROP));
this.incrPullRootPath = props.getString(Config.ROOT_INPUT_PATH_PROP);
this.fs = FSUtils.getFs(incrPullRootPath, sparkContext.hadoopConfiguration());
}
/**
@@ -110,7 +114,7 @@ public class HiveIncrPullSource extends Source {
@Override
public Pair<Optional<JavaRDD<GenericRecord>>, String> fetchNewData(
Optional<String> lastCheckpointStr, long maxInputBytes) {
Optional<String> lastCheckpointStr, long sourceLimit) {
try {
// find the source commit to pull
Optional<String> commitToPull = findCommitToPull(lastCheckpointStr);
@@ -125,10 +129,10 @@ public class HiveIncrPullSource extends Source {
fs.listStatus(new Path(incrPullRootPath, commitToPull.get())));
String pathStr = commitDeltaFiles.stream().map(f -> f.getPath().toString())
.collect(Collectors.joining(","));
String schemaStr = schemaProvider.getSourceSchema().toString();
final AvroConvertor avroConvertor = new AvroConvertor(schemaStr);
return new ImmutablePair<>(
Optional.of(DFSSource.fromFiles(dataFormat, avroConvertor, pathStr, sparkContext)),
JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr,
AvroKeyInputFormat.class, AvroKey.class, NullWritable.class,
sparkContext.hadoopConfiguration());
return new ImmutablePair<>(Optional.of(avroRDD.keys().map(r -> ((GenericRecord) r.datum()))),
String.valueOf(commitToPull.get()));
} catch (IOException ioe) {
throw new HoodieIOException(

View File

@@ -18,12 +18,23 @@
package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
/**
* Format of the data within source.
* DFS Source that reads json data
*/
public enum SourceDataFormat {
AVRO, // No conversion needed explicitly to avro
JSON, // we will try to convert to avro
ROW, // Will be added later, so we can plug/play with spark sources.
CUSTOM // the source is responsible for conversion to avro.
public class JsonDFSSource extends DFSSource {
public JsonDFSSource(TypedProperties props, JavaSparkContext sparkContext, SchemaProvider schemaProvider) {
super(props, sparkContext, schemaProvider);
}
@Override
protected JavaRDD<GenericRecord> fromFiles(AvroConvertor convertor, String pathStr) {
return sparkContext.textFile(pathStr).map((String j) -> convertor.fromJson(j));
}
}

View File

@@ -0,0 +1,45 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import kafka.serializer.StringDecoder;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
/**
* Read json kafka data
*/
public class JsonKafkaSource extends KafkaSource {
public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SchemaProvider schemaProvider) {
super(properties, sparkContext, schemaProvider);
}
@Override
protected JavaRDD<GenericRecord> toAvroRDD(OffsetRange[] offsetRanges, AvroConvertor avroConvertor) {
return KafkaUtils.createRDD(sparkContext, String.class, String.class, StringDecoder.class, StringDecoder.class,
kafkaParams, offsetRanges)
.values().map(jsonStr -> avroConvertor.fromJson(jsonStr));
}
}

View File

@@ -19,33 +19,26 @@
package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.DataSourceUtils;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.common.util.collection.ImmutablePair;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.exception.HoodieNotSupportedException;
import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import kafka.common.TopicAndPartition;
import kafka.serializer.DefaultDecoder;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset;
import org.apache.spark.streaming.kafka.OffsetRange;
import scala.Predef;
import scala.collection.JavaConverters;
@@ -59,10 +52,12 @@ import scala.util.Either;
/**
* Source to read data from Kafka, incrementally
*/
public class KafkaSource extends Source {
public abstract class KafkaSource extends Source {
private static volatile Logger log = LogManager.getLogger(KafkaSource.class);
private static long DEFAULT_MAX_EVENTS_TO_READ = 1000000; // 1M events max
static class CheckpointUtils {
@@ -72,6 +67,9 @@ public class KafkaSource extends Source {
public static HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> strToOffsets(
String checkpointStr) {
HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> offsetMap = new HashMap<>();
if (checkpointStr.length() == 0) {
return offsetMap;
}
String[] splits = checkpointStr.split(",");
String topic = splits[0];
for (int i = 1; i < splits.length; i++) {
@@ -83,46 +81,70 @@ public class KafkaSource extends Source {
}
/**
* String representation of checkpoint
* <p>
* Format: topic1,0:offset0,1:offset1,2:offset2, .....
* String representation of checkpoint <p> Format: topic1,0:offset0,1:offset1,2:offset2, .....
*/
public static String offsetsToStr(
HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> offsetMap) {
public static String offsetsToStr(OffsetRange[] ranges) {
StringBuilder sb = new StringBuilder();
// atleast 1 partition will be present.
sb.append(offsetMap.entrySet().stream().findFirst().get().getKey().topic() + ",");
sb.append(offsetMap.entrySet().stream()
.map(e -> String.format("%s:%d", e.getKey().partition(), e.getValue().offset()))
sb.append(ranges[0].topic() + ",");
sb.append(Arrays.stream(ranges)
.map(r -> String.format("%s:%d", r.partition(), r.untilOffset()))
.collect(Collectors.joining(",")));
return sb.toString();
}
public static OffsetRange[] computeOffsetRanges(
HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> fromOffsetMap,
HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> toOffsetMap) {
Comparator<OffsetRange> byPartition = (OffsetRange o1, OffsetRange o2) -> {
return Integer.valueOf(o1.partition()).compareTo(Integer.valueOf(o2.partition()));
};
List<OffsetRange> offsetRanges = toOffsetMap.entrySet().stream().map(e -> {
TopicAndPartition tp = e.getKey();
long fromOffset = -1;
if (fromOffsetMap.containsKey(tp)) {
fromOffset = fromOffsetMap.get(tp).offset();
}
return OffsetRange.create(tp, fromOffset, e.getValue().offset());
}).sorted(byPartition).collect(Collectors.toList());
OffsetRange[] ranges = new OffsetRange[offsetRanges.size()];
return offsetRanges.toArray(ranges);
/**
* Compute the offset ranges to read from Kafka, while handling newly added partitions, skews, event limits.
*
* @param fromOffsetMap offsets where we left off last time
* @param toOffsetMap offsets of where each partitions is currently at
* @param numEvents maximum number of events to read.
*/
public static OffsetRange[] computeOffsetRanges(
HashMap<TopicAndPartition, LeaderOffset> fromOffsetMap,
HashMap<TopicAndPartition, LeaderOffset> toOffsetMap,
long numEvents) {
Comparator<OffsetRange> byPartition = (OffsetRange o1, OffsetRange o2) ->
Integer.valueOf(o1.partition()).compareTo(Integer.valueOf(o2.partition()));
// Create initial offset ranges for each 'to' partition, with from = to offsets.
OffsetRange[] ranges = new OffsetRange[toOffsetMap.size()];
toOffsetMap.entrySet().stream().map(e -> {
TopicAndPartition tp = e.getKey();
long fromOffset = fromOffsetMap.getOrDefault(tp, new LeaderOffset("", -1, 0)).offset();
return OffsetRange.create(tp, fromOffset, fromOffset);
}).sorted(byPartition).collect(Collectors.toList()).toArray(ranges);
long allocedEvents = 0;
java.util.Set<Integer> exhaustedPartitions = new HashSet<>();
// keep going until we have events to allocate and partitions still not exhausted.
while (allocedEvents < numEvents && exhaustedPartitions.size() < toOffsetMap.size()) {
long remainingEvents = numEvents - allocedEvents;
long eventsPerPartition = (long) Math
.ceil((1.0 * remainingEvents) / (toOffsetMap.size() - exhaustedPartitions.size()));
// Allocate the remaining events to non-exhausted partitions, in round robin fashion
for (int i = 0; i < ranges.length; i++) {
OffsetRange range = ranges[i];
if (!exhaustedPartitions.contains(range.partition())) {
long toOffsetMax = toOffsetMap.get(range.topicAndPartition()).offset();
long toOffset = Math.min(toOffsetMax, range.untilOffset() + eventsPerPartition);
if (toOffset == toOffsetMax) {
exhaustedPartitions.add(range.partition());
}
allocedEvents += toOffset - range.untilOffset();
ranges[i] = OffsetRange.create(range.topicAndPartition(), range.fromOffset(), toOffset);
}
}
}
return ranges;
}
public static long totalNewMessages(OffsetRange[] ranges) {
long totalMsgs = 0;
for (OffsetRange range : ranges) {
totalMsgs += Math.max(range.untilOffset() - range.fromOffset(), 0);
}
return totalMsgs;
return Arrays.asList(ranges).stream().mapToLong(r -> r.count()).sum();
}
}
@@ -149,32 +171,31 @@ public class KafkaSource extends Source {
* Configs to be passed for this source. All standard Kafka consumer configs are also respected
*/
static class Config {
private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
private static final String DEFAULT_AUTO_RESET_OFFSET = "largest";
}
private HashMap<String, String> kafkaParams;
protected HashMap<String, String> kafkaParams;
private final String topicName;
protected final String topicName;
public KafkaSource(PropertiesConfiguration config, JavaSparkContext sparkContext,
SourceDataFormat dataFormat, SchemaProvider schemaProvider) {
super(config, sparkContext, dataFormat, schemaProvider);
public KafkaSource(TypedProperties props, JavaSparkContext sparkContext, SchemaProvider schemaProvider) {
super(props, sparkContext, schemaProvider);
kafkaParams = new HashMap<>();
Stream<String> keys = StreamSupport.stream(
Spliterators.spliteratorUnknownSize(config.getKeys(), Spliterator.NONNULL), false);
keys.forEach(k -> kafkaParams.put(k, config.getString(k)));
DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.KAFKA_TOPIC_NAME));
topicName = config.getString(Config.KAFKA_TOPIC_NAME);
for (Object prop : props.keySet()) {
kafkaParams.put(prop.toString(), props.getString(prop.toString()));
}
DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.KAFKA_TOPIC_NAME));
topicName = props.getString(Config.KAFKA_TOPIC_NAME);
}
protected abstract JavaRDD<GenericRecord> toAvroRDD(OffsetRange[] offsetRanges, AvroConvertor avroConvertor);
@Override
public Pair<Optional<JavaRDD<GenericRecord>>, String> fetchNewData(
Optional<String> lastCheckpointStr, long maxInputBytes) {
Optional<String> lastCheckpointStr, long sourceLimit) {
// Obtain current metadata for the topic
KafkaCluster cluster = new KafkaCluster(ScalaHelpers.toScalaMap(kafkaParams));
@@ -192,7 +213,7 @@ public class KafkaSource extends Source {
if (lastCheckpointStr.isPresent()) {
fromOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
} else {
String autoResetValue = config
String autoResetValue = props
.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET);
if (autoResetValue.equals("smallest")) {
fromOffsets = new HashMap(ScalaHelpers.toJavaMap(
@@ -206,40 +227,23 @@ public class KafkaSource extends Source {
}
}
// Always read until the latest offset
// Obtain the latest offsets.
HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> toOffsets = new HashMap(
ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get()));
// Come up with final set of OffsetRanges to read (account for new partitions)
// TODO(vc): Respect maxInputBytes, by estimating number of messages to read each batch from
// partition size
OffsetRange[] offsetRanges = CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets);
// Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
long numEvents = Math.min(DEFAULT_MAX_EVENTS_TO_READ, sourceLimit);
OffsetRange[] offsetRanges = CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
if (totalNewMsgs <= 0) {
return new ImmutablePair<>(Optional.empty(),
lastCheckpointStr.isPresent() ? lastCheckpointStr.get()
: CheckpointUtils.offsetsToStr(toOffsets));
return new ImmutablePair<>(Optional.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
} else {
log.info("About to read " + totalNewMsgs + " from Kafka for topic :" + topicName);
}
// Perform the actual read from Kafka
JavaRDD<byte[]> kafkaRDD = KafkaUtils.createRDD(sparkContext, byte[].class, byte[].class,
DefaultDecoder.class, DefaultDecoder.class, kafkaParams, offsetRanges).values();
// Produce a RDD[GenericRecord]
final AvroConvertor avroConvertor = new AvroConvertor(
schemaProvider.getSourceSchema().toString());
JavaRDD<GenericRecord> newDataRDD;
if (dataFormat == SourceDataFormat.AVRO) {
newDataRDD = kafkaRDD.map(bytes -> avroConvertor.fromAvroBinary(bytes));
} else if (dataFormat == SourceDataFormat.JSON) {
newDataRDD = kafkaRDD.map(
bytes -> avroConvertor.fromJson(new String(bytes, Charset.forName("utf-8"))));
} else {
throw new HoodieNotSupportedException("Unsupport data format :" + dataFormat);
}
return new ImmutablePair<>(Optional.of(newDataRDD), CheckpointUtils.offsetsToStr(toOffsets));
final AvroConvertor avroConvertor = new AvroConvertor(schemaProvider.getSourceSchema().toString());
JavaRDD<GenericRecord> newDataRDD = toAvroRDD(offsetRanges, avroConvertor);
return new ImmutablePair<>(Optional.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
}
}


@@ -18,12 +18,12 @@
package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import java.io.Serializable;
import java.util.Optional;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -32,32 +32,23 @@ import org.apache.spark.api.java.JavaSparkContext;
*/
public abstract class Source implements Serializable {
protected transient PropertiesConfiguration config;
protected transient TypedProperties props;
protected transient JavaSparkContext sparkContext;
protected transient SourceDataFormat dataFormat;
protected transient SchemaProvider schemaProvider;
protected Source(PropertiesConfiguration config, JavaSparkContext sparkContext,
SourceDataFormat dataFormat, SchemaProvider schemaProvider) {
this.config = config;
protected Source(TypedProperties props, JavaSparkContext sparkContext, SchemaProvider schemaProvider) {
this.props = props;
this.sparkContext = sparkContext;
this.dataFormat = dataFormat;
this.schemaProvider = schemaProvider;
}
/**
* Fetches new data upto maxInputBytes, from the provided checkpoint and returns an RDD of the
* Fetches new data up to sourceLimit, from the provided checkpoint and returns an RDD of the
* data, as well as the checkpoint to be written as a result of that.
*/
public abstract Pair<Optional<JavaRDD<GenericRecord>>, String> fetchNewData(
Optional<String> lastCheckpointStr, long maxInputBytes);
public PropertiesConfiguration getConfig() {
return config;
}
Optional<String> lastCheckpointStr, long sourceLimit);
}
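To make the contract above concrete, here is a minimal sketch of a custom source, written only against the Source, TypedProperties and SchemaProvider APIs visible in this change; the SingleBatchSource class, its field values and its fixed checkpoint string are hypothetical and purely illustrative. It emits one record built from the source schema on the first fetch, and on every later fetch returns an empty Optional with the previous checkpoint passed through, which is how a source signals that there is nothing new to commit.
package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.common.util.collection.ImmutablePair;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import java.util.Collections;
import java.util.Optional;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
/**
 * Hypothetical single-batch source: emits one record on the first fetch, nothing afterwards.
 */
public class SingleBatchSource extends Source {
  public SingleBatchSource(TypedProperties props, JavaSparkContext sparkContext, SchemaProvider schemaProvider) {
    super(props, sparkContext, schemaProvider);
  }
  @Override
  public Pair<Optional<JavaRDD<GenericRecord>>, String> fetchNewData(
      Optional<String> lastCheckpointStr, long sourceLimit) {
    if (sourceLimit <= 0 || lastCheckpointStr.isPresent()) {
      // Nothing new to read; hand the previous checkpoint back unchanged.
      return new ImmutablePair<>(Optional.empty(), lastCheckpointStr.orElse(""));
    }
    // Build a single record against the source schema; a real source would populate every field.
    GenericRecord record = new GenericData.Record(schemaProvider.getSourceSchema());
    record.put("_row_key", "key-0");
    record.put("driver", "driver-0");
    JavaRDD<GenericRecord> rdd = sparkContext.parallelize(Collections.singletonList(record), 1);
    // The returned string is the checkpoint the deltastreamer persists in the commit metadata.
    return new ImmutablePair<>(Optional.of(rdd), "emitted");
  }
}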


@@ -1,19 +0,0 @@
#
# Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.partitionpath.field=driver


@@ -1,29 +0,0 @@
{
"type" : "record",
"name" : "triprec",
"fields" : [ {
"name" : "_row_key",
"type" : "string"
}, {
"name" : "rider",
"type" : "string"
}, {
"name" : "driver",
"type" : "string"
}, {
"name" : "begin_lat",
"type" : "double"
}, {
"name" : "begin_lon",
"type" : "double"
}, {
"name" : "end_lat",
"type" : "double"
}, {
"name" : "end_lon",
"type" : "double"
}, {
"name" : "fare",
"type" : "double"
} ]
}


@@ -0,0 +1,162 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.DFSPropertiesConfiguration;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.exception.DatasetNotFoundException;
import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer;
import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
import com.uber.hoodie.utilities.sources.TestDataSource;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.SQLContext;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Basic tests against {@link com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer}, by issuing bulk_inserts,
* upserts, inserts. Check counts at the end.
*/
public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
private static volatile Logger log = LogManager.getLogger(TestHoodieDeltaStreamer.class);
@BeforeClass
public static void initClass() throws Exception {
UtilitiesTestBase.initClass();
// prepare the configs.
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
TypedProperties props = new TypedProperties();
props.setProperty("include", "base.properties");
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/test-source.properties");
}
@AfterClass
public static void cleanupClass() throws Exception {
UtilitiesTestBase.cleanupClass();
}
@Before
public void setup() throws Exception {
super.setup();
TestDataSource.initDataGen();
}
@After
public void teardown() throws Exception {
super.teardown();
TestDataSource.resetDataGen();
}
static class TestHelpers {
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op) {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = basePath;
cfg.targetTableName = "hoodie_trips";
cfg.sourceClassName = TestDataSource.class.getName();
cfg.operation = op;
cfg.sourceOrderingField = "timestamp";
cfg.propsFilePath = dfsBasePath + "/test-source.properties";
cfg.sourceLimit = 1000;
return cfg;
}
static void assertRecordCount(long expected, String datasetPath, SQLContext sqlContext) {
long recordCount = sqlContext.read().format("com.uber.hoodie").load(datasetPath).count();
assertEquals(expected, recordCount);
}
static void assertCommitMetadata(String expected, String datasetPath, FileSystem fs, int totalCommits)
throws IOException {
HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath);
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieInstant lastCommit = timeline.lastInstant().get();
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
timeline.getInstantDetails(lastCommit).get(), HoodieCommitMetadata.class);
assertEquals(totalCommits, timeline.countInstants());
assertEquals(expected, commitMetadata.getMetadata(HoodieDeltaStreamer.CHECKPOINT_KEY));
}
}
@Test
public void testProps() throws IOException {
TypedProperties props = new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/test-source.properties"))
.getConfig();
assertEquals(2, props.getInteger("hoodie.upsert.shuffle.parallelism"));
assertEquals("_row_key", props.getString("hoodie.datasource.write.recordkey.field"));
}
@Test
public void testDatasetCreation() throws Exception {
try {
dfs.mkdirs(new Path(dfsBasePath + "/not_a_dataset"));
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(dfsBasePath + "/not_a_dataset", Operation.BULK_INSERT), jsc);
deltaStreamer.sync();
fail("Should error out when pointed out at a dir thats not a dataset");
} catch (DatasetNotFoundException e) {
//expected
log.error("Expected error during dataset creation", e);
}
}
@Test
public void testBulkInsertsAndUpserts() throws Exception {
String datasetBasePath = dfsBasePath + "/test_dataset";
// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(datasetBasePath, Operation.BULK_INSERT);
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata("00000", datasetBasePath, dfs, 1);
// No new data => no commits.
cfg.sourceLimit = 0;
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata("00000", datasetBasePath, dfs, 1);
// upsert() #1
cfg.sourceLimit = 2000;
cfg.operation = Operation.UPSERT;
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(2000, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata("00001", datasetBasePath, dfs, 2);
}
}


@@ -0,0 +1,137 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities;
import com.uber.hoodie.common.TestRawTripPayload;
import com.uber.hoodie.common.minicluster.HdfsTestService;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.utilities.sources.TestDataSource;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
/**
* Abstract test base that provides a DFS and a Spark context.
*
* TODO(vc): this needs to be done across the board.
*/
public class UtilitiesTestBase {
protected static String dfsBasePath;
protected static HdfsTestService hdfsTestService;
protected static MiniDFSCluster dfsCluster;
protected static DistributedFileSystem dfs;
protected transient JavaSparkContext jsc = null;
protected transient SQLContext sqlContext;
@BeforeClass
public static void initClass() throws Exception {
hdfsTestService = new HdfsTestService();
dfsCluster = hdfsTestService.start(true);
dfs = dfsCluster.getFileSystem();
dfsBasePath = dfs.getWorkingDirectory().toString();
dfs.mkdirs(new Path(dfsBasePath));
}
@AfterClass
public static void cleanupClass() throws Exception {
if (hdfsTestService != null) {
hdfsTestService.stop();
}
}
@Before
public void setup() throws Exception {
TestDataSource.initDataGen();
jsc = UtilHelpers.buildSparkContext(this.getClass().getName() + "-hoodie", "local[2]");
sqlContext = new SQLContext(jsc);
}
@After
public void teardown() throws Exception {
TestDataSource.resetDataGen();
if (jsc != null) {
jsc.stop();
}
}
public static class Helpers {
// to get hold of resources bundled with jar
private static ClassLoader classLoader = Helpers.class.getClassLoader();
public static void copyToDFS(String testResourcePath, FileSystem fs, String targetPath) throws IOException {
BufferedReader reader = new BufferedReader(
new InputStreamReader(classLoader.getResourceAsStream(testResourcePath)));
PrintStream os = new PrintStream(fs.create(new Path(targetPath), true));
String line;
while ((line = reader.readLine()) != null) {
os.println(line);
}
os.flush();
os.close();
}
public static void savePropsToDFS(TypedProperties props, FileSystem fs, String targetPath) throws IOException {
String[] lines = props.keySet().stream().map(k -> String.format("%s=%s", k, props.get(k))).toArray(String[]::new);
saveStringsToDFS(lines, fs, targetPath);
}
public static void saveStringsToDFS(String[] lines, FileSystem fs, String targetPath) throws IOException {
PrintStream os = new PrintStream(fs.create(new Path(targetPath), true));
for (String l : lines) {
os.println(l);
}
os.flush();
os.close();
}
public static TypedProperties setupSchemaOnDFS() throws IOException {
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
return props;
}
public static String toJsonString(HoodieRecord hr) {
try {
return ((TestRawTripPayload) hr.getData()).getJsonData();
} catch (IOException ioe) {
return null;
}
}
public static String[] jsonifyRecords(List<HoodieRecord> records) throws IOException {
return records.stream().map(Helpers::toJsonString).toArray(String[]::new);
}
}
}


@@ -0,0 +1,104 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.sources;
import static org.junit.Assert.assertEquals;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.utilities.UtilitiesTestBase;
import com.uber.hoodie.utilities.schema.FilebasedSchemaProvider;
import java.io.IOException;
import java.util.Optional;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Basic tests against all subclasses of {@link DFSSource}
*/
public class TestDFSSource extends UtilitiesTestBase {
private FilebasedSchemaProvider schemaProvider;
@BeforeClass
public static void initClass() throws Exception {
UtilitiesTestBase.initClass();
}
@AfterClass
public static void cleanupClass() throws Exception {
UtilitiesTestBase.cleanupClass();
}
@Before
public void setup() throws Exception {
super.setup();
schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
}
@After
public void teardown() throws Exception {
super.teardown();
}
@Test
public void testJsonDFSSource() throws IOException {
dfs.mkdirs(new Path(dfsBasePath + "/jsonFiles"));
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsBasePath + "/jsonFiles");
JsonDFSSource jsonSource = new JsonDFSSource(props, jsc, schemaProvider);
// 1. Extract without any checkpoint => get all the data, respecting sourceLimit
assertEquals(Optional.empty(), jsonSource.fetchNewData(Optional.empty(), Long.MAX_VALUE).getKey());
UtilitiesTestBase.Helpers.saveStringsToDFS(
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 100)), dfs,
dfsBasePath + "/jsonFiles/1.json");
assertEquals(Optional.empty(), jsonSource.fetchNewData(Optional.empty(), 10).getKey());
Pair<Optional<JavaRDD<GenericRecord>>, String> fetch1 = jsonSource.fetchNewData(Optional.empty(), 1000000);
assertEquals(100, fetch1.getKey().get().count());
// 2. Produce new data, extract new data
UtilitiesTestBase.Helpers.saveStringsToDFS(
Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 10000)),
dfs, dfsBasePath + "/jsonFiles/2.json");
Pair<Optional<JavaRDD<GenericRecord>>, String> fetch2 = jsonSource.fetchNewData(
Optional.of(fetch1.getValue()), Long.MAX_VALUE);
assertEquals(10000, fetch2.getKey().get().count());
// 3. Extract with previous checkpoint => gives same data back (idempotent)
Pair<Optional<JavaRDD<GenericRecord>>, String> fetch3 = jsonSource.fetchNewData(
Optional.of(fetch1.getValue()), Long.MAX_VALUE);
assertEquals(10000, fetch3.getKey().get().count());
assertEquals(fetch2.getValue(), fetch3.getValue());
// 4. Extract with latest checkpoint => no new data returned
Pair<Optional<JavaRDD<GenericRecord>>, String> fetch4 = jsonSource.fetchNewData(
Optional.of(fetch2.getValue()), Long.MAX_VALUE);
assertEquals(Optional.empty(), fetch4.getKey());
}
}


@@ -0,0 +1,99 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.common.util.collection.ImmutablePair;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
/**
* An implementation of {@link Source} that emits test upserts.
*/
public class TestDataSource extends Source {
private static volatile Logger log = LogManager.getLogger(TestDataSource.class);
// Static instance, helps with reuse across a test.
private static HoodieTestDataGenerator dataGenerator;
public static void initDataGen() {
dataGenerator = new HoodieTestDataGenerator();
}
public static void resetDataGen() {
dataGenerator = null;
}
public TestDataSource(TypedProperties props, JavaSparkContext sparkContext, SchemaProvider schemaProvider) {
super(props, sparkContext, schemaProvider);
}
private GenericRecord toGenericRecord(HoodieRecord hoodieRecord) {
try {
Optional<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.avroSchema);
return (GenericRecord) recordOpt.get();
} catch (IOException e) {
return null;
}
}
@Override
public Pair<Optional<JavaRDD<GenericRecord>>, String> fetchNewData(Optional<String> lastCheckpointStr,
long sourceLimit) {
int nextCommitNum = lastCheckpointStr.isPresent() ? Integer.parseInt(lastCheckpointStr.get()) + 1 : 0;
String commitTime = String.format("%05d", nextCommitNum);
// No new data.
if (sourceLimit <= 0) {
return new ImmutablePair<>(Optional.empty(), commitTime);
}
// Generate `sourceLimit` records each time: half updates (capped by the number of existing keys), the rest inserts.
int numExistingKeys = dataGenerator.getExistingKeysList().size();
int numUpdates = Math.min(numExistingKeys, (int) sourceLimit / 2);
int numInserts = (int) sourceLimit - numUpdates;
List<GenericRecord> records = new ArrayList<>();
try {
records.addAll(dataGenerator.generateUniqueUpdates(commitTime, numUpdates).stream()
.map(this::toGenericRecord).collect(Collectors.toList()));
records.addAll(dataGenerator.generateInserts(commitTime, numInserts).stream()
.map(this::toGenericRecord).collect(Collectors.toList()));
} catch (IOException e) {
log.error("Error generating test data.", e);
}
JavaRDD<GenericRecord> avroRDD = sparkContext.<GenericRecord>parallelize(records, 4);
return new ImmutablePair<>(Optional.of(avroRDD), commitTime);
}
}


@@ -0,0 +1,186 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.sources;
import static com.uber.hoodie.utilities.sources.KafkaSource.CheckpointUtils;
import static org.junit.Assert.assertEquals;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.utilities.UtilitiesTestBase;
import com.uber.hoodie.utilities.schema.FilebasedSchemaProvider;
import java.io.IOException;
import java.util.HashMap;
import java.util.Optional;
import kafka.common.TopicAndPartition;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset;
import org.apache.spark.streaming.kafka.KafkaTestUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Tests against {@link KafkaSource}
*/
public class TestKafkaSource extends UtilitiesTestBase {
private static String TEST_TOPIC_NAME = "hoodie_test";
private FilebasedSchemaProvider schemaProvider;
private KafkaTestUtils testUtils;
@BeforeClass
public static void initClass() throws Exception {
UtilitiesTestBase.initClass();
}
@AfterClass
public static void cleanupClass() throws Exception {
UtilitiesTestBase.cleanupClass();
}
@Before
public void setup() throws Exception {
super.setup();
schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
testUtils = new KafkaTestUtils();
testUtils.setup();
}
@After
public void teardown() throws Exception {
super.teardown();
testUtils.teardown();
}
@Test
public void testJsonKafkaSource() throws IOException {
// topic setup.
testUtils.createTopic(TEST_TOPIC_NAME, 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
props.setProperty("metadata.broker.list", testUtils.brokerAddress());
props.setProperty("auto.offset.reset", "smallest");
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Source kafkaSource = new JsonKafkaSource(props, jsc, schemaProvider);
// 1. Extract without any checkpoint => get all the data, respecting sourceLimit
assertEquals(Optional.empty(), kafkaSource.fetchNewData(Optional.empty(), Long.MAX_VALUE).getKey());
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
Pair<Optional<JavaRDD<GenericRecord>>, String> fetch1 = kafkaSource.fetchNewData(Optional.empty(), 900);
assertEquals(900, fetch1.getKey().get().count());
// 2. Produce new data, extract new data
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
Pair<Optional<JavaRDD<GenericRecord>>, String> fetch2 = kafkaSource.fetchNewData(
Optional.of(fetch1.getValue()), Long.MAX_VALUE);
assertEquals(1100, fetch2.getKey().get().count());
// 3. Extract with previous checkpoint => gives same data back (idempotent)
Pair<Optional<JavaRDD<GenericRecord>>, String> fetch3 = kafkaSource.fetchNewData(
Optional.of(fetch1.getValue()), Long.MAX_VALUE);
assertEquals(fetch2.getKey().get().count(), fetch3.getKey().get().count());
assertEquals(fetch2.getValue(), fetch3.getValue());
// 4. Extract with latest checkpoint => no new data returned
Pair<Optional<JavaRDD<GenericRecord>>, String> fetch4 = kafkaSource.fetchNewData(
Optional.of(fetch2.getValue()), Long.MAX_VALUE);
assertEquals(Optional.empty(), fetch4.getKey());
}
private static HashMap<TopicAndPartition, LeaderOffset> makeOffsetMap(int[] partitions, long[] offsets) {
HashMap<TopicAndPartition, LeaderOffset> map = new HashMap<>();
for (int i = 0; i < partitions.length; i++) {
map.put(new TopicAndPartition(TEST_TOPIC_NAME, partitions[i]), new LeaderOffset("", -1, offsets[i]));
}
return map;
}
@Test
public void testComputeOffsetRanges() {
// test totalNewMessages()
long totalMsgs = CheckpointUtils.totalNewMessages(new OffsetRange[]{
OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100),
OffsetRange.apply(TEST_TOPIC_NAME, 0, 100, 200)
});
assertEquals(200, totalMsgs);
// should consume all the available data
OffsetRange[] ranges = CheckpointUtils.computeOffsetRanges(
makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}),
1000000L
);
assertEquals(200000, CheckpointUtils.totalNewMessages(ranges));
// should only consume up to the limit
ranges = CheckpointUtils.computeOffsetRanges(
makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}),
10000
);
assertEquals(10000, CheckpointUtils.totalNewMessages(ranges));
assertEquals(200000, ranges[0].fromOffset());
assertEquals(205000, ranges[0].untilOffset());
assertEquals(250000, ranges[1].fromOffset());
assertEquals(255000, ranges[1].untilOffset());
// should also consume from new partitions.
ranges = CheckpointUtils.computeOffsetRanges(
makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
makeOffsetMap(new int[]{0, 1, 2}, new long[]{300000, 350000, 100000}),
1000000L
);
assertEquals(300000, CheckpointUtils.totalNewMessages(ranges));
assertEquals(3, ranges.length);
// with skewed offsets, no partition is starved and lagging partitions can catch up
ranges = CheckpointUtils.computeOffsetRanges(
makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010, 350000, 10000}),
100000
);
assertEquals(100000, CheckpointUtils.totalNewMessages(ranges));
assertEquals(10, ranges[0].count());
assertEquals(89990, ranges[1].count());
assertEquals(10000, ranges[2].count());
ranges = CheckpointUtils.computeOffsetRanges(
makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010, 350000, 10000}),
1000000
);
assertEquals(110010, CheckpointUtils.totalNewMessages(ranges));
assertEquals(10, ranges[0].count());
assertEquals(100000, ranges[1].count());
assertEquals(10000, ranges[2].count());
}
}


@@ -15,4 +15,7 @@
#
#
#
# Common hoodie client configs
hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2


@@ -14,5 +14,14 @@
# limitations under the License.
#
#
hoodie.deltastreamer.filebased.schemaprovider.source.schema.file=file:///Users/vinoth/bin/hoodie/hoodie-utilities/src/main/resources/delta-streamer-config/source.avsc
hoodie.deltastreamer.filebased.schemaprovider.target.schema.file=file:///Users/vinoth/bin/hoodie/hoodie-utilities/src/main/resources/delta-streamer-config/target.avsc
#
include=base.properties
# Key generator props
hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.partitionpath.field=driver
# Schema provider props (change to absolute path based on your installation)
hoodie.deltastreamer.filebased.schemaprovider.source.schema.file=file:///path/to/hoodie/hoodie-utilities/src/main/resources/delta-streamer-props/source.avsc
hoodie.deltastreamer.filebased.schemaprovider.target.schema.file=file:///path/to/hoodie/hoodie-utilities/src/main/resources/delta-streamer-props/target.avsc
# DFS Source
hoodie.deltastreamer.source.dfs.root=file:///tmp/hoodie-dfs-input


@@ -15,9 +15,16 @@
#
#
#
# DFS Source
hoodie.deltastreamer.source.dfs.root=file:///tmp/hoodie-dfs-input
include=base.properties
# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=impressionid
hoodie.datasource.write.partitionpath.field=userid
# schema provider configs
hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
# Kafka Source
hoodie.deltastreamer.source.kafka.topic=uber_trips
#hoodie.deltastreamer.source.kafka.topic=uber_trips
hoodie.deltastreamer.source.kafka.topic=impressions
#Kafka props
metadata.broker.list=localhost:9092
auto.offset.reset=smallest
schema.registry.url=http://localhost:8081


@@ -1,7 +1,11 @@
{
"type" : "record",
"name" : "triprec",
"fields" : [ {
"fields" : [
{
"name" : "timestamp",
"type" : "double"
}, {
"name" : "_row_key",
"type" : "string"
}, {