
Added timeouts on send and flush calls in KafkaProducerWrapper #696

Open
wants to merge 6 commits into base: master

Conversation

@jzakaryan jzakaryan commented Mar 24, 2020

The changes in this PR address the issue of the Kafka client getting stuck on send and flush when the destination topic gets dropped. Since Kafka treats missing topic metadata as an eventual-consistency issue and keeps retrying the send in the hope that the topic will become available again, we have to make our calls to those methods bounded (i.e. have them time out after a certain amount of time).
I use CompletableFutures to implement non-blocking timeouts on those calls. That seems to be the best way to do this in Java without resorting to third-party libraries.
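The general shape of that approach (a minimal, illustrative sketch, not the PR's actual code) is to pair the pending future with a scheduled task that fails it once the timeout elapses:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutSketch {
    // Daemon thread so the scheduler never keeps the JVM alive on its own.
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "timeout-scheduler");
                t.setDaemon(true);
                return t;
            });

    // Fails `future` with a TimeoutException if it is still pending after
    // `timeoutMs`; if the real result arrives first, the scheduled task is a no-op.
    public static <T> CompletableFuture<T> within(CompletableFuture<T> future, long timeoutMs) {
        SCHEDULER.schedule(() -> {
            if (!future.isDone()) {
                future.completeExceptionally(
                        new TimeoutException("Timed out after " + timeoutMs + " ms"));
            }
        }, timeoutMs, TimeUnit.MILLISECONDS);
        return future;
    }
}
```

On Java 9 and later, CompletableFuture.orTimeout(long, TimeUnit) provides this behavior out of the box.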

@jzakaryan jzakaryan changed the title [WIP] Added timeouts on send and flush calls in KafkaProducerWrapper Added timeouts on send and flush calls in KafkaProducerWrapper Mar 30, 2020

@somandal somandal left a comment


This is only a partial review, let's address these and I'll look deeper into the other parts of the newer code.

Comment on lines 190 to 195
/**
* There are two known cases that lead to IllegalStateException and we should retry:
* (1) number of brokers is less than minISR
* (2) producer is closed in generateSendFailure by another thread
* (3) For either condition, we should retry as broker comes back healthy or producer is recreated
*/
Collaborator

Any reason why you moved these comments out? Also, I see that you've turned the last line into a point (3), whereas it just summarizes the two points above it?

Collaborator

+1

To Sonam's first point: I think those comments are closely related to the logic within the method since they discuss impl details like exception types.

Thread.sleep(_sendFailureRetryWaitTimeMs);
} catch (TimeoutException e) {
_log.warn("Kafka producer buffer is full, retry in {} ms.", _sendFailureRetryWaitTimeMs, e);
} catch (TimeoutException ex) {
Collaborator

Was there some reason why you had to reorder the catches here? In the previous code, we catch IllegalStateException, and then catch TimeoutException, and then catch KafkaException. Let's not reorder these unless there is a good reason to. It also becomes harder to review since I can't easily see the actual changes vs. reordering.

Also, you renamed all exceptions 'e' to 'ex'; is that necessary?

} catch (IllegalStateException ex) {
// The following exception should be quite rare as most exceptions will be throw async callback
_log.warn("Either send is called on a closed producer or broker count is less than minISR, retry in {} ms.",
_sendFailureRetryWaitTimeMs, ex);
Collaborator

Looks like the earlier IllegalStateException block would sleep, and you've removed it. Why?
Thread.sleep(_sendFailureRetryWaitTimeMs);

Collaborator Author

Good catch. This is definitely a bug I introduced. Will revert the entire exception handling piece in send(). Better to leave it as it is.

Throwable rootCause = ExceptionUtils.getRootCause(ex);
if (numberOfAttempts > MAX_SEND_ATTEMPTS ||
(rootCause instanceof Error || rootCause instanceof RuntimeException)) {
// Set a max_send_attempts for KafkaException as it may be non-recoverable
Collaborator

This comment doesn't make sense here, move it back to outside the if condition?

onComplete.onCompletion(metadata, generateSendFailure(exception));
}
}));
numberOfAttempts++;
Collaborator

change this back to ++numberOfAttempts?

Collaborator

+1

I like the rename (added s) but prefer the prefix form.

In fact, it wouldn't be such a bad idea to turn that loop into a for loop:

for (int numberOfAttempts = 1; retry; ++numberOfAttempts) {
  ...
}

or even eliminate retry entirely:

for (int numberOfAttempts = 1;; ++numberOfAttempts) {
    try {
        maybeGetKafkaProducer(task).ifPresent(p -> doSend(p, producerRecord, onComplete));
        return;
     } catch (...) {
         ...
     }
}


private synchronized void shutdownProducer() {
void doSend(Producer<K, V> producer, ProducerRecord<K, V> record, Callback callback) {
Collaborator

is this intended to be package private? If so, add a comment, otherwise add the appropriate public/protected/private

Collaborator

@somandal
I think that's okay; this method's intended to be overridden by another class in the same package. If it's marked protected, it'll be more visible than it needs to be, private wouldn't allow it to be overridden, and public is too permissive.

Collaborator

Sure, makes sense. Just keep things consistent, i.e. if you need a more visible scope for multiple methods, declare them all as package private, and the rest as private. Don't mix protected and package private without having a very good reason to.

Comment on lines 36 to 37
brooklin.server.kafkaProducerWrapper.sendTimeout=50000
brooklin.server.kafkaProducerWrapper.flushTimeout=3600000
Collaborator

This wouldn't be the right way to pass configs to your KafkaProducerWrapper. Let's discuss how to do this offline.

Comment on lines 30 to 31
private static final String SEND_TIMEOUT_CONFIG_KEY = "brooklin.server.kafkaProducerWrapper.sendTimeout";
private static final String FLUSH_TIMEOUT_CONFIG_KEY = "brooklin.server.kafkaProducerWrapper.flushTimeout";
Collaborator

This isn't the right way to pass configs. Any configs you want to pass here need to be scoped under the transport provider configs. The KafkaProducerWrapper receives all of the transport provider configs, so you'd want to add these under that scope. We don't need to access the full property name, because as the configs pass through the layers, the relevant prefixes are removed. You can see how other configs are accessed in KafkaProducerWrapper, you'll see they don't have that whole "brooklin.server" prefix. You can access these the same way.
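As a rough illustration of how that prefix scoping typically works (hypothetical key names; not Brooklin's actual config plumbing), each layer strips its own prefix before handing the remaining properties down, so the consuming class only sees short names:

```java
import java.util.Properties;

public class ScopedConfig {
    // Returns the subset of `all` whose keys start with `prefix`, with that
    // prefix removed, so downstream code reads short names like "sendTimeout".
    public static Properties stripPrefix(Properties all, String prefix) {
        String p = prefix.endsWith(".") ? prefix : prefix + ".";
        Properties scoped = new Properties();
        for (String key : all.stringPropertyNames()) {
            if (key.startsWith(p)) {
                scoped.setProperty(key.substring(p.length()), all.getProperty(key));
            }
        }
        return scoped;
    }
}
```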

Collaborator Author

@somandal I've discussed this with Ahmed. Will push the fix soon.

@@ -142,7 +143,7 @@ private void populateDefaultProducerConfigs() {
DEFAULT_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_VALUE);
}

private Optional<Producer<K, V>> maybeGetKafkaProducer(DatastreamTask task) {
protected Optional<Producer<K, V>> maybeGetKafkaProducer(DatastreamTask task) {
Collaborator

I don't see you overriding this in your Bounded implementation, why make this protected?

@@ -246,7 +253,7 @@ private synchronized void shutdownProducer() {
}
}

private DatastreamRuntimeException generateSendFailure(Exception exception) {
protected DatastreamRuntimeException generateSendFailure(Exception exception) {
Collaborator

I don't see you overriding this in your Bounded implementation, why make this protected?

@@ -60,7 +61,8 @@

private static final int TIME_OUT = 2000;
private static final int MAX_SEND_ATTEMPTS = 10;
private final Logger _log;

protected final Logger _log;
Collaborator

Move this with other protected member variables

@@ -72,8 +74,7 @@
// Producer is lazily initialized during the first send call.
// Also, can be nullified in case of exceptions, and recreated by subsequent send calls.
// Mark as volatile as it is mutable and used by different threads
private volatile Producer<K, V> _kafkaProducer;

protected volatile Producer<K, V> _kafkaProducer;
Collaborator

It's not necessary to mark member fields/methods protected if the extenders of this class live in the same package. This is only useful if you want to make them accessible to extenders in different packages.

Since KafkaProducerWrapper and BoundedKafkaProducerWrapper both live in the same package (com.linkedin.datastream.kafka), all package-private (no modifier) fields/methods in the former are accessible/overridable to/by the latter.

@@ -17,6 +17,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import org.apache.commons.lang.exception.ExceptionUtils;
Collaborator

Nothing wrong with this but I'd recommend lang3; it generally offers more modern facilities (better support for more recent versions of Java) and it's the one we have an explicit dependency on.

This would also entail adding an explicit dependency for this module on it in build.gradle :

project(':datastream-kafka') {
  dependencies {
    ...
    compile "org.apache.commons:commons-lang3:$commonslang3Version"

}
}

Comment on lines 61 to 73
private CompletableFuture<RecordMetadata> produceMessage(Producer<K, V> producer, ProducerRecord<K, V> record) {
CompletableFuture<RecordMetadata> future = new CompletableFuture<>();

producer.send(record, (metadata, exception) -> {
if (exception == null) {
future.complete(metadata);
} else {
future.completeExceptionally(new KafkaClientException(metadata, exception));
}
});

return future;
}
Collaborator

Thinking out loud:

  • This method is doing almost everything we need: it creates a CompletableFuture that is completed if the callback is called (send success/failure).
  • The only missing bit is canceling future after timeout elapses if future.isDone() is false, which can be accomplished with a ScheduledExecutorService. I know this is exactly what CompletableFutureUtils.failAfter() is doing but I think the logic over there is more than what's absolutely necessary; we don't really need the other CompletableFuture failAfter() creates or the additional logic in within(). We can just cancel this same future if it isn't done when timeout elapses.
     scheduler.schedule(() -> {
         if (!future.isDone()) {
             future.cancel(true);
         }
     }, _sendTimeout, TimeUnit.MILLISECONDS);
  • future.cancel() causes a java.util.concurrent.CancellationException to be thrown, which means we don't have to construct a TimeoutException ourselves because a CancellationException can only mean we cancelled it after the timeout elapsed.

This seems like something this method can do with a private executor service. I am not sure we really need a utils class just for this purpose.

Collaborator Author

@ahmedahamid I just think that cancellation and timeout are semantically different. We may want to cancel the future after the timeout in our case, but that's not necessarily true in general. Also, something may be cancelled without waiting for the timeout (based on user input or other external factors). Just thinking out loud. Will see whether I can get rid of the utils.

Collaborator

I just think that cancellation and timeout are semantically different.

I didn't mean to suggest we should propagate CancellationException to callback. I was assuming doSend() will still construct a TimeoutException in case future is cancelled. That does sound a bit roundabout though; it would certainly be better to do future.completeExceptionally(new TimeoutException(...)) instead of future.cancel() if future.isDone() is false after timeout.

if (exception == null) {
future.complete(metadata);
} else {
future.completeExceptionally(new KafkaClientException(metadata, exception));
Collaborator

I don't think we need to propagate metadata if exception is null. The javadocs on Callback.onCompletion() state that metadata is null if an error occurred. This would also spare us having to introduce KafkaClientException.

Collaborator Author

There were tons of checks up the callback chain that are dealing with the metadata and exception. You're probably right, I need to see if it's safe to do so and remove KafkaClientException. I was kind of forced to introduce it in the first place.

synchronized void flush() {
if (_kafkaProducer != null) {
try {
CompletableFutureUtils.within(CompletableFuture.runAsync(() -> _kafkaProducer.flush()),
Collaborator

CompletableFuture.runAsync() uses the common pool, which isn't the best option for a potentially long blocking call like Producer.flush(). Even if we provide our own thread pool to runAsync(), the CompletableFuture we get won't give us a way to interrupt a Producer.flush() call that exceeds the allowed timeout, which is necessary to free up the thread-pool thread in question. This is because calling cancel(true) on a CompletableFuture returned by runAsync() only causes a cancellation exception to be propagated without interrupting the blocked thread-pool thread.

I'm afraid our only option here seems to be using an ExecutorService directly

    // It's okay to use a single thread executor since flush() is synchronized
    ExecutorService executor = Executors.newSingleThreadExecutor();

    Future<?> future = executor.submit(() -> super.flush());
    try {
        // Block until timeout elapses
        future.get(_flushTimeout, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
        ...
        // Interrupt the Producer.flush() call to free up the blocked thread
        future.cancel(true);
        ...
    }
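A self-contained sketch of that pattern (a plain Runnable stands in for the Producer.flush() call; class and method names are illustrative, not the PR's actual code):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FlushWithTimeout {
    // A single daemon worker is enough here because flush() is synchronized upstream.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "flush-worker");
        t.setDaemon(true);
        return t;
    });

    // Runs `blockingCall`, giving up after `timeoutMs` and interrupting the
    // worker so its thread is freed. Returns true if the call finished in time.
    public boolean flushBounded(Runnable blockingCall, long timeoutMs) {
        Future<?> future = executor.submit(blockingCall);
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupts the blocked worker thread
            return false;
        } catch (InterruptedException | ExecutionException e) {
            return false;
        }
    }
}
```

Note that Future.cancel(true) delivers a real interrupt to the worker thread, unlike cancelling a CompletableFuture from runAsync(), which is the point of the comment above.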

Comment on lines 27 to 28
private static final int DEFAULT_SEND_TIME_OUT = 5000;
private static final int DEFAULT_FLUSH_TIME_OUT = 10 * 60 * 1000;
Collaborator

  1. Please encode time units into variable/config names, e.g. DEFAULT_SEND_TIME_OUT_MS
  2. Use longs for timeout configs (e.g. see existing timeout configs)
  3. If you like, you can use Duration methods to initialize (e.g. Duration.ofSeconds(5).toMillis())

Comment on lines 33 to 34
private int _sendTimeout;
private int _flushTimeout;
Collaborator

  1. final
  2. Add time unit suffixes (e.g. _sendTimeoutMs)

Comment on lines 23 to 26
/**
* An extension of {@link KafkaProducerWrapper} with bounded calls for flush and send
*/
class BoundedKafkaProducerWrapper<K, V> extends KafkaProducerWrapper<K, V> {
Collaborator

Bounded is a little vague because it's easy to confuse with buffering. I realize a better name won't be easy. If you can't think of one, just make sure the Javadoc is unambiguous (e.g. with timeouts for flush and send).


vmaheshw commented Feb 1, 2022

@jzakaryan Do we still need this PR?
