2013-10-04

Go-style channels and coroutines in C++

Go-style channels and coroutines in C++

Adventures with C++ coroutines and Boost.Context continued.

The way of Go

Last time I described simple Python-style generator. Since then I've been working on something bigger: Go-style coroutines. In Go, corotuines are part of the language. A coroutine (goroutine, as they call it) is started by applying go keyword to a function, and blocking, fixed-size channels are used for synchronization and data transfer. sample Go code:
 c := make(chan int)  // Allocate a channel.
// Start the sort in a goroutine; when it completes, signal on the channel.
go func() {
    list.Sort()
    c <- 1  // Send a signal; value does not matter.
}()
doSomethingForAWhile()
<-c   // Wait for sort to finish; discard sent value.

How to do it in C++

This is what we need:
  • A scheduler - something that will maintain a thread pool, and a per-thread queue of coroutines
  • A monitor - a tool that will allow one or more coroutines to pause (yield) and wait for a signal.
  • A channel - inter-thread, multiple-producer-multiple-consumer, fixed size queue. Operations on channel are 'blocking': when coroutine tries to read from empty channel or write to a full one - it yields, and the next one is executed
  • Couple of global functions: go and make_channel - to emulate Go.
The code is here: https://github.com/maciekgajewski/coroutines/tree/0.1

How does it work - a simple example

This is a simple example: parallel file decompression. Full working source code is here. The task is to find all xz-compressed files in input directory, and decompress each of them into output directory.

Step 1- setup scheduler, launch first corountines

At first, the program looks for compressed files. For each file it launches a coroutine process_file, using global> function go. Apart from being launched this way, process_file is just a normal C++ function.
The scheduler itself needs to be created explicitly. set_scheduler sets global variable used by all the global functions.
    namespace bfs = boost::filesystem;
    scheduler sched(4 /*threads*/); // Go: runtime.MAXPROC(4)
    set_scheduler(&sched);

    try
    {
        bfs::path input_dir(in);
        bfs::path output_dir(out);


        for(bfs::directory_iterator it(input_dir); it != bfs::directory_iterator(); ++it)
        {
            if (it->path().extension() == ".xz" && it->status().type() == bfs::regular_file)
            {
                bfs::path output_path = output_dir / it->path().filename().stem();
                go(std::string("process_file ") + it->path().string(), process_file, it->path(), output_path);
            }

        }

    }
    catch(const std::exception& e)
    {
        std::cerr << "Error :" << e.what() << std::endl;
    }

    sched.wait();
    set_scheduler(nullptr);

Step 2 - setup pipeline

The code is already concurrent: each file is processed in separate coroutine, concurrently and possibly in parallel.
But we can go further: the process of decompressing file can be split into 3 stages: reading compressed file, decompressing and writing output. Each of the stages is implemented as a separate coroutine. process_file creates processing pipeline: coroutines connected by channels.
The channels will pass buffers with data. Only fixed amount of buffers is going allocated: this is C++, not some garbage-collected language! The amount of memory used by the program is deterministic. The return channels are needed to circulate buffers.
// used to send blocks of data around. Movable, bot not copytable
class buffer
{
public:

    typedef char value_type;
    typedef char* iterator;
    typedef const char* const_iterator;

    // null buffer
    buffer() = default;

    // alocated buffer
    buffer(std::size_t capacity)
    : _capacity(capacity), _size(0), _data(new char[_capacity])
    {
    }

    ~buffer()
    {
    }

    buffer(const buffer&) = delete;
    buffer(buffer&& o) noexcept
    {
        swap(o);
    }

    buffer& operator=(buffer&& o)
    {
        swap(o);
        return *this;
    }

    // iterators
    iterator begin() { return _data.get(); }
    iterator end() { return _data.get() + _size; }
    const_iterator begin() const { return _data.get(); }
    const_iterator end() const { return _data.get() + _size; }

    // size/capacity
    void set_size(std::size_t s) { _size = s; }
    std::size_t size() const { return _size; }
    std::size_t capacity() const { return _capacity; }

    bool is_null() const { return !_capacity; }

    // other
    void swap(buffer& o) noexcept
    {
        std::swap(_capacity, o._capacity);
        std::swap(_size, o._size);
        std::swap(_data, o._data);
    }

private:

