diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/HoodieWriteCommitCallback.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/HoodieWriteCommitCallback.java index 2f5a4eff4..6f287123c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/HoodieWriteCommitCallback.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/HoodieWriteCommitCallback.java @@ -17,11 +17,15 @@ package org.apache.hudi.callback; +import org.apache.hudi.ApiMaturityLevel; +import org.apache.hudi.PublicAPIClass; +import org.apache.hudi.PublicAPIMethod; import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage; /** * A callback interface help to call back when a write commit completes successfully. */ +@PublicAPIClass(maturity = ApiMaturityLevel.STABLE) public interface HoodieWriteCommitCallback { /** @@ -30,6 +34,7 @@ public interface HoodieWriteCommitCallback { * * @param callbackMessage Callback msg, which will be sent to external system. */ + @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE) void call(HoodieWriteCommitCallbackMessage callbackMessage); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/client/http/HoodieWriteCommitHttpCallbackClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/client/http/HoodieWriteCommitHttpCallbackClient.java index e22d7dcb6..22457612d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/client/http/HoodieWriteCommitHttpCallbackClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/client/http/HoodieWriteCommitHttpCallbackClient.java @@ -71,7 +71,7 @@ public class HoodieWriteCommitHttpCallbackClient implements Closeable { if (statusCode >= 300) { LOG.warn(String.format("Failed to send callback message. Response was %s", response)); } else { - LOG.info(String.format("Sent Callback data %s to %s successfully !", callbackMsg, url)); + LOG.info(String.format("Sent Callback data to %s successfully !", url)); } } catch (IOException e) { LOG.warn("Failed to send callback.", e); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/common/HoodieWriteCommitCallbackMessage.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/common/HoodieWriteCommitCallbackMessage.java index 0233feeae..8210693a7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/common/HoodieWriteCommitCallbackMessage.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/common/HoodieWriteCommitCallbackMessage.java @@ -17,11 +17,17 @@ package org.apache.hudi.callback.common; +import org.apache.hudi.ApiMaturityLevel; +import org.apache.hudi.PublicAPIClass; +import org.apache.hudi.common.model.HoodieWriteStat; + import java.io.Serializable; +import java.util.List; /** * Base callback message, which contains commitTime and tableName only for now. */ +@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING) public class HoodieWriteCommitCallbackMessage implements Serializable { private static final long serialVersionUID = -3033643980627719561L; @@ -29,48 +35,43 @@ public class HoodieWriteCommitCallbackMessage implements Serializable { /** * CommitTime for one batch write, this is required. */ - private String commitTime; + private final String commitTime; /** * Table name this batch commit to. */ - private String tableName; + private final String tableName; /** * BathPath the table located. */ - private String basePath; + private final String basePath; - public HoodieWriteCommitCallbackMessage() { - } + /** + * Statistics about Hoodie write operation. + */ + private final List hoodieWriteStat; - public HoodieWriteCommitCallbackMessage(String commitTime, String tableName, String basePath) { + public HoodieWriteCommitCallbackMessage(String commitTime, String tableName, String basePath, List hoodieWriteStat) { this.commitTime = commitTime; this.tableName = tableName; this.basePath = basePath; + this.hoodieWriteStat = hoodieWriteStat; } public String getCommitTime() { return commitTime; } - public void setCommitTime(String commitTime) { - this.commitTime = commitTime; - } - public String getTableName() { return tableName; } - public void setTableName(String tableName) { - this.tableName = tableName; - } - public String getBasePath() { return basePath; } - public void setBasePath(String basePath) { - this.basePath = basePath; + public List getHoodieWriteStat() { + return hoodieWriteStat; } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/impl/HoodieWriteCommitHttpCallback.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/impl/HoodieWriteCommitHttpCallback.java index bb60879ef..1f30c7cd5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/impl/HoodieWriteCommitHttpCallback.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/impl/HoodieWriteCommitHttpCallback.java @@ -43,8 +43,6 @@ public class HoodieWriteCommitHttpCallback implements HoodieWriteCommitCallback public void call(HoodieWriteCommitCallbackMessage callbackMessage) { // convert to json String callbackMsg = HoodieWriteCommitCallbackUtil.convertToJsonString(callbackMessage); - LOG.info("Try to send callbackMsg, msg = " + callbackMsg); client.send(callbackMsg); } - } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index f1eb6f1e3..c2f90ca38 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -197,7 +197,7 @@ public abstract class AbstractHoodieWriteClient producer = createProducer(hoodieConfig)) { ProducerRecord record = buildProducerRecord(hoodieConfig, callbackMsg); producer.send(record); - LOG.info(String.format("Send callback message %s succeed", callbackMsg)); + LOG.info("Send callback message succeed"); } catch (Exception e) { LOG.error("Send kafka callback msg failed : ", e); }