hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST);
+ timelineServer = Option.of(new EmbeddedTimelineService(context, hostAddr.orElse(null),
config.getClientSpecifiedViewStorageConfig()));
try {
timelineServer.get().startServer();
@@ -122,6 +125,8 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
}
protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) {
- return ClientUtils.createMetaClient(hadoopConf, config, loadActiveTimelineOnLoad);
+ return new HoodieTableMetaClient(hadoopConf, config.getBasePath(), loadActiveTimelineOnLoad,
+ config.getConsistencyGuardConfig(),
+ Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())));
}
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
similarity index 61%
rename from hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index 7970623ee..0f35e270e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -18,22 +18,27 @@
package org.apache.hudi.client;
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.callback.HoodieWriteCommitCallback;
+import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
+import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -45,109 +50,178 @@ import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metrics.HoodieMetrics;
+import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.apache.hudi.table.MarkerFiles;
-import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.table.action.savepoint.SavepointHelpers;
-
-import com.codahale.metrics.Timer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
- * Hoodie Write Client helps you build tables on HDFS [insert()] and then perform efficient mutations on an HDFS
- * table [upsert()]
- *
- * Note that, at any given time, there can only be one Spark job performing these operations on a Hoodie table.
+ * Abstract Write Client providing functionality for performing commit, index updates and rollback
+ * Reused for regular write operations like upsert/insert/bulk-insert.. as well as bootstrap
+ *
+ * @param <T> Sub type of HoodieRecordPayload
+ * @param <I> Type of inputs
+ * @param <K> Type of keys
+ * @param <O> Type of outputs
*/
-public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieWriteClient<T> {
+public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I, K, O> extends AbstractHoodieClient {
+ protected static final String LOOKUP_STR = "lookup";
private static final long serialVersionUID = 1L;
- private static final Logger LOG = LogManager.getLogger(HoodieWriteClient.class);
- private static final String LOOKUP_STR = "lookup";
- private final boolean rollbackPending;
- private final transient HoodieMetrics metrics;
- private transient Timer.Context compactionTimer;
- private transient AsyncCleanerService asyncCleanerService;
+ private static final Logger LOG = LogManager.getLogger(AbstractHoodieWriteClient.class);
+
+ protected final transient HoodieMetrics metrics;
+ private final transient HoodieIndex<T, I, K, O> index;
+
+ protected transient Timer.Context writeTimer = null;
+ protected transient Timer.Context compactionTimer;
+
+ private transient WriteOperationType operationType;
+ private transient HoodieWriteCommitCallback commitCallback;
+ protected final boolean rollbackPending;
+ protected transient AsyncCleanerService asyncCleanerService;
/**
* Create a write client, without cleaning up failed/inflight commits.
*
- * @param jsc Java Spark Context
+ * @param context HoodieEngineContext
* @param clientConfig instance of HoodieWriteConfig
*/
- public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) {
- this(jsc, clientConfig, false);
+ public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+ this(context, clientConfig, false);
}
/**
* Create a write client, with new hudi index.
*
- * @param jsc Java Spark Context
+ * @param context HoodieEngineContext
* @param writeConfig instance of HoodieWriteConfig
* @param rollbackPending whether need to cleanup pending commits
*/
- public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, boolean rollbackPending) {
- this(jsc, writeConfig, rollbackPending, HoodieIndex.createIndex(writeConfig));
- }
-
- public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, boolean rollbackPending, HoodieIndex index) {
- this(jsc, writeConfig, rollbackPending, index, Option.empty());
+ public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) {
+ this(context, writeConfig, rollbackPending, Option.empty());
}
/**
- * Create a write client, allows to specify all parameters.
+ * Create a write client, allows to specify all parameters.
*
- * @param jsc Java Spark Context
+ * @param context HoodieEngineContext
* @param writeConfig instance of HoodieWriteConfig
* @param rollbackPending whether need to cleanup pending commits
* @param timelineService Timeline Service that runs as part of write client.
*/
- public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, boolean rollbackPending,
- HoodieIndex index, Option<EmbeddedTimelineService> timelineService) {
- super(jsc, index, writeConfig, timelineService);
+ public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending,
+ Option<EmbeddedTimelineService> timelineService) {
+ super(context, writeConfig, timelineService);
this.metrics = new HoodieMetrics(config, config.getTableName());
this.rollbackPending = rollbackPending;
+ this.index = createIndex(writeConfig);
+ }
+
+ protected abstract HoodieIndex<T, I, K, O> createIndex(HoodieWriteConfig writeConfig);
+
+ public void setOperationType(WriteOperationType operationType) {
+ this.operationType = operationType;
+ }
+
+ public WriteOperationType getOperationType() {
+ return this.operationType;
}
/**
- * Register hudi classes for Kryo serialization.
- *
- * @param conf instance of SparkConf
- * @return SparkConf
+ * Commit changes performed at the given instantTime marker.
*/
- public static SparkConf registerClasses(SparkConf conf) {
- conf.registerKryoClasses(new Class[]{HoodieWriteConfig.class, HoodieRecord.class, HoodieKey.class});
- return conf;
+ public boolean commit(String instantTime, O writeStatuses) {
+ return commit(instantTime, writeStatuses, Option.empty());
+ }
+
+ /**
+ *
+ * Commit changes performed at the given instantTime marker.
+ */
+ public boolean commit(String instantTime, O writeStatuses, Option