@@ -21,7 +21,7 @@
|
|||||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
xmlns:context="http://www.springframework.org/schema/context"
|
xmlns:context="http://www.springframework.org/schema/context"
|
||||||
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
|
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
|
||||||
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
|
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
|
||||||
|
|
||||||
<context:component-scan base-package="org.apache.hudi.cli"/>
|
<context:component-scan base-package="org.apache.hudi.cli"/>
|
||||||
|
|
||||||
|
|||||||
@@ -171,7 +171,7 @@ public class HoodieTestSuiteWriter implements Serializable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public JavaRDD<WriteStatus> insertOverwrite(Option<String> instantTime) throws Exception {
|
public JavaRDD<WriteStatus> insertOverwrite(Option<String> instantTime) throws Exception {
|
||||||
if(cfg.useDeltaStreamer){
|
if (cfg.useDeltaStreamer) {
|
||||||
return deltaStreamerWrapper.insertOverwrite();
|
return deltaStreamerWrapper.insertOverwrite();
|
||||||
} else {
|
} else {
|
||||||
Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
|
Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
|
||||||
@@ -181,7 +181,7 @@ public class HoodieTestSuiteWriter implements Serializable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public JavaRDD<WriteStatus> insertOverwriteTable(Option<String> instantTime) throws Exception {
|
public JavaRDD<WriteStatus> insertOverwriteTable(Option<String> instantTime) throws Exception {
|
||||||
if(cfg.useDeltaStreamer){
|
if (cfg.useDeltaStreamer) {
|
||||||
return deltaStreamerWrapper.insertOverwriteTable();
|
return deltaStreamerWrapper.insertOverwriteTable();
|
||||||
} else {
|
} else {
|
||||||
Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
|
Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
|
||||||
|
|||||||
@@ -102,7 +102,7 @@ public class DagUtils {
|
|||||||
case DAG_CONTENT:
|
case DAG_CONTENT:
|
||||||
JsonNode dagContent = dagNode.getValue();
|
JsonNode dagContent = dagNode.getValue();
|
||||||
Iterator<Entry<String, JsonNode>> contentItr = dagContent.fields();
|
Iterator<Entry<String, JsonNode>> contentItr = dagContent.fields();
|
||||||
while(contentItr.hasNext()) {
|
while (contentItr.hasNext()) {
|
||||||
Entry<String, JsonNode> dagContentNode = contentItr.next();
|
Entry<String, JsonNode> dagContentNode = contentItr.next();
|
||||||
allNodes.put(dagContentNode.getKey(), convertJsonToDagNode(allNodes, dagContentNode.getKey(), dagContentNode.getValue()));
|
allNodes.put(dagContentNode.getKey(), convertJsonToDagNode(allNodes, dagContentNode.getKey(), dagContentNode.getValue()));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -43,7 +43,7 @@ public abstract class DagNode<O> implements Comparable<DagNode<O>> {
|
|||||||
|
|
||||||
public DagNode clone() {
|
public DagNode clone() {
|
||||||
List<DagNode<O>> tempChildNodes = new ArrayList<>();
|
List<DagNode<O>> tempChildNodes = new ArrayList<>();
|
||||||
for(DagNode dagNode: childNodes) {
|
for (DagNode dagNode: childNodes) {
|
||||||
tempChildNodes.add(dagNode.clone());
|
tempChildNodes.add(dagNode.clone());
|
||||||
}
|
}
|
||||||
this.childNodes = tempChildNodes;
|
this.childNodes = tempChildNodes;
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ public class DelayNode extends DagNode<Boolean> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(ExecutionContext context, int curItrCount) throws Exception {
|
public void execute(ExecutionContext context, int curItrCount) throws Exception {
|
||||||
log.warn("Waiting for "+ delayMins+" mins before going for next test run");
|
log.warn("Waiting for " + delayMins + " mins before going for next test run");
|
||||||
Thread.sleep(delayMins * 60 * 1000);
|
Thread.sleep(delayMins * 60 * 1000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -77,9 +77,9 @@ public class ValidateAsyncOperations extends DagNode<Option<String>> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (config.validateArchival() || config.validateClean()) {
|
if (config.validateArchival() || config.validateClean()) {
|
||||||
Pattern ARCHIVE_FILE_PATTERN =
|
final Pattern ARCHIVE_FILE_PATTERN =
|
||||||
Pattern.compile("\\.commits_\\.archive\\..*");
|
Pattern.compile("\\.commits_\\.archive\\..*");
|
||||||
Pattern CLEAN_FILE_PATTERN =
|
final Pattern CLEAN_FILE_PATTERN =
|
||||||
Pattern.compile(".*\\.clean\\..*");
|
Pattern.compile(".*\\.clean\\..*");
|
||||||
|
|
||||||
String metadataPath = executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath + "/.hoodie";
|
String metadataPath = executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath + "/.hoodie";
|
||||||
|
|||||||
@@ -72,8 +72,8 @@ public class HoodieTestHiveBase extends ITTestBase {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Run Hoodie Java App
|
// Run Hoodie Java App
|
||||||
String cmd = String.format("%s --hive-sync --table-path %s --hive-url %s --table-type %s --hive-table %s" +
|
String cmd = String.format("%s --hive-sync --table-path %s --hive-url %s --table-type %s --hive-table %s"
|
||||||
" --commit-type %s --table-name %s", HOODIE_GENERATE_APP, hdfsUrl, HIVE_SERVER_JDBC_URL,
|
+ " --commit-type %s --table-name %s", HOODIE_GENERATE_APP, hdfsUrl, HIVE_SERVER_JDBC_URL,
|
||||||
tableType, hiveTableName, commitType, hoodieTableName);
|
tableType, hiveTableName, commitType, hoodieTableName);
|
||||||
if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) {
|
if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) {
|
||||||
cmd = cmd + " --use-multi-partition-keys";
|
cmd = cmd + " --use-multi-partition-keys";
|
||||||
|
|||||||
@@ -90,7 +90,6 @@ public class ITTestHoodieDemo extends ITTestBase {
|
|||||||
+ " --hoodie-conf hoodie.datasource.hive_sync.database=default "
|
+ " --hoodie-conf hoodie.datasource.hive_sync.database=default "
|
||||||
+ " --hoodie-conf hoodie.datasource.hive_sync.table=%s";
|
+ " --hoodie-conf hoodie.datasource.hive_sync.table=%s";
|
||||||
|
|
||||||
|
|
||||||
@AfterEach
|
@AfterEach
|
||||||
public void clean() throws Exception {
|
public void clean() throws Exception {
|
||||||
String hdfsCmd = "hdfs dfs -rm -R ";
|
String hdfsCmd = "hdfs dfs -rm -R ";
|
||||||
|
|||||||
@@ -57,8 +57,8 @@ public class TestGenericRecordPayloadGenerator {
|
|||||||
@Test
|
@Test
|
||||||
public void testComplexPayload() throws IOException {
|
public void testComplexPayload() throws IOException {
|
||||||
Schema schema = new Schema.Parser().parse(UtilitiesTestBase.Helpers
|
Schema schema = new Schema.Parser().parse(UtilitiesTestBase.Helpers
|
||||||
.readFileFromAbsolutePath(System.getProperty("user.dir") + "/.." +
|
.readFileFromAbsolutePath(System.getProperty("user.dir") + "/.."
|
||||||
COMPLEX_SOURCE_SCHEMA_DOCKER_DEMO_RELATIVE_PATH));
|
+ COMPLEX_SOURCE_SCHEMA_DOCKER_DEMO_RELATIVE_PATH));
|
||||||
GenericRecordFullPayloadGenerator payloadGenerator = new GenericRecordFullPayloadGenerator(schema);
|
GenericRecordFullPayloadGenerator payloadGenerator = new GenericRecordFullPayloadGenerator(schema);
|
||||||
GenericRecord record = payloadGenerator.getNewPayload();
|
GenericRecord record = payloadGenerator.getNewPayload();
|
||||||
// The generated payload should validate with the provided schema
|
// The generated payload should validate with the provided schema
|
||||||
@@ -68,8 +68,8 @@ public class TestGenericRecordPayloadGenerator {
|
|||||||
@Test
|
@Test
|
||||||
public void testComplexPartialPayload() throws IOException {
|
public void testComplexPartialPayload() throws IOException {
|
||||||
Schema schema = new Schema.Parser().parse(UtilitiesTestBase.Helpers
|
Schema schema = new Schema.Parser().parse(UtilitiesTestBase.Helpers
|
||||||
.readFileFromAbsolutePath(System.getProperty("user.dir") + "/.." +
|
.readFileFromAbsolutePath(System.getProperty("user.dir") + "/.."
|
||||||
COMPLEX_SOURCE_SCHEMA_DOCKER_DEMO_RELATIVE_PATH));
|
+ COMPLEX_SOURCE_SCHEMA_DOCKER_DEMO_RELATIVE_PATH));
|
||||||
GenericRecordPartialPayloadGenerator payloadGenerator = new GenericRecordPartialPayloadGenerator(schema);
|
GenericRecordPartialPayloadGenerator payloadGenerator = new GenericRecordPartialPayloadGenerator(schema);
|
||||||
IntStream.range(0, 10).forEach(a -> {
|
IntStream.range(0, 10).forEach(a -> {
|
||||||
GenericRecord record = payloadGenerator.getNewPayload();
|
GenericRecord record = payloadGenerator.getNewPayload();
|
||||||
@@ -124,8 +124,8 @@ public class TestGenericRecordPayloadGenerator {
|
|||||||
@Test
|
@Test
|
||||||
public void testComplexPayloadWithLargeMinSize() throws Exception {
|
public void testComplexPayloadWithLargeMinSize() throws Exception {
|
||||||
Schema schema = new Schema.Parser().parse(UtilitiesTestBase.Helpers
|
Schema schema = new Schema.Parser().parse(UtilitiesTestBase.Helpers
|
||||||
.readFileFromAbsolutePath(System.getProperty("user.dir") + "/.." +
|
.readFileFromAbsolutePath(System.getProperty("user.dir") + "/.."
|
||||||
COMPLEX_SOURCE_SCHEMA_DOCKER_DEMO_RELATIVE_PATH));
|
+ COMPLEX_SOURCE_SCHEMA_DOCKER_DEMO_RELATIVE_PATH));
|
||||||
int minPayloadSize = 10000;
|
int minPayloadSize = 10000;
|
||||||
GenericRecordFullPayloadGenerator payloadGenerator = new GenericRecordFullPayloadGenerator(
|
GenericRecordFullPayloadGenerator payloadGenerator = new GenericRecordFullPayloadGenerator(
|
||||||
schema, minPayloadSize);
|
schema, minPayloadSize);
|
||||||
|
|||||||
@@ -51,7 +51,8 @@ class TestSchemaRegistryProvider {
|
|||||||
put("hoodie.deltastreamer.schemaprovider.registry.urlSuffix", "-value");
|
put("hoodie.deltastreamer.schemaprovider.registry.urlSuffix", "-value");
|
||||||
put("hoodie.deltastreamer.schemaprovider.registry.url", "http://foo:bar@localhost");
|
put("hoodie.deltastreamer.schemaprovider.registry.url", "http://foo:bar@localhost");
|
||||||
put("hoodie.deltastreamer.source.kafka.topic", "foo");
|
put("hoodie.deltastreamer.source.kafka.topic", "foo");
|
||||||
}};
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
private Schema getExpectedSchema(String response) throws IOException {
|
private Schema getExpectedSchema(String response) throws IOException {
|
||||||
|
|||||||
@@ -67,7 +67,7 @@
|
|||||||
<include>org.apache.hudi:hudi-common</include>
|
<include>org.apache.hudi:hudi-common</include>
|
||||||
<include>org.apache.hudi:hudi-hadoop-mr</include>
|
<include>org.apache.hudi:hudi-hadoop-mr</include>
|
||||||
<include>org.apache.hudi:hudi-sync-common</include>
|
<include>org.apache.hudi:hudi-sync-common</include>
|
||||||
<include>org.apache.hudi:hudi-hive-sync</include>
|
<include>org.apache.hudi:hudi-hive-sync</include>
|
||||||
|
|
||||||
<include>com.beust:jcommander</include>
|
<include>com.beust:jcommander</include>
|
||||||
<include>org.apache.avro:avro</include>
|
<include>org.apache.avro:avro</include>
|
||||||
|
|||||||
Reference in New Issue
Block a user