1
0

HUDI-70 : Making DeltaStreamer run in continuous mode with concurrent compaction

This commit is contained in:
Balaji Varadarajan
2019-05-15 13:21:55 -07:00
committed by Balaji Varadarajan
parent 3a210ef08e
commit a0d7ab2384
32 changed files with 2000 additions and 441 deletions

View File

@@ -37,7 +37,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -132,7 +134,11 @@ public class UtilHelpers {
}
private static SparkConf buildSparkConf(String appName, String defaultMaster) {
SparkConf sparkConf = new SparkConf().setAppName(appName);
return buildSparkConf(appName, defaultMaster, new HashMap<>());
}
private static SparkConf buildSparkConf(String appName, String defaultMaster, Map<String, String> additionalConfigs) {
final SparkConf sparkConf = new SparkConf().setAppName(appName);
String master = sparkConf.get("spark.master", defaultMaster);
sparkConf.setMaster(master);
if (master.startsWith("yarn")) {
@@ -147,8 +153,13 @@ public class UtilHelpers {
"org.apache.hadoop.io.compress.GzipCodec");
sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
sparkConf = HoodieWriteClient.registerClasses(sparkConf);
return sparkConf;
additionalConfigs.entrySet().forEach(e -> sparkConf.set(e.getKey(), e.getValue()));
SparkConf newSparkConf = HoodieWriteClient.registerClasses(sparkConf);
return newSparkConf;
}
public static JavaSparkContext buildSparkContext(String appName, String defaultMaster, Map<String, String> configs) {
return new JavaSparkContext(buildSparkConf(appName, defaultMaster, configs));
}
public static JavaSparkContext buildSparkContext(String appName, String defaultMaster) {

View File

@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.utilities.deltastreamer;
import com.uber.hoodie.common.util.collection.Pair;
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
/**
 * Base class for running delta-sync/compaction in a separate thread and controlling its life-cycle.
 *
 * <p>Sub-classes supply the actual work through {@link #startService()}; this class keeps track of the
 * returned future/executor, offers graceful and forceful shutdown, and runs a monitor task that fires a
 * callback once the service has terminated.
 */
public abstract class AbstractDeltaStreamerService implements Serializable {

  protected static volatile Logger log = LogManager.getLogger(AbstractDeltaStreamerService.class);

  // Flag to track if the service is started. Volatile because it is written by the thread calling start()
  // and may be observed from other threads.
  private volatile boolean started;
  // Flag indicating shutdown is externally requested. Volatile so that a request issued from one thread
  // is visible to the thread polling isShutdownRequested().
  private volatile boolean shutdownRequested;
  // Flag indicating the service is shutdown (set by the monitor task)
  private volatile boolean shutdown;
  // Executor Service for running delta-sync/compaction
  private transient ExecutorService executor;
  // Future tracking delta-sync/compaction
  private transient CompletableFuture future;

  AbstractDeltaStreamerService() {
    shutdownRequested = false;
  }

  /**
   * @return true once {@link #shutdown(boolean)} has been called
   */
  boolean isShutdownRequested() {
    return shutdownRequested;
  }

  /**
   * @return true once the monitor task has observed that the service terminated
   */
  boolean isShutdown() {
    return shutdown;
  }

  /**
   * Wait till the service shutdown. If the service shutdown with exception, it will be thrown
   *
   * @throws ExecutionException if the service future completed exceptionally
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  void waitForShutdown() throws ExecutionException, InterruptedException {
    try {
      future.get();
    } catch (ExecutionException ex) {
      log.error("Service shutdown with error", ex);
      throw ex;
    }
  }

  /**
   * Request shutdown either forcefully or gracefully. Graceful shutdown allows the service to finish up the current
   * round of work and shutdown. For graceful shutdown, it waits till the service is shutdown
   *
   * @param force Forcefully shutdown
   */
  void shutdown(boolean force) {
    if (!shutdownRequested || force) {
      shutdownRequested = true;
      if (executor != null) {
        if (force) {
          executor.shutdownNow();
        } else {
          executor.shutdown();
          try {
            // Wait for some max time after requesting shutdown
            executor.awaitTermination(24, TimeUnit.HOURS);
          } catch (InterruptedException ie) {
            log.error("Interrupted while waiting for shutdown", ie);
            // Restore the interrupt status so callers further up the stack can react to it
            Thread.currentThread().interrupt();
          }
        }
      }
    }
  }

  /**
   * Start the service. Runs the service in a different thread and returns. Also starts a monitor thread
   * to run-callbacks in case of shutdown
   *
   * @param onShutdownCallback callback applied once the service has terminated; its argument is true when the
   *                           service ended with an error or the monitor was interrupted, false otherwise
   */
  public void start(Function<Boolean, Boolean> onShutdownCallback) {
    Pair<CompletableFuture, ExecutorService> res = startService();
    future = res.getKey();
    executor = res.getValue();
    started = true;
    monitorThreads(onShutdownCallback);
  }

  /**
   * Service implementation
   *
   * @return the future tracking the running service together with the executor it runs on
   */
  protected abstract Pair<CompletableFuture, ExecutorService> startService();

  /**
   * A monitor thread is started which would trigger a callback if the service is shutdown
   *
   * @param onShutdownCallback callback applied with the error flag after the service future completes
   */
  private void monitorThreads(Function<Boolean, Boolean> onShutdownCallback) {
    log.info("Submitting monitor thread !!");
    ExecutorService monitorExecutor = Executors.newSingleThreadExecutor();
    monitorExecutor.submit(() -> {
      boolean error = false;
      try {
        log.info("Monitoring thread(s) !!");
        future.get();
      } catch (ExecutionException ex) {
        log.error("Monitor noticed one or more threads failed."
            + " Requesting graceful shutdown of other threads", ex);
        error = true;
        shutdown(false);
      } catch (InterruptedException ie) {
        log.error("Got interrupted Monitoring threads", ie);
        error = true;
        shutdown(false);
      } finally {
        // Mark as shutdown
        shutdown = true;
        onShutdownCallback.apply(error);
      }
    });
    // No further tasks are submitted to the monitor. An orderly shutdown still runs the task submitted above
    // but lets the worker thread exit afterwards instead of lingering (and keeping the JVM alive) forever.
    monitorExecutor.shutdown();
  }
}

View File

@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.utilities.deltastreamer;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.exception.HoodieException;
import java.io.IOException;
import java.io.Serializable;
import java.util.Optional;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
/**
 * Executes a single compaction instant through the supplied write client and commits the outcome.
 */
public class Compactor implements Serializable {

  protected static volatile Logger log = LogManager.getLogger(Compactor.class);

  private transient HoodieWriteClient compactionClient;
  private transient JavaSparkContext jssc;

  public Compactor(HoodieWriteClient compactionClient, JavaSparkContext jssc) {
    this.compactionClient = compactionClient;
    this.jssc = jssc;
  }

  /**
   * Runs the compaction identified by the given instant and commits it.
   *
   * @param instant compaction instant to execute
   * @throws IOException declared for callers; see the write client for the actual failure modes
   * @throws HoodieException if at least one write status of the compaction reports errors
   */
  public void compact(HoodieInstant instant) throws IOException {
    log.info("Compactor executing compaction " + instant);
    JavaRDD<WriteStatus> writeStatuses = compactionClient.compact(instant.getTimestamp());

    // Count the write statuses that carry errors
    long failedWrites = 0;
    for (WriteStatus status : writeStatuses.collect()) {
      if (status.hasErrors()) {
        failedWrites++;
      }
    }

    if (failedWrites > 0) {
      // We treat even a single error in compaction as fatal
      final String failure = "Compaction for instant (" + instant + ") failed with write errors. "
          + "Errors :" + failedWrites;
      log.error(failure);
      throw new HoodieException(failure);
    }

    // All writes succeeded, so the compaction can be committed
    compactionClient.commitCompaction(instant.getTimestamp(), writeStatuses, Optional.empty());
  }
}

View File

@@ -0,0 +1,495 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.utilities.deltastreamer;
import static com.uber.hoodie.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
import static com.uber.hoodie.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
import com.codahale.metrics.Timer;
import com.uber.hoodie.AvroConversionUtils;
import com.uber.hoodie.DataSourceUtils;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.KeyGenerator;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.hive.HiveSyncConfig;
import com.uber.hoodie.hive.HiveSyncTool;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.utilities.UtilHelpers;
import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException;
import com.uber.hoodie.utilities.schema.RowBasedSchemaProvider;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import com.uber.hoodie.utilities.sources.InputBatch;
import com.uber.hoodie.utilities.transform.Transformer;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.collection.JavaConversions;
/**
* Sync's one batch of data to hoodie dataset
*/
public class DeltaSync implements Serializable {
// Logger shared by all DeltaSync instances
protected static volatile Logger log = LogManager.getLogger(DeltaSync.class);
/**
 * Key under which the source checkpoint is stored in the extra metadata handed to the write client on commit,
 * and from which the checkpoint of the last completed commit is read back when resuming.
 */
public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
/**
* Delta Sync Config
*/
private final HoodieDeltaStreamer.Config cfg;
/**
* Source to pull deltas from
*/
private transient SourceFormatAdapter formatAdapter;
/**
* Schema provider that supplies the command for reading the input and writing out the target table.
*/
private transient SchemaProvider schemaProvider;
/**
* Allows transforming source to target dataset before writing
*/
private transient Transformer transformer;
/**
* Extract the key for the target dataset
*/
private KeyGenerator keyGenerator;
/**
* Filesystem used
*/
private transient FileSystem fs;
/**
* Spark context
*/
private transient JavaSparkContext jssc;
/**
* Spark Session
*/
private transient SparkSession sparkSession;
/**
* Hive Config
*/
private transient HiveConf hiveConf;
/**
* Bag of properties with source, hoodie client, key generator etc.
*/
private final TypedProperties props;
/**
* Callback when write client is instantiated
*/
private transient Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient;
/**
* Timeline with completed commits
*/
private transient Optional<HoodieTimeline> commitTimelineOpt;
/**
* Write Client
*/
private transient HoodieWriteClient writeClient;
/**
* Table Type
*/
private final HoodieTableType tableType;
/**
 * Builds a delta-sync instance: stores the collaborators, refreshes the commit timeline, instantiates the
 * transformer, key generator and source, and sets up the write client if a schema provider is already known.
 *
 * @param cfg delta streamer configuration
 * @param sparkSession spark session handed to the source and the transformer
 * @param schemaProvider schema provider; may be null, in which case the write client is set up later
 * @param tableType type of the target table
 * @param props bag of properties with source, hoodie client, key generator etc.
 * @param jssc spark context
 * @param fs filesystem of the target dataset
 * @param hiveConf hive configuration used for hive sync
 * @param onInitializingHoodieWriteClient callback applied when the write client gets instantiated
 * @throws IOException if refreshing the timeline fails
 */
public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
    SchemaProvider schemaProvider, HoodieTableType tableType, TypedProperties props,
    JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
    Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient)
    throws IOException {
  // Plain state first
  this.cfg = cfg;
  this.props = props;
  this.tableType = tableType;
  this.jssc = jssc;
  this.sparkSession = sparkSession;
  this.fs = fs;
  this.hiveConf = hiveConf;
  this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
  log.info("Creating delta streamer with configs : " + props.toString());
  this.schemaProvider = schemaProvider;

  refreshTimeline();

  // Pluggable components created from the configured class names
  this.transformer = UtilHelpers.createTransformer(cfg.transformerClassName);
  this.keyGenerator = DataSourceUtils.createKeyGenerator(cfg.keyGeneratorClass, props);
  this.formatAdapter = new SourceFormatAdapter(
      UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider));

  // With duplicate filtering enabled an upsert is turned into an insert
  if (cfg.filterDupes && cfg.operation == Operation.UPSERT) {
    cfg.operation = Operation.INSERT;
  }

  // If schemaRegistry already resolved, setup write-client
  setupWriteClient();
}
/**
 * Reloads the timeline of completed commits. If the target base path does not exist yet, the timeline is
 * left empty and the table is initialized at that path.
 *
 * @throws IOException if the filesystem or the table meta-data cannot be accessed
 */
private void refreshTimeline() throws IOException {
  final boolean targetExists = fs.exists(new Path(cfg.targetBasePath));
  if (!targetExists) {
    this.commitTimelineOpt = Optional.empty();
    HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath,
        cfg.storageType, cfg.targetTableName, "archived");
    return;
  }
  HoodieTableMetaClient metaClient =
      new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath);
  HoodieTimeline completedCommits =
      metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
  this.commitTimelineOpt = Optional.of(completedCommits);
}
/**
 * Executes a single delta-sync round: refresh the timeline, read a batch from the source and, if there is
 * one, write it to the sink.
 *
 * @return the compaction instant scheduled during this round, or empty if none was scheduled
 * @throws Exception if reading from the source or writing to the sink fails
 */
