diff --git a/hudi-client/src/main/java/org/apache/hudi/callback/HoodieWriteCommitCallback.java b/hudi-client/src/main/java/org/apache/hudi/callback/HoodieWriteCommitCallback.java new file mode 100644 index 000000000..2f5a4eff4 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/callback/HoodieWriteCommitCallback.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.callback; + +import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage; + +/** + * A callback interface help to call back when a write commit completes successfully. + */ +public interface HoodieWriteCommitCallback { + + /** + * A callback method the user can implement to provide asynchronous handling of successful write. + * This method will be called when a write operation is committed successfully. + * + * @param callbackMessage Callback msg, which will be sent to external system. + */ + void call(HoodieWriteCommitCallbackMessage callbackMessage); + +} diff --git a/hudi-client/src/main/java/org/apache/hudi/callback/client/http/HoodieWriteCommitHttpCallbackClient.java b/hudi-client/src/main/java/org/apache/hudi/callback/client/http/HoodieWriteCommitHttpCallbackClient.java new file mode 100644 index 000000000..6c41e2f5e --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/callback/client/http/HoodieWriteCommitHttpCallbackClient.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.callback.client.http; + +import org.apache.http.HttpHeaders; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.hudi.config.HoodieWriteCommitCallbackConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Properties; + +/** + * Write commit callback http client. + */ +public class HoodieWriteCommitHttpCallbackClient implements Closeable { + + private static final Logger LOG = LogManager.getLogger(HoodieWriteCommitHttpCallbackClient.class); + + public static final String HEADER_KEY_API_KEY = "HUDI-CALLBACK-KEY"; + + private final String apiKey; + private final String url; + private final CloseableHttpClient client; + private Properties props; + + public HoodieWriteCommitHttpCallbackClient(HoodieWriteConfig config) { + this.props = config.getProps(); + this.apiKey = getApiKey(); + this.url = getUrl(); + this.client = getClient(); + } + + public HoodieWriteCommitHttpCallbackClient(String apiKey, String url, CloseableHttpClient client) { + this.apiKey = apiKey; + this.url = url; + this.client = client; + } + + public void send(String callbackMsg) { + HttpPost request = new HttpPost(url); + request.setHeader(HEADER_KEY_API_KEY, apiKey); + request.setHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString()); + request.setEntity(new StringEntity(callbackMsg, ContentType.APPLICATION_JSON)); + try (CloseableHttpResponse response = client.execute(request)) { + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode >= 300) { + LOG.warn(String.format("Failed to send callback message. Response was %s", response)); + } else { + LOG.info(String.format("Sent Callback data %s to %s successfully !", callbackMsg, url)); + } + } catch (IOException e) { + LOG.warn("Failed to send callback.", e); + } + } + + private String getApiKey() { + return props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_API_KEY); + } + + private String getUrl() { + return props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_URL_PROP); + } + + private CloseableHttpClient getClient() { + int timeoutSeconds = getHttpTimeoutSeconds() * 1000; + return HttpClientBuilder.create() + .setDefaultRequestConfig(RequestConfig.custom() + .setConnectTimeout(timeoutSeconds) + .setConnectionRequestTimeout(timeoutSeconds) + .setSocketTimeout(timeoutSeconds).build()) + .build(); + } + + private Integer getHttpTimeoutSeconds() { + return Integer.parseInt(props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_TIMEOUT_SECONDS)); + } + + @Override + public void close() throws IOException { + client.close(); + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/callback/common/HoodieWriteCommitCallbackMessage.java b/hudi-client/src/main/java/org/apache/hudi/callback/common/HoodieWriteCommitCallbackMessage.java new file mode 100644 index 000000000..0233feeae --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/callback/common/HoodieWriteCommitCallbackMessage.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.callback.common; + +import java.io.Serializable; + +/** + * Base callback message, which contains commitTime and tableName only for now. + */ +public class HoodieWriteCommitCallbackMessage implements Serializable { + + private static final long serialVersionUID = -3033643980627719561L; + + /** + * CommitTime for one batch write, this is required. + */ + private String commitTime; + + /** + * Table name this batch commit to. + */ + private String tableName; + + /** + * BathPath the table located. + */ + private String basePath; + + public HoodieWriteCommitCallbackMessage() { + } + + public HoodieWriteCommitCallbackMessage(String commitTime, String tableName, String basePath) { + this.commitTime = commitTime; + this.tableName = tableName; + this.basePath = basePath; + } + + public String getCommitTime() { + return commitTime; + } + + public void setCommitTime(String commitTime) { + this.commitTime = commitTime; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public String getBasePath() { + return basePath; + } + + public void setBasePath(String basePath) { + this.basePath = basePath; + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/callback/impl/HoodieWriteCommitHttpCallback.java b/hudi-client/src/main/java/org/apache/hudi/callback/impl/HoodieWriteCommitHttpCallback.java new file mode 100644 index 000000000..910e62618 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/callback/impl/HoodieWriteCommitHttpCallback.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.callback.impl; + +import org.apache.hudi.callback.HoodieWriteCommitCallback; +import org.apache.hudi.callback.client.http.HoodieWriteCommitHttpCallbackClient; +import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieCommitCallbackException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.codehaus.jackson.map.ObjectMapper; + +import java.io.IOException; + +/** + * A http implementation of {@link HoodieWriteCommitCallback}. + */ +public class HoodieWriteCommitHttpCallback implements HoodieWriteCommitCallback { + + private static final Logger LOG = LogManager.getLogger(HoodieWriteCommitHttpCallback.class); + + private final HoodieWriteCommitHttpCallbackClient client; + + public HoodieWriteCommitHttpCallback(HoodieWriteConfig config) { + this.client = new HoodieWriteCommitHttpCallbackClient(config); + } + + @Override + public void call(HoodieWriteCommitCallbackMessage callbackMessage) { + // convert to json + ObjectMapper mapper = new ObjectMapper(); + String callbackMsg = null; + try { + callbackMsg = mapper.writeValueAsString(callbackMessage); + } catch (IOException e) { + throw new HoodieCommitCallbackException("Callback service convert message to json failed", e); + } + LOG.info("Try to send callbackMsg, msg = " + callbackMsg); + client.send(callbackMsg); + } + +} diff --git a/hudi-client/src/main/java/org/apache/hudi/callback/util/HoodieCommitCallbackFactory.java b/hudi-client/src/main/java/org/apache/hudi/callback/util/HoodieCommitCallbackFactory.java new file mode 100644 index 000000000..9d1e9c354 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/callback/util/HoodieCommitCallbackFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.callback.util; + +import org.apache.hudi.callback.HoodieWriteCommitCallback; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieCommitCallbackException; + +import static org.apache.hudi.config.HoodieWriteCommitCallbackConfig.CALLBACK_CLASS_PROP; + +/** + * Factory help to create {@link HoodieWriteCommitCallback}. + */ +public class HoodieCommitCallbackFactory { + public static HoodieWriteCommitCallback create(HoodieWriteConfig config) { + String callbackClass = config.getCallbackClass(); + if (!StringUtils.isNullOrEmpty(callbackClass)) { + Object instance = ReflectionUtils.loadClass(callbackClass, config); + if (!(instance instanceof HoodieWriteCommitCallback)) { + throw new HoodieCommitCallbackException(callbackClass + " is not a subclass of " + + HoodieWriteCommitCallback.class.getSimpleName()); + } + return (HoodieWriteCommitCallback) instance; + } else { + throw new HoodieCommitCallbackException(String.format("The value of the config option %s can not be null or " + + "empty", CALLBACK_CLASS_PROP)); + } + } + +} diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index b922caa9d..644aca9da 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -19,6 +19,9 @@ package org.apache.hudi.client; import com.codahale.metrics.Timer; +import org.apache.hudi.callback.HoodieWriteCommitCallback; +import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage; +import org.apache.hudi.callback.util.HoodieCommitCallbackFactory; import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -60,6 +63,7 @@ public abstract class AbstractHoodieWriteClient e private transient Timer.Context writeContext = null; private transient WriteOperationType operationType; + private transient HoodieWriteCommitCallback commitCallback; public void setOperationType(WriteOperationType operationType) { this.operationType = operationType; @@ -124,6 +128,14 @@ public abstract class AbstractHoodieWriteClient e throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime, e); } + + // callback if needed. + if (config.writeCommitCallbackOn()) { + if (null == commitCallback) { + commitCallback = HoodieCommitCallbackFactory.create(config); + } + commitCallback.call(new HoodieWriteCommitCallbackMessage(instantTime, config.getTableName(), config.getBasePath())); + } return true; } diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java new file mode 100644 index 000000000..47a01aa05 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config; + +import org.apache.hudi.common.config.DefaultHoodieConfig; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Properties; + +/** + * Write callback related config. + */ +public class HoodieWriteCommitCallbackConfig extends DefaultHoodieConfig { + + public static final String CALLBACK_ON = "hoodie.write.commit.callback.on"; + public static final boolean DEFAULT_CALLBACK_ON = false; + + public static final String CALLBACK_CLASS_PROP = "hoodie.write.commit.callback.class"; + public static final String DEFAULT_CALLBACK_CLASS_PROP = "org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback"; + + // ***** REST callback configs ***** + public static final String CALLBACK_HTTP_URL_PROP = "hoodie.write.commit.callback.http.url"; + public static final String CALLBACK_HTTP_API_KEY = "hoodie.write.commit.callback.http.api.key"; + public static final String DEFAULT_CALLBACK_HTTP_API_KEY = "hudi_write_commit_http_callback"; + public static final String CALLBACK_HTTP_TIMEOUT_SECONDS = "hoodie.write.commit.callback.http.timeout.seconds"; + public static final int DEFAULT_CALLBACK_HTTP_TIMEOUT_SECONDS = 3; + + private HoodieWriteCommitCallbackConfig(Properties props) { + super(props); + } + + public static HoodieWriteCommitCallbackConfig.Builder newBuilder() { + return new HoodieWriteCommitCallbackConfig.Builder(); + } + + public static class Builder { + + private final Properties props = new Properties(); + + public HoodieWriteCommitCallbackConfig.Builder fromFile(File propertiesFile) throws IOException { + try (FileReader reader = new FileReader(propertiesFile)) { + this.props.load(reader); + return this; + } + } + + public HoodieWriteCommitCallbackConfig.Builder fromProperties(Properties props) { + this.props.putAll(props); + return this; + } + + public HoodieWriteCommitCallbackConfig.Builder writeCommitCallbackOn(String callbackOn) { + props.setProperty(CALLBACK_ON, callbackOn); + return this; + } + + public HoodieWriteCommitCallbackConfig.Builder withCallbackClass(String callbackClass) { + props.setProperty(CALLBACK_CLASS_PROP, callbackClass); + return this; + } + + public HoodieWriteCommitCallbackConfig.Builder withCallbackHttpUrl(String url) { + props.setProperty(CALLBACK_HTTP_URL_PROP, url); + return this; + } + + public Builder withCallbackHttpTimeoutSeconds(String timeoutSeconds) { + props.setProperty(CALLBACK_HTTP_TIMEOUT_SECONDS, timeoutSeconds); + return this; + } + + public Builder withCallbackHttpApiKey(String apiKey) { + props.setProperty(CALLBACK_HTTP_API_KEY, apiKey); + return this; + } + + public HoodieWriteCommitCallbackConfig build() { + HoodieWriteCommitCallbackConfig config = new HoodieWriteCommitCallbackConfig(props); + setDefaultOnCondition(props, !props.containsKey(CALLBACK_ON), CALLBACK_ON, String.valueOf(DEFAULT_CALLBACK_ON)); + setDefaultOnCondition(props, !props.containsKey(CALLBACK_CLASS_PROP), CALLBACK_CLASS_PROP, DEFAULT_CALLBACK_CLASS_PROP); + setDefaultOnCondition(props, !props.containsKey(CALLBACK_HTTP_API_KEY), CALLBACK_HTTP_API_KEY, DEFAULT_CALLBACK_HTTP_API_KEY); + setDefaultOnCondition(props, !props.containsKey(CALLBACK_HTTP_TIMEOUT_SECONDS), CALLBACK_HTTP_TIMEOUT_SECONDS, + String.valueOf(DEFAULT_CALLBACK_HTTP_TIMEOUT_SECONDS)); + + return config; + } + } + +} diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index aefde2cc8..d51832d2f 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -642,6 +642,17 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return clientSpecifiedViewStorageConfig; } + /** + * Commit call back configs. + */ + public boolean writeCommitCallbackOn() { + return Boolean.parseBoolean(props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_ON)); + } + + public String getCallbackClass() { + return props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_CLASS_PROP); + } + public static class Builder { private final Properties props = new Properties(); @@ -652,6 +663,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private boolean isMemoryConfigSet = false; private boolean isViewConfigSet = false; private boolean isConsistencyGuardSet = false; + private boolean isCallbackConfigSet = false; public Builder fromFile(File propertiesFile) throws IOException { try (FileReader reader = new FileReader(propertiesFile)) { @@ -798,6 +810,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return this; } + public Builder withCallbackConfig(HoodieWriteCommitCallbackConfig callbackConfig) { + props.putAll(callbackConfig.getProps()); + isCallbackConfigSet = true; + return this; + } + public Builder withFinalizeWriteParallelism(int parallelism) { props.setProperty(FINALIZE_WRITE_PARALLELISM, String.valueOf(parallelism)); return this; @@ -865,6 +883,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { FileSystemViewStorageConfig.newBuilder().fromProperties(props).build()); setDefaultOnCondition(props, !isConsistencyGuardSet, ConsistencyGuardConfig.newBuilder().fromProperties(props).build()); + setDefaultOnCondition(props, !isCallbackConfigSet, + HoodieWriteCommitCallbackConfig.newBuilder().fromProperties(props).build()); setDefaultOnCondition(props, !props.containsKey(TIMELINE_LAYOUT_VERSION), TIMELINE_LAYOUT_VERSION, String.valueOf(TimelineLayoutVersion.CURR_VERSION)); diff --git a/hudi-client/src/main/java/org/apache/hudi/exception/HoodieCommitCallbackException.java b/hudi-client/src/main/java/org/apache/hudi/exception/HoodieCommitCallbackException.java new file mode 100644 index 000000000..57468cb86 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/exception/HoodieCommitCallbackException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.exception; + +import org.apache.hudi.callback.HoodieWriteCommitCallback; + +/** + * Exception thrown for any higher level errors when {@link HoodieWriteCommitCallback} is executing a callback. + */ +public class HoodieCommitCallbackException extends HoodieException { + + public HoodieCommitCallbackException(String msg, Throwable e) { + super(msg, e); + } + + public HoodieCommitCallbackException(String msg) { + super(msg); + } +} \ No newline at end of file diff --git a/hudi-client/src/test/java/org/apache/hudi/callback/http/TestCallbackHttpClient.java b/hudi-client/src/test/java/org/apache/hudi/callback/http/TestCallbackHttpClient.java new file mode 100644 index 000000000..616dc3173 --- /dev/null +++ b/hudi-client/src/test/java/org/apache/hudi/callback/http/TestCallbackHttpClient.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.callback.http; + +import org.apache.http.StatusLine; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.hudi.callback.client.http.HoodieWriteCommitHttpCallbackClient; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.IOException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit test for {@link HoodieWriteCommitHttpCallbackClient}. + */ +@ExtendWith(MockitoExtension.class) +public class TestCallbackHttpClient { + + @Mock + AppenderSkeleton appender; + + @Captor + ArgumentCaptor logCaptor; + + @Mock + CloseableHttpClient httpClient; + + @Mock + CloseableHttpResponse httpResponse; + + @Mock + StatusLine statusLine; + + private void mockResponse(int statusCode) { + when(statusLine.getStatusCode()).thenReturn(statusCode); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + try { + when(httpClient.execute(any())).thenReturn(httpResponse); + } catch (IOException e) { + fail(e.getMessage(), e); + } + } + + @Test + public void sendPayloadShouldLogWhenRequestFailed() throws IOException { + Logger.getRootLogger().addAppender(appender); + when(httpClient.execute(any())).thenThrow(IOException.class); + + HoodieWriteCommitHttpCallbackClient hoodieWriteCommitCallBackHttpClient = + new HoodieWriteCommitHttpCallbackClient("fake_api_key", "fake_url", httpClient); + hoodieWriteCommitCallBackHttpClient.send("{}"); + + verify(appender).doAppend(logCaptor.capture()); + assertEquals("Failed to send callback.", logCaptor.getValue().getRenderedMessage()); + assertEquals(Level.WARN, logCaptor.getValue().getLevel()); + } + + @Test + public void sendPayloadShouldLogUnsuccessfulSending() { + Logger.getRootLogger().addAppender(appender); + mockResponse(401); + when(httpResponse.toString()).thenReturn("unauthorized"); + + HoodieWriteCommitHttpCallbackClient hoodieWriteCommitCallBackHttpClient = + new HoodieWriteCommitHttpCallbackClient("fake_api_key", "fake_url", httpClient); + hoodieWriteCommitCallBackHttpClient.send("{}"); + + verify(appender).doAppend(logCaptor.capture()); + assertEquals("Failed to send callback message. Response was unauthorized", logCaptor.getValue().getRenderedMessage()); + assertEquals(Level.WARN, logCaptor.getValue().getLevel()); + } + + @Test + public void sendPayloadShouldLogSuccessfulSending() { + Logger.getRootLogger().addAppender(appender); + mockResponse(202); + + HoodieWriteCommitHttpCallbackClient hoodieWriteCommitCallBackHttpClient = + new HoodieWriteCommitHttpCallbackClient("fake_api_key", "fake_url", httpClient); + hoodieWriteCommitCallBackHttpClient.send("{}"); + + verify(appender).doAppend(logCaptor.capture()); + assertTrue(logCaptor.getValue().getRenderedMessage().startsWith("Sent Callback data")); + assertEquals(Level.INFO, logCaptor.getValue().getLevel()); + } + +} diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml index e0e60a972..f4affca79 100644 --- a/packaging/hudi-spark-bundle/pom.xml +++ b/packaging/hudi-spark-bundle/pom.xml @@ -78,6 +78,7 @@ org.jetbrains.kotlin:* org.rocksdb:rocksdbjni org.apache.httpcomponents:httpclient + org.apache.httpcomponents:httpcore org.apache.httpcomponents:fluent-hc org.antlr:stringtemplate org.apache.parquet:parquet-avro diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml index 2da82f203..8cf34077c 100644 --- a/packaging/hudi-utilities-bundle/pom.xml +++ b/packaging/hudi-utilities-bundle/pom.xml @@ -81,6 +81,7 @@ org.jetbrains.kotlin:* org.rocksdb:rocksdbjni org.apache.httpcomponents:httpclient + org.apache.httpcomponents:httpcore org.apache.httpcomponents:fluent-hc org.antlr:stringtemplate org.apache.parquet:parquet-avro