diff --git a/docker/demo/config/hoodie-incr.properties b/docker/demo/config/hoodie-incr.properties new file mode 100644 index 000000000..95a6627c8 --- /dev/null +++ b/docker/demo/config/hoodie-incr.properties @@ -0,0 +1,31 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +hoodie.upsert.shuffle.parallelism=2 +hoodie.insert.shuffle.parallelism=2 +hoodie.bulkinsert.shuffle.parallelism=2 +hoodie.datasource.write.recordkey.field=_row_key +hoodie.datasource.write.partitionpath.field=partition +hoodie.deltastreamer.schemaprovider.source.schema.file=file:///var/hoodie/ws/docker/demo/config/hoodie-schema.avsc +hoodie.deltastreamer.schemaprovider.target.schema.file=file:///var/hoodie/ws/docker/demo/config/hoodie-schema.avsc +hoodie.deltastreamer.source.hoodieincr.partition.fields=partition +hoodie.deltastreamer.source.hoodieincr.path=/docker_hoodie_sync_valid_test +hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=true +# hive sync +hoodie.datasource.hive_sync.table=docker_hoodie_sync_valid_test_2 +hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000 +hoodie.datasource.hive_sync.partition_fields=partition \ No newline at end of file diff --git a/docker/demo/config/hoodie-schema.avsc b/docker/demo/config/hoodie-schema.avsc new file mode 100644 index 000000000..55e255ff2 --- /dev/null +++ b/docker/demo/config/hoodie-schema.avsc @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ + "type": "record", + "name": "triprec", + "fields": [ + { + "name": "timestamp", + "type": "double" + }, + { + "name": "_row_key", + "type": "string" + }, + { + "name": "rider", + "type": "string" + }, + { + "name": "driver", + "type": "string" + }, + { + "name": "begin_lat", + "type": "double" + }, + { + "name": "begin_lon", + "type": "double" + }, + { + "name": "end_lat", + "type": "double" + }, + { + "name": "end_lon", + "type": "double" + }, + { + "name": "distance_in_meters", + "type": "int" + }, + { + "name": "seconds_since_epoch", + "type": "long" + }, + { + "name": "weight", + "type": "float" + }, + { + "name": "nation", + "type": "bytes" + }, + { + "name": "current_date", + "type": { + "type": "int", + "logicalType": "date" + } + }, + { + "name": "current_ts", + "type": { + "type": "long", + "logicalType": "timestamp-micros" + } + }, + { + "name": "height", + "type": { + "type": "fixed", + "name": "abc", + "size": 5, + "logicalType": "decimal", + "precision": 10, + "scale": 6 + } + }, + { + "name": "city_to_state", + "type": { + "type": "map", + "values": "string" + } + }, + { + "name": "fare", + "type": { + "type": "record", + "name": "fare", + "fields": [ + { + "name": "amount", + "type": "double" + }, + { + "name": "currency", + "type": "string" + } + ] + } + }, + { + "name": "tip_history", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "tip_history", + "fields": [ + { + "name": "amount", + "type": "double" + }, + { + "name": "currency", + "type": "string" + } + ] + } + } + }, + { + "name": "_hoodie_is_deleted", + "type": "boolean", + "default": false + } + ] +} \ No newline at end of file diff --git a/docker/demo/sync-validate.commands b/docker/demo/sync-validate.commands new file mode 100644 index 000000000..32c334eee --- /dev/null +++ b/docker/demo/sync-validate.commands @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +connect --path /docker_hoodie_sync_valid_test +commits sync --path /docker_hoodie_sync_valid_test_2 +sync validate --sourceDb default --targetDb default --hiveServerUrl jdbc:hive2://hiveserver:10000 --hiveUser hive --hivePass hive \ No newline at end of file diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml index 13bc84862..388e4145a 100644 --- a/hudi-cli/pom.xml +++ b/hudi-cli/pom.xml @@ -181,6 +181,23 @@ test-jar + + + ${hive.groupid} + hive-jdbc + ${hive.version} + + + org.eclipse.jetty.orbit + javax.servlet + + + org.apache.parquet + parquet-hadoop-bundle + + + + log4j diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java index e0ceb2ed4..66c2eb021 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java @@ -74,9 +74,9 @@ public class HoodieSyncCommand implements CommandMarker { } String targetLatestCommit = - targetTimeline.getInstants().iterator().hasNext() ? "0" : targetTimeline.lastInstant().get().getTimestamp(); + targetTimeline.getInstants().iterator().hasNext() ? targetTimeline.lastInstant().get().getTimestamp() : "0"; String sourceLatestCommit = - sourceTimeline.getInstants().iterator().hasNext() ? "0" : sourceTimeline.lastInstant().get().getTimestamp(); + sourceTimeline.getInstants().iterator().hasNext() ? sourceTimeline.lastInstant().get().getTimestamp() : "0"; if (sourceLatestCommit != null && HoodieTimeline.compareTimestamps(targetLatestCommit, HoodieTimeline.GREATER_THAN, sourceLatestCommit)) { diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java new file mode 100644 index 000000000..e2e927655 --- /dev/null +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.integ; + +import org.apache.hadoop.hbase.TableExistsException; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.util.collection.Pair; + +import java.io.IOException; +import java.io.InputStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Base class to run cmd and generate data in hive. + */ +public class HoodieTestHiveBase extends ITTestBase { + + protected enum PartitionType { + SINGLE_KEY_PARTITIONED, MULTI_KEYS_PARTITIONED, NON_PARTITIONED, + } + + private final static int DEFAULT_TIME_WAIT = 5000; + private final static String OVERWRITE_COMMIT_TYPE = "overwrite"; + + /** + * A basic integration test that runs HoodieJavaApp to create a sample Hoodie data-set and performs upserts on it. + * Hive integration and upsert functionality is checked by running a count query in hive console. TODO: Add + * spark-shell test-case + */ + public void generateDataByHoodieJavaApp(String hiveTableName, String tableType, PartitionType partitionType, + String commitType, String hoodieTableName) throws Exception { + + String hdfsPath = getHDFSPath(hiveTableName); + String hdfsUrl = "hdfs://namenode" + hdfsPath; + + Pair stdOutErr; + if (OVERWRITE_COMMIT_TYPE.equals(commitType)) { + // Drop Table if it exists + try { + dropHiveTables(hiveTableName, tableType); + } catch (AssertionError ex) { + // In travis, sometimes, the hivemetastore is not ready even though we wait for the port to be up + // Workaround to sleep for 5 secs and retry + // Set sleep time by hoodie.hiveserver.time.wait + Thread.sleep(getTimeWait()); + dropHiveTables(hiveTableName, tableType); + } + + // Ensure table does not exist + stdOutErr = executeHiveCommand("show tables like '" + hiveTableName + "'"); + if (!stdOutErr.getLeft().isEmpty()) { + throw new TableExistsException("Dropped table " + hiveTableName + " exists!"); + } + } + + // Run Hoodie Java App + String cmd = String.format("%s %s --hive-sync --table-path %s --hive-url %s --table-type %s --hive-table %s" + + " --commit-type %s --table-name %s", HOODIE_JAVA_APP, "HoodieJavaGenerateApp", hdfsUrl, HIVE_SERVER_JDBC_URL, + tableType, hiveTableName, commitType, hoodieTableName); + if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) { + cmd = cmd + " --use-multi-partition-keys"; + } else if (partitionType == PartitionType.NON_PARTITIONED){ + cmd = cmd + " --non-partitioned"; + } + executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true); + + String snapshotTableName = getSnapshotTableName(tableType, hiveTableName); + + // Ensure table does exist + stdOutErr = executeHiveCommand("show tables like '" + snapshotTableName + "'"); + assertEquals(snapshotTableName, stdOutErr.getLeft(), "Table exists"); + } + + protected void dropHiveTables(String hiveTableName, String tableType) throws Exception { + if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) { + executeHiveCommand("drop table if exists " + hiveTableName + "_rt"); + executeHiveCommand("drop table if exists " + hiveTableName + "_ro"); + } else { + executeHiveCommand("drop table if exists " + hiveTableName); + } + } + + protected String getHDFSPath(String hiveTableName) { + return "/" + hiveTableName; + } + + protected String getSnapshotTableName(String tableType, String hiveTableName) { + return tableType.equals(HoodieTableType.MERGE_ON_READ.name()) + ? hiveTableName + "_rt" : hiveTableName; + } + + private int getTimeWait() { + try (InputStream stream = HoodieTestHiveBase.class.getClassLoader().getResourceAsStream("hoodie-docker.properties")) { + TypedProperties properties = new TypedProperties(); + properties.load(stream); + return properties.getInteger("hoodie.hiveserver.time.wait", DEFAULT_TIME_WAIT); + } catch (IOException e) { + LOG.warn("Can not load property file, use default time wait for hiveserver."); + return DEFAULT_TIME_WAIT; + } + } +} diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java index cf8727394..c2312d0cf 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java @@ -180,7 +180,7 @@ public abstract class ITTestBase { } } - TestExecStartResultCallback executeCommandStringInDocker(String containerName, String cmd, boolean expectedToSucceed) + protected TestExecStartResultCallback executeCommandStringInDocker(String containerName, String cmd, boolean expectedToSucceed) throws Exception { LOG.info("\n\n#################################################################################################"); LOG.info("Container : " + containerName + ", Running command :" + cmd); @@ -190,7 +190,7 @@ public abstract class ITTestBase { return executeCommandInDocker(containerName, cmdSplits, expectedToSucceed); } - Pair executeHiveCommand(String hiveCommand) throws Exception { + protected Pair executeHiveCommand(String hiveCommand) throws Exception { LOG.info("\n\n#################################################################################################"); LOG.info("Running hive command :" + hiveCommand); diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java index 1d1ef8dda..4b586a342 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java @@ -137,13 +137,13 @@ public class ITTestHoodieSanity extends ITTestBase { // Run Hoodie Java App String cmd; if (partitionType == PartitionType.SINGLE_KEY_PARTITIONED) { - cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL + cmd = HOODIE_JAVA_APP + " HoodieJavaApp --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL + " --table-type " + tableType + " --hive-table " + hiveTableName; } else if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) { - cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL + cmd = HOODIE_JAVA_APP + " HoodieJavaApp --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL + " --table-type " + tableType + " --hive-table " + hiveTableName + " --use-multi-partition-keys"; } else { - cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL + cmd = HOODIE_JAVA_APP + " HoodieJavaApp --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL + " --table-type " + tableType + " --hive-table " + hiveTableName + " --non-partitioned"; } executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true); diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java new file mode 100644 index 000000000..a6a4c3ec4 --- /dev/null +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.integ.command; + +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieTableType; + +import org.apache.hudi.integ.HoodieTestHiveBase; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Integration test class for HoodieSyncCommand in hudi-cli module. + */ +public class ITTestHoodieSyncCommand extends HoodieTestHiveBase { + + private static final String HUDI_CLI_TOOL = HOODIE_WS_ROOT + "/hudi-cli/hudi-cli.sh"; + private static final String SYNC_VALIDATE_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sync-validate.commands"; + + @Test + public void testValidateSync() throws Exception { + String hiveTableName = "docker_hoodie_sync_valid_test"; + String hiveTableName2 = "docker_hoodie_sync_valid_test_2"; + + generateDataByHoodieJavaApp( + hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.SINGLE_KEY_PARTITIONED, "overwrite", hiveTableName); + + syncHoodieTable(hiveTableName2, "INSERT"); + + generateDataByHoodieJavaApp( + hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.SINGLE_KEY_PARTITIONED, "append", hiveTableName); + + TestExecStartResultCallback result = + executeCommandStringInDocker(ADHOC_1_CONTAINER, HUDI_CLI_TOOL + " --cmdfile " + SYNC_VALIDATE_COMMANDS, true); + + String expected = String.format("Count difference now is (count(%s) - count(%s) == %d. Catch up count is %d", + hiveTableName, hiveTableName2, 100, 200); + assertTrue(result.getStderr().toString().contains(expected)); + + dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name()); + dropHiveTables(hiveTableName2, HoodieTableType.COPY_ON_WRITE.name()); + } + + private void syncHoodieTable(String hiveTableName, String op) throws Exception { + StringBuilder cmdBuilder = new StringBuilder("spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.4 ") + .append(" --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer ").append(HUDI_UTILITIES_BUNDLE) + .append(" --table-type COPY_ON_WRITE ") + .append(" --base-file-format ").append(HoodieFileFormat.PARQUET.toString()) + .append(" --source-class org.apache.hudi.utilities.sources.HoodieIncrSource --source-ordering-field timestamp ") + .append(" --target-base-path ").append(getHDFSPath(hiveTableName)) + .append(" --target-table ").append(hiveTableName) + .append(" --op ").append(op) + .append(" --props file:///var/hoodie/ws/docker/demo/config/hoodie-incr.properties") + .append(" --enable-hive-sync"); + executeCommandStringInDocker(ADHOC_1_CONTAINER, cmdBuilder.toString(), true); + } +} diff --git a/hudi-integ-test/src/test/resources/hoodie-docker.properties b/hudi-integ-test/src/test/resources/hoodie-docker.properties new file mode 100644 index 000000000..d51137005 --- /dev/null +++ b/hudi-integ-test/src/test/resources/hoodie-docker.properties @@ -0,0 +1,18 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### +hoodie.hiveserver.time.wait=5000 diff --git a/hudi-spark/run_hoodie_app.sh b/hudi-spark/run_hoodie_app.sh index 7093c70f3..e2acc6cf5 100755 --- a/hudi-spark/run_hoodie_app.sh +++ b/hudi-spark/run_hoodie_app.sh @@ -36,5 +36,5 @@ fi OTHER_JARS=`ls -1 $DIR/target/lib/*jar | grep -v '*avro*-1.' | tr '\n' ':'` #TODO - Need to move TestDataGenerator and HoodieJavaApp out of tests -echo "Running command : java -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS HoodieJavaApp $@" -java -Xmx1G -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS HoodieJavaApp "$@" +echo "Running command : java -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS $@" +java -Xmx1G -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS "$@" diff --git a/hudi-spark/src/test/java/HoodieJavaGenerateApp.java b/hudi-spark/src/test/java/HoodieJavaGenerateApp.java new file mode 100644 index 000000000..64245e9c5 --- /dev/null +++ b/hudi-spark/src/test/java/HoodieJavaGenerateApp.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hudi.DataSourceWriteOptions; +import org.apache.hudi.HoodieDataSourceHelpers; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.hive.MultiPartKeysValueExtractor; +import org.apache.hudi.hive.NonPartitionedExtractor; +import org.apache.hudi.keygen.NonpartitionedKeyGenerator; +import org.apache.hudi.keygen.SimpleKeyGenerator; +import org.apache.hudi.testutils.DataSourceTestUtils; +import org.apache.hudi.testutils.HoodieTestDataGenerator; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.DataFrameWriter; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class HoodieJavaGenerateApp { + @Parameter(names = {"--table-path", "-p"}, description = "Path for Hoodie sample table") + private String tablePath = "file:///tmp/hoodie/sample-table"; + + @Parameter(names = {"--table-name", "-n"}, description = "Table name for Hoodie sample table") + private String tableName = "hoodie_test"; + + @Parameter(names = {"--table-type", "-t"}, description = "One of COPY_ON_WRITE or MERGE_ON_READ") + private String tableType = HoodieTableType.COPY_ON_WRITE.name(); + + @Parameter(names = {"--hive-sync", "-hs"}, description = "Enable syncing to hive") + private Boolean enableHiveSync = false; + + @Parameter(names = {"--hive-db", "-hd"}, description = "Hive database") + private String hiveDB = "default"; + + @Parameter(names = {"--hive-table", "-ht"}, description = "Hive table") + private String hiveTable = "hoodie_sample_test"; + + @Parameter(names = {"--hive-user", "-hu"}, description = "Hive username") + private String hiveUser = "hive"; + + @Parameter(names = {"--hive-password", "-hp"}, description = "Hive password") + private String hivePass = "hive"; + + @Parameter(names = {"--hive-url", "-hl"}, description = "Hive JDBC URL") + private String hiveJdbcUrl = "jdbc:hive2://localhost:10000"; + + @Parameter(names = {"--non-partitioned", "-np"}, description = "Use non-partitioned Table") + private Boolean nonPartitionedTable = false; + + @Parameter(names = {"--use-multi-partition-keys", "-mp"}, description = "Use Multiple Partition Keys") + private Boolean useMultiPartitionKeys = false; + + @Parameter(names = {"--commit-type", "-ct"}, description = "How may commits will run") + private String commitType = "overwrite"; + + @Parameter(names = {"--help", "-h"}, help = true) + public Boolean help = false; + + private static final Logger LOG = LogManager.getLogger(HoodieJavaGenerateApp.class); + + public static void main(String[] args) throws Exception { + HoodieJavaGenerateApp cli = new HoodieJavaGenerateApp(); + JCommander cmd = new JCommander(cli, null, args); + + if (cli.help) { + cmd.usage(); + System.exit(1); + } + try (SparkSession spark = cli.getOrCreateSparkSession()) { + cli.insert(spark); + } + } + + private SparkSession getOrCreateSparkSession() { + // Spark session setup.. + SparkSession spark = SparkSession.builder().appName("Hoodie Spark APP") + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[1]").getOrCreate(); + spark.sparkContext().setLogLevel("WARN"); + return spark; + } + + private HoodieTestDataGenerator getDataGenerate() { + // Generator of some records to be loaded in. + if (nonPartitionedTable) { + // All data goes to base-path + return new HoodieTestDataGenerator(new String[]{""}); + } else { + return new HoodieTestDataGenerator(); + } + } + + /** + * Setup configs for syncing to hive. + */ + private DataFrameWriter updateHiveSyncConfig(DataFrameWriter writer) { + if (enableHiveSync) { + LOG.info("Enabling Hive sync to " + hiveJdbcUrl); + writer = writer.option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), hiveTable) + .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), hiveDB) + .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), hiveJdbcUrl) + .option(DataSourceWriteOptions.HIVE_USER_OPT_KEY(), hiveUser) + .option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY(), hivePass) + .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY(), "true"); + if (nonPartitionedTable) { + writer = writer + .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(), + NonPartitionedExtractor.class.getCanonicalName()) + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), ""); + } else if (useMultiPartitionKeys) { + writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "year,month,day").option( + DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(), + MultiPartKeysValueExtractor.class.getCanonicalName()); + } else { + writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "dateStr"); + } + } + return writer; + } + + private void insert(SparkSession spark) throws IOException { + HoodieTestDataGenerator dataGen = getDataGenerate(); + + JavaSparkContext jssc = new JavaSparkContext(spark.sparkContext()); + + // Generate some input.. + String instantTime = HoodieActiveTimeline.createNewInstantTime(); + List recordsSoFar = new ArrayList<>(dataGen.generateInserts(instantTime/* ignore */, 100)); + List records1 = DataSourceTestUtils.convertToStringList(recordsSoFar); + Dataset inputDF1 = spark.read().json(jssc.parallelize(records1, 2)); + + // Save as hoodie dataset (copy on write) + // specify the hoodie source + DataFrameWriter writer = inputDF1.write().format("org.apache.hudi") + // any hoodie client config can be passed like this + .option("hoodie.insert.shuffle.parallelism", "2") + // full list in HoodieWriteConfig & its package + .option("hoodie.upsert.shuffle.parallelism", "2") + // Hoodie Table Type + .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), tableType) + // insert + .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL()) + // This is the record key + .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key") + // this is the partition to place it into + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition") + // use to combine duplicate records in input/with disk val + .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp") + // Used by hive sync and queries + .option(HoodieWriteConfig.TABLE_NAME, tableName) + // Add Key Extractor + .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(), + nonPartitionedTable ? NonpartitionedKeyGenerator.class.getCanonicalName() + : SimpleKeyGenerator.class.getCanonicalName()) + .mode(commitType); + + updateHiveSyncConfig(writer); + // new dataset if needed + writer.save(tablePath); // ultimately where the dataset will be placed + FileSystem fs = FileSystem.get(jssc.hadoopConfiguration()); + String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, tablePath); + LOG.info("Commit at instant time :" + commitInstantTime1); + } +}