1
0

[HUDI-394] Provide a basic implementation of test suite

This commit is contained in:
Nishith Agarwal
2019-11-01 12:40:09 -07:00
committed by n3nash
parent d5b593b7d9
commit 2fc2b01d86
102 changed files with 8633 additions and 64 deletions

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.utilities;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -35,6 +36,7 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
import org.apache.hudi.utilities.transform.ChainedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
@@ -348,4 +350,15 @@ public class UtilHelpers {
throw new HoodieException(String.format("%s table does not exists!", table));
}
}
/**
 * Instantiates a {@link DFSPathSelector} implementation via reflection, using its
 * (TypedProperties, Configuration) constructor.
 *
 * @param sourceSelectorClass fully-qualified class name of the selector to load
 * @param props properties passed to the selector's constructor
 * @param conf hadoop configuration passed to the selector's constructor
 * @return the constructed selector instance
 * @throws IOException if the class cannot be loaded or constructed
 */
public static DFSPathSelector createSourceSelector(String sourceSelectorClass, TypedProperties props,
Configuration conf) throws IOException {
try {
return (DFSPathSelector) ReflectionUtils.loadClass(sourceSelectorClass,
new Class<?>[]{TypedProperties.class, Configuration.class},
props, conf);
} catch (Throwable e) {
// Catch Throwable (not just Exception) so linkage errors such as
// NoClassDefFoundError are also wrapped; callers only handle IOException.
throw new IOException("Could not load source selector class " + sourceSelectorClass, e);
}
}
}

View File

