As highlighted in our previous articles, the gateway plays a crucial role in the Query API Gateway: it interacts with multiple workers to gather data, receives their responses, and performs operations on them when needed (such as returning the top 100 records, or reducing the data to a single value, e.g., the total income for a company over a given period). The gateway also supports SQL syntax, allowing you to connect with any MySQL connector and send queries.
Some queries, like SELECT, can limit the amount of returned data (e.g., SELECT * FROM Customers LIMIT 1000;), so we want to fetch only a subset of the data. An effective approach is to communicate the desired number of records to the workers. However, for intricate queries it may not be possible to determine the precise data requirement upfront. To address this uncertainty, we implement streaming. With streaming, we can extract data in manageable chunks, retrieving the necessary information without needing to know the total size beforehand.
Streaming proves useful in managing complex analytic queries, where the combined response from all workers could be several gigabytes. Rather than risking memory overflow by trying to fetch all the data at once, we can retrieve the data in smaller chunks, perform reductions on each chunk, and then fetch the next chunk when needed.
In this article, I’ll demonstrate how to implement a server and client for streaming data. We'll kick things off with a straightforward solution before diving into a more advanced coroutine mechanism featuring a generator.
First, we need to define a protobuf structure for streaming. We can reuse the existing Request and Response objects from the Handle method. To enable streaming, we simply add the stream keyword before the Response in the method definition. Once this structure is defined, we can initiate streaming. Initially, we send a Request, and then, for each read operation, we receive a Response. In this case, each Response represents a chunk of data.
message Request {
    string message = 1;
    int32 id = 2;
}

message Response {
    string message = 1;
}

service Worker {
    rpc Stream (Request) returns (stream Response) {}
}
Listing 1. Example protobuf structure for a streaming request named Stream.
After defining the protobuf, we can set up a basic streaming mechanism. First, let's examine how we can implement this synchronously. While this approach works, it's inefficient because each Read method blocks the thread. If we need to read data from 2,160 workers, this results in frequent thread context switching, which can significantly impact performance.
auto client = Worker::NewStub(grpc::CreateChannel(url,
    grpc::InsecureChannelCredentials()));
grpc::ClientContext context;
Request request;
Response response;
// Fill in the request
request.set_message(message);
// Create a reader object that will allow us to read chunks
auto reader = client->Stream(&context, request);
// Read data until the server closes the connection
while (reader->Read(&response)) {
    std::cout << response.message() << std::endl;
}
Listing 2. Simple streaming method
To optimize the communication for streaming, we will switch to an asynchronous model using coroutines. By adopting this approach, we avoid blocking entire threads: the coroutine is suspended during data retrieval and resumed once the desired results are ready. Here's how we can implement this:
Listing 3 illustrates the modification of the RequestContext class to handle streaming data asynchronously. With this approach, the client can manage multiple data chunks as they arrive, without blocking threads. The co_await mechanism is used to suspend the coroutine while waiting for data; once the data arrives, the coroutine resumes.
class RequestContext {
public:
    RequestContext(/* all necessary data */);
    // Return the current awaitable. We need to remember
    // that this awaitable will be destroyed when we call
    // the `readNext` method, so we should use this object
    // carefully. Later I will show you a better solution.
    Awaitable<Response>& awaitable() { return *_awaitable; }
    // Return the current response; each time we call `readNext`
    // it will be replaced with new data.
    const Response& currentChunk() const { return _response; }
    // Initialize the streaming
    void asyncStream() {
        // Create the first awaitable to wait until streaming starts
        _awaitable = std::make_shared<Awaitable<Response>>();
        _context = std::make_unique<grpc::ClientContext>();
        _reader = _client.AsyncStream(_context.get(), _request, _cq,
            reinterpret_cast<void*>(this));
    }
    // Ask for another chunk and return the corresponding
    // awaitable object, on which we can call `co_await`
    // to suspend the coroutine.
    std::shared_ptr<Awaitable<Response>> readNext() {
        // override the previous awaitable
        _awaitable = std::make_shared<Awaitable<Response>>();
        // ask for the next chunk
        _reader->Read(&_response, reinterpret_cast<void*>(this));
        return _awaitable;
    }
    std::shared_ptr<Awaitable<Response>> finish() {
        // override the previous awaitable
        _awaitable = std::make_shared<Awaitable<Response>>();
        // tell the server that we want to stop streaming
        _reader->Finish(&_status, reinterpret_cast<void*>(this));
        return _awaitable;
    }
private:
    std::shared_ptr<Awaitable<Response>> _awaitable;
    std::unique_ptr<grpc::ClientContext> _context;
    grpc::Status _status;
    Request _request;
    Response _response;
    std::unique_ptr<grpc::ClientAsyncReader<Response>> _reader;
    Worker::Stub& _client;
    grpc::CompletionQueue* _cq;
};
Listing 3. RequestContext class
After refactoring the RequestContext class, we can now initiate a stream and read as many data chunks as needed, or until the server sends everything. Since we will be overriding the awaitable object with each request (asyncStream, readNext, and finish), it is important to use a std::shared_ptr to ensure safety. Failing to do so could result in a dangling pointer situation when the object is overwritten. However, we have a more efficient solution for this challenge, which we will discuss later.
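The Awaitable type itself was introduced in an earlier article and is not repeated here. As a rough reminder, a minimal single-value awaitable could look something like the sketch below; the names and details are illustrative, and it ignores the synchronization a production version would need between the consumer and the completion-queue thread.

#include <coroutine>
#include <optional>
#include <utility>

// A bare-bones single-value awaitable: the consumer coroutine suspends in
// co_await until the completion-queue thread delivers a value via setValue().
// Synchronization between setValue() and await_suspend() is omitted for brevity.
template <typename T>
class Awaitable {
public:
    bool await_ready() const noexcept { return _value.has_value(); }
    void await_suspend(std::coroutine_handle<> handle) { _handle = handle; }
    T await_resume() { return std::move(*_value); }

    // Called from the completion-queue loop when the corresponding tag fires.
    void setValue(T value) {
        _value = std::move(value);
        if (_handle) {
            _handle.resume();
        }
    }

private:
    std::optional<T> _value;
    std::coroutine_handle<> _handle;
};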
Now, let's take a look at how we should update the Worker class to support the streaming functionality. The key changes revolve around managing multiple read operations and effectively handling the lifecycle of the request.
class Worker {
public:
    void init(grpc::CompletionQueue* cq);
    // Return a *RequestContext* object that allows us
    // to get the current *awaitable* object
    // and read streaming data. This is a temporary
    // solution and quite a bad one, because
    // this object will be destroyed by the CompletionQueue,
    // and we can end up with a dangling pointer.
    // Later I will show you how we can hide this object
    // and make streaming better.
    RequestContext* stream(const Request& request);
private:
    // Worker identifier
    WorkerID _id;
    // Stub used to send requests
    std::unique_ptr<Worker::Stub> _client;
    // CompletionQueue on which we push requests.
    grpc::CompletionQueue* _cq;
};
Listing 4. Worker class
The main drawback of returning RequestContext directly is that it exposes implementation details to the user, which we previously kept hidden. When dealing with a single request, the user remains unaware of this object's presence. Additionally, the issue of dangling pointers arises when the CompletionQueue disposes of the object at the end of streaming, requiring caution in its usage. Once the streaming concludes and the stop signal is received, the pointer will become invalid. In the upcoming Server implementation, I will demonstrate how we can use std::shared_ptr to eliminate manual memory management. But before that, let’s first explore how we can implement the stream method for the worker.
RequestContext* stream(const Request& request) {
    // We need to manually manage the lifetime of the request.
    // That's why we pass this object
    // to the *asyncStream* request as a *tag*.
    // When the request finishes, we will get it back from the
    // completion queue and we can delete it.
    RequestContext* requestContext =
        new RequestContext(/* all necessary data */);
    requestContext->asyncStream();
    return requestContext;
}
Listing 5. Initiating a streaming request.
After initiating a streaming request, we create a RequestContext, which generates the first awaitable. Access to this awaitable is granted by calling the awaitable() method, allowing us to wait until the streaming starts. Although not ideal, this method suffices for now; once we explore generators, we'll replace it with a more efficient approach. Next, when data needs to be read, the readNext method is called. Each invocation of readNext returns an Awaitable for the Response object. When starting the streaming, the response will be empty because no data has been read yet.
Task<void> streamSomething(const std::string& messageRequest) {
    Request request;
    request.set_message(messageRequest);
    RequestContext* context =
        workerManager.requestStream(WorkerID(42), request);
    // The first awaitable will notify us when streaming starts.
    auto& startStreamingAwaitable = context->awaitable();
    co_await startStreamingAwaitable;
    while (true) {
        auto awaitable = context->readNext();
        // suspend the coroutine until a new chunk is available.
        const auto chunk = co_await *awaitable;
        if (chunk.message() == "FINISH") {
            // The server finished, so we should finish as well.
            // Of course, it is bad to depend on such a string :)
            // I will fix it later.
            co_return;
        }
        // do something with the chunk
        doSth(chunk);
    }
}
Listing 6. Using the RequestContext class to read data.
The streamSomething function demonstrates how we can use RequestContext to read data. However, comparing the result to the string "FINISH" is risky, because "FINISH" could be a valid message in the stream. To address this, we need to adjust the Response object: it should be wrapped in a std::optional or std::unique_ptr, and when the streaming is finished we should return std::nullopt or nullptr to signal the end of the stream. I'll leave this modification up to you; a possible shape of the adjusted readNext is sketched below.
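If you go with std::optional, the change is small. A hypothetical version of readNext (mirroring the RequestContext from Listing 3) and the corresponding reading loop might look like this:

// Hypothetical adjustment: the awaitable now carries std::optional<Response>,
// and the completion-queue handler passes std::nullopt when the server
// finishes the stream, instead of relying on a sentinel string.
std::shared_ptr<Awaitable<std::optional<Response>>> readNext() {
    _awaitable = std::make_shared<Awaitable<std::optional<Response>>>();
    _reader->Read(&_response, reinterpret_cast<void*>(this));
    return _awaitable;
}

// The consumer loop then becomes:
// while (true) {
//     auto chunk = co_await *context->readNext();
//     if (!chunk) {
//         co_return;  // std::nullopt -> stream finished
//     }
//     doSth(*chunk);
// }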
For now, let's move forward and explore a more elegant solution for handling the coroutine part. C++20 introduced support for coroutines, which allow writing asynchronous code in a more natural, sequential style. In C++23, the standard library added std::generator, a coroutine-based generator type. However, since we assume only C++20 is available (due to our Ubuntu requirements), we need to implement our own generator mechanism.
A coroutine generator is essentially a coroutine that can produce a series of values over time, each time it's awaited. Instead of returning a single value, a generator yields multiple values across suspensions and resumptions, making it suitable for streaming data or lazy evaluation. Let's look at how we can implement our own generator based on C++20 coroutine functionality.
In C++20, coroutines are based on the co_await, co_return, and co_yield keywords. To implement a generator, we'll need to define a coroutine that yields values one at a time and allows the consumer to await the next value when ready.
#include <coroutine>
#include <exception>

template <typename T>
struct generator {
    struct promise_type;
    using handle_type = std::coroutine_handle<promise_type>;

    struct promise_type {
        T currentValue;
        std::exception_ptr exception;

        generator get_return_object() {
            return generator(handle_type::from_promise(*this));
        }
        std::suspend_always initial_suspend() { return {}; }
        // final_suspend must be noexcept
        std::suspend_always final_suspend() noexcept { return {}; }
        void return_void() {}
        std::suspend_always yield_value(const T& val) {
            currentValue = val;
            return {};
        }
        void unhandled_exception() {
            // Store the exception for further use
            exception = std::current_exception();
        }
    };

    handle_type h_;

    generator(handle_type h) : h_(h) {}
    ~generator() {
        if (h_) {
            h_.destroy();
        }
    }
    bool getNext() {
        h_.resume();
        if (h_.promise().exception) {
            // propagate the coroutine exception to the calling context
            std::rethrow_exception(h_.promise().exception);
        }
        return !h_.done();
    }
    T value() {
        return h_.promise().currentValue;
    }
};
Listing 7. Simple coroutine implementation
Now that we have our generator class, we can use it in the following way (listing 8).
generator<int> getValues(int n) {
for (int i = 0; i <= n; ++i) {
co_yield i; // Yield values one at a time
}
}
int main() {
auto gen = getValues(5);
while (gen.getNext()) {
std::cout << gen.value() << std::endl;
}
return 0;
}
Listing 8. Simple usage of generic coroutine generator.
Now that we've set up a basic coroutine generator capable of yielding values and managing exceptions, let's implement a simple streaming function. This function will use our generator to produce n values.
The key idea is that any function that uses the co_yield operator is a generator function. This allows us to create multiple objects or values as part of a single function call. When we want to stop producing values, we use the co_return operator to stop the coroutine. The operator bool will indicate whether there are more values to produce, and it will return false when the generator has finished.
We'll also introduce a full_ flag, which acts as a safeguard so that the coroutine isn't resumed twice for the same value, for example when operator bool() has already resumed it and operator() is called right after.
template <typename T>
struct generator {
    struct promise_type;
    using handle_type = std::coroutine_handle<promise_type>;

    struct promise_type {
        T current_value;
        std::exception_ptr exception;

        generator get_return_object() {
            return generator(handle_type::from_promise(*this));
        }
        std::suspend_always initial_suspend() { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        void return_void() {}
        std::suspend_always yield_value(const T& val) {
            current_value = val;
            return {};
        }
        void unhandled_exception() {
            exception = std::current_exception();
        }
    };

    generator(handle_type h) : _h(h) {}
    ~generator() {
        if (_h) {
            _h.destroy();
        }
    }
    operator bool() {
        // The only way to check whether the generator is done
        // is to resume it and check if there is something more
        fill();
        return !_h.done();
    }
    T operator()() {
        fill();
        if (_h.promise().exception) {
            std::rethrow_exception(_h.promise().exception);
        }
        full_ = false;
        return _h.promise().current_value;
    }
private:
    void fill() {
        if (!full_ && _h) {
            _h.resume();
            full_ = true;
        }
    }
    handle_type _h;
    // Tracks whether the coroutine has already been resumed
    // for a value that has not been consumed yet
    bool full_ = false;
};
// Generator function that simulates streaming of `n` values
generator<int> streamSomething(int n) {
for (int i = 0; i < n; ++i) {
co_yield i; // Yielding values one at a time
}
co_return; // End of the generator when all values are produced
}
int main() {
// Request 5 values from the stream
auto gen = streamSomething(5);
// Streaming values, checking for completion
while (gen) {
// Produce and print the value
std::cout << "Received: " << gen() << std::endl;
}
std::cout << "Stream finished!" << std::endl;
return 0;
}
Listing 9. Example generator function.
Now that we have everything in place, we can implement the streaming functionality. The stream function should remain unaware of the details of coroutines. Instead, we invoke operator() on the generator object to produce a new value. When streaming stops, the operator bool() will return false to indicate that there is no more data to yield. If we decide to terminate the streaming early, we simply return from the function and the generator will be destroyed, thereby halting the streaming process. We need to implement similar logic in the coroutine to support stopping the streaming, just like we did in the finish method of the RequestContext class (refer to Listing 3).
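For illustration, here is a minimal example of early termination using the generator from Listing 9; the stop condition is arbitrary.

// Stop consuming as soon as we have taken three values. Leaving the loop
// lets `gen` go out of scope; its destructor calls destroy() on the
// coroutine handle, so the producer is halted without yielding the rest.
int consumeFirstThree() {
    auto gen = streamSomething(100);
    int sum = 0;
    int taken = 0;
    while (gen && taken < 3) {
        sum += gen();
        ++taken;
    }
    return sum;  // 0 + 1 + 2 == 3
}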
At this point, we are almost ready to use the generator for streaming data. However, there is one key missing piece. The stream function from Listing 9 does not wait for data—it simply produces values. To address this, we need to introduce an awaitable object, which will allow us to co_await until the next chunk of data is ready. In the improved implementation shown in Listing 10, we wait for the data to become available before calling co_yield to return it.
Generator<Response> Worker::stream(
        const std::string& messageRequest) {
    // Create the first awaitable to wait until streaming starts.
    _awaitable =
        std::make_shared<Awaitable<std::optional<Response>>>();
    // First we need to initialize the streaming before
    // we return the first value.
    Request request;
    request.set_message(messageRequest);
    _context = std::make_unique<grpc::ClientContext>();
    _reader = _client.AsyncStream(_context.get(), request, _cq,
        reinterpret_cast<void*>(this));
    // Here we hide the information that we need to wait
    // for the start of streaming.
    // The end user gets a generator that is ready to read data.
    co_await *_awaitable;
    // Ask for new chunks until the server finishes streaming.
    while (true) {
        // Create an awaitable that will wait for the result.
        // Here we also hide the coroutine details from the user.
        _awaitable =
            std::make_shared<Awaitable<std::optional<Response>>>();
        // ask for the next chunk
        _reader->Read(&_response, reinterpret_cast<void*>(this));
        // Suspend until we get a new chunk
        auto chunk = co_await *_awaitable;
        // If we get *std::nullopt*, it means the streaming has finished
        if (!chunk) {
            co_return;
        }
        // If we got a value, yield the new chunk to the user
        co_yield *chunk;
    }
}
Listing 10. Generator that suspends until new data appears.
With the current setup, we have devised an elegant mechanism for streaming data, effectively concealing all implementation complexities from the end user. Users are not required to have any knowledge of RequestContext, the initialization of streaming, or waiting for completion of the first awaitable object. Their only task is to call co_await to obtain the values. Additionally, users don't have to manage or even be aware of the awaitables themselves, as the generator handles waiting for the data and co_awaiting it when it becomes available.
From the user's perspective, the process is simplified: they just call co_await operator(), and the value is produced, without the need for thread switching. The coroutine is suspended and resumed as needed, facilitating efficient data streaming.
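To make this concrete, a hypothetical consumer could look like the sketch below. It assumes the asynchronous Generator<Response> from Listing 10 exposes operator bool() and an operator() whose result is co_awaited to obtain the next chunk, as described above; the query string and doSth() are purely illustrative.

// Hypothetical consumer of the asynchronous generator from Listing 10.
Task<void> consumeStream(Worker& worker) {
    auto gen = worker.stream("SELECT * FROM Customers LIMIT 1000;");
    while (gen) {
        // Suspend until the next chunk is ready, then process it.
        Response chunk = co_await gen();
        doSth(chunk);
    }
    co_return;
}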
On the server side, I will demonstrate how to set up an asynchronous mechanism without manually allocating and freeing memory. We'll use std::shared_ptr to manage memory automatically. Additionally, I'll show you how to implement this without relying on coroutines, because with only a few simultaneous requests thread switching is no longer a bottleneck. For example, when the gateway opens 10 streams per worker, that leads to 10 * 2160 = 21,600 parallel requests, and frequent thread switching can slow things down significantly. However, when a single worker only serves 10 streams, thread switching becomes negligible and performance is barely affected.
We will extend the previously implemented Handler class by adding a new method, InitializeStreamRequest. This method will create a dedicated StreamContext object responsible for managing the entire streaming process. Since streaming involves more complexity than a single request, we need to handle the following states: Pending, Sending, Finish, and AsyncDone.
Since we don't want to manually manage the lifetime of objects, we've decided to use a reference-counting mechanism. C++ provides std::shared_ptr, which automatically tracks references to an object and destroys it when the last reference is released. This mechanism is thread-safe, making it ideal for our use case. Each state will be represented by a HandlerTag object, and each tag will hold its own copy of the shared_ptr to the StreamContext. When the last HandlerTag is processed, the request will no longer be in use and will be automatically freed, thanks to the shared_ptr's reference counting and automatic memory management.
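A minimal sketch of what such a tag might look like is shown below; the interface is simplified, and the handler wiring is illustrative rather than the exact implementation.

#include <functional>
#include <memory>

class RequestContext;  // defined below (Listing 11)

// Each HandlerTag keeps its own shared_ptr to the context, so the context
// stays alive as long as any of its tags is still waiting in the completion queue.
class HandlerTag {
public:
    using Handler = std::function<void(bool /*ok*/)>;

    HandlerTag() = default;
    HandlerTag(std::shared_ptr<RequestContext> context, Handler handler)
        : _context(std::move(context)), _handler(std::move(handler)) {}

    // Called by the completion-queue loop when this tag is popped.
    void process(bool ok) {
        if (_handler) {
            _handler(ok);
        }
        // Release our reference: once the last tag has been processed,
        // the RequestContext is destroyed automatically.
        _context.reset();
    }

private:
    std::shared_ptr<RequestContext> _context;  // keeps the request alive
    Handler _handler;                          // state-specific reaction
};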
class RequestContext : public
        std::enable_shared_from_this<RequestContext> {
public:
    using Callback =
        std::function<void(std::shared_ptr<RequestContext>)>;
    RequestContext(grpc::ServerCompletionQueue* cq,
                   Callback callback,
                   Worker::AsyncService* service);
    // Virtual destructor and the rest of the rule of five
    virtual void run();
    virtual void initRequest(const std::string& requestName);
    virtual void finish(const QByteArray& result) = 0;
private:
    HandlerTag _pendingTag;
    HandlerTag _finishTag;
    HandlerTag _doneTag;
    // other members
};
Listing 11. RequestContext class
gRPC allows for the initialization of only one request at a time. While it can handle multiple requests simultaneously, the initialization process needs to be performed sequentially. Therefore, the developer must ensure thread safety so that RequestStream is called only once at a time. When the server receives a request, it will be notified through the completion queue (CQ) by receiving the corresponding tag that was provided during initialization. Only once this process is complete can RequestStream be called again for the next request.
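A hedged sketch of this sequential registration, using the RequestStream method generated for Worker::AsyncService, could look like the following; member names such as _serverContext, _responder, and _callback are illustrative.

// Register interest in exactly one incoming Stream call. _responder is a
// grpc::ServerAsyncWriter<Response> bound to _serverContext. Only one
// outstanding RequestStream registration may exist at a time.
void StreamContext::initRequest() {
    _service->RequestStream(&_serverContext, &_request, &_responder,
                            _cq, _cq, (void*)&_pendingTag);
}

// Completion-queue handler for the pending tag: the call has arrived, so it
// is now safe to register the next StreamContext for the following client.
void StreamContext::onPending(bool ok) {
    if (ok) {
        _callback(shared_from_this());  // hand the accepted call to the server logic
    }
    auto next = std::make_shared<StreamContext>(_cq, _callback, _service);
    next->initRequest();
}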
Sending chunks can occur simultaneously, as multiple threads can send their own data without needing to synchronize with each other. Each time we want to send a new chunk, we create a SendingTag, which will be triggered once the receiver gets the message. To maximize throughput and minimize waiting time, we will always prepare the next chunk to send while waiting for the acknowledgment of the previous one. To facilitate this, we will store two tags at a time: the current tag and the previous tag. This allows us to manage the sending process efficiently and continue streaming without unnecessary delays.
When ending a stream, the server has two options: the Finish method and the WriteAndFinish method. The first simply closes the stream with a status code (either OK or an error). The second sends a last chunk of data and then closes the stream with the appropriate status code. If streaming finishes normally, after all data has been sent, we call Finish. However, if an error occurs during streaming, we use WriteAndFinish to send the final message, ensuring that the last message sent is an error message, signaling the issue to the client before closing the stream.
With our server now equipped with a fully asynchronous streaming process, complete with error handling and automatic memory management, we encounter a pivotal question: what happens if the client decides to interrupt the streaming? The client can decide at any moment that it has received all the data and wants to stop the stream. This can be done by calling TryCancel on the ClientContext. Once the client interrupts the streaming, the server will receive an AsyncDone tag. At this point, the server should stop sending new chunks and close the stream. This is why we need both the AsyncDone and Finish tags:
void StreamContext::sendRow(std::unique_ptr<HandlerTag>&& sendTag) {
try {
if (const auto row = _stream->read()) {
// Send next chunk
Response response;
response.set_response(*row);
_prevTag = std::move(_sendTag);
_sendTag = std::move(sendTag);
_responder.Write(response, (void*)_sendTag.get());
return;
}
// Finish streaming
_responder.Finish(grpc::Status::OK, (void*)&_finishTag);
} catch (const Exception& ec) {
terrarium::Response response;
response.set_response(ec.toJson());
_responder.WriteAndFinish(response, {}, grpc::Status::OK,
(void*)&_finishTag);
}
}
Listing 12. Send row method
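For completeness, here is a sketch of how the AsyncDone notification mentioned above can be wired up. AsyncNotifyWhenDone must be registered on the ServerContext before the call starts; member names (_serverContext, _doneTag, _cancelled) are illustrative.

// Register the done tag: it fires when the RPC finishes for any reason,
// including the client calling TryCancel() on its ClientContext.
void StreamContext::registerDoneTag() {
    _serverContext.AsyncNotifyWhenDone((void*)&_doneTag);
}

// Completion-queue handler for the done tag.
void StreamContext::onAsyncDone(bool /*ok*/) {
    if (_serverContext.IsCancelled()) {
        // The client interrupted the stream: stop producing new chunks and
        // let the remaining tags release their references to this context.
        _cancelled = true;
    }
}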
Conclusion
In this article, we explored how to enhance the efficiency of streaming and request handling in a distributed system using gRPC and coroutines. By leveraging the power of asynchronous programming and streaming, we demonstrated how to optimize the handling of large datasets and complex queries. Through coroutines, we avoided the pitfalls of thread-blocking, ensuring that the system remains responsive even under heavy load. Additionally, our introduction of coroutine-based generators offers a notable advantage, simplifying the management of streaming data and facilitating request handling without the need for intricate thread management strategies.
On the server side, we implemented a robust memory management strategy using std::shared_ptr, which ensures that resources are properly allocated and deallocated without manual intervention. This allowed us to handle multiple streaming requests simultaneously without performance degradation, while also enabling the system to respond quickly to client interruptions and errors.
Through the integration of these techniques, we've engineered a scalable and efficient solution for managing high-volume, real-time data streams. Whether handling millions of records or intricate analytic queries, this approach delivers results without overwhelming the system or compromising performance.
As we continue to build upon these foundations, there are numerous opportunities for further optimization, including fine-tuning the coroutine mechanisms and exploring alternative data streaming paradigms. However, the key takeaway is that by using gRPC with coroutines and streaming, we can significantly improve both the efficiency and flexibility of distributed systems, making them more capable of handling modern, data-intensive workloads.
Mateusz Adamski, Senior C++ developer at Synerise