    std::size_t _capacity = 0;  // buffer capacity
    std::size_t _size = 0;      // amount of data in
    std::unique_ptr<char[]> _data;
};
And the pipeline is created like this:
void process_file(const bfs::path& input_file, const bfs::path& output_file)
{
    channel_pair<buffer> compressed = make_channel<buffer>(BUFFERS, "compressed");
    channel_pair<buffer> decompressed = make_channel<buffer>(BUFFERS, "decompressed");
    channel_pair<buffer> compressed_return = make_channel<buffer>(BUFFERS, "compressed_return");
    channel_pair<buffer> decompressed_return = make_channel<buffer>(BUFFERS, "decompressed_return");


    // start writer
    go(std::string("write_output ") + output_file.string(),
        write_output,
        decompressed.reade, decompressed_return.writer, output_file);

    // start decompressor
    go(std::string("lzma_decompress ") + input_file.string(),
        lzma_decompress,
        compressed.reader, compressed_return.writer,
        decompressed_return.reader, decompressed.writer);

    // read (in this coroutine)
    read_input(compressed.writer, compressed_return.reader, input_file);
}

Step 3 - read->decompress->write

And this is the final step: reading, decompressing and writing, all in separate coroutines, passing data around using channels.
read_input
void read_input(buffer_writer& compressed, buffer_reader& compressed_return, const bfs::path& input_file)
{
    try
    {
        file f(input_file.string().c_str(), "rb");

        unsigned counter = 0;
        for(;;)
        {
            buffer b;
            if (counter++ < BUFFERS)
                b = buffer(BUFFER_SIZE);
            else
                b = compressed_return.get(); // get spent buffer from decoder
            std::size_t r = f.read(b.begin(), b.capacity());
            if (r == 0)
                break; // this will close the channel
            else
            {
                b.set_size(r);
                compressed.put(std::move(b));
            }
        }
    }
    catch(const std::exception& e)
    {
        std::cerr << "Error reading file " << input_file << " : " << e.what() << std::endl;
    }
}
lzma_decompress
void lzma_decompress(
    buffer_reader& compressed,
    buffer_writer& compressed_return,

    buffer_reader& decompressed_return,
    buffer_writer& decopressed
)
{
    lzma_stream stream = LZMA_STREAM_INIT;
    lzma_ret ret = lzma_stream_decoder(&stream, UINT64_MAX, LZMA_CONCATENATED);
    if (ret != LZMA_OK)
    {
        throw std::runtime_error("lzma initialization failed");
    }

    buffer inbuf;
    buffer outbuf = decompressed_return.get(); // get allocated buffer from writer

    stream.next_in = nullptr;
    stream.avail_in = 0;
    stream.next_out = (unsigned char*)outbuf.begin();
    stream.avail_out = outbuf.capacity();


    while(ret == LZMA_OK)
    {
        lzma_action action = LZMA_RUN;

        // read more data, if input buffer empty
        if(stream.avail_in == 0)
        {
            // return previous used buffer
            if (!inbuf.is_null())
                compressed_return.put_nothrow(std::move(inbuf));
            try
            {
                // read one
                inbuf = compressed.get();
                stream.next_in = (unsigned char*)inbuf.begin();
                stream.avail_in = inbuf.size();
            }
            catch(const coroutines::channel_closed&)
            {
                action = LZMA_FINISH;
            }
        }

        // decompress
        ret = lzma_code(&stream, action);
        if (stream.avail_out == 0 || ret == LZMA_STREAM_END)
        {
            outbuf.set_size(stream.next_out - (unsigned char*)outbuf.begin());
            // send the buffer, receive an empty one
            decopressed.put(std::move(outbuf));

            if (ret != LZMA_STREAM_END)
            {
                outbuf = decompressed_return.get();
                stream.next_out = (unsigned char*)outbuf.begin();
                stream.avail_out = outbuf.capacity();
            }
        }

    }

    lzma_end(&stream);

    if (ret != LZMA_STREAM_END)
    {
        std::cerr << "lzma decoding error" << std::endl;
    }
    // exit will close all channels
}
write_output
void write_output(buffer_reader& decompressed, buffer_writer& decompressed_return, const  bfs::path& output_file)
{
    try
    {
        // open file
        file f(output_file.string().c_str(), "wb");

        // fill the queue with allocated buffers
        for(unsigned i = 0; i < BUFFERS; i++)
        {
            decompressed_return.put(buffer(BUFFER_SIZE));
        }

        for(;;)
        {
            buffer b = decompressed.get();
            f.write(b.begin(), b.size());
            decompressed_return.put_nothrow(std::move(b)); // return buffer to decoder
        }
    }
    catch(const channel_closed&) // this exception is expected when channels are closed
    {
    }
    catch(const std::exception& e)
    {
        std::cerr << "Error writing to output file " << output_file << " : " << e.what() << std::endl;
    }
}
And this is it: concurrent, parallel processing without a single mutex!

What next

This one was easy - some processing, file IO, nothing fancy. To make it really useful, network IO is needed: set of socket operations which would seem like they are blocking from coroutine's perspective, but in fact they would use event loop and context switching to provide concurrency.

I'm working on it. Watch this space!

No comments:

Post a Comment