1. Fix merge on read DAG to make docker demo pass (#2092)
1. Fix merge on read DAG to make docker demo pass (#2092) 2. Fix repeat_count, rollback node
This commit is contained in:
@@ -18,12 +18,6 @@
|
||||
|
||||
package org.apache.hudi.integ.testsuite;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
||||
import org.apache.hudi.client.HoodieReadClient;
|
||||
@@ -38,16 +32,24 @@ import org.apache.hudi.config.HoodieCompactionConfig;
|
||||
import org.apache.hudi.config.HoodieIndexConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
|
||||
import org.apache.hudi.integ.testsuite.dag.nodes.CleanNode;
|
||||
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
|
||||
import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode;
|
||||
import org.apache.hudi.integ.testsuite.dag.nodes.ScheduleCompactNode;
|
||||
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
|
||||
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
|
||||
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
|
||||
import org.apache.hudi.utilities.schema.SchemaProvider;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* A writer abstraction for the Hudi test suite. This class wraps different implementations of writers used to perform
|
||||
* write operations into the target hudi dataset. Current supported writers are {@link HoodieDeltaStreamerWrapper}
|
||||
@@ -66,6 +68,7 @@ public class HoodieTestSuiteWriter {
|
||||
private transient JavaSparkContext sparkContext;
|
||||
private static Set<String> VALID_DAG_NODES_TO_ALLOW_WRITE_CLIENT_IN_DELTASTREAMER_MODE = new HashSet<>(
|
||||
Arrays.asList(RollbackNode.class.getName(), CleanNode.class.getName(), ScheduleCompactNode.class.getName()));
|
||||
private static final String GENERATED_DATA_PATH = "generated.data.path";
|
||||
|
||||
public HoodieTestSuiteWriter(JavaSparkContext jsc, Properties props, HoodieTestSuiteConfig cfg, String schema) throws
|
||||
Exception {
|
||||
@@ -181,12 +184,17 @@ public class HoodieTestSuiteWriter {
|
||||
}
|
||||
}
|
||||
|
||||
public void commit(JavaRDD<WriteStatus> records, Option<String> instantTime) {
|
||||
public void commit(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats,
|
||||
Option<String> instantTime) {
|
||||
if (!cfg.useDeltaStreamer) {
|
||||
Map<String, String> extraMetadata = new HashMap<>();
|
||||
/** Store the checkpoint in the commit metadata just like
|
||||
* {@link HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD, Option)} **/
|
||||
extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY, lastCheckpoint.get());
|
||||
if (generatedDataStats != null) {
|
||||
// Just stores the path where this batch of data is generated to
|
||||
extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(s -> s.getFilePath()).collect().get(0));
|
||||
}
|
||||
writeClient.commit(instantTime.get(), records, Option.of(extraMetadata));
|
||||
}
|
||||
}
|
||||
@@ -218,4 +226,8 @@ public class HoodieTestSuiteWriter {
|
||||
/**
 * Returns the {@code sparkContext} field held by this writer.
 */
public JavaSparkContext getSparkContext() {
|
||||
return sparkContext;
|
||||
}
|
||||
|
||||
/**
 * Returns the {@code lastCheckpoint} field held by this writer.
 */
public Option<String> getLastCheckpoint() {
|
||||
return lastCheckpoint;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -96,6 +96,14 @@ public class WriterContext {
|
||||
return deltaGenerator;
|
||||
}
|
||||
|
||||
/**
 * Returns the {@code cfg} field (the test suite configuration object) held by this context.
 */
public HoodieTestSuiteConfig getCfg() {
|
||||
return cfg;
|
||||
}
|
||||
|
||||
/**
 * Returns the {@code props} field held by this context.
 */
public TypedProperties getProps() {
|
||||
return props;
|
||||
}
|
||||
|
||||
/**
 * Builds a two-line description: the string form of the wrapped test suite writer, then the string form of the
 * delta generator, each terminated by a newline.
 */
@Override
public String toString() {
|
||||
return this.hoodieTestSuiteWriter.toString() + "\n" + this.deltaGenerator.toString() + "\n";
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
|
||||
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
|
||||
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
|
||||
import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
|
||||
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
|
||||
/**
|
||||
@@ -31,6 +32,8 @@ import org.apache.spark.api.java.JavaRDD;
|
||||
*/
|
||||
public class InsertNode extends DagNode<JavaRDD<WriteStatus>> {
|
||||
|
||||
protected JavaRDD<DeltaWriteStats> deltaWriteStatsRDD;
|
||||
|
||||
/**
 * Creates an insert node that stores the given configuration in {@code this.config}.
 *
 * @param config the node configuration to keep
 */
public InsertNode(Config config) {
|
||||
this.config = config;
|
||||
}
|
||||
@@ -48,7 +51,7 @@ public class InsertNode extends DagNode<JavaRDD<WriteStatus>> {
|
||||
log.info("Inserting input data {}", this.getName());
|
||||
Option<String> commitTime = executionContext.getHoodieTestSuiteWriter().startCommit();
|
||||
JavaRDD<WriteStatus> writeStatus = ingest(executionContext.getHoodieTestSuiteWriter(), commitTime);
|
||||
executionContext.getHoodieTestSuiteWriter().commit(writeStatus, commitTime);
|
||||
executionContext.getHoodieTestSuiteWriter().commit(writeStatus, this.deltaWriteStatsRDD, commitTime);
|
||||
this.result = writeStatus;
|
||||
}
|
||||
}
|
||||
@@ -56,7 +59,8 @@ public class InsertNode extends DagNode<JavaRDD<WriteStatus>> {
|
||||
protected void generate(DeltaGenerator deltaGenerator) throws Exception {
|
||||
if (!config.isDisableGenerate()) {
|
||||
log.info("Generating input data for node {}", this.getName());
|
||||
deltaGenerator.writeRecords(deltaGenerator.generateInserts(config)).count();
|
||||
this.deltaWriteStatsRDD = deltaGenerator.writeRecords(deltaGenerator.generateInserts(config));
|
||||
this.deltaWriteStatsRDD.count();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -18,11 +18,16 @@
|
||||
|
||||
package org.apache.hudi.integ.testsuite.dag.nodes;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.exception.HoodieNotSupportedException;
|
||||
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
|
||||
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
|
||||
import org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector;
|
||||
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
|
||||
|
||||
/**
|
||||
* A rollback node in the DAG helps to perform rollback operations.
|
||||
@@ -49,7 +54,12 @@ public class RollbackNode extends DagNode<Option<HoodieInstant>> {
|
||||
Option<HoodieInstant> lastInstant = metaClient.getActiveTimeline().getCommitsTimeline().lastInstant();
|
||||
if (lastInstant.isPresent()) {
|
||||
log.info("Rolling back last instant {}", lastInstant.get());
|
||||
log.info("Cleaning up generated data for the instant being rolled back {}", lastInstant.get());
|
||||
ValidationUtils.checkArgument(executionContext.getWriterContext().getProps().getOrDefault(DFSPathSelector.Config.SOURCE_INPUT_SELECTOR,
|
||||
DFSPathSelector.class.getName()).toString().equalsIgnoreCase(DFSTestSuitePathSelector.class.getName()), "Test Suite only supports DFSTestSuitePathSelector");
|
||||
executionContext.getHoodieTestSuiteWriter().getWriteClient(this).rollback(lastInstant.get().getTimestamp());
|
||||
metaClient.getFs().delete(new Path(executionContext.getWriterContext().getCfg().inputBasePath,
|
||||
executionContext.getWriterContext().getHoodieTestSuiteWriter().getLastCheckpoint().orElse("")), true);
|
||||
this.result = lastInstant;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,15 @@
|
||||
|
||||
package org.apache.hudi.integ.testsuite.dag.scheduler;
|
||||
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
|
||||
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
|
||||
import org.apache.hudi.integ.testsuite.dag.WriterContext;
|
||||
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
|
||||
import org.apache.hudi.metrics.Metrics;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
@@ -28,14 +37,6 @@ import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
|
||||
import org.apache.hudi.metrics.Metrics;
|
||||
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
|
||||
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
|
||||
import org.apache.hudi.integ.testsuite.dag.WriterContext;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* The Dag scheduler schedules the workflow DAGs. It will convert DAG to node set and execute the nodes according to
|
||||
@@ -113,12 +114,16 @@ public class DagScheduler {
|
||||
throw new RuntimeException("DagNode already completed! Cannot re-execute");
|
||||
}
|
||||
try {
|
||||
log.info("executing node: " + node.getName() + " of type: " + node.getClass());
|
||||
node.execute(executionContext);
|
||||
int repeatCount = node.getConfig().getRepeatCount();
|
||||
while (repeatCount > 0) {
|
||||
log.warn("executing node: " + node.getName() + " of type: " + node.getClass());
|
||||
node.execute(executionContext);
|
||||
log.info("Finished executing {}", node.getName());
|
||||
repeatCount--;
|
||||
}
|
||||
node.setCompleted(true);
|
||||
log.info("Finished executing {}", node.getName());
|
||||
} catch (Exception e) {
|
||||
log.error("Exception executing node");
|
||||
log.error("Exception executing node", e);
|
||||
throw new HoodieException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,12 +18,17 @@
|
||||
|
||||
package org.apache.hudi.integ.testsuite.helpers;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FsStatus;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
@@ -62,19 +67,27 @@ public class DFSTestSuitePathSelector extends DFSPathSelector {
|
||||
lastBatchId = 0;
|
||||
nextBatchId = 1;
|
||||
}
|
||||
|
||||
log.info("Using DFSTestSuitePathSelector, checkpoint: " + lastCheckpointStr + " sourceLimit: " + sourceLimit
|
||||
+ " lastBatchId: " + lastBatchId + " nextBatchId: " + nextBatchId);
|
||||
// obtain all eligible files for the batch
|
||||
List<FileStatus> eligibleFiles = new ArrayList<>();
|
||||
FileStatus[] fileStatuses = fs.globStatus(
|
||||
new Path(props.getString(Config.ROOT_INPUT_PATH_PROP), "*"));
|
||||
// Say input data is as follow input/1, input/2, input/5 since 3,4 was rolled back and 5 is new generated data
|
||||
// checkpoint from the latest commit metadata will be 2 since 3,4 has been rolled back. We need to set the
|
||||
// next batch id correctly as 5 instead of 3
|
||||
// Smallest batch id strictly greater than lastBatchId; directory names that are not plain digits are skipped so
// Integer.parseInt cannot throw on them.
Optional<String> correctBatchIdDueToRollback = Arrays.stream(fileStatuses)
|
||||
.map(f -> f.getPath().getName())
|
||||
.filter(bid1 -> bid1.matches("\\d+") && Integer.parseInt(bid1) > lastBatchId)
|
||||
// A comparator must return a sign; Integer.min returned the smaller value, which made min() pick the wrong element.
.min((bid1, bid2) -> Integer.compare(Integer.parseInt(bid1), Integer.parseInt(bid2)));
|
||||
if (correctBatchIdDueToRollback.isPresent() && Integer.parseInt(correctBatchIdDueToRollback.get()) > nextBatchId) {
|
||||
nextBatchId = Integer.parseInt(correctBatchIdDueToRollback.get());
|
||||
}
|
||||
log.info("Using DFSTestSuitePathSelector, checkpoint: " + lastCheckpointStr + " sourceLimit: " + sourceLimit
|
||||
+ " lastBatchId: " + lastBatchId + " nextBatchId: " + nextBatchId);
|
||||
for (FileStatus fileStatus : fileStatuses) {
|
||||
if (!fileStatus.isDirectory() || IGNORE_FILEPREFIX_LIST.stream()
|
||||
.anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) {
|
||||
continue;
|
||||
} else if (fileStatus.getPath().getName().compareTo(lastBatchId.toString()) > 0 && fileStatus.getPath()
|
||||
.getName().compareTo(nextBatchId.toString()) <= 0) {
|
||||
} else if (fileStatus.getPath().getName().compareTo(lastBatchId.toString()) > 0) {
|
||||
RemoteIterator<LocatedFileStatus> files = fs.listFiles(fileStatus.getPath(), true);
|
||||
while (files.hasNext()) {
|
||||
eligibleFiles.add(files.next());
|
||||
|
||||
Reference in New Issue
Block a user