public Optional<String> syncOnce() throws Exception {
  HoodieDeltaStreamerMetrics metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(schemaProvider));
  Timer.Context overallTimer = metrics.getOverallTimerContext();

  // Refresh Timeline
  refreshTimeline();

  Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> batch = readFromSource(commitTimelineOpt);

  final Optional<String> compactionInstant;
  if (batch == null) {
    // Nothing was read in this round
    compactionInstant = Optional.empty();
  } else {
    // this is the first input batch. If schemaProvider not set, use it and register Avro Schema and start
    // compactor
    if (schemaProvider == null) {
      // Set the schemaProvider if not user-provided
      this.schemaProvider = batch.getKey();
      // Setup HoodieWriteClient and compaction now that we decided on schema
      setupWriteClient();
    }
    Pair<String, JavaRDD<HoodieRecord>> checkpointAndRecords = batch.getRight();
    compactionInstant = writeToSink(checkpointAndRecords.getRight(), checkpointAndRecords.getLeft(),
        metrics, overallTimer);
  }

  // Clear persistent RDDs
  jssc.getPersistentRDDs().values().forEach(JavaRDD::unpersist);
  return compactionInstant;
}
/**
 * Read from Upstream Source and apply transformation if needed.
 *
 * <p>The checkpoint to resume from is taken from the metadata of the last completed commit (if any). The fetched
 * batch is converted to Avro records and then wrapped into {@link HoodieRecord}s using the configured key
 * generator and payload class.
 *
 * @param commitTimelineOpt timeline of completed commits, empty if the dataset did not exist yet
 * @return the schema provider to use together with the checkpoint for the next batch and the records to write,
 *         or null when the source produced no new data
 * @throws HoodieDeltaStreamerException if the last commit carries no checkpoint under {@link #CHECKPOINT_KEY}
 */
private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(
Optional<HoodieTimeline> commitTimelineOpt) throws Exception {
// Retrieve the previous round checkpoints, if any
Optional<String> resumeCheckpointStr = Optional.empty();
if (commitTimelineOpt.isPresent()) {
Optional<HoodieInstant> lastCommit = commitTimelineOpt.get().lastInstant();
if (lastCommit.isPresent()) {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class);
if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) {
resumeCheckpointStr = Optional.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
} else {
// A completed commit without checkpoint metadata cannot be resumed from
throw new HoodieDeltaStreamerException(
"Unable to find previous checkpoint. Please double check if this table "
+ "was indeed built via delta streamer ");
}
}
} else {
// No timeline available: initialize the table at the target base path
HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath,
cfg.storageType, cfg.targetTableName, "archived");
}
log.info("Checkpoint to resume from : " + resumeCheckpointStr);
final Optional<JavaRDD<GenericRecord>> avroRDDOptional;
final String checkpointStr;
// NOTE: this local intentionally shadows the schemaProvider field; the field is only read below
final SchemaProvider schemaProvider;
if (transformer != null) {
// Transformation is needed. Fetch New rows in Row Format, apply transformation and then convert them
// to generic records for writing
InputBatch<Dataset<Row>> dataAndCheckpoint = formatAdapter.fetchNewDataInRowFormat(
resumeCheckpointStr, cfg.sourceLimit);
Optional<Dataset<Row>> transformed =
dataAndCheckpoint.getBatch().map(data -> transformer.apply(jssc, sparkSession, data, props));
checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
avroRDDOptional = transformed.map(t ->
AvroConversionUtils.createRdd(t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
);
// Use Transformed Row's schema if not overridden
// (falls back to the batch's own schema provider when there is no transformed data)
schemaProvider =
this.schemaProvider == null ? transformed.map(r -> (SchemaProvider) new RowBasedSchemaProvider(r.schema()))
.orElse(dataAndCheckpoint.getSchemaProvider()) : this.schemaProvider;
} else {
// Pull the data from the source & prepare the write
InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint =
formatAdapter.fetchNewDataInAvroFormat(resumeCheckpointStr, cfg.sourceLimit);
avroRDDOptional = dataAndCheckpoint.getBatch();
checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
schemaProvider = dataAndCheckpoint.getSchemaProvider();
}
// Callers treat a null return as "nothing to write in this round"
if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) {
log.info("No new data, nothing to commit.. ");
return null;
}
JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
// Wrap each Avro record into a HoodieRecord: key from the key generator, payload built from the configured
// payload class with the value of the source ordering field
JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr,
(Comparable) gr.get(cfg.sourceOrderingField));
return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
});
return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
}
/**
 * Perform Hoodie Write. Schedules compaction (MERGE_ON_READ tables in continuous mode only) and syncs to hive
 * if needed.
 * NOTE(review): no explicit cleaner invocation is visible in this method; presumably cleaning happens inside
 * the write client - confirm.
 *
 * @param records Input Records
 * @param checkpointStr Checkpoint String
 * @param metrics Metrics
 * @param overallTimerContext timer context covering the whole sync round; stopped here before metrics are sent
 * @return Optional Compaction instant if one is scheduled
 * @throws HoodieDeltaStreamerException if the configured operation is not INSERT, UPSERT or BULK_INSERT
 * @throws HoodieException if the commit fails, or if write errors occurred and commitOnErrors is not set
 *         (the instant is rolled back in that case)
 */
