[HUDI-1037] Introduce a write committed callback hook and given a default http callback implementation (#1842)
This commit is contained in:
@@ -0,0 +1,35 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.callback;
|
||||
|
||||
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
|
||||
|
||||
/**
|
||||
* A callback interface help to call back when a write commit completes successfully.
|
||||
*/
|
||||
public interface HoodieWriteCommitCallback {
|
||||
|
||||
/**
|
||||
* A callback method the user can implement to provide asynchronous handling of successful write.
|
||||
* This method will be called when a write operation is committed successfully.
|
||||
*
|
||||
* @param callbackMessage Callback msg, which will be sent to external system.
|
||||
*/
|
||||
void call(HoodieWriteCommitCallbackMessage callbackMessage);
|
||||
|
||||
}
|
||||
@@ -0,0 +1,108 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.callback.client.http;
|
||||
|
||||
import org.apache.http.HttpHeaders;
|
||||
import org.apache.http.client.config.RequestConfig;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpPost;
|
||||
import org.apache.http.entity.ContentType;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.HttpClientBuilder;
|
||||
import org.apache.hudi.config.HoodieWriteCommitCallbackConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Write commit callback http client.
|
||||
*/
|
||||
public class HoodieWriteCommitHttpCallbackClient implements Closeable {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(HoodieWriteCommitHttpCallbackClient.class);
|
||||
|
||||
public static final String HEADER_KEY_API_KEY = "HUDI-CALLBACK-KEY";
|
||||
|
||||
private final String apiKey;
|
||||
private final String url;
|
||||
private final CloseableHttpClient client;
|
||||
private Properties props;
|
||||
|
||||
public HoodieWriteCommitHttpCallbackClient(HoodieWriteConfig config) {
|
||||
this.props = config.getProps();
|
||||
this.apiKey = getApiKey();
|
||||
this.url = getUrl();
|
||||
this.client = getClient();
|
||||
}
|
||||
|
||||
public HoodieWriteCommitHttpCallbackClient(String apiKey, String url, CloseableHttpClient client) {
|
||||
this.apiKey = apiKey;
|
||||
this.url = url;
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
public void send(String callbackMsg) {
|
||||
HttpPost request = new HttpPost(url);
|
||||
request.setHeader(HEADER_KEY_API_KEY, apiKey);
|
||||
request.setHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString());
|
||||
request.setEntity(new StringEntity(callbackMsg, ContentType.APPLICATION_JSON));
|
||||
try (CloseableHttpResponse response = client.execute(request)) {
|
||||
int statusCode = response.getStatusLine().getStatusCode();
|
||||
if (statusCode >= 300) {
|
||||
LOG.warn(String.format("Failed to send callback message. Response was %s", response));
|
||||
} else {
|
||||
LOG.info(String.format("Sent Callback data %s to %s successfully !", callbackMsg, url));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to send callback.", e);
|
||||
}
|
||||
}
|
||||
|
||||
private String getApiKey() {
|
||||
return props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_API_KEY);
|
||||
}
|
||||
|
||||
private String getUrl() {
|
||||
return props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_URL_PROP);
|
||||
}
|
||||
|
||||
private CloseableHttpClient getClient() {
|
||||
int timeoutSeconds = getHttpTimeoutSeconds() * 1000;
|
||||
return HttpClientBuilder.create()
|
||||
.setDefaultRequestConfig(RequestConfig.custom()
|
||||
.setConnectTimeout(timeoutSeconds)
|
||||
.setConnectionRequestTimeout(timeoutSeconds)
|
||||
.setSocketTimeout(timeoutSeconds).build())
|
||||
.build();
|
||||
}
|
||||
|
||||
private Integer getHttpTimeoutSeconds() {
|
||||
return Integer.parseInt(props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_TIMEOUT_SECONDS));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
client.close();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,76 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.callback.common;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* Base callback message, which contains commitTime and tableName only for now.
|
||||
*/
|
||||
public class HoodieWriteCommitCallbackMessage implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = -3033643980627719561L;
|
||||
|
||||
/**
|
||||
* CommitTime for one batch write, this is required.
|
||||
*/
|
||||
private String commitTime;
|
||||
|
||||
/**
|
||||
* Table name this batch commit to.
|
||||
*/
|
||||
private String tableName;
|
||||
|
||||
/**
|
||||
* BathPath the table located.
|
||||
*/
|
||||
private String basePath;
|
||||
|
||||
public HoodieWriteCommitCallbackMessage() {
|
||||
}
|
||||
|
||||
public HoodieWriteCommitCallbackMessage(String commitTime, String tableName, String basePath) {
|
||||
this.commitTime = commitTime;
|
||||
this.tableName = tableName;
|
||||
this.basePath = basePath;
|
||||
}
|
||||
|
||||
public String getCommitTime() {
|
||||
return commitTime;
|
||||
}
|
||||
|
||||
public void setCommitTime(String commitTime) {
|
||||
this.commitTime = commitTime;
|
||||
}
|
||||
|
||||
public String getTableName() {
|
||||
return tableName;
|
||||
}
|
||||
|
||||
public void setTableName(String tableName) {
|
||||
this.tableName = tableName;
|
||||
}
|
||||
|
||||
public String getBasePath() {
|
||||
return basePath;
|
||||
}
|
||||
|
||||
public void setBasePath(String basePath) {
|
||||
this.basePath = basePath;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,58 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.callback.impl;
|
||||
|
||||
import org.apache.hudi.callback.HoodieWriteCommitCallback;
|
||||
import org.apache.hudi.callback.client.http.HoodieWriteCommitHttpCallbackClient;
|
||||
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieCommitCallbackException;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* A http implementation of {@link HoodieWriteCommitCallback}.
|
||||
*/
|
||||
public class HoodieWriteCommitHttpCallback implements HoodieWriteCommitCallback {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(HoodieWriteCommitHttpCallback.class);
|
||||
|
||||
private final HoodieWriteCommitHttpCallbackClient client;
|
||||
|
||||
public HoodieWriteCommitHttpCallback(HoodieWriteConfig config) {
|
||||
this.client = new HoodieWriteCommitHttpCallbackClient(config);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void call(HoodieWriteCommitCallbackMessage callbackMessage) {
|
||||
// convert to json
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
String callbackMsg = null;
|
||||
try {
|
||||
callbackMsg = mapper.writeValueAsString(callbackMessage);
|
||||
} catch (IOException e) {
|
||||
throw new HoodieCommitCallbackException("Callback service convert message to json failed", e);
|
||||
}
|
||||
LOG.info("Try to send callbackMsg, msg = " + callbackMsg);
|
||||
client.send(callbackMsg);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,47 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.callback.util;
|
||||
|
||||
import org.apache.hudi.callback.HoodieWriteCommitCallback;
|
||||
import org.apache.hudi.common.util.ReflectionUtils;
|
||||
import org.apache.hudi.common.util.StringUtils;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieCommitCallbackException;
|
||||
|
||||
import static org.apache.hudi.config.HoodieWriteCommitCallbackConfig.CALLBACK_CLASS_PROP;
|
||||
|
||||
/**
|
||||
* Factory help to create {@link HoodieWriteCommitCallback}.
|
||||
*/
|
||||
public class HoodieCommitCallbackFactory {
|
||||
public static HoodieWriteCommitCallback create(HoodieWriteConfig config) {
|
||||
String callbackClass = config.getCallbackClass();
|
||||
if (!StringUtils.isNullOrEmpty(callbackClass)) {
|
||||
Object instance = ReflectionUtils.loadClass(callbackClass, config);
|
||||
if (!(instance instanceof HoodieWriteCommitCallback)) {
|
||||
throw new HoodieCommitCallbackException(callbackClass + " is not a subclass of "
|
||||
+ HoodieWriteCommitCallback.class.getSimpleName());
|
||||
}
|
||||
return (HoodieWriteCommitCallback) instance;
|
||||
} else {
|
||||
throw new HoodieCommitCallbackException(String.format("The value of the config option %s can not be null or "
|
||||
+ "empty", CALLBACK_CLASS_PROP));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -19,6 +19,9 @@
|
||||
package org.apache.hudi.client;
|
||||
|
||||
import com.codahale.metrics.Timer;
|
||||
import org.apache.hudi.callback.HoodieWriteCommitCallback;
|
||||
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
|
||||
import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
|
||||
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
@@ -60,6 +63,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
|
||||
|
||||
private transient Timer.Context writeContext = null;
|
||||
private transient WriteOperationType operationType;
|
||||
private transient HoodieWriteCommitCallback commitCallback;
|
||||
|
||||
public void setOperationType(WriteOperationType operationType) {
|
||||
this.operationType = operationType;
|
||||
@@ -124,6 +128,14 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
|
||||
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
|
||||
e);
|
||||
}
|
||||
|
||||
// callback if needed.
|
||||
if (config.writeCommitCallbackOn()) {
|
||||
if (null == commitCallback) {
|
||||
commitCallback = HoodieCommitCallbackFactory.create(config);
|
||||
}
|
||||
commitCallback.call(new HoodieWriteCommitCallbackMessage(instantTime, config.getTableName(), config.getBasePath()));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,106 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.config;
|
||||
|
||||
import org.apache.hudi.common.config.DefaultHoodieConfig;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Write callback related config.
|
||||
*/
|
||||
public class HoodieWriteCommitCallbackConfig extends DefaultHoodieConfig {
|
||||
|
||||
public static final String CALLBACK_ON = "hoodie.write.commit.callback.on";
|
||||
public static final boolean DEFAULT_CALLBACK_ON = false;
|
||||
|
||||
public static final String CALLBACK_CLASS_PROP = "hoodie.write.commit.callback.class";
|
||||
public static final String DEFAULT_CALLBACK_CLASS_PROP = "org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback";
|
||||
|
||||
// ***** REST callback configs *****
|
||||
public static final String CALLBACK_HTTP_URL_PROP = "hoodie.write.commit.callback.http.url";
|
||||
public static final String CALLBACK_HTTP_API_KEY = "hoodie.write.commit.callback.http.api.key";
|
||||
public static final String DEFAULT_CALLBACK_HTTP_API_KEY = "hudi_write_commit_http_callback";
|
||||
public static final String CALLBACK_HTTP_TIMEOUT_SECONDS = "hoodie.write.commit.callback.http.timeout.seconds";
|
||||
public static final int DEFAULT_CALLBACK_HTTP_TIMEOUT_SECONDS = 3;
|
||||
|
||||
private HoodieWriteCommitCallbackConfig(Properties props) {
|
||||
super(props);
|
||||
}
|
||||
|
||||
public static HoodieWriteCommitCallbackConfig.Builder newBuilder() {
|
||||
return new HoodieWriteCommitCallbackConfig.Builder();
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
|
||||
private final Properties props = new Properties();
|
||||
|
||||
public HoodieWriteCommitCallbackConfig.Builder fromFile(File propertiesFile) throws IOException {
|
||||
try (FileReader reader = new FileReader(propertiesFile)) {
|
||||
this.props.load(reader);
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
public HoodieWriteCommitCallbackConfig.Builder fromProperties(Properties props) {
|
||||
this.props.putAll(props);
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieWriteCommitCallbackConfig.Builder writeCommitCallbackOn(String callbackOn) {
|
||||
props.setProperty(CALLBACK_ON, callbackOn);
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieWriteCommitCallbackConfig.Builder withCallbackClass(String callbackClass) {
|
||||
props.setProperty(CALLBACK_CLASS_PROP, callbackClass);
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieWriteCommitCallbackConfig.Builder withCallbackHttpUrl(String url) {
|
||||
props.setProperty(CALLBACK_HTTP_URL_PROP, url);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withCallbackHttpTimeoutSeconds(String timeoutSeconds) {
|
||||
props.setProperty(CALLBACK_HTTP_TIMEOUT_SECONDS, timeoutSeconds);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withCallbackHttpApiKey(String apiKey) {
|
||||
props.setProperty(CALLBACK_HTTP_API_KEY, apiKey);
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieWriteCommitCallbackConfig build() {
|
||||
HoodieWriteCommitCallbackConfig config = new HoodieWriteCommitCallbackConfig(props);
|
||||
setDefaultOnCondition(props, !props.containsKey(CALLBACK_ON), CALLBACK_ON, String.valueOf(DEFAULT_CALLBACK_ON));
|
||||
setDefaultOnCondition(props, !props.containsKey(CALLBACK_CLASS_PROP), CALLBACK_CLASS_PROP, DEFAULT_CALLBACK_CLASS_PROP);
|
||||
setDefaultOnCondition(props, !props.containsKey(CALLBACK_HTTP_API_KEY), CALLBACK_HTTP_API_KEY, DEFAULT_CALLBACK_HTTP_API_KEY);
|
||||
setDefaultOnCondition(props, !props.containsKey(CALLBACK_HTTP_TIMEOUT_SECONDS), CALLBACK_HTTP_TIMEOUT_SECONDS,
|
||||
String.valueOf(DEFAULT_CALLBACK_HTTP_TIMEOUT_SECONDS));
|
||||
|
||||
return config;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -642,6 +642,17 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
||||
return clientSpecifiedViewStorageConfig;
|
||||
}
|
||||
|
||||
/**
|
||||
* Commit call back configs.
|
||||
*/
|
||||
public boolean writeCommitCallbackOn() {
|
||||
return Boolean.parseBoolean(props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_ON));
|
||||
}
|
||||
|
||||
public String getCallbackClass() {
|
||||
return props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_CLASS_PROP);
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
|
||||
private final Properties props = new Properties();
|
||||
@@ -652,6 +663,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
||||
private boolean isMemoryConfigSet = false;
|
||||
private boolean isViewConfigSet = false;
|
||||
private boolean isConsistencyGuardSet = false;
|
||||
private boolean isCallbackConfigSet = false;
|
||||
|
||||
public Builder fromFile(File propertiesFile) throws IOException {
|
||||
try (FileReader reader = new FileReader(propertiesFile)) {
|
||||
@@ -798,6 +810,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withCallbackConfig(HoodieWriteCommitCallbackConfig callbackConfig) {
|
||||
props.putAll(callbackConfig.getProps());
|
||||
isCallbackConfigSet = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withFinalizeWriteParallelism(int parallelism) {
|
||||
props.setProperty(FINALIZE_WRITE_PARALLELISM, String.valueOf(parallelism));
|
||||
return this;
|
||||
@@ -865,6 +883,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
||||
FileSystemViewStorageConfig.newBuilder().fromProperties(props).build());
|
||||
setDefaultOnCondition(props, !isConsistencyGuardSet,
|
||||
ConsistencyGuardConfig.newBuilder().fromProperties(props).build());
|
||||
setDefaultOnCondition(props, !isCallbackConfigSet,
|
||||
HoodieWriteCommitCallbackConfig.newBuilder().fromProperties(props).build());
|
||||
|
||||
setDefaultOnCondition(props, !props.containsKey(TIMELINE_LAYOUT_VERSION), TIMELINE_LAYOUT_VERSION,
|
||||
String.valueOf(TimelineLayoutVersion.CURR_VERSION));
|
||||
|
||||
@@ -0,0 +1,35 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.exception;
|
||||
|
||||
import org.apache.hudi.callback.HoodieWriteCommitCallback;
|
||||
|
||||
/**
|
||||
* Exception thrown for any higher level errors when {@link HoodieWriteCommitCallback} is executing a callback.
|
||||
*/
|
||||
public class HoodieCommitCallbackException extends HoodieException {
|
||||
|
||||
public HoodieCommitCallbackException(String msg, Throwable e) {
|
||||
super(msg, e);
|
||||
}
|
||||
|
||||
public HoodieCommitCallbackException(String msg) {
|
||||
super(msg);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,119 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.callback.http;
|
||||
|
||||
import org.apache.http.StatusLine;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.hudi.callback.client.http.HoodieWriteCommitHttpCallbackClient;
|
||||
import org.apache.log4j.AppenderSkeleton;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.log4j.spi.LoggingEvent;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Captor;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Unit test for {@link HoodieWriteCommitHttpCallbackClient}.
|
||||
*/
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class TestCallbackHttpClient {
|
||||
|
||||
@Mock
|
||||
AppenderSkeleton appender;
|
||||
|
||||
@Captor
|
||||
ArgumentCaptor<LoggingEvent> logCaptor;
|
||||
|
||||
@Mock
|
||||
CloseableHttpClient httpClient;
|
||||
|
||||
@Mock
|
||||
CloseableHttpResponse httpResponse;
|
||||
|
||||
@Mock
|
||||
StatusLine statusLine;
|
||||
|
||||
private void mockResponse(int statusCode) {
|
||||
when(statusLine.getStatusCode()).thenReturn(statusCode);
|
||||
when(httpResponse.getStatusLine()).thenReturn(statusLine);
|
||||
try {
|
||||
when(httpClient.execute(any())).thenReturn(httpResponse);
|
||||
} catch (IOException e) {
|
||||
fail(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void sendPayloadShouldLogWhenRequestFailed() throws IOException {
|
||||
Logger.getRootLogger().addAppender(appender);
|
||||
when(httpClient.execute(any())).thenThrow(IOException.class);
|
||||
|
||||
HoodieWriteCommitHttpCallbackClient hoodieWriteCommitCallBackHttpClient =
|
||||
new HoodieWriteCommitHttpCallbackClient("fake_api_key", "fake_url", httpClient);
|
||||
hoodieWriteCommitCallBackHttpClient.send("{}");
|
||||
|
||||
verify(appender).doAppend(logCaptor.capture());
|
||||
assertEquals("Failed to send callback.", logCaptor.getValue().getRenderedMessage());
|
||||
assertEquals(Level.WARN, logCaptor.getValue().getLevel());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void sendPayloadShouldLogUnsuccessfulSending() {
|
||||
Logger.getRootLogger().addAppender(appender);
|
||||
mockResponse(401);
|
||||
when(httpResponse.toString()).thenReturn("unauthorized");
|
||||
|
||||
HoodieWriteCommitHttpCallbackClient hoodieWriteCommitCallBackHttpClient =
|
||||
new HoodieWriteCommitHttpCallbackClient("fake_api_key", "fake_url", httpClient);
|
||||
hoodieWriteCommitCallBackHttpClient.send("{}");
|
||||
|
||||
verify(appender).doAppend(logCaptor.capture());
|
||||
assertEquals("Failed to send callback message. Response was unauthorized", logCaptor.getValue().getRenderedMessage());
|
||||
assertEquals(Level.WARN, logCaptor.getValue().getLevel());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void sendPayloadShouldLogSuccessfulSending() {
|
||||
Logger.getRootLogger().addAppender(appender);
|
||||
mockResponse(202);
|
||||
|
||||
HoodieWriteCommitHttpCallbackClient hoodieWriteCommitCallBackHttpClient =
|
||||
new HoodieWriteCommitHttpCallbackClient("fake_api_key", "fake_url", httpClient);
|
||||
hoodieWriteCommitCallBackHttpClient.send("{}");
|
||||
|
||||
verify(appender).doAppend(logCaptor.capture());
|
||||
assertTrue(logCaptor.getValue().getRenderedMessage().startsWith("Sent Callback data"));
|
||||
assertEquals(Level.INFO, logCaptor.getValue().getLevel());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -78,6 +78,7 @@
|
||||
<include>org.jetbrains.kotlin:*</include>
|
||||
<include>org.rocksdb:rocksdbjni</include>
|
||||
<include>org.apache.httpcomponents:httpclient</include>
|
||||
<include>org.apache.httpcomponents:httpcore</include>
|
||||
<include>org.apache.httpcomponents:fluent-hc</include>
|
||||
<include>org.antlr:stringtemplate</include>
|
||||
<include>org.apache.parquet:parquet-avro</include>
|
||||
|
||||
@@ -81,6 +81,7 @@
|
||||
<include>org.jetbrains.kotlin:*</include>
|
||||
<include>org.rocksdb:rocksdbjni</include>
|
||||
<include>org.apache.httpcomponents:httpclient</include>
|
||||
<include>org.apache.httpcomponents:httpcore</include>
|
||||
<include>org.apache.httpcomponents:fluent-hc</include>
|
||||
<include>org.antlr:stringtemplate</include>
|
||||
<include>org.apache.parquet:parquet-avro</include>
|
||||
|
||||
Reference in New Issue
Block a user