1
0

[HUDI-2671] Making error -> warn logs from timeline server with concurrent writers for inconsistent state (#4088)

* Making error -> warn logs from timeline server with concurrent writers for inconsistent state

* Fixing bad request response exception for timeline out of sync

* Addressing feedback. removed write concurrency mode depedency
This commit is contained in:
Sivabalan Narayanan
2021-11-25 14:21:32 -05:00
committed by GitHub
parent 7bb90e8caf
commit f692078d32
3 changed files with 76 additions and 7 deletions

View File

@@ -32,6 +32,8 @@ import org.apache.hudi.common.util.Functions.Function3;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpResponseException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -65,7 +67,7 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
try {
return preferredFunction.apply();
} catch (RuntimeException re) {
LOG.error("Got error running preferred function. Trying secondary", re);
handleRuntimeException(re);
errorOnPreferredView = true;
return secondaryFunction.apply();
}
@@ -80,7 +82,7 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
try {
return preferredFunction.apply(val);
} catch (RuntimeException re) {
LOG.error("Got error running preferred function. Trying secondary", re);
handleRuntimeException(re);
errorOnPreferredView = true;
return secondaryFunction.apply(val);
}
@@ -96,7 +98,7 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
try {
return preferredFunction.apply(val, val2);
} catch (RuntimeException re) {
LOG.error("Got error running preferred function. Trying secondary", re);
handleRuntimeException(re);
errorOnPreferredView = true;
return secondaryFunction.apply(val, val2);
}
@@ -112,13 +114,21 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
try {
return preferredFunction.apply(val, val2, val3);
} catch (RuntimeException re) {
LOG.error("Got error running preferred function. Trying secondary", re);
handleRuntimeException(re);
errorOnPreferredView = true;
return secondaryFunction.apply(val, val2, val3);
}
}
}
private void handleRuntimeException(RuntimeException re) {
if (re.getCause() instanceof HttpResponseException && ((HttpResponseException)re.getCause()).getStatusCode() == HttpStatus.SC_BAD_REQUEST) {
LOG.warn("Got error running preferred function. Likely due to another concurrent writer in progress. Trying secondary");
} else {
LOG.error("Got error running preferred function. Trying secondary", re);
}
}
@Override
public Stream<HoodieBaseFile> getLatestBaseFiles(String partitionPath) {
return execute(partitionPath, preferredView::getLatestBaseFiles, secondaryView::getLatestBaseFiles);

View File

@@ -30,6 +30,11 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.http.client.HttpResponseException;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -37,12 +42,14 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -102,6 +109,30 @@ public class TestPriorityBasedFileSystemView {
});
}
@Test
public void testBadRequestExceptionWithPrimary() {
final TestLogAppender appender = new TestLogAppender();
final Logger logger = Logger.getRootLogger();
try {
logger.addAppender(appender);
Stream<HoodieBaseFile> actual;
Stream<HoodieBaseFile> expected = testBaseFileStream;
resetMocks();
when(primary.getLatestBaseFiles()).thenThrow(new RuntimeException(new HttpResponseException(400, "Bad Request")));
when(secondary.getLatestBaseFiles()).thenReturn(testBaseFileStream);
actual = fsView.getLatestBaseFiles();
assertEquals(expected, actual);
final List<LoggingEvent> logs = appender.getLog();
final LoggingEvent firstLogEntry = logs.get(0);
assertEquals(firstLogEntry.getLevel(), Level.WARN);
assertTrue(((String)firstLogEntry.getMessage()).contains("Got error running preferred function. Likely due to another "
+ "concurrent writer in progress. Trying secondary"));
} finally {
logger.removeAppender(appender);
}
}
@Test
public void testGetLatestBaseFilesWithPartitionPath() {
Stream<HoodieBaseFile> actual;
@@ -633,4 +664,26 @@ public class TestPriorityBasedFileSystemView {
public void testGetSecondaryView() {
assertEquals(secondary, fsView.getSecondaryView());
}
class TestLogAppender extends AppenderSkeleton {
private final List<LoggingEvent> log = new ArrayList<LoggingEvent>();
@Override
public boolean requiresLayout() {
return false;
}
@Override
protected void append(final LoggingEvent loggingEvent) {
log.add(loggingEvent);
}
@Override
public void close() {
}
public List<LoggingEvent> getLog() {
return new ArrayList<LoggingEvent>(log);
}
}
}

View File

@@ -33,7 +33,6 @@ import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.timeline.service.handlers.BaseFileHandler;
import org.apache.hudi.timeline.service.handlers.FileSliceHandler;
@@ -42,6 +41,7 @@ import org.apache.hudi.timeline.service.handlers.TimelineHandler;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.javalin.BadRequestResponse;
import io.javalin.Context;
import io.javalin.Handler;
import io.javalin.Javalin;
@@ -507,13 +507,19 @@ public class RequestHandler {
+ " but server has the following timeline "
+ viewManager.getFileSystemView(context.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM))
.getTimeline().getInstants().collect(Collectors.toList());
ValidationUtils.checkArgument(!isLocalViewBehind(context), errMsg);
if (isLocalViewBehind(context)) {
throw new BadRequestResponse(errMsg);
}
long endFinalCheck = System.currentTimeMillis();
finalCheckTimeTaken = endFinalCheck - beginFinalCheck;
}
} catch (RuntimeException re) {
success = false;
LOG.error("Got runtime exception servicing request " + context.queryString(), re);
if (re instanceof BadRequestResponse) {
LOG.warn("Bad request response due to client view behind server view. " + re.getMessage());
} else {
LOG.error("Got runtime exception servicing request " + context.queryString(), re);
}
throw re;
} finally {
long endTs = System.currentTimeMillis();