@@ -40,6 +40,7 @@ import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.exception.HoodieDeltaStreamerException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Config;
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -212,8 +213,8 @@ public class DeltaSync implements Serializable {
/**
* Run one round of delta sync and return the new compaction instant (if one got scheduled) along with the write statuses of the round.
*/
public Option<String> syncOnce() throws Exception {
Option<String> scheduledCompaction = Option.empty();
public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws Exception {
Pair<Option<String>, JavaRDD<WriteStatus>> result = null;
HoodieDeltaStreamerMetrics metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(schemaProvider));
Timer.Context overallTimerContext = metrics.getOverallTimerContext();
@@ -231,13 +232,13 @@ public class DeltaSync implements Serializable {
setupWriteClient();
}
scheduledCompaction = writeToSink(srcRecordsWithCkpt.getRight().getRight(),
result = writeToSink(srcRecordsWithCkpt.getRight().getRight(),
srcRecordsWithCkpt.getRight().getLeft(), metrics, overallTimerContext);
}
// Clear persistent RDDs
jssc.getPersistentRDDs().values().forEach(JavaRDD::unpersist);
return scheduledCompaction;
return result;
}
/**
@@ -248,7 +249,7 @@ public class DeltaSync implements Serializable {
* of schemaProvider, checkpointStr and hoodieRecord
* @throws Exception in case of any Exception
*/
private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(
public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(
Option<HoodieTimeline> commitTimelineOpt) throws Exception {
// Retrieve the previous round checkpoints, if any
Option<String> resumeCheckpointStr = Option.empty();
@@ -355,8 +356,8 @@ public class DeltaSync implements Serializable {
* @param overallTimerContext Timer Context
* @return Pair of (Option of scheduled compaction instant, RDD of write statuses for the batch)
*/
private Option<String> writeToSink(JavaRDD<HoodieRecord> records, String checkpointStr,
HoodieDeltaStreamerMetrics metrics, Timer.Context overallTimerContext) throws Exception {
private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(JavaRDD<HoodieRecord> records, String checkpointStr,
HoodieDeltaStreamerMetrics metrics, Timer.Context overallTimerContext) throws Exception {
Option<String> scheduledCompactionInstant = Option.empty();
// filter dupes if needed
@@ -413,7 +414,7 @@ public class DeltaSync implements Serializable {
if (!isEmpty) {
// Sync to hive if enabled
Timer.Context hiveSyncContext = metrics.getHiveSyncTimerContext();
syncHive();
syncHiveIfNeeded();
hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 0;
}
} else {
@@ -438,7 +439,7 @@ public class DeltaSync implements Serializable {
// Send DeltaStreamer Metrics
metrics.updateDeltaStreamerMetrics(overallTimeMs, hiveSyncTimeMs);
return scheduledCompactionInstant;
return Pair.of(scheduledCompactionInstant, writeStatusRDD);
}
/**
@@ -472,15 +473,27 @@ public class DeltaSync implements Serializable {
/**
* Sync to Hive.
*/
private void syncHive() {
public void syncHiveIfNeeded() throws ClassNotFoundException {
if (cfg.enableHiveSync) {
HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath, cfg.baseFileFormat);
LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :"
+ hiveSyncConfig.jdbcUrl + ", basePath :" + cfg.targetBasePath);
new HiveSyncTool(hiveSyncConfig, new HiveConf(conf, HiveConf.class), fs).syncHoodieTable();
syncHive();
}
}
/**
 * Unconditionally syncs the target hoodie table's metadata into the Hive
 * metastore, using the hive-sync config built from the deltastreamer props
 * (table name, JDBC URL, base path, base file format).
 */
public void syncHive() {
HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath, cfg.baseFileFormat);
LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :"
+ hiveSyncConfig.jdbcUrl + ", basePath :" + cfg.targetBasePath);
// Log the effective hive configuration to ease debugging of sync failures.
HiveConf hiveConf = new HiveConf(conf, HiveConf.class);
LOG.info("Hive Conf => " + hiveConf.getAllProperties().toString());
LOG.info("Hive Sync Conf => " + hiveSyncConfig.toString());
new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable();
}
/**
 * Overload that lets callers (e.g. the test suite) supply an explicit HiveConf
 * before syncing. NOTE: this mutates the instance-level {@code conf}, so the
 * supplied configuration remains in effect for subsequent syncs.
 */
public void syncHive(HiveConf conf) {
this.conf = conf;
syncHive();
}
/**
* Note that depending on configs and source-type, schemaProvider could either be eagerly or lazily created.
* SchemaProvider creation is a precursor to HoodieWriteClient and AsyncCompactor creation. This method takes care of
@@ -558,4 +571,20 @@ public class DeltaSync implements Serializable {
writeClient = null;
}
}
/** @return the filesystem the target table lives on. */
public FileSystem getFs() {
return fs;
}
/** @return the merged deltastreamer properties. */
public TypedProperties getProps() {
return props;
}
/** @return the deltastreamer config this sync was created with. */
public Config getCfg() {
return cfg;
}
/** @return the cached commit timeline of the target table, if initialized. */
public Option<HoodieTimeline> getCommitTimelineOpt() {
return commitTimelineOpt;
}
}

View File

@@ -18,8 +18,9 @@
package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.async.AbstractAsyncService;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
@@ -49,6 +50,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
@@ -83,9 +85,9 @@ public class HoodieDeltaStreamer implements Serializable {
public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
private final transient Config cfg;
protected final transient Config cfg;
private transient DeltaSyncService deltaSyncService;
protected transient DeltaSyncService deltaSyncService;
public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc) throws IOException {
this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()),
@@ -435,11 +437,11 @@ public class HoodieDeltaStreamer implements Serializable {
while (!isShutdownRequested()) {
try {
long start = System.currentTimeMillis();
Option<String> scheduledCompactionInstant = deltaSync.syncOnce();
if (scheduledCompactionInstant.isPresent()) {
LOG.info("Enqueuing new pending compaction instant (" + scheduledCompactionInstant + ")");
Pair<Option<String>, JavaRDD<WriteStatus>> scheduledCompactionInstantAndRDD = deltaSync.syncOnce();
if (scheduledCompactionInstantAndRDD.getLeft().isPresent()) {
LOG.info("Enqueuing new pending compaction instant (" + scheduledCompactionInstantAndRDD.getLeft() + ")");
asyncCompactService.enqueuePendingCompaction(new HoodieInstant(State.REQUESTED,
HoodieTimeline.COMPACTION_ACTION, scheduledCompactionInstant.get()));
HoodieTimeline.COMPACTION_ACTION, scheduledCompactionInstantAndRDD.getLeft().get()));
asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions);
}
long toSleepMs = cfg.minSyncIntervalSeconds * 1000 - (System.currentTimeMillis() - start);
@@ -623,4 +625,8 @@ public class HoodieDeltaStreamer implements Serializable {
}, executor)).toArray(CompletableFuture[]::new)), executor);
}
}
/** @return the underlying delta sync service (exposed for the test suite). */
public DeltaSyncService getDeltaSyncService() {
return deltaSyncService;
}
}

