[HUDI-544] Archived commits command code cleanup (#1242)
* Archived commits command code cleanup
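Summary: the CLI previously hard-coded the archived commits location as basePath + "/.hoodie/.commits_.archive*"; it now resolves it through HoodieTableMetaClient#getArchivePath(). Correspondingly, HoodieSparkSqlWriter reads the archive log folder from HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME (defaulting to "archived") when initializing a new table, instead of always passing the literal "archived". ITTestHoodieSanity additionally deletes any leftover HDFS path before each run.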
@@ -25,6 +25,7 @@ import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
@@ -63,7 +64,7 @@ public class ArchivedCommitsCommand implements CommandMarker {
       throws IOException {
     System.out.println("===============> Showing only " + limit + " archived commits <===============");
     String basePath = HoodieCLI.getTableMetaClient().getBasePath();
-    Path archivePath = new Path(basePath + "/.hoodie/.commits_.archive*");
+    Path archivePath = new Path(HoodieCLI.getTableMetaClient().getArchivePath() + "/.commits_.archive*");
     if (folder != null && !folder.isEmpty()) {
       archivePath = new Path(basePath + "/.hoodie/" + folder);
     }
@@ -138,9 +139,11 @@ public class ArchivedCommitsCommand implements CommandMarker {
       throws IOException {

     System.out.println("===============> Showing only " + limit + " archived commits <===============");
-    String basePath = HoodieCLI.getTableMetaClient().getBasePath();
+    HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
+    String basePath = metaClient.getBasePath();
+    Path archivePath = new Path(metaClient.getArchivePath() + "/.commits_.archive*");
     FileStatus[] fsStatuses =
-        FSUtils.getFs(basePath, HoodieCLI.conf).globStatus(new Path(basePath + "/.hoodie/.commits_.archive*"));
+        FSUtils.getFs(basePath, HoodieCLI.conf).globStatus(archivePath);
     List<Comparable[]> allCommits = new ArrayList<>();
     for (FileStatus fs : fsStatuses) {
       // read the archived file
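Note: resolving the glob through getArchivePath() means the CLI now follows whatever archive folder the table was configured with, rather than assuming the archives sit directly under .hoodie; this pairs with the HoodieSparkSqlWriter change below, which makes that folder configurable at table-creation time.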
@@ -132,6 +132,13 @@ public class ITTestHoodieSanity extends ITTestBase {
     String hdfsPath = "/" + hiveTableName;
     String hdfsUrl = HDFS_BASE_URL + hdfsPath;

+    // Delete hdfs path if it exists
+    try {
+      executeCommandStringInDocker(ADHOC_1_CONTAINER, "hdfs dfs -rm -r " + hdfsUrl, true);
+    } catch (AssertionError ex) {
+      // Path not exists, pass
+    }
+
     // Drop Table if it exists
     try {
       dropHiveTables(hiveTableName, tableType);
@@ -29,8 +29,11 @@ import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.client.{HoodieWriteClient, WriteStatus}
 import org.apache.hudi.common.config.TypedProperties
-import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.model.HoodieRecordPayload
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.model.WriteOperationType
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.util.ReflectionUtils
 import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP, DEFAULT_BOOTSTRAP_INDEX_CLASS}
@@ -107,8 +110,10 @@ private[hudi] object HoodieSparkSqlWriter {
       handleSaveModes(mode, basePath, tableConfig, tblName, operation, fs)
       // Create the table if not present
       if (!tableExists) {
+        val archiveLogFolder = parameters.getOrElse(
+          HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived")
         val tableMetaClient = HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get,
-          HoodieTableType.valueOf(tableType), tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY),
+          HoodieTableType.valueOf(tableType), tblName, archiveLogFolder, parameters(PAYLOAD_CLASS_OPT_KEY),
           null.asInstanceOf[String])
         tableConfig = tableMetaClient.getTableConfig
       }
@@ -244,8 +249,10 @@ private[hudi] object HoodieSparkSqlWriter {
     }

     if (!tableExists) {
+      val archiveLogFolder = parameters.getOrElse(
+        HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived")
       HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration, path,
-        HoodieTableType.valueOf(tableType), tableName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY),
+        HoodieTableType.valueOf(tableType), tableName, archiveLogFolder, parameters(PAYLOAD_CLASS_OPT_KEY),
         null, bootstrapIndexClass, bootstrapBasePath)
     }

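For illustration only, a minimal sketch of writing through the Spark datasource with a custom archive folder after this patch. The dataframe, field names, table name, and target path are hypothetical placeholders; the option keys are the DataSourceWriteOptions and HoodieTableConfig constants that appear in the hunks above.

    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.common.table.HoodieTableConfig
    import org.apache.spark.sql.{DataFrame, SaveMode}

    // Sketch: `df` and all literal values below are placeholders.
    def writeWithCustomArchiveFolder(df: DataFrame): Unit = {
      df.write
        .format("hudi")
        .option(TABLE_NAME_OPT_KEY, "test_table")
        .option(RECORDKEY_FIELD_OPT_KEY, "key")
        .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
        // With this patch, the folder below is passed through to table
        // initialization on the first write instead of the hard-coded "archived".
        .option(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "custom_archive")
        .mode(SaveMode.Overwrite)
        .save("/tmp/hudi/test_table")
    }

A table created this way keeps its archived commits under that folder inside the table's metadata directory, which the updated CLI command then locates via getArchivePath().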