1
0

[HUDI-1415] Read Hoodie Table As Spark DataSource Table (#2283)

This commit is contained in:
pengzhiwei
2021-04-21 05:21:38 +08:00
committed by GitHub
parent 3253079507
commit aacb8be521
13 changed files with 382 additions and 48 deletions

View File

@@ -22,6 +22,7 @@ import java.util
import java.util.{Collections, Date, UUID}
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.client.{SparkRDDWriteClient, TestBootstrap}
import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload}
@@ -29,10 +30,13 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.hive.HiveSyncConfig
import org.apache.hudi.testutils.DataSourceTestUtils
import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{spy, times, verify}
@@ -486,6 +490,46 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
}
})
test("Test build sync config for spark sql") {
initSparkContext("test build sync config")
val addSqlTablePropertiesMethod =
HoodieSparkSqlWriter.getClass.getDeclaredMethod("addSqlTableProperties",
classOf[SQLConf], classOf[StructType], classOf[Map[_,_]])
addSqlTablePropertiesMethod.setAccessible(true)
val schema = DataSourceTestUtils.getStructTypeExampleSchema
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
val basePath = "/tmp/hoodie_test"
val params = Map(
"path" -> basePath,
DataSourceWriteOptions.TABLE_NAME_OPT_KEY -> "test_hoodie",
DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "partition"
)
val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params)
val newParams = addSqlTablePropertiesMethod.invoke(HoodieSparkSqlWriter,
spark.sessionState.conf, structType, parameters)
.asInstanceOf[Map[String, String]]
val buildSyncConfigMethod =
HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", classOf[Path],
classOf[Map[_,_]])
buildSyncConfigMethod.setAccessible(true)
val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter,
new Path(basePath), newParams).asInstanceOf[HiveSyncConfig]
assertResult("spark.sql.sources.provider=hudi\n" +
"spark.sql.sources.schema.partCol.0=partition\n" +
"spark.sql.sources.schema.numParts=1\n" +
"spark.sql.sources.schema.numPartCols=1\n" +
"spark.sql.sources.schema.part.0=" +
"{\"type\":\"struct\",\"fields\":[{\"name\":\"_row_key\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}," +
"{\"name\":\"ts\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}," +
"{\"name\":\"partition\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}")(hiveSyncConfig.tableProperties)
assertResult("path=/tmp/hoodie_test")(hiveSyncConfig.serdeProperties)
}
case class Test(uuid: String, ts: Long)
import scala.collection.JavaConverters