1
0

[HUDI-531] Add java doc for hudi test suite general classes (#1900)

This commit is contained in:
Mathieu
2020-08-28 08:44:40 +08:00
committed by GitHub
parent 3a578d7402
commit fa81248247
21 changed files with 174 additions and 6 deletions

View File

@@ -557,7 +557,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted()); metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files" LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
+ " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain() + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
+ " cleanerElaspsedMs" + durationMs); + " cleanerElapsedMs" + durationMs);
} }
return metadata; return metadata;
} }

View File

@@ -41,7 +41,7 @@ Depending on the type of workload generated, data is either ingested into the ta
dataset or the corresponding workload operation is executed. For example compaction does not necessarily need a workload dataset or the corresponding workload operation is executed. For example compaction does not necessarily need a workload
to be generated/ingested but can require an execution. to be generated/ingested but can require an execution.
## Other actions/operatons ## Other actions/operations
The test suite supports different types of operations besides ingestion such as Hive Query execution, Clean action etc. The test suite supports different types of operations besides ingestion such as Hive Query execution, Clean action etc.
@@ -66,9 +66,9 @@ link#HudiDeltaStreamer page to learn about all the available configs applicable
There are 2 ways to generate a workload pattern There are 2 ways to generate a workload pattern
1.Programatically 1.Programmatically
Choose to write up the entire DAG of operations programatically, take a look at `WorkflowDagGenerator` class. Choose to write up the entire DAG of operations programmatically, take a look at `WorkflowDagGenerator` class.
Once you're ready with the DAG you want to execute, simply pass the class name as follows: Once you're ready with the DAG you want to execute, simply pass the class name as follows:
``` ```

View File

@@ -29,5 +29,11 @@ import org.apache.spark.api.java.JavaRDD;
*/ */
public interface Converter<I, O> extends Serializable { public interface Converter<I, O> extends Serializable {
/**
* Convert data from one format to another.
*
* @param inputRDD Input data
* @return Data in target format
*/
JavaRDD<O> convert(JavaRDD<I> inputRDD); JavaRDD<O> convert(JavaRDD<I> inputRDD);
} }

View File

