Go-style channels and coroutines in C++
Adventures with C++ coroutines and Boost.Context continued.
The way of Go
Last time I described a simple Python-style generator.
Since then I've been working on something bigger: Go-style coroutines.
In Go, coroutines are part of the language. A coroutine (goroutine, as they call it) is started by applying the go keyword to a function, and blocking, fixed-size channels are used for synchronization and data transfer.
Sample Go code:
c := make(chan int) // Allocate a channel.
// Start the sort in a goroutine; when it completes, signal on the channel.
go func() {
    list.Sort()
    c <- 1 // Send a signal; value does not matter.
}()
doSomethingForAWhile()
<-c // Wait for sort to finish; discard sent value.
How to do it in C++
This is what we need:
- A scheduler - something that will maintain a thread pool and a per-thread queue of coroutines.
- A monitor - a tool that allows one or more coroutines to pause (yield) and wait for a signal.
- A channel - an inter-thread, multiple-producer-multiple-consumer, fixed-size queue. Operations on a channel are 'blocking': when a coroutine tries to read from an empty channel or write to a full one, it yields and the next coroutine is executed.
- A couple of global functions, go and make_channel, to emulate Go (see the sketch below).
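Before diving into a bigger example, here is how the Go snippet above could look with these tools. This is a minimal sketch, not code from the repository: it assumes that go accepts any callable (the examples below only pass plain functions), that channel endpoints are copyable, and that do_something_for_a_while is a stand-in, just like in the Go original.

scheduler sched(4);
set_scheduler(&sched);

std::vector<int> data = { 3, 1, 2 };
channel_pair<int> c = make_channel<int>(1, "done"); // allocate a channel

// Start the sort in a coroutine; when it completes, signal on the channel.
auto done = c.writer;
go(std::string("sort"), [&data, done]() mutable
{
    std::sort(data.begin(), data.end());
    done.put(1); // send a signal; the value does not matter
});

do_something_for_a_while();
c.reader.get(); // wait for the sort to finish; discard the value

sched.wait();
set_scheduler(nullptr);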
The code is here:
https://github.com/maciekgajewski/coroutines/tree/0.1
How does it work - a simple example
This is a simple example: parallel file decompression. Full working source code is here.
The task is to find all xz-compressed files in the input directory and decompress each of them into the output directory.
Step 1 - set up the scheduler, launch the first coroutines
At first, the program looks for compressed files. For each file it launches a coroutine, process_file, using the global function go. Apart from being launched this way, process_file is just a normal C++ function.
The scheduler itself needs to be created explicitly. set_scheduler sets a global variable used by all the global functions.
namespace bfs = boost::filesystem;

scheduler sched(4 /*threads*/); // Go: runtime.GOMAXPROCS(4)
set_scheduler(&sched);
try
{
    // 'in' and 'out' name the input and output directories
    bfs::path input_dir(in);
    bfs::path output_dir(out);
    for(bfs::directory_iterator it(input_dir); it != bfs::directory_iterator(); ++it)
    {
        if (it->path().extension() == ".xz" && it->status().type() == bfs::regular_file)
        {
            bfs::path output_path = output_dir / it->path().filename().stem();
            go(std::string("process_file ") + it->path().string(), process_file, it->path(), output_path);
        }
    }
}
catch(const std::exception& e)
{
    std::cerr << "Error: " << e.what() << std::endl;
}
sched.wait();
set_scheduler(nullptr);
Step 2 - set up the pipeline
The code is already concurrent: each file is processed in a separate coroutine, concurrently and possibly in parallel.
But we can go further: decompressing a file can be split into three stages: reading the compressed file, decompressing, and writing the output. Each stage is implemented as a separate coroutine.
process_file creates the processing pipeline: coroutines connected by channels.
The channels pass buffers of data around. Only a fixed number of buffers is ever allocated: this is C++, not some garbage-collected language! The amount of memory used by the program is deterministic. The return channels are needed to circulate spent buffers back to the producers.
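The resulting pipeline looks like this (data flows left to right; spent buffers circulate back through the return channels):

read_input --compressed--> lzma_decompress --decompressed--> write_output
     ^                        |      ^                            |
     +----compressed_return---+      +----decompressed_return-----+

The buffer class itself looks like this: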
// Used to send blocks of data around. Movable, but not copyable.
class buffer
{
public:

    typedef char value_type;
    typedef char* iterator;
    typedef const char* const_iterator;

    // null buffer
    buffer() = default;

    // allocated buffer
    buffer(std::size_t capacity)
        : _capacity(capacity), _size(0), _data(new char[_capacity])
    {
    }

    ~buffer() = default;

    buffer(const buffer&) = delete;

    buffer(buffer&& o) noexcept
    {
        swap(o);
    }

    buffer& operator=(buffer&& o) noexcept
    {
        swap(o);
        return *this;
    }

    // iterators
    iterator begin() { return _data.get(); }
    iterator end() { return _data.get() + _size; }
    const_iterator begin() const { return _data.get(); }
    const_iterator end() const { return _data.get() + _size; }

    // size/capacity
    void set_size(std::size_t s) { _size = s; }
    std::size_t size() const { return _size; }
    std::size_t capacity() const { return _capacity; }
    bool is_null() const { return !_capacity; }

    // other
    void swap(buffer& o) noexcept
    {
        std::swap(_capacity, o._capacity);
        std::swap(_size, o._size);
        std::swap(_data, o._data);
    }

private:

    std::size_t _capacity = 0; // buffer capacity
    std::size_t _size = 0;     // amount of data in the buffer
    std::unique_ptr<char[]> _data;
};
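A buffer is moved, never copied, when it travels through a channel. A minimal illustration of the move semantics, assuming the class above is in scope:

#include <cassert>

int main()
{
    buffer a(4096);          // allocated buffer, capacity 4096
    buffer b = std::move(a); // b takes ownership of the storage
    assert(a.is_null());     // the moved-from buffer is left null
    assert(b.capacity() == 4096 && b.size() == 0);
}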
And the pipeline is created like this:
void process_file(const bfs::path& input_file, const bfs::path& output_file)
{
    channel_pair<buffer> compressed = make_channel<buffer>(BUFFERS, "compressed");
    channel_pair<buffer> decompressed = make_channel<buffer>(BUFFERS, "decompressed");
    channel_pair<buffer> compressed_return = make_channel<buffer>(BUFFERS, "compressed_return");
    channel_pair<buffer> decompressed_return = make_channel<buffer>(BUFFERS, "decompressed_return");

    // start writer
    go(std::string("write_output ") + output_file.string(),
        write_output,
        decompressed.reader, decompressed_return.writer, output_file);

    // start decompressor
    go(std::string("lzma_decompress ") + input_file.string(),
        lzma_decompress,
        compressed.reader, compressed_return.writer,
        decompressed_return.reader, decompressed.writer);

    // read (in this coroutine)
    read_input(compressed.writer, compressed_return.reader, input_file);
}
Step 3 - read->decompress->write
And this is the final step: reading, decompressing and writing, all in separate coroutines, passing data around using channels.
read_input
void read_input(buffer_writer& compressed, buffer_reader& compressed_return, const bfs::path& input_file)
{
    try
    {
        file f(input_file.string().c_str(), "rb");
        unsigned counter = 0;
        for(;;)
        {
            buffer b;
            if (counter++ < BUFFERS)
                b = buffer(BUFFER_SIZE);     // allocate the initial pool of buffers
            else
                b = compressed_return.get(); // get a spent buffer back from the decoder

            std::size_t r = f.read(b.begin(), b.capacity());
            if (r == 0)
            {
                break; // exiting the coroutine will close the channel
            }
            else
            {
                b.set_size(r);
                compressed.put(std::move(b));
            }
        }
    }
    catch(const std::exception& e)
    {
        std::cerr << "Error reading file " << input_file << " : " << e.what() << std::endl;
    }
}
lzma_decompress
void lzma_decompress(
    buffer_reader& compressed,
    buffer_writer& compressed_return,
    buffer_reader& decompressed_return,
    buffer_writer& decompressed
)
{
    lzma_stream stream = LZMA_STREAM_INIT;
    lzma_ret ret = lzma_stream_decoder(&stream, UINT64_MAX, LZMA_CONCATENATED);
    if (ret != LZMA_OK)
    {
        throw std::runtime_error("lzma initialization failed");
    }

    buffer inbuf;
    buffer outbuf = decompressed_return.get(); // get an allocated buffer from the writer

    stream.next_in = nullptr;
    stream.avail_in = 0;
    stream.next_out = (unsigned char*)outbuf.begin();
    stream.avail_out = outbuf.capacity();

    while(ret == LZMA_OK)
    {
        lzma_action action = LZMA_RUN;

        // read more data, if the input buffer is empty
        if (stream.avail_in == 0)
        {
            // return the spent buffer; put_nothrow is used because the peer may already be gone
            if (!inbuf.is_null())
                compressed_return.put_nothrow(std::move(inbuf));
            try
            {
                // read the next one
                inbuf = compressed.get();
                stream.next_in = (unsigned char*)inbuf.begin();
                stream.avail_in = inbuf.size();
            }
            catch(const coroutines::channel_closed&)
            {
                action = LZMA_FINISH; // input channel closed - flush the remaining output
            }
        }

        // decompress
        ret = lzma_code(&stream, action);

        if (stream.avail_out == 0 || ret == LZMA_STREAM_END)
        {
            outbuf.set_size(stream.next_out - (unsigned char*)outbuf.begin());
            // send the full buffer, receive an empty one
            decompressed.put(std::move(outbuf));
            if (ret != LZMA_STREAM_END)
            {
                outbuf = decompressed_return.get();
                stream.next_out = (unsigned char*)outbuf.begin();
                stream.avail_out = outbuf.capacity();
            }
        }
    }
    lzma_end(&stream);

    if (ret != LZMA_STREAM_END)
    {
        std::cerr << "lzma decoding error" << std::endl;
    }
    // exiting will close all channels
}
write_output
void write_output(buffer_reader& decompressed, buffer_writer& decompressed_return, const bfs::path& output_file)
{
    try
    {
        // open the file
        file f(output_file.string().c_str(), "wb");

        // fill the return channel with allocated buffers
        for(unsigned i = 0; i < BUFFERS; i++)
        {
            decompressed_return.put(buffer(BUFFER_SIZE));
        }

        for(;;)
        {
            buffer b = decompressed.get();
            f.write(b.begin(), b.size());
            decompressed_return.put_nothrow(std::move(b)); // return the buffer to the decoder
        }
    }
    catch(const channel_closed&) // this exception is expected when the channels are closed
    {
    }
    catch(const std::exception& e)
    {
        std::cerr << "Error writing to output file " << output_file << " : " << e.what() << std::endl;
    }
}
And this is it: concurrent, parallel processing without a single mutex!
What next
This one was easy: some processing, file I/O, nothing fancy. To make the library really useful, network I/O is needed: a set of socket operations that look blocking from the coroutine's perspective, but under the hood use an event loop and context switching to provide concurrency.
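To give an idea of the goal, here is a purely hypothetical sketch; none of these names (tcp_socket, read, write) exist in the library yet. An echo handler would be written in plain, seemingly-blocking style, while each operation actually yields to the scheduler:

// Hypothetical API: tcp_socket and its read()/write() are illustrative names only.
void echo(tcp_socket& client)
{
    buffer b(BUFFER_SIZE);
    for(;;)
    {
        std::size_t n = client.read(b.begin(), b.capacity()); // yields until data arrives
        if (n == 0)
            break;                                            // connection closed
        client.write(b.begin(), n);                           // yields until the data is sent
    }
}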
I'm working on it. Watch this space!