View File

@@ -23,15 +23,17 @@ import org.apache.hudi.metrics.Metrics;
import com.codahale.metrics.Timer;
public class HoodieDeltaStreamerMetrics {
import java.io.Serializable;
public class HoodieDeltaStreamerMetrics implements Serializable {
private HoodieWriteConfig config;
private String tableName;
public String overallTimerName = null;
public String hiveSyncTimerName = null;
private Timer overallTimer = null;
public Timer hiveSyncTimer = null;
private transient Timer overallTimer = null;
public transient Timer hiveSyncTimer = null;
public HoodieDeltaStreamerMetrics(HoodieWriteConfig config) {
this.config = config;

View File

@@ -71,11 +71,11 @@ public final class SourceFormatAdapter {
// pass in the schema for the Row-to-Avro conversion
// to avoid nullability mismatch between Avro schema and Row schema
? AvroConversionUtils.createRdd(
rdd, r.getSchemaProvider().getSourceSchema(),
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
rdd, r.getSchemaProvider().getSourceSchema(),
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
: AvroConversionUtils.createRdd(
rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
))
))
.orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
default:
@@ -116,4 +116,4 @@ public final class SourceFormatAdapter {
throw new IllegalArgumentException("Unknown source type (" + source.getSourceType() + ")");
}
}
}
}

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
@@ -33,6 +34,8 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import java.io.IOException;
/**
* DFS Source that reads avro data.
*/
@@ -41,9 +44,11 @@ public class AvroDFSSource extends AvroSource {
private final DFSPathSelector pathSelector;
public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
SchemaProvider schemaProvider) throws IOException {
super(props, sparkContext, sparkSession, schemaProvider);
this.pathSelector = new DFSPathSelector(props, sparkContext.hadoopConfiguration());
this.pathSelector = UtilHelpers
.createSourceSelector(DFSPathSelector.class.getName(), props, sparkContext
.hadoopConfiguration());
}
@Override

View File