private Optional<String> writeToSink(JavaRDD<HoodieRecord> records, String checkpointStr,
HoodieDeltaStreamerMetrics metrics, Timer.Context overallTimerContext) throws Exception {
Optional<String> scheduledCompactionInstant = Optional.empty();
// filter dupes if needed
if (cfg.filterDupes) {
// turn upserts to insert
cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig(),
writeClient.getTimelineServer());
if (records.isEmpty()) {
log.info("No new data, nothing to commit.. ");
return Optional.empty();
}
}
String commitTime = startCommit();
log.info("Starting commit : " + commitTime);
// Dispatch on the configured write operation
JavaRDD<WriteStatus> writeStatusRDD;
if (cfg.operation == Operation.INSERT) {
writeStatusRDD = writeClient.insert(records, commitTime);
} else if (cfg.operation == Operation.UPSERT) {
writeStatusRDD = writeClient.upsert(records, commitTime);
} else if (cfg.operation == Operation.BULK_INSERT) {
writeStatusRDD = writeClient.bulkInsert(records, commitTime);
} else {
throw new HoodieDeltaStreamerException("Unknown operation :" + cfg.operation);
}
// Aggregate error/total record counts over all write statuses
long totalErrorRecords = writeStatusRDD.mapToDouble(ws -> ws.getTotalErrorRecords()).sum().longValue();
long totalRecords = writeStatusRDD.mapToDouble(ws -> ws.getTotalRecords()).sum().longValue();
boolean hasErrors = totalErrorRecords > 0;
long hiveSyncTimeMs = 0;
// Commit when the write was clean, or when the user asked to commit despite errors
if (!hasErrors || cfg.commitOnErrors) {
// The checkpoint travels with the commit as extra metadata so that the next round can resume from it
HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr);
if (hasErrors) {
log.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total="
+ totalErrorRecords + "/" + totalRecords);
}
boolean success = writeClient.commit(commitTime, writeStatusRDD,
Optional.of(checkpointCommitMetadata));
if (success) {
log.info("Commit " + commitTime + " successful!");
// Schedule compaction if needed
if (tableType.equals(HoodieTableType.MERGE_ON_READ) && cfg.continuousMode) {
scheduledCompactionInstant = writeClient
.scheduleCompaction(Optional.of(checkpointCommitMetadata));
}
// Sync to hive if enabled
Timer.Context hiveSyncContext = metrics.getHiveSyncTimerContext();
syncHive();
hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 0;
} else {
log.info("Commit " + commitTime + " failed!");
throw new HoodieException("Commit " + commitTime + " failed!");
}
} else {
log.error("Delta Sync found errors when writing. Errors/Total="
+ totalErrorRecords + "/" + totalRecords);
log.error("Printing out the top 100 errors");
// Global errors are logged at error level, the per-key errors only at trace level
writeStatusRDD.filter(ws -> ws.hasErrors()).take(100).forEach(ws -> {
log.error("Global error :", ws.getGlobalError());
if (ws.getErrors().size() > 0) {
ws.getErrors().entrySet().forEach(r ->
log.trace("Error for key:" + r.getKey() + " is " + r.getValue()));
}
});
// Rolling back instant
writeClient.rollback(commitTime);
throw new HoodieException("Commit " + commitTime + " failed and rolled-back !");
}
// Only reached after a successful commit; every failure path above throws
long overallTimeMs = overallTimerContext != null ? overallTimerContext.stop() : 0;
// Send DeltaStreamer Metrics
metrics.updateDeltaStreamerMetrics(overallTimeMs, hiveSyncTimeMs);
return scheduledCompactionInstant;
}
/**
 * Starts a new commit on the write client, retrying when the attempt is rejected with an
 * {@link IllegalArgumentException}. At most two attempts are made, with a one second pause in between.
 * NOTE(review): the reason the write client rejects the attempt is not visible here; presumably a clash
 * with an existing instant time - confirm.
 *
 * @return the commit time of the newly started commit
 * @throws RuntimeException the last {@link IllegalArgumentException} if all attempts fail, or if the thread
 *         is interrupted while waiting to retry (the interrupt status is restored in that case)
 */
private String startCommit() {
  final int maxRetries = 2;
  RuntimeException lastException = null;
  for (int retryNum = 1; retryNum <= maxRetries; retryNum++) {
    try {
      return writeClient.startCommit();
    } catch (IllegalArgumentException ie) {
      lastException = ie;
      if (retryNum == maxRetries) {
        // No attempt left: do not sleep needlessly, fail right away
        break;
      }
      log.error("Got error trying to start a new commit. Retrying after sleeping for a sec", ie);
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        // Restore the interrupt status and stop retrying so that a shutdown request is honoured promptly
        Thread.currentThread().interrupt();
        break;
      }
    }
  }
  throw lastException;
}
/**
 * Syncs the target hoodie table to Hive. Does nothing unless hive sync is enabled in the config.
 */
private void syncHive() {
  if (!cfg.enableHiveSync) {
    return;
  }
  HiveSyncConfig syncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath);
  log.info("Syncing target hoodie table with hive table(" + syncConfig.tableName
      + "). Hive metastore URL :" + syncConfig.jdbcUrl + ", basePath :" + cfg.targetBasePath);
  HiveSyncTool syncTool = new HiveSyncTool(syncConfig, hiveConf, fs);
  syncTool.syncHoodieTable();
}
/**
 * Creates the HoodieWriteClient once a schema provider is available. Depending on configs and source-type the
 * schema provider is known either eagerly or only after the first batch, so this method may be called several
 * times; it is a no-op while the schema provider is still missing or when the client already exists.
 */
public void setupWriteClient() {
  log.info("Setting up Hoodie Write Client");
  final boolean schemaMissing = (schemaProvider == null);
  final boolean clientExists = (writeClient != null);
  if (schemaMissing || clientExists) {
    return;
  }
  // Schemas are registered before the client gets created
  registerAvroSchemas(schemaProvider);
  writeClient = new HoodieWriteClient<>(jssc, getHoodieClientConfig(schemaProvider), true);
  onInitializingHoodieWriteClient.apply(writeClient);
}
/**
 * Assembles the write config for the hoodie client from the properties and the delta streamer config.
 *
 * @param schemaProvider Schema Provider; when null no schema is set on the config
 * @return the write config
 */
private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) {
  HoodieWriteConfig.Builder writeConfig = HoodieWriteConfig.newBuilder()
      .withProps(props)
      .withPath(cfg.targetBasePath)
      .combineInput(cfg.filterDupes, true);

  HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
      .withPayloadClass(cfg.payloadClassName)
      // Inline compaction is disabled for continuous mode. otherwise enabled for MOR
      .withInlineCompaction(!cfg.continuousMode && tableType.equals(HoodieTableType.MERGE_ON_READ))
      .build();
  writeConfig = writeConfig.withCompactionConfig(compactionConfig).forTable(cfg.targetTableName);

  HoodieIndexConfig indexConfig =
      HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build();
  writeConfig = writeConfig.withIndexConfig(indexConfig).withAutoCommit(false);

  if (schemaProvider != null) {
    writeConfig = writeConfig.withSchema(schemaProvider.getTargetSchema().toString());
  }
  return writeConfig.build();
}
/**
 * Registers the source and target Avro schemas with the spark conf.
 *
 * @param schemaProvider Schema Provider; nothing is registered when null
 */
private void registerAvroSchemas(SchemaProvider schemaProvider) {
  if (schemaProvider == null) {
    return;
  }
  // register the schemas, so that shuffle does not serialize the full schemas
  Schema sourceSchema = schemaProvider.getSourceSchema();
  Schema targetSchema = schemaProvider.getTargetSchema();
  List<Schema> schemas = Arrays.asList(sourceSchema, targetSchema);
  log.info("Registering Schema :" + schemas);
  jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList());
}
/**
 * Releases the write client, if one was created. Safe to call repeatedly.
 */
public void close() {
  if (writeClient == null) {
    return;
  }
  writeClient.close();
  writeClient = null;
}
}

View File

