From 56cd8ffae0aa1dc92633ebe880a5188cb9e74f5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=91=A3=E5=8F=AF=E4=BC=A6?= Date: Sun, 23 Jan 2022 14:11:27 +0800 Subject: [PATCH] [HUDI-2837] Add support for using database name in incremental query (#4083) --- .../hudi/common/table/HoodieTableConfig.java | 13 +++ .../common/table/HoodieTableMetaClient.java | 12 +++ .../common/testutils/HoodieTestUtils.java | 22 ++++ .../hadoop/HoodieFileInputFormatBase.java | 8 +- .../apache/hudi/hadoop/InputPathHandler.java | 30 ++++-- .../HoodieParquetRealtimeInputFormat.java | 5 +- .../hudi/hadoop/utils/HoodieHiveUtils.java | 6 ++ .../hadoop/TestHoodieHFileInputFormat.java | 102 ++++++++++++++++-- .../hadoop/TestHoodieParquetInputFormat.java | 102 ++++++++++++++++-- .../hadoop/testutils/InputFormatTestUtil.java | 24 ++++- .../apache/hudi/HoodieSparkSqlWriter.scala | 2 + .../catalyst/catalog/HoodieCatalogTable.scala | 5 +- .../spark/sql/hudi/HoodieOptionConfig.scala | 10 +- .../command/CreateHoodieTableCommand.scala | 4 +- .../command/TruncateHoodieTableCommand.scala | 2 +- .../command/payload/ExpressionPayload.scala | 4 +- .../InsertIntoHoodieTableCommand.scala | 10 +- .../command/MergeIntoHoodieTableCommand.scala | 2 +- .../spark/sql/hudi/TestCreateTable.scala | 30 ++++-- 19 files changed, 330 insertions(+), 63 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index 54724d5f3..624c02726 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -76,6 +76,12 @@ public class HoodieTableConfig extends HoodieConfig { public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties"; public static final String HOODIE_PROPERTIES_FILE_BACKUP = "hoodie.properties.backup"; + public static final ConfigProperty<String> DATABASE_NAME = ConfigProperty + .key("hoodie.database.name") + .noDefaultValue() + .withDocumentation("Database name that will be used for incremental query. If different databases have the same table name during incremental query, " + + "we can set it to limit the table name under a specific database"); + public static final ConfigProperty<String> NAME = ConfigProperty .key("hoodie.table.name") .noDefaultValue() @@ -422,6 +428,13 @@ public class HoodieTableConfig extends HoodieConfig { } } + /** + * Read the database name. + */ + public String getDatabaseName() { + return getString(DATABASE_NAME); + } + /** + * Read the table name.
*/ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index f44d28eca..b9a367396 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -624,6 +624,7 @@ public class HoodieTableMetaClient implements Serializable { public static class PropertyBuilder { private HoodieTableType tableType; + private String databaseName; private String tableName; private String tableCreateSchema; private String recordKeyFields; @@ -655,6 +656,11 @@ public class HoodieTableMetaClient implements Serializable { return setTableType(HoodieTableType.valueOf(tableType)); } + public PropertyBuilder setDatabaseName(String databaseName) { + this.databaseName = databaseName; + return this; + } + public PropertyBuilder setTableName(String tableName) { this.tableName = tableName; return this; @@ -753,6 +759,9 @@ public class HoodieTableMetaClient implements Serializable { public PropertyBuilder fromProperties(Properties properties) { HoodieConfig hoodieConfig = new HoodieConfig(properties); + if (hoodieConfig.contains(HoodieTableConfig.DATABASE_NAME)) { + setDatabaseName(hoodieConfig.getString(HoodieTableConfig.DATABASE_NAME)); + } if (hoodieConfig.contains(HoodieTableConfig.NAME)) { setTableName(hoodieConfig.getString(HoodieTableConfig.NAME)); } @@ -819,6 +828,9 @@ public class HoodieTableMetaClient implements Serializable { ValidationUtils.checkArgument(tableName != null, "tableName is null"); HoodieTableConfig tableConfig = new HoodieTableConfig(); + if (databaseName != null) { + tableConfig.setValue(HoodieTableConfig.DATABASE_NAME, databaseName); + } tableConfig.setValue(HoodieTableConfig.NAME, tableName); tableConfig.setValue(HoodieTableConfig.TYPE, tableType.name()); tableConfig.setValue(HoodieTableConfig.VERSION, diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java index d03dca0c8..c623c2f5d 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java @@ -46,6 +46,7 @@ import java.util.UUID; */ public class HoodieTestUtils { + public static final String HOODIE_DATABASE = "test_incremental"; public static final String RAW_TRIPS_TEST_NAME = "raw_trips"; public static final String DEFAULT_WRITE_TOKEN = "1-0-1"; public static final int DEFAULT_LOG_VERSION = 1; @@ -91,6 +92,14 @@ public class HoodieTestUtils { return init(hadoopConf, basePath, tableType, properties); } + public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType, + HoodieFileFormat baseFileFormat, String databaseName) + throws IOException { + Properties properties = new Properties(); + properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), baseFileFormat.toString()); + return init(hadoopConf, basePath, tableType, properties, databaseName); + } + public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType, HoodieFileFormat baseFileFormat) throws IOException { @@ -111,6 +120,19 @@ public class HoodieTestUtils { return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties); } + public static HoodieTableMetaClient init(Configuration 
hadoopConf, String basePath, HoodieTableType tableType, + Properties properties, String databaseName) + throws IOException { + properties = HoodieTableMetaClient.withPropertyBuilder() + .setDatabaseName(databaseName) + .setTableName(RAW_TRIPS_TEST_NAME) + .setTableType(tableType) + .setPayloadClass(HoodieAvroPayload.class) + .fromProperties(properties) + .build(); + return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties); + } + public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath, HoodieFileFormat baseFileFormat) throws IOException { Properties props = new Properties(); props.setProperty(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key(), bootstrapBasePath); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java index 9597256eb..a35eb5094 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java @@ -121,7 +121,7 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat inputPaths = inputPathHandler.getGroupedIncrementalPaths().get(metaClient); - List result = listStatusForIncrementalMode(job, metaClient, inputPaths); + List result = listStatusForIncrementalMode(job, metaClient, inputPaths, table); if (result != null) { returns.addAll(result); } @@ -229,14 +229,14 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat listStatusForIncrementalMode(JobConf job, HoodieTableMetaClient tableMetaClient, List inputPaths) throws IOException { - String tableName = tableMetaClient.getTableConfig().getTableName(); + protected List listStatusForIncrementalMode(JobConf job, HoodieTableMetaClient tableMetaClient, + List inputPaths, String incrementalTable) throws IOException { Job jobContext = Job.getInstance(job); Option timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient); if (!timeline.isPresent()) { return null; } - Option> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get()); + Option> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, incrementalTable, timeline.get()); if (!commitsToCheck.isPresent()) { return null; } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java index f7adf38e4..07bd82afa 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java @@ -19,11 +19,13 @@ package org.apache.hudi.hadoop; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.InvalidTableException; import org.apache.hudi.exception.TableNotFoundException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hudi.hadoop.utils.HoodieHiveUtils; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -53,11 +55,12 @@ public class InputPathHandler { public static final Logger LOG = LogManager.getLogger(InputPathHandler.class); private final Configuration conf; - // tablename to metadata mapping for all Hoodie tables(both incremental & snapshot) + // tableName to metadata 
mapping for all Hoodie tables (both incremental & snapshot) private final Map<String, HoodieTableMetaClient> tableMetaClientMap; private final Map<HoodieTableMetaClient, List<Path>> groupedIncrementalPaths; private final List<Path> snapshotPaths; private final List<Path> nonHoodieInputPaths; + private boolean isIncrementalUseDatabase; public InputPathHandler(Configuration conf, Path[] inputPaths, List<String> incrementalTables) throws IOException { this.conf = conf; @@ -65,13 +68,14 @@ public class InputPathHandler { snapshotPaths = new ArrayList<>(); nonHoodieInputPaths = new ArrayList<>(); groupedIncrementalPaths = new HashMap<>(); + this.isIncrementalUseDatabase = HoodieHiveUtils.isIncrementalUseDatabase(conf); parseInputPaths(inputPaths, incrementalTables); } /** * Takes in the original InputPaths and classifies each of them into incremental, snapshot and * non-hoodie InputPaths. The logic is as follows: - * 1. Check if an inputPath starts with the same basepath as any of the metadata basepaths we know + * 1. Check if an inputPath starts with the same basePath as any of the metadata basePaths we know * 1a. If yes, this belongs to a Hoodie table that we already know about. Simply classify this * as incremental or snapshot - We can get the table name of this inputPath from the * metadata. Then based on the list of incrementalTables, we can classify this inputPath. @@ -95,19 +99,17 @@ public class InputPathHandler { // We already know the base path for this inputPath. basePathKnown = true; // Check if this is for a snapshot query - String tableName = metaClient.getTableConfig().getTableName(); - tagAsIncrementalOrSnapshot(inputPath, tableName, metaClient, incrementalTables); + tagAsIncrementalOrSnapshot(inputPath, metaClient, incrementalTables); break; } } if (!basePathKnown) { - // This path is for a table that we dont know about yet. + // This path is for a table that we don't know about yet. HoodieTableMetaClient metaClient; try { metaClient = getTableMetaClientForBasePath(inputPath.getFileSystem(conf), inputPath); - String tableName = metaClient.getTableConfig().getTableName(); - tableMetaClientMap.put(tableName, metaClient); - tagAsIncrementalOrSnapshot(inputPath, tableName, metaClient, incrementalTables); + tableMetaClientMap.put(getIncrementalTable(metaClient), metaClient); + tagAsIncrementalOrSnapshot(inputPath, metaClient, incrementalTables); } catch (TableNotFoundException | InvalidTableException e) { // This is a non Hoodie inputPath LOG.info("Handling a non-hoodie path " + inputPath); @@ -117,9 +119,8 @@ public class InputPathHandler { } } - private void tagAsIncrementalOrSnapshot(Path inputPath, String tableName, - HoodieTableMetaClient metaClient, List<String> incrementalTables) { - if (!incrementalTables.contains(tableName)) { + private void tagAsIncrementalOrSnapshot(Path inputPath, HoodieTableMetaClient metaClient, List<String> incrementalTables) { + if (!incrementalTables.contains(getIncrementalTable(metaClient))) { snapshotPaths.add(inputPath); } else { // Group incremental Paths belonging to same table. @@ -145,4 +146,11 @@ public class InputPathHandler { public List<Path> getNonHoodieInputPaths() { return nonHoodieInputPaths; } + + private String getIncrementalTable(HoodieTableMetaClient metaClient) { + String databaseName = metaClient.getTableConfig().getDatabaseName(); + String tableName = metaClient.getTableConfig().getTableName(); + return isIncrementalUseDatabase && !StringUtils.isNullOrEmpty(databaseName) + ? databaseName + "."
+ tableName : tableName; + } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java index b6a7fe9f4..f3cf4ffa8 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java @@ -113,9 +113,8 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i */ @Override protected List listStatusForIncrementalMode( - JobConf job, HoodieTableMetaClient tableMetaClient, List inputPaths) throws IOException { + JobConf job, HoodieTableMetaClient tableMetaClient, List inputPaths, String incrementalTable) throws IOException { List result = new ArrayList<>(); - String tableName = tableMetaClient.getTableConfig().getTableName(); Job jobContext = Job.getInstance(job); // step1 @@ -123,7 +122,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i if (!timeline.isPresent()) { return result; } - HoodieTimeline commitsTimelineToReturn = HoodieInputFormatUtils.getHoodieTimelineForIncrementalQuery(jobContext, tableName, timeline.get()); + HoodieTimeline commitsTimelineToReturn = HoodieInputFormatUtils.getHoodieTimelineForIncrementalQuery(jobContext, incrementalTable, timeline.get()); Option> commitsToCheck = Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList())); if (!commitsToCheck.isPresent()) { return result; diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java index 8abf27ea0..b4f7e3363 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java @@ -18,6 +18,7 @@ package org.apache.hudi.hadoop.utils; +import org.apache.hadoop.conf.Configuration; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CollectionUtils; @@ -43,6 +44,7 @@ public class HoodieHiveUtils { public static final Logger LOG = LogManager.getLogger(HoodieHiveUtils.class); + public static final String HOODIE_INCREMENTAL_USE_DATABASE = "hoodie.incremental.use.database"; public static final String HOODIE_CONSUME_MODE_PATTERN = "hoodie.%s.consume.mode"; public static final String HOODIE_START_COMMIT_PATTERN = "hoodie.%s.consume.start.timestamp"; public static final String HOODIE_MAX_COMMIT_PATTERN = "hoodie.%s.consume.max.commits"; @@ -178,4 +180,8 @@ public class HoodieHiveUtils { } return timeline.findInstantsBeforeOrEquals(maxCommit); } + + public static boolean isIncrementalUseDatabase(Configuration conf) { + return conf.getBoolean(HOODIE_INCREMENTAL_USE_DATABASE, false); + } } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHFileInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHFileInputFormat.java index 2c3402702..67d15f9b9 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHFileInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHFileInputFormat.java @@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.common.fs.FSUtils; import 
org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; @@ -232,9 +233,90 @@ public class TestHoodieHFileInputFormat { InputFormatTestUtil.setupIncremental(jobConf, "100", 1); + HoodieTableMetaClient metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), + HoodieTableType.COPY_ON_WRITE, baseFileFormat); + assertEquals(null, metaClient.getTableConfig().getDatabaseName(), + "When hoodie.database.name is not set, it should default to null"); + FileStatus[] files = inputFormat.listStatus(jobConf); assertEquals(0, files.length, "We should exclude commit 100 when returning incremental pull with start commit time as 100"); + + InputFormatTestUtil.setupIncremental(jobConf, "100", 1, true); + + files = inputFormat.listStatus(jobConf); + assertEquals(0, files.length, + "We should exclude commit 100 when returning incremental pull with start commit time as 100"); + + metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE, + baseFileFormat, HoodieTestUtils.HOODIE_DATABASE); + assertEquals(HoodieTestUtils.HOODIE_DATABASE, metaClient.getTableConfig().getDatabaseName(), + String.format("The hoodie.database.name should be %s ", HoodieTestUtils.HOODIE_DATABASE)); + + files = inputFormat.listStatus(jobConf); + assertEquals(10, files.length, + "When hoodie.incremental.use.database is true and hoodie.database.name is not null or empty" + + " and the incremental database name is not set, then the incremental query will not take effect"); + } + + @Test + public void testIncrementalWithDatabaseName() throws IOException { + // initial commit + File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100"); + createCommitFile(basePath, "100", "2016/05/01"); + + // Add the paths + FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); + + InputFormatTestUtil.setupIncremental(jobConf, "100", 1, HoodieTestUtils.HOODIE_DATABASE, true); + + HoodieTableMetaClient metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), + HoodieTableType.COPY_ON_WRITE, baseFileFormat); + assertEquals(null, metaClient.getTableConfig().getDatabaseName(), + "When hoodie.database.name is not set, it should default to null"); + + FileStatus[] files = inputFormat.listStatus(jobConf); + assertEquals(10, files.length, + "When hoodie.database.name is null, then the incremental query will not take effect"); + + metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE, + baseFileFormat, ""); + assertEquals("", metaClient.getTableConfig().getDatabaseName(), + "The hoodie.database.name should be empty"); + + files = inputFormat.listStatus(jobConf); + assertEquals(10, files.length, + "When hoodie.database.name is empty, then the incremental query will not take effect"); + + metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE, + baseFileFormat, HoodieTestUtils.HOODIE_DATABASE); + assertEquals(HoodieTestUtils.HOODIE_DATABASE, metaClient.getTableConfig().getDatabaseName(), + String.format("The hoodie.database.name should be %s ", 
HoodieTestUtils.HOODIE_DATABASE)); + + files = inputFormat.listStatus(jobConf); + assertEquals(0, files.length, + "We should exclude commit 100 when returning incremental pull with start commit time as 100"); + + InputFormatTestUtil.setupIncremental(jobConf, "100", 1, HoodieTestUtils.HOODIE_DATABASE, false); + + files = inputFormat.listStatus(jobConf); + assertEquals(10, files.length, + "When hoodie.incremental.use.database is false and the incremental database name is set," + + "then the incremental query will not take effect"); + + // The configuration with and without database name exists together + InputFormatTestUtil.setupIncremental(jobConf, "1", 1, true); + + files = inputFormat.listStatus(jobConf); + assertEquals(0, files.length, + "When hoodie.incremental.use.database is true, " + + "We should exclude commit 100 because the returning incremental pull with start commit time is 100"); + + InputFormatTestUtil.setupIncremental(jobConf, "1", 1, false); + files = inputFormat.listStatus(jobConf); + assertEquals(10, files.length, + "When hoodie.incremental.use.database is false, " + + "We should include commit 100 because the returning incremental pull with start commit time is 1"); } private void createCommitFile(java.nio.file.Path basePath, String commitNumber, String partitionPath) @@ -316,7 +398,7 @@ public class TestHoodieHFileInputFormat { ensureFilesInCommit("Pulling all commits from 100, should get us the 1 files from 200 commit", files, "200", 1); } - // TODO enable this after enabling predicate pushdown + // TODO enable this after enabling predicate push down public void testPredicatePushDown() throws IOException { // initial commit Schema schema = getSchemaFromResource(TestHoodieHFileInputFormat.class, "/sample1.avsc"); @@ -337,7 +419,7 @@ public class TestHoodieHFileInputFormat { // check whether we have 2 records at this point ensureRecordsInCommit("We need to have 2 records that was modified at commit " + commit2 + " and no more", commit2, 2, 2); - // Make sure we have the 10 records if we roll back the stattime + // Make sure we have the 10 records if we roll back the start time InputFormatTestUtil.setupIncremental(jobConf, "0", 2); ensureRecordsInCommit("We need to have 8 records that was modified at commit " + commit1 + " and no more", commit1, 8, 10); @@ -347,19 +429,19 @@ public class TestHoodieHFileInputFormat { @Test public void testGetIncrementalTableNames() throws IOException { - String[] expectedincrTables = {"db1.raw_trips", "db2.model_trips", "db3.model_trips"}; + String[] expectedIncrTables = {"db1.raw_trips", "db2.model_trips", "db3.model_trips"}; JobConf conf = new JobConf(); - String incrementalMode1 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[0]); + String incrementalMode1 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedIncrTables[0]); conf.set(incrementalMode1, HoodieHiveUtils.INCREMENTAL_SCAN_MODE); - String incrementalMode2 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[1]); + String incrementalMode2 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedIncrTables[1]); conf.set(incrementalMode2,HoodieHiveUtils.INCREMENTAL_SCAN_MODE); String incrementalMode3 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.model_trips"); conf.set(incrementalMode3, HoodieHiveUtils.INCREMENTAL_SCAN_MODE.toLowerCase()); - String defaultmode = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips"); - conf.set(defaultmode, 
HoodieHiveUtils.DEFAULT_SCAN_MODE); - List actualincrTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(conf)); - for (String expectedincrTable : expectedincrTables) { - assertTrue(actualincrTables.contains(expectedincrTable)); + String defaultMode = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips"); + conf.set(defaultMode, HoodieHiveUtils.DEFAULT_SCAN_MODE); + List actualIncrTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(conf)); + for (String expectedIncrTable : expectedIncrTables) { + assertTrue(actualIncrTables.contains(expectedIncrTable)); } } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java index c45c61467..5c7a1fdf2 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java @@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; @@ -286,9 +287,90 @@ public class TestHoodieParquetInputFormat { InputFormatTestUtil.setupIncremental(jobConf, "100", 1); + HoodieTableMetaClient metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), + HoodieTableType.COPY_ON_WRITE, baseFileFormat); + assertEquals(null, metaClient.getTableConfig().getDatabaseName(), + "When hoodie.database.name is not set, it should default to null"); + FileStatus[] files = inputFormat.listStatus(jobConf); assertEquals(0, files.length, "We should exclude commit 100 when returning incremental pull with start commit time as 100"); + + InputFormatTestUtil.setupIncremental(jobConf, "100", 1, true); + + files = inputFormat.listStatus(jobConf); + assertEquals(0, files.length, + "We should exclude commit 100 when returning incremental pull with start commit time as 100"); + + metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE, + baseFileFormat, HoodieTestUtils.HOODIE_DATABASE); + assertEquals(HoodieTestUtils.HOODIE_DATABASE, metaClient.getTableConfig().getDatabaseName(), + String.format("The hoodie.database.name should be %s ", HoodieTestUtils.HOODIE_DATABASE)); + + files = inputFormat.listStatus(jobConf); + assertEquals(10, files.length, + "When hoodie.incremental.use.database is true and hoodie.database.name is not null or empty" + + " and the incremental database name is not set, then the incremental query will not take effect"); + } + + @Test + public void testIncrementalWithDatabaseName() throws IOException { + // initial commit + File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100"); + createCommitFile(basePath, "100", "2016/05/01"); + + // Add the paths + FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); + + InputFormatTestUtil.setupIncremental(jobConf, "100", 1, HoodieTestUtils.HOODIE_DATABASE, true); + + HoodieTableMetaClient metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), + 
HoodieTableType.COPY_ON_WRITE, baseFileFormat); + assertEquals(null, metaClient.getTableConfig().getDatabaseName(), + "When hoodie.database.name is not set, it should default to null"); + + FileStatus[] files = inputFormat.listStatus(jobConf); + assertEquals(10, files.length, + "When hoodie.database.name is null, then the incremental query will not take effect"); + + metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE, + baseFileFormat, ""); + assertEquals("", metaClient.getTableConfig().getDatabaseName(), + "The hoodie.database.name should be empty"); + + files = inputFormat.listStatus(jobConf); + assertEquals(10, files.length, + "When hoodie.database.name is empty, then the incremental query will not take effect"); + + metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE, + baseFileFormat, HoodieTestUtils.HOODIE_DATABASE); + assertEquals(HoodieTestUtils.HOODIE_DATABASE, metaClient.getTableConfig().getDatabaseName(), + String.format("The hoodie.database.name should be %s ", HoodieTestUtils.HOODIE_DATABASE)); + + files = inputFormat.listStatus(jobConf); + assertEquals(0, files.length, + "We should exclude commit 100 when returning incremental pull with start commit time as 100"); + + InputFormatTestUtil.setupIncremental(jobConf, "100", 1, HoodieTestUtils.HOODIE_DATABASE, false); + + files = inputFormat.listStatus(jobConf); + assertEquals(10, files.length, + "When hoodie.incremental.use.database is false and the incremental database name is set, " + + "then the incremental query will not take effect"); + + // The configuration with and without database name exists together + InputFormatTestUtil.setupIncremental(jobConf, "1", 1, true); + + files = inputFormat.listStatus(jobConf); + assertEquals(0, files.length, + "When hoodie.incremental.use.database is true, " + + "We should exclude commit 100 because the returning incremental pull with start commit time is 100"); + + InputFormatTestUtil.setupIncremental(jobConf, "1", 1, false); + files = inputFormat.listStatus(jobConf); + assertEquals(10, files.length, + "When hoodie.incremental.use.database is false, " + + "We should include commit 100 because the returning incremental pull with start commit time is 1"); } @Test @@ -429,7 +511,7 @@ public class TestHoodieParquetInputFormat { ensureFilesInCommit("Pulling all commits from 100, should get us the 1 files from 200 commit", files, "200", 1); } - @Disabled("enable this after enabling predicate pushdown") + @Disabled("enable this after enabling predicate push down") @Test public void testPredicatePushDown() throws IOException { // initial commit @@ -451,7 +533,7 @@ public class TestHoodieParquetInputFormat { // check whether we have 2 records at this point ensureRecordsInCommit("We need to have 2 records that was modified at commit " + commit2 + " and no more", commit2, 2, 2); - // Make sure we have the 10 records if we roll back the stattime + // Make sure we have the 10 records if we roll back the start time InputFormatTestUtil.setupIncremental(jobConf, "0", 2); ensureRecordsInCommit("We need to have 8 records that was modified at commit " + commit1 + " and no more", commit1, 8, 10); @@ -461,19 +543,19 @@ public class TestHoodieParquetInputFormat { @Test public void testGetIncrementalTableNames() throws IOException { - String[] expectedincrTables = {"db1.raw_trips", "db2.model_trips", "db3.model_trips"}; + String[] expectedIncrTables = 
{"db1.raw_trips", "db2.model_trips", "db3.model_trips"}; JobConf conf = new JobConf(); - String incrementalMode1 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[0]); + String incrementalMode1 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedIncrTables[0]); conf.set(incrementalMode1, HoodieHiveUtils.INCREMENTAL_SCAN_MODE); - String incrementalMode2 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[1]); + String incrementalMode2 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedIncrTables[1]); conf.set(incrementalMode2, HoodieHiveUtils.INCREMENTAL_SCAN_MODE); String incrementalMode3 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.model_trips"); conf.set(incrementalMode3, HoodieHiveUtils.INCREMENTAL_SCAN_MODE.toLowerCase()); - String defaultmode = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips"); - conf.set(defaultmode, HoodieHiveUtils.DEFAULT_SCAN_MODE); - List actualincrTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(conf)); - for (String expectedincrTable : expectedincrTables) { - assertTrue(actualincrTables.contains(expectedincrTable)); + String defaultMode = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips"); + conf.set(defaultMode, HoodieHiveUtils.DEFAULT_SCAN_MODE); + List actualIncrTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(conf)); + for (String expectedIncrTable : expectedIncrTables) { + assertTrue(actualIncrTables.contains(expectedIncrTable)); } } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java index b71652b67..8c19524a2 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java @@ -130,6 +130,10 @@ public class InputFormatTestUtil { } public static void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull) { + setupIncremental(jobConf, startCommit, numberOfCommitsToPull, false); + } + + public static void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull, boolean isIncrementalUseDatabase) { String modePropertyName = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE); @@ -141,8 +145,26 @@ public class InputFormatTestUtil { String maxCommitPulls = String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); jobConf.setInt(maxCommitPulls, numberOfCommitsToPull); + + jobConf.setBoolean(HoodieHiveUtils.HOODIE_INCREMENTAL_USE_DATABASE, isIncrementalUseDatabase); } - + + public static void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull, String databaseName, boolean isIncrementalUseDatabase) { + String modePropertyName = + String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, databaseName + "." + HoodieTestUtils.RAW_TRIPS_TEST_NAME); + jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE); + + String startCommitTimestampName = + String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, databaseName + "." 
+ HoodieTestUtils.RAW_TRIPS_TEST_NAME); + jobConf.set(startCommitTimestampName, startCommit); + + String maxCommitPulls = + String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, databaseName + "." + HoodieTestUtils.RAW_TRIPS_TEST_NAME); + jobConf.setInt(maxCommitPulls, numberOfCommitsToPull); + + jobConf.setBoolean(HoodieHiveUtils.HOODIE_INCREMENTAL_USE_DATABASE, isIncrementalUseDatabase); + } + public static void setupSnapshotIncludePendingCommits(JobConf jobConf, String instantTime) { setupSnapshotScanMode(jobConf, true); String validateTimestampName = diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 5dcf03d3e..1f2aae411 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -85,6 +85,7 @@ object HoodieSparkSqlWriter { validateTableConfig(sqlContext.sparkSession, optParams, tableConfig) val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig) + val databaseName = hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "") val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim assert(!StringUtils.isNullOrEmpty(hoodieConfig.getString(HoodieWriteConfig.TBL_NAME)), @@ -131,6 +132,7 @@ object HoodieSparkSqlWriter { val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder() .setTableType(tableType) + .setDatabaseName(databaseName) .setTableName(tblName) .setRecordKeyFields(recordKeyFields) .setBaseFileFormat(baseFileFormat) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala index 377d4db08..f14ccbe60 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala @@ -21,8 +21,7 @@ import org.apache.hudi.AvroConversionUtils import org.apache.hudi.HoodieWriterUtils._ import org.apache.hudi.common.config.DFSPropertiesConfiguration import org.apache.hudi.common.model.HoodieTableType -import org.apache.hudi.common.table.HoodieTableConfig -import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.util.ValidationUtils import org.apache.hudi.keygen.ComplexKeyGenerator import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory @@ -184,8 +183,10 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten } else { val (recordName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(table.identifier.table) val schema = SchemaConverters.toAvroType(finalSchema, false, recordName, namespace) + val hoodieDatabaseName = formatName(spark, table.identifier.database.getOrElse(spark.sessionState.catalog.getCurrentDatabase)) HoodieTableMetaClient.withPropertyBuilder() .fromProperties(properties) + .setDatabaseName(hoodieDatabaseName) .setTableName(table.identifier.table) .setTableCreateSchema(schema.toString()) 
.setPartitionFields(table.partitionColumnNames.mkString(",")) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala index e3388e221..24c6e21df 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala @@ -202,11 +202,11 @@ object HoodieOptionConfig { s"Can't find primaryKey `$primaryKey` in ${schema.treeString}.") } - // validate precombine key - val precombineKey = sqlOptions.get(SQL_KEY_PRECOMBINE_FIELD.sqlKeyName) - if (precombineKey.isDefined && precombineKey.get.nonEmpty) { - ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, precombineKey.get)), - s"Can't find preCombineKey `${precombineKey.get}` in ${schema.treeString}.") + // validate preCombine key + val preCombineKey = sqlOptions.get(SQL_KEY_PRECOMBINE_FIELD.sqlKeyName) + if (preCombineKey.isDefined && preCombineKey.get.nonEmpty) { + ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, preCombineKey.get)), + s"Can't find preCombineKey `${preCombineKey.get}` in ${schema.treeString}.") } // validate table type diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala index 8c9d902d9..da9fcb8d4 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala @@ -123,12 +123,12 @@ object CreateHoodieTableCommand { table.storage.compressed, storageProperties + ("path" -> path)) - val tablName = HoodieSqlCommonUtils.formatName(sparkSession, table.identifier.table) + val tableName = HoodieSqlCommonUtils.formatName(sparkSession, table.identifier.table) val newDatabaseName = HoodieSqlCommonUtils.formatName(sparkSession, table.identifier.database .getOrElse(catalog.getCurrentDatabase)) val newTableIdentifier = table.identifier - .copy(table = tablName, database = Some(newDatabaseName)) + .copy(table = tableName, database = Some(newDatabaseName)) val partitionColumnNames = hoodieCatalogTable.partitionSchema.map(_.name) // append pk, preCombineKey, type to the properties of table diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala index 12ec22499..4d2debbe9 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala @@ -50,7 +50,7 @@ class TruncateHoodieTableCommand( } // If we have not specified the partition, truncate will delete all the data in the table path - // include the hoodi.properties. In this case we should reInit the table. + // include the hoodie.properties. In this case we should reInit the table. 
if (partitionSpec.isEmpty) { val hadoopConf = sparkSession.sessionState.newHadoopConf() // ReInit hoodie.properties diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala index e660fe870..0800d1712 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala @@ -115,7 +115,7 @@ class ExpressionPayload(record: GenericRecord, if (targetRecord.isEmpty || needUpdatingPersistedRecord(targetRecord.get, resultRecord, properties)) { resultRecordOpt = HOption.of(resultRecord) } else { - // if the PreCombine field value of targetRecord is greate + // if the PreCombine field value of targetRecord is greater // than the new incoming record, just keep the old record value. resultRecordOpt = HOption.of(targetRecord.get) } @@ -270,7 +270,7 @@ class ExpressionPayload(record: GenericRecord, object ExpressionPayload { /** - * Property for pass the merge-into delete clause condition expresssion. + * Property for pass the merge-into delete clause condition expression. */ val PAYLOAD_DELETE_CONDITION = "hoodie.payload.delete.condition" diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index 560c7e17a..030d3e3c6 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -146,7 +146,7 @@ object InsertIntoHoodieTableCommand extends Logging { queryOutputWithoutMetaFields } // Align for the data fields of the query - val dataProjectsWithputMetaFields = queryDataFieldsWithoutMetaFields.zip( + val dataProjectsWithoutMetaFields = queryDataFieldsWithoutMetaFields.zip( hoodieCatalogTable.dataSchemaWithoutMetaFields.fields).map { case (dataAttr, targetField) => val castAttr = castIfNeeded(dataAttr.withNullability(targetField.nullable), targetField.dataType, conf) @@ -171,7 +171,7 @@ object InsertIntoHoodieTableCommand extends Logging { Alias(castAttr, f.name)() }) } - val alignedProjects = dataProjectsWithputMetaFields ++ partitionProjects + val alignedProjects = dataProjectsWithoutMetaFields ++ partitionProjects Project(alignedProjects, query) } @@ -217,7 +217,7 @@ object InsertIntoHoodieTableCommand extends Logging { DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue())) val isNonStrictMode = insertMode == InsertMode.NON_STRICT val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty - val hasPrecombineColumn = preCombineColumn.nonEmpty + val hasPreCombineColumn = preCombineColumn.nonEmpty val operation = (enableBulkInsert, isOverwrite, dropDuplicate, isNonStrictMode, isPartitionedTable) match { case (true, _, _, false, _) => @@ -234,7 +234,7 @@ object InsertIntoHoodieTableCommand extends Logging { // insert overwrite partition case (_, true, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL // disable dropDuplicate, and provide preCombineKey, use the upsert operation for strict and upsert mode. 
- case (false, false, false, false, _) if hasPrecombineColumn => UPSERT_OPERATION_OPT_VAL + case (false, false, false, false, _) if hasPreCombineColumn => UPSERT_OPERATION_OPT_VAL // if table is pk table and has enableBulkInsert use bulk insert for non-strict mode. case (true, _, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL // for the rest case, use the insert operation @@ -267,7 +267,7 @@ object InsertIntoHoodieTableCommand extends Logging { PARTITIONPATH_FIELD.key -> partitionFields, PAYLOAD_CLASS_NAME.key -> payloadClassName, ENABLE_ROW_WRITER.key -> enableBulkInsert.toString, - HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn), + HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPreCombineColumn), META_SYNC_ENABLED.key -> enableHive.toString, HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(), HIVE_USE_JDBC.key -> "false", diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index b3ba034d8..2c76ad567 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -450,7 +450,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie "path" -> path, RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp, PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""), - TBL_NAME.key -> targetTableName, + TBL_NAME.key -> hoodieCatalogTable.tableName, PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp, PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName, HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable, diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala index f8a658ae0..8ebef198a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala @@ -33,6 +33,10 @@ import scala.collection.JavaConverters._ class TestCreateTable extends TestHoodieSqlBase { test("Test Create Managed Hoodie Table") { + val databaseName = "hudi_database" + spark.sql(s"create database if not exists $databaseName") + spark.sql(s"use $databaseName") + val tableName = generateTableName // Create a managed table spark.sql( @@ -60,6 +64,14 @@ class TestCreateTable extends TestHoodieSqlBase { StructField("price", DoubleType), StructField("ts", LongType)) )(table.schema.fields) + + val tablePath = table.storage.properties("path") + val metaClient = HoodieTableMetaClient.builder() + .setBasePath(tablePath) + .setConf(spark.sessionState.newHadoopConf()) + .build() + val tableConfig = metaClient.getTableConfig + assertResult(databaseName)(tableConfig.getDatabaseName) } test("Test Create Hoodie Table With Options") { @@ -88,7 +100,7 @@ class TestCreateTable extends TestHoodieSqlBase { assertResult(CatalogTableType.MANAGED)(table.tableType) assertResult( HoodieRecord.HOODIE_META_COLUMNS.asScala.map(StructField(_, StringType)) - ++ Seq( + ++ Seq( StructField("id", IntegerType), StructField("name", StringType), 
StructField("price", DoubleType), @@ -192,7 +204,7 @@ class TestCreateTable extends TestHoodieSqlBase { } test("Test Table Column Validate") { - withTempDir {tmp => + withTempDir { tmp => val tableName = generateTableName assertThrows[IllegalArgumentException] { spark.sql( @@ -277,7 +289,7 @@ class TestCreateTable extends TestHoodieSqlBase { | select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt """.stripMargin ) - checkAnswer(s"select id, name, price, dt from $tableName2") ( + checkAnswer(s"select id, name, price, dt from $tableName2")( Seq(1, "a1", 10, "2021-04-01") ) @@ -360,6 +372,10 @@ class TestCreateTable extends TestHoodieSqlBase { test("Test Create Table From Existing Hoodie Table") { withTempDir { tmp => + val databaseName = "hudi_database" + spark.sql(s"create database if not exists $databaseName") + spark.sql(s"use $databaseName") + Seq("2021-08-02", "2021/08/02").foreach { partitionValue => val tableName = generateTableName val tablePath = s"${tmp.getCanonicalPath}/$tableName" @@ -367,7 +383,7 @@ class TestCreateTable extends TestHoodieSqlBase { val df = Seq((1, "a1", 10, 1000, partitionValue)).toDF("id", "name", "value", "ts", "dt") // Write a table by spark dataframe. df.write.format("hudi") - .option(HoodieWriteConfig.TBL_NAME.key, tableName) + .option(HoodieWriteConfig.TBL_NAME.key, s"original_$tableName") .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL) .option(RECORDKEY_FIELD.key, "id") .option(PRECOMBINE_FIELD.key, "ts") @@ -386,7 +402,7 @@ class TestCreateTable extends TestHoodieSqlBase { |partitioned by (dt) |location '$tablePath' |""".stripMargin - ) ("It is not allowed to specify partition columns when the table schema is not defined.") + )("It is not allowed to specify partition columns when the table schema is not defined.") spark.sql( s""" @@ -405,6 +421,8 @@ class TestCreateTable extends TestHoodieSqlBase { assertResult(true)(properties.contains(HoodieTableConfig.CREATE_SCHEMA.key)) assertResult("dt")(properties(HoodieTableConfig.PARTITION_FIELDS.key)) assertResult("ts")(properties(HoodieTableConfig.PRECOMBINE_FIELD.key)) + assertResult("")(metaClient.getTableConfig.getDatabaseName) + assertResult(s"original_$tableName")(metaClient.getTableConfig.getTableName) // Test insert into spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, '$partitionValue')") @@ -512,7 +530,7 @@ class TestCreateTable extends TestHoodieSqlBase { } test("Test Create Table From Existing Hoodie Table For None Partitioned Table") { - withTempDir{tmp => + withTempDir { tmp => // Write a table by spark dataframe. val tableName = generateTableName import spark.implicits._