@@ -32,6 +32,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
@@ -42,18 +44,20 @@ import java.util.stream.Collectors;
public class DFSPathSelector {
protected static volatile Logger log = LogManager.getLogger(DFSPathSelector.class);
/**
* Configs supported.
*/
static class Config {
public static class Config {
private static final String ROOT_INPUT_PATH_PROP = "hoodie.deltastreamer.source.dfs.root";
public static final String ROOT_INPUT_PATH_PROP = "hoodie.deltastreamer.source.dfs.root";
}
private static final List<String> IGNORE_FILEPREFIX_LIST = Arrays.asList(".", "_");
protected static final List<String> IGNORE_FILEPREFIX_LIST = Arrays.asList(".", "_");
private final transient FileSystem fs;
private final TypedProperties props;
protected final transient FileSystem fs;
protected final TypedProperties props;
public DFSPathSelector(TypedProperties props, Configuration hadoopConf) {
DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.ROOT_INPUT_PATH_PROP));
@@ -66,6 +70,7 @@ public class DFSPathSelector {
try {
// obtain all eligible files under root folder.
log.info("Root path => " + props.getString(Config.ROOT_INPUT_PATH_PROP) + " source limit => " + sourceLimit);
List<FileStatus> eligibleFiles = new ArrayList<>();
RemoteIterator<LocatedFileStatus> fitr =
fs.listFiles(new Path(props.getString(Config.ROOT_INPUT_PATH_PROP)), true);
@@ -79,7 +84,6 @@ public class DFSPathSelector {
}
// sort them by modification time.
eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime));
// Filter based on checkpoint & input size, if needed
long currentBytes = 0;
long maxModificationTime = Long.MIN_VALUE;

View File

@@ -42,7 +42,7 @@ public class TestCsvDFSSource extends AbstractDFSSourceTestBase {
this.fileSuffix = ".json";
this.useFlattenedSchema = true;
this.schemaProvider = new FilebasedSchemaProvider(
Helpers.setupSchemaOnDFS("source-flattened.avsc"), jsc);
Helpers.setupSchemaOnDFS("delta-streamer-config", "source-flattened.avsc"), jsc);
}
@Override

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.utilities.testutils;
import java.io.FileInputStream;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
@@ -189,6 +190,17 @@ public class UtilitiesTestBase {
return sb.toString();
}
/**
 * Reads the entire file at the given absolute filesystem path into a String,
 * normalizing every line terminator to "\n" (a trailing "\n" follows the
 * last line).
 *
 * @param absolutePathForResource absolute path of the file to read
 * @return the file contents with "\n" line endings
 * @throws IOException if the file cannot be opened or read
 */
