diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml
index d3b08dc7c..6aecd3b92 100644
--- a/hudi-cli/pom.xml
+++ b/hudi-cli/pom.xml
@@ -122,6 +122,31 @@
false
+
+ org.apache.maven.plugins
+ maven-failsafe-plugin
+ 2.22.0
+
+
+ **/ITT*.java
+
+
+
+
+ integration-test
+
+ integration-test
+
+
+
+ verify
+ verify
+
+ verify
+
+
+
+
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
index 001a54ac9..925649373 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
@@ -23,9 +23,18 @@ package org.apache.hudi.cli;
*/
public class HoodieTableHeaderFields {
public static final String HEADER_PARTITION = "Partition";
+ public static final String HEADER_PARTITION_PATH = HEADER_PARTITION + " Path";
public static final String HEADER_FILE_ID = "FileId";
public static final String HEADER_BASE_INSTANT = "Base-Instant";
+ public static final String HEADER_CLEAN_TIME = "CleanTime";
+ public static final String HEADER_EARLIEST_COMMAND_RETAINED = "EarliestCommandRetained";
+ public static final String HEADER_CLEANING_POLICY = "Cleaning policy";
+ public static final String HEADER_TOTAL_FILES_DELETED = "Total Files Deleted";
+ public static final String HEADER_TOTAL_FILES_SUCCESSFULLY_DELETED = "Total Files Successfully Deleted";
+ public static final String HEADER_TOTAL_FAILED_DELETIONS = "Total Failed Deletions";
+ public static final String HEADER_TOTAL_TIME_TAKEN = "Total Time Taken";
+
/**
* Fields of data header.
*/
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java
index 609e44b5d..218cf362a 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
@@ -76,13 +77,15 @@ public class CleansCommand implements CommandMarker {
}
TableHeader header =
- new TableHeader().addTableHeaderField("CleanTime").addTableHeaderField("EarliestCommandRetained")
- .addTableHeaderField("Total Files Deleted").addTableHeaderField("Total Time Taken");
+ new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_CLEAN_TIME)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_EARLIEST_COMMAND_RETAINED)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_DELETED)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_TIME_TAKEN);
return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
}
@CliCommand(value = "cleans refresh", help = "Refresh the commits")
- public String refreshCleans() throws IOException {
+ public String refreshCleans() {
HoodieCLI.refreshTableMetadata();
return "Metadata for table " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " refreshed.";
}
@@ -116,8 +119,10 @@ public class CleansCommand implements CommandMarker {
rows.add(new Comparable[] {path, policy, totalSuccessDeletedFiles, totalFailedDeletedFiles});
}
- TableHeader header = new TableHeader().addTableHeaderField("Partition Path").addTableHeaderField("Cleaning policy")
- .addTableHeaderField("Total Files Successfully Deleted").addTableHeaderField("Total Failed Deletions");
+ TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_CLEANING_POLICY)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_SUCCESSFULLY_DELETED)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FAILED_DELETIONS);
return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
}
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 5e14fc596..3534cc541 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -166,7 +166,7 @@ public class SparkMain {
return masterContained.contains(command);
}
- private static void clean(JavaSparkContext jsc, String basePath, String propsFilePath,
+ protected static void clean(JavaSparkContext jsc, String basePath, String propsFilePath,
List configs) {
HoodieCleaner.Config cfg = new HoodieCleaner.Config();
cfg.basePath = basePath;
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java
new file mode 100644
index 000000000..499197953
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
+import org.apache.hudi.cli.TableHeader;
+import org.apache.hudi.cli.common.HoodieTestCommitMetadataGenerator;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test Cases for {@link CleansCommand}.
+ */
+public class TestCleansCommand extends AbstractShellIntegrationTest {
+
+ private String tablePath;
+ private URL propsFilePath;
+
+ @Before
+ public void init() throws IOException {
+ HoodieCLI.conf = jsc.hadoopConfiguration();
+
+ String tableName = "test_table";
+ tablePath = basePath + File.separator + tableName;
+ propsFilePath = TestCleansCommand.class.getClassLoader().getResource("clean.properties");
+
+ // Create table and connect
+ new TableCommand().createTable(
+ tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
+ "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+ Configuration conf = HoodieCLI.conf;
+
+ metaClient = HoodieCLI.getTableMetaClient();
+ // Create four commits
+ for (int i = 100; i < 104; i++) {
+ String timestamp = String.valueOf(i);
+ // Requested Compaction
+ HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath,
+ new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, timestamp), conf);
+ // Inflight Compaction
+ HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath,
+ new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, timestamp), conf);
+ HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, conf);
+ }
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ // reload the timeline and get all the commits before archive
+ metaClient.getActiveTimeline().reload();
+ }
+
+ /**
+ * Test case for show all cleans.
+ */
+ @Test
+ public void testShowCleans() throws Exception {
+ // Check properties file exists.
+ assertNotNull("Properties file not found", propsFilePath);
+
+ // First, run clean
+ new File(tablePath + File.separator + HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH
+ + File.separator + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile();
+ SparkMain.clean(jsc, HoodieCLI.basePath, propsFilePath.getPath(), new ArrayList<>());
+ assertEquals("Loaded 1 clean and the count should match", 1,
+ metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count());
+
+ CommandResult cr = getShell().executeCommand("cleans show");
+ assertTrue(cr.isSuccess());
+
+ HoodieInstant clean = metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().findFirst().orElse(null);
+ assertNotNull(clean);
+
+ TableHeader header =
+ new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_CLEAN_TIME)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_EARLIEST_COMMAND_RETAINED)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_DELETED)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_TIME_TAKEN);
+ List rows = new ArrayList<>();
+
+ // EarliestCommandRetained should be 102, since hoodie.cleaner.commits.retained=2
+ // Total Time Taken needs to be read from the clean metadata
+ rows.add(new Comparable[]{clean.getTimestamp(), "102", "0", getLatestCleanTimeTakenInMillis().toString()});
+
+ String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows);
+ assertEquals(expected, cr.getResult().toString());
+ }
+
+ /**
+ * Test case for show partitions of a clean instant.
+ */
+ @Test
+ public void testShowCleanPartitions() throws IOException {
+ // Check properties file exists.
+ assertNotNull("Properties file not found", propsFilePath);
+
+ // First, run clean with two partition
+ new File(tablePath + File.separator + HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH
+ + File.separator + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile();
+ new File(tablePath + File.separator + HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH
+ + File.separator + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile();
+ SparkMain.clean(jsc, HoodieCLI.basePath, propsFilePath.getPath(), new ArrayList<>());
+ assertEquals("Loaded 1 clean and the count should match", 1,
+ metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count());
+
+ HoodieInstant clean = metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().findFirst().get();
+
+ CommandResult cr = getShell().executeCommand("clean showpartitions --clean " + clean.getTimestamp());
+ assertTrue(cr.isSuccess());
+
+ TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_CLEANING_POLICY)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_SUCCESSFULLY_DELETED)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FAILED_DELETIONS);
+
+ // There should be two partition paths
+ List rows = new ArrayList<>();
+ rows.add(new Comparable[]{HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH,
+ HoodieCleaningPolicy.KEEP_LATEST_COMMITS, "0", "0"});
+ rows.add(new Comparable[]{HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+ HoodieCleaningPolicy.KEEP_LATEST_COMMITS, "0", "0"});
+
+ String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows);
+ assertEquals(expected, cr.getResult().toString());
+ }
+
+ /**
+ * Get time taken of latest instant.
+ */
+ private Long getLatestCleanTimeTakenInMillis() throws IOException {
+ HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
+ HoodieTimeline timeline = activeTimeline.getCleanerTimeline().filterCompletedInstants();
+ HoodieInstant clean = timeline.getReverseOrderedInstants().findFirst().orElse(null);
+ if (clean != null) {
+ HoodieCleanMetadata cleanMetadata =
+ TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(clean).get());
+ return cleanMetadata.getTimeTakenInMillis();
+ }
+ return -1L;
+ }
+}
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCleansCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCleansCommand.java
new file mode 100644
index 000000000..c42d97fa8
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCleansCommand.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.cli.common.HoodieTestCommitMetadataGenerator;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class ITTestCleansCommand extends AbstractShellIntegrationTest {
+ private String tablePath;
+ private URL propsFilePath;
+
+ @Before
+ public void init() throws IOException {
+ HoodieCLI.conf = jsc.hadoopConfiguration();
+
+ String tableName = "test_table";
+ tablePath = basePath + File.separator + tableName;
+ propsFilePath = this.getClass().getClassLoader().getResource("clean.properties");
+
+ // Create table and connect
+ new TableCommand().createTable(
+ tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
+ "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+ Configuration conf = HoodieCLI.conf;
+
+ metaClient = HoodieCLI.getTableMetaClient();
+ // Create four commits
+ for (int i = 100; i < 104; i++) {
+ String timestamp = String.valueOf(i);
+ // Requested Compaction
+ HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath,
+ new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, timestamp), conf);
+ // Inflight Compaction
+ HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath,
+ new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, timestamp), conf);
+ HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, conf);
+ }
+ }
+
+ /**
+ * Test case for cleans run.
+ */
+ @Test
+ public void testRunClean() throws IOException {
+ // First, there should be no clean instants yet.
+ assertEquals(0, metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count());
+
+ // Check properties file exists.
+ assertNotNull("Properties file not found", propsFilePath);
+
+ // Create partition metadata
+ new File(tablePath + File.separator + HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH
+ + File.separator + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile();
+ new File(tablePath + File.separator + HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH
+ + File.separator + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile();
+
+ CommandResult cr = getShell().executeCommand("cleans run --sparkMaster local --propsFilePath " + propsFilePath.getPath());
+ assertTrue(cr.isSuccess());
+
+ // After running clean, there should be 1 clean instant
+ assertEquals("Loaded 1 clean and the count should match", 1,
+ metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count());
+ }
+}
diff --git a/hudi-cli/src/test/resources/clean.properties b/hudi-cli/src/test/resources/clean.properties
new file mode 100644
index 000000000..33c1a66c4
--- /dev/null
+++ b/hudi-cli/src/test/resources/clean.properties
@@ -0,0 +1,19 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+hoodie.cleaner.incremental.mode=true
+hoodie.cleaner.commits.retained=2
diff --git a/scripts/run_travis_tests.sh b/scripts/run_travis_tests.sh
index f55e332e9..f3cb39cb4 100755
--- a/scripts/run_travis_tests.sh
+++ b/scripts/run_travis_tests.sh
@@ -17,6 +17,8 @@
# limitations under the License.
mode=$1
+sparkVersion=2.4.4
+hadoopVersion=2.7
if [ "$mode" = "unit" ];
then
@@ -24,6 +26,11 @@ then
mvn test -DskipITs=true -B
elif [ "$mode" = "integration" ];
then
+ echo "Downloading Apache Spark-${sparkVersion}-bin-hadoop${hadoopVersion}"
+ wget https://archive.apache.org/dist/spark/spark-${sparkVersion}/spark-${sparkVersion}-bin-hadoop${hadoopVersion}.tgz -O /tmp/spark-${sparkVersion}.tgz
+ tar -xvf /tmp/spark-${sparkVersion}.tgz
+ export SPARK_HOME=$PWD/spark-${sparkVersion}-bin-hadoop${hadoopVersion}
+ mkdir -p /tmp/spark-events/
echo "Running Integration Tests"
mvn verify -DskipUTs=true -B
else