Render/update thread synchronization (C++11)

> Coding, hacking, computer graphics, game dev, and such...
User avatar
fips
Site Admin
Posts: 170
Joined: Wed Nov 12, 2008 9:49 pm
Location: Prague
Contact:

Render/update thread synchronization (C++11)

Post by fips »

Following the previous post, I'm going to demonstrate a simple way to synchronize two interactive threads using only C++11 standard library facilities, namely std::thread, std::mutex and std::condition_variable.

The image below shows the setup. Both threads (update and render) run concurrently in an interactive loop, each working on its own command queue. Our aim is to synchronize the threads so that at the end of each frame (loop iteration) there's a short window during which the render thread is paused. This window allows the unprotected command queues to be swapped safely (the swap is invoked from the update thread). So in this case, we protect shared resources by thread cooperation rather than by explicit locking.

Image

In the code below, I make use of a simple auto-reset event, which combines std::mutex and std::condition_variable into a single synchronization primitive. This proves to be quite convenient in this particular scenario.

VIEW THE CODE BELOW IN FULL-SCREEN (render_thread_synchronization_sample.cpp)

Code: Select all

/*
(c) 2013 +++ Filip Stoklas, aka FipS, http://www.4FipS.com +++
THIS CODE IS FREE - LICENSED UNDER THE MIT LICENSE
ARTICLE URL: http://forums.4fips.com/viewtopic.php?f=3&t=1081
*/

#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <cstdio>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>

/// An auto-reset event: one wait() consumes one pending signal.
///
/// Bundles a mutex, a condition variable and a boolean flag into a single
/// primitive. The flag remembers a notify() that arrives before the matching
/// wait(), so that signal is not lost. Signals are not counted, though:
/// several notify() calls without a wait() in between collapse into one.
class Event
{
 public:

    Event(): _signalled(false) {}

    /// Blocks the caller until the event is signalled, then clears the signal.
    void wait()
    {
        std::unique_lock<std::mutex> guard(_mutex);

        // Re-check the flag after every wakeup, so a spurious wakeup simply goes back to sleep.
        while(!_signalled)
            _condvar.wait(guard);

        _signalled = false; // auto-reset: this waiter consumes the signal
    }

    /// Marks the event as signalled and wakes up one waiting thread (if there is one).
    void notify()
    {
        std::unique_lock<std::mutex> guard(_mutex);
        _signalled = true;
        _condvar.notify_one();
    }

 private:

    // Non-copyable: declared private and never defined (stands in for '= delete').
    Event(const Event &);
    Event & operator = (const Event &);

    std::mutex _mutex;
    std::condition_variable _condvar;
    bool _signalled;
};

/// An unprotected shared command queue (resource).
///
/// A plain FIFO of size_t commands with no internal locking. Every push and
/// pop is traced to stdout as "+ N" and "- N" respectively.
class Command_queue
{
 public:

    /// Command value that the threads in this file treat as an exit request.
    enum { exit_command = 42 };

    /// Appends @a value to the back of the queue and logs it as "+ value".
    void push(size_t value)
    {
        printf("+ %u\n", unsigned(value));
        _queue.push_back(value);
    }

    /// Removes and returns the oldest command, logging it as "- value".
    /// The queue must not be empty; this is asserted in debug builds.
    size_t pop()
    {
        // front()/pop_front() on an empty deque is undefined behavior, so fail loudly instead
        assert(!_queue.empty());

        const size_t value = _queue.front();
        printf("- %u\n", unsigned(value));
        _queue.pop_front();
        return value;
    }

    /// Returns true when no command is queued.
    bool empty() const { return _queue.empty(); }

 private:

    std::deque<size_t> _queue;
};

/// The protocol shared by the render and update threads.
///
/// Owns four auto-reset events, each exposed as a notify_*() / wait_for_*()
/// pair, plus the two command queues. The queues themselves carry no locking;
/// callers are expected to use the events to decide when touching or swapping
/// them is safe.
class Shared_state
{
 public:

    Shared_state() {}

    // --- "ready" events ---

    void wait_for_render_ready() { _render_ready_event.wait(); }
    void notify_render_ready() { _render_ready_event.notify(); }

    void wait_for_update_ready() { _update_ready_event.wait(); }
    void notify_update_ready() { _update_ready_event.notify(); }

    // --- per-frame events ---

    void wait_for_render_done() { _render_done_event.wait(); }
    void notify_render_done() { _render_done_event.notify(); }

    void wait_for_swap_done() { _swap_done_event.wait(); }
    void notify_swap_done() { _swap_done_event.notify(); }

    // --- command queues ---

    Command_queue & update_queue() { return _update_queue; }
    const Command_queue & update_queue() const { return _update_queue; }

    Command_queue & render_queue() { return _render_queue; }
    const Command_queue & render_queue() const { return _render_queue; }