@@ -24,6 +24,9 @@ import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
/**
* Represents a bulk insert node in the DAG of operations for a workflow.
*/
public class BulkInsertNode extends InsertNode { public class BulkInsertNode extends InsertNode {
public BulkInsertNode(Config config) { public BulkInsertNode(Config config) {

View File

@@ -20,6 +20,10 @@ package org.apache.hudi.integ.testsuite.dag.nodes;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext; import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
/**
* Represents a clean node in the DAG of operations for a workflow. Clean up any stale/old files/data lying around
* (either on file storage or index storage) based on configurations and CleaningPolicy used.
*/
public class CleanNode extends DagNode<Boolean> { public class CleanNode extends DagNode<Boolean> {
public CleanNode() { public CleanNode() {

View File

@@ -26,12 +26,22 @@ import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext; import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
/**
* Represents a compact node in the DAG of operations for a workflow.
*/
public class CompactNode extends DagNode<JavaRDD<WriteStatus>> { public class CompactNode extends DagNode<JavaRDD<WriteStatus>> {
public CompactNode(Config config) { public CompactNode(Config config) {
this.config = config; this.config = config;
} }
/**
* Method helps to start the compact operation. It will compact the last pending compact instant in the timeline
* if it has one.
*
* @param executionContext Execution context to run this compaction
* @throws Exception will be thrown if any error occurred.
*/
@Override @Override
public void execute(ExecutionContext executionContext) throws Exception { public void execute(ExecutionContext executionContext) throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(executionContext.getHoodieTestSuiteWriter().getConfiguration(), HoodieTableMetaClient metaClient = new HoodieTableMetaClient(executionContext.getHoodieTestSuiteWriter().getConfiguration(),

View File

@@ -29,7 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* Represents a Node in the DAG of operations for a workflow. * Base abstraction of a compute node in the DAG of operations for a workflow.
*/ */
public abstract class DagNode<O> implements Comparable<DagNode<O>> { public abstract class DagNode<O> implements Comparable<DagNode<O>> {
@@ -76,6 +76,12 @@ public abstract class DagNode<O> implements Comparable<DagNode<O>> {
this.parentNodes = parentNodes; this.parentNodes = parentNodes;
} }
/**
* Execute the {@link DagNode}.
*
* @param context The context needed for an execution of a node.
* @throws Exception Thrown if the execution failed.
*/
public abstract void execute(ExecutionContext context) throws Exception; public abstract void execute(ExecutionContext context) throws Exception;
public boolean isCompleted() { public boolean isCompleted() {

View File

@@ -30,6 +30,9 @@ import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext; import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.helpers.HiveServiceProvider; import org.apache.hudi.integ.testsuite.helpers.HiveServiceProvider;
/**
* A hive query node in the DAG of operations for a workflow. Used to perform a hive query with given config.
*/
public class HiveQueryNode extends DagNode<Boolean> { public class HiveQueryNode extends DagNode<Boolean> {
private HiveServiceProvider hiveServiceProvider; private HiveServiceProvider hiveServiceProvider;

View File

@@ -22,6 +22,9 @@ import org.apache.hudi.integ.testsuite.helpers.HiveServiceProvider;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext; import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
/**
* Represents a hive sync node in the DAG of operations for a workflow. Helps to sync hoodie data to hive table.
*/
public class HiveSyncNode extends DagNode<Boolean> { public class HiveSyncNode extends DagNode<Boolean> {
private HiveServiceProvider hiveServiceProvider; private HiveServiceProvider hiveServiceProvider;

View File

@@ -26,6 +26,9 @@ import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.generator.DeltaGenerator; import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
/**
* An insert node in the DAG of operations for a workflow.
*/
public class InsertNode extends DagNode<JavaRDD<WriteStatus>> { public class InsertNode extends DagNode<JavaRDD<WriteStatus>> {
public InsertNode(Config config) { public InsertNode(Config config) {

View File

@@ -24,12 +24,21 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext; import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
/**
* A rollback node in the DAG helps to perform rollback operations.
*/
public class RollbackNode extends DagNode<Option<HoodieInstant>> { public class RollbackNode extends DagNode<Option<HoodieInstant>> {
public RollbackNode(Config config) { public RollbackNode(Config config) {
this.config = config; this.config = config;
} }
/**
* Method helps to rollback the last commit instant in the timeline, if it has one.
*
* @param executionContext Execution context to perform this rollback
* @throws Exception will be thrown if any error occurred
*/
@Override @Override
public void execute(ExecutionContext executionContext) throws Exception { public void execute(ExecutionContext executionContext) throws Exception {
log.info("Executing rollback node {}", this.getName()); log.info("Executing rollback node {}", this.getName());

View File

@@ -25,6 +25,9 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext; import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
/**
* A schedule node in the DAG of operations for a workflow helps to schedule compact operation.
*/
public class ScheduleCompactNode extends DagNode<Option<String>> { public class ScheduleCompactNode extends DagNode<Option<String>> {
public ScheduleCompactNode(Config config) { public ScheduleCompactNode(Config config) {

View File

@@ -26,6 +26,9 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row; import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
/**
* A SparkSQL query node in the DAG of operations for a workflow.
*/
public class SparkSQLQueryNode extends DagNode<Boolean> { public class SparkSQLQueryNode extends DagNode<Boolean> {
HiveServiceProvider hiveServiceProvider; HiveServiceProvider hiveServiceProvider;
@@ -35,6 +38,12 @@ public class SparkSQLQueryNode extends DagNode<Boolean> {
this.hiveServiceProvider = new HiveServiceProvider(config); this.hiveServiceProvider = new HiveServiceProvider(config);
} }
/**
* Method helps to execute a sparkSql query from a hive table.
*
* @param executionContext Execution context to perform this query.
* @throws Exception will be thrown if any error occurred
*/
@Override @Override
public void execute(ExecutionContext executionContext) throws Exception { public void execute(ExecutionContext executionContext) throws Exception {
log.info("Executing spark sql query node"); log.info("Executing spark sql query node");

View File

@@ -25,6 +25,9 @@ import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.generator.DeltaGenerator; import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
/**
* Represents an upsert node in the DAG of operations for a workflow.
*/
public class UpsertNode extends InsertNode { public class UpsertNode extends InsertNode {
public UpsertNode(Config config) { public UpsertNode(Config config) {

View File

@@ -23,6 +23,9 @@ import java.util.function.Function;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext; import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
/**
* A validate node helps to validate its parent nodes with given function.
*/
public class ValidateNode<R> extends DagNode { public class ValidateNode<R> extends DagNode {
protected Function<List<DagNode>, R> function; protected Function<List<DagNode>, R> function;
@@ -32,6 +35,12 @@ public class ValidateNode<R> extends DagNode {
this.config = config; this.config = config;
} }
/**
* Method to start the validate operation. Exceptions will be thrown if its parent nodes exist and WAIT_FOR_PARENTS
* was set to true or default, but the parent nodes have not completed yet.
*
* @param executionContext Context to execute this node
*/
@Override @Override
public void execute(ExecutionContext executionContext) { public void execute(ExecutionContext executionContext) {
if (this.getParentNodes().size() > 0 && (Boolean) this.config.getOtherConfigs().getOrDefault("WAIT_FOR_PARENTS", if (this.getParentNodes().size() > 0 && (Boolean) this.config.getOtherConfigs().getOrDefault("WAIT_FOR_PARENTS",

View File

@@ -37,6 +37,10 @@ import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/**
* The Dag scheduler schedules the workflow DAGs. It will convert DAG to node set and execute the nodes according to
* the relations between nodes.
*/
public class DagScheduler { public class DagScheduler {
private static Logger log = LoggerFactory.getLogger(DagScheduler.class); private static Logger log = LoggerFactory.getLogger(DagScheduler.class);
@@ -48,6 +52,11 @@ public class DagScheduler {
this.executionContext = new ExecutionContext(null, hoodieTestSuiteWriter, deltaGenerator); this.executionContext = new ExecutionContext(null, hoodieTestSuiteWriter, deltaGenerator);
} }
/**
* Method to start executing workflow DAGs.
*
* @throws Exception Thrown if schedule failed.
*/
public void schedule() throws Exception { public void schedule() throws Exception {
ExecutorService service = Executors.newFixedThreadPool(2); ExecutorService service = Executors.newFixedThreadPool(2);
try { try {
@@ -61,6 +70,13 @@ public class DagScheduler {
} }
} }
/**
* Method to start executing the nodes in workflow DAGs.
*
* @param service ExecutorService
* @param nodes Nodes to be executed
* @throws Exception will be thrown if any error occurred
*/
private void execute(ExecutorService service, List<DagNode> nodes) throws Exception { private void execute(ExecutorService service, List<DagNode> nodes) throws Exception {
// Nodes at the same level are executed in parallel // Nodes at the same level are executed in parallel
Queue<DagNode> queue = new PriorityQueue<>(nodes); Queue<DagNode> queue = new PriorityQueue<>(nodes);
@@ -84,6 +100,11 @@ public class DagScheduler {
log.info("Finished workloads"); log.info("Finished workloads");
} }
/**
* Execute the given node.
*
* @param node The node to be executed
*/
private void executeNode(DagNode node) { private void executeNode(DagNode node) {
if (node.isCompleted()) { if (node.isCompleted()) {
throw new RuntimeException("DagNode already completed! Cannot re-execute"); throw new RuntimeException("DagNode already completed! Cannot re-execute");

View File

@@ -57,7 +57,7 @@ import scala.Tuple2;
*/ */
public class DeltaGenerator implements Serializable { public class DeltaGenerator implements Serializable {
private static Logger log = LoggerFactory.getLogger(DFSHoodieDatasetInputReader.class); private static Logger log = LoggerFactory.getLogger(DeltaGenerator.class);
private DeltaConfig deltaOutputConfig; private DeltaConfig deltaOutputConfig;
private transient JavaSparkContext jsc; private transient JavaSparkContext jsc;

View File

@@ -98,14 +98,32 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
} }
} }
/**
* Create a new {@link GenericRecord} with random value according to given schema.
*
* @return {@link GenericRecord} with random value
*/
public GenericRecord getNewPayload() { public GenericRecord getNewPayload() {
return convert(baseSchema); return convert(baseSchema);
} }
/**
* Update a given {@link GenericRecord} with random value. The fields in {@code blacklistFields} will not be updated.
*
* @param record GenericRecord to update
* @param blacklistFields Fields whose value should not be touched
* @return The updated {@link GenericRecord}
*/
public GenericRecord getUpdatePayload(GenericRecord record, List<String> blacklistFields) { public GenericRecord getUpdatePayload(GenericRecord record, List<String> blacklistFields) {
return randomize(record, blacklistFields); return randomize(record, blacklistFields);
} }
/**
* Create a {@link GenericRecord} with random value according to given schema.
*
* @param schema Schema to create record with
* @return {@link GenericRecord} with random value
*/
protected GenericRecord convert(Schema schema) { protected GenericRecord convert(Schema schema) {
GenericRecord result = new GenericData.Record(schema); GenericRecord result = new GenericData.Record(schema);
for (Schema.Field f : schema.getFields()) { for (Schema.Field f : schema.getFields()) {
@@ -114,6 +132,13 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
return result; return result;
} }
/**
* Create a new {@link GenericRecord} with random values. Not all the fields have value, it is random, and its value
* is random too.
*
* @param schema Schema to create with.
* @return A {@link GenericRecord} with random value.
*/
protected GenericRecord convertPartial(Schema schema) { protected GenericRecord convertPartial(Schema schema) {
GenericRecord result = new GenericData.Record(schema); GenericRecord result = new GenericData.Record(schema);
for (Schema.Field f : schema.getFields()) { for (Schema.Field f : schema.getFields()) {
@@ -128,6 +153,14 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
return result; return result;
} }
/**
* Set random value to {@link GenericRecord} according to the schema type of field.
* The field in blacklist will not be set.
*
* @param record GenericRecord to randomize.
* @param blacklistFields blacklistFields where the field will not be randomized.
* @return Randomized GenericRecord.
*/
protected GenericRecord randomize(GenericRecord record, List<String> blacklistFields) { protected GenericRecord randomize(GenericRecord record, List<String> blacklistFields) {
for (Schema.Field f : record.getSchema().getFields()) { for (Schema.Field f : record.getSchema().getFields()) {
if (blacklistFields == null || !blacklistFields.contains(f.name())) { if (blacklistFields == null || !blacklistFields.contains(f.name())) {
@@ -137,6 +170,9 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
return record; return record;
} }
/**
* Generate random value according to their type.
*/
private Object typeConvert(Schema schema) { private Object typeConvert(Schema schema) {
Schema localSchema = schema; Schema localSchema = schema;
if (isOption(schema)) { if (isOption(schema)) {
@@ -215,10 +251,26 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
} }
} }
/**
* Validate whether the record match schema.
*
* @param record Record to validate.
* @return True if matches.
*/
public boolean validate(GenericRecord record) { public boolean validate(GenericRecord record) {
return genericData.validate(baseSchema, record); return genericData.validate(baseSchema, record);
} }
/**
* Check whether a schema is an option.
* Returns true if it matches all of the following:
* 1. Its type is Type.UNION
* 2. Has two types
* 3. Has a NULL type.
*
* @param schema Schema to check.
* @return True if the schema is an option (nullable union) type.
*/
protected boolean isOption(Schema schema) { protected boolean isOption(Schema schema) {
return schema.getType().equals(Schema.Type.UNION) return schema.getType().equals(Schema.Type.UNION)
&& schema.getTypes().size() == 2 && schema.getTypes().size() == 2
@@ -260,6 +312,12 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
} }
} }
/**
* Method help to calculate the number of entries to add.
*
* @param elementSchema Schema of the element whose size determines the entry count.
* @return Number of entries to add
*/
private int numEntriesToAdd(Schema elementSchema) { private int numEntriesToAdd(Schema elementSchema) {
// Find the size of the primitive data type in bytes // Find the size of the primitive data type in bytes
int primitiveDataTypeSize = getSize(elementSchema); int primitiveDataTypeSize = getSize(elementSchema);

View File

@@ -61,6 +61,12 @@ public class GenericRecordFullPayloadSizeEstimator implements Serializable {
return (int) size; return (int) size;
} }
/**
* Estimate the size of a given schema according to their type.
*
* @param schema schema to estimate.
* @return Size of the given schema.
*/
private long typeEstimate(Schema schema) { private long typeEstimate(Schema schema) {
Schema localSchema = schema; Schema localSchema = schema;
if (isOption(schema)) { if (isOption(schema)) {
@@ -112,6 +118,12 @@ public class GenericRecordFullPayloadSizeEstimator implements Serializable {
|| schema.getTypes().get(1).getType().equals(Schema.Type.NULL)); || schema.getTypes().get(1).getType().equals(Schema.Type.NULL));
} }
/**
* Get the nonNull Schema of a given UNION Schema.
*
* @param schema UNION schema to inspect.
* @return The non-null member schema of the union.
*/
protected Schema getNonNull(Schema schema) { protected Schema getNonNull(Schema schema) {
List<Schema> types = schema.getTypes(); List<Schema> types = schema.getTypes();
return types.get(0).getType().equals(Schema.Type.NULL) ? types.get(1) : types.get(0); return types.get(0).getType().equals(Schema.Type.NULL) ? types.get(1) : types.get(0);

View File

@@ -24,6 +24,9 @@ import java.util.List;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
/**
* A lazy update payload generator to generate {@link GenericRecord}s lazily.
*/
public class UpdateGeneratorIterator implements Iterator<GenericRecord> { public class UpdateGeneratorIterator implements Iterator<GenericRecord> {
// Use the full payload generator as default // Use the full payload generator as default

View File

@@ -23,6 +23,9 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
/**
* Adapter that uses a Delta Writer.
*/
public interface DeltaWriterAdapter<I> { public interface DeltaWriterAdapter<I> {
List<DeltaWriteStats> write(Iterator<I> input) throws IOException; List<DeltaWriteStats> write(Iterator<I> input) throws IOException;