@@ -18,71 +18,68 @@
package com.uber.hoodie.utilities.deltastreamer;
import static com.uber.hoodie.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
import static com.uber.hoodie.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
import static com.uber.hoodie.common.table.HoodieTimeline.COMPACTION_ACTION;
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.codahale.metrics.Timer;
import com.uber.hoodie.AvroConversionUtils;
import com.uber.hoodie.DataSourceUtils;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.KeyGenerator;
import com.uber.hoodie.OverwriteWithLatestAvroPayload;
import com.uber.hoodie.SimpleKeyGenerator;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
import com.uber.hoodie.common.util.CompactionUtils;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.hive.HiveSyncConfig;
import com.uber.hoodie.hive.HiveSyncTool;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.utilities.HiveIncrementalPuller;
import com.uber.hoodie.utilities.UtilHelpers;
import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException;
import com.uber.hoodie.utilities.schema.RowBasedSchemaProvider;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import com.uber.hoodie.utilities.sources.InputBatch;
import com.uber.hoodie.utilities.sources.JsonDFSSource;
import com.uber.hoodie.utilities.transform.Transformer;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.collection.JavaConversions;
/**
* An Utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply
* it to the target dataset. Does not maintain any state, queries at runtime to see how far behind
* the target dataset is from the source dataset. This can be overriden to force sync from a
* timestamp.
* An Utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply it to the target
* dataset. Does not maintain any state, queries at runtime to see how far behind the target dataset is from the source
* dataset. This can be overridden to force sync from a timestamp.
*
* In continuous mode, DeltaStreamer runs in loop-mode going through the below operations
* (a) pull-from-source
* (b) write-to-sink
* (c) Schedule Compactions if needed
* (d) Conditionally Sync to Hive
* each cycle. For MOR table with continuous mode enabled, a separate compactor thread is allocated to execute
* compactions
*/
public class HoodieDeltaStreamer implements Serializable {
@@ -90,58 +87,9 @@ public class HoodieDeltaStreamer implements Serializable {
public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
private final Config cfg;
private final transient Config cfg;
/**
* Source to pull deltas from
*/
private transient SourceFormatAdapter formatAdapter;
/**
* Schema provider that supplies the command for reading the input and writing out the target
* table.
*/
private transient SchemaProvider schemaProvider;
/**
* Allows transforming source to target dataset before writing
*/
private transient Transformer transformer;
/**
* Extract the key for the target dataset
*/
private KeyGenerator keyGenerator;
/**
* Filesystem used
*/
private transient FileSystem fs;
/**
* Timeline with completed commits
*/
private transient Optional<HoodieTimeline> commitTimelineOpt;
/**
* Spark context
*/
private transient JavaSparkContext jssc;
/**
* Spark Session
*/
private transient SparkSession sparkSession;
/**
* Hive Config
*/
private transient HiveConf hiveConf;
/**
* Bag of properties with source, hoodie client, key generator etc.
*/
TypedProperties props;
private transient DeltaSyncService deltaSyncService;
public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc) throws IOException {
this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()),
@@ -150,29 +98,11 @@ public class HoodieDeltaStreamer implements Serializable {
public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf) throws IOException {
this.cfg = cfg;
this.jssc = jssc;
this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate();
this.fs = FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration());
this.deltaSyncService = new DeltaSyncService(cfg, jssc, fs, hiveConf);
}
if (fs.exists(new Path(cfg.targetBasePath))) {
HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), cfg.targetBasePath);
this.commitTimelineOpt = Optional.of(meta.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants());
} else {
this.commitTimelineOpt = Optional.empty();
}
this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
log.info("Creating delta streamer with configs : " + props.toString());
this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc);
this.transformer = UtilHelpers.createTransformer(cfg.transformerClassName);
this.keyGenerator = DataSourceUtils.createKeyGenerator(cfg.keyGeneratorClass, props);
this.formatAdapter =
new SourceFormatAdapter(UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession,
schemaProvider));
this.hiveConf = hiveConf;
public void shutdownGracefully() {
deltaSyncService.shutdown(false);
}
private static HiveConf getDefaultHiveConf(Configuration cfg) {
@@ -181,184 +111,27 @@ public class HoodieDeltaStreamer implements Serializable {
return hiveConf;
}
public void sync() throws Exception {
HoodieDeltaStreamerMetrics metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(null));
Timer.Context overallTimerContext = metrics.getOverallTimerContext();
// Retrieve the previous round checkpoints, if any
Optional<String> resumeCheckpointStr = Optional.empty();
if (commitTimelineOpt.isPresent()) {
Optional<HoodieInstant> lastCommit = commitTimelineOpt.get().lastInstant();
if (lastCommit.isPresent()) {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class);
if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) {
resumeCheckpointStr = Optional.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
} else {
throw new HoodieDeltaStreamerException(
"Unable to find previous checkpoint. Please double check if this table "
+ "was indeed built via delta streamer ");
}
}
} else {
HoodieTableMetaClient.initTableType(jssc.hadoopConfiguration(), cfg.targetBasePath,
cfg.storageType, cfg.targetTableName, "archived");
}
log.info("Checkpoint to resume from : " + resumeCheckpointStr);
final Optional<JavaRDD<GenericRecord>> avroRDDOptional;
final String checkpointStr;
final SchemaProvider schemaProvider;
if (transformer != null) {
// Transformation is needed. Fetch New rows in Row Format, apply transformation and then convert them
// to generic records for writing
InputBatch<Dataset<Row>> dataAndCheckpoint = formatAdapter.fetchNewDataInRowFormat(
resumeCheckpointStr, cfg.sourceLimit);
Optional<Dataset<Row>> transformed =
dataAndCheckpoint.getBatch().map(data -> transformer.apply(jssc, sparkSession, data, props));
checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
avroRDDOptional = transformed.map(t ->
AvroConversionUtils.createRdd(t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
);
// Use Transformed Row's schema if not overridden
schemaProvider =
this.schemaProvider == null ? transformed.map(r -> (SchemaProvider)new RowBasedSchemaProvider(r.schema()))
.orElse(dataAndCheckpoint.getSchemaProvider()) : this.schemaProvider;
} else {
// Pull the data from the source & prepare the write
InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint =
formatAdapter.fetchNewDataInAvroFormat(resumeCheckpointStr, cfg.sourceLimit);
avroRDDOptional = dataAndCheckpoint.getBatch();
checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
schemaProvider = dataAndCheckpoint.getSchemaProvider();
}
if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) {
log.info("No new data, nothing to commit.. ");
return;
}
registerAvroSchemas(schemaProvider);
JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr,
(Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField));
return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
});
// filter dupes if needed
HoodieWriteConfig hoodieCfg = getHoodieClientConfig(schemaProvider);
if (cfg.filterDupes) {
// turn upserts to insert
cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
records = DataSourceUtils.dropDuplicates(jssc, records, hoodieCfg);
if (records.isEmpty()) {
log.info("No new data, nothing to commit.. ");
return;
}
}
// Perform the write
HoodieWriteClient client = new HoodieWriteClient<>(jssc, hoodieCfg, true);
String commitTime = client.startCommit();
log.info("Starting commit : " + commitTime);
JavaRDD<WriteStatus> writeStatusRDD;
if (cfg.operation == Operation.INSERT) {
writeStatusRDD = client.insert(records, commitTime);
} else if (cfg.operation == Operation.UPSERT) {
writeStatusRDD = client.upsert(records, commitTime);
} else if (cfg.operation == Operation.BULK_INSERT) {
writeStatusRDD = client.bulkInsert(records, commitTime);
} else {
throw new HoodieDeltaStreamerException("Unknown operation :" + cfg.operation);
}
long totalErrorRecords = writeStatusRDD.mapToDouble(ws -> ws.getTotalErrorRecords()).sum().longValue();
long totalRecords = writeStatusRDD.mapToDouble(ws -> ws.getTotalRecords()).sum().longValue();
boolean hasErrors = totalErrorRecords > 0;
long hiveSyncTimeMs = 0;
if (!hasErrors || cfg.commitOnErrors) {
HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr);
if (hasErrors) {
log.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total="
+ totalErrorRecords + "/" + totalRecords);
}
boolean success = client.commit(commitTime, writeStatusRDD,
Optional.of(checkpointCommitMetadata));
if (success) {
log.info("Commit " + commitTime + " successful!");
// Sync to hive if enabled
Timer.Context hiveSyncContext = metrics.getHiveSyncTimerContext();
syncHive();
hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 0;
} else {
log.info("Commit " + commitTime + " failed!");
}
} else {
log.error("There are errors when ingesting records. Errors/Total="
+ totalErrorRecords + "/" + totalRecords);
log.error("Printing out the top 100 errors");
writeStatusRDD.filter(ws -> ws.hasErrors()).take(100).forEach(ws -> {
log.error("Global error :", ws.getGlobalError());
if (ws.getErrors().size() > 0) {
ws.getErrors().entrySet().forEach(r ->
log.trace("Error for key:" + r.getKey() + " is " + r.getValue()));
}
});
}
client.close();
long overallTimeMs = overallTimerContext != null ? overallTimerContext.stop() : 0;
// Send DeltaStreamer Metrics
metrics.updateDeltaStreamerMetrics(overallTimeMs, hiveSyncTimeMs);
}
public void syncHive() {
if (cfg.enableHiveSync) {
HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath);
log.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName
+ "). Hive metastore URL :" + hiveSyncConfig.jdbcUrl + ", basePath :" + cfg.targetBasePath);
new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable();
}
}
/**
* Register Avro Schemas
* @param schemaProvider Schema Provider
* Main method to start syncing
* @throws Exception
*/
private void registerAvroSchemas(SchemaProvider schemaProvider) {
// register the schemas, so that shuffle does not serialize the full schemas
if (null != schemaProvider) {
List<Schema> schemas = Arrays.asList(schemaProvider.getSourceSchema(), schemaProvider.getTargetSchema());
log.info("Registering Schema :" + schemas);
jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList());
/**
 * Main entry point to start syncing.
 *
 * In continuous mode the delta-sync service is started with {@code onDeltaSyncShutdown} as its shutdown
 * callback and this call then waits on {@code waitForShutdown()}. Otherwise exactly one sync round is executed
 * and the delta-sync service is closed afterwards.
 *
 * @throws Exception if starting the service or the single sync round fails
 */
public void sync() throws Exception {
  if (cfg.continuousMode) {
    deltaSyncService.start(this::onDeltaSyncShutdown);
    deltaSyncService.waitForShutdown();
    log.info("Delta Sync shutting down");
  } else {
    log.info("Delta Streamer running only single round");
    try {
      deltaSyncService.getDeltaSync().syncOnce();
    } finally {
      // Always release the resources held by the delta sync, even when the single round fails;
      // previously a failing syncOnce() skipped close() entirely.
      deltaSyncService.close();
      log.info("Shut down deltastreamer");
    }
  }
}
private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) {
HoodieWriteConfig.Builder builder =
HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath)
.withAutoCommit(false).combineInput(cfg.filterDupes, true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withPayloadClass(cfg.payloadClassName)
// turn on inline compaction by default, for MOR tables
.withInlineCompaction(HoodieTableType.valueOf(cfg.storageType) == HoodieTableType.MERGE_ON_READ)
.build())
.forTable(cfg.targetTableName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withProps(props);
if (null != schemaProvider) {
builder = builder.withSchema(schemaProvider.getTargetSchema().toString());
}
return builder.build();
private boolean onDeltaSyncShutdown(boolean error) {
log.info("DeltaSync shutdown. Closing write client. Error?" + error);
deltaSyncService.close();
return true;
}
public enum Operation {
@@ -366,6 +139,7 @@ public class HoodieDeltaStreamer implements Serializable {
}
private static class OperationConvertor implements IStringConverter<Operation> {
@Override
public Operation convert(String value) throws ParameterException {
return Operation.valueOf(value);
@@ -426,9 +200,9 @@ public class HoodieDeltaStreamer implements Serializable {
@Parameter(names = {"--transformer-class"},
description = "subclass of com.uber.hoodie.utilities.transform.Transformer"
+ ". Allows transforming raw source dataset to a target dataset (conforming to target schema) before writing."
+ " Default : Not set. E:g - com.uber.hoodie.utilities.transform.SqlQueryBasedTransformer (which allows"
+ "a SQL query templated to be passed as a transformation function)")
+ ". Allows transforming raw source dataset to a target dataset (conforming to target schema) before "
+ "writing. Default : Not set. E:g - com.uber.hoodie.utilities.transform.SqlQueryBasedTransformer (which "
+ "allows a SQL query templated to be passed as a transformation function)")
public String transformerClassName = null;
@Parameter(names = {"--source-limit"}, description = "Maximum amount of data to read from source. "
@@ -447,16 +221,57 @@ public class HoodieDeltaStreamer implements Serializable {
@Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
public Boolean enableHiveSync = false;
@Parameter(names = {"--max-pending-compactions"},
description = "Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unless"
+ "outstanding compactions is less than this number")
public Integer maxPendingCompactions = 5;
@Parameter(names = {"--continuous"}, description = "Delta Streamer runs in continuous mode running"
+ " source-fetch -> Transform -> Hudi Write in loop")
public Boolean continuousMode = false;
@Parameter(names = {"--spark-master"}, description = "spark master to use.")
public String sparkMaster = "local[2]";
@Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written")
public Boolean commitOnErrors = false;
@Parameter(names = {"--delta-sync-scheduling-weight"}, description =
"Scheduling weight for delta sync as defined in "
+ "https://spark.apache.org/docs/latest/job-scheduling.html")
public Integer deltaSyncSchedulingWeight = 1;
@Parameter(names = {"--compact-scheduling-weight"}, description = "Scheduling weight for compaction as defined in "
+ "https://spark.apache.org/docs/latest/job-scheduling.html")
public Integer compactSchedulingWeight = 1;
@Parameter(names = {"--delta-sync-scheduling-minshare"}, description = "Minshare for delta sync as defined in "
+ "https://spark.apache.org/docs/latest/job-scheduling.html")
public Integer deltaSyncSchedulingMinShare = 0;
@Parameter(names = {"--compact-scheduling-minshare"}, description = "Minshare for compaction as defined in "
+ "https://spark.apache.org/docs/latest/job-scheduling.html")
public Integer compactSchedulingMinShare = 0;
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
}
/**
 * Computes the additional Spark configs required for job scheduling.
 *
 * A scheduler allocation file is generated (and referenced through {@code spark.scheduler.allocation.file})
 * only when running in continuous mode against a MERGE_ON_READ storage type; in every other case the returned
 * map is empty.
 *
 * @param cfg Delta streamer config carrying the scheduling weights and min-shares
 * @return mutable map of extra Spark configs, possibly empty
 */
public static Map<String, String> getSparkSchedulingConfigs(Config cfg) throws Exception {
  final Map<String, String> schedulingConfigs = new HashMap<>();
  // Checks are kept in the original short-circuit order: storage type is only inspected in continuous mode.
  if (!cfg.continuousMode) {
    return schedulingConfigs;
  }
  if (!cfg.storageType.equals(HoodieTableType.MERGE_ON_READ.name())) {
    return schedulingConfigs;
  }
  final String allocationFilePath = SchedulerConfGenerator.generateAndStoreConfig(
      cfg.deltaSyncSchedulingWeight, cfg.compactSchedulingWeight,
      cfg.deltaSyncSchedulingMinShare, cfg.compactSchedulingMinShare);
  schedulingConfigs.put("spark.scheduler.allocation.file", allocationFilePath);
  return schedulingConfigs;
}
public static void main(String[] args) throws Exception {
final Config cfg = new Config();
JCommander cmd = new JCommander(cfg, args);
@@ -465,47 +280,288 @@ public class HoodieDeltaStreamer implements Serializable {
System.exit(1);
}
JavaSparkContext jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster);
Map<String, String> additionalSparkConfigs = getSparkSchedulingConfigs(cfg);
JavaSparkContext jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName,
cfg.sparkMaster, additionalSparkConfigs);
if (!("FAIR".equals(jssc.getConf().get("spark.scheduler.mode")))
&& cfg.continuousMode && cfg.storageType.equals(HoodieTableType.MERGE_ON_READ.name())) {
log.warn("Job Scheduling Configs will not be in effect as spark.scheduler.mode "
+ "is not set to FAIR at instatiation time. Continuing without scheduling configs");
}
new HoodieDeltaStreamer(cfg, jssc).sync();
}
public SourceFormatAdapter getFormatAdapter() {
return formatAdapter;
/**
* Syncs data either in single-run or in continuous mode.
*
* When started as a service, {@link #startService()} repeatedly calls {@code deltaSync.syncOnce()} on a dedicated
* single-thread executor until shutdown is requested, and hands every newly scheduled compaction instant to the
* {@link AsyncCompactService}. The compactor is created in {@link #onInitializingWriteClient(HoodieWriteClient)},
* which is passed to {@link DeltaSync} as a callback, and only for MERGE_ON_READ tables.
*/
public static class DeltaSyncService extends AbstractDeltaStreamerService {
/**
* Delta Sync Config
*/
private final HoodieDeltaStreamer.Config cfg;
/**
* Schema provider that supplies the command for reading the input and writing out the target table.
*/
private transient SchemaProvider schemaProvider;
/**
* Spark Session
*/
private transient SparkSession sparkSession;
/**
* Spark context
*/
private transient JavaSparkContext jssc;
/**
* Bag of properties with source, hoodie client, key generator etc.
*/
TypedProperties props;
/**
* Async Compactor Service. Stays null until {@link #onInitializingWriteClient(HoodieWriteClient)} runs for a
* MERGE_ON_READ table.
*/
private AsyncCompactService asyncCompactService;
/**
* Table Type. Taken from the existing dataset's metadata when the target base path exists, otherwise from
* {@code cfg.storageType}.
*/
private final HoodieTableType tableType;
/**
* Delta Sync
*/
private transient DeltaSync deltaSync;
/**
* Builds the service and the underlying {@link DeltaSync}.
*
* @param cfg Delta streamer config; note that {@code cfg.operation} is modified in place when
* {@code cfg.filterDupes} is set
* @param jssc Spark context
* @param fs Filesystem used to probe the target base path and to read the properties file
* @param hiveConf Hive configuration handed to {@link DeltaSync}
* @throws IOException declared by the filesystem/config access performed here
*/
public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf)
throws IOException {
this.cfg = cfg;
this.jssc = jssc;
this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate();
// Prefer the table type recorded for an existing dataset over the one supplied on the command line.
if (fs.exists(new Path(cfg.targetBasePath))) {
HoodieTableMetaClient meta = new HoodieTableMetaClient(
new Configuration(fs.getConf()), cfg.targetBasePath, false);
tableType = meta.getTableType();
} else {
tableType = HoodieTableType.valueOf(cfg.storageType);
}
this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
log.info("Creating delta streamer with configs : " + props.toString());
this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc);
// With duplicate filtering enabled, upserts are turned into inserts. This mutates the caller's config object.
if (cfg.filterDupes) {
cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
}
deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, tableType,
props, jssc, fs, hiveConf, this::onInitializingWriteClient);
}
/**
* @return the {@link DeltaSync} instance created in the constructor
*/
public DeltaSync getDeltaSync() {
return deltaSync;
}
/**
* Starts the delta-sync loop on a single-thread executor.
*
* The loop runs until {@code isShutdownRequested()} returns true. Any exception from a sync round is logged,
* rethrown as {@link HoodieException} (which ends the loop) and the compactor is shut down in the finally block.
*
* @return the future tracking the loop together with the executor running it
*/
@Override
protected Pair<CompletableFuture, ExecutorService> startService() {
ExecutorService executor = Executors.newFixedThreadPool(1);
return Pair.of(CompletableFuture.supplyAsync(() -> {
boolean error = false;
if (cfg.continuousMode && tableType.equals(HoodieTableType.MERGE_ON_READ)) {
// set Scheduler Pool.
log.info("Setting Spark Pool name for delta-sync to " + SchedulerConfGenerator.DELTASYNC_POOL_NAME);
jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.DELTASYNC_POOL_NAME);
}
try {
while (!isShutdownRequested()) {
try {
Optional<String> scheduledCompactionInstant = deltaSync.syncOnce();
if (scheduledCompactionInstant.isPresent()) {
// NOTE(review): asyncCompactService is dereferenced without a null check; it is only assigned in
// onInitializingWriteClient for MERGE_ON_READ tables. Presumably syncOnce() only returns a compaction
// instant for such tables - verify against DeltaSync.
log.info("Enqueuing new pending compaction instant (" + scheduledCompactionInstant + ")");
asyncCompactService.enqueuePendingCompaction(new HoodieInstant(State.REQUESTED, COMPACTION_ACTION,
scheduledCompactionInstant.get()));
// Back-pressure: hold off the next sync round while too many compactions are outstanding.
asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions);
}
} catch (Exception e) {
log.error("Shutting down delta-sync due to exception", e);
error = true;
throw new HoodieException(e.getMessage(), e);
}
}
} finally {
shutdownCompactor(error);
}
return true;
}, executor), executor);
}
/**
* Shutdown compactor as DeltaSync is shutdown
*
* @param error whether delta-sync stopped because of an exception; only used for logging here
*/
private void shutdownCompactor(boolean error) {
log.info("Delta Sync shutdown. Error ?" + error);
if (asyncCompactService != null) {
log.warn("Gracefully shutting down compactor");
asyncCompactService.shutdown(false);
}
}
/**
* Callback to initialize write client and start compaction service if required.
*
* For MERGE_ON_READ tables this creates the {@link AsyncCompactService}, enqueues the compactions that are already
* pending on the dataset, starts the compactor and then waits until the number of queued compactions is at most
* {@code cfg.maxPendingCompactions}. For other table types it does nothing.
*
* @param writeClient HoodieWriteClient
* @return always {@code true}
*/
protected Boolean onInitializingWriteClient(HoodieWriteClient writeClient) {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
asyncCompactService = new AsyncCompactService(jssc, writeClient);
// Enqueue existing pending compactions first
HoodieTableMetaClient meta = new HoodieTableMetaClient(
new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, true);
List<HoodieInstant> pending = CompactionUtils.getPendingCompactionInstantTimes(meta);
pending.stream().forEach(hoodieInstant -> asyncCompactService.enqueuePendingCompaction(hoodieInstant));
// When the compactor shuts down, delta-sync is shut down as well; the error flag passed to the callback
// is not inspected.
asyncCompactService.start((error) -> {
// Shutdown DeltaSync
shutdown(false);
return true;
});
try {
asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions);
} catch (InterruptedException ie) {
// NOTE(review): the interrupt is wrapped without restoring the thread's interrupt flag - confirm this is
// intended.
throw new HoodieException(ie);
}
}
return true;
}
/**
* Close all resources
*/
public void close() {
if (null != deltaSync) {
deltaSync.close();
}
}
/**
* @return the schema provider created from {@code cfg.schemaProviderClassName}
*/
public SchemaProvider getSchemaProvider() {
return schemaProvider;
}
/**
* @return the Spark session obtained in the constructor
*/
public SparkSession getSparkSession() {
return sparkSession;
}
/**
* @return the Spark context this service was created with
*/
public JavaSparkContext getJavaSparkContext() {
return jssc;
}
/**
* @return the async compactor, or null if it has not been created
*/
public AsyncCompactService getAsyncCompactService() {
return asyncCompactService;
}
/**
* @return the properties read from {@code cfg.propsFilePath} and {@code cfg.configs}
*/
public TypedProperties getProps() {
return props;
}
}
public SchemaProvider getSchemaProvider() {
return schemaProvider;
/**
* Async Compactor Service tha runs in separate thread. Currently, only one compactor is allowed to run at any time.
*/
public static class AsyncCompactService extends AbstractDeltaStreamerService {
private final int maxConcurrentCompaction;
private transient Compactor compactor;
private transient JavaSparkContext jssc;
private transient BlockingQueue<HoodieInstant> pendingCompactions = new LinkedBlockingQueue<>();
private transient ReentrantLock queueLock = new ReentrantLock();
private transient Condition consumed = queueLock.newCondition();
public AsyncCompactService(JavaSparkContext jssc, HoodieWriteClient client) {
this.jssc = jssc;
this.compactor = new Compactor(client, jssc);
//TODO: HUDI-157 : Only allow 1 compactor to run in parallel till Incremental View on MOR is fully implemented.
this.maxConcurrentCompaction = 1;
}
/**
* Enqueues new Pending compaction
*/
public void enqueuePendingCompaction(HoodieInstant instant) {
pendingCompactions.add(instant);
}
/**
* Wait till outstanding pending compactions reduces to the passed in value
* @param numPendingCompactions Maximum pending compactions allowed
* @throws InterruptedException
*/
public void waitTillPendingCompactionsReducesTo(int numPendingCompactions) throws InterruptedException {
try {
queueLock.lock();
while (!isShutdown() && (pendingCompactions.size() > numPendingCompactions)) {
consumed.await();
}
} finally {
queueLock.unlock();
}
}
/**
* Fetch Next pending compaction if available
* @return
* @throws InterruptedException
*/
private HoodieInstant fetchNextCompactionInstant() throws InterruptedException {
log.info("Compactor waiting for next instant for compaction upto 60 seconds");
HoodieInstant instant = pendingCompactions.poll(60, TimeUnit.SECONDS);
if (instant != null) {
try {
queueLock.lock();
// Signal waiting thread
consumed.signal();
} finally {
queueLock.unlock();
}
}
return instant;
}
/**
* Start Compaction Service
*/
protected Pair<CompletableFuture, ExecutorService> startService() {
ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentCompaction);
List<CompletableFuture<Boolean>> compactionFutures =
IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
try {
// Set Compactor Pool Name for allowing users to prioritize compaction
log.info("Setting Spark Pool name for compaction to " + SchedulerConfGenerator.COMPACT_POOL_NAME);
jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.COMPACT_POOL_NAME);
while (!isShutdownRequested()) {
final HoodieInstant instant = fetchNextCompactionInstant();
if (null != instant) {
compactor.compact(instant);
}
}
log.info("Compactor shutting down properly!!");
} catch (InterruptedException ie) {
log.warn("Compactor executor thread got interrupted exception. Stopping", ie);
} catch (IOException e) {
log.error("Compactor executor failed", e);
throw new HoodieIOException(e.getMessage(), e);
}
return true;
}, executor)).collect(Collectors.toList());
return Pair.of(CompletableFuture.allOf(compactionFutures.stream().toArray(CompletableFuture[]::new)), executor);
}
}
public Transformer getTransformer() {
return transformer;
}
public KeyGenerator getKeyGenerator() {
return keyGenerator;
}
public FileSystem getFs() {
return fs;
}
public Optional<HoodieTimeline> getCommitTimelineOpt() {
return commitTimelineOpt;
}
public JavaSparkContext getJssc() {
return jssc;
}
public SparkSession getSparkSession() {
return sparkSession;
}
public HiveConf getHiveConf() {
return hiveConf;
}
public TypedProperties getProps() {
return props;
public DeltaSyncService getDeltaSyncService() {
return deltaSyncService;
}
}

