In the previous blog post on Ruby concurrency, we explored how to spawn many sub-threads with a concurrent-ruby gem. Let's continue the topic and get to know the Future
class much better: its life cycle, state methods, and strategies for error handling.
TL;DR: There is a great documentation page
States of the future
We can use the Concurrent::Future
object to spawn an asynchronous execution of the code. The preferable way of doing it utilizes the Promise
class and future
factory method:
future = Concurrent::Promises.future { a_long_thing_to_do }
Upon creation, future
starts in the pending state. When it finishes, it becomes resolved. This state has two sub-states to inform if the result was successful or not. Accordingly: fulfilled or rejected.
By the way, you can even create an already-resolved future
!
To fetch the result of the future
, we can use the value
method. This method blocks and waits for the future
to be resolved. Before future
resolves, value
is nil
, but then it becomes the result of the operation. In case of failure, value
stays nil
and we can use an additional method: reason
to find out the error. And speaking about errors...
Errors in the future
Exceptions raised inside futures are silent. They are not transmitted into the main thread unless explicitly specified. When an exception occurs, the future's value
remains nil
. Which alone is not enough to distinguish between failure and an unresolved future. Therefore we need to inspect the internal state of the future using methods like rejected?
or fulfilled?
.
In production, it's crucial to have better control and observability. So we can add a rescue
inside future block execution to act when something wrong happens.
Alternatively, you can force the future to raise an exception and transmit it into the main thread. This behavior is achieved with the value!
method.
Last but not least, rescuing errors in async threads requires rethinking the desired behavior of the program in case of trouble. In some cases, it's acceptable to let the process fail; in others, you may want to retry the operation. In most cases, we want to at least know if something goes wrong within a sub-thread. Let's see it in action!
Example implementation of errors handling
We will use an example script from the previous blogpost - a service class that makes async requests to API and returns results. Code examples are also available in my GitHub repository
require 'net/http'
require 'json'
require 'concurrent'
class AsyncConcurrentService
API_ENDPOINT = 'https://api.chucknorris.io/jokes/random'.freeze
def self.call(requests_count)
jokes = Concurrent::Array.new
futures = requests_count.times.map do
Concurrent::Promises.future { DataRequester.new(API_ENDPOINT).call }
end
jokes = futures.map(&:value)
end
end
class ApiError < StandardError; end
class DataRequester
def initialize(url)
@url = url
end
def call
raise ApiError, 'ERROR' if rand > 0.8 # 80% chance of success
response = make_request
parse_response(response)
end
def make_request
uri = URI(@url)
response = Net::HTTP.get_response(uri)
response.body if response.is_a?(Net::HTTPSuccess)
end
def parse_response(response)
# parsing response
end
end
To test program behavior in case of errors, we added raise ApiError, 'ERROR' if rand > 0.8
. We assume that in 20% of cases, the sub-thread will raise an error. With the current implementation, the thread will silently fail and return nil
as a value. So the good news is that the program still works and finishes without exceptions. Bad news: it works in 80% of cases.
Let's update the service to ensure we fetch the desired number of jokes, no matter how many errors were returned by API.
require 'net/http'
require 'json'
require 'concurrent'
class AsyncConcurrentErrorsRescueService
API_ENDPOINT = 'https://api.chucknorris.io/jokes/random'.freeze
def self.call(requests_count)
futures = requests_count.times.map { future_with_retry(3) }
jokes = Concurrent::Promises.zip(*futures).value
jokes.compact
end
def self.future_with_retry(attempts = 3)
Concurrent::Promises.future do
DataRequester.new(API_ENDPOINT).call
end.rescue(ApiError) do |_error|
future_with_retry(attempts - 1).value if attempts > 1
end
end
end
# rest of the code the same
There are a few key changes:
future_with_retry
is a method that wraps the execution of a future and retries up to 3 times in case of errors. There is arescue
block to capture anyApiError
exceptions. Notice that it is placed after the future block to avoid spawning new sub-threads within the existing future. After 3 failed attempts, it returnsnil
.Concurrent::Promises.zip
is used to ensure all sub-threads finish before the maincall
function returns their results.
This approach makes future_with_retry
method responsible for making API requests retries in case of errors. It could be improved for production usage by adding errors logging and introducing a delay between retries.
Summary
Concurrent-ruby's Future
class is a powerful tool for creating multi-threaded programs. Understanding its life cycle and helper methods is a good starting point before going to more complex topics such as chaining and using non-blocking methods. And let's not forget about proper error handling!