1
0

[HUDI-3842] Integ tests for non-partitioned datasets (#5276)

- Adding non-partitioned support to integ tests
- Fixing some of the test yamls and properties
This commit is contained in:
Sivabalan Narayanan
2022-04-10 17:09:48 -07:00
committed by GitHub
parent 976840e8eb
commit 12731f5b89
28 changed files with 742 additions and 82 deletions

View File

@@ -81,7 +81,7 @@ public abstract class BaseValidateDatasetNode extends DagNode<Boolean> {
SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
// todo: Fix partitioning schemes. For now, assumes data based partitioning.
String inputPath = context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*";
log.warn("Validation using data from input path " + inputPath);
log.info("Validation using data from input path " + inputPath);
// listing batches to be validated
String inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
if (log.isDebugEnabled()) {
@@ -166,7 +166,7 @@ public abstract class BaseValidateDatasetNode extends DagNode<Boolean> {
ExpressionEncoder encoder = getEncoder(inputDf.schema());
return inputDf.groupByKey(
(MapFunction<Row, String>) value ->
value.getAs(partitionPathField) + "+" + value.getAs(recordKeyField), Encoders.STRING())
(partitionPathField.isEmpty() ? value.getAs(recordKeyField) : (value.getAs(partitionPathField) + "+" + value.getAs(recordKeyField))), Encoders.STRING())
.reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
int ts1 = v1.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
int ts2 = v2.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.integ.testsuite.dag.nodes;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.spark.sql.Dataset;
@@ -48,8 +49,8 @@ public class ValidateDatasetNode extends BaseValidateDatasetNode {
@Override
public Dataset<Row> getDatasetToValidate(SparkSession session, ExecutionContext context,
StructType inputSchema) {
String hudiPath = context.getHoodieTestSuiteWriter().getCfg().targetBasePath + "/*/*/*";
log.info("Validate data in target hudi path " + hudiPath);
String partitionPathField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.PARTITIONPATH_FIELD().key());
String hudiPath = context.getHoodieTestSuiteWriter().getCfg().targetBasePath + (partitionPathField.isEmpty() ? "/" : "/*/*/*");
Dataset<Row> hudiDf = session.read().option(HoodieMetadataConfig.ENABLE.key(), String.valueOf(config.isEnableMetadataValidate()))
.format("hudi").load(hudiPath);
return hudiDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)

View File

@@ -64,16 +64,20 @@ class SparkDataSourceContinuousIngest(val spark: SparkSession, val conf: Configu
}
}
orderedBatch.foreach(entry => {
log.info("Consuming from batch " + entry)
val pathToConsume = new Path(sourcePath.toString + "/" + entry.getPath.getName)
val df = spark.read.format(sourceFormat).load(pathToConsume.toString)
if (orderedBatch.isEmpty) {
log.info("All batches have been consumed. Exiting.")
} else {
orderedBatch.foreach(entry => {
log.info("Consuming from batch " + entry)
val pathToConsume = new Path(sourcePath.toString + "/" + entry.getPath.getName)
val df = spark.read.format(sourceFormat).load(pathToConsume.toString)
df.write.format("hudi").options(hudiOptions).mode(SaveMode.Append).save(hudiBasePath.toString)
writeToFile(checkpointFile, entry.getPath.getName, checkPointFs)
log.info("Completed batch " + entry + ". Moving to next batch. Sleeping for " + minSyncIntervalSeconds + " secs before next batch")
Thread.sleep(minSyncIntervalSeconds * 1000)
})
df.write.format("hudi").options(hudiOptions).mode(SaveMode.Append).save(hudiBasePath.toString)
writeToFile(checkpointFile, entry.getPath.getName, checkPointFs)
log.info("Completed batch " + entry + ". Moving to next batch. Sleeping for " + minSyncIntervalSeconds + " secs before next batch")
Thread.sleep(minSyncIntervalSeconds * 1000)
})
}
}
def fetchListOfFilesToConsume(fs: FileSystem, basePath: Path, pathFilter: PathFilter): Array[FileStatus] = {