
[HUDI-2837] Add support for using database name in incremental query (#4083)

董可伦
2022-01-23 14:11:27 +08:00
committed by GitHub
parent 64b1426005
commit 56cd8ffae0
19 changed files with 330 additions and 63 deletions

View File

@@ -76,6 +76,12 @@ public class HoodieTableConfig extends HoodieConfig {
public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties";
public static final String HOODIE_PROPERTIES_FILE_BACKUP = "hoodie.properties.backup";
public static final ConfigProperty<String> DATABASE_NAME = ConfigProperty
.key("hoodie.database.name")
.noDefaultValue()
.withDocumentation("Database name that will be used for incremental query. If different databases have the same table name during incremental query, "
+ "we can set it to limit the table name under a specific database");
public static final ConfigProperty<String> NAME = ConfigProperty
.key("hoodie.table.name")
.noDefaultValue()
@@ -422,6 +428,13 @@ public class HoodieTableConfig extends HoodieConfig {
}
}
/**
* Read the database name.
*/
public String getDatabaseName() {
return getString(DATABASE_NAME);
}
/**
* Read the table name.
*/
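For orientation, a minimal sketch of how the new property can be supplied and read back through the accessors added above; the Properties wiring and the names "hudi_database"/"raw_trips" are illustrative only, not part of this patch.

// Illustrative only: carry hoodie.database.name alongside the existing table name
Properties props = new Properties();
props.setProperty(HoodieTableConfig.DATABASE_NAME.key(), "hudi_database");
props.setProperty(HoodieTableConfig.NAME.key(), "raw_trips");
HoodieConfig hoodieConfig = new HoodieConfig(props);
String databaseName = hoodieConfig.getString(HoodieTableConfig.DATABASE_NAME); // "hudi_database"
String tableName = hoodieConfig.getString(HoodieTableConfig.NAME);             // "raw_trips"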

View File

@@ -624,6 +624,7 @@ public class HoodieTableMetaClient implements Serializable {
public static class PropertyBuilder {
private HoodieTableType tableType;
private String databaseName;
private String tableName;
private String tableCreateSchema;
private String recordKeyFields;
@@ -655,6 +656,11 @@ public class HoodieTableMetaClient implements Serializable {
return setTableType(HoodieTableType.valueOf(tableType));
}
public PropertyBuilder setDatabaseName(String databaseName) {
this.databaseName = databaseName;
return this;
}
public PropertyBuilder setTableName(String tableName) {
this.tableName = tableName;
return this;
@@ -753,6 +759,9 @@ public class HoodieTableMetaClient implements Serializable {
public PropertyBuilder fromProperties(Properties properties) {
HoodieConfig hoodieConfig = new HoodieConfig(properties);
if (hoodieConfig.contains(HoodieTableConfig.DATABASE_NAME)) {
setDatabaseName(hoodieConfig.getString(HoodieTableConfig.DATABASE_NAME));
}
if (hoodieConfig.contains(HoodieTableConfig.NAME)) {
setTableName(hoodieConfig.getString(HoodieTableConfig.NAME));
}
@@ -819,6 +828,9 @@ public class HoodieTableMetaClient implements Serializable {
ValidationUtils.checkArgument(tableName != null, "tableName is null");
HoodieTableConfig tableConfig = new HoodieTableConfig();
if (databaseName != null) {
tableConfig.setValue(HoodieTableConfig.DATABASE_NAME, databaseName);
}
tableConfig.setValue(HoodieTableConfig.NAME, tableName);
tableConfig.setValue(HoodieTableConfig.TYPE, tableType.name());
tableConfig.setValue(HoodieTableConfig.VERSION,
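A rough usage sketch of the extended builder, mirroring how the test utilities below wire it up; hadoopConf and basePath are placeholders here, and the database/table names are examples.

// Sketch only: propagate the database name into hoodie.properties at table init time
Properties properties = HoodieTableMetaClient.withPropertyBuilder()
    .setDatabaseName("hudi_database")
    .setTableName("raw_trips")
    .setTableType(HoodieTableType.COPY_ON_WRITE)
    .fromProperties(new Properties())
    .build();
HoodieTableMetaClient metaClient =
    HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties);
String databaseName = metaClient.getTableConfig().getDatabaseName(); // "hudi_database"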

View File

@@ -46,6 +46,7 @@ import java.util.UUID;
*/
public class HoodieTestUtils {
public static final String HOODIE_DATABASE = "test_incremental";
public static final String RAW_TRIPS_TEST_NAME = "raw_trips";
public static final String DEFAULT_WRITE_TOKEN = "1-0-1";
public static final int DEFAULT_LOG_VERSION = 1;
@@ -91,6 +92,14 @@ public class HoodieTestUtils {
return init(hadoopConf, basePath, tableType, properties);
}
public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
HoodieFileFormat baseFileFormat, String databaseName)
throws IOException {
Properties properties = new Properties();
properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), baseFileFormat.toString());
return init(hadoopConf, basePath, tableType, properties, databaseName);
}
public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
HoodieFileFormat baseFileFormat)
throws IOException {
@@ -111,6 +120,19 @@ public class HoodieTestUtils {
return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties);
}
public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
Properties properties, String databaseName)
throws IOException {
properties = HoodieTableMetaClient.withPropertyBuilder()
.setDatabaseName(databaseName)
.setTableName(RAW_TRIPS_TEST_NAME)
.setTableType(tableType)
.setPayloadClass(HoodieAvroPayload.class)
.fromProperties(properties)
.build();
return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties);
}
public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath, HoodieFileFormat baseFileFormat) throws IOException {
Properties props = new Properties();
props.setProperty(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key(), bootstrapBasePath);

View File

@@ -121,7 +121,7 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWrit
continue;
}
List<Path> inputPaths = inputPathHandler.getGroupedIncrementalPaths().get(metaClient);
List<FileStatus> result = listStatusForIncrementalMode(job, metaClient, inputPaths);
List<FileStatus> result = listStatusForIncrementalMode(job, metaClient, inputPaths, table);
if (result != null) {
returns.addAll(result);
}
@@ -229,14 +229,14 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWrit
* partitions and then filtering based on the commits of interest, this logic first extracts the
* partitions touched by the desired commits and then lists only those partitions.
*/
protected List<FileStatus> listStatusForIncrementalMode(JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
String tableName = tableMetaClient.getTableConfig().getTableName();
protected List<FileStatus> listStatusForIncrementalMode(JobConf job, HoodieTableMetaClient tableMetaClient,
List<Path> inputPaths, String incrementalTable) throws IOException {
Job jobContext = Job.getInstance(job);
Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
if (!timeline.isPresent()) {
return null;
}
Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get());
Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, incrementalTable, timeline.get());
if (!commitsToCheck.isPresent()) {
return null;
}

View File

@@ -19,11 +19,13 @@
package org.apache.hudi.hadoop;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.InvalidTableException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -53,11 +55,12 @@ public class InputPathHandler {
public static final Logger LOG = LogManager.getLogger(InputPathHandler.class);
private final Configuration conf;
// tablename to metadata mapping for all Hoodie tables(both incremental & snapshot)
// tableName to metadata mapping for all Hoodie tables(both incremental & snapshot)
private final Map<String, HoodieTableMetaClient> tableMetaClientMap;
private final Map<HoodieTableMetaClient, List<Path>> groupedIncrementalPaths;
private final List<Path> snapshotPaths;
private final List<Path> nonHoodieInputPaths;
private boolean isIncrementalUseDatabase;
public InputPathHandler(Configuration conf, Path[] inputPaths, List<String> incrementalTables) throws IOException {
this.conf = conf;
@@ -65,13 +68,14 @@ public class InputPathHandler {
snapshotPaths = new ArrayList<>();
nonHoodieInputPaths = new ArrayList<>();
groupedIncrementalPaths = new HashMap<>();
this.isIncrementalUseDatabase = HoodieHiveUtils.isIncrementalUseDatabase(conf);
parseInputPaths(inputPaths, incrementalTables);
}
/**
* Takes in the original InputPaths and classifies each of them into incremental, snapshot and
* non-hoodie InputPaths. The logic is as follows:
* 1. Check if an inputPath starts with the same basepath as any of the metadata basepaths we know
* 1. Check if an inputPath starts with the same basePath as any of the metadata basePaths we know
* 1a. If yes, this belongs to a Hoodie table that we already know about. Simply classify this
* as incremental or snapshot - We can get the table name of this inputPath from the
* metadata. Then based on the list of incrementalTables, we can classify this inputPath.
@@ -95,19 +99,17 @@ public class InputPathHandler {
// We already know the base path for this inputPath.
basePathKnown = true;
// Check if this is for a snapshot query
String tableName = metaClient.getTableConfig().getTableName();
tagAsIncrementalOrSnapshot(inputPath, tableName, metaClient, incrementalTables);
tagAsIncrementalOrSnapshot(inputPath, metaClient, incrementalTables);
break;
}
}
if (!basePathKnown) {
// This path is for a table that we dont know about yet.
// This path is for a table that we don't know about yet.
HoodieTableMetaClient metaClient;
try {
metaClient = getTableMetaClientForBasePath(inputPath.getFileSystem(conf), inputPath);
String tableName = metaClient.getTableConfig().getTableName();
tableMetaClientMap.put(tableName, metaClient);
tagAsIncrementalOrSnapshot(inputPath, tableName, metaClient, incrementalTables);
tableMetaClientMap.put(getIncrementalTable(metaClient), metaClient);
tagAsIncrementalOrSnapshot(inputPath, metaClient, incrementalTables);
} catch (TableNotFoundException | InvalidTableException e) {
// This is a non Hoodie inputPath
LOG.info("Handling a non-hoodie path " + inputPath);
@@ -117,9 +119,8 @@ public class InputPathHandler {
}
}
private void tagAsIncrementalOrSnapshot(Path inputPath, String tableName,
HoodieTableMetaClient metaClient, List<String> incrementalTables) {
if (!incrementalTables.contains(tableName)) {
private void tagAsIncrementalOrSnapshot(Path inputPath, HoodieTableMetaClient metaClient, List<String> incrementalTables) {
if (!incrementalTables.contains(getIncrementalTable(metaClient))) {
snapshotPaths.add(inputPath);
} else {
// Group incremental Paths belonging to same table.
@@ -145,4 +146,11 @@ public class InputPathHandler {
public List<Path> getNonHoodieInputPaths() {
return nonHoodieInputPaths;
}
private String getIncrementalTable(HoodieTableMetaClient metaClient) {
String databaseName = metaClient.getTableConfig().getDatabaseName();
String tableName = metaClient.getTableConfig().getTableName();
return isIncrementalUseDatabase && !StringUtils.isNullOrEmpty(databaseName)
? databaseName + "." + tableName : tableName;
}
}
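In other words, the key matched against the incremental-table list is database-qualified only when the new flag is on and a database name is recorded in the table config; a worked example with made-up values follows.

// Example values only; mirrors getIncrementalTable(...) above
String databaseName = "hudi_database";
String tableName = "raw_trips";
boolean isIncrementalUseDatabase = true;
String key = isIncrementalUseDatabase && !StringUtils.isNullOrEmpty(databaseName)
    ? databaseName + "." + tableName  // "hudi_database.raw_trips"
    : tableName;                      // "raw_trips" when the flag is off or no database name is stored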

View File

@@ -113,9 +113,8 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
*/
@Override
protected List<FileStatus> listStatusForIncrementalMode(
JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths, String incrementalTable) throws IOException {
List<FileStatus> result = new ArrayList<>();
String tableName = tableMetaClient.getTableConfig().getTableName();
Job jobContext = Job.getInstance(job);
// step1
@@ -123,7 +122,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
if (!timeline.isPresent()) {
return result;
}
HoodieTimeline commitsTimelineToReturn = HoodieInputFormatUtils.getHoodieTimelineForIncrementalQuery(jobContext, tableName, timeline.get());
HoodieTimeline commitsTimelineToReturn = HoodieInputFormatUtils.getHoodieTimelineForIncrementalQuery(jobContext, incrementalTable, timeline.get());
Option<List<HoodieInstant>> commitsToCheck = Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
if (!commitsToCheck.isPresent()) {
return result;

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.hadoop.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
@@ -43,6 +44,7 @@ public class HoodieHiveUtils {
public static final Logger LOG = LogManager.getLogger(HoodieHiveUtils.class);
public static final String HOODIE_INCREMENTAL_USE_DATABASE = "hoodie.incremental.use.database";
public static final String HOODIE_CONSUME_MODE_PATTERN = "hoodie.%s.consume.mode";
public static final String HOODIE_START_COMMIT_PATTERN = "hoodie.%s.consume.start.timestamp";
public static final String HOODIE_MAX_COMMIT_PATTERN = "hoodie.%s.consume.max.commits";
@@ -178,4 +180,8 @@ public class HoodieHiveUtils {
}
return timeline.findInstantsBeforeOrEquals(maxCommit);
}
public static boolean isIncrementalUseDatabase(Configuration conf) {
return conf.getBoolean(HOODIE_INCREMENTAL_USE_DATABASE, false);
}
}
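Combined with the existing consume-mode patterns above, a database-qualified incremental read could be configured roughly like this; the database and table names are examples, while the property keys and constants come from this class.

// Sketch only: incremental query on table "raw_trips" in database "hudi_database"
JobConf jobConf = new JobConf();
jobConf.setBoolean(HoodieHiveUtils.HOODIE_INCREMENTAL_USE_DATABASE, true);
String table = "hudi_database.raw_trips";
jobConf.set(String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, table), HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
jobConf.set(String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, table), "100");
jobConf.setInt(String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, table), 1);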

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -232,9 +233,90 @@ public class TestHoodieHFileInputFormat {
InputFormatTestUtil.setupIncremental(jobConf, "100", 1);
HoodieTableMetaClient metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(),
HoodieTableType.COPY_ON_WRITE, baseFileFormat);
assertEquals(null, metaClient.getTableConfig().getDatabaseName(),
"When hoodie.database.name is not set, it should default to null");
FileStatus[] files = inputFormat.listStatus(jobConf);
assertEquals(0, files.length,
"We should exclude commit 100 when returning incremental pull with start commit time as 100");
InputFormatTestUtil.setupIncremental(jobConf, "100", 1, true);
files = inputFormat.listStatus(jobConf);
assertEquals(0, files.length,
"We should exclude commit 100 when returning incremental pull with start commit time as 100");
metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
baseFileFormat, HoodieTestUtils.HOODIE_DATABASE);
assertEquals(HoodieTestUtils.HOODIE_DATABASE, metaClient.getTableConfig().getDatabaseName(),
String.format("The hoodie.database.name should be %s ", HoodieTestUtils.HOODIE_DATABASE));
files = inputFormat.listStatus(jobConf);
assertEquals(10, files.length,
"When hoodie.incremental.use.database is true and hoodie.database.name is not null or empty"
+ " and the incremental database name is not set, then the incremental query will not take effect");
}
@Test
public void testIncrementalWithDatabaseName() throws IOException {
// initial commit
File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
createCommitFile(basePath, "100", "2016/05/01");
// Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
InputFormatTestUtil.setupIncremental(jobConf, "100", 1, HoodieTestUtils.HOODIE_DATABASE, true);
HoodieTableMetaClient metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(),
HoodieTableType.COPY_ON_WRITE, baseFileFormat);
assertEquals(null, metaClient.getTableConfig().getDatabaseName(),
"When hoodie.database.name is not set, it should default to null");
FileStatus[] files = inputFormat.listStatus(jobConf);
assertEquals(10, files.length,
"When hoodie.database.name is null, then the incremental query will not take effect");
metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
baseFileFormat, "");
assertEquals("", metaClient.getTableConfig().getDatabaseName(),
"The hoodie.database.name should be empty");
files = inputFormat.listStatus(jobConf);
assertEquals(10, files.length,
"When hoodie.database.name is empty, then the incremental query will not take effect");
metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
baseFileFormat, HoodieTestUtils.HOODIE_DATABASE);
assertEquals(HoodieTestUtils.HOODIE_DATABASE, metaClient.getTableConfig().getDatabaseName(),
String.format("The hoodie.database.name should be %s ", HoodieTestUtils.HOODIE_DATABASE));
files = inputFormat.listStatus(jobConf);
assertEquals(0, files.length,
"We should exclude commit 100 when returning incremental pull with start commit time as 100");
InputFormatTestUtil.setupIncremental(jobConf, "100", 1, HoodieTestUtils.HOODIE_DATABASE, false);
files = inputFormat.listStatus(jobConf);
assertEquals(10, files.length,
"When hoodie.incremental.use.database is false and the incremental database name is set, "
+ "then the incremental query will not take effect");
// The configuration with and without database name exists together
InputFormatTestUtil.setupIncremental(jobConf, "1", 1, true);
files = inputFormat.listStatus(jobConf);
assertEquals(0, files.length,
"When hoodie.incremental.use.database is true, "
+ "We should exclude commit 100 because the returning incremental pull with start commit time is 100");
InputFormatTestUtil.setupIncremental(jobConf, "1", 1, false);
files = inputFormat.listStatus(jobConf);
assertEquals(10, files.length,
"When hoodie.incremental.use.database is false, "
+ "We should include commit 100 because the returning incremental pull with start commit time is 1");
}
private void createCommitFile(java.nio.file.Path basePath, String commitNumber, String partitionPath)
@@ -316,7 +398,7 @@ public class TestHoodieHFileInputFormat {
ensureFilesInCommit("Pulling all commits from 100, should get us the 1 files from 200 commit", files, "200", 1);
}
// TODO enable this after enabling predicate pushdown
// TODO enable this after enabling predicate push down
public void testPredicatePushDown() throws IOException {
// initial commit
Schema schema = getSchemaFromResource(TestHoodieHFileInputFormat.class, "/sample1.avsc");
@@ -337,7 +419,7 @@ public class TestHoodieHFileInputFormat {
// check whether we have 2 records at this point
ensureRecordsInCommit("We need to have 2 records that was modified at commit " + commit2 + " and no more", commit2,
2, 2);
// Make sure we have the 10 records if we roll back the stattime
// Make sure we have the 10 records if we roll back the start time
InputFormatTestUtil.setupIncremental(jobConf, "0", 2);
ensureRecordsInCommit("We need to have 8 records that was modified at commit " + commit1 + " and no more", commit1,
8, 10);
@@ -347,19 +429,19 @@ public class TestHoodieHFileInputFormat {
@Test
public void testGetIncrementalTableNames() throws IOException {
String[] expectedincrTables = {"db1.raw_trips", "db2.model_trips", "db3.model_trips"};
String[] expectedIncrTables = {"db1.raw_trips", "db2.model_trips", "db3.model_trips"};
JobConf conf = new JobConf();
String incrementalMode1 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[0]);
String incrementalMode1 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedIncrTables[0]);
conf.set(incrementalMode1, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
String incrementalMode2 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[1]);
String incrementalMode2 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedIncrTables[1]);
conf.set(incrementalMode2,HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
String incrementalMode3 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.model_trips");
conf.set(incrementalMode3, HoodieHiveUtils.INCREMENTAL_SCAN_MODE.toLowerCase());
String defaultmode = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips");
String defaultMode = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips");
conf.set(defaultmode, HoodieHiveUtils.DEFAULT_SCAN_MODE);
conf.set(defaultMode, HoodieHiveUtils.DEFAULT_SCAN_MODE);
List<String> actualincrTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(conf));
List<String> actualIncrTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(conf));
for (String expectedincrTable : expectedincrTables) {
for (String expectedIncrTable : expectedIncrTables) {
assertTrue(actualincrTables.contains(expectedincrTable));
assertTrue(actualIncrTables.contains(expectedIncrTable));
}
}

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -286,9 +287,90 @@ public class TestHoodieParquetInputFormat {
InputFormatTestUtil.setupIncremental(jobConf, "100", 1);
HoodieTableMetaClient metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(),
HoodieTableType.COPY_ON_WRITE, baseFileFormat);
assertEquals(null, metaClient.getTableConfig().getDatabaseName(),
"When hoodie.database.name is not set, it should default to null");
FileStatus[] files = inputFormat.listStatus(jobConf);
assertEquals(0, files.length,
"We should exclude commit 100 when returning incremental pull with start commit time as 100");
InputFormatTestUtil.setupIncremental(jobConf, "100", 1, true);
files = inputFormat.listStatus(jobConf);
assertEquals(0, files.length,
"We should exclude commit 100 when returning incremental pull with start commit time as 100");
metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
baseFileFormat, HoodieTestUtils.HOODIE_DATABASE);
assertEquals(HoodieTestUtils.HOODIE_DATABASE, metaClient.getTableConfig().getDatabaseName(),
String.format("The hoodie.database.name should be %s ", HoodieTestUtils.HOODIE_DATABASE));
files = inputFormat.listStatus(jobConf);
assertEquals(10, files.length,
"When hoodie.incremental.use.database is true and hoodie.database.name is not null or empty"
+ " and the incremental database name is not set, then the incremental query will not take effect");
}
@Test
public void testIncrementalWithDatabaseName() throws IOException {
// initial commit
File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
createCommitFile(basePath, "100", "2016/05/01");
// Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
InputFormatTestUtil.setupIncremental(jobConf, "100", 1, HoodieTestUtils.HOODIE_DATABASE, true);
HoodieTableMetaClient metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(),
HoodieTableType.COPY_ON_WRITE, baseFileFormat);
assertEquals(null, metaClient.getTableConfig().getDatabaseName(),
"When hoodie.database.name is not set, it should default to null");
FileStatus[] files = inputFormat.listStatus(jobConf);
assertEquals(10, files.length,
"When hoodie.database.name is null, then the incremental query will not take effect");
metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
baseFileFormat, "");
assertEquals("", metaClient.getTableConfig().getDatabaseName(),
"The hoodie.database.name should be empty");
files = inputFormat.listStatus(jobConf);
assertEquals(10, files.length,
"When hoodie.database.name is empty, then the incremental query will not take effect");
metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
baseFileFormat, HoodieTestUtils.HOODIE_DATABASE);
assertEquals(HoodieTestUtils.HOODIE_DATABASE, metaClient.getTableConfig().getDatabaseName(),
String.format("The hoodie.database.name should be %s ", HoodieTestUtils.HOODIE_DATABASE));
files = inputFormat.listStatus(jobConf);
assertEquals(0, files.length,
"We should exclude commit 100 when returning incremental pull with start commit time as 100");
InputFormatTestUtil.setupIncremental(jobConf, "100", 1, HoodieTestUtils.HOODIE_DATABASE, false);
files = inputFormat.listStatus(jobConf);
assertEquals(10, files.length,
"When hoodie.incremental.use.database is false and the incremental database name is set, "
+ "then the incremental query will not take effect");
// The configuration with and without database name exists together
InputFormatTestUtil.setupIncremental(jobConf, "1", 1, true);
files = inputFormat.listStatus(jobConf);
assertEquals(0, files.length,
"When hoodie.incremental.use.database is true, "
+ "We should exclude commit 100 because the returning incremental pull with start commit time is 100");
InputFormatTestUtil.setupIncremental(jobConf, "1", 1, false);
files = inputFormat.listStatus(jobConf);
assertEquals(10, files.length,
"When hoodie.incremental.use.database is false, "
+ "We should include commit 100 because the returning incremental pull with start commit time is 1");
}
@Test
@@ -429,7 +511,7 @@ public class TestHoodieParquetInputFormat {
ensureFilesInCommit("Pulling all commits from 100, should get us the 1 files from 200 commit", files, "200", 1);
}
@Disabled("enable this after enabling predicate pushdown")
@Disabled("enable this after enabling predicate push down")
@Test
public void testPredicatePushDown() throws IOException {
// initial commit
@@ -451,7 +533,7 @@ public class TestHoodieParquetInputFormat {
// check whether we have 2 records at this point
ensureRecordsInCommit("We need to have 2 records that was modified at commit " + commit2 + " and no more", commit2,
2, 2);
// Make sure we have the 10 records if we roll back the stattime
// Make sure we have the 10 records if we roll back the start time
InputFormatTestUtil.setupIncremental(jobConf, "0", 2);
ensureRecordsInCommit("We need to have 8 records that was modified at commit " + commit1 + " and no more", commit1,
8, 10);
@@ -461,19 +543,19 @@ public class TestHoodieParquetInputFormat {
@Test
public void testGetIncrementalTableNames() throws IOException {
String[] expectedincrTables = {"db1.raw_trips", "db2.model_trips", "db3.model_trips"};
String[] expectedIncrTables = {"db1.raw_trips", "db2.model_trips", "db3.model_trips"};
JobConf conf = new JobConf();
String incrementalMode1 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[0]);
String incrementalMode1 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedIncrTables[0]);
conf.set(incrementalMode1, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
String incrementalMode2 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[1]);
String incrementalMode2 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedIncrTables[1]);
conf.set(incrementalMode2, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
String incrementalMode3 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.model_trips");
conf.set(incrementalMode3, HoodieHiveUtils.INCREMENTAL_SCAN_MODE.toLowerCase());
String defaultmode = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips");
String defaultMode = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips");
conf.set(defaultmode, HoodieHiveUtils.DEFAULT_SCAN_MODE);
conf.set(defaultMode, HoodieHiveUtils.DEFAULT_SCAN_MODE);
List<String> actualincrTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(conf));
List<String> actualIncrTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(conf));
for (String expectedincrTable : expectedincrTables) {
for (String expectedIncrTable : expectedIncrTables) {
assertTrue(actualincrTables.contains(expectedincrTable));
assertTrue(actualIncrTables.contains(expectedIncrTable));
}
}

View File

@@ -130,6 +130,10 @@ public class InputFormatTestUtil {
}
public static void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull) {
setupIncremental(jobConf, startCommit, numberOfCommitsToPull, false);
}
public static void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull, boolean isIncrementalUseDatabase) {
String modePropertyName =
String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
@@ -141,8 +145,26 @@ public class InputFormatTestUtil {
String maxCommitPulls =
String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
jobConf.setBoolean(HoodieHiveUtils.HOODIE_INCREMENTAL_USE_DATABASE, isIncrementalUseDatabase);
}
public static void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull, String databaseName, boolean isIncrementalUseDatabase) {
String modePropertyName =
String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, databaseName + "." + HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
String startCommitTimestampName =
String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, databaseName + "." + HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.set(startCommitTimestampName, startCommit);
String maxCommitPulls =
String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, databaseName + "." + HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
jobConf.setBoolean(HoodieHiveUtils.HOODIE_INCREMENTAL_USE_DATABASE, isIncrementalUseDatabase);
}
public static void setupSnapshotIncludePendingCommits(JobConf jobConf, String instantTime) {
setupSnapshotScanMode(jobConf, true);
String validateTimestampName =

View File

@@ -85,6 +85,7 @@ object HoodieSparkSqlWriter {
validateTableConfig(sqlContext.sparkSession, optParams, tableConfig)
val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig)
val databaseName = hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "")
val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME,
s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim
assert(!StringUtils.isNullOrEmpty(hoodieConfig.getString(HoodieWriteConfig.TBL_NAME)),
@@ -131,6 +132,7 @@ object HoodieSparkSqlWriter {
val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
.setTableType(tableType)
.setDatabaseName(databaseName)
.setTableName(tblName)
.setRecordKeyFields(recordKeyFields)
.setBaseFileFormat(baseFileFormat)

View File

@@ -21,8 +21,7 @@ import org.apache.hudi.AvroConversionUtils
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.common.config.DFSPropertiesConfiguration
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.ValidationUtils
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
@@ -184,8 +183,10 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
} else {
val (recordName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(table.identifier.table)
val schema = SchemaConverters.toAvroType(finalSchema, false, recordName, namespace)
val hoodieDatabaseName = formatName(spark, table.identifier.database.getOrElse(spark.sessionState.catalog.getCurrentDatabase))
HoodieTableMetaClient.withPropertyBuilder()
.fromProperties(properties)
.setDatabaseName(hoodieDatabaseName)
.setTableName(table.identifier.table)
.setTableCreateSchema(schema.toString())
.setPartitionFields(table.partitionColumnNames.mkString(","))

View File

@@ -202,11 +202,11 @@ object HoodieOptionConfig {
s"Can't find primaryKey `$primaryKey` in ${schema.treeString}.")
}
// validate precombine key
// validate preCombine key
val precombineKey = sqlOptions.get(SQL_KEY_PRECOMBINE_FIELD.sqlKeyName)
val preCombineKey = sqlOptions.get(SQL_KEY_PRECOMBINE_FIELD.sqlKeyName)
if (precombineKey.isDefined && precombineKey.get.nonEmpty) {
if (preCombineKey.isDefined && preCombineKey.get.nonEmpty) {
ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, precombineKey.get)),
ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, preCombineKey.get)),
s"Can't find preCombineKey `${precombineKey.get}` in ${schema.treeString}.")
s"Can't find preCombineKey `${preCombineKey.get}` in ${schema.treeString}.")
}
// validate table type

View File

@@ -123,12 +123,12 @@ object CreateHoodieTableCommand {
table.storage.compressed,
storageProperties + ("path" -> path))
val tablName = HoodieSqlCommonUtils.formatName(sparkSession, table.identifier.table)
val tableName = HoodieSqlCommonUtils.formatName(sparkSession, table.identifier.table)
val newDatabaseName = HoodieSqlCommonUtils.formatName(sparkSession, table.identifier.database
.getOrElse(catalog.getCurrentDatabase))
val newTableIdentifier = table.identifier
.copy(table = tablName, database = Some(newDatabaseName))
.copy(table = tableName, database = Some(newDatabaseName))
val partitionColumnNames = hoodieCatalogTable.partitionSchema.map(_.name)
// append pk, preCombineKey, type to the properties of table

View File

@@ -50,7 +50,7 @@ class TruncateHoodieTableCommand(
}
// If we have not specified the partition, truncate will delete all the data in the table path
// include the hoodi.properties. In this case we should reInit the table.
// include the hoodie.properties. In this case we should reInit the table.
if (partitionSpec.isEmpty) {
val hadoopConf = sparkSession.sessionState.newHadoopConf()
// ReInit hoodie.properties

View File

@@ -115,7 +115,7 @@ class ExpressionPayload(record: GenericRecord,
if (targetRecord.isEmpty || needUpdatingPersistedRecord(targetRecord.get, resultRecord, properties)) {
resultRecordOpt = HOption.of(resultRecord)
} else {
// if the PreCombine field value of targetRecord is greate
// if the PreCombine field value of targetRecord is greater
// than the new incoming record, just keep the old record value.
resultRecordOpt = HOption.of(targetRecord.get)
}
@@ -270,7 +270,7 @@ class ExpressionPayload(record: GenericRecord,
object ExpressionPayload {
/**
* Property for pass the merge-into delete clause condition expresssion.
* Property for pass the merge-into delete clause condition expression.
*/
val PAYLOAD_DELETE_CONDITION = "hoodie.payload.delete.condition"

View File

@@ -146,7 +146,7 @@ object InsertIntoHoodieTableCommand extends Logging {
queryOutputWithoutMetaFields
}
// Align for the data fields of the query
val dataProjectsWithputMetaFields = queryDataFieldsWithoutMetaFields.zip(
val dataProjectsWithoutMetaFields = queryDataFieldsWithoutMetaFields.zip(
hoodieCatalogTable.dataSchemaWithoutMetaFields.fields).map { case (dataAttr, targetField) =>
val castAttr = castIfNeeded(dataAttr.withNullability(targetField.nullable),
targetField.dataType, conf)
@@ -171,7 +171,7 @@ object InsertIntoHoodieTableCommand extends Logging {
Alias(castAttr, f.name)()
})
}
val alignedProjects = dataProjectsWithputMetaFields ++ partitionProjects
val alignedProjects = dataProjectsWithoutMetaFields ++ partitionProjects
Project(alignedProjects, query)
}
@@ -217,7 +217,7 @@ object InsertIntoHoodieTableCommand extends Logging {
DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue()))
val isNonStrictMode = insertMode == InsertMode.NON_STRICT
val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
val hasPrecombineColumn = preCombineColumn.nonEmpty
val hasPreCombineColumn = preCombineColumn.nonEmpty
val operation =
(enableBulkInsert, isOverwrite, dropDuplicate, isNonStrictMode, isPartitionedTable) match {
case (true, _, _, false, _) =>
@@ -234,7 +234,7 @@ object InsertIntoHoodieTableCommand extends Logging {
// insert overwrite partition
case (_, true, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL
// disable dropDuplicate, and provide preCombineKey, use the upsert operation for strict and upsert mode.
case (false, false, false, false, _) if hasPrecombineColumn => UPSERT_OPERATION_OPT_VAL
case (false, false, false, false, _) if hasPreCombineColumn => UPSERT_OPERATION_OPT_VAL
// if table is pk table and has enableBulkInsert use bulk insert for non-strict mode.
case (true, _, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL
// for the rest case, use the insert operation
@@ -267,7 +267,7 @@ object InsertIntoHoodieTableCommand extends Logging {
PARTITIONPATH_FIELD.key -> partitionFields,
PAYLOAD_CLASS_NAME.key -> payloadClassName,
ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn),
HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPreCombineColumn),
META_SYNC_ENABLED.key -> enableHive.toString,
HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
HIVE_USE_JDBC.key -> "false",

View File

@@ -450,7 +450,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
"path" -> path,
RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
TBL_NAME.key -> targetTableName,
TBL_NAME.key -> hoodieCatalogTable.tableName,
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,

View File

@@ -33,6 +33,10 @@ import scala.collection.JavaConverters._
class TestCreateTable extends TestHoodieSqlBase {
test("Test Create Managed Hoodie Table") {
val databaseName = "hudi_database"
spark.sql(s"create database if not exists $databaseName")
spark.sql(s"use $databaseName")
val tableName = generateTableName
// Create a managed table
spark.sql(
@@ -60,6 +64,14 @@ class TestCreateTable extends TestHoodieSqlBase {
StructField("price", DoubleType),
StructField("ts", LongType))
)(table.schema.fields)
val tablePath = table.storage.properties("path")
val metaClient = HoodieTableMetaClient.builder()
.setBasePath(tablePath)
.setConf(spark.sessionState.newHadoopConf())
.build()
val tableConfig = metaClient.getTableConfig
assertResult(databaseName)(tableConfig.getDatabaseName)
}
test("Test Create Hoodie Table With Options") {
@@ -88,7 +100,7 @@ class TestCreateTable extends TestHoodieSqlBase {
assertResult(CatalogTableType.MANAGED)(table.tableType)
assertResult(
HoodieRecord.HOODIE_META_COLUMNS.asScala.map(StructField(_, StringType))
++ Seq(
StructField("id", IntegerType),
StructField("name", StringType),
StructField("price", DoubleType),
@@ -192,7 +204,7 @@ class TestCreateTable extends TestHoodieSqlBase {
}
test("Test Table Column Validate") {
withTempDir {tmp =>
withTempDir { tmp =>
val tableName = generateTableName
assertThrows[IllegalArgumentException] {
spark.sql(
@@ -277,7 +289,7 @@ class TestCreateTable extends TestHoodieSqlBase {
| select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt
""".stripMargin
)
checkAnswer(s"select id, name, price, dt from $tableName2") (
checkAnswer(s"select id, name, price, dt from $tableName2")(
Seq(1, "a1", 10, "2021-04-01")
)
@@ -360,6 +372,10 @@ class TestCreateTable extends TestHoodieSqlBase {
test("Test Create Table From Existing Hoodie Table") {
withTempDir { tmp =>
val databaseName = "hudi_database"
spark.sql(s"create database if not exists $databaseName")
spark.sql(s"use $databaseName")
Seq("2021-08-02", "2021/08/02").foreach { partitionValue =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
@@ -367,7 +383,7 @@ class TestCreateTable extends TestHoodieSqlBase {
val df = Seq((1, "a1", 10, 1000, partitionValue)).toDF("id", "name", "value", "ts", "dt")
// Write a table by spark dataframe.
df.write.format("hudi")
.option(HoodieWriteConfig.TBL_NAME.key, tableName)
.option(HoodieWriteConfig.TBL_NAME.key, s"original_$tableName")
.option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
.option(RECORDKEY_FIELD.key, "id")
.option(PRECOMBINE_FIELD.key, "ts")
@@ -386,7 +402,7 @@ class TestCreateTable extends TestHoodieSqlBase {
|partitioned by (dt)
|location '$tablePath'
|""".stripMargin
) ("It is not allowed to specify partition columns when the table schema is not defined.")
)("It is not allowed to specify partition columns when the table schema is not defined.")
spark.sql(
s"""
@@ -405,6 +421,8 @@ class TestCreateTable extends TestHoodieSqlBase {
assertResult(true)(properties.contains(HoodieTableConfig.CREATE_SCHEMA.key))
assertResult("dt")(properties(HoodieTableConfig.PARTITION_FIELDS.key))
assertResult("ts")(properties(HoodieTableConfig.PRECOMBINE_FIELD.key))
assertResult("")(metaClient.getTableConfig.getDatabaseName)
assertResult(s"original_$tableName")(metaClient.getTableConfig.getTableName)
// Test insert into
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, '$partitionValue')")
@@ -512,7 +530,7 @@ class TestCreateTable extends TestHoodieSqlBase {
}
test("Test Create Table From Existing Hoodie Table For None Partitioned Table") {
withTempDir{tmp =>
withTempDir { tmp =>
// Write a table by spark dataframe.
val tableName = generateTableName
import spark.implicits._