View File

@@ -0,0 +1,94 @@
/*
* Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.uber.hoodie.utilities.deltastreamer;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.lang.text.StrSubstitutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
/**
 * Utility Class to generate Spark Scheduling allocation file. This kicks in only when user
 * sets spark.scheduler.mode=FAIR at spark-submit time
 *
 * The generated file declares two pools, one for delta-sync and one for compaction, each with FAIR scheduling
 * mode and the caller-supplied weight and minShare.
 */
public class SchedulerConfGenerator {

  protected static volatile Logger log = LogManager.getLogger(SchedulerConfGenerator.class);

  /** Name of the Spark scheduler pool used for delta-sync jobs. */
  public static final String DELTASYNC_POOL_NAME = "hoodiedeltasync";
  /** Name of the Spark scheduler pool used for compaction jobs. */
  public static final String COMPACT_POOL_NAME = "hoodiecompact";

  // Placeholder names used in SPARK_SCHEDULING_PATTERN (substituted as %(name)).
  private static final String DELTASYNC_POOL_KEY = "deltasync_pool";
  private static final String COMPACT_POOL_KEY = "compact_pool";
  private static final String DELTASYNC_POLICY_KEY = "deltasync_policy";
  private static final String COMPACT_POLICY_KEY = "compact_policy";
  private static final String DELTASYNC_WEIGHT_KEY = "deltasync_weight";
  private static final String DELTASYNC_MINSHARE_KEY = "deltasync_minshare";
  private static final String COMPACT_WEIGHT_KEY = "compact_weight";
  private static final String COMPACT_MINSHARE_KEY = "compact_minshare";

