Test Suite should work with Docker + Unit Tests
This commit is contained in:
@@ -90,6 +90,7 @@ public class HoodieTestSuiteJob {
|
||||
public HoodieTestSuiteJob(HoodieTestSuiteConfig cfg, JavaSparkContext jsc) throws IOException {
|
||||
this.cfg = cfg;
|
||||
this.jsc = jsc;
|
||||
cfg.propsFilePath = FSUtils.addSchemeIfLocalPath(cfg.propsFilePath).toString();
|
||||
this.sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
|
||||
this.fs = FSUtils.getFs(cfg.inputBasePath, jsc.hadoopConfiguration());
|
||||
this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
|
||||
@@ -123,11 +124,18 @@ public class HoodieTestSuiteJob {
|
||||
new HoodieTestSuiteJob(cfg, jssc).runTestSuite();
|
||||
}
|
||||
|
||||
public WorkflowDag createWorkflowDag() throws IOException {
|
||||
WorkflowDag workflowDag = this.cfg.workloadYamlPath == null ? ((WorkflowDagGenerator) ReflectionUtils
|
||||
.loadClass((this.cfg).workloadDagGenerator)).build()
|
||||
: DagUtils.convertYamlPathToDag(
|
||||
FSUtils.getFs(this.cfg.workloadYamlPath, jsc.hadoopConfiguration(), true),
|
||||
this.cfg.workloadYamlPath);
|
||||
return workflowDag;
|
||||
}
|
||||
|
||||
public void runTestSuite() {
|
||||
try {
|
||||
WorkflowDag workflowDag = this.cfg.workloadYamlPath == null ? ((WorkflowDagGenerator) ReflectionUtils
|
||||
.loadClass((this.cfg).workloadDagGenerator)).build()
|
||||
: DagUtils.convertYamlPathToDag(this.fs, this.cfg.workloadYamlPath);
|
||||
WorkflowDag workflowDag = createWorkflowDag();
|
||||
log.info("Workflow Dag => " + DagUtils.convertDagToYaml(workflowDag));
|
||||
long startTime = System.currentTimeMillis();
|
||||
String schemaStr = schemaProvider.getSourceSchema().toString();
|
||||
@@ -143,6 +151,7 @@ public class HoodieTestSuiteJob {
|
||||
log.error("Failed to run Test Suite ", e);
|
||||
throw new HoodieException("Failed to run Test Suite ", e);
|
||||
} finally {
|
||||
sparkSession.stop();
|
||||
jsc.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,7 +39,6 @@ public class HiveSyncNode extends DagNode<Boolean> {
|
||||
log.info("Executing hive sync node");
|
||||
this.hiveServiceProvider.startLocalHiveServiceIfNeeded(executionContext.getHoodieTestSuiteWriter().getConfiguration());
|
||||
this.hiveServiceProvider.syncToLocalHiveIfNeeded(executionContext.getHoodieTestSuiteWriter());
|
||||
executionContext.getHoodieTestSuiteWriter().getDeltaStreamerWrapper().getDeltaSyncService().getDeltaSync().syncHive();
|
||||
this.hiveServiceProvider.stopLocalHiveServiceIfNeeded();
|
||||
}
|
||||
|
||||
|
||||
@@ -80,7 +80,7 @@ public class DagScheduler {
|
||||
private void execute(ExecutorService service, List<DagNode> nodes) throws Exception {
|
||||
// Nodes at the same level are executed in parallel
|
||||
Queue<DagNode> queue = new PriorityQueue<>(nodes);
|
||||
log.info("Running workloads");
|
||||
log.warn("Running workloads");
|
||||
do {
|
||||
List<Future> futures = new ArrayList<>();
|
||||
Set<DagNode> childNodes = new HashSet<>();
|
||||
@@ -110,6 +110,7 @@ public class DagScheduler {
|
||||
throw new RuntimeException("DagNode already completed! Cannot re-execute");
|
||||
}
|
||||
try {
|
||||
log.warn("executing node: " + node.getName() + " of type: " + node.getClass());
|
||||
node.execute(executionContext);
|
||||
node.setCompleted(true);
|
||||
log.info("Finished executing {}", node.getName());
|
||||
|
||||
@@ -94,11 +94,12 @@ public class DeltaGenerator implements Serializable {
|
||||
|
||||
public JavaRDD<GenericRecord> generateInserts(Config operation) {
|
||||
long recordsPerPartition = operation.getNumRecordsInsert();
|
||||
int numPartitions = operation.getNumInsertPartitions();
|
||||
int minPayloadSize = operation.getRecordSize();
|
||||
JavaRDD<GenericRecord> inputBatch = jsc.parallelize(Collections.EMPTY_LIST)
|
||||
.repartition(operation.getNumInsertPartitions()).mapPartitions(p -> {
|
||||
return new LazyRecordGeneratorIterator(new FlexibleSchemaRecordGenerationIterator(recordsPerPartition,
|
||||
minPayloadSize, schemaStr, partitionPathFieldNames));
|
||||
minPayloadSize, schemaStr, partitionPathFieldNames, numPartitions));
|
||||
});
|
||||
return inputBatch;
|
||||
}
|
||||
|
||||
@@ -40,15 +40,15 @@ public class FlexibleSchemaRecordGenerationIterator implements Iterator<GenericR
|
||||
private List<String> partitionPathFieldNames;
|
||||
|
||||
public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, String schema) {
|
||||
this(maxEntriesToProduce, GenericRecordFullPayloadGenerator.DEFAULT_PAYLOAD_SIZE, schema, null);
|
||||
this(maxEntriesToProduce, GenericRecordFullPayloadGenerator.DEFAULT_PAYLOAD_SIZE, schema, null, GenericRecordFullPayloadGenerator.DEFAULT_NUM_DATE_PARTITIONS);
|
||||
}
|
||||
|
||||
public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, int minPayloadSize, String schemaStr,
|
||||
List<String> partitionPathFieldNames) {
|
||||
List<String> partitionPathFieldNames, int numPartitions) {
|
||||
this.counter = maxEntriesToProduce;
|
||||
this.partitionPathFieldNames = partitionPathFieldNames;
|
||||
Schema schema = new Schema.Parser().parse(schemaStr);
|
||||
this.generator = new GenericRecordFullPayloadGenerator(schema, minPayloadSize);
|
||||
this.generator = new GenericRecordFullPayloadGenerator(schema, minPayloadSize, numPartitions);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -27,6 +27,8 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.Schema.Type;
|
||||
import org.apache.avro.generic.GenericData;
|
||||
@@ -44,6 +46,7 @@ import org.slf4j.LoggerFactory;
|
||||
public class GenericRecordFullPayloadGenerator implements Serializable {
|
||||
|
||||
public static final int DEFAULT_PAYLOAD_SIZE = 1024 * 10; // 10 KB
|
||||
public static final int DEFAULT_NUM_DATE_PARTITIONS = 50;
|
||||
private static Logger log = LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class);
|
||||
protected final Random random = new Random();
|
||||
// The source schema used to generate a payload
|
||||
@@ -58,6 +61,8 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
|
||||
private int numberOfComplexFields;
|
||||
// The size of a full record where every field of a generic record created contains 1 random value
|
||||
private int estimatedFullPayloadSize;
|
||||
// The number of unique dates to create
|
||||
private int numDatePartitions = DEFAULT_NUM_DATE_PARTITIONS;
|
||||
// LogicalTypes in Avro 1.8.2
|
||||
private static final String DECIMAL = "decimal";
|
||||
private static final String UUID_NAME = "uuid";
|
||||
@@ -86,6 +91,10 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
|
||||
}
|
||||
}
|
||||
}
|
||||
public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize, int numDatePartitions) {
|
||||
this(schema, minPayloadSize);
|
||||
this.numDatePartitions = numDatePartitions;
|
||||
}
|
||||
|
||||
protected static boolean isPrimitive(Schema localSchema) {
|
||||
if (localSchema.getType() != Type.ARRAY
|
||||
@@ -170,6 +179,12 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
|
||||
return record;
|
||||
}
|
||||
|
||||
private long getNextConstrainedLong() {
|
||||
int numPartitions = random.nextInt(numDatePartitions);
|
||||
long unixTimeStamp = TimeUnit.SECONDS.convert(numPartitions, TimeUnit.DAYS);
|
||||
return unixTimeStamp;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate random value according to their type.
|
||||
*/
|
||||
@@ -188,7 +203,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
|
||||
case INT:
|
||||
return random.nextInt();
|
||||
case LONG:
|
||||
return random.nextLong();
|
||||
return getNextConstrainedLong();
|
||||
case STRING:
|
||||
return UUID.randomUUID().toString();
|
||||
case ENUM:
|
||||
|
||||
Reference in New Issue
Block a user