[HUDI-3707] Fix target schema handling in HoodieSparkUtils while creating RDD (#5347)
@@ -26,6 +26,9 @@ import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator}
@@ -36,12 +39,8 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
 import java.util.Properties
 
-import org.apache.hudi.internal.schema.InternalSchema
-import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
-import org.apache.hudi.internal.schema.utils.InternalSchemaUtils
-
 import scala.collection.JavaConverters._
 
 object HoodieSparkUtils extends SparkAdapterSupport {
@@ -130,7 +129,15 @@ object HoodieSparkUtils extends SparkAdapterSupport {
    */
   def createRdd(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean,
                 latestTableSchema: org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()): RDD[GenericRecord] = {
-    val latestTableSchemaConverted = if (latestTableSchema.isPresent && reconcileToLatestSchema) Some(latestTableSchema.get()) else None
+    var latestTableSchemaConverted : Option[Schema] = None
+
+    if (latestTableSchema.isPresent && reconcileToLatestSchema) {
+      latestTableSchemaConverted = Some(latestTableSchema.get())
+    } else {
+      // cases when users want to use latestTableSchema but have not turned on reconcileToLatestSchema explicitly
+      // for example, when using a Transformer implementation to transform source RDD to target RDD
+      latestTableSchemaConverted = if (latestTableSchema.isPresent) Some(latestTableSchema.get()) else None
+    }
     createRdd(df, structName, recordNamespace, latestTableSchemaConverted)
   }
 
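The net effect of the patched branching is that a present latestTableSchema is now used as the target Avro schema whether or not reconcileToLatestSchema is set; previously it was silently dropped when the flag was off, so a Transformer's output was serialized against the DataFrame's inferred schema instead of the table schema. A minimal, self-contained sketch of that rule, using scala.Option in place of org.apache.hudi.common.util.Option (names below are illustrative, not from the patch):

// Sketch only: contrasts the old and new target-schema selection in createRdd.
def oldPick[S](latestTableSchema: Option[S], reconcileToLatestSchema: Boolean): Option[S] =
  if (latestTableSchema.isDefined && reconcileToLatestSchema) latestTableSchema
  else None // schema silently dropped when the flag is off

def newPick[S](latestTableSchema: Option[S], reconcileToLatestSchema: Boolean): Option[S] =
  if (latestTableSchema.isDefined && reconcileToLatestSchema) latestTableSchema
  else latestTableSchema // both branches now use the schema whenever it is present

// With a Transformer in the pipeline and reconcileToLatestSchema left at false,
// the old code fell back to the DataFrame's inferred schema; the new code still
// targets the latest table schema:
assert(oldPick(Some("latest-avro-schema"), reconcileToLatestSchema = false).isEmpty)
assert(newPick(Some("latest-avro-schema"), reconcileToLatestSchema = false).contains("latest-avro-schema"))

Note that the patched if/else collapses to "if (latestTableSchema.isPresent) Some(latestTableSchema.get()) else None"; the explicit branches serve only to document the Transformer case called out in the comments.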
@@ -1690,7 +1690,6 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     testParquetDFSSource(false, null, true);
   }
 
-  @Disabled("HUDI-3707 To investigate problem with schema provider and transformer")
   @Test
   public void testParquetDFSSourceWithoutSchemaProviderAndTransformer() throws Exception {
     testParquetDFSSource(false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
@@ -1701,7 +1700,6 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     testParquetDFSSource(true, null);
   }
 
-  @Disabled("HUDI-3707 To investigate problem with schema provider and transformer")
   @Test
   public void testParquetDFSSourceWithSchemaFilesAndTransformer() throws Exception {
     testParquetDFSSource(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
@@ -1712,7 +1710,6 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     testORCDFSSource(false, null);
   }
 
-  @Disabled("HUDI-3707 To investigate problem with schema provider and transformer")
   @Test
   public void testORCDFSSourceWithSchemaProviderAndWithTransformer() throws Exception {
     testORCDFSSource(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
@@ -1807,7 +1804,6 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     testCsvDFSSource(true, '\t', false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
   }
 
-  @Disabled("HUDI-3707 To investigate problem with schema provider and transformer")
   @Test
   public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndTransformer() throws Exception {
     // The CSV files have header, the columns are separated by '\t'
@@ -1850,7 +1846,6 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     assertTrue(e.getMessage().contains("cannot resolve '`begin_lat`' given input columns:"));
   }
 
-  @Disabled("HUDI-3707 To investigate problem with schema provider and transformer")
   @Test
   public void testCsvDFSSourceNoHeaderWithSchemaProviderAndTransformer() throws Exception {
     // The CSV files do not have header, the columns are separated by '\t'
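The five hunks above re-enable tests that had been disabled under HUDI-3707; each drives DeltaStreamer through TripsWithDistanceTransformer, the path that previously lost the target schema (see the begin_lat resolution failure asserted above). For orientation only, a hedged Scala sketch of a transform of that shape; the object name, method signature, and column names are assumptions for illustration, and the real hudi-utilities Transformer interface takes additional arguments:

// Illustrative only: a row-level transform in the spirit of TripsWithDistanceTransformer.
// Column names (begin_lat, begin_lon, end_lat, end_lon) mirror the test assertion above.
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.{col, pow, sqrt}

object TripsWithDistanceSketch {
  // Adds a derived distance column; such added columns are exactly what breaks
  // when the downstream RDD is built against the source schema instead of the
  // latest table schema.
  def transform(rows: Dataset[Row]): Dataset[Row] =
    rows.withColumn("trip_distance",
      sqrt(pow(col("end_lat") - col("begin_lat"), 2) + pow(col("end_lon") - col("begin_lon"), 2)))
}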