  /** Template of the allocation file; never reassigned, hence final. */
  private static final String SPARK_SCHEDULING_PATTERN =
      "<?xml version=\"1.0\"?>\n"
          + "<allocations>\n"
          + " <pool name=\"%(deltasync_pool)\">\n"
          + " <schedulingMode>%(deltasync_policy)</schedulingMode>\n"
          + " <weight>%(deltasync_weight)</weight>\n"
          + " <minShare>%(deltasync_minshare)</minShare>\n"
          + " </pool>\n"
          + " <pool name=\"%(compact_pool)\">\n"
          + " <schedulingMode>%(compact_policy)</schedulingMode>\n"
          + " <weight>%(compact_weight)</weight>\n"
          + " <minShare>%(compact_minshare)</minShare>\n"
          + " </pool>\n"
          + "</allocations>";

  /**
   * Renders the allocation XML by substituting pool names, policies, weights and min-shares into the template.
   *
   * @param deltaSyncWeight Scheduling weight of the delta-sync pool
   * @param compactionWeight Scheduling weight of the compaction pool
   * @param deltaSyncMinShare minShare of the delta-sync pool
   * @param compactionMinShare minShare of the compaction pool
   * @return the allocation file content
   */
  private static String generateConfig(Integer deltaSyncWeight, Integer compactionWeight, Integer deltaSyncMinShare,
      Integer compactionMinShare) {
    Map<String, String> schedulingProps = new HashMap<>();
    schedulingProps.put(DELTASYNC_POOL_KEY, DELTASYNC_POOL_NAME);
    schedulingProps.put(COMPACT_POOL_KEY, COMPACT_POOL_NAME);
    schedulingProps.put(DELTASYNC_POLICY_KEY, "FAIR");
    schedulingProps.put(COMPACT_POLICY_KEY, "FAIR");
    schedulingProps.put(DELTASYNC_WEIGHT_KEY, deltaSyncWeight.toString());
    schedulingProps.put(DELTASYNC_MINSHARE_KEY, deltaSyncMinShare.toString());
    schedulingProps.put(COMPACT_WEIGHT_KEY, compactionWeight.toString());
    schedulingProps.put(COMPACT_MINSHARE_KEY, compactionMinShare.toString());

    StrSubstitutor sub = new StrSubstitutor(schedulingProps, "%(", ")");
    String xmlString = sub.replace(SPARK_SCHEDULING_PATTERN);
    log.info("Scheduling Configurations generated. Config=\n" + xmlString);
    return xmlString;
  }

  /**
   * Generates the allocation XML and stores it in a newly created temporary file.
   *
   * @param deltaSyncWeight Scheduling weight of the delta-sync pool
   * @param compactionWeight Scheduling weight of the compaction pool
   * @param deltaSyncMinShare minShare of the delta-sync pool
   * @param compactionMinShare minShare of the compaction pool
   * @return absolute path of the file holding the generated configuration
   * @throws IOException if the temporary file cannot be created or written
   */
  public static String generateAndStoreConfig(Integer deltaSyncWeight, Integer compactionWeight,
      Integer deltaSyncMinShare, Integer compactionMinShare) throws IOException {
    File tempConfigFile = File.createTempFile(UUID.randomUUID().toString(), ".xml");
    // try-with-resources guarantees the writer is closed even when writing fails midway.
    try (BufferedWriter bw = new BufferedWriter(new FileWriter(tempConfigFile))) {
      bw.write(generateConfig(deltaSyncWeight, compactionWeight, deltaSyncMinShare, compactionMinShare));
    }
    log.info("Configs written to file" + tempConfigFile.getAbsolutePath());
    return tempConfigFile.getAbsolutePath();
  }
}

View File

