> getPendingCompactions() {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
- hoodieTable.getMetaClient().getBasePath(), true);
+ HoodieTableMetaClient metaClient =
+ new HoodieTableMetaClient(jsc.hadoopConfiguration(), hoodieTable.getMetaClient().getBasePath(), true);
return CompactionUtils.getAllPendingCompactionPlans(metaClient).stream()
- .map(instantWorkloadPair ->
- Pair.of(instantWorkloadPair.getKey().getTimestamp(), instantWorkloadPair.getValue()))
+ .map(
+ instantWorkloadPair -> Pair.of(instantWorkloadPair.getKey().getTimestamp(), instantWorkloadPair.getValue()))
.collect(Collectors.toList());
}
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
index 4a8482194..86fc7d020 100644
--- a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
@@ -87,11 +87,10 @@ import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
/**
- * Hoodie Write Client helps you build datasets on HDFS [insert()] and then perform efficient
- * mutations on a HDFS dataset [upsert()]
+ * Hoodie Write Client helps you build datasets on HDFS [insert()] and then perform efficient mutations on a HDFS
+ * dataset [upsert()]
*
- * Note that, at any given time, there can only be one Spark job performing these operatons on a
- * Hoodie dataset.
+ * Note that, at any given time, there can only be one Spark job performing these operations on a Hoodie dataset.
*/
public class HoodieWriteClient extends AbstractHoodieClient {
@@ -117,19 +116,17 @@ public class HoodieWriteClient extends AbstractHo
* @param clientConfig
* @param rollbackInFlight
*/
- public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig,
- boolean rollbackInFlight) {
+ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackInFlight) {
this(jsc, clientConfig, rollbackInFlight, HoodieIndex.createIndex(clientConfig, jsc));
}
@VisibleForTesting
- HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig,
- boolean rollbackInFlight, HoodieIndex index) {
+ HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackInFlight, HoodieIndex index) {
this(jsc, clientConfig, rollbackInFlight, index, Option.empty());
}
- public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig,
- boolean rollbackInFlight, HoodieIndex index, Option timelineService) {
+ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackInFlight,
+ HoodieIndex index, Option timelineService) {
super(jsc, clientConfig, timelineService);
this.index = index;
this.metrics = new HoodieMetrics(config, config.getTableName());
@@ -137,26 +134,22 @@ public class HoodieWriteClient extends AbstractHo
}
public static SparkConf registerClasses(SparkConf conf) {
- conf.registerKryoClasses(
- new Class[]{HoodieWriteConfig.class, HoodieRecord.class, HoodieKey.class});
+ conf.registerKryoClasses(new Class[] {HoodieWriteConfig.class, HoodieRecord.class, HoodieKey.class});
return conf;
}
/**
- * Filter out HoodieRecords that already exists in the output folder. This is useful in
- * deduplication.
+ * Filter out HoodieRecords that already exist in the output folder. This is useful in deduplication.
*
* @param hoodieRecords Input RDD of Hoodie records.
* @return A subset of hoodieRecords RDD, with existing records filtered out.
*/
public JavaRDD> filterExists(JavaRDD> hoodieRecords) {
// Create a Hoodie table which encapsulated the commits and files visible
- HoodieTable table = HoodieTable.getHoodieTable(
- createMetaClient(true), config, jsc);
+ HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
indexTimer = metrics.getIndexCtx();
JavaRDD> recordsWithLocation = index.tagLocation(hoodieRecords, jsc, table);
- metrics.updateIndexMetrics("lookup", metrics.getDurationInMs(indexTimer == null ? 0L :
- indexTimer.stop()));
+ metrics.updateIndexMetrics("lookup", metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
indexTimer = null;
return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown());
}
@@ -168,14 +161,13 @@ public class HoodieWriteClient extends AbstractHo
HoodieTable table = getTableAndInitCtx(records);
try {
// De-dupe/merge if needed
- JavaRDD> dedupedRecords = combineOnCondition(
- config.shouldCombineBeforeUpsert(), records, config.getUpsertShuffleParallelism());
+ JavaRDD> dedupedRecords =
+ combineOnCondition(config.shouldCombineBeforeUpsert(), records, config.getUpsertShuffleParallelism());
indexTimer = metrics.getIndexCtx();
// perform index loop up to get existing location of records
JavaRDD> taggedRecords = index.tagLocation(dedupedRecords, jsc, table);
- metrics.updateIndexMetrics("lookup", metrics.getDurationInMs(indexTimer == null ? 0L :
- indexTimer.stop()));
+ metrics.updateIndexMetrics("lookup", metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
indexTimer = null;
return upsertRecordsInternal(taggedRecords, commitTime, table, true);
} catch (Throwable e) {
@@ -189,15 +181,13 @@ public class HoodieWriteClient extends AbstractHo
/**
* Upserts the given prepared records into the Hoodie table, at the supplied commitTime.
*
- * This implementation requires that the input records are already tagged, and de-duped if
- * needed.
+ * This implementation requires that the input records are already tagged, and de-duped if needed.
*
* @param preppedRecords Prepared HoodieRecords to upsert
* @param commitTime Commit Time handle
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
- public JavaRDD upsertPreppedRecords(JavaRDD> preppedRecords,
- final String commitTime) {
+ public JavaRDD upsertPreppedRecords(JavaRDD> preppedRecords, final String commitTime) {
HoodieTable table = getTableAndInitCtx(preppedRecords);
try {
return upsertRecordsInternal(preppedRecords, commitTime, table, true);
@@ -205,17 +195,15 @@ public class HoodieWriteClient extends AbstractHo
if (e instanceof HoodieUpsertException) {
throw (HoodieUpsertException) e;
}
- throw new HoodieUpsertException(
- "Failed to upsert prepared records for commit time " + commitTime, e);
+ throw new HoodieUpsertException("Failed to upsert prepared records for commit time " + commitTime, e);
}
}
/**
- * Inserts the given HoodieRecords, into the table. This API is intended to be used for normal
- * writes.
+ * Inserts the given HoodieRecords, into the table. This API is intended to be used for normal writes.
*
- * This implementation skips the index check and is able to leverage benefits such as small file
- * handling/blocking alignment, as with upsert(), by profiling the workload
+ * This implementation skips the index check and is able to leverage benefits such as small file handling/blocking
+ * alignment, as with upsert(), by profiling the workload
*
* @param records HoodieRecords to insert
* @param commitTime Commit Time handle
@@ -225,8 +213,8 @@ public class HoodieWriteClient extends AbstractHo
HoodieTable table = getTableAndInitCtx(records);
try {
// De-dupe/merge if needed
- JavaRDD> dedupedRecords = combineOnCondition(
- config.shouldCombineBeforeInsert(), records, config.getInsertShuffleParallelism());
+ JavaRDD> dedupedRecords =
+ combineOnCondition(config.shouldCombineBeforeInsert(), records, config.getInsertShuffleParallelism());
return upsertRecordsInternal(dedupedRecords, commitTime, table, false);
} catch (Throwable e) {
@@ -240,16 +228,15 @@ public class HoodieWriteClient extends AbstractHo
/**
* Inserts the given prepared records into the Hoodie table, at the supplied commitTime.
*
- * This implementation skips the index check, skips de-duping and is able to leverage benefits
- * such as small file handling/blocking alignment, as with insert(), by profiling the workload.
- * The prepared HoodieRecords should be de-duped if needed.
+ * This implementation skips the index check, skips de-duping and is able to leverage benefits such as small file
+ * handling/blocking alignment, as with insert(), by profiling the workload. The prepared HoodieRecords should be
+ * de-duped if needed.
*
* @param preppedRecords HoodieRecords to insert
* @param commitTime Commit Time handle
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
- public JavaRDD insertPreppedRecords(JavaRDD> preppedRecords,
- final String commitTime) {
+ public JavaRDD insertPreppedRecords(JavaRDD> preppedRecords, final String commitTime) {
HoodieTable table = getTableAndInitCtx(preppedRecords);
try {
return upsertRecordsInternal(preppedRecords, commitTime, table, false);
@@ -257,44 +244,38 @@ public class HoodieWriteClient extends AbstractHo
if (e instanceof HoodieInsertException) {
throw e;
}
- throw new HoodieInsertException(
- "Failed to insert prepared records for commit time " + commitTime, e);
+ throw new HoodieInsertException("Failed to insert prepared records for commit time " + commitTime, e);
}
}
/**
- * Loads the given HoodieRecords, as inserts into the table. This is suitable for doing big bulk
- * loads into a Hoodie table for the very first time (e.g: converting an existing dataset to
- * Hoodie).
+ * Loads the given HoodieRecords, as inserts into the table. This is suitable for doing big bulk loads into a Hoodie
+ * table for the very first time (e.g.: converting an existing dataset to Hoodie).
*
- * This implementation uses sortBy (which does range partitioning based on reservoir sampling) and
- * attempts to control the numbers of files with less memory compared to the {@link
- * HoodieWriteClient#insert(JavaRDD, String)}
+ * This implementation uses sortBy (which does range partitioning based on reservoir sampling) and attempts to control
+ * the numbers of files with less memory compared to the {@link HoodieWriteClient#insert(JavaRDD, String)}
*
* @param records HoodieRecords to insert
* @param commitTime Commit Time handle
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
- public JavaRDD bulkInsert(JavaRDD> records,
- final String commitTime) {
+ public JavaRDD bulkInsert(JavaRDD> records, final String commitTime) {
return bulkInsert(records, commitTime, Option.empty());
}
/**
- * Loads the given HoodieRecords, as inserts into the table. This is suitable for doing big bulk
- * loads into a Hoodie table for the very first time (e.g: converting an existing dataset to
- * Hoodie).
+ * Loads the given HoodieRecords, as inserts into the table. This is suitable for doing big bulk loads into a Hoodie
+ * table for the very first time (e.g.: converting an existing dataset to Hoodie).
*
- * This implementation uses sortBy (which does range partitioning based on reservoir sampling) and
- * attempts to control the numbers of files with less memory compared to the {@link
- * HoodieWriteClient#insert(JavaRDD, String)}. Optionally it allows users to specify their own
- * partitioner. If specified then it will be used for repartitioning records. See {@link
- * UserDefinedBulkInsertPartitioner}.
+ * This implementation uses sortBy (which does range partitioning based on reservoir sampling) and attempts to control
+ * the numbers of files with less memory compared to the {@link HoodieWriteClient#insert(JavaRDD, String)}. Optionally
+ * it allows users to specify their own partitioner. If specified then it will be used for repartitioning records. See
+ * {@link UserDefinedBulkInsertPartitioner}.
*
* @param records HoodieRecords to insert
* @param commitTime Commit Time handle
- * @param bulkInsertPartitioner If specified then it will be used to partition input records
- * before they are inserted into hoodie.
+ * @param bulkInsertPartitioner If specified then it will be used to partition input records before they are inserted
+ * into hoodie.
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD bulkInsert(JavaRDD> records, final String commitTime,
@@ -302,8 +283,8 @@ public class HoodieWriteClient extends AbstractHo
HoodieTable table = getTableAndInitCtx(records);
try {
// De-dupe/merge if needed
- JavaRDD> dedupedRecords = combineOnCondition(
- config.shouldCombineBeforeInsert(), records, config.getInsertShuffleParallelism());
+ JavaRDD> dedupedRecords =
+ combineOnCondition(config.shouldCombineBeforeInsert(), records, config.getInsertShuffleParallelism());
return bulkInsertInternal(dedupedRecords, commitTime, table, bulkInsertPartitioner);
} catch (Throwable e) {
@@ -315,24 +296,23 @@ public class HoodieWriteClient extends AbstractHo
}
/**
- * Loads the given HoodieRecords, as inserts into the table. This is suitable for doing big bulk
- * loads into a Hoodie table for the very first time (e.g: converting an existing dataset to
- * Hoodie). The input records should contain no duplicates if needed.
+ * Loads the given HoodieRecords, as inserts into the table. This is suitable for doing big bulk loads into a Hoodie
+ * table for the very first time (e.g.: converting an existing dataset to Hoodie). The input records should contain no
+ * duplicates if needed.
*
- * This implementation uses sortBy (which does range partitioning based on reservoir sampling) and
- * attempts to control the numbers of files with less memory compared to the {@link
- * HoodieWriteClient#insert(JavaRDD, String)}. Optionally it allows users to specify their own
- * partitioner. If specified then it will be used for repartitioning records. See {@link
- * UserDefinedBulkInsertPartitioner}.
+ * This implementation uses sortBy (which does range partitioning based on reservoir sampling) and attempts to control
+ * the numbers of files with less memory compared to the {@link HoodieWriteClient#insert(JavaRDD, String)}. Optionally
+ * it allows users to specify their own partitioner. If specified then it will be used for repartitioning records. See
+ * {@link UserDefinedBulkInsertPartitioner}.
*
* @param preppedRecords HoodieRecords to insert
* @param commitTime Commit Time handle
- * @param bulkInsertPartitioner If specified then it will be used to partition input records
- * before they are inserted into hoodie.
+ * @param bulkInsertPartitioner If specified then it will be used to partition input records before they are inserted
+ * into hoodie.
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
- public JavaRDD bulkInsertPreppedRecords(JavaRDD> preppedRecords,
- final String commitTime, Option bulkInsertPartitioner) {
+ public JavaRDD bulkInsertPreppedRecords(JavaRDD> preppedRecords, final String commitTime,
+ Option bulkInsertPartitioner) {
HoodieTable table = getTableAndInitCtx(preppedRecords);
try {
return bulkInsertInternal(preppedRecords, commitTime, table, bulkInsertPartitioner);
@@ -340,19 +320,16 @@ public class HoodieWriteClient extends AbstractHo
if (e instanceof HoodieInsertException) {
throw e;
}
- throw new HoodieInsertException(
- "Failed to bulk insert prepared records for commit time " + commitTime, e);
+ throw new HoodieInsertException("Failed to bulk insert prepared records for commit time " + commitTime, e);
}
}
- private JavaRDD bulkInsertInternal(JavaRDD> dedupedRecords,
- String commitTime, HoodieTable table,
- Option bulkInsertPartitioner) {
+ private JavaRDD bulkInsertInternal(JavaRDD> dedupedRecords, String commitTime,
+ HoodieTable table, Option bulkInsertPartitioner) {
final JavaRDD