From 2eaba0962a67d72000a3b1033f5f3d5bde5f4d7f Mon Sep 17 00:00:00 2001 From: hongdd Date: Sat, 26 Sep 2020 00:36:41 +0800 Subject: [PATCH] [HUDI-544] Archived commits command code cleanup (#1242) * Archived commits command code cleanup --- .../hudi/cli/commands/ArchivedCommitsCommand.java | 9 ++++++--- .../org/apache/hudi/integ/ITTestHoodieSanity.java | 7 +++++++ .../org/apache/hudi/HoodieSparkSqlWriter.scala | 15 +++++++++++---- 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java index c531eea35..1dc925b47 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java @@ -25,6 +25,7 @@ import org.apache.hudi.cli.HoodiePrintHelper; import org.apache.hudi.cli.TableHeader; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat.Reader; import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; @@ -63,7 +64,7 @@ public class ArchivedCommitsCommand implements CommandMarker { throws IOException { System.out.println("===============> Showing only " + limit + " archived commits <==============="); String basePath = HoodieCLI.getTableMetaClient().getBasePath(); - Path archivePath = new Path(basePath + "/.hoodie/.commits_.archive*"); + Path archivePath = new Path(HoodieCLI.getTableMetaClient().getArchivePath() + "/.commits_.archive*"); if (folder != null && !folder.isEmpty()) { archivePath = new Path(basePath + "/.hoodie/" + folder); } @@ -138,9 +139,11 @@ public class ArchivedCommitsCommand implements CommandMarker { throws IOException { System.out.println("===============> 
Showing only " + limit + " archived commits <==============="); - String basePath = HoodieCLI.getTableMetaClient().getBasePath(); + HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient(); + String basePath = metaClient.getBasePath(); + Path archivePath = new Path(metaClient.getArchivePath() + "/.commits_.archive*"); FileStatus[] fsStatuses = - FSUtils.getFs(basePath, HoodieCLI.conf).globStatus(new Path(basePath + "/.hoodie/.commits_.archive*")); + FSUtils.getFs(basePath, HoodieCLI.conf).globStatus(archivePath); List allCommits = new ArrayList<>(); for (FileStatus fs : fsStatuses) { // read the archived file diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java index aba1d54e3..e432f9dc4 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java @@ -132,6 +132,13 @@ public class ITTestHoodieSanity extends ITTestBase { String hdfsPath = "/" + hiveTableName; String hdfsUrl = HDFS_BASE_URL + hdfsPath; + // Delete hdfs path if it exists + try { + executeCommandStringInDocker(ADHOC_1_CONTAINER, "hdfs dfs -rm -r " + hdfsUrl, true); + } catch (AssertionError ex) { + // Path does not exist; nothing to delete + } + // Drop Table if it exists try { dropHiveTables(hiveTableName, tableType); diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 450bd73e4..8cab7c16e 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -29,8 +29,11 @@ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.client.{HoodieWriteClient, WriteStatus} import org.apache.hudi.common.config.TypedProperties -import
org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType} -import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.common.model.HoodieRecordPayload +import org.apache.hudi.common.model.HoodieTableType +import org.apache.hudi.common.model.WriteOperationType +import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.timeline.HoodieActiveTimeline import org.apache.hudi.common.util.ReflectionUtils import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP, DEFAULT_BOOTSTRAP_INDEX_CLASS} @@ -107,8 +110,10 @@ private[hudi] object HoodieSparkSqlWriter { handleSaveModes(mode, basePath, tableConfig, tblName, operation, fs) // Create the table if not present if (!tableExists) { + val archiveLogFolder = parameters.getOrElse( + HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived") val tableMetaClient = HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, - HoodieTableType.valueOf(tableType), tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), + HoodieTableType.valueOf(tableType), tblName, archiveLogFolder, parameters(PAYLOAD_CLASS_OPT_KEY), null.asInstanceOf[String]) tableConfig = tableMetaClient.getTableConfig } @@ -244,8 +249,10 @@ private[hudi] object HoodieSparkSqlWriter { } if (!tableExists) { + val archiveLogFolder = parameters.getOrElse( + HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived") HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration, path, - HoodieTableType.valueOf(tableType), tableName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), + HoodieTableType.valueOf(tableType), tableName, archiveLogFolder, parameters(PAYLOAD_CLASS_OPT_KEY), null, bootstrapIndexClass, bootstrapBasePath) }