@@ -24,11 +24,13 @@ import static org.junit.Assert.fail;
import com.uber.hoodie.DataSourceWriteOptions;
import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.DFSPropertiesConfiguration;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.exception.DatasetNotFoundException;
import com.uber.hoodie.hive.HiveSyncConfig;
import com.uber.hoodie.hive.HoodieHiveClient;
@@ -36,17 +38,28 @@ import com.uber.hoodie.hive.MultiPartKeysValueExtractor;
import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer;
import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
import com.uber.hoodie.utilities.schema.FilebasedSchemaProvider;
import com.uber.hoodie.utilities.sources.DistributedTestDataSource;
import com.uber.hoodie.utilities.sources.HoodieIncrSource;
import com.uber.hoodie.utilities.sources.InputBatch;
import com.uber.hoodie.utilities.sources.TestDataSource;
import com.uber.hoodie.utilities.sources.config.TestSourceConfig;
import com.uber.hoodie.utilities.transform.SqlQueryBasedTransformer;
import com.uber.hoodie.utilities.transform.Transformer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -57,6 +70,7 @@ import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -197,6 +211,22 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
assertEquals(expected, recordCount);
}
/**
 * Asserts that the commit timeline of the dataset holds at least {@code minExpected} completed instants.
 * The full active timeline is logged to ease debugging of failures.
 */
static void assertAtleastNCompactionCommits(int minExpected, String datasetPath, FileSystem fs) {
  final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), datasetPath);
  final HoodieTimeline completedCommits =
      metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
  log.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants().collect(Collectors.toList()));
  final int observed = (int) completedCommits.getInstants().count();
  final String failureMessage = "Got=" + observed + ", exp >=" + minExpected;
  assertTrue(failureMessage, observed >= minExpected);
}
/**
 * Asserts that the delta-commit timeline of the dataset holds at least {@code minExpected} completed instants.
 * The full active timeline is logged to ease debugging of failures.
 */
static void assertAtleastNDeltaCommits(int minExpected, String datasetPath, FileSystem fs) {
  final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), datasetPath);
  final HoodieTimeline completedDeltaCommits =
      metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
  log.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants().collect(Collectors.toList()));
  final int observed = (int) completedDeltaCommits.getInstants().count();
  final String failureMessage = "Got=" + observed + ", exp >=" + minExpected;
  assertTrue(failureMessage, observed >= minExpected);
}
static String assertCommitMetadata(String expected, String datasetPath, FileSystem fs, int totalCommits)
throws IOException {
HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath);
@@ -208,6 +238,23 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
assertEquals(expected, commitMetadata.getMetadata(HoodieDeltaStreamer.CHECKPOINT_KEY));
return lastInstant.getTimestamp();
}
/**
 * Polls {@code condition} every 3 seconds on a background thread until it returns true, failing if that does not
 * happen within the timeout. Errors thrown by the condition (including assertion failures) are logged and treated
 * as "not yet satisfied".
 *
 * @param condition check to evaluate; it is always invoked with {@code true}
 * @param timeoutInSecs maximum time to wait for the condition to hold
 * @throws Exception a {@link java.util.concurrent.TimeoutException} when the timeout elapses, or any failure
 *     reported by the polling task
 */
static void waitTillCondition(Function<Boolean, Boolean> condition, long timeoutInSecs) throws Exception {
  ExecutorService poller = Executors.newSingleThreadExecutor();
  try {
    Future<Boolean> res = poller.submit(() -> {
      boolean ret = false;
      while (!ret) {
        try {
          Thread.sleep(3000);
          ret = condition.apply(true);
        } catch (InterruptedException ie) {
          // Raised when the caller gave up (shutdownNow below): stop polling instead of looping forever.
          Thread.currentThread().interrupt();
          return false;
        } catch (Throwable error) {
          log.warn("Got error :", error);
          ret = false;
        }
      }
      return true;
    });
    res.get(timeoutInSecs, TimeUnit.SECONDS);
  } finally {
    // Stops the polling task on timeout and releases the worker thread in every case.
    poller.shutdownNow();
  }
}
}
@Test
@@ -261,6 +308,51 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
assertEquals(2000, counts.get(0).getLong(1));
}
@Test
public void testUpsertsCOWContinuousMode() throws Exception {
testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow");
}
@Test
public void testUpsertsMORContinuousMode() throws Exception {
testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor");
}
/**
 * Runs the delta streamer in continuous mode on a background thread and waits (up to 180 seconds) until the
 * expected number of commits and records show up, then shuts the streamer down gracefully.
 *
 * @param tableType storage type of the target dataset
 * @param tempDir directory name, relative to the DFS base path, used for the dataset
 */
private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception {
  String datasetBasePath = dfsBasePath + "/" + tempDir;
  // Keep it higher than batch-size to test continuous mode
  int totalRecords = 3000;

  // Continuous upserts against the given table type
  HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(datasetBasePath, Operation.UPSERT);
  cfg.continuousMode = true;
  cfg.storageType = tableType.name();
  cfg.configs.add(String.format("%s=%d", TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, totalRecords));
  cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
  HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);

  ExecutorService dsExecutor = Executors.newSingleThreadExecutor();
  try {
    Future<?> dsFuture = dsExecutor.submit(() -> {
      try {
        ds.sync();
      } catch (Exception ex) {
        throw new RuntimeException(ex.getMessage(), ex);
      }
    });

    try {
      TestHelpers.waitTillCondition((r) -> {
        if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
          TestHelpers.assertAtleastNDeltaCommits(5, datasetBasePath, dfs);
          TestHelpers.assertAtleastNCompactionCommits(2, datasetBasePath, dfs);
        } else {
          TestHelpers.assertAtleastNCompactionCommits(5, datasetBasePath, dfs);
        }
        TestHelpers.assertRecordCount(totalRecords, datasetBasePath + "/*/*.parquet", sqlContext);
        TestHelpers.assertDistanceCount(totalRecords, datasetBasePath + "/*/*.parquet", sqlContext);
        return true;
      }, 180);
    } finally {
      // Stop the streamer even when the condition never held, so it does not keep running behind other tests.
      ds.shutdownGracefully();
    }
    dsFuture.get();
  } finally {
    dsExecutor.shutdownNow();
  }
}
/**
* Test Bulk Insert and upserts with hive syncing. Tests Hudi incremental processing using a 2 step pipeline
* The first step involves using a SQL template to transform a source
@@ -366,6 +458,20 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
assertEquals(1000, counts.get(1).getLong(1));
}
/**
 * Fetches one batch from a {@link DistributedTestDataSource} configured with a single source
 * partition, a cap of 1000 unique records and RocksDB-backed key storage, and checks that the
 * batch holds exactly 1000 records even though the requested source limit is far larger.
 */
@Test
public void testDistributedTestDataSource() throws Exception {
  final TypedProperties sourceProps = new TypedProperties();
  sourceProps.setProperty(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, "1000");
  sourceProps.setProperty(TestSourceConfig.NUM_SOURCE_PARTITIONS_PROP, "1");
  sourceProps.setProperty(TestSourceConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS, "true");

  final DistributedTestDataSource source =
      new DistributedTestDataSource(sourceProps, jsc, sparkSession, null);
  final InputBatch<JavaRDD<GenericRecord>> fetched = source.fetchNext(Optional.empty(), 10000000);

  // Cache first so the count below does not have to regenerate the data on a later action.
  fetched.getBatch().get().cache();
  final long recordCount = fetched.getBatch().get().count();
  Assert.assertEquals(1000, recordCount);
}
/**
* UDF to calculate Haversine distance
*/

View File

@@ -0,0 +1,103 @@
package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.common.util.collection.RocksDBBasedMap;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import com.uber.hoodie.utilities.sources.config.TestSourceConfig;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
/**
 * Common base for test sources. Holds a JVM-wide {@link HoodieTestDataGenerator} and knows how to
 * turn it into a stream of Avro records containing a mix of updates and inserts, bounded by a
 * configurable number of unique keys.
 */
public abstract class AbstractBaseTestSource extends AvroSource {

  // Static instance, helps with reuse across a test.
  protected static transient HoodieTestDataGenerator dataGenerator;