    /// Exchanges the contents of the update and render queues (no locking is done here).
    void swap_command_queues()
    {
        using std::swap;
        swap(_update_queue, _render_queue);
    }

 private:

    // Non-copyable: declared private and never defined (stands in for '= delete').
    Shared_state(const Shared_state &);
    Shared_state & operator = (const Shared_state &);

    Event _render_ready_event;
    Event _update_ready_event;
    Event _render_done_event;
    Event _swap_done_event;

    Command_queue _update_queue;
    Command_queue _render_queue;
};

class Update_thread
{
 public:

    Update_thread(Shared_state &shared):
    _shared(shared),
    _counter(0)
    {
        _thread = std::thread(&Update_thread::loop, this);
    }

    ~Update_thread()
    {
        _thread.join();
    }

 private:

    Update_thread(const Update_thread &); ///< ~ '= delete'.
    Update_thread & operator = (const Update_thread &); ///< ~ '= delete'.

    void loop()
    {
        _shared.notify_update_ready();
        _shared.wait_for_render_ready();
        bool exit_request = false;

        while(!exit_request)
        {
            Command_queue &queue = _shared.update_queue();
            assert(queue.empty());

            for(size_t i = 0; i < 3; ++i, ++_counter)
            {
                queue.push(_counter);
                exit_request = (_counter == Command_queue::exit_command);

                if(exit_request)
                    break;
            }

            _shared.wait_for_render_done();
            {
                // this is the point of safe unprotected sync, the render thread is paused
                printf("swap\n");
                _shared.swap_command_queues();
            }
            _shared.notify_swap_done();

            if(exit_request)
                break;
        }
    }

    Shared_state &_shared;
    std::thread _thread;
    size_t _counter;
};

/// The render (consumer) side: drains the render queue once per frame.
///
/// The worker thread is started by the constructor and joined by the destructor.
class Render_thread
{
 public:

    Render_thread(Shared_state &shared):
    _shared(shared)
    {
        _thread = std::thread(&Render_thread::loop, this);
    }

    ~Render_thread()
    {
        _thread.join();
    }

 private:

    Render_thread(const Render_thread &); ///< ~ '= delete'.
    Render_thread & operator = (const Render_thread &); ///< ~ '= delete'.

    /// Thread body: consumes commands frame by frame until the exit command is popped.
    void loop()
    {
        // start-up handshake: announce this thread, then wait for the update thread
        _shared.notify_render_ready();
        _shared.wait_for_update_ready();
        bool exit_request = false;

        for(;;)
        {
            Command_queue &queue = _shared.render_queue();

            while(!exit_request && !queue.empty())
            {
                const size_t value = queue.pop();
                exit_request = (value == Command_queue::exit_command);

                if(exit_request) // cooperative exit
                    break;
            }

            // report the frame as finished; from here on this thread leaves the queues alone
            _shared.notify_render_done();

            // Leave BEFORE waiting for another swap: the update thread stops right after
            // the swap that delivers the exit command, so a further wait_for_swap_done()
            // would never be signalled and this thread (and the join in the destructor)
            // would block forever.
            if(exit_request)
                break;

            _shared.wait_for_swap_done();
        }
    }

    Shared_state &_shared;
    std::thread _thread;
};

/// Entry point: runs the update and render threads against one shared state.
int main()
{
    Shared_state state;

    // Each constructor starts its thread right away. The destructors run in
    // reverse order at the end of this scope (render first, then update) and
    // join the threads, so main() returns only after both loops have finished.
    Update_thread update_worker(state);
    Render_thread render_worker(state);
}

// Compiled under Visual C++ 2012, output:
// + 0
// + 1
// + 2
// swap
// + 3
// - 0
// + 4
// + 5
// - 1
// - 2
// swap
// + 6
// - 3
// + 7
// + 8
// - 4
// - 5
// swap
// + 9
// - 6
// + 10
// + 11
// - 7
// - 8
// swap
// + 12
// - 9
// + 13
// - 10
// - 11
// + 14
// swap
// + 15
// - 12
// + 16
// + 17
// - 13
// - 14
// swap
// + 18
// - 15
// + 19
// + 20
// - 16
// - 17
// swap
// + 21
// - 18
// + 22
// + 23
// - 19
// - 20
// swap
// + 24
// - 21
// + 25
// + 26
// - 22
// - 23
// swap
// + 27
// - 24
// + 28
// + 29
// - 25
// - 26
// swap
// + 30
// + 31
// + 32
// - 27
// - 28
// - 29
// swap
// + 33
// - 30
// - 31
// - 32
// + 34
// + 35
// swap
// + 36
// - 33
// + 37
// - 34
// + 38
// - 35
// swap
// + 39
// - 36
// + 40
// + 41
// - 37
// - 38
// swap
// + 42
// - 39
// - 40
// - 41
// swap
// - 42