[HUDI-703] Add test for HoodieSyncCommand (#1774)
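Summary of the change set below: a docker-demo DeltaStreamer config and Avro schema, a hudi-cli command file for `sync validate`, a hive-jdbc test dependency, a fix for an inverted ternary in HoodieSyncCommand, a reusable HoodieTestHiveBase, the new ITTestHoodieSyncCommand integration test, a small HoodieJavaGenerateApp data generator, and a launcher-script change that makes the main class an explicit argument.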
docker/demo/config/hoodie-incr.properties (new file, 31 lines)
@@ -0,0 +1,31 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.partitionpath.field=partition
hoodie.deltastreamer.schemaprovider.source.schema.file=file:///var/hoodie/ws/docker/demo/config/hoodie-schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=file:///var/hoodie/ws/docker/demo/config/hoodie-schema.avsc
hoodie.deltastreamer.source.hoodieincr.partition.fields=partition
hoodie.deltastreamer.source.hoodieincr.path=/docker_hoodie_sync_valid_test
hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=true
# hive sync
hoodie.datasource.hive_sync.table=docker_hoodie_sync_valid_test_2
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000
hoodie.datasource.hive_sync.partition_fields=partition
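These properties drive the DeltaStreamer run in ITTestHoodieSyncCommand further down: HoodieIncrSource reads incrementally from /docker_hoodie_sync_valid_test, and the output is hive-synced into docker_hoodie_sync_valid_test_2 on the demo HiveServer2.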
docker/demo/config/hoodie-schema.avsc (new file, 145 lines)
@@ -0,0 +1,145 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
{
  "type": "record",
  "name": "triprec",
  "fields": [
    {
      "name": "timestamp",
      "type": "double"
    },
    {
      "name": "_row_key",
      "type": "string"
    },
    {
      "name": "rider",
      "type": "string"
    },
    {
      "name": "driver",
      "type": "string"
    },
    {
      "name": "begin_lat",
      "type": "double"
    },
    {
      "name": "begin_lon",
      "type": "double"
    },
    {
      "name": "end_lat",
      "type": "double"
    },
    {
      "name": "end_lon",
      "type": "double"
    },
    {
      "name": "distance_in_meters",
      "type": "int"
    },
    {
      "name": "seconds_since_epoch",
      "type": "long"
    },
    {
      "name": "weight",
      "type": "float"
    },
    {
      "name": "nation",
      "type": "bytes"
    },
    {
      "name": "current_date",
      "type": {
        "type": "int",
        "logicalType": "date"
      }
    },
    {
      "name": "current_ts",
      "type": {
        "type": "long",
        "logicalType": "timestamp-micros"
      }
    },
    {
      "name": "height",
      "type": {
        "type": "fixed",
        "name": "abc",
        "size": 5,
        "logicalType": "decimal",
        "precision": 10,
        "scale": 6
      }
    },
    {
      "name": "city_to_state",
      "type": {
        "type": "map",
        "values": "string"
      }
    },
    {
      "name": "fare",
      "type": {
        "type": "record",
        "name": "fare",
        "fields": [
          {
            "name": "amount",
            "type": "double"
          },
          {
            "name": "currency",
            "type": "string"
          }
        ]
      }
    },
    {
      "name": "tip_history",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "tip_history",
          "fields": [
            {
              "name": "amount",
              "type": "double"
            },
            {
              "name": "currency",
              "type": "string"
            }
          ]
        }
      }
    },
    {
      "name": "_hoodie_is_deleted",
      "type": "boolean",
      "default": false
    }
  ]
}
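This is the trip-record schema that the test data generator emits; both the source and target schema-provider entries in hoodie-incr.properties above point at this file.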
docker/demo/sync-validate.commands (new file, 19 lines)
@@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

connect --path /docker_hoodie_sync_valid_test
commits sync --path /docker_hoodie_sync_valid_test_2
sync validate --sourceDb default --targetDb default --hiveServerUrl jdbc:hive2://hiveserver:10000 --hiveUser hive --hivePass hive
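Fed to hudi-cli via --cmdfile (exactly how ITTestHoodieSyncCommand invokes it below), this script connects to the source table, loads the second table's timeline with `commits sync`, and then runs `sync validate` to compare row counts between the two Hive tables over the given HiveServer2 URL.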
@@ -181,6 +181,23 @@
       <type>test-jar</type>
     </dependency>
+
+    <!-- hive -->
+    <dependency>
+      <groupId>${hive.groupid}</groupId>
+      <artifactId>hive-jdbc</artifactId>
+      <version>${hive.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.eclipse.jetty.orbit</groupId>
+          <artifactId>javax.servlet</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.parquet</groupId>
+          <artifactId>parquet-hadoop-bundle</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
 
     <!-- Logging -->
     <dependency>
       <groupId>log4j</groupId>
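The new hive-jdbc dependency gives the test module JDBC access to HiveServer2 for the Hive-side assertions; the javax.servlet and parquet-hadoop-bundle exclusions are presumably there to avoid classpath conflicts with the versions the tests already ship.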
@@ -74,9 +74,9 @@ public class HoodieSyncCommand implements CommandMarker {
     }

     String targetLatestCommit =
-        targetTimeline.getInstants().iterator().hasNext() ? "0" : targetTimeline.lastInstant().get().getTimestamp();
+        targetTimeline.getInstants().iterator().hasNext() ? targetTimeline.lastInstant().get().getTimestamp() : "0";
     String sourceLatestCommit =
-        sourceTimeline.getInstants().iterator().hasNext() ? "0" : sourceTimeline.lastInstant().get().getTimestamp();
+        sourceTimeline.getInstants().iterator().hasNext() ? sourceTimeline.lastInstant().get().getTimestamp() : "0";

     if (sourceLatestCommit != null
         && HoodieTimeline.compareTimestamps(targetLatestCommit, HoodieTimeline.GREATER_THAN, sourceLatestCommit)) {
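These two replaced lines are the actual bug fix: the ternary branches were inverted, so a timeline that had instants reported the sentinel "0", while an empty timeline called lastInstant().get() on a missing value. A minimal, self-contained sketch of the corrected fallback follows; java.util.Optional and the names here are illustrative stand-ins, not Hudi API.

import java.util.Optional;

class LatestCommitFallback {

  // Corrected pattern: use the last instant's timestamp when one exists,
  // otherwise fall back to the "0" sentinel for an empty timeline.
  static String latestOrZero(Optional<String> lastInstantTimestamp) {
    return lastInstantTimestamp.isPresent() ? lastInstantTimestamp.get() : "0";
  }

  public static void main(String[] args) {
    System.out.println(latestOrZero(Optional.of("20200701120000"))); // prints the timestamp
    System.out.println(latestOrZero(Optional.empty()));              // prints 0
  }
}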
hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java (new file, 121 lines; path inferred from the package declaration)
@@ -0,0 +1,121 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.integ;

import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.collection.Pair;

import java.io.IOException;
import java.io.InputStream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Base class to run cmd and generate data in hive.
 */
public class HoodieTestHiveBase extends ITTestBase {

  protected enum PartitionType {
    SINGLE_KEY_PARTITIONED, MULTI_KEYS_PARTITIONED, NON_PARTITIONED,
  }

  private static final int DEFAULT_TIME_WAIT = 5000;
  private static final String OVERWRITE_COMMIT_TYPE = "overwrite";

  /**
   * A basic integration test that runs HoodieJavaApp to create a sample Hoodie data-set and performs upserts on it.
   * Hive integration and upsert functionality is checked by running a count query in hive console. TODO: Add
   * spark-shell test-case
   */
  public void generateDataByHoodieJavaApp(String hiveTableName, String tableType, PartitionType partitionType,
      String commitType, String hoodieTableName) throws Exception {

    String hdfsPath = getHDFSPath(hiveTableName);
    String hdfsUrl = "hdfs://namenode" + hdfsPath;

    Pair<String, String> stdOutErr;
    if (OVERWRITE_COMMIT_TYPE.equals(commitType)) {
      // Drop Table if it exists
      try {
        dropHiveTables(hiveTableName, tableType);
      } catch (AssertionError ex) {
        // In travis, sometimes, the hivemetastore is not ready even though we wait for the port to be up
        // Workaround to sleep for 5 secs and retry
        // Set sleep time by hoodie.hiveserver.time.wait
        Thread.sleep(getTimeWait());
        dropHiveTables(hiveTableName, tableType);
      }

      // Ensure table does not exist
      stdOutErr = executeHiveCommand("show tables like '" + hiveTableName + "'");
      if (!stdOutErr.getLeft().isEmpty()) {
        throw new TableExistsException("Dropped table " + hiveTableName + " exists!");
      }
    }

    // Run Hoodie Java App
    String cmd = String.format("%s %s --hive-sync --table-path %s --hive-url %s --table-type %s --hive-table %s"
        + " --commit-type %s --table-name %s", HOODIE_JAVA_APP, "HoodieJavaGenerateApp", hdfsUrl, HIVE_SERVER_JDBC_URL,
        tableType, hiveTableName, commitType, hoodieTableName);
    if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) {
      cmd = cmd + " --use-multi-partition-keys";
    } else if (partitionType == PartitionType.NON_PARTITIONED) {
      cmd = cmd + " --non-partitioned";
    }
    executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true);

    String snapshotTableName = getSnapshotTableName(tableType, hiveTableName);

    // Ensure table does exist
    stdOutErr = executeHiveCommand("show tables like '" + snapshotTableName + "'");
    assertEquals(snapshotTableName, stdOutErr.getLeft(), "Table exists");
  }

  protected void dropHiveTables(String hiveTableName, String tableType) throws Exception {
    if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
      executeHiveCommand("drop table if exists " + hiveTableName + "_rt");
      executeHiveCommand("drop table if exists " + hiveTableName + "_ro");
    } else {
      executeHiveCommand("drop table if exists " + hiveTableName);
    }
  }

  protected String getHDFSPath(String hiveTableName) {
    return "/" + hiveTableName;
  }

  protected String getSnapshotTableName(String tableType, String hiveTableName) {
    return tableType.equals(HoodieTableType.MERGE_ON_READ.name())
        ? hiveTableName + "_rt" : hiveTableName;
  }

  private int getTimeWait() {
    try (InputStream stream = HoodieTestHiveBase.class.getClassLoader().getResourceAsStream("hoodie-docker.properties")) {
      TypedProperties properties = new TypedProperties();
      properties.load(stream);
      return properties.getInteger("hoodie.hiveserver.time.wait", DEFAULT_TIME_WAIT);
    } catch (IOException e) {
      LOG.warn("Can not load property file, use default time wait for hiveserver.");
      return DEFAULT_TIME_WAIT;
    }
  }
}
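HoodieTestHiveBase factors the docker/Hive plumbing out of the sanity tests: it shells HoodieJavaGenerateApp into the adhoc container, verifies table (non-)existence through Hive, and retries the initial drop once because the Hive metastore can lag behind its open port on CI; the retry sleep is read from hoodie-docker.properties (hoodie.hiveserver.time.wait, defaulting to 5000 ms).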
@@ -180,7 +180,7 @@ public abstract class ITTestBase {
     }
   }

-  TestExecStartResultCallback executeCommandStringInDocker(String containerName, String cmd, boolean expectedToSucceed)
+  protected TestExecStartResultCallback executeCommandStringInDocker(String containerName, String cmd, boolean expectedToSucceed)
       throws Exception {
     LOG.info("\n\n#################################################################################################");
     LOG.info("Container : " + containerName + ", Running command :" + cmd);
@@ -190,7 +190,7 @@ public abstract class ITTestBase {
     return executeCommandInDocker(containerName, cmdSplits, expectedToSucceed);
   }

-  Pair<String, String> executeHiveCommand(String hiveCommand) throws Exception {
+  protected Pair<String, String> executeHiveCommand(String hiveCommand) throws Exception {

     LOG.info("\n\n#################################################################################################");
     LOG.info("Running hive command :" + hiveCommand);
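Both helpers are widened from package-private to protected so that subclasses outside org.apache.hudi.integ, such as ITTestHoodieSyncCommand in org.apache.hudi.integ.command, can call them.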
@@ -137,13 +137,13 @@ public class ITTestHoodieSanity extends ITTestBase {
     // Run Hoodie Java App
     String cmd;
     if (partitionType == PartitionType.SINGLE_KEY_PARTITIONED) {
-      cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+      cmd = HOODIE_JAVA_APP + " HoodieJavaApp --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
           + " --table-type " + tableType + " --hive-table " + hiveTableName;
     } else if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) {
-      cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+      cmd = HOODIE_JAVA_APP + " HoodieJavaApp --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
           + " --table-type " + tableType + " --hive-table " + hiveTableName + " --use-multi-partition-keys";
     } else {
-      cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+      cmd = HOODIE_JAVA_APP + " HoodieJavaApp --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
           + " --table-type " + tableType + " --hive-table " + hiveTableName + " --non-partitioned";
     }
     executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true);
hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java (new file, 75 lines; path inferred from the package declaration)
@@ -0,0 +1,75 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.integ.command;

import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;

import org.apache.hudi.integ.HoodieTestHiveBase;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Integration test class for HoodieSyncCommand in hudi-cli module.
 */
public class ITTestHoodieSyncCommand extends HoodieTestHiveBase {

  private static final String HUDI_CLI_TOOL = HOODIE_WS_ROOT + "/hudi-cli/hudi-cli.sh";
  private static final String SYNC_VALIDATE_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sync-validate.commands";

  @Test
  public void testValidateSync() throws Exception {
    String hiveTableName = "docker_hoodie_sync_valid_test";
    String hiveTableName2 = "docker_hoodie_sync_valid_test_2";

    generateDataByHoodieJavaApp(
        hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.SINGLE_KEY_PARTITIONED, "overwrite", hiveTableName);

    syncHoodieTable(hiveTableName2, "INSERT");

    generateDataByHoodieJavaApp(
        hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.SINGLE_KEY_PARTITIONED, "append", hiveTableName);

    TestExecStartResultCallback result =
        executeCommandStringInDocker(ADHOC_1_CONTAINER, HUDI_CLI_TOOL + " --cmdfile " + SYNC_VALIDATE_COMMANDS, true);

    String expected = String.format("Count difference now is (count(%s) - count(%s) == %d. Catch up count is %d",
        hiveTableName, hiveTableName2, 100, 200);
    assertTrue(result.getStderr().toString().contains(expected));

    dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
    dropHiveTables(hiveTableName2, HoodieTableType.COPY_ON_WRITE.name());
  }

  private void syncHoodieTable(String hiveTableName, String op) throws Exception {
    StringBuilder cmdBuilder = new StringBuilder("spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.4 ")
        .append(" --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer ").append(HUDI_UTILITIES_BUNDLE)
        .append(" --table-type COPY_ON_WRITE ")
        .append(" --base-file-format ").append(HoodieFileFormat.PARQUET.toString())
        .append(" --source-class org.apache.hudi.utilities.sources.HoodieIncrSource --source-ordering-field timestamp ")
        .append(" --target-base-path ").append(getHDFSPath(hiveTableName))
        .append(" --target-table ").append(hiveTableName)
        .append(" --op ").append(op)
        .append(" --props file:///var/hoodie/ws/docker/demo/config/hoodie-incr.properties")
        .append(" --enable-hive-sync");
    executeCommandStringInDocker(ADHOC_1_CONTAINER, cmdBuilder.toString(), true);
  }
}
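The test flow behind the assertion: the first HoodieJavaGenerateApp run inserts 100 records into docker_hoodie_sync_valid_test, DeltaStreamer's HoodieIncrSource syncs those into docker_hoodie_sync_valid_test_2, and the second run appends another 100 to the source table only. At validation time the source holds 200 rows against the target's 100, so `sync validate` reports a count difference of 100 and a catch-up count of 200.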
hudi-integ-test/src/test/resources/hoodie-docker.properties (new file, 18 lines)
@@ -0,0 +1,18 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
hoodie.hiveserver.time.wait=5000
@@ -36,5 +36,5 @@ fi
 
 OTHER_JARS=`ls -1 $DIR/target/lib/*jar | grep -v '*avro*-1.' | tr '\n' ':'`
 #TODO - Need to move TestDataGenerator and HoodieJavaApp out of tests
-echo "Running command : java -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS HoodieJavaApp $@"
+echo "Running command : java -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS $@"
-java -Xmx1G -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS HoodieJavaApp "$@"
+java -Xmx1G -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS "$@"
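The launcher script no longer hard-codes HoodieJavaApp: the main class is now the first argument, which is why the ITTestHoodieSanity commands above gained an explicit "HoodieJavaApp" and HoodieTestHiveBase passes "HoodieJavaGenerateApp".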
hudi-spark/src/test/java/HoodieJavaGenerateApp.java (new file, 190 lines)
@@ -0,0 +1,190 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.hive.NonPartitionedExtractor;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.testutils.DataSourceTestUtils;
import org.apache.hudi.testutils.HoodieTestDataGenerator;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class HoodieJavaGenerateApp {
  @Parameter(names = {"--table-path", "-p"}, description = "Path for Hoodie sample table")
  private String tablePath = "file:///tmp/hoodie/sample-table";

  @Parameter(names = {"--table-name", "-n"}, description = "Table name for Hoodie sample table")
  private String tableName = "hoodie_test";

  @Parameter(names = {"--table-type", "-t"}, description = "One of COPY_ON_WRITE or MERGE_ON_READ")
  private String tableType = HoodieTableType.COPY_ON_WRITE.name();

  @Parameter(names = {"--hive-sync", "-hs"}, description = "Enable syncing to hive")
  private Boolean enableHiveSync = false;

  @Parameter(names = {"--hive-db", "-hd"}, description = "Hive database")
  private String hiveDB = "default";

  @Parameter(names = {"--hive-table", "-ht"}, description = "Hive table")
  private String hiveTable = "hoodie_sample_test";

  @Parameter(names = {"--hive-user", "-hu"}, description = "Hive username")
  private String hiveUser = "hive";

  @Parameter(names = {"--hive-password", "-hp"}, description = "Hive password")
  private String hivePass = "hive";

  @Parameter(names = {"--hive-url", "-hl"}, description = "Hive JDBC URL")
  private String hiveJdbcUrl = "jdbc:hive2://localhost:10000";

  @Parameter(names = {"--non-partitioned", "-np"}, description = "Use non-partitioned Table")
  private Boolean nonPartitionedTable = false;

  @Parameter(names = {"--use-multi-partition-keys", "-mp"}, description = "Use Multiple Partition Keys")
  private Boolean useMultiPartitionKeys = false;

  @Parameter(names = {"--commit-type", "-ct"}, description = "How many commits will run")
  private String commitType = "overwrite";

  @Parameter(names = {"--help", "-h"}, help = true)
  public Boolean help = false;

  private static final Logger LOG = LogManager.getLogger(HoodieJavaGenerateApp.class);

  public static void main(String[] args) throws Exception {
    HoodieJavaGenerateApp cli = new HoodieJavaGenerateApp();
    JCommander cmd = new JCommander(cli, null, args);

    if (cli.help) {
      cmd.usage();
      System.exit(1);
    }
    try (SparkSession spark = cli.getOrCreateSparkSession()) {
      cli.insert(spark);
    }
  }

  private SparkSession getOrCreateSparkSession() {
    // Spark session setup..
    SparkSession spark = SparkSession.builder().appName("Hoodie Spark APP")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[1]").getOrCreate();
    spark.sparkContext().setLogLevel("WARN");
    return spark;
  }

  private HoodieTestDataGenerator getDataGenerate() {
    // Generator of some records to be loaded in.
    if (nonPartitionedTable) {
      // All data goes to base-path
      return new HoodieTestDataGenerator(new String[]{""});
    } else {
      return new HoodieTestDataGenerator();
    }
  }

  /**
   * Setup configs for syncing to hive.
   */
  private DataFrameWriter<Row> updateHiveSyncConfig(DataFrameWriter<Row> writer) {
    if (enableHiveSync) {
      LOG.info("Enabling Hive sync to " + hiveJdbcUrl);
      writer = writer.option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), hiveTable)
          .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), hiveDB)
          .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), hiveJdbcUrl)
          .option(DataSourceWriteOptions.HIVE_USER_OPT_KEY(), hiveUser)
          .option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY(), hivePass)
          .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY(), "true");
      if (nonPartitionedTable) {
        writer = writer
            .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
                NonPartitionedExtractor.class.getCanonicalName())
            .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "");
      } else if (useMultiPartitionKeys) {
        writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "year,month,day").option(
            DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
            MultiPartKeysValueExtractor.class.getCanonicalName());
      } else {
        writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "dateStr");
      }
    }
    return writer;
  }

  private void insert(SparkSession spark) throws IOException {
    HoodieTestDataGenerator dataGen = getDataGenerate();

    JavaSparkContext jssc = new JavaSparkContext(spark.sparkContext());

    // Generate some input..
    String instantTime = HoodieActiveTimeline.createNewInstantTime();
    List<HoodieRecord> recordsSoFar = new ArrayList<>(dataGen.generateInserts(instantTime/* ignore */, 100));
    List<String> records1 = DataSourceTestUtils.convertToStringList(recordsSoFar);
    Dataset<Row> inputDF1 = spark.read().json(jssc.parallelize(records1, 2));

    // Save as hoodie dataset (copy on write)
    // specify the hoodie source
    DataFrameWriter<Row> writer = inputDF1.write().format("org.apache.hudi")
        // any hoodie client config can be passed like this
        .option("hoodie.insert.shuffle.parallelism", "2")
        // full list in HoodieWriteConfig & its package
        .option("hoodie.upsert.shuffle.parallelism", "2")
        // Hoodie Table Type
        .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), tableType)
        // insert
        .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())
        // This is the record key
        .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
        // this is the partition to place it into
        .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
        // use to combine duplicate records in input/with disk val
        .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
        // Used by hive sync and queries
        .option(HoodieWriteConfig.TABLE_NAME, tableName)
        // Add Key Extractor
        .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(),
            nonPartitionedTable ? NonpartitionedKeyGenerator.class.getCanonicalName()
                : SimpleKeyGenerator.class.getCanonicalName())
        .mode(commitType);

    updateHiveSyncConfig(writer);
    // new dataset if needed
    writer.save(tablePath); // ultimately where the dataset will be placed
    FileSystem fs = FileSystem.get(jssc.hadoopConfiguration());
    String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
    LOG.info("Commit at instant time :" + commitInstantTime1);
  }
}
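Each invocation of HoodieJavaGenerateApp inserts a batch of 100 generated trip records into the table at --table-path, with --commit-type mapped directly to the Spark save mode ("overwrite" starts the table fresh, "append" adds a second commit); this is what produces the 100 versus 200 row counts the sync-validate test asserts on.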