1
0

[HUDI-1633] Make callback return HoodieWriteStat (#2445)

* CALLBACK add partitionPath

* callback can send hoodieWriteStat

* add ApiMaturityLevel
This commit is contained in:
liujinhui
2021-07-16 12:37:07 +08:00
committed by GitHub
parent 38cd74b563
commit 3b264e80d9
6 changed files with 25 additions and 21 deletions

View File

@@ -17,11 +17,15 @@
package org.apache.hudi.callback; package org.apache.hudi.callback;
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage; import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
/** /**
* A callback interface help to call back when a write commit completes successfully. * A callback interface help to call back when a write commit completes successfully.
*/ */
@PublicAPIClass(maturity = ApiMaturityLevel.STABLE)
public interface HoodieWriteCommitCallback { public interface HoodieWriteCommitCallback {
/** /**
@@ -30,6 +34,7 @@ public interface HoodieWriteCommitCallback {
* *
* @param callbackMessage Callback msg, which will be sent to external system. * @param callbackMessage Callback msg, which will be sent to external system.
*/ */
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
void call(HoodieWriteCommitCallbackMessage callbackMessage); void call(HoodieWriteCommitCallbackMessage callbackMessage);
} }

View File

@@ -71,7 +71,7 @@ public class HoodieWriteCommitHttpCallbackClient implements Closeable {
if (statusCode >= 300) { if (statusCode >= 300) {
LOG.warn(String.format("Failed to send callback message. Response was %s", response)); LOG.warn(String.format("Failed to send callback message. Response was %s", response));
} else { } else {
LOG.info(String.format("Sent Callback data %s to %s successfully !", callbackMsg, url)); LOG.info(String.format("Sent Callback data to %s successfully !", url));
} }
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to send callback.", e); LOG.warn("Failed to send callback.", e);

View File

@@ -17,11 +17,17 @@
package org.apache.hudi.callback.common; package org.apache.hudi.callback.common;
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.common.model.HoodieWriteStat;
import java.io.Serializable; import java.io.Serializable;
import java.util.List;
/** /**
* Base callback message, which contains commitTime and tableName only for now. * Base callback message, which contains commitTime and tableName only for now.
*/ */
@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
public class HoodieWriteCommitCallbackMessage implements Serializable { public class HoodieWriteCommitCallbackMessage implements Serializable {
private static final long serialVersionUID = -3033643980627719561L; private static final long serialVersionUID = -3033643980627719561L;
@@ -29,48 +35,43 @@ public class HoodieWriteCommitCallbackMessage implements Serializable {
/** /**
* CommitTime for one batch write, this is required. * CommitTime for one batch write, this is required.
*/ */
private String commitTime; private final String commitTime;
/** /**
* Table name this batch commit to. * Table name this batch commit to.
*/ */
private String tableName; private final String tableName;
/** /**
* BathPath the table located. * BathPath the table located.
*/ */
private String basePath; private final String basePath;
public HoodieWriteCommitCallbackMessage() { /**
} * Statistics about Hoodie write operation.
*/
private final List<HoodieWriteStat> hoodieWriteStat;
public HoodieWriteCommitCallbackMessage(String commitTime, String tableName, String basePath) { public HoodieWriteCommitCallbackMessage(String commitTime, String tableName, String basePath, List<HoodieWriteStat> hoodieWriteStat) {
this.commitTime = commitTime; this.commitTime = commitTime;
this.tableName = tableName; this.tableName = tableName;
this.basePath = basePath; this.basePath = basePath;
this.hoodieWriteStat = hoodieWriteStat;
} }
public String getCommitTime() { public String getCommitTime() {
return commitTime; return commitTime;
} }
public void setCommitTime(String commitTime) {
this.commitTime = commitTime;
}
public String getTableName() { public String getTableName() {
return tableName; return tableName;
} }
public void setTableName(String tableName) {
this.tableName = tableName;
}
public String getBasePath() { public String getBasePath() {
return basePath; return basePath;
} }
public void setBasePath(String basePath) { public List<HoodieWriteStat> getHoodieWriteStat() {
this.basePath = basePath; return hoodieWriteStat;
} }
} }

View File

@@ -43,8 +43,6 @@ public class HoodieWriteCommitHttpCallback implements HoodieWriteCommitCallback
public void call(HoodieWriteCommitCallbackMessage callbackMessage) { public void call(HoodieWriteCommitCallbackMessage callbackMessage) {
// convert to json // convert to json
String callbackMsg = HoodieWriteCommitCallbackUtil.convertToJsonString(callbackMessage); String callbackMsg = HoodieWriteCommitCallbackUtil.convertToJsonString(callbackMessage);
LOG.info("Try to send callbackMsg, msg = " + callbackMsg);
client.send(callbackMsg); client.send(callbackMsg);
} }
} }

View File

@@ -197,7 +197,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
if (null == commitCallback) { if (null == commitCallback) {
commitCallback = HoodieCommitCallbackFactory.create(config); commitCallback = HoodieCommitCallbackFactory.create(config);
} }
commitCallback.call(new HoodieWriteCommitCallbackMessage(instantTime, config.getTableName(), config.getBasePath())); commitCallback.call(new HoodieWriteCommitCallbackMessage(instantTime, config.getTableName(), config.getBasePath(), stats));
} }
return true; return true;
} }

View File

@@ -66,7 +66,7 @@ public class HoodieWriteCommitKafkaCallback implements HoodieWriteCommitCallback
try (KafkaProducer<String, String> producer = createProducer(hoodieConfig)) { try (KafkaProducer<String, String> producer = createProducer(hoodieConfig)) {
ProducerRecord<String, String> record = buildProducerRecord(hoodieConfig, callbackMsg); ProducerRecord<String, String> record = buildProducerRecord(hoodieConfig, callbackMsg);
producer.send(record); producer.send(record);
LOG.info(String.format("Send callback message %s succeed", callbackMsg)); LOG.info("Send callback message succeed");
} catch (Exception e) { } catch (Exception e) {
LOG.error("Send kafka callback msg failed : ", e); LOG.error("Send kafka callback msg failed : ", e);
} }