  /**
   * Installs a fresh data generator over the default partition paths.
   */
  public static void initDataGen() {
    dataGenerator = new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS);
  }

  /**
   * Installs a fresh data generator whose key store is chosen from {@code props}: a RocksDB-backed
   * map when {@link TestSourceConfig#USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS} is set, otherwise an
   * in-memory {@link HashMap}.
   *
   * @param props source properties; {@link TestSourceConfig#ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS}
   *              optionally names the base directory, defaulting to the JVM temp directory
   * @throws HoodieIOException if the default base directory cannot be determined
   */
  public static void initDataGen(TypedProperties props) {
    try {
      boolean useRocksForTestDataGenKeys = props.getBoolean(TestSourceConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS,
          TestSourceConfig.DEFAULT_USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS);
      String baseStoreDir = props.getString(TestSourceConfig.ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS, null);
      if (null == baseStoreDir) {
        // The temp file is only a probe for the temp directory; remove it so it is not left behind.
        File probe = File.createTempFile("test_data_gen", ".keys");
        baseStoreDir = probe.getParent();
        if (!probe.delete()) {
          probe.deleteOnExit();
        }
      }
      log.info("useRocksForTestDataGenKeys=" + useRocksForTestDataGenKeys + ", BaseStoreDir=" + baseStoreDir);
      dataGenerator = new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS,
          useRocksForTestDataGenKeys ? new RocksDBBasedMap<>(baseStoreDir) : new HashMap<>());
    } catch (IOException e) {
      throw new HoodieIOException(e.getMessage(), e);
    }
  }

  /**
   * Closes the current data generator, if any, and clears the shared reference.
   */
  public static void resetDataGen() {
    if (null != dataGenerator) {
      dataGenerator.close();
    }
    dataGenerator = null;
  }

  protected AbstractBaseTestSource(TypedProperties props,
      JavaSparkContext sparkContext, SparkSession sparkSession,
      SchemaProvider schemaProvider) {
    super(props, sparkContext, sparkSession, schemaProvider);
  }

  /**
   * Builds the next batch of test records as a stream: updates to existing keys followed by
   * inserts of new keys.
   *
   * <p>Roughly half of {@code sourceLimit} is used for updates (bounded by the number of existing
   * keys) and the rest for inserts. Inserts are then capped so the total number of unique keys
   * does not exceed {@link TestSourceConfig#MAX_UNIQUE_RECORDS_PROP}, and any budget freed by that
   * cap is handed back to updates, again bounded by the number of existing keys.
   *
   * <p>Requires the shared data generator to have been initialized.
   *
   * @param props source properties supplying the unique-record cap
   * @param sourceLimit maximum number of records to produce in this batch
   * @param commitTime commit time passed to the generator for the produced records
   * @return stream of generated records, updates first
   */
  protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int sourceLimit, String commitTime) {
    int maxUniqueKeys = props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP,
        TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS);

    // generate `sourceLimit` number of upserts each time.
    int numExistingKeys = dataGenerator.getNumExistingKeys();
    log.info("NumExistingKeys=" + numExistingKeys);

    int numUpdates = Math.min(numExistingKeys, sourceLimit / 2);
    int numInserts = sourceLimit - numUpdates;
    log.info("Before adjustments => numInserts=" + numInserts + ", numUpdates=" + numUpdates);

    if (numInserts + numExistingKeys > maxUniqueKeys) {
      // Limit inserts so that maxUniqueRecords is maintained
      numInserts = Math.max(0, maxUniqueKeys - numExistingKeys);
    }

    if ((numInserts + numUpdates) < sourceLimit) {
      // try to expand updates to safe limit
      numUpdates = Math.min(numExistingKeys, sourceLimit - numInserts);
    }

    log.info("NumInserts=" + numInserts + ", NumUpdates=" + numUpdates + ", maxUniqueRecords=" + maxUniqueKeys);
    long memoryUsage1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
    log.info("Before DataGen. Memory Usage=" + memoryUsage1 + ", Total Memory=" + Runtime.getRuntime().totalMemory()
        + ", Free Memory=" + Runtime.getRuntime().freeMemory());

    Stream<GenericRecord> updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates)
        .map(AbstractBaseTestSource::toGenericRecord);
    Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(commitTime, numInserts)
        .map(AbstractBaseTestSource::toGenericRecord);
    return Stream.concat(updateStream, insertStream);
  }

  /**
   * Converts a generated record's payload into an Avro record using the generator's schema.
   *
   * @throws HoodieIOException if the payload cannot be read; failing here avoids putting
   *         {@code null} records into the batch, which would only fail later and less clearly
   */
  private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord) {
    try {
      Optional<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.avroSchema);
      return (GenericRecord) recordOpt.get();
    } catch (IOException e) {
      throw new HoodieIOException(e.getMessage(), e);
    }
  }
}

View File

@@ -0,0 +1,79 @@
/*
* Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import com.uber.hoodie.utilities.sources.config.TestSourceConfig;
import java.util.Iterator;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
/**
 * A Test DataSource which scales test-data generation by using spark parallelism.
 *
 * <p>Each batch is produced by an RDD with a configurable number of partitions; every partition
 * generates its own share of the records, with the unique-record cap and the source limit both
 * divided evenly across partitions.
 */
public class DistributedTestDataSource extends AbstractBaseTestSource {

  private final int numTestSourcePartitions;

  public DistributedTestDataSource(TypedProperties props,
      JavaSparkContext sparkContext, SparkSession sparkSession,
      SchemaProvider schemaProvider) {
    super(props, sparkContext, sparkSession, schemaProvider);
    this.numTestSourcePartitions = props.getInteger(TestSourceConfig.NUM_SOURCE_PARTITIONS_PROP,
        TestSourceConfig.DEFAULT_NUM_SOURCE_PARTITIONS);
  }

  /**
   * Produces the next batch of generated records.
   *
   * @param lastCkptStr previous checkpoint (a commit number) if any; the next commit number is one
   *                    greater, or 0 when absent
   * @param sourceLimit maximum number of records for the whole batch; a non-positive value yields
   *                    an empty batch
   * @return the generated records (if any) together with the new checkpoint, which is the commit
   *         number zero-padded to five digits
   */
  @Override
  protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Optional<String> lastCkptStr, long sourceLimit) {
    int nextCommitNum = lastCkptStr.map(s -> Integer.parseInt(s) + 1).orElse(0);
    String commitTime = String.format("%05d", nextCommitNum);
    log.info("Source Limit is set to " + sourceLimit);

    // No new data.
    if (sourceLimit <= 0) {
      return new InputBatch<>(Optional.empty(), commitTime);
    }

    TypedProperties newProps = new TypedProperties();
    newProps.putAll(props);

    // Set the maxUniqueRecords per partition for TestDataSource
    int maxUniqueRecords = props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP,
        TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS);
    String maxUniqueRecordsPerPartition = String.valueOf(Math.max(1, maxUniqueRecords / numTestSourcePartitions));
    newProps.setProperty(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, maxUniqueRecordsPerPartition);

    // Clamp in long arithmetic before narrowing: casting a large long straight to int keeps only
    // the low 32 bits, which would turn a very large source limit into an arbitrary value.
    long perPartitionShare = sourceLimit / numTestSourcePartitions;
    int perPartitionSourceLimit = (int) Math.max(1L, Math.min((long) Integer.MAX_VALUE, perPartitionShare));

    JavaRDD<GenericRecord> avroRDD = sparkContext.parallelize(IntStream.range(0, numTestSourcePartitions).boxed()
        .collect(Collectors.toList()), numTestSourcePartitions).mapPartitions(idx -> {
          log.info("Initializing source with newProps=" + newProps);
          // The generator is a static, so it is created once and then reused by later partitions
          // evaluated in the same JVM.
          if (null == dataGenerator) {
            initDataGen(newProps);
          }
          Iterator<GenericRecord> itr = fetchNextBatch(newProps, perPartitionSourceLimit, commitTime).iterator();
          return itr;
        });
    return new InputBatch<>(Optional.of(avroRDD), commitTime);
  }
}

View File

@@ -18,17 +18,12 @@
package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -38,32 +33,15 @@ import org.apache.spark.sql.SparkSession;
/**
* An implementation of {@link Source}, that emits test upserts.
*/
public class TestDataSource extends AvroSource {
public class TestDataSource extends AbstractBaseTestSource {
private static volatile Logger log = LogManager.getLogger(TestDataSource.class);
// Static instance, helps with reuse across a test.
private static HoodieTestDataGenerator dataGenerator;
public static void initDataGen() {
dataGenerator = new HoodieTestDataGenerator();
}
public static void resetDataGen() {
dataGenerator = null;
}
public TestDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
}
private GenericRecord toGenericRecord(HoodieRecord hoodieRecord) {
try {
Optional<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.avroSchema);
return (GenericRecord) recordOpt.get();
} catch (IOException e) {
return null;
if (null == dataGenerator) {
initDataGen(props);
}
}
@@ -73,26 +51,14 @@ public class TestDataSource extends AvroSource {
int nextCommitNum = lastCheckpointStr.map(s -> Integer.parseInt(s) + 1).orElse(0);
String commitTime = String.format("%05d", nextCommitNum);
log.info("Source Limit is set to " + sourceLimit);
// No new data.
if (sourceLimit <= 0) {
return new InputBatch<>(Optional.empty(), commitTime);
}
// generate `sourceLimit` number of upserts each time.
int numExistingKeys = dataGenerator.getExistingKeysList().size();
int numUpdates = Math.min(numExistingKeys, (int) sourceLimit / 2);
int numInserts = (int) sourceLimit - numUpdates;
List<GenericRecord> records = new ArrayList<>();
try {
records.addAll(dataGenerator.generateUniqueUpdates(commitTime, numUpdates).stream()
.map(this::toGenericRecord).collect(Collectors.toList()));
records.addAll(dataGenerator.generateInserts(commitTime, numInserts).stream()
.map(this::toGenericRecord).collect(Collectors.toList()));
} catch (IOException e) {
log.error("Error generating test data.", e);
}
List<GenericRecord> records = fetchNextBatch(props, (int)sourceLimit, commitTime).collect(Collectors.toList());
JavaRDD<GenericRecord> avroRDD = sparkContext.<GenericRecord>parallelize(records, 4);
return new InputBatch<>(Optional.of(avroRDD), commitTime);
}

View File

@@ -0,0 +1,43 @@
/*
* Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.sources.config;
/**
 * Property keys and default values understood by the test data sources.
 */
public class TestSourceConfig {

  /**
   * Number of partitions used to generate test data, each partition producing its own share.
   * Used by DistributedTestDataSource only.
   */
  public static final String NUM_SOURCE_PARTITIONS_PROP =
      "hoodie.deltastreamer.source.test.num_partitions";

  /** Partition count applied when {@link #NUM_SOURCE_PARTITIONS_PROP} is not set. */
  public static final Integer DEFAULT_NUM_SOURCE_PARTITIONS = Integer.valueOf(10);

  /** Upper bound on the number of unique records generated for the run. */
  public static final String MAX_UNIQUE_RECORDS_PROP =
      "hoodie.deltastreamer.source.test.max_unique_records";

  /** Unique-record bound applied when {@link #MAX_UNIQUE_RECORDS_PROP} is not set. */
  public static final Integer DEFAULT_MAX_UNIQUE_RECORDS = Integer.valueOf(Integer.MAX_VALUE);

  /** Whether RocksDB is used to store the data generator's existing keys. */
  public static final String USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS =
      "hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys";

  /** Value applied when {@link #USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS} is not set. */
  public static final Boolean DEFAULT_USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS = Boolean.FALSE;

  /** Base directory under which the data generator's keys are stored. */
  public static final String ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS =
      "hoodie.deltastreamer.source.test.datagen.rocksdb_base_dir";
}