[MINOR] Fix Call Procedure code style (#6186)

* Fix Call Procedure code style.
Co-authored-by: superche <superche@tencent.com>
Author: superche
Date: 2022-07-23 17:18:38 +08:00 (committed by GitHub)
Parent: a5348cc685
Commit: 859157ec01
33 changed files with 167 additions and 162 deletions
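
This commit gives every procedure exposed through Spark SQL's `call` syntax one naming convention: verb_object procedure names and snake_case argument names. A minimal before/after sketch in Scala, assuming a SparkSession `spark` and a hypothetical Hudi table `h0`:

// Before this commit: mixed conventions (noun_verb names, camelCase arguments).
spark.sql("call metadata_create(table => 'h0')")
spark.sql("call create_savepoints(table => 'h0', commit_Time => '20220723000000')")

// After this commit: verb_object names, snake_case arguments.
spark.sql("call create_metadata_table(table => 'h0')")
spark.sql("call create_savepoint(table => 'h0', commit_time => '20220723000000')")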

View File

@@ -29,7 +29,7 @@ import org.apache.spark.sql.types._
 import java.io.FileNotFoundException
 import java.util.function.Supplier
 
-class MetadataCreateProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport {
+class CreateMetadataTableProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None)
   )
@@ -67,14 +67,14 @@ class MetadataCreateProcedure extends BaseProcedure with ProcedureBuilder with S
     Seq(Row("Created Metadata Table in " + metadataPath + " (duration=" + timer.endTimer / 1000.0 + "secs)"))
   }
 
-  override def build = new MetadataCreateProcedure()
+  override def build = new CreateMetadataTableProcedure()
 }
 
-object MetadataCreateProcedure {
-  val NAME = "metadata_create"
+object CreateMetadataTableProcedure {
+  val NAME = "create_metadata_table"
   var metadataBaseDirectory: Option[String] = None
 
   def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
-    override def get() = new MetadataCreateProcedure()
+    override def get() = new CreateMetadataTableProcedure()
   }
 }

View File

@@ -26,10 +26,10 @@ import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
 import java.util.function.Supplier
 
-class CreateSavepointsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+class CreateSavepointProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "commit_Time", DataTypes.StringType, None),
+    ProcedureParameter.required(1, "commit_time", DataTypes.StringType, None),
     ProcedureParameter.optional(2, "user", DataTypes.StringType, ""),
     ProcedureParameter.optional(3, "comments", DataTypes.StringType, "")
   )
@@ -75,14 +75,14 @@ class CreateSavepointsProcedure extends BaseProcedure with ProcedureBuilder with
     Seq(Row(result))
   }
 
-  override def build: Procedure = new CreateSavepointsProcedure()
+  override def build: Procedure = new CreateSavepointProcedure()
 }
 
-object CreateSavepointsProcedure {
-  val NAME: String = "create_savepoints"
+object CreateSavepointProcedure {
+  val NAME: String = "create_savepoint"
 
   def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
-    override def get(): CreateSavepointsProcedure = new CreateSavepointsProcedure()
+    override def get(): CreateSavepointProcedure = new CreateSavepointProcedure()
  }
}
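
The rename is caller-visible: named arguments must match the declared ProcedureParameter strings exactly. A hedged usage sketch against the new declaration (table name and instant are hypothetical):

// Savepoint a specific commit; `user` and `comments` fall back to "" when omitted.
spark.sql(
  """call create_savepoint(
    |table => 'h0',
    |commit_time => '20220723000000',
    |user => 'admin',
    |comments => 'pre-upgrade savepoint')""".stripMargin)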

View File

@@ -29,7 +29,7 @@ import scala.util.{Failure, Success, Try}
 class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "instant_Time", DataTypes.StringType, None)
+    ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](

View File

@@ -27,7 +27,7 @@ import org.apache.spark.sql.types._
 import java.io.FileNotFoundException
 import java.util.function.Supplier
 
-class MetadataDeleteProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport {
+class DeleteMetadataTableProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None)
   )
@@ -58,14 +58,14 @@ class MetadataDeleteProcedure extends BaseProcedure with ProcedureBuilder with S
     Seq(Row("Removed Metadata Table from " + metadataPath))
   }
 
-  override def build = new MetadataDeleteProcedure()
+  override def build = new DeleteMetadataTableProcedure()
 }
 
-object MetadataDeleteProcedure {
-  val NAME = "metadata_delete"
+object DeleteMetadataTableProcedure {
+  val NAME = "delete_metadata_table"
   var metadataBaseDirectory: Option[String] = None
 
   def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
-    override def get() = new MetadataDeleteProcedure()
+    override def get() = new DeleteMetadataTableProcedure()
   }
 }

View File

@@ -26,7 +26,7 @@ import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
 import java.util.function.Supplier
 
-class DeleteSavepointsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+class DeleteSavepointProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
     ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None)
@@ -74,14 +74,14 @@ class DeleteSavepointsProcedure extends BaseProcedure with ProcedureBuilder with
     Seq(Row(result))
   }
 
-  override def build: Procedure = new DeleteSavepointsProcedure()
+  override def build: Procedure = new DeleteSavepointProcedure()
 }
 
-object DeleteSavepointsProcedure {
-  val NAME: String = "delete_savepoints"
+object DeleteSavepointProcedure {
+  val NAME: String = "delete_savepoint"
 
   def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
-    override def get(): DeleteSavepointsProcedure = new DeleteSavepointsProcedure()
+    override def get(): DeleteSavepointProcedure = new DeleteSavepointProcedure()
  }
}

View File

@@ -49,7 +49,7 @@ class ExportInstantsProcedure extends BaseProcedure with ProcedureBuilder with L
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "localFolder", DataTypes.StringType, None),
+    ProcedureParameter.required(1, "local_folder", DataTypes.StringType, None),
     ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, -1),
     ProcedureParameter.optional(3, "actions", DataTypes.StringType, defaultActions),
     ProcedureParameter.optional(4, "desc", DataTypes.BooleanType, false)

View File

@@ -28,17 +28,17 @@ import scala.language.higherKinds
 class HdfsParquetImportProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "tableType", DataTypes.StringType, None),
-    ProcedureParameter.required(2, "srcPath", DataTypes.StringType, None),
-    ProcedureParameter.required(3, "targetPath", DataTypes.StringType, None),
-    ProcedureParameter.required(4, "rowKey", DataTypes.StringType, None),
-    ProcedureParameter.required(5, "partitionKey", DataTypes.StringType, None),
-    ProcedureParameter.required(6, "schemaFilePath", DataTypes.StringType, None),
+    ProcedureParameter.required(1, "table_type", DataTypes.StringType, None),
+    ProcedureParameter.required(2, "src_path", DataTypes.StringType, None),
+    ProcedureParameter.required(3, "target_path", DataTypes.StringType, None),
+    ProcedureParameter.required(4, "row_key", DataTypes.StringType, None),
+    ProcedureParameter.required(5, "partition_key", DataTypes.StringType, None),
+    ProcedureParameter.required(6, "schema_file_path", DataTypes.StringType, None),
     ProcedureParameter.optional(7, "format", DataTypes.StringType, "parquet"),
     ProcedureParameter.optional(8, "command", DataTypes.StringType, "insert"),
     ProcedureParameter.optional(9, "retry", DataTypes.IntegerType, 0),
     ProcedureParameter.optional(10, "parallelism", DataTypes.IntegerType, jsc.defaultParallelism),
-    ProcedureParameter.optional(11, "propsFilePath", DataTypes.StringType, "")
+    ProcedureParameter.optional(11, "props_file_path", DataTypes.StringType, "")
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](

View File

@@ -35,9 +35,9 @@ object HoodieProcedures {
     val mapBuilder: ImmutableMap.Builder[String, Supplier[ProcedureBuilder]] = ImmutableMap.builder()
     mapBuilder.put(RunCompactionProcedure.NAME, RunCompactionProcedure.builder)
     mapBuilder.put(ShowCompactionProcedure.NAME, ShowCompactionProcedure.builder)
-    mapBuilder.put(CreateSavepointsProcedure.NAME, CreateSavepointsProcedure.builder)
-    mapBuilder.put(DeleteSavepointsProcedure.NAME, DeleteSavepointsProcedure.builder)
-    mapBuilder.put(RollbackSavepointsProcedure.NAME, RollbackSavepointsProcedure.builder)
+    mapBuilder.put(CreateSavepointProcedure.NAME, CreateSavepointProcedure.builder)
+    mapBuilder.put(DeleteSavepointProcedure.NAME, DeleteSavepointProcedure.builder)
+    mapBuilder.put(RollbackToSavepointProcedure.NAME, RollbackToSavepointProcedure.builder)
     mapBuilder.put(RollbackToInstantTimeProcedure.NAME, RollbackToInstantTimeProcedure.builder)
     mapBuilder.put(RunClusteringProcedure.NAME, RunClusteringProcedure.builder)
     mapBuilder.put(ShowClusteringProcedure.NAME, ShowClusteringProcedure.builder)
@@ -66,13 +66,13 @@ object HoodieProcedures {
     mapBuilder.put(ShowBootstrapPartitionsProcedure.NAME, ShowBootstrapPartitionsProcedure.builder)
     mapBuilder.put(UpgradeTableProcedure.NAME, UpgradeTableProcedure.builder)
     mapBuilder.put(DowngradeTableProcedure.NAME, DowngradeTableProcedure.builder)
-    mapBuilder.put(ListMetadataFilesProcedure.NAME, ListMetadataFilesProcedure.builder)
-    mapBuilder.put(ListMetadataPartitionsProcedure.NAME, ListMetadataPartitionsProcedure.builder)
-    mapBuilder.put(MetadataCreateProcedure.NAME, MetadataCreateProcedure.builder)
-    mapBuilder.put(MetadataDeleteProcedure.NAME, MetadataDeleteProcedure.builder)
-    mapBuilder.put(MetadataInitProcedure.NAME, MetadataInitProcedure.builder)
-    mapBuilder.put(ShowMetadataStatsProcedure.NAME, ShowMetadataStatsProcedure.builder)
-    mapBuilder.put(ValidateMetadataFilesProcedure.NAME, ValidateMetadataFilesProcedure.builder)
+    mapBuilder.put(ShowMetadataTableFilesProcedure.NAME, ShowMetadataTableFilesProcedure.builder)
+    mapBuilder.put(ShowMetadataTablePartitionsProcedure.NAME, ShowMetadataTablePartitionsProcedure.builder)
+    mapBuilder.put(CreateMetadataTableProcedure.NAME, CreateMetadataTableProcedure.builder)
+    mapBuilder.put(DeleteMetadataTableProcedure.NAME, DeleteMetadataTableProcedure.builder)
+    mapBuilder.put(InitMetadataTableProcedure.NAME, InitMetadataTableProcedure.builder)
+    mapBuilder.put(ShowMetadataTableStatsProcedure.NAME, ShowMetadataTableStatsProcedure.builder)
+    mapBuilder.put(ValidateMetadataTableFilesProcedure.NAME, ValidateMetadataTableFilesProcedure.builder)
     mapBuilder.put(ShowFsPathDetailProcedure.NAME, ShowFsPathDetailProcedure.builder)
     mapBuilder.put(CopyToTableProcedure.NAME, CopyToTableProcedure.builder)
     mapBuilder.put(RepairAddpartitionmetaProcedure.NAME, RepairAddpartitionmetaProcedure.builder)
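
For readers unfamiliar with this registry: each procedure's companion object contributes a NAME string and a Supplier[ProcedureBuilder], and SQL `call <name>(...)` resolves the builder by that string key, which is why every rename in this commit touches exactly two sites (the companion object and this map). An illustrative, self-contained sketch of the pattern with simplified stand-in types (not Hudi's actual classes):

import java.util.function.Supplier
import com.google.common.collect.ImmutableMap

object ProcedureRegistrySketch {
  trait ProcedureBuilder { def build(): AnyRef }

  // Stand-in for a procedure companion object: a stable NAME plus a builder factory.
  object CreateMetadataTableSketch {
    val NAME = "create_metadata_table"
    def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
      override def get(): ProcedureBuilder = new ProcedureBuilder {
        override def build(): AnyRef = new Object
      }
    }
  }

  // Name -> factory map, mirroring HoodieProcedures' ImmutableMap.Builder usage above.
  private val builders: ImmutableMap[String, Supplier[ProcedureBuilder]] = {
    val mapBuilder = ImmutableMap.builder[String, Supplier[ProcedureBuilder]]()
    mapBuilder.put(CreateMetadataTableSketch.NAME, CreateMetadataTableSketch.builder)
    mapBuilder.build()
  }

  // `call <name>(...)` would resolve its implementation through a lookup like this.
  def lookup(name: String): Option[Supplier[ProcedureBuilder]] = Option(builders.get(name))
}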

View File

@@ -30,10 +30,10 @@ import org.apache.spark.sql.types._
 import java.io.FileNotFoundException
 import java.util.function.Supplier
 
-class MetadataInitProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
+class InitMetadataTableProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.optional(1, "readOnly", DataTypes.BooleanType, false)
+    ProcedureParameter.optional(1, "read_only", DataTypes.BooleanType, false)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -71,14 +71,14 @@ class MetadataInitProcedure extends BaseProcedure with ProcedureBuilder with Spa
     Seq(Row(action + " Metadata Table in " + metadataPath + " (duration=" + timer.endTimer / 1000.0 + "sec)"))
   }
 
-  override def build = new MetadataInitProcedure()
+  override def build = new InitMetadataTableProcedure()
 }
 
-object MetadataInitProcedure {
-  val NAME = "metadata_init"
+object InitMetadataTableProcedure {
+  val NAME = "init_metadata_table"
   var metadataBaseDirectory: Option[String] = None
 
   def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
-    override def get() = new MetadataInitProcedure()
+    override def get() = new InitMetadataTableProcedure()
   }
 }

View File

@@ -26,7 +26,7 @@ import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
 import java.util.function.Supplier
 
-class RollbackSavepointsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+class RollbackToSavepointProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
     ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None)
@@ -74,14 +74,14 @@ class RollbackSavepointsProcedure extends BaseProcedure with ProcedureBuilder wi
     Seq(Row(result))
   }
 
-  override def build: Procedure = new RollbackSavepointsProcedure()
+  override def build: Procedure = new RollbackToSavepointProcedure()
 }
 
-object RollbackSavepointsProcedure {
-  val NAME: String = "rollback_savepoints"
+object RollbackToSavepointProcedure {
+  val NAME: String = "rollback_to_savepoint"
 
   def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
-    override def get(): RollbackSavepointsProcedure = new RollbackSavepointsProcedure()
+    override def get(): RollbackToSavepointProcedure = new RollbackToSavepointProcedure()
  }
}

View File

@@ -37,22 +37,22 @@ import java.util.function.Supplier
 class RunBootstrapProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "tableType", DataTypes.StringType, None),
-    ProcedureParameter.required(2, "bootstrapPath", DataTypes.StringType, None),
-    ProcedureParameter.required(3, "basePath", DataTypes.StringType, None),
-    ProcedureParameter.required(4, "rowKeyField", DataTypes.StringType, None),
-    ProcedureParameter.optional(5, "baseFileFormat", DataTypes.StringType, "PARQUET"),
-    ProcedureParameter.optional(6, "partitionPathField", DataTypes.StringType, ""),
-    ProcedureParameter.optional(7, "bootstrapIndexClass", DataTypes.StringType, "org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex"),
-    ProcedureParameter.optional(8, "selectorClass", DataTypes.StringType, "org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector"),
-    ProcedureParameter.optional(9, "keyGeneratorClass", DataTypes.StringType, "org.apache.hudi.keygen.SimpleKeyGenerator"),
-    ProcedureParameter.optional(10, "fullBootstrapInputProvider", DataTypes.StringType, "org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider"),
-    ProcedureParameter.optional(11, "schemaProviderClass", DataTypes.StringType, ""),
-    ProcedureParameter.optional(12, "payloadClass", DataTypes.StringType, "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"),
+    ProcedureParameter.required(1, "table_type", DataTypes.StringType, None),
+    ProcedureParameter.required(2, "bootstrap_path", DataTypes.StringType, None),
+    ProcedureParameter.required(3, "base_path", DataTypes.StringType, None),
+    ProcedureParameter.required(4, "row_key_field", DataTypes.StringType, None),
+    ProcedureParameter.optional(5, "base_file_format", DataTypes.StringType, "PARQUET"),
+    ProcedureParameter.optional(6, "partition_path_field", DataTypes.StringType, ""),
+    ProcedureParameter.optional(7, "bootstrap_index_class", DataTypes.StringType, "org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex"),
+    ProcedureParameter.optional(8, "selector_class", DataTypes.StringType, "org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector"),
+    ProcedureParameter.optional(9, "key_generator_class", DataTypes.StringType, "org.apache.hudi.keygen.SimpleKeyGenerator"),
+    ProcedureParameter.optional(10, "full_bootstrap_input_provider", DataTypes.StringType, "org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider"),
+    ProcedureParameter.optional(11, "schema_provider_class", DataTypes.StringType, ""),
+    ProcedureParameter.optional(12, "payload_class", DataTypes.StringType, "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"),
     ProcedureParameter.optional(13, "parallelism", DataTypes.IntegerType, 1500),
-    ProcedureParameter.optional(14, "enableHiveSync", DataTypes.BooleanType, false),
-    ProcedureParameter.optional(15, "propsFilePath", DataTypes.StringType, ""),
-    ProcedureParameter.optional(16, "bootstrapOverwrite", DataTypes.BooleanType, false)
+    ProcedureParameter.optional(14, "enable_hive_sync", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(15, "props_file_path", DataTypes.StringType, ""),
+    ProcedureParameter.optional(16, "bootstrap_overwrite", DataTypes.BooleanType, false)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](

View File

@@ -30,10 +30,10 @@ class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.optional(1, "skipLocking", DataTypes.BooleanType, false),
-    ProcedureParameter.optional(2, "scheduleInLine", DataTypes.BooleanType, true),
-    ProcedureParameter.optional(3, "cleanPolicy", DataTypes.StringType, None),
-    ProcedureParameter.optional(4, "retainCommits", DataTypes.IntegerType, 10)
+    ProcedureParameter.optional(1, "skip_locking", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(2, "schedule_in_line", DataTypes.BooleanType, true),
+    ProcedureParameter.optional(3, "clean_policy", DataTypes.StringType, None),
+    ProcedureParameter.optional(4, "retain_commits", DataTypes.IntegerType, 10)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](

View File

@@ -36,8 +36,8 @@ class ShowArchivedCommitsProcedure(includeExtraMetadata: Boolean) extends BasePr
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
     ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
-    ProcedureParameter.optional(2, "startTs", DataTypes.StringType, ""),
-    ProcedureParameter.optional(3, "endTs", DataTypes.StringType, "")
+    ProcedureParameter.optional(2, "start_ts", DataTypes.StringType, ""),
+    ProcedureParameter.optional(3, "end_ts", DataTypes.StringType, "")
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -63,7 +63,7 @@ class ShowArchivedCommitsProcedure(includeExtraMetadata: Boolean) extends BasePr
     StructField("num_update_writes", DataTypes.LongType, nullable = true, Metadata.empty),
     StructField("total_errors", DataTypes.LongType, nullable = true, Metadata.empty),
     StructField("total_log_blocks", DataTypes.LongType, nullable = true, Metadata.empty),
-    StructField("total_corrupt_logblocks", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_corrupt_log_blocks", DataTypes.LongType, nullable = true, Metadata.empty),
     StructField("total_rollback_blocks", DataTypes.LongType, nullable = true, Metadata.empty),
     StructField("total_log_records", DataTypes.LongType, nullable = true, Metadata.empty),
     StructField("total_updated_records_compacted", DataTypes.LongType, nullable = true, Metadata.empty),

View File

@@ -33,17 +33,17 @@ import scala.collection.JavaConverters._
 class ShowBootstrapMappingProcedure extends BaseProcedure with ProcedureBuilder {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.optional(1, "partitionPath", DataTypes.StringType, ""),
-    ProcedureParameter.optional(2, "fileIds", DataTypes.StringType, ""),
+    ProcedureParameter.optional(1, "partition_path", DataTypes.StringType, ""),
+    ProcedureParameter.optional(2, "file_ids", DataTypes.StringType, ""),
     ProcedureParameter.optional(3, "limit", DataTypes.IntegerType, 10),
-    ProcedureParameter.optional(4, "sortBy", DataTypes.StringType, "partition"),
+    ProcedureParameter.optional(4, "sort_by", DataTypes.StringType, "partition"),
     ProcedureParameter.optional(5, "desc", DataTypes.BooleanType, false)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
     StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty),
-    StructField("fileid", DataTypes.StringType, nullable = true, Metadata.empty),
-    StructField("source_basepath", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("file_id", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("source_base_path", DataTypes.StringType, nullable = true, Metadata.empty),
     StructField("source_partition", DataTypes.StringType, nullable = true, Metadata.empty),
     StructField("source_file", DataTypes.StringType, nullable = true, Metadata.empty))
   )

View File

@@ -44,7 +44,7 @@ class ShowCommitFilesProcedure() extends BaseProcedure with ProcedureBuilder {
     StructField("file_id", DataTypes.StringType, nullable = true, Metadata.empty),
     StructField("previous_commit", DataTypes.StringType, nullable = true, Metadata.empty),
     StructField("total_records_updated", DataTypes.LongType, nullable = true, Metadata.empty),
-    StructField("total_tecords_written", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_records_written", DataTypes.LongType, nullable = true, Metadata.empty),
     StructField("total_bytes_written", DataTypes.LongType, nullable = true, Metadata.empty),
     StructField("total_errors", DataTypes.LongType, nullable = true, Metadata.empty),
     StructField("file_size", DataTypes.LongType, nullable = true, Metadata.empty)

View File

@@ -61,7 +61,7 @@ class ShowCommitsProcedure(includeExtraMetadata: Boolean) extends BaseProcedure
     StructField("num_update_writes", DataTypes.LongType, nullable = true, Metadata.empty),
     StructField("total_errors", DataTypes.LongType, nullable = true, Metadata.empty),
     StructField("total_log_blocks", DataTypes.LongType, nullable = true, Metadata.empty),
-    StructField("total_corrupt_logblocks", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_corrupt_log_blocks", DataTypes.LongType, nullable = true, Metadata.empty),
     StructField("total_rollback_blocks", DataTypes.LongType, nullable = true, Metadata.empty),
     StructField("total_log_records", DataTypes.LongType, nullable = true, Metadata.empty),
     StructField("total_updated_records_compacted", DataTypes.LongType, nullable = true, Metadata.empty),

View File

@@ -37,7 +37,7 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
     ProcedureParameter.optional(1, "max_instant", DataTypes.StringType, ""),
     ProcedureParameter.optional(2, "include_max", DataTypes.BooleanType, false),
-    ProcedureParameter.optional(3, "include_inflight", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(3, "include_in_flight", DataTypes.BooleanType, false),
     ProcedureParameter.optional(4, "exclude_compaction", DataTypes.BooleanType, false),
     ProcedureParameter.optional(5, "limit", DataTypes.IntegerType, 10),
     ProcedureParameter.optional(6, "path_regex", DataTypes.StringType, "*/*/*")

View File

@@ -21,7 +21,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.engine.HoodieLocalEngineContext
 import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.util.HoodieTimer
+import org.apache.hudi.common.util.{HoodieTimer, StringUtils}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.metadata.HoodieBackedTableMetadata
 import org.apache.spark.internal.Logging
@@ -31,10 +31,10 @@ import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
 import java.util
 import java.util.function.Supplier
 
-class ListMetadataFilesProcedure() extends BaseProcedure with ProcedureBuilder with Logging {
+class ShowMetadataTableFilesProcedure() extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.optional(1, "partition", DataTypes.StringType, None)
+    ProcedureParameter.optional(1, "partition", DataTypes.StringType, "")
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -60,8 +60,13 @@ class ListMetadataFilesProcedure() extends BaseProcedure with ProcedureBuilder w
       throw new HoodieException(s"Metadata Table not enabled/initialized.")
     }
 
+    var partitionPath = new Path(basePath)
+    if (!StringUtils.isNullOrEmpty(partition)) {
+      partitionPath = new Path(basePath, partition)
+    }
+
     val timer = new HoodieTimer().startTimer
-    val statuses = metaReader.getAllFilesInPartition(new Path(basePath, partition))
+    val statuses = metaReader.getAllFilesInPartition(partitionPath)
     logDebug("Took " + timer.endTimer + " ms")
 
     val rows = new util.ArrayList[Row]
@@ -71,13 +76,13 @@ class ListMetadataFilesProcedure() extends BaseProcedure with ProcedureBuilder w
     rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
   }
 
-  override def build: Procedure = new ListMetadataFilesProcedure()
+  override def build: Procedure = new ShowMetadataTableFilesProcedure()
 }
 
-object ListMetadataFilesProcedure {
-  val NAME = "list_metadata_files"
+object ShowMetadataTableFilesProcedure {
+  val NAME = "show_metadata_table_files"
 
   def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
-    override def get() = new ListMetadataFilesProcedure()
+    override def get() = new ShowMetadataTableFilesProcedure()
  }
}
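
Beyond the rename, this file carries a behavioral change: `partition` now defaults to `""` instead of `None`, and the new `StringUtils.isNullOrEmpty` guard falls back to the table base path when no partition is supplied. A hedged usage sketch (table name and partition value are hypothetical):

// List metadata-table files for one partition.
spark.sql("call show_metadata_table_files(table => 'h0', partition => '2022/07/23')")
// With the new "" default, the argument can simply be omitted.
spark.sql("call show_metadata_table_files(table => 'h0')")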

View File

@@ -31,7 +31,7 @@ import java.util.Collections
 import java.util.function.Supplier
 import scala.collection.JavaConverters.asScalaIteratorConverter
 
-class ListMetadataPartitionsProcedure() extends BaseProcedure with ProcedureBuilder with Logging {
+class ShowMetadataTablePartitionsProcedure() extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None)
   )
@@ -69,13 +69,13 @@ class ListMetadataPartitionsProcedure() extends BaseProcedure with ProcedureBuil
     rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
   }
 
-  override def build: Procedure = new ListMetadataPartitionsProcedure()
+  override def build: Procedure = new ShowMetadataTablePartitionsProcedure()
 }
 
-object ListMetadataPartitionsProcedure {
-  val NAME = "list_metadata_partitions"
+object ShowMetadataTablePartitionsProcedure {
+  val NAME = "show_metadata_table_partitions"
 
   def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
-    override def get() = new ListMetadataPartitionsProcedure()
+    override def get() = new ShowMetadataTablePartitionsProcedure()
  }
}

View File

@@ -28,7 +28,7 @@ import java.util
 import java.util.function.Supplier
 import scala.collection.JavaConversions._
 
-class ShowMetadataStatsProcedure() extends BaseProcedure with ProcedureBuilder {
+class ShowMetadataTableStatsProcedure() extends BaseProcedure with ProcedureBuilder {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None)
   )
@@ -61,14 +61,14 @@ class ShowMetadataStatsProcedure() extends BaseProcedure with ProcedureBuilder {
     rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
   }
 
-  override def build: Procedure = new ShowMetadataStatsProcedure()
+  override def build: Procedure = new ShowMetadataTableStatsProcedure()
 }
 
-object ShowMetadataStatsProcedure {
-  val NAME = "show_metadata_stats"
+object ShowMetadataTableStatsProcedure {
+  val NAME = "show_metadata_table_stats"
 
   def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
-    override def get() = new ShowMetadataStatsProcedure()
+    override def get() = new ShowMetadataTableStatsProcedure()
  }
}

View File

@@ -46,7 +46,7 @@ class StatsFileSizeProcedure extends BaseProcedure with ProcedureBuilder {
     StructField("95th", DataTypes.DoubleType, nullable = true, Metadata.empty),
     StructField("max", DataTypes.LongType, nullable = true, Metadata.empty),
     StructField("num_files", DataTypes.IntegerType, nullable = true, Metadata.empty),
-    StructField("stddev", DataTypes.DoubleType, nullable = true, Metadata.empty)
+    StructField("std_dev", DataTypes.DoubleType, nullable = true, Metadata.empty)
   ))
 
   override def call(args: ProcedureArgs): Seq[Row] = {
@@ -100,7 +100,7 @@ class StatsFileSizeProcedure extends BaseProcedure with ProcedureBuilder {
 object StatsFileSizeProcedure {
   val MAX_FILES = 1000000
-  val NAME = "stats_filesizes"
+  val NAME = "stats_file_sizes"
 
   def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
     override def get(): ProcedureBuilder = new StatsFileSizeProcedure()

View File

@@ -35,7 +35,7 @@ import scala.util.{Failure, Success, Try}
 class UpgradeOrDowngradeProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "toVersion", DataTypes.StringType, None)
+    ProcedureParameter.required(1, "to_version", DataTypes.StringType, None)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](

View File

@@ -34,7 +34,7 @@ import java.util.function.Supplier
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters.asScalaIteratorConverter
 
-class ValidateMetadataFilesProcedure() extends BaseProcedure with ProcedureBuilder with Logging {
+class ValidateMetadataTableFilesProcedure() extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
     ProcedureParameter.optional(1, "verbose", DataTypes.BooleanType, false)
@@ -135,13 +135,13 @@ class ValidateMetadataFilesProcedure() extends BaseProcedure with ProcedureBuild
     rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
   }
 
-  override def build: Procedure = new ValidateMetadataFilesProcedure()
+  override def build: Procedure = new ValidateMetadataTableFilesProcedure()
 }
 
-object ValidateMetadataFilesProcedure {
-  val NAME = "validate_metadata_files"
+object ValidateMetadataTableFilesProcedure {
+  val NAME = "validate_metadata_table_files"
 
   def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
-    override def get() = new ValidateMetadataFilesProcedure()
+    override def get() = new ValidateMetadataTableFilesProcedure()
  }
}

View File

@@ -55,12 +55,12 @@ class TestBootstrapProcedure extends HoodieSparkSqlTestBase {
     checkAnswer(
       s"""call run_bootstrap(
          |table => '$tableName',
-         |basePath => '$tablePath',
-         |tableType => '${HoodieTableType.COPY_ON_WRITE.name}',
-         |bootstrapPath => '$sourcePath',
-         |rowKeyField => '$RECORD_KEY_FIELD',
-         |partitionPathField => '$PARTITION_FIELD',
-         |bootstrapOverwrite => true)""".stripMargin) {
+         |base_path => '$tablePath',
+         |table_type => '${HoodieTableType.COPY_ON_WRITE.name}',
+         |bootstrap_path => '$sourcePath',
+         |row_key_field => '$RECORD_KEY_FIELD',
+         |partition_path_field => '$PARTITION_FIELD',
+         |bootstrap_overwrite => true)""".stripMargin) {
       Seq(0)
    }

View File

@@ -156,7 +156,7 @@ class TestCallProcedure extends HoodieSparkSqlTestBase {
     // Check required fields
     checkExceptionContain(s"""call delete_marker(table => '$tableName')""")(
-      s"Argument: instant_Time is required")
+      s"Argument: instant_time is required")
 
     val instantTime = "101"
     FileCreateUtils.createMarkerFile(tablePath, "", instantTime, "f0", IOType.APPEND)
@@ -164,7 +164,7 @@ class TestCallProcedure extends HoodieSparkSqlTestBase {
       FileCreateUtils.getTotalMarkerFileCount(tablePath, "", instantTime, IOType.APPEND)
     }
 
-    checkAnswer(s"""call delete_marker(table => '$tableName', instant_Time => '$instantTime')""")(Seq(true))
+    checkAnswer(s"""call delete_marker(table => '$tableName', instant_time => '$instantTime')""")(Seq(true))
 
     assertResult(0) {
       FileCreateUtils.getTotalMarkerFileCount(tablePath, "", instantTime, IOType.APPEND)

View File

@@ -48,7 +48,7 @@ class TestCleanProcedure extends HoodieSparkSqlTestBase {
       spark.sql(s"update $tableName set price = 12 where id = 1")
       spark.sql(s"update $tableName set price = 13 where id = 1")
 
-      val result1 = spark.sql(s"call run_clean(table => '$tableName', retainCommits => 1)")
+      val result1 = spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)")
         .collect()
         .map(row => Seq(row.getString(0), row.getLong(1), row.getInt(2), row.getString(3), row.getString(4), row.getInt(5)))

View File

@@ -62,7 +62,7 @@ class TestCommitsProcedure extends HoodieSparkSqlTestBase {
     // collect archived commits for table
     val endTs = commits(0).get(0).toString
-    val archivedCommits = spark.sql(s"""call show_archived_commits(table => '$tableName', endTs => '$endTs')""").collect()
+    val archivedCommits = spark.sql(s"""call show_archived_commits(table => '$tableName', end_ts => '$endTs')""").collect()
     assertResult(4) {
       archivedCommits.length
     }
@@ -110,7 +110,7 @@ class TestCommitsProcedure extends HoodieSparkSqlTestBase {
     // collect archived commits for table
     val endTs = commits(0).get(0).toString
-    val archivedCommits = spark.sql(s"""call show_archived_commits_metadata(table => '$tableName', endTs => '$endTs')""").collect()
+    val archivedCommits = spark.sql(s"""call show_archived_commits_metadata(table => '$tableName', end_ts => '$endTs')""").collect()
     assertResult(4) {
       archivedCommits.length
    }

View File

@@ -43,7 +43,7 @@ class TestExportInstantsProcedure extends HoodieSparkSqlTestBase {
     // insert data to table
     spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
 
-    val result = spark.sql(s"""call export_instants(table => '$tableName', localFolder => '${tmp.getCanonicalPath}/$tableName')""").limit(1).collect()
+    val result = spark.sql(s"""call export_instants(table => '$tableName', local_folder => '${tmp.getCanonicalPath}/$tableName')""").limit(1).collect()
     assertResult(1) {
       result.length
    }

View File

@@ -56,15 +56,15 @@ class TestHdfsParquetImportProcedure extends HoodieSparkSqlTestBase {
     val insertData: util.List[GenericRecord] = createInsertRecords(sourcePath)
 
     // Check required fields
-    checkExceptionContain(s"""call hdfs_parquet_import(tableType => 'mor')""")(
+    checkExceptionContain(s"""call hdfs_parquet_import(table_type => 'mor')""")(
       s"Argument: table is required")
 
     checkAnswer(
       s"""call hdfs_parquet_import(
-         |table => '$tableName', tableType => '${HoodieTableType.COPY_ON_WRITE.name}',
-         |srcPath => '$sourcePath', targetPath => '$targetPath',
-         |rowKey => '_row_key', partitionKey => 'timestamp',
-         |schemaFilePath => '$schemaFile')""".stripMargin) {
+         |table => '$tableName', table_type => '${HoodieTableType.COPY_ON_WRITE.name}',
+         |src_path => '$sourcePath', target_path => '$targetPath',
+         |row_key => '_row_key', partition_key => 'timestamp',
+         |schema_file_path => '$schemaFile')""".stripMargin) {
       Seq(0)
     }
@@ -89,15 +89,15 @@ class TestHdfsParquetImportProcedure extends HoodieSparkSqlTestBase {
     val insertData: util.List[GenericRecord] = createUpsertRecords(sourcePath)
 
     // Check required fields
-    checkExceptionContain(s"""call hdfs_parquet_import(tableType => 'mor')""")(
+    checkExceptionContain(s"""call hdfs_parquet_import(table_type => 'mor')""")(
       s"Argument: table is required")
 
     checkAnswer(
      s"""call hdfs_parquet_import(
-         |table => '$tableName', tableType => '${HoodieTableType.COPY_ON_WRITE.name}',
-         |srcPath => '$sourcePath', targetPath => '$targetPath',
-         |rowKey => '_row_key', partitionKey => 'timestamp',
-         |schemaFilePath => '$schemaFile', command => 'upsert')""".stripMargin) {
+         |table => '$tableName', table_type => '${HoodieTableType.COPY_ON_WRITE.name}',
+         |src_path => '$sourcePath', target_path => '$targetPath',
+         |row_key => '_row_key', partition_key => 'timestamp',
+         |schema_file_path => '$schemaFile', command => 'upsert')""".stripMargin) {
      Seq(0)
    }

View File

@@ -21,7 +21,7 @@ import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
 class TestMetadataProcedure extends HoodieSparkSqlTestBase {
 
-  test("Test Call metadata_delete Procedure") {
+  test("Test Call delete_metadata_table Procedure") {
     withTempDir { tmp =>
       val tableName = generateTableName
       // create table
@@ -44,14 +44,14 @@ class TestMetadataProcedure extends HoodieSparkSqlTestBase {
       spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
 
       // delete the metadata
-      val deleteResult = spark.sql(s"""call metadata_delete(table => '$tableName')""").collect()
+      val deleteResult = spark.sql(s"""call delete_metadata_table(table => '$tableName')""").collect()
       assertResult(1) {
         deleteResult.length
       }
     }
   }
 
-  test("Test Call metadata_create Procedure") {
+  test("Test Call create_metadata_table Procedure") {
     withTempDir { tmp =>
       val tableName = generateTableName
       // create table
@@ -74,20 +74,20 @@ class TestMetadataProcedure extends HoodieSparkSqlTestBase {
       spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
 
       // The first step is delete the metadata
-      val deleteResult = spark.sql(s"""call metadata_delete(table => '$tableName')""").collect()
+      val deleteResult = spark.sql(s"""call delete_metadata_table(table => '$tableName')""").collect()
       assertResult(1) {
         deleteResult.length
       }
 
       // The second step is create the metadata
-      val createResult = spark.sql(s"""call metadata_create(table => '$tableName')""").collect()
+      val createResult = spark.sql(s"""call create_metadata_table(table => '$tableName')""").collect()
       assertResult(1) {
         createResult.length
       }
     }
   }
 
-  test("Test Call metadata_init Procedure") {
+  test("Test Call init_metadata_table Procedure") {
     withTempDir { tmp =>
       val tableName = generateTableName
       // create table
@@ -110,20 +110,20 @@ class TestMetadataProcedure extends HoodieSparkSqlTestBase {
       spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
 
       // read only, no initialize
-      val readResult = spark.sql(s"""call metadata_init(table => '$tableName', readOnly => true)""").collect()
+      val readResult = spark.sql(s"""call init_metadata_table(table => '$tableName', read_only => true)""").collect()
       assertResult(1) {
         readResult.length
       }
 
       // initialize metadata
-      val initResult = spark.sql(s"""call metadata_init(table => '$tableName')""").collect()
+      val initResult = spark.sql(s"""call init_metadata_table(table => '$tableName')""").collect()
       assertResult(1) {
         initResult.length
       }
     }
   }
 
-  test("Test Call show_metadata_stats Procedure") {
+  test("Test Call show_metadata_table_stats Procedure") {
     withTempDir { tmp =>
       val tableName = generateTableName
       // create table
@@ -147,14 +147,14 @@ class TestMetadataProcedure extends HoodieSparkSqlTestBase {
       spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
 
       // collect metadata stats for table
-      val metadataStats = spark.sql(s"""call show_metadata_stats(table => '$tableName')""").collect()
+      val metadataStats = spark.sql(s"""call show_metadata_table_stats(table => '$tableName')""").collect()
       assertResult(0) {
         metadataStats.length
       }
     }
   }
 
-  test("Test Call list_metadata_partitions Procedure") {
+  test("Test Call show_metadata_table_partitions Procedure") {
     withTempDir { tmp =>
       val tableName = generateTableName
       // create table
@@ -178,14 +178,14 @@ class TestMetadataProcedure extends HoodieSparkSqlTestBase {
       spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
 
       // collect metadata partitions for table
-      val partitions = spark.sql(s"""call list_metadata_partitions(table => '$tableName')""").collect()
+      val partitions = spark.sql(s"""call show_metadata_table_partitions(table => '$tableName')""").collect()
       assertResult(2) {
         partitions.length
      }
    }
  }
 
-  test("Test Call list_metadata_files Procedure") {
+  test("Test Call show_metadata_table_files Procedure") {
     withTempDir { tmp =>
       val tableName = generateTableName
       // create table
@@ -209,21 +209,21 @@ class TestMetadataProcedure extends HoodieSparkSqlTestBase {
       spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
 
       // collect metadata partitions for table
-      val partitions = spark.sql(s"""call list_metadata_partitions(table => '$tableName')""").collect()
+      val partitions = spark.sql(s"""call show_metadata_table_partitions(table => '$tableName')""").collect()
       assertResult(2) {
         partitions.length
       }
 
       // collect metadata files for a partition of a table
       val partition = partitions(0).get(0).toString
-      val filesResult = spark.sql(s"""call list_metadata_files(table => '$tableName', partition => '$partition')""").collect()
+      val filesResult = spark.sql(s"""call show_metadata_table_files(table => '$tableName', partition => '$partition')""").collect()
       assertResult(1) {
         filesResult.length
      }
    }
  }
 
-  test("Test Call validate_metadata_files Procedure") {
+  test("Test Call validate_metadata_table_files Procedure") {
     withTempDir { tmp =>
       val tableName = generateTableName
       // create table
@@ -247,13 +247,13 @@ class TestMetadataProcedure extends HoodieSparkSqlTestBase {
       spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
 
       // collect validate metadata files result
-      val validateFilesResult = spark.sql(s"""call validate_metadata_files(table => '$tableName')""").collect()
+      val validateFilesResult = spark.sql(s"""call validate_metadata_table_files(table => '$tableName')""").collect()
       assertResult(0) {
         validateFilesResult.length
       }
 
       // collect validate metadata files result with verbose
-      val validateFilesVerboseResult = spark.sql(s"""call validate_metadata_files(table => '$tableName', verbose => true)""").collect()
+      val validateFilesVerboseResult = spark.sql(s"""call validate_metadata_table_files(table => '$tableName', verbose => true)""").collect()
      assertResult(2) {
        validateFilesVerboseResult.length
      }

View File

@@ -21,7 +21,7 @@ import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
 class TestSavepointsProcedure extends HoodieSparkSqlTestBase {
 
-  test("Test Call create_savepoints Procedure") {
+  test("Test Call create_savepoint Procedure") {
     withTempDir { tmp =>
       val tableName = generateTableName
       // create table
@@ -49,7 +49,7 @@ class TestSavepointsProcedure extends HoodieSparkSqlTestBase {
       }
 
       val commitTime = commits.apply(0).getString(0)
-      checkAnswer(s"""call create_savepoints('$tableName', '$commitTime', 'admin', '1')""")(Seq(true))
+      checkAnswer(s"""call create_savepoint('$tableName', '$commitTime', 'admin', '1')""")(Seq(true))
     }
   }
@@ -83,7 +83,7 @@ class TestSavepointsProcedure extends HoodieSparkSqlTestBase {
       }
 
       val commitTime = commits.apply(1).getString(0)
-      checkAnswer(s"""call create_savepoints('$tableName', '$commitTime')""")(Seq(true))
+      checkAnswer(s"""call create_savepoint('$tableName', '$commitTime')""")(Seq(true))
 
       // show savepoints
       val savepoints = spark.sql(s"""call show_savepoints(table => '$tableName')""").collect()
@@ -93,7 +93,7 @@ class TestSavepointsProcedure extends HoodieSparkSqlTestBase {
     }
   }
 
-  test("Test Call delete_savepoints Procedure") {
+  test("Test Call delete_savepoint Procedure") {
     withTempDir { tmp =>
       val tableName = generateTableName
       // create table
@@ -124,11 +124,11 @@ class TestSavepointsProcedure extends HoodieSparkSqlTestBase {
       // create 3 savepoints
       commits.foreach(r => {
-        checkAnswer(s"""call create_savepoints('$tableName', '${r.getString(0)}')""")(Seq(true))
+        checkAnswer(s"""call create_savepoint('$tableName', '${r.getString(0)}')""")(Seq(true))
       })
 
       // delete savepoints
-      checkAnswer(s"""call delete_savepoints('$tableName', '${commits.apply(1).getString(0)}')""")(Seq(true))
+      checkAnswer(s"""call delete_savepoint('$tableName', '${commits.apply(1).getString(0)}')""")(Seq(true))
 
       // show savepoints with only 2
       val savepoints = spark.sql(s"""call show_savepoints(table => '$tableName')""").collect()
@@ -138,7 +138,7 @@ class TestSavepointsProcedure extends HoodieSparkSqlTestBase {
     }
   }
 
-  test("Test Call rollback_savepoints Procedure") {
+  test("Test Call rollback_to_savepoint Procedure") {
     withTempDir { tmp =>
       val tableName = generateTableName
       // create table
@@ -168,11 +168,11 @@ class TestSavepointsProcedure extends HoodieSparkSqlTestBase {
       // create 2 savepoints
       commits.foreach(r => {
-        checkAnswer(s"""call create_savepoints('$tableName', '${r.getString(0)}')""")(Seq(true))
+        checkAnswer(s"""call create_savepoint('$tableName', '${r.getString(0)}')""")(Seq(true))
       })
 
       // rollback savepoints
-      checkAnswer(s"""call rollback_savepoints('$tableName', '${commits.apply(0).getString(0)}')""")(Seq(true))
+      checkAnswer(s"""call rollback_to_savepoint('$tableName', '${commits.apply(0).getString(0)}')""")(Seq(true))
    }
  }
}

View File

@@ -60,7 +60,7 @@ class TestStatsProcedure extends HoodieSparkSqlTestBase {
     }
   }
 
-  test("Test Call stats_filesizes Procedure") {
+  test("Test Call stats_file_sizes Procedure") {
     withTempDir { tmp =>
       val tableName = generateTableName
       val tablePath = s"${tmp.getCanonicalPath}/$tableName"
@@ -85,12 +85,12 @@ class TestStatsProcedure extends HoodieSparkSqlTestBase {
       spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
 
       // Check required fields
-      checkExceptionContain(s"""call stats_filesizes(limit => 10)""")(
+      checkExceptionContain(s"""call stats_file_sizes(limit => 10)""")(
        s"Argument: table is required")
 
      // collect result for table
      val result = spark.sql(
-        s"""call stats_filesizes(table => '$tableName', partition_path => '/*')""".stripMargin).collect()
+        s"""call stats_file_sizes(table => '$tableName', partition_path => '/*')""".stripMargin).collect()
      assertResult(3) {
        result.length
      }

View File

@@ -48,7 +48,7 @@ class TestUpgradeOrDowngradeProcedure extends HoodieSparkSqlTestBase {
       """.stripMargin)
 
     // Check required fields
     checkExceptionContain(s"""call downgrade_table(table => '$tableName')""")(
-      s"Argument: toVersion is required")
+      s"Argument: to_version is required")
 
     var metaClient = HoodieTableMetaClient.builder
       .setConf(new JavaSparkContext(spark.sparkContext).hadoopConfiguration())
@@ -62,7 +62,7 @@ class TestUpgradeOrDowngradeProcedure extends HoodieSparkSqlTestBase {
     assertTableVersionFromPropertyFile(metaClient, HoodieTableVersion.FOUR.versionCode)
 
     // downgrade table to ZERO
-    checkAnswer(s"""call downgrade_table(table => '$tableName', toVersion => 'ZERO')""")(Seq(true))
+    checkAnswer(s"""call downgrade_table(table => '$tableName', to_version => 'ZERO')""")(Seq(true))
 
     // verify the downgraded hoodie.table.version
     metaClient = HoodieTableMetaClient.reload(metaClient)
@@ -72,7 +72,7 @@ class TestUpgradeOrDowngradeProcedure extends HoodieSparkSqlTestBase {
     assertTableVersionFromPropertyFile(metaClient, HoodieTableVersion.ZERO.versionCode)
 
     // upgrade table to ONE
-    checkAnswer(s"""call upgrade_table(table => '$tableName', toVersion => 'ONE')""")(Seq(true))
+    checkAnswer(s"""call upgrade_table(table => '$tableName', to_version => 'ONE')""")(Seq(true))
 
     // verify the upgraded hoodie.table.version
     metaClient = HoodieTableMetaClient.reload(metaClient)