diff --git a/LICENSE b/LICENSE
index dc4edaaf3..1e2174731 100644
--- a/LICENSE
+++ b/LICENSE
@@ -206,6 +206,7 @@
This product includes code from Apache Hive.
* org.apache.hadoop.hive.ql.io.CombineHiveInputFormat copied to org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat
+* org.apache.hadoop.hive.serde2.ColumnProjectionUtils copied and modified to org.apache.hudi.hadoop.HoodieColumnProjectionUtils
Copyright: 2011-2019 The Apache Software Foundation
Home page: http://hive.apache.org/
diff --git a/docker/demo/compaction.commands b/docker/demo/compaction.commands
index 9bb8eb82a..6abdad743 100644
--- a/docker/demo/compaction.commands
+++ b/docker/demo/compaction.commands
@@ -19,4 +19,7 @@ connect --path /user/hive/warehouse/stock_ticks_mor
compactions show all
compaction schedule --hoodieConfigs hoodie.compact.inline.max.delta.commits=1
compaction run --parallelism 2 --sparkMemory 1G --schemaFilePath /var/demo/config/schema.avsc --retry 1
-
+connect --path /user/hive/warehouse/stock_ticks_mor_bs
+compactions show all
+compaction schedule --hoodieConfigs hoodie.compact.inline.max.delta.commits=1
+compaction run --parallelism 2 --sparkMemory 1G --schemaFilePath /var/demo/config/schema.avsc --retry 1
diff --git a/docker/demo/hive-batch1.commands b/docker/demo/hive-batch1.commands
index 93bf3b679..021c6d55b 100644
--- a/docker/demo/hive-batch1.commands
+++ b/docker/demo/hive-batch1.commands
@@ -25,4 +25,12 @@ select symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GO
select symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
select symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
+select symbol, max(ts) from stock_ticks_cow_bs group by symbol HAVING symbol = 'GOOG';
+select symbol, max(ts) from stock_ticks_mor_bs_ro group by symbol HAVING symbol = 'GOOG';
+select symbol, max(ts) from stock_ticks_mor_bs_rt group by symbol HAVING symbol = 'GOOG';
+
+select symbol, ts, volume, open, close from stock_ticks_cow_bs where symbol = 'GOOG';
+select symbol, ts, volume, open, close from stock_ticks_mor_bs_ro where symbol = 'GOOG';
+select symbol, ts, volume, open, close from stock_ticks_mor_bs_rt where symbol = 'GOOG';
+
!quit
diff --git a/docker/demo/hive-batch2-after-compaction.commands b/docker/demo/hive-batch2-after-compaction.commands
index 6b087019d..06582a309 100644
--- a/docker/demo/hive-batch2-after-compaction.commands
+++ b/docker/demo/hive-batch2-after-compaction.commands
@@ -23,4 +23,10 @@ select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = '
select symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
select symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
+select symbol, max(ts) from stock_ticks_mor_bs_ro group by symbol HAVING symbol = 'GOOG';
+select symbol, max(ts) from stock_ticks_mor_bs_rt group by symbol HAVING symbol = 'GOOG';
+
+select symbol, ts, volume, open, close from stock_ticks_mor_bs_ro where symbol = 'GOOG';
+select symbol, ts, volume, open, close from stock_ticks_mor_bs_rt where symbol = 'GOOG';
+
!quit
diff --git a/docker/demo/hive-incremental-cow.commands b/docker/demo/hive-incremental-cow.commands
index 7f4354807..702b2afa5 100644
--- a/docker/demo/hive-incremental-cow.commands
+++ b/docker/demo/hive-incremental-cow.commands
@@ -23,5 +23,11 @@ set hoodie.stock_ticks_cow.consume.start.timestamp='${min.commit.time}';
select symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG' and `_hoodie_commit_time` > '${min.commit.time}';
+set hoodie.stock_ticks_cow_bs.consume.mode=INCREMENTAL;
+set hoodie.stock_ticks_cow_bs.consume.max.commits=3;
+set hoodie.stock_ticks_cow_bs.consume.start.timestamp='00000000000001';
+
+select symbol, ts, volume, open, close from stock_ticks_cow_bs where symbol = 'GOOG' and `_hoodie_commit_time` > '00000000000001';
+
!quit
diff --git a/docker/demo/hive-incremental-mor-ro.commands b/docker/demo/hive-incremental-mor-ro.commands
index 8b97c0aac..51683c010 100644
--- a/docker/demo/hive-incremental-mor-ro.commands
+++ b/docker/demo/hive-incremental-mor-ro.commands
@@ -23,5 +23,11 @@ set hoodie.stock_ticks_mor.consume.start.timestamp='${min.commit.time}';
select symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG' and `_hoodie_commit_time` > '${min.commit.time}';
+set hoodie.stock_ticks_mor_bs.consume.mode=INCREMENTAL;
+set hoodie.stock_ticks_mor_bs.consume.max.commits=3;
+set hoodie.stock_ticks_mor_bs.consume.start.timestamp='00000000000001';
+
+select symbol, ts, volume, open, close from stock_ticks_mor_bs_ro where symbol = 'GOOG' and `_hoodie_commit_time` > '00000000000001';
+
!quit
diff --git a/docker/demo/hive-incremental-mor-rt.commands b/docker/demo/hive-incremental-mor-rt.commands
index a81fb77e0..c29fc7ce5 100644
--- a/docker/demo/hive-incremental-mor-rt.commands
+++ b/docker/demo/hive-incremental-mor-rt.commands
@@ -23,5 +23,11 @@ set hoodie.stock_ticks_mor.consume.start.timestamp='${min.commit.time}';
select symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG' and `_hoodie_commit_time` > '${min.commit.time}';
+set hoodie.stock_ticks_mor_bs.consume.mode=INCREMENTAL;
+set hoodie.stock_ticks_mor_bs.consume.max.commits=3;
+set hoodie.stock_ticks_mor_bs.consume.start.timestamp='00000000000001';
+
+select symbol, ts, volume, open, close from stock_ticks_mor_bs_rt where symbol = 'GOOG' and `_hoodie_commit_time` > '00000000000001';
+
!quit
diff --git a/docker/demo/sparksql-batch1.commands b/docker/demo/sparksql-batch1.commands
index 727aa1633..4de2486c6 100644
--- a/docker/demo/sparksql-batch1.commands
+++ b/docker/demo/sparksql-batch1.commands
@@ -27,4 +27,14 @@ spark.sql("select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)
+// Bootstrapped Copy-On-Write table
+spark.sql("select symbol, max(ts) from stock_ticks_cow_bs group by symbol HAVING symbol = 'GOOG'").show(100, false)
+spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow_bs where symbol = 'GOOG'").show(100, false)
+
+// Bootstrapped Merge-On-Read table
+spark.sql("select symbol, max(ts) from stock_ticks_mor_bs_ro group by symbol HAVING symbol = 'GOOG'").show(100, false)
+spark.sql("select symbol, max(ts) from stock_ticks_mor_bs_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
+spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_bs_ro where symbol = 'GOOG'").show(100, false)
+spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_bs_rt where symbol = 'GOOG'").show(100, false)
+
System.exit(0)
diff --git a/docker/demo/sparksql-batch2.commands b/docker/demo/sparksql-batch2.commands
index 391e11b97..739d991db 100644
--- a/docker/demo/sparksql-batch2.commands
+++ b/docker/demo/sparksql-batch2.commands
@@ -26,4 +26,14 @@ spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from s
spark.sql("select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)
+// Bootstrapped Copy-On-Write table
+spark.sql("select symbol, max(ts) from stock_ticks_cow_bs group by symbol HAVING symbol = 'GOOG'").show(100, false)
+spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow_bs where symbol = 'GOOG'").show(100, false)
+
+// Bootstrapped Merge-On-Read table
+spark.sql("select symbol, max(ts) from stock_ticks_mor_bs_ro group by symbol HAVING symbol = 'GOOG'").show(100, false)
+spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_bs_ro where symbol = 'GOOG'").show(100, false)
+spark.sql("select symbol, max(ts) from stock_ticks_mor_bs_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
+spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_bs_rt where symbol = 'GOOG'").show(100, false)
+
System.exit(0)
diff --git a/docker/demo/sparksql-bootstrap-prep-source.commands b/docker/demo/sparksql-bootstrap-prep-source.commands
new file mode 100644
index 000000000..23db3e4d3
--- /dev/null
+++ b/docker/demo/sparksql-bootstrap-prep-source.commands
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.spark.sql.functions.col
+
+val df = spark.read.format("org.apache.hudi").load("/user/hive/warehouse/stock_ticks_cow/*/*/*").drop("_hoodie_commit_time", "_hoodie_record_key", "_hoodie_file_name", "_hoodie_commit_seqno", "_hoodie_partition_path")
+df.write.format("parquet").save("/user/hive/warehouse/stock_ticks_cow_bs_src/2018/08/31/")
+System.exit(0)
diff --git a/docker/demo/sparksql-incremental.commands b/docker/demo/sparksql-incremental.commands
index 8e3e153e2..febfcd28a 100644
--- a/docker/demo/sparksql-incremental.commands
+++ b/docker/demo/sparksql-incremental.commands
@@ -52,8 +52,38 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
mode(SaveMode.Overwrite).
save("/user/hive/warehouse/stock_ticks_derived_mor");
-spark.sql("show tables").show(20, false)
spark.sql("select count(*) from stock_ticks_derived_mor_ro").show(20, false)
spark.sql("select count(*) from stock_ticks_derived_mor_rt").show(20, false)
-System.exit(0);
\ No newline at end of file
+val hoodieIncQueryBsDF = spark.read.format("org.apache.hudi").
+ option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
+ option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "00000000000001").
+ load("/user/hive/warehouse/stock_ticks_cow_bs");
+hoodieIncQueryBsDF.registerTempTable("stock_ticks_cow_bs_incr")
+spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow_bs_incr where symbol = 'GOOG'").show(100, false);
+
+spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, close from stock_ticks_cow_bs_incr").
+ write.format("org.apache.hudi").
+ option("hoodie.insert.shuffle.parallelism", "2").
+ option("hoodie.upsert.shuffle.parallelism","2").
+ option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
+ option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).
+ option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "key").
+ option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr").
+ option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts").
+ option(HoodieWriteConfig.TABLE_NAME, "stock_ticks_derived_mor_bs").
+ option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "stock_ticks_derived_mor_bs").
+ option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "default").
+ option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://hiveserver:10000").
+ option(DataSourceWriteOptions.HIVE_USER_OPT_KEY, "hive").
+ option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY, "hive").
+ option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
+ option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "datestr").
+ mode(SaveMode.Overwrite).
+ save("/user/hive/warehouse/stock_ticks_derived_mor_bs");
+
+spark.sql("show tables").show(20, false)
+spark.sql("select count(*) from stock_ticks_derived_mor_bs_ro").show(20, false)
+spark.sql("select count(*) from stock_ticks_derived_mor_bs_rt").show(20, false)
+
+System.exit(0);
diff --git a/hudi-cli/hudi-cli.sh b/hudi-cli/hudi-cli.sh
index b6e708c14..bbfba85a8 100755
--- a/hudi-cli/hudi-cli.sh
+++ b/hudi-cli/hudi-cli.sh
@@ -25,4 +25,7 @@ if [ -z "$CLIENT_JAR" ]; then
echo "Client jar location not set, please set it in conf/hudi-env.sh"
fi
-java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:$DIR/target/lib/*:$HOODIE_JAR:${CLIENT_JAR} -DSPARK_CONF_DIR=${SPARK_CONF_DIR} -DHADOOP_CONF_DIR=${HADOOP_CONF_DIR} org.springframework.shell.Bootstrap $@
+OTHER_JARS=`ls ${DIR}/target/lib/* | grep -v 'hudi-[^/]*jar' | tr '\n' ':'`
+
+echo "Running : java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:${HOODIE_JAR}:${OTHER_JARS}:${CLIENT_JAR} -DSPARK_CONF_DIR=${SPARK_CONF_DIR} -DHADOOP_CONF_DIR=${HADOOP_CONF_DIR} org.springframework.shell.Bootstrap $@"
+java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:${HOODIE_JAR}:${OTHER_JARS}:${CLIENT_JAR} -DSPARK_CONF_DIR=${SPARK_CONF_DIR} -DHADOOP_CONF_DIR=${HADOOP_CONF_DIR} org.springframework.shell.Bootstrap $@
diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml
index 388e4145a..ec15cce76 100644
--- a/hudi-cli/pom.xml
+++ b/hudi-cli/pom.xml
@@ -139,26 +139,6 @@
-
- org.apache.hudi
- hudi-client
- ${project.version}
-
-
- org.apache.hudi
- hudi-common
- ${project.version}
-
-
- org.apache.hudi
- hudi-hive-sync
- ${project.version}
-
-
- org.apache.hudi
- hudi-utilities_${scala.binary.version}
- ${project.version}
-
org.apache.hudi
hudi-common
@@ -198,6 +178,12 @@
+
+ org.apache.hudi
+ hudi-utilities-bundle_${scala.binary.version}
+ ${project.version}
+
+
log4j
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
index 9b55fe26e..ffbf70e12 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
@@ -213,7 +213,7 @@ public class CompactionCommand implements CommandMarker {
if (exitCode != 0) {
return "Failed to run compaction for " + compactionInstantTime;
}
- return "Compaction successfully completed for " + compactionInstantTime;
+ return "Attempted to schedule compaction for " + compactionInstantTime;
}
@CliCommand(value = "compaction run", help = "Run Compaction for given instant time")
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
index 2e32515cf..ef76ee4e2 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
@@ -22,6 +22,7 @@ import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.TableHeader;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -239,7 +240,7 @@ public class FileSystemViewCommand implements CommandMarker {
new HoodieTableMetaClient(client.getHadoopConf(), client.getBasePath(), true);
FileSystem fs = HoodieCLI.fs;
String globPath = String.format("%s/%s/*", client.getBasePath(), globRegex);
- FileStatus[] statuses = fs.globStatus(new Path(globPath));
+ List statuses = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(globPath));
Stream instantsStream;
HoodieTimeline timeline;
@@ -269,6 +270,6 @@ public class FileSystemViewCommand implements CommandMarker {
HoodieTimeline filteredTimeline = new HoodieDefaultTimeline(instantsStream,
(Function> & Serializable) metaClient.getActiveTimeline()::getInstantDetails);
- return new HoodieTableFileSystemView(metaClient, filteredTimeline, statuses);
+ return new HoodieTableFileSystemView(metaClient, filteredTimeline, statuses.toArray(new FileStatus[0]));
}
}
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index a5fe4fe53..f8e82ae61 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -22,6 +22,7 @@ import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.TableHeader;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -53,7 +54,6 @@ import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -82,7 +82,7 @@ public class HoodieLogFileCommand implements CommandMarker {
throws IOException {
FileSystem fs = HoodieCLI.getTableMetaClient().getFs();
- List logFilePaths = Arrays.stream(fs.globStatus(new Path(logFilePathPattern)))
+ List logFilePaths = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(logFilePathPattern)).stream()
.map(status -> status.getPath().toString()).collect(Collectors.toList());
Map, Map>, Integer>>> commitCountAndMetadata =
new HashMap<>();
@@ -175,7 +175,7 @@ public class HoodieLogFileCommand implements CommandMarker {
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
FileSystem fs = client.getFs();
- List logFilePaths = Arrays.stream(fs.globStatus(new Path(logFilePathPattern)))
+ List logFilePaths = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(logFilePathPattern)).stream()
.map(status -> status.getPath().toString()).sorted(Comparator.reverseOrder())
.collect(Collectors.toList());
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
index 72cf6c02d..66c556310 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
@@ -118,7 +118,7 @@ public class StatsCommand implements CommandMarker {
FileSystem fs = HoodieCLI.fs;
String globPath = String.format("%s/%s/*", HoodieCLI.getTableMetaClient().getBasePath(), globRegex);
- FileStatus[] statuses = fs.globStatus(new Path(globPath));
+ List statuses = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(globPath));
// max, min, #small files < 10MB, 50th, avg, 95th
Histogram globalHistogram = new Histogram(new UniformReservoir(MAX_FILES));
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
index 78ae35e19..9c947e4d4 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
@@ -88,8 +88,7 @@ public class TableCommand implements CommandMarker {
@CliOption(key = {"archiveLogFolder"}, help = "Folder Name for storing archived timeline") String archiveFolder,
@CliOption(key = {"layoutVersion"}, help = "Specific Layout Version to use") Integer layoutVersion,
@CliOption(key = {"payloadClass"}, unspecifiedDefaultValue = "org.apache.hudi.common.model.HoodieAvroPayload",
- help = "Payload Class") final String payloadClass)
- throws IOException {
+ help = "Payload Class") final String payloadClass) throws IOException {
boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized);
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
index 4c7ce8819..7d5cee693 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
@@ -62,6 +62,7 @@ public class TestArchivedCommitsCommand extends AbstractShellIntegrationTest {
// Create table and connect
String tableName = "test_table";
tablePath = basePath + File.separator + tableName;
+
new TableCommand().createTable(
tablePath, tableName,
"COPY_ON_WRITE", "", 1, "org.apache.hudi.common.model.HoodieAvroPayload");
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index 9782b46b6..2a1520a77 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -149,6 +149,35 @@ public class HoodieWriteClient extends AbstractHo
return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown());
}
+ /**
+ * Main API to run bootstrap to hudi.
+ */
+ public void bootstrap(Option> extraMetadata) {
+ if (rollbackPending) {
+ rollBackInflightBootstrap();
+ }
+ HoodieTable table = getTableAndInitCtx(WriteOperationType.UPSERT);
+ table.bootstrap(jsc, extraMetadata);
+ }
+
+ /**
+ * Main API to rollback pending bootstrap.
+ */
+ protected void rollBackInflightBootstrap() {
+ LOG.info("Rolling back pending bootstrap if present");
+ HoodieTable table = HoodieTable.create(config, hadoopConf);
+ HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
+ Option instant = Option.fromJavaOptional(
+ inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst());
+ if (instant.isPresent() && HoodieTimeline.compareTimestamps(instant.get(), HoodieTimeline.LESSER_THAN_OR_EQUALS,
+ HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
+ LOG.info("Found pending bootstrap instants. Rolling them back");
+ table.rollbackBootstrap(jsc, HoodieActiveTimeline.createNewInstantTime());
+ LOG.info("Finished rolling back pending bootstrap");
+ }
+
+ }
+
/**
* Upsert a batch of new records into Hoodie table at the supplied instantTime.
*
@@ -671,7 +700,13 @@ public class HoodieWriteClient extends AbstractHo
List commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
for (String commit : commits) {
- rollback(commit);
+ if (HoodieTimeline.compareTimestamps(commit, HoodieTimeline.LESSER_THAN_OR_EQUALS,
+ HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
+ rollBackInflightBootstrap();
+ break;
+ } else {
+ rollback(commit);
+ }
}
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapMode.java b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapMode.java
new file mode 100644
index 000000000..769f3c25c
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapMode.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.bootstrap;
+
+/**
+ * Identifies different types of bootstrap.
+ */
+public enum BootstrapMode {
+ /**
+ * In this mode, record level metadata is generated for each source record, and both the original record and its
+ * metadata are copied to the new bootstrap location.
+ */
+ FULL_RECORD,
+
+ /**
+ * In this mode, record level metadata alone is generated for each source record and stored in new bootstrap location.
+ */
+ METADATA_ONLY
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapRecordPayload.java b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapRecordPayload.java
new file mode 100644
index 000000000..fa508e42f
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapRecordPayload.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.bootstrap;
+
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+public class BootstrapRecordPayload implements HoodieRecordPayload {
+
+ private final GenericRecord record;
+
+ public BootstrapRecordPayload(GenericRecord record) {
+ this.record = record;
+ }
+
+ @Override
+ public BootstrapRecordPayload preCombine(BootstrapRecordPayload another) {
+ return this;
+ }
+
+ @Override
+ public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) {
+ return Option.ofNullable(record);
+ }
+
+ @Override
+ public Option getInsertValue(Schema schema) {
+ return Option.ofNullable(record);
+ }
+
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapSchemaProvider.java b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapSchemaProvider.java
new file mode 100644
index 000000000..61e29c2cf
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapSchemaProvider.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.bootstrap;
+
+import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.common.bootstrap.FileStatusUtils;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
+
+import java.util.List;
+
+/**
+ * Bootstrap Schema Provider. Schema provided in config is used. If not available, use schema from Parquet
+ */
+public class BootstrapSchemaProvider {
+
+ protected final HoodieWriteConfig writeConfig;
+
+ public BootstrapSchemaProvider(HoodieWriteConfig writeConfig) {
+ this.writeConfig = writeConfig;
+ }
+
+ /**
+ * Main API to select avro schema for bootstrapping.
+ * @param jsc Java Spark Context
+ * @param partitions List of partitions with files within them
+ * @return Avro Schema
+ */
+ public final Schema getBootstrapSchema(JavaSparkContext jsc, List>> partitions) {
+ if (writeConfig.getSchema() != null) {
+ // Use schema specified by user if set
+ return Schema.parse(writeConfig.getSchema());
+ }
+ return getBootstrapSourceSchema(jsc, partitions);
+ }
+
+ /**
+ * Select an arbitrary readable source file and use its schema as the avro schema.
+ * Override this method to get custom schema selection.
+ * @param jsc Java Spark Context
+ * @param partitions List of partitions with files within them
+ * @return Avro Schema
+ */
+ protected Schema getBootstrapSourceSchema(JavaSparkContext jsc,
+ List>> partitions) {
+ return partitions.stream().flatMap(p -> p.getValue().stream())
+ .map(fs -> {
+ try {
+ Path filePath = FileStatusUtils.toPath(fs.getPath());
+ return ParquetUtils.readAvroSchema(jsc.hadoopConfiguration(), filePath);
+ } catch (Exception ex) {
+ return null;
+ }
+ }).filter(x -> x != null).findAny().get();
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapWriteStatus.java b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapWriteStatus.java
new file mode 100644
index 000000000..29d0646a4
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapWriteStatus.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.bootstrap;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.BootstrapFileMapping;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.collection.Pair;
+
+/**
+ * WriteStatus for Bootstrap.
+ */
+public class BootstrapWriteStatus extends WriteStatus {
+
+ private BootstrapFileMapping sourceFileMapping;
+
+ public BootstrapWriteStatus(Boolean trackSuccessRecords, Double failureFraction) {
+ super(trackSuccessRecords, failureFraction);
+ }
+
+ public BootstrapFileMapping getBootstrapSourceFileMapping() {
+ return sourceFileMapping;
+ }
+
+ public Pair getBootstrapSourceAndWriteStat() {
+ return Pair.of(getBootstrapSourceFileMapping(), getStat());
+ }
+
+ public void setBootstrapSourceFileMapping(BootstrapFileMapping sourceFileMapping) {
+ this.sourceFileMapping = sourceFileMapping;
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/FullRecordBootstrapDataProvider.java b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/FullRecordBootstrapDataProvider.java
new file mode 100644
index 000000000..8b077ad9f
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/FullRecordBootstrapDataProvider.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.bootstrap;
+
+import java.io.Serializable;
+import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.List;
+
+/**
+ * Creates RDD of Hoodie Records with complete record data, given a list of partitions to be bootstrapped.
+ */
+public abstract class FullRecordBootstrapDataProvider implements Serializable {
+
+ protected static final Logger LOG = LogManager.getLogger(FullRecordBootstrapDataProvider.class);
+
+ protected final TypedProperties props;
+ protected final transient JavaSparkContext jsc;
+
+ public FullRecordBootstrapDataProvider(TypedProperties props, JavaSparkContext jsc) {
+ this.props = props;
+ this.jsc = jsc;
+ }
+
+ /**
+ * Generates a list of input partition and files and returns a RDD representing source.
+ * @param tableName Hudi Table Name
+ * @param sourceBasePath Source Base Path
+ * @param partitionPaths Partition Paths
+ * @return JavaRDD of input records
+ */
+ public abstract JavaRDD<HoodieRecord> generateInputRecordRDD(String tableName,
+ String sourceBasePath, List<Pair<String, List<HoodieFileStatus>>> partitionPaths);
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/selector/BootstrapModeSelector.java b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/selector/BootstrapModeSelector.java
new file mode 100644
index 000000000..4e098c6b3
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/selector/BootstrapModeSelector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.bootstrap.selector;
+
+import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.client.bootstrap.BootstrapMode;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Pluggable Partition Selector for selecting partitions to perform full or metadata-only bootstrapping.
+ */
+public abstract class BootstrapModeSelector implements Serializable {
+
+ protected final HoodieWriteConfig writeConfig;
+
+ public BootstrapModeSelector(HoodieWriteConfig writeConfig) {
+ this.writeConfig = writeConfig;
+ }
+
+ /**
+ * Classify partitions for the purpose of bootstrapping. For a non-partitioned source, input list will be one entry.
+ *
+ * @param partitions List of partitions with files present in each partitions
+ * @return a partitions grouped by bootstrap mode
+ */
+ public abstract Map<BootstrapMode, List<String>> select(List<Pair<String, List<HoodieFileStatus>>> partitions);
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/selector/BootstrapRegexModeSelector.java b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/selector/BootstrapRegexModeSelector.java
new file mode 100644
index 000000000..43fae7000
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/selector/BootstrapRegexModeSelector.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.bootstrap.selector;
+
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.client.bootstrap.BootstrapMode;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+public class BootstrapRegexModeSelector extends BootstrapModeSelector {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LogManager.getLogger(BootstrapRegexModeSelector.class);
+
+ private final Pattern pattern;
+ private final BootstrapMode bootstrapModeOnMatch;
+ private final BootstrapMode defaultMode;
+
+ public BootstrapRegexModeSelector(HoodieWriteConfig writeConfig) {
+ super(writeConfig);
+ this.pattern = Pattern.compile(writeConfig.getBootstrapModeSelectorRegex());
+ this.bootstrapModeOnMatch = writeConfig.getBootstrapModeForRegexMatch();
+ this.defaultMode = BootstrapMode.FULL_RECORD.equals(bootstrapModeOnMatch)
+ ? BootstrapMode.METADATA_ONLY : BootstrapMode.FULL_RECORD;
+ LOG.info("Default Mode :" + defaultMode + ", on Match Mode :" + bootstrapModeOnMatch);
+ }
+
+ @Override
+ public Map<BootstrapMode, List<String>> select(List<Pair<String, List<HoodieFileStatus>>> partitions) {
+ return partitions.stream()
+ .map(p -> Pair.of(pattern.matcher(p.getKey()).matches() ? bootstrapModeOnMatch : defaultMode, p.getKey()))
+ .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toList())));
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/selector/FullRecordBootstrapModeSelector.java b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/selector/FullRecordBootstrapModeSelector.java
new file mode 100644
index 000000000..18e5b38a1
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/selector/FullRecordBootstrapModeSelector.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.bootstrap.selector;
+
+import org.apache.hudi.client.bootstrap.BootstrapMode;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+public class FullRecordBootstrapModeSelector extends UniformBootstrapModeSelector {
+
+ public FullRecordBootstrapModeSelector(HoodieWriteConfig bootstrapConfig) {
+ super(bootstrapConfig, BootstrapMode.FULL_RECORD);
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/selector/MetadataOnlyBootstrapModeSelector.java b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/selector/MetadataOnlyBootstrapModeSelector.java
new file mode 100644
index 000000000..5de95e442
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/selector/MetadataOnlyBootstrapModeSelector.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.bootstrap.selector;
+
+import org.apache.hudi.client.bootstrap.BootstrapMode;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+public class MetadataOnlyBootstrapModeSelector extends UniformBootstrapModeSelector {
+
+ public MetadataOnlyBootstrapModeSelector(HoodieWriteConfig bootstrapConfig) {
+ super(bootstrapConfig, BootstrapMode.METADATA_ONLY);
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/selector/UniformBootstrapModeSelector.java b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/selector/UniformBootstrapModeSelector.java
new file mode 100644
index 000000000..e2784c755
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/selector/UniformBootstrapModeSelector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.bootstrap.selector;
+
+import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.client.bootstrap.BootstrapMode;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A bootstrap selector which employs same bootstrap mode for all partitions.
+ */
+public abstract class UniformBootstrapModeSelector extends BootstrapModeSelector {
+
+ private final BootstrapMode bootstrapMode;
+
+ public UniformBootstrapModeSelector(HoodieWriteConfig bootstrapConfig, BootstrapMode bootstrapMode) {
+ super(bootstrapConfig);
+ this.bootstrapMode = bootstrapMode;
+ }
+
+ @Override
+ public Map<BootstrapMode, List<String>> select(List<Pair<String, List<HoodieFileStatus>>> partitions) {
+ return partitions.stream().map(p -> Pair.of(bootstrapMode, p))
+ .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(x -> x.getValue().getKey(),
+ Collectors.toList())));
+ }
+}
\ No newline at end of file
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/translator/BootstrapPartitionPathTranslator.java b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/translator/BootstrapPartitionPathTranslator.java
new file mode 100644
index 000000000..6f46e8e76
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/translator/BootstrapPartitionPathTranslator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.bootstrap.translator;
+
+import java.io.Serializable;
+import org.apache.hudi.common.config.TypedProperties;
+
+public abstract class BootstrapPartitionPathTranslator implements Serializable {
+
+ private final TypedProperties properties;
+
+ public BootstrapPartitionPathTranslator(TypedProperties properties) {
+ this.properties = properties;
+ }
+
+ /**
+ * Given a bootstrap partition path, translated partition path.
+ *
+ * @param bootStrapPartitionPath bootstrap Partition Path
+ * @return Translated Path
+ */
+ public abstract String getBootstrapTranslatedPath(String bootStrapPartitionPath);
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/translator/IdentityBootstrapPartitionPathTranslator.java b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/translator/IdentityBootstrapPartitionPathTranslator.java
new file mode 100644
index 000000000..a433eb568
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/client/bootstrap/translator/IdentityBootstrapPartitionPathTranslator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.bootstrap.translator;
+
+import org.apache.hudi.common.config.TypedProperties;
+
+/**
+ * Return same path as bootstrap partition path.
+ */
+public class IdentityBootstrapPartitionPathTranslator extends BootstrapPartitionPathTranslator {
+
+ public IdentityBootstrapPartitionPathTranslator(TypedProperties properties) {
+ super(properties);
+ }
+
+ @Override
+ public String getBootstrapTranslatedPath(String bootStrapPartitionPath) {
+ return bootStrapPartitionPath;
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/utils/MergingIterator.java b/hudi-client/src/main/java/org/apache/hudi/client/utils/MergingIterator.java
new file mode 100644
index 000000000..47dde723e
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/client/utils/MergingIterator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import java.util.Iterator;
+import java.util.function.Function;
+import org.apache.avro.generic.GenericRecord;
+
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+public class MergingIterator<T extends GenericRecord> implements Iterator<T> {
+
+ private final Iterator<T> leftIterator;
+ private final Iterator<T> rightIterator;
+ private final Function<Pair<T, T>, T> mergeFunction;
+
+ public MergingIterator(Iterator<T> leftIterator, Iterator<T> rightIterator, Function<Pair<T, T>, T> mergeFunction) {
+ this.leftIterator = leftIterator;
+ this.rightIterator = rightIterator;
+ this.mergeFunction = mergeFunction;
+ }
+
+ @Override
+ public boolean hasNext() {
+ boolean leftHasNext = leftIterator.hasNext();
+ boolean rightHasNext = rightIterator.hasNext();
+ ValidationUtils.checkArgument(leftHasNext == rightHasNext);
+ return leftHasNext;
+ }
+
+ @Override
+ public T next() {
+ return mergeFunction.apply(Pair.of(leftIterator.next(), rightIterator.next()));
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
new file mode 100644
index 000000000..ebfaed0d3
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.client.bootstrap.BootstrapMode;
+import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector;
+import org.apache.hudi.client.bootstrap.translator.IdentityBootstrapPartitionPathTranslator;
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Bootstrap specific configs.
+ */
+public class HoodieBootstrapConfig extends DefaultHoodieConfig {
+
+ public static final String BOOTSTRAP_BASE_PATH_PROP = "hoodie.bootstrap.base.path";
+ public static final String BOOTSTRAP_MODE_SELECTOR = "hoodie.bootstrap.mode.selector";
+ public static final String FULL_BOOTSTRAP_INPUT_PROVIDER = "hoodie.bootstrap.full.input.provider";
+ public static final String BOOTSTRAP_KEYGEN_CLASS = "hoodie.bootstrap.keygen.class";
+ public static final String BOOTSTRAP_PARTITION_PATH_TRANSLATOR_CLASS =
+ "hoodie.bootstrap.partitionpath.translator.class";
+ public static final String DEFAULT_BOOTSTRAP_PARTITION_PATH_TRANSLATOR_CLASS =
+ IdentityBootstrapPartitionPathTranslator.class.getName();
+
+ public static final String BOOTSTRAP_PARALLELISM = "hoodie.bootstrap.parallelism";
+ public static final String DEFAULT_BOOTSTRAP_PARALLELISM = "1500";
+
+ // Used By BootstrapRegexModeSelector class. When a partition path matches the regex, the corresponding
+ // mode will be used. Otherwise, the alternative mode will be used.
+ public static final String BOOTSTRAP_MODE_SELECTOR_REGEX = "hoodie.bootstrap.mode.selector.regex";
+ public static final String BOOTSTRAP_MODE_SELECTOR_REGEX_MODE = "hoodie.bootstrap.mode.selector.regex.mode";
+ public static final String DEFAULT_BOOTSTRAP_MODE_SELECTOR_REGEX = ".*";
+ public static final String DEFAULT_BOOTSTRAP_MODE_SELECTOR_REGEX_MODE = BootstrapMode.METADATA_ONLY.name();
+
+ public HoodieBootstrapConfig(Properties props) {
+ super(props);
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+
+ private final Properties props = new Properties();
+
+ public Builder fromFile(File propertiesFile) throws IOException {
+ try (FileReader reader = new FileReader(propertiesFile)) {
+ this.props.load(reader);
+ return this;
+ }
+ }
+
+ public Builder withBootstrapBasePath(String basePath) {
+ props.setProperty(BOOTSTRAP_BASE_PATH_PROP, basePath);
+ return this;
+ }
+
+ public Builder withBootstrapModeSelector(String partitionSelectorClass) {
+ props.setProperty(BOOTSTRAP_MODE_SELECTOR, partitionSelectorClass);
+ return this;
+ }
+
+ public Builder withFullBootstrapInputProvider(String partitionSelectorClass) {
+ props.setProperty(FULL_BOOTSTRAP_INPUT_PROVIDER, partitionSelectorClass);
+ return this;
+ }
+
+ public Builder withBootstrapKeyGenClass(String keyGenClass) {
+ props.setProperty(BOOTSTRAP_KEYGEN_CLASS, keyGenClass);
+ return this;
+ }
+
+ public Builder withBootstrapPartitionPathTranslatorClass(String partitionPathTranslatorClass) {
+ props.setProperty(BOOTSTRAP_PARTITION_PATH_TRANSLATOR_CLASS, partitionPathTranslatorClass);
+ return this;
+ }
+
+ public Builder withBootstrapParallelism(int parallelism) {
+ props.setProperty(BOOTSTRAP_PARALLELISM, String.valueOf(parallelism));
+ return this;
+ }
+
+ public Builder withBootstrapModeSelectorRegex(String regex) {
+ props.setProperty(BOOTSTRAP_MODE_SELECTOR_REGEX, regex);
+ return this;
+ }
+
+ public Builder withBootstrapModeForRegexMatch(BootstrapMode modeForRegexMatch) {
+ props.setProperty(BOOTSTRAP_MODE_SELECTOR_REGEX_MODE, modeForRegexMatch.name());
+ return this;
+ }
+
+ public Builder fromProperties(Properties props) {
+ this.props.putAll(props);
+ return this;
+ }
+
+ public HoodieBootstrapConfig build() {
+ HoodieBootstrapConfig config = new HoodieBootstrapConfig(props);
+ setDefaultOnCondition(props, !props.containsKey(BOOTSTRAP_PARALLELISM), BOOTSTRAP_PARALLELISM,
+ DEFAULT_BOOTSTRAP_PARALLELISM);
+ setDefaultOnCondition(props, !props.containsKey(BOOTSTRAP_PARTITION_PATH_TRANSLATOR_CLASS),
+ BOOTSTRAP_PARTITION_PATH_TRANSLATOR_CLASS, DEFAULT_BOOTSTRAP_PARTITION_PATH_TRANSLATOR_CLASS);
+ setDefaultOnCondition(props, !props.containsKey(BOOTSTRAP_MODE_SELECTOR), BOOTSTRAP_MODE_SELECTOR,
+ MetadataOnlyBootstrapModeSelector.class.getCanonicalName());
+ setDefaultOnCondition(props, !props.containsKey(BOOTSTRAP_MODE_SELECTOR_REGEX), BOOTSTRAP_MODE_SELECTOR_REGEX,
+ DEFAULT_BOOTSTRAP_MODE_SELECTOR_REGEX);
+ setDefaultOnCondition(props, !props.containsKey(BOOTSTRAP_MODE_SELECTOR_REGEX_MODE),
+ BOOTSTRAP_MODE_SELECTOR_REGEX_MODE, DEFAULT_BOOTSTRAP_MODE_SELECTOR_REGEX_MODE);
+ BootstrapMode.valueOf(props.getProperty(BOOTSTRAP_MODE_SELECTOR_REGEX_MODE));
+ return config;
+ }
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 61e89e3da..affe55352 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -20,6 +20,7 @@ package org.apache.hudi.config;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.common.config.DefaultHoodieConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
@@ -129,6 +130,9 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
"_.hoodie.allow.multi.write.on.same.instant";
public static final String DEFAULT_ALLOW_MULTI_WRITE_ON_SAME_INSTANT = "false";
+ public static final String EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION = AVRO_SCHEMA + ".externalTransformation";
+ public static final String DEFAULT_EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION = "false";
+
private ConsistencyGuardConfig consistencyGuardConfig;
// Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
@@ -136,7 +140,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private final FileSystemViewStorageConfig clientSpecifiedViewStorageConfig;
private FileSystemViewStorageConfig viewStorageConfig;
- private HoodieWriteConfig(Properties props) {
+ protected HoodieWriteConfig(Properties props) {
super(props);
Properties newProps = new Properties();
newProps.putAll(props);
@@ -180,6 +184,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Boolean.parseBoolean(props.getProperty(HOODIE_ASSUME_DATE_PARTITIONING_PROP));
}
+ public boolean shouldUseExternalSchemaTransformation() {
+ return Boolean.parseBoolean(props.getProperty(EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION));
+ }
+
public Integer getTimelineLayoutVersion() {
return Integer.parseInt(props.getProperty(TIMELINE_LAYOUT_VERSION));
}
@@ -675,13 +683,46 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_CLASS_PROP);
}
+ public String getBootstrapSourceBasePath() {
+ return props.getProperty(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP);
+ }
+
+ public String getBootstrapModeSelectorClass() {
+ return props.getProperty(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR);
+ }
+
+ public String getFullBootstrapInputProvider() {
+ return props.getProperty(HoodieBootstrapConfig.FULL_BOOTSTRAP_INPUT_PROVIDER);
+ }
+
+ public String getBootstrapKeyGeneratorClass() {
+ return props.getProperty(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS);
+ }
+
+ public String getBootstrapModeSelectorRegex() {
+ return props.getProperty(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR_REGEX);
+ }
+
+ public BootstrapMode getBootstrapModeForRegexMatch() {
+ return BootstrapMode.valueOf(props.getProperty(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR_REGEX_MODE));
+ }
+
+ public String getBootstrapPartitionPathTranslatorClass() {
+ return props.getProperty(HoodieBootstrapConfig.BOOTSTRAP_PARTITION_PATH_TRANSLATOR_CLASS);
+ }
+
+ public int getBootstrapParallelism() {
+ return Integer.parseInt(props.getProperty(HoodieBootstrapConfig.BOOTSTRAP_PARALLELISM));
+ }
+
public static class Builder {
- private final Properties props = new Properties();
+ protected final Properties props = new Properties();
private boolean isIndexConfigSet = false;
private boolean isStorageConfigSet = false;
private boolean isCompactionConfigSet = false;
private boolean isMetricsConfigSet = false;
+ private boolean isBootstrapConfigSet = false;
private boolean isMemoryConfigSet = false;
private boolean isViewConfigSet = false;
private boolean isConsistencyGuardSet = false;
@@ -805,6 +846,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
+ public Builder withBootstrapConfig(HoodieBootstrapConfig bootstrapConfig) {
+ props.putAll(bootstrapConfig.getProps());
+ isBootstrapConfigSet = true;
+ return this;
+ }
+
public Builder withAutoCommit(boolean autoCommit) {
props.setProperty(HOODIE_AUTO_COMMIT_PROP, String.valueOf(autoCommit));
return this;
@@ -863,7 +910,17 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
- public HoodieWriteConfig build() {
+ public Builder withExternalSchemaTrasformation(boolean enabled) {
+ props.setProperty(EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION, String.valueOf(enabled));
+ return this;
+ }
+
+ public Builder withProperties(Properties properties) {
+ this.props.putAll(properties);
+ return this;
+ }
+
+ protected void setDefaults() {
// Check for mandatory properties
setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(BULKINSERT_PARALLELISM), BULKINSERT_PARALLELISM,
@@ -916,6 +973,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
setDefaultOnCondition(props, !isCompactionConfigSet,
HoodieCompactionConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isMetricsConfigSet, HoodieMetricsConfig.newBuilder().fromProperties(props).build());
+ setDefaultOnCondition(props, !isBootstrapConfigSet,
+ HoodieBootstrapConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isMemoryConfigSet, HoodieMemoryConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isViewConfigSet,
FileSystemViewStorageConfig.newBuilder().fromProperties(props).build());
@@ -924,15 +983,24 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
setDefaultOnCondition(props, !isCallbackConfigSet,
HoodieWriteCommitCallbackConfig.newBuilder().fromProperties(props).build());
+ setDefaultOnCondition(props, !props.containsKey(EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION),
+ EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION, DEFAULT_EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION);
setDefaultOnCondition(props, !props.containsKey(TIMELINE_LAYOUT_VERSION), TIMELINE_LAYOUT_VERSION,
String.valueOf(TimelineLayoutVersion.CURR_VERSION));
+ }
+
+ private void validate() {
String layoutVersion = props.getProperty(TIMELINE_LAYOUT_VERSION);
// Ensure Layout Version is good
new TimelineLayoutVersion(Integer.parseInt(layoutVersion));
+ Objects.requireNonNull(props.getProperty(BASE_PATH_PROP));
+ }
+ public HoodieWriteConfig build() {
+ setDefaults();
+ validate();
// Build WriteConfig at the end
HoodieWriteConfig config = new HoodieWriteConfig(props);
- Objects.requireNonNull(config.getBasePath());
return config;
}
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index e81c4add7..7a8e5abf3 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -165,7 +165,7 @@ public class HoodieAppendHandle extends HoodieWri
private Option<IndexedRecord> getIndexedRecord(HoodieRecord hoodieRecord) {
Option<Map<String, String>> recordMetadata = hoodieRecord.getData().getMetadata();
try {
- Option<IndexedRecord> avroRecord = hoodieRecord.getData().getInsertValue(originalSchema);
+ Option<IndexedRecord> avroRecord = hoodieRecord.getData().getInsertValue(writerSchema);
if (avroRecord.isPresent()) {
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
avroRecord = Option.of(rewriteRecord((GenericRecord) avroRecord.get()));
@@ -212,7 +212,7 @@ public class HoodieAppendHandle extends HoodieWri
private void doAppend(Map header) {
try {
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
- header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writerSchema.toString());
+ header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writerSchemaWithMetafields.toString());
if (recordList.size() > 0) {
writer = writer.appendBlock(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header));
recordList.clear();
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieBootstrapHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieBootstrapHandle.java
new file mode 100644
index 000000000..5deeae1d9
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieBootstrapHandle.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+/**
+ * This class is essentially same as Create Handle but overrides two things
+ * 1) Schema : Metadata bootstrap writes only metadata fields as part of write. So, setup the writer schema accordingly.
+ * 2) canWrite is overridden to always return true so that the skeleton file and bootstrap file are aligned and we don't end up
+ * writing more than 1 skeleton file for the same bootstrap file.
+ * @param <T> type of the {@link HoodieRecordPayload}
+ */
+public class HoodieBootstrapHandle extends HoodieCreateHandle {
+
+ public HoodieBootstrapHandle(HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable,
+ String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
+ super(config, commitTime, hoodieTable, partitionPath, fileId,
+ Pair.of(HoodieAvroUtils.RECORD_KEY_SCHEMA,
+ HoodieAvroUtils.addMetadataFields(HoodieAvroUtils.RECORD_KEY_SCHEMA)), sparkTaskContextSupplier);
+ }
+
+ @Override
+ public boolean canWrite(HoodieRecord record) {
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index dfa63b079..705e98d94 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -18,6 +18,7 @@
package org.apache.hudi.io;
+import org.apache.avro.Schema;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.fs.FSUtils;
@@ -28,6 +29,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.io.storage.HoodieFileWriter;
@@ -56,8 +58,16 @@ public class HoodieCreateHandle extends HoodieWri
private boolean useWriterSchema = false;
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable,
- String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
- super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier);
+ String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
+ this(config, instantTime, hoodieTable, partitionPath, fileId, getWriterSchemaIncludingAndExcludingMetadataPair(config),
+ sparkTaskContextSupplier);
+ }
+
+ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable,
+ String partitionPath, String fileId, Pair writerSchemaIncludingAndExcludingMetadataPair,
+ SparkTaskContextSupplier sparkTaskContextSupplier) {
+ super(config, instantTime, partitionPath, fileId, hoodieTable, writerSchemaIncludingAndExcludingMetadataPair,
+ sparkTaskContextSupplier);
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
@@ -68,8 +78,7 @@ public class HoodieCreateHandle extends HoodieWri
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
partitionMetadata.trySave(getPartitionId());
createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension()));
- this.fileWriter =
- HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, writerSchema, this.sparkTaskContextSupplier);
+ this.fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, writerSchemaWithMetafields, this.sparkTaskContextSupplier);
} catch (IOException e) {
throw new HoodieInsertException("Failed to initialize HoodieStorageWriter for path " + path, e);
}
@@ -132,9 +141,9 @@ public class HoodieCreateHandle extends HoodieWri
while (recordIterator.hasNext()) {
HoodieRecord record = recordIterator.next();
if (useWriterSchema) {
- write(record, record.getData().getInsertValue(writerSchema));
+ write(record, record.getData().getInsertValue(writerSchemaWithMetafields));
} else {
- write(record, record.getData().getInsertValue(originalSchema));
+ write(record, record.getData().getInsertValue(writerSchema));
}
}
} catch (IOException io) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index e87cf3c01..f0ea284ea 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -67,6 +67,7 @@ public class HoodieMergeHandle extends HoodieWrit
private long updatedRecordsWritten = 0;
private long insertRecordsWritten = 0;
private boolean useWriterSchema;
+ private HoodieBaseFile baseFileToMerge;
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable,
Iterator> recordItr, String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
@@ -88,6 +89,10 @@ public class HoodieMergeHandle extends HoodieWrit
}
@Override
+ public Schema getWriterSchemaWithMetafields() {
+ return writerSchemaWithMetafields;
+ }
+
public Schema getWriterSchema() {
return writerSchema;
}
@@ -95,12 +100,13 @@ public class HoodieMergeHandle extends HoodieWrit
/**
* Extract old file path, initialize StorageWriter and WriteStatus.
*/
- private void init(String fileId, String partitionPath, HoodieBaseFile dataFileToBeMerged) {
+ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileToMerge) {
LOG.info("partitionPath:" + partitionPath + ", fileId to be merged:" + fileId);
+ this.baseFileToMerge = baseFileToMerge;
this.writtenRecordKeys = new HashSet<>();
writeStatus.setStat(new HoodieWriteStat());
try {
- String latestValidFilePath = dataFileToBeMerged.getFileName();
+ String latestValidFilePath = baseFileToMerge.getFileName();
writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(latestValidFilePath));
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime,
@@ -126,8 +132,7 @@ public class HoodieMergeHandle extends HoodieWrit
createMarkerFile(partitionPath, newFileName);
// Create the writer for writing the new version file
- fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier);
-
+ fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchemaWithMetafields, sparkTaskContextSupplier);
} catch (IOException io) {
LOG.error("Error in update task at commit " + instantTime, io);
writeStatus.setGlobalError(io);
@@ -145,7 +150,7 @@ public class HoodieMergeHandle extends HoodieWrit
long memoryForMerge = SparkConfigUtils.getMaxMemoryPerPartitionMerge(config.getProps());
LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(),
- new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(originalSchema));
+ new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(writerSchema));
} catch (IOException io) {
throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
}
@@ -216,7 +221,7 @@ public class HoodieMergeHandle extends HoodieWrit
HoodieRecord hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(key));
try {
Option combinedAvroRecord =
- hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, useWriterSchema ? writerSchema : originalSchema);
+ hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, useWriterSchema ? writerSchemaWithMetafields : writerSchema);
if (writeUpdateRecord(hoodieRecord, combinedAvroRecord)) {
/*
* ONLY WHEN 1) we have an update for this key AND 2) We are able to successfully write the the combined new
@@ -241,7 +246,7 @@ public class HoodieMergeHandle extends HoodieWrit
fileWriter.writeAvro(key, oldRecord);
} catch (ClassCastException e) {
LOG.error("Schema mismatch when rewriting old record " + oldRecord + " from file " + getOldFilePath()
- + " to file " + newFilePath + " with writerSchema " + writerSchema.toString(true));
+ + " to file " + newFilePath + " with writerSchema " + writerSchemaWithMetafields.toString(true));
throw new HoodieUpsertException(errMsg, e);
} catch (IOException e) {
LOG.error("Failed to merge old record into new file for key " + key + " from old file " + getOldFilePath()
@@ -262,9 +267,9 @@ public class HoodieMergeHandle extends HoodieWrit
HoodieRecord hoodieRecord = newRecordsItr.next();
if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
if (useWriterSchema) {
- writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchema));
+ writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchemaWithMetafields));
} else {
- writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(originalSchema));
+ writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchema));
}
insertRecordsWritten++;
}
@@ -312,4 +317,8 @@ public class HoodieMergeHandle extends HoodieWrit
public IOType getIOType() {
return IOType.MERGE;
}
+
+ public HoodieBaseFile baseFileForMerge() {
+ return baseFileToMerge;
+ }
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 7fd3b42ae..d148b1b8a 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieFileWriter;
@@ -51,8 +52,8 @@ public abstract class HoodieWriteHandle extends H
private static final Logger LOG = LogManager.getLogger(HoodieWriteHandle.class);
- protected final Schema originalSchema;
protected final Schema writerSchema;
+ protected final Schema writerSchemaWithMetafields;
protected HoodieTimer timer;
protected final WriteStatus writeStatus;
protected final String partitionPath;
@@ -62,11 +63,18 @@ public abstract class HoodieWriteHandle extends H
public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath,
String fileId, HoodieTable hoodieTable, SparkTaskContextSupplier sparkTaskContextSupplier) {
+ this(config, instantTime, partitionPath, fileId, hoodieTable,
+ getWriterSchemaIncludingAndExcludingMetadataPair(config), sparkTaskContextSupplier);
+ }
+
+ protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath, String fileId,
+ HoodieTable hoodieTable, Pair writerSchemaIncludingAndExcludingMetadataPair,
+ SparkTaskContextSupplier sparkTaskContextSupplier) {
super(config, instantTime, hoodieTable);
this.partitionPath = partitionPath;
this.fileId = fileId;
- this.originalSchema = new Schema.Parser().parse(config.getSchema());
- this.writerSchema = HoodieAvroUtils.createHoodieWriteSchema(originalSchema);
+ this.writerSchema = writerSchemaIncludingAndExcludingMetadataPair.getKey();
+ this.writerSchemaWithMetafields = writerSchemaIncludingAndExcludingMetadataPair.getValue();
this.timer = new HoodieTimer().startTimer();
this.writeStatus = (WriteStatus) ReflectionUtils.loadClass(config.getWriteStatusClassName(),
!hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
@@ -74,6 +82,19 @@ public abstract class HoodieWriteHandle extends H
this.writeToken = makeWriteToken();
}
+ /**
+ * Returns writer schema pairs containing
+ * (a) Writer Schema from client
+ * (b) (a) with hoodie metadata fields.
+ * @param config Write Config
+ * @return pair of (writer schema without metadata fields, writer schema with Hoodie metadata fields added)
+ */
+ protected static Pair getWriterSchemaIncludingAndExcludingMetadataPair(HoodieWriteConfig config) {
+ Schema originalSchema = new Schema.Parser().parse(config.getSchema());
+ Schema hoodieSchema = HoodieAvroUtils.addMetadataFields(originalSchema);
+ return Pair.of(originalSchema, hoodieSchema);
+ }
+
/**
* Generate a write token based on the currently running spark task and its place in the spark dag.
*/
@@ -103,8 +124,8 @@ public abstract class HoodieWriteHandle extends H
markerFiles.create(partitionPath, dataFileName, getIOType());
}
- public Schema getWriterSchema() {
- return writerSchema;
+ public Schema getWriterSchemaWithMetafields() {
+ return writerSchemaWithMetafields;
}
/**
@@ -142,7 +163,7 @@ public abstract class HoodieWriteHandle extends H
* Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields.
*/
protected GenericRecord rewriteRecord(GenericRecord record) {
- return HoodieAvroUtils.rewriteRecord(record, writerSchema);
+ return HoodieAvroUtils.rewriteRecord(record, writerSchemaWithMetafields);
}
public abstract WriteStatus close();
diff --git a/hudi-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java b/hudi-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
new file mode 100644
index 000000000..7c3edf747
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+ protected BuiltinKeyGenerator(TypedProperties config) {
+ super(config);
+ }
+
+ /**
+ * Generate a record Key out of provided generic record.
+ */
+ public abstract String getRecordKey(GenericRecord record);
+
+ /**
+ * Generate a partition path out of provided generic record.
+ */
+ public abstract String getPartitionPath(GenericRecord record);
+
+ /**
+ * Generate a Hoodie Key out of provided generic record.
+ */
+ public final HoodieKey getKey(GenericRecord record) {
+ if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
+ throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
+ }
+ return new HoodieKey(getRecordKey(record), getPartitionPath(record));
+ }
+
+ /**
+ * Return fields that constitute record key. Used by Metadata bootstrap.
+ * Has a base implementation in order to avoid forcing custom KeyGenerator implementations
+ * to implement this method.
+ * @return list of record key fields
+ */
+ public List getRecordKeyFields() {
+ throw new IllegalStateException("This method is expected to be overridden by subclasses");
+ }
+
+ /**
+ * Return fields that constitute partition path. Used by Metadata bootstrap.
+ * Has a base implementation in order to avoid forcing custom KeyGenerator implementations
+ * to implement this method
+ * @return list of partition path fields
+ */
+ public List getPartitionPathFields() {
+ throw new IllegalStateException("This method is expected to be overridden by subclasses");
+ }
+
+ @Override
+ public final List getRecordKeyFieldNames() {
+ // For nested columns, pick top level column name
+ return getRecordKeyFields().stream().map(k -> {
+ int idx = k.indexOf('.');
+ return idx > 0 ? k.substring(0, idx) : k;
+ }).collect(Collectors.toList());
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java b/hudi-client/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
new file mode 100644
index 000000000..c4ac29b2b
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieKeyException;
+
+public class KeyGenUtils {
+
+ protected static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
+ protected static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
+
+ protected static final String DEFAULT_PARTITION_PATH = "default";
+ protected static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
+
+ public static String getRecordKey(GenericRecord record, List recordKeyFields) {
+ boolean keyIsNullEmpty = true;
+ StringBuilder recordKey = new StringBuilder();
+ for (String recordKeyField : recordKeyFields) {
+ String recordKeyValue = HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true);
+ if (recordKeyValue == null) {
+ recordKey.append(recordKeyField + ":" + NULL_RECORDKEY_PLACEHOLDER + ",");
+ } else if (recordKeyValue.isEmpty()) {
+ recordKey.append(recordKeyField + ":" + EMPTY_RECORDKEY_PLACEHOLDER + ",");
+ } else {
+ recordKey.append(recordKeyField + ":" + recordKeyValue + ",");
+ keyIsNullEmpty = false;
+ }
+ }
+ recordKey.deleteCharAt(recordKey.length() - 1);
+ if (keyIsNullEmpty) {
+ throw new HoodieKeyException("recordKey values: \"" + recordKey + "\" for fields: "
+ + recordKeyFields.toString() + " cannot be entirely null or empty.");
+ }
+ return recordKey.toString();
+ }
+
+ public static String getRecordPartitionPath(GenericRecord record, List partitionPathFields,
+ boolean hiveStylePartitioning, boolean encodePartitionPath) {
+ StringBuilder partitionPath = new StringBuilder();
+ for (String partitionPathField : partitionPathFields) {
+ String fieldVal = HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathField, true);
+ if (fieldVal == null || fieldVal.isEmpty()) {
+ partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" + DEFAULT_PARTITION_PATH
+ : DEFAULT_PARTITION_PATH);
+ } else {
+ if (encodePartitionPath) {
+ try {
+ fieldVal = URLEncoder.encode(fieldVal, StandardCharsets.UTF_8.toString());
+ } catch (UnsupportedEncodingException uoe) {
+ throw new HoodieException(uoe.getMessage(), uoe);
+ }
+ }
+ partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" + fieldVal : fieldVal);
+ }
+ partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
+ }
+ partitionPath.deleteCharAt(partitionPath.length() - 1);
+ return partitionPath.toString();
+ }
+
+ public static String getRecordKey(GenericRecord record, String recordKeyField) {
+ String recordKey = HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true);
+ if (recordKey == null || recordKey.isEmpty()) {
+ throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty.");
+ }
+ return recordKey;
+ }
+
+ public static String getPartitionPath(GenericRecord record, String partitionPathField,
+ boolean hiveStylePartitioning, boolean encodePartitionPath) {
+ String partitionPath = HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathField, true);
+ if (partitionPath == null || partitionPath.isEmpty()) {
+ partitionPath = DEFAULT_PARTITION_PATH;
+ }
+ if (encodePartitionPath) {
+ try {
+ partitionPath = URLEncoder.encode(partitionPath, StandardCharsets.UTF_8.toString());
+ } catch (UnsupportedEncodingException uoe) {
+ throw new HoodieException(uoe.getMessage(), uoe);
+ }
+ }
+ if (hiveStylePartitioning) {
+ partitionPath = partitionPathField + "=" + partitionPath;
+ }
+ return partitionPath;
+ }
+}
\ No newline at end of file
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenerator.java b/hudi-client/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
similarity index 76%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
rename to hudi-client/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
index b4d609dcf..1a798af74 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
+++ b/hudi-client/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.avro.generic.GenericRecord;
import java.io.Serializable;
+import java.util.List;
/**
* Abstract class to extend for plugging in extraction of {@link HoodieKey} from an Avro record.
@@ -40,4 +41,14 @@ public abstract class KeyGenerator implements Serializable {
* Generate a Hoodie Key out of provided generic record.
*/
public abstract HoodieKey getKey(GenericRecord record);
-}
\ No newline at end of file
+
+ /**
+ * Used during bootstrap, to project out only the record key fields from bootstrap source dataset.
+ *
+ * @return list of field names, when concatenated make up the record key.
+ */
+ public List getRecordKeyFieldNames() {
+ throw new UnsupportedOperationException("Bootstrap not supported for key generator. "
+ + "Please override this method in your custom key generator.");
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index 849673eaa..21f8bef07 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -18,9 +18,6 @@
package org.apache.hudi.table;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -33,30 +30,32 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
-import org.apache.hudi.table.action.clean.CleanActionExecutor;
import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.bootstrap.BootstrapCommitActionExecutor;
+import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
+import org.apache.hudi.table.action.clean.CleanActionExecutor;
import org.apache.hudi.table.action.commit.BulkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.BulkInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.DeleteCommitActionExecutor;
import org.apache.hudi.table.action.commit.InsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.InsertPreppedCommitActionExecutor;
+import org.apache.hudi.table.action.commit.MergeHelper;
import org.apache.hudi.table.action.commit.UpsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.UpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -97,7 +96,7 @@ public class HoodieCopyOnWriteTable extends Hoodi
@Override
public HoodieWriteMetadata bulkInsert(JavaSparkContext jsc, String instantTime, JavaRDD> records,
Option userDefinedBulkInsertPartitioner) {
- return new BulkInsertCommitActionExecutor<>(jsc, config,
+ return new BulkInsertCommitActionExecutor(jsc, config,
this, instantTime, records, userDefinedBulkInsertPartitioner).execute();
}
@@ -121,7 +120,7 @@ public class HoodieCopyOnWriteTable extends Hoodi
@Override
public HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, String instantTime,
JavaRDD> preppedRecords, Option userDefinedBulkInsertPartitioner) {
- return new BulkInsertPreppedCommitActionExecutor<>(jsc, config,
+ return new BulkInsertPreppedCommitActionExecutor(jsc, config,
this, instantTime, preppedRecords, userDefinedBulkInsertPartitioner).execute();
}
@@ -135,6 +134,16 @@ public class HoodieCopyOnWriteTable extends Hoodi
throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
}
+ @Override
+ public HoodieBootstrapWriteMetadata bootstrap(JavaSparkContext jsc, Option> extraMetadata) {
+ return new BootstrapCommitActionExecutor(jsc, config, this, extraMetadata).execute();
+ }
+
+ @Override
+ public void rollbackBootstrap(JavaSparkContext jsc, String instantTime) {
+ new CopyOnWriteRestoreActionExecutor(jsc, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
+ }
+
public Iterator> handleUpdate(String instantTime, String partitionPath, String fileId,
Map> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
// these are updates
@@ -148,25 +157,10 @@ public class HoodieCopyOnWriteTable extends Hoodi
throw new HoodieUpsertException(
"Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
} else {
- BoundedInMemoryExecutor wrapper = null;
- HoodieFileReader storageReader =
- HoodieFileReaderFactory.getFileReader(getHadoopConf(), upsertHandle.getOldFilePath());
-
- try {
- wrapper =
- new SparkBoundedInMemoryExecutor(config, storageReader.getRecordIterator(upsertHandle.getWriterSchema()),
- new UpdateHandler(upsertHandle), x -> x);
- wrapper.execute();
- } catch (Exception e) {
- throw new HoodieException(e);
- } finally {
- upsertHandle.close();
- if (null != wrapper) {
- wrapper.shutdownNow();
- }
- }
+ MergeHelper.runMerge(this, upsertHandle);
}
+
// TODO(vc): This needs to be revisited
if (upsertHandle.getWriteStatus().getPartitionPath() == null) {
LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
index 785efa579..a236cdb94 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
@@ -28,10 +28,14 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.bootstrap.BootstrapDeltaCommitActionExecutor;
+import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.hudi.table.action.compact.RunCompactionActionExecutor;
import org.apache.hudi.table.action.deltacommit.BulkInsertDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.BulkInsertPreppedDeltaCommitActionExecutor;
@@ -84,7 +88,7 @@ public class HoodieMergeOnReadTable extends Hoodi
@Override
public HoodieWriteMetadata bulkInsert(JavaSparkContext jsc, String instantTime, JavaRDD> records,
Option userDefinedBulkInsertPartitioner) {
- return new BulkInsertDeltaCommitActionExecutor<>(jsc, config,
+ return new BulkInsertDeltaCommitActionExecutor(jsc, config,
this, instantTime, records, userDefinedBulkInsertPartitioner).execute();
}
@@ -108,7 +112,7 @@ public class HoodieMergeOnReadTable extends Hoodi
@Override
public HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, String instantTime,
JavaRDD> preppedRecords, Option userDefinedBulkInsertPartitioner) {
- return new BulkInsertPreppedDeltaCommitActionExecutor<>(jsc, config,
+ return new BulkInsertPreppedDeltaCommitActionExecutor(jsc, config,
this, instantTime, preppedRecords, userDefinedBulkInsertPartitioner).execute();
}
@@ -125,6 +129,16 @@ public class HoodieMergeOnReadTable extends Hoodi
return compactionExecutor.execute();
}
+ @Override
+ public HoodieBootstrapWriteMetadata bootstrap(JavaSparkContext jsc, Option> extraMetadata) {
+ return new BootstrapDeltaCommitActionExecutor(jsc, config, this, extraMetadata).execute();
+ }
+
+ @Override
+ public void rollbackBootstrap(JavaSparkContext jsc, String instantTime) {
+ new MergeOnReadRestoreActionExecutor(jsc, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
+ }
+
@Override
public HoodieRollbackMetadata rollback(JavaSparkContext jsc,
String rollbackInstantTime,
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index d8b0c6e90..565e046ea 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -60,6 +60,7 @@ import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -331,6 +332,20 @@ public abstract class HoodieTable implements Seri
public abstract HoodieWriteMetadata compact(JavaSparkContext jsc,
String compactionInstantTime);
+ /**
+ * Perform metadata/full bootstrap of a Hudi table.
+ * @param jsc JavaSparkContext
+ * @param extraMetadata Additional Metadata for storing in commit file.
+ * @return HoodieBootstrapWriteMetadata
+ */
+ public abstract HoodieBootstrapWriteMetadata bootstrap(JavaSparkContext jsc, Option> extraMetadata);
+
+ /**
+ * Perform rollback of bootstrap of a Hudi table.
+ * @param jsc JavaSparkContext
+ */
+ public abstract void rollbackBootstrap(JavaSparkContext jsc, String instantTime);
+
/**
* Executes a new clean action.
*
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapCommitActionExecutor.java
new file mode 100644
index 000000000..e4224fdd8
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapCommitActionExecutor.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.bootstrap;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.bootstrap.BootstrapMode;
+import org.apache.hudi.client.bootstrap.BootstrapRecordPayload;
+import org.apache.hudi.client.bootstrap.BootstrapSchemaProvider;
+import org.apache.hudi.client.bootstrap.BootstrapWriteStatus;
+import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
+import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
+import org.apache.hudi.client.bootstrap.translator.BootstrapPartitionPathTranslator;
+import org.apache.hudi.common.bootstrap.FileStatusUtils;
+import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BootstrapFileMapping;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstant.State;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetReaderIterator;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
+import org.apache.hudi.io.HoodieBootstrapHandle;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.commit.BaseCommitActionExecutor;
+import org.apache.hudi.table.action.commit.BulkInsertCommitActionExecutor;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.table.action.commit.CommitActionExecutor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class BootstrapCommitActionExecutor>
+ extends BaseCommitActionExecutor {
+
+ private static final Logger LOG = LogManager.getLogger(BootstrapCommitActionExecutor.class);
+ protected String bootstrapSchema = null;
+ private transient FileSystem bootstrapSourceFileSystem;
+
+ public BootstrapCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable> table,
+ Option> extraMetadata) {
+ super(jsc, new HoodieWriteConfig.Builder().withProps(config.getProps())
+ .withAutoCommit(true).withWriteStatusClass(BootstrapWriteStatus.class)
+ .withBulkInsertParallelism(config.getBootstrapParallelism())
+ .build(), table, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, WriteOperationType.BOOTSTRAP,
+ extraMetadata);
+ bootstrapSourceFileSystem = FSUtils.getFs(config.getBootstrapSourceBasePath(), hadoopConf);
+ }
+
+ private void validate() {
+ ValidationUtils.checkArgument(config.getBootstrapSourceBasePath() != null,
+ "Ensure Bootstrap Source Path is set");
+ ValidationUtils.checkArgument(config.getBootstrapModeSelectorClass() != null,
+ "Ensure Bootstrap Partition Selector is set");
+ ValidationUtils.checkArgument(config.getBootstrapKeyGeneratorClass() != null,
+ "Ensure bootstrap key generator class is set");
+ }
+
+ @Override
+ public HoodieBootstrapWriteMetadata execute() {
+ validate();
+ try {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ Option completedInstant =
+ metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
+ ValidationUtils.checkArgument(!completedInstant.isPresent(),
+ "Active Timeline is expected to be empty for bootstrap to be performed. "
+ + "If you want to re-bootstrap, please rollback bootstrap first !!");
+ Map>>> partitionSelections = listAndProcessSourcePartitions();
+
+ // First run metadata bootstrap which will auto commit
+ Option metadataResult = metadataBootstrap(partitionSelections.get(BootstrapMode.METADATA_ONLY));
+ // if there are full bootstrap to be performed, perform that too
+ Option fullBootstrapResult = fullBootstrap(partitionSelections.get(BootstrapMode.FULL_RECORD));
+ return new HoodieBootstrapWriteMetadata(metadataResult, fullBootstrapResult);
+ } catch (IOException ioe) {
+ throw new HoodieIOException(ioe.getMessage(), ioe);
+ }
+ }
+
+ protected String getSchemaToStoreInCommit() {
+ return bootstrapSchema;
+ }
+
+ /**
+ * Perform Metadata Bootstrap.
+ * @param partitionFilesList List of partitions and files within those partitions
+ */
+ protected Option metadataBootstrap(List>> partitionFilesList) {
+ if (null == partitionFilesList || partitionFilesList.isEmpty()) {
+ return Option.empty();
+ }
+
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ metaClient.getActiveTimeline().createNewInstant(
+ new HoodieInstant(State.REQUESTED, metaClient.getCommitActionType(),
+ HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS));
+
+ table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
+ metaClient.getCommitActionType(), HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS), Option.empty());
+
+ JavaRDD bootstrapWriteStatuses = runMetadataBootstrap(partitionFilesList);
+
+ HoodieWriteMetadata result = new HoodieWriteMetadata();
+ updateIndexAndCommitIfNeeded(bootstrapWriteStatuses.map(w -> w), result);
+ return Option.of(result);
+ }
+
+ @Override
+ protected void commit(Option> extraMetadata, HoodieWriteMetadata result) {
+ // Perform bootstrap index write and then commit. Make sure both record-key and bootstrap-index
+ // is all done in a single job DAG.
+ Map>> bootstrapSourceAndStats =
+ result.getWriteStatuses().collect().stream()
+ .map(w -> {
+ BootstrapWriteStatus ws = (BootstrapWriteStatus) w;
+ return Pair.of(ws.getBootstrapSourceFileMapping(), ws.getStat());
+ }).collect(Collectors.groupingBy(w -> w.getKey().getPartitionPath()));
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ try (BootstrapIndex.IndexWriter indexWriter = BootstrapIndex.getBootstrapIndex(metaClient)
+ .createWriter(metaClient.getTableConfig().getBootstrapBasePath().get())) {
+ LOG.info("Starting to write bootstrap index for source " + config.getBootstrapSourceBasePath() + " in table "
+ + config.getBasePath());
+ indexWriter.begin();
+ bootstrapSourceAndStats.forEach((key, value) -> indexWriter.appendNextPartition(key,
+ value.stream().map(Pair::getKey).collect(Collectors.toList())));
+ indexWriter.finish();
+ LOG.info("Finished writing bootstrap index for source " + config.getBootstrapSourceBasePath() + " in table "
+ + config.getBasePath());
+ }
+
+ super.commit(extraMetadata, result, bootstrapSourceAndStats.values().stream()
+ .flatMap(f -> f.stream().map(Pair::getValue)).collect(Collectors.toList()));
+ LOG.info("Committing metadata bootstrap !!");
+ }
+
+ /**
+ * Perform Full Bootstrap.
+ * @param partitionFilesList List of partitions and files within those partitions
+ */
+ protected Option fullBootstrap(List>> partitionFilesList) {
+ if (null == partitionFilesList || partitionFilesList.isEmpty()) {
+ return Option.empty();
+ }
+ TypedProperties properties = new TypedProperties();
+ properties.putAll(config.getProps());
+ FullRecordBootstrapDataProvider inputProvider =
+ (FullRecordBootstrapDataProvider) ReflectionUtils.loadClass(config.getFullBootstrapInputProvider(),
+ properties, jsc);
+ JavaRDD inputRecordsRDD =
+ inputProvider.generateInputRecordRDD("bootstrap_source", config.getBootstrapSourceBasePath(),
+ partitionFilesList);
+ // Start Full Bootstrap
+ final HoodieInstant requested = new HoodieInstant(State.REQUESTED, table.getMetaClient().getCommitActionType(),
+ HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS);
+ table.getActiveTimeline().createNewInstant(requested);
+
+ // Setup correct schema and run bulk insert.
+ return Option.of(getBulkInsertActionExecutor(inputRecordsRDD).execute());
+ }
+
+ protected CommitActionExecutor getBulkInsertActionExecutor(JavaRDD inputRecordsRDD) {
+ return new BulkInsertCommitActionExecutor(jsc, new HoodieWriteConfig.Builder().withProps(config.getProps())
+ .withSchema(bootstrapSchema).build(), table, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
+ inputRecordsRDD, extraMetadata);
+ }
+
+ private BootstrapWriteStatus handleMetadataBootstrap(String srcPartitionPath, String partitionPath,
+ HoodieFileStatus srcFileStatus, KeyGenerator keyGenerator) {
+
+ Path sourceFilePath = FileStatusUtils.toPath(srcFileStatus.getPath());
+ HoodieBootstrapHandle bootstrapHandle = new HoodieBootstrapHandle(config, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
+ table, partitionPath, FSUtils.createNewFileIdPfx(), table.getSparkTaskContextSupplier());
+ Schema avroSchema = null;
+ try {
+ ParquetMetadata readFooter = ParquetFileReader.readFooter(table.getHadoopConf(), sourceFilePath,
+ ParquetMetadataConverter.NO_FILTER);
+ MessageType parquetSchema = readFooter.getFileMetaData().getSchema();
+ avroSchema = new AvroSchemaConverter().convert(parquetSchema);
+ Schema recordKeySchema = HoodieAvroUtils.generateProjectionSchema(avroSchema,
+ keyGenerator.getRecordKeyFieldNames());
+ LOG.info("Schema to be used for reading record Keys :" + recordKeySchema);
+ AvroReadSupport.setAvroReadSchema(table.getHadoopConf(), recordKeySchema);
+ AvroReadSupport.setRequestedProjection(table.getHadoopConf(), recordKeySchema);
+
+ BoundedInMemoryExecutor wrapper = null;
+ try (ParquetReader reader =
+ AvroParquetReader.builder(sourceFilePath).withConf(table.getHadoopConf()).build()) {
+ wrapper = new SparkBoundedInMemoryExecutor(config,
+ new ParquetReaderIterator(reader), new BootstrapRecordConsumer(bootstrapHandle), inp -> {
+ String recKey = keyGenerator.getKey(inp).getRecordKey();
+ GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA);
+ gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey);
+ BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
+ HoodieRecord rec = new HoodieRecord(new HoodieKey(recKey, partitionPath), payload);
+ return rec;
+ });
+ wrapper.execute();
+ } catch (Exception e) {
+ throw new HoodieException(e);
+ } finally {
+ bootstrapHandle.close();
+ if (null != wrapper) {
+ wrapper.shutdownNow();
+ }
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException(e.getMessage(), e);
+ }
+ BootstrapWriteStatus writeStatus = (BootstrapWriteStatus)bootstrapHandle.getWriteStatus();
+ BootstrapFileMapping bootstrapFileMapping = new BootstrapFileMapping(
+ config.getBootstrapSourceBasePath(), srcPartitionPath, partitionPath,
+ srcFileStatus, writeStatus.getFileId());
+ writeStatus.setBootstrapSourceFileMapping(bootstrapFileMapping);
+ return writeStatus;
+ }
+
+ /**
+ * Lists source partitions, determines the bootstrap schema, and selects a bootstrap mode for each partition.
+ * @return map from bootstrap mode to the partitions (with their files) selected for that mode
+ * @throws IOException
+ */
+ private Map>>> listAndProcessSourcePartitions() throws IOException {
+ List>> folders =
+ BootstrapUtils.getAllLeafFoldersWithFiles(bootstrapSourceFileSystem,
+ config.getBootstrapSourceBasePath(), path -> {
+ // TODO: Needs to be abstracted out when supporting different formats
+ // TODO: Remove hoodieFilter
+ return path.getName().endsWith(HoodieFileFormat.PARQUET.getFileExtension());
+ });
+
+ LOG.info("Fetching Bootstrap Schema !!");
+ BootstrapSchemaProvider sourceSchemaProvider = new BootstrapSchemaProvider(config);
+ bootstrapSchema = sourceSchemaProvider.getBootstrapSchema(jsc, folders).toString();
+ LOG.info("Bootstrap Schema :" + bootstrapSchema);
+
+ BootstrapModeSelector selector =
+ (BootstrapModeSelector) ReflectionUtils.loadClass(config.getBootstrapModeSelectorClass(), config);
+
+ Map> result = selector.select(folders);
+ Map> partitionToFiles = folders.stream().collect(
+ Collectors.toMap(Pair::getKey, Pair::getValue));
+
+ // Ensure all partitions are accounted for
+ ValidationUtils.checkArgument(partitionToFiles.keySet().equals(
+ result.values().stream().flatMap(Collection::stream).collect(Collectors.toSet())));
+
+ return result.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue().stream()
+ .map(p -> Pair.of(p, partitionToFiles.get(p))).collect(Collectors.toList())))
+ .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+ }
+
+ private JavaRDD runMetadataBootstrap(List>> partitions) {
+ if (null == partitions || partitions.isEmpty()) {
+ return jsc.emptyRDD();
+ }
+
+ TypedProperties properties = new TypedProperties();
+ properties.putAll(config.getProps());
+ KeyGenerator keyGenerator = (KeyGenerator) ReflectionUtils.loadClass(config.getBootstrapKeyGeneratorClass(),
+ properties);
+ BootstrapPartitionPathTranslator translator = (BootstrapPartitionPathTranslator) ReflectionUtils.loadClass(
+ config.getBootstrapPartitionPathTranslatorClass(), properties);
+
+ List>> bootstrapPaths = partitions.stream()
+ .flatMap(p -> {
+ String translatedPartitionPath = translator.getBootstrapTranslatedPath(p.getKey());
+ return p.getValue().stream().map(f -> Pair.of(p.getKey(), Pair.of(translatedPartitionPath, f)));
+ })
+ .collect(Collectors.toList());
+
+ return jsc.parallelize(bootstrapPaths, config.getBootstrapParallelism())
+ .map(partitionFsPair -> handleMetadataBootstrap(partitionFsPair.getLeft(), partitionFsPair.getRight().getLeft(),
+ partitionFsPair.getRight().getRight(), keyGenerator));
+ }
+
+ //TODO: Once we decouple commit protocol, we should change the class hierarchy to avoid doing this.
+ @Override
+ protected Partitioner getUpsertPartitioner(WorkloadProfile profile) {
+ throw new UnsupportedOperationException("Should not be called in bootstrap code path");
+ }
+
+ @Override
+ protected Partitioner getInsertPartitioner(WorkloadProfile profile) {
+ throw new UnsupportedOperationException("Should not be called in bootstrap code path");
+ }
+
+ @Override
+ protected Iterator> handleInsert(String idPfx, Iterator> recordItr) {
+ throw new UnsupportedOperationException("Should not be called in bootstrap code path");
+ }
+
+ @Override
+ protected Iterator> handleUpdate(String partitionPath, String fileId, Iterator> recordItr) {
+ throw new UnsupportedOperationException("Should not be called in bootstrap code path");
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapDeltaCommitActionExecutor.java
new file mode 100644
index 000000000..08760cc3d
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapDeltaCommitActionExecutor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.bootstrap;
+
+import java.util.Map;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.commit.CommitActionExecutor;
+import org.apache.hudi.table.action.deltacommit.BulkInsertDeltaCommitActionExecutor;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class BootstrapDeltaCommitActionExecutor>
+ extends BootstrapCommitActionExecutor {
+
+ public BootstrapDeltaCommitActionExecutor(JavaSparkContext jsc,
+ HoodieWriteConfig config, HoodieTable> table,
+ Option> extraMetadata) {
+ super(jsc, config, table, extraMetadata);
+ }
+
+ protected CommitActionExecutor getBulkInsertActionExecutor(JavaRDD inputRecordsRDD) {
+ return new BulkInsertDeltaCommitActionExecutor(jsc, new HoodieWriteConfig.Builder().withProps(config.getProps())
+ .withSchema(bootstrapSchema).build(), table, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
+ inputRecordsRDD, extraMetadata);
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java b/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java
new file mode 100644
index 000000000..7ee240d96
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.bootstrap;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.HoodieBootstrapHandle;
+
+import java.io.IOException;
+
+/**
+ * Consumer that dequeues records from the queue and writes them via the Bootstrap Handle.
+ */
+public class BootstrapRecordConsumer extends BoundedInMemoryQueueConsumer {
+
+ private final HoodieBootstrapHandle bootstrapHandle;
+
+ public BootstrapRecordConsumer(HoodieBootstrapHandle bootstrapHandle) {
+ this.bootstrapHandle = bootstrapHandle;
+ }
+
+ @Override
+ protected void consumeOneRecord(HoodieRecord record) {
+ try {
+ bootstrapHandle.write(record, record.getData().getInsertValue(bootstrapHandle.getWriterSchemaWithMetafields()));
+ } catch (IOException e) {
+ throw new HoodieIOException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ protected void finish() {}
+
+ @Override
+ protected Void getResult() {
+ return null;
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java b/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
new file mode 100644
index 000000000..67d13651a
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.bootstrap;
+
+import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.common.bootstrap.FileStatusUtils;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.stream.Collectors;
+
+public class BootstrapUtils {
+
+ /**
+ * Returns leaf folders with files under a path.
+ * @param fs File System
+ * @param basePathStr Base Path to look for leaf folders
+ * @param filePathFilter Filters to skip directories/paths
+ * @return list of partition paths with files under them.
+ * @throws IOException
+ */
+ public static List>> getAllLeafFoldersWithFiles(FileSystem fs, String basePathStr,
+ PathFilter filePathFilter) throws IOException {
+ final Path basePath = new Path(basePathStr);
+ final Map> levelToPartitions = new HashMap<>();
+ final Map> partitionToFiles = new HashMap<>();
+ FSUtils.processFiles(fs, basePathStr, (status) -> {
+ if (status.isFile() && filePathFilter.accept(status.getPath())) {
+ String relativePath = FSUtils.getRelativePartitionPath(basePath, status.getPath().getParent());
+ List statusList = partitionToFiles.get(relativePath);
+ if (null == statusList) {
+ Integer level = (int) relativePath.chars().filter(ch -> ch == '/').count();
+ List dirs = levelToPartitions.get(level);
+ if (null == dirs) {
+ dirs = new ArrayList<>();
+ levelToPartitions.put(level, dirs);
+ }
+ dirs.add(relativePath);
+ statusList = new ArrayList<>();
+ partitionToFiles.put(relativePath, statusList);
+ }
+ statusList.add(FileStatusUtils.fromFileStatus(status));
+ }
+ return true;
+ }, true);
+ OptionalInt maxLevelOpt = levelToPartitions.keySet().stream().mapToInt(x -> x).max();
+ int maxLevel = maxLevelOpt.orElse(-1);
+ return maxLevel >= 0 ? levelToPartitions.get(maxLevel).stream()
+ .map(d -> Pair.of(d, partitionToFiles.get(d))).collect(Collectors.toList()) : new ArrayList<>();
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/HoodieBootstrapWriteMetadata.java b/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/HoodieBootstrapWriteMetadata.java
new file mode 100644
index 000000000..4e6167ecb
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/HoodieBootstrapWriteMetadata.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.bootstrap;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+/**
+ * Write Result for a bootstrap operation.
+ */
+public class HoodieBootstrapWriteMetadata {
+
+ private final Option metadataBootstrapResult;
+ private final Option fullBootstrapResult;
+
+ public HoodieBootstrapWriteMetadata(Option metadataBootstrapResult,
+ Option fullBootstrapResult) {
+ this.metadataBootstrapResult = metadataBootstrapResult;
+ this.fullBootstrapResult = fullBootstrapResult;
+ }
+
+ public Option getMetadataBootstrapResult() {
+ return metadataBootstrapResult;
+ }
+
+ public Option getFullBootstrapResult() {
+ return fullBootstrapResult;
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 40185c69d..a8f4341f7 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -18,6 +18,13 @@
package org.apache.hudi.table.action.commit;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.SparkConfigUtils;
@@ -38,43 +45,31 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.BaseActionExecutor;
-
import org.apache.hudi.table.action.HoodieWriteMetadata;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
import scala.Tuple2;
-public abstract class BaseCommitActionExecutor>
- extends BaseActionExecutor {
+public abstract class BaseCommitActionExecutor, R>
+ extends BaseActionExecutor {
private static final Logger LOG = LogManager.getLogger(BaseCommitActionExecutor.class);
+ protected final Option> extraMetadata;
private final WriteOperationType operationType;
protected final SparkTaskContextSupplier sparkTaskContextSupplier = new SparkTaskContextSupplier();
- public BaseCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config,
- HoodieTable table, String instantTime, WriteOperationType operationType) {
- this(jsc, config, table, instantTime, operationType, null);
- }
-
public BaseCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config,
HoodieTable table, String instantTime, WriteOperationType operationType,
-      JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+      Option<Map<String, String>> extraMetadata) {
super(jsc, config, table, instantTime);
this.operationType = operationType;
+ this.extraMetadata = extraMetadata;
}
  public HoodieWriteMetadata execute(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
@@ -173,13 +168,17 @@ public abstract class BaseCommitActionExecutor>
protected void commitOnAutoCommit(HoodieWriteMetadata result) {
if (config.shouldAutoCommit()) {
LOG.info("Auto commit enabled: Committing " + instantTime);
- commit(Option.empty(), result);
+ commit(extraMetadata, result);
} else {
LOG.info("Auto commit disabled for " + instantTime);
}
}
-  private void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result) {
+  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result) {
+    commit(extraMetadata, result, result.getWriteStatuses().map(WriteStatus::getStat).collect());
+  }
+
+  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result, List<HoodieWriteStat> stats) {
String actionType = table.getMetaClient().getCommitActionType();
LOG.info("Committing " + instantTime + ", action Type " + actionType);
// Create a Hoodie table which encapsulated the commits and files visible
@@ -189,7 +188,6 @@ public abstract class BaseCommitActionExecutor>
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
result.setCommitted(true);
-    List<HoodieWriteStat> stats = result.getWriteStatuses().map(WriteStatus::getStat).collect();
stats.forEach(stat -> metadata.addWriteStat(stat.getPartitionPath(), stat));
result.setWriteStats(stats);
@@ -200,7 +198,7 @@ public abstract class BaseCommitActionExecutor>
if (extraMetadata.isPresent()) {
extraMetadata.get().forEach(metadata::addMetadata);
}
- metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
+ metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, getSchemaToStoreInCommit());
metadata.setOperationType(operationType);
try {
@@ -229,6 +227,13 @@ public abstract class BaseCommitActionExecutor>
}
}
+ /**
+ * By default, return the writer schema in Write Config for storing in commit.
+ */
+ protected String getSchemaToStoreInCommit() {
+ return config.getSchema();
+ }
+
protected boolean isWorkloadProfileNeeded() {
return true;
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java
index ee93f06a8..162ae2984 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table.action.commit;
+import java.util.Map;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
@@ -31,30 +32,34 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-public class BulkInsertCommitActionExecutor>
- extends CommitActionExecutor {
+public class BulkInsertCommitActionExecutor> extends CommitActionExecutor {
  private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
-  private final Option<UserDefinedBulkInsertPartitioner> userDefinedBulkInsertPartitioner;
+  private final Option<UserDefinedBulkInsertPartitioner<T>> bulkInsertPartitioner;
- public BulkInsertCommitActionExecutor(JavaSparkContext jsc,
- HoodieWriteConfig config, HoodieTable table,
- String instantTime, JavaRDD> inputRecordsRDD,
- Option userDefinedBulkInsertPartitioner) {
- super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT);
+ public BulkInsertCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table,
+ String instantTime, JavaRDD> inputRecordsRDD,
+ Option> bulkInsertPartitioner) {
+ this(jsc, config, table, instantTime, inputRecordsRDD, bulkInsertPartitioner, Option.empty());
+ }
+
+ public BulkInsertCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table,
+ String instantTime, JavaRDD> inputRecordsRDD,
+ Option> bulkInsertPartitioner,
+ Option> extraMetadata) {
+ super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT, extraMetadata);
this.inputRecordsRDD = inputRecordsRDD;
- this.userDefinedBulkInsertPartitioner = userDefinedBulkInsertPartitioner;
+ this.bulkInsertPartitioner = bulkInsertPartitioner;
}
@Override
public HoodieWriteMetadata execute() {
try {
return BulkInsertHelper.bulkInsert(inputRecordsRDD, instantTime, (HoodieTable) table, config,
- this, true, userDefinedBulkInsertPartitioner);
+ this, true, bulkInsertPartitioner);
+ } catch (HoodieInsertException ie) {
+ throw ie;
} catch (Throwable e) {
- if (e instanceof HoodieInsertException) {
- throw e;
- }
throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
}
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java
index eb38f076d..4683c8218 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java
@@ -39,11 +39,10 @@ import java.util.stream.IntStream;
public class BulkInsertHelper<T extends HoodieRecordPayload<T>> {
- public static > HoodieWriteMetadata bulkInsert(
- JavaRDD> inputRecords, String instantTime,
- HoodieTable table, HoodieWriteConfig config,
- CommitActionExecutor executor, boolean performDedupe,
- Option userDefinedBulkInsertPartitioner) {
+ public static > HoodieWriteMetadata bulkInsert(JavaRDD> inputRecords, String instantTime,
+ HoodieTable table, HoodieWriteConfig config,
+ CommitActionExecutor executor, boolean performDedupe,
+ Option> userDefinedBulkInsertPartitioner) {
HoodieWriteMetadata result = new HoodieWriteMetadata();
// De-dupe/merge if needed
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java
index 0f7848199..f63d06e95 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java
@@ -35,12 +35,12 @@ public class BulkInsertPreppedCommitActionExecutor {
private final JavaRDD> preppedInputRecordRdd;
-  private final Option<UserDefinedBulkInsertPartitioner> userDefinedBulkInsertPartitioner;
+  private final Option<UserDefinedBulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner;
public BulkInsertPreppedCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD> preppedInputRecordRdd,
- Option userDefinedBulkInsertPartitioner) {
+ Option> userDefinedBulkInsertPartitioner) {
super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT);
this.preppedInputRecordRdd = preppedInputRecordRdd;
this.userDefinedBulkInsertPartitioner = userDefinedBulkInsertPartitioner;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
index c07d4c977..fc721ec36 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
@@ -23,21 +23,15 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
-import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.LazyInsertIterable;
-import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieMergeHandle;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
@@ -50,14 +44,19 @@ import java.util.List;
import java.util.Map;
 public abstract class CommitActionExecutor<T extends HoodieRecordPayload<T>>
-    extends BaseCommitActionExecutor<T> {
+    extends BaseCommitActionExecutor<T, HoodieWriteMetadata> {
private static final Logger LOG = LogManager.getLogger(CommitActionExecutor.class);
- public CommitActionExecutor(JavaSparkContext jsc,
- HoodieWriteConfig config, HoodieTable table,
- String instantTime, WriteOperationType operationType) {
- super(jsc, config, table, instantTime, operationType);
+ public CommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table,
+ String instantTime, WriteOperationType operationType) {
+ this(jsc, config, table, instantTime, operationType, Option.empty());
+ }
+
+ public CommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table,
+ String instantTime, WriteOperationType operationType,
+ Option> extraMetadata) {
+ super(jsc, config, table, instantTime, operationType, extraMetadata);
}
@Override
@@ -87,22 +86,7 @@ public abstract class CommitActionExecutor>
throw new HoodieUpsertException(
"Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
} else {
- BoundedInMemoryExecutor wrapper = null;
- try {
- HoodieFileReader storageReader =
- HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), upsertHandle.getOldFilePath());
- wrapper =
- new SparkBoundedInMemoryExecutor(config, storageReader.getRecordIterator(upsertHandle.getWriterSchema()),
- new UpdateHandler(upsertHandle), x -> x);
- wrapper.execute();
- } catch (Exception e) {
- throw new HoodieException(e);
- } finally {
- upsertHandle.close();
- if (null != wrapper) {
- wrapper.shutdownNow();
- }
- }
+ MergeHelper.runMerge(table, upsertHandle);
}
// TODO(vc): This needs to be revisited
@@ -147,29 +131,4 @@ public abstract class CommitActionExecutor>
public Partitioner getInsertPartitioner(WorkloadProfile profile) {
return getUpsertPartitioner(profile);
}
-
- /**
- * Consumer that dequeues records from queue and sends to Merge Handle.
- */
-  private static class UpdateHandler extends BoundedInMemoryQueueConsumer<GenericRecord, Void> {
-
- private final HoodieMergeHandle upsertHandle;
-
- private UpdateHandler(HoodieMergeHandle upsertHandle) {
- this.upsertHandle = upsertHandle;
- }
-
- @Override
- protected void consumeOneRecord(GenericRecord record) {
- upsertHandle.write(record);
- }
-
- @Override
- protected void finish() {}
-
- @Override
- protected Void getResult() {
- return null;
- }
- }
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/MergeHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/MergeHelper.java
new file mode 100644
index 000000000..4daa5c61f
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/MergeHelper.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import java.io.ByteArrayOutputStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.utils.MergingIterator;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
+import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Helper to read records from previous version of parquet and run Merge.
+ */
+public class MergeHelper {
+
+ /**
+ * Read records from previous version of base file and merge.
+ * @param table Hoodie Table
+ * @param upsertHandle Merge Handle
+ * @param
+ * @throws IOException in case of error
+ */
+  public static <T extends HoodieRecordPayload<T>> void runMerge(HoodieTable<T> table, HoodieMergeHandle<T> upsertHandle) throws IOException {
+ final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
+ Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
+ HoodieBaseFile baseFile = upsertHandle.baseFileForMerge();
+
+    final GenericDatumWriter<GenericRecord> gWriter;
+    final GenericDatumReader<GenericRecord> gReader;
+    Schema readSchema;
+ if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
+ readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), upsertHandle.getOldFilePath()).getSchema();
+ gWriter = new GenericDatumWriter<>(readSchema);
+ gReader = new GenericDatumReader<>(readSchema, upsertHandle.getWriterSchemaWithMetafields());
+ } else {
+ gReader = null;
+ gWriter = null;
+ readSchema = upsertHandle.getWriterSchemaWithMetafields();
+ }
+
+    BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
+    HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, upsertHandle.getOldFilePath());
+    try {
+      final Iterator<GenericRecord> readerIterator;
+ if (baseFile.getBootstrapBaseFile().isPresent()) {
+ readerIterator = getMergingIterator(table, upsertHandle, baseFile, reader, readSchema, externalSchemaTransformation);
+ } else {
+ readerIterator = reader.getRecordIterator(readSchema);
+ }
+
+      ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
+      ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
+ wrapper = new SparkBoundedInMemoryExecutor(table.getConfig(), readerIterator,
+ new UpdateHandler(upsertHandle), record -> {
+ if (!externalSchemaTransformation) {
+ return record;
+ }
+ return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record);
+ });
+ wrapper.execute();
+ } catch (Exception e) {
+ throw new HoodieException(e);
+ } finally {
+ if (reader != null) {
+ reader.close();
+ }
+ upsertHandle.close();
+ if (null != wrapper) {
+ wrapper.shutdownNow();
+ }
+ }
+ }
+
+  private static GenericRecord transformRecordBasedOnNewSchema(GenericDatumReader<GenericRecord> gReader, GenericDatumWriter<GenericRecord> gWriter,
+                                                               ThreadLocal<BinaryEncoder> encoderCache, ThreadLocal<BinaryDecoder> decoderCache,
+                                                               GenericRecord gRec) {
+ ByteArrayOutputStream inStream = null;
+ try {
+ inStream = new ByteArrayOutputStream();
+ BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(inStream, encoderCache.get());
+ encoderCache.set(encoder);
+ gWriter.write(gRec, encoder);
+ encoder.flush();
+
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inStream.toByteArray(), decoderCache.get());
+ decoderCache.set(decoder);
+ GenericRecord transformedRec = gReader.read(null, decoder);
+ return transformedRec;
+ } catch (IOException e) {
+ throw new HoodieException(e);
+ } finally {
+ try {
+ inStream.close();
+ } catch (IOException ioe) {
+ throw new HoodieException(ioe.getMessage(), ioe);
+ }
+ }
+ }
+
+ /**
+ * Create Parquet record iterator that provides a stitched view of record read from skeleton and bootstrap file.
+ * Skeleton file is a representation of the bootstrap file inside the table, with just the bare bone fields needed
+ * for indexing, writing and other functionality.
+ *
+ */
+  private static <T extends HoodieRecordPayload<T>> Iterator<GenericRecord> getMergingIterator(HoodieTable<T> table, HoodieMergeHandle<T> mergeHandle,
+                                                                                               HoodieBaseFile baseFile, HoodieFileReader<GenericRecord> reader,
+                                                                                               Schema readSchema, boolean externalSchemaTransformation) throws IOException {
+ Path externalFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
+ Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
+ HoodieFileReader bootstrapReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, externalFilePath);
+ Schema bootstrapReadSchema;
+ if (externalSchemaTransformation) {
+ bootstrapReadSchema = bootstrapReader.getSchema();
+ } else {
+ bootstrapReadSchema = mergeHandle.getWriterSchema();
+ }
+
+ return new MergingIterator<>(reader.getRecordIterator(readSchema), bootstrapReader.getRecordIterator(bootstrapReadSchema),
+ (inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetafields()));
+ }
+
+ /**
+ * Consumer that dequeues records from queue and sends to Merge Handle.
+ */
+  private static class UpdateHandler extends BoundedInMemoryQueueConsumer<GenericRecord, Void> {
+
+ private final HoodieMergeHandle upsertHandle;
+
+ private UpdateHandler(HoodieMergeHandle upsertHandle) {
+ this.upsertHandle = upsertHandle;
+ }
+
+ @Override
+ protected void consumeOneRecord(GenericRecord record) {
+ upsertHandle.write(record);
+ }
+
+ @Override
+ protected void finish() {}
+
+ @Override
+ protected Void getResult() {
+ return null;
+ }
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index 6d88e5cc1..0d7f6bee8 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -56,22 +56,23 @@ public class ScheduleCompactionActionExecutor extends BaseActionExecutor<HoodieCompactionPlan> {
-    Option<HoodieInstant> lastCompaction = table.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
-    String deltaCommitsSinceTs = "0";
+    Option<HoodieInstant> lastCompaction = table.getActiveTimeline().getCommitTimeline()
+        .filterCompletedInstants().lastInstant();
+    String lastCompactionTs = "0";
if (lastCompaction.isPresent()) {
- deltaCommitsSinceTs = lastCompaction.get().getTimestamp();
+ lastCompactionTs = lastCompaction.get().getTimestamp();
}
int deltaCommitsSinceLastCompaction = table.getActiveTimeline().getDeltaCommitTimeline()
- .findInstantsAfter(deltaCommitsSinceTs, Integer.MAX_VALUE).countInstants();
+ .findInstantsAfter(lastCompactionTs, Integer.MAX_VALUE).countInstants();
if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) {
LOG.info("Not scheduling compaction as only " + deltaCommitsSinceLastCompaction
- + " delta commits was found since last compaction " + deltaCommitsSinceTs + ". Waiting for "
+ + " delta commits was found since last compaction " + lastCompactionTs + ". Waiting for "
+ config.getInlineCompactDeltaCommitMax());
return new HoodieCompactionPlan();
}
- LOG.info("Compacting merge on read table " + config.getBasePath());
+ LOG.info("Generating compaction plan for merge on read table " + config.getBasePath());
HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor();
try {
return compactor.generateCompactionPlan(jsc, table, config, instantTime,
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java
index 61eb6128b..01ff1faa4 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table.action.deltacommit;
+import java.util.Map;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
@@ -36,13 +37,19 @@ public class BulkInsertDeltaCommitActionExecutor {
private final JavaRDD> inputRecordsRDD;
- private final Option bulkInsertPartitioner;
+ private final Option> bulkInsertPartitioner;
- public BulkInsertDeltaCommitActionExecutor(JavaSparkContext jsc,
- HoodieWriteConfig config, HoodieTable table,
- String instantTime, JavaRDD> inputRecordsRDD,
- Option bulkInsertPartitioner) {
- super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT);
+ public BulkInsertDeltaCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table,
+ String instantTime, JavaRDD> inputRecordsRDD,
+ Option> bulkInsertPartitioner) {
+ this(jsc, config, table, instantTime, inputRecordsRDD, bulkInsertPartitioner, Option.empty());
+ }
+
+ public BulkInsertDeltaCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table,
+ String instantTime, JavaRDD> inputRecordsRDD,
+ Option> bulkInsertPartitioner,
+ Option