public static String readFileFromAbsolutePath(String absolutePathForResource) throws IOException {
  StringBuilder sb = new StringBuilder();
  // try-with-resources: the original never closed the reader, leaking a file
  // handle on every call. StringBuilder replaces the needlessly synchronized
  // StringBuffer.
  try (BufferedReader reader =
      new BufferedReader(new InputStreamReader(new FileInputStream(absolutePathForResource)))) {
    // NOTE(review): reads with the platform default charset, same as the
    // original — consider passing StandardCharsets.UTF_8 explicitly.
    String line;
    while ((line = reader.readLine()) != null) {
      sb.append(line).append('\n');
    }
  }
  return sb.toString();
}
public static void copyToDFS(String testResourcePath, FileSystem fs, String targetPath) throws IOException {
PrintStream os = new PrintStream(fs.create(new Path(targetPath), true));
os.print(readFile(testResourcePath));
@@ -196,6 +208,14 @@ public class UtilitiesTestBase {
os.close();
}
/**
 * Copies the file at the given absolute local path onto DFS at targetPath,
 * overwriting any existing file there.
 *
 * @param absolutePathForResource absolute local path of the source file
 * @param fs filesystem to write to
 * @param targetPath destination path on the filesystem
 * @throws IOException if reading the source or creating/writing the target fails
 */
public static void copyToDFSFromAbsolutePath(String absolutePathForResource, FileSystem fs, String targetPath)
    throws IOException {
  // try-with-resources so the DFS output stream is closed even when reading
  // the local file throws; the original leaked the stream on exception.
  try (PrintStream os = new PrintStream(fs.create(new Path(targetPath), true))) {
    os.print(readFileFromAbsolutePath(absolutePathForResource));
    os.flush();
  }
}
public static void savePropsToDFS(TypedProperties props, FileSystem fs, String targetPath) throws IOException {
String[] lines = props.keySet().stream().map(k -> String.format("%s=%s", k, props.get(k))).toArray(String[]::new);
saveStringsToDFS(lines, fs, targetPath);
@@ -258,11 +278,18 @@ public class UtilitiesTestBase {
}
public static TypedProperties setupSchemaOnDFS() throws IOException {
return setupSchemaOnDFS("source.avsc");
return setupSchemaOnDFS("delta-streamer-config", "source.avsc");
}
public static TypedProperties setupSchemaOnDFS(String filename) throws IOException {
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/" + filename, dfs, dfsBasePath + "/" + filename);
/**
 * Copies the classpath schema resource {@code scope/filename} onto the test
 * DFS and returns properties pointing the deltastreamer schema provider at it.
 *
 * @param scope resource directory (e.g. "delta-streamer-config") containing the schema
 * @param filename schema file name to copy
 * @return properties with the source-schema-file property set to the DFS copy
 * @throws IOException if copying the resource to DFS fails
 */
public static TypedProperties setupSchemaOnDFS(String scope, String filename) throws IOException {
UtilitiesTestBase.Helpers.copyToDFS(scope + "/" + filename, dfs, dfsBasePath + "/" + filename);
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + filename);
return props;
}
public static TypedProperties setupSchemaOnDFSWithAbsoluteScope(String scope, String filename) throws IOException {
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(scope + "/" + filename, dfs, dfsBasePath + "/" + filename);
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + filename);
return props;
@@ -278,7 +305,7 @@ public class UtilitiesTestBase {
}
public static List<GenericRecord> toGenericRecords(List<HoodieRecord> hoodieRecords) {
List<GenericRecord> records = new ArrayList<GenericRecord>();
List<GenericRecord> records = new ArrayList<>();
for (HoodieRecord hoodieRecord : hoodieRecords) {
records.add(toGenericRecord(hoodieRecord));
}

View File

@@ -0,0 +1,466 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"name": "COMPLEX",
"fields": [
{
"default": null,
"type": [
"null",
{
"items": "string",
"type": "array"
}
],
"name": "array_of_string_fields1"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field1"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field2"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field3"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field4"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field5"
},
{
"default": null,
"type": [
"null",
"boolean"
],
"name": "boolean_field1"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field6"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field7"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field8"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field9"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field10"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field11"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field12"
},
{
"default": null,
"type": [
"null",
"double"
],
"name": "double_field1"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field13"
},
{
"default": null,
"type": [
"null",
"long"
],
"name": "long_field1"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field14"
},
{
"default": null,
"type": [
"null",
"long"
],
"name": "long_field2"
},
{
"default": null,
"type": [
"null",
{
"items": {
"fields": [
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field15"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field16"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field17"
},
{
"default": null,
"type": [
"null",
"long"
],
"name": "long_field3"
},
{
"default": null,
"type": [
"null",
"long"
],
"name": "long_field4"
},
{
"default": null,
"type": [
"null",
"double"
],
"name": "double_field2"
},
{
"default": null,
"type": [
"null",
"double"
],
"name": "double_field3"
}
],
"type": "record",
"name": "record_field1"
},
"type": "array"
}
],
"name": "record_name1"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field18"
},
{
"default": null,
"type": [
"null",
"long"
],
"name": "long_field5"
},
{
"default": null,
"type": [
"null",
"double"
],
"name": "double_field4"
},
{
"default": null,
"type": [
"null",
"double"
],
"name": "double_field5"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field19"
},
{
"default": null,
"type": [
"null",
"long"
],
"name": "long_field6"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field20"
},
{
"default": null,
"type": [
"null",
"long"
],
"name": "long_field7"
},
{
"default": null,
"type": [
"null",
"double"
],
"name": "double_field6"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field21"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field22"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field23"
},
{
"default": null,
"type": [
"null",
"long"
],
"name": "long_field8"
},
{
"default": null,
"type": [
"null",
"double"
],
"name": "double_field7"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field24"
},
{
"default": null,
"type": [
"null",
"long"
],
"name": "long_field10"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field25"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field26"
},
{
"default": null,
"type": [
"null",
"long"
],
"name": "long_field11"
},
{
"default": null,
"type": [
"null",
"boolean"
],
"name": "boolean_field3"
},
{
"default": null,
"type": [
"null",
"long"
],
"name": "long_field12"
},
{
"default": null,
"type": [
"null",
"double"
],
"name": "double_field8"
},
{
"default": null,
"type": [
"null",
"long"
],
"name": "long_field13"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field27"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field28"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field29"
},
{
"default": null,
"type": [
"null",
"string"
],
"name": "string_field30"
}
],
"type": "record"
}