Read-Write Lock


書き込みを行っている場合にだけ排他制御を行うパターン。
Boost には read_write_mutex というのがあって、これを使えば Read-Write Lock を実現することがでk……ってバージョン1.34.1 だとドキュメントにはあるのにファイルが無いよ!消えてるよ!
どうやら read_write_mutex にバグがあったみたいで、今は消えてしまっているようです。
→追記: バージョン 1.35.0 以降からは boost::shared_mutex というのが用意されているので、それを使えばよさそうです。


ということで実現するためにはちゃんとサンプル通りの read_write_mutex(サンプルでは ReadWriteLock という名前だったのだけれども、namespace と被ってしまったので read_write_mutex という名前にしている)を実装してやる必要がありそうです。


ということで以下は増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編のサンプルを Boost.Thread を使って書いたコード。


scoped_read_lock や scoped_write_lock といったデストラクタで確実に Before/After パターンの After 処理を行えるクラスが使えるのが C++ の特徴ですね。
ただまあ、やっぱり finally 節は欲しいですが……。


ちなみに thread_helper::sleep() というのは、sleep 処理を長々と書くのが面倒になったので mtdp ネームスペースにヘルパ用のクラスを作っておきました。


main.cpp

#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/bind.hpp>
#include "data.h"
#include "reader_thread.h"
#include "writer_thread.h"

namespace mtdp{ namespace read_write_lock
{
    void main()
    {
        boost::shared_ptr<data> d(new data(10));
        boost::thread_group group;
        group.create_thread(boost::bind(&reader_thread::run, boost::shared_ptr<reader_thread>(new reader_thread(d))));
        group.create_thread(boost::bind(&reader_thread::run, boost::shared_ptr<reader_thread>(new reader_thread(d))));
        group.create_thread(boost::bind(&reader_thread::run, boost::shared_ptr<reader_thread>(new reader_thread(d))));
        group.create_thread(boost::bind(&reader_thread::run, boost::shared_ptr<reader_thread>(new reader_thread(d))));
        group.create_thread(boost::bind(&reader_thread::run, boost::shared_ptr<reader_thread>(new reader_thread(d))));
        group.create_thread(boost::bind(&reader_thread::run, boost::shared_ptr<reader_thread>(new reader_thread(d))));
        group.create_thread(boost::bind(&writer_thread::run, boost::shared_ptr<writer_thread>(new writer_thread(d, "ABCDEFGHIJKLMNOPQRSTUVWXYZ"))));
        group.create_thread(boost::bind(&writer_thread::run, boost::shared_ptr<writer_thread>(new writer_thread(d, "abcdefghijklmnopqrstuvwxyz"))));
        group.join_all();
    }
}}

data.h

#ifndef MTDP_READ_WRITE_LOCK_DATA_H_INCLUDED
#define MTDP_READ_WRITE_LOCK_DATA_H_INCLUDED

#include <boost/thread.hpp>
#include <boost/foreach.hpp>
#include <vector>
#include "read_write_mutex.h"
#include "../thread_helper.h"

namespace mtdp{ namespace read_write_lock
{
    class data
    {
    private:
        std::vector<char> buffer_;
        read_write_mutex rw_mutex_;

    public:
        data(std::size_t size)
            : buffer_(size, L'*')
        {
        }

        std::vector<char> read()
        {
            scoped_read_lock lock(rw_mutex_);
            return do_read();
        }
        void write(char c)
        {
            scoped_write_lock lock(rw_mutex_);
            do_write(c);
        }

    private:
        std::vector<char> do_read()
        {
            slowly();
            return buffer_;
        }
        void do_write(char c)
        {
            BOOST_FOREACH (char& _c, buffer_)
            {
                _c = c;
                slowly();
            }
        }

        void slowly()
        {
            thread_helper::sleep(50);
        }
    };
}}

#endif // MTDP_READ_WRITE_LOCK_DATA_H_INCLUDED

writer_thread.hpp

#ifndef MTDP_READ_WRITE_LOCK_WRITER_THREAD_H_INCLUDED
#define MTDP_READ_WRITE_LOCK_WRITER_THREAD_H_INCLUDED

#include <boost/shared_ptr.hpp>
#include <boost/random.hpp>
#include <boost/nondet_random.hpp>
#include <time.h>
#include "data.h"
#include "../thread_helper.h"

namespace mtdp{ namespace read_write_lock
{
    class writer_thread
    {
    private:
        const boost::shared_ptr<data> data_;
        const std::string filler_;
        std::size_t index_;

    public:
        writer_thread(boost::shared_ptr<data> d, std::string filler)
            : data_(d), filler_(filler), index_(0)
        {
        }

        void run()
        {
            static boost::variate_generator<boost::mt19937, boost::uniform_int<> > random(
                boost::mt19937((boost::uint32_t)::time(0)), boost::uniform_int<>(0, 3000));

            while (true)
            {
                char c = nextchar();
                data_->write(c);
                thread_helper::sleep(random());
            }
        }

    private:
        char nextchar()
        {
            char c = filler_[index_];
            index_++;
            if (index_ >= filler_.size())
            {
                index_ = 0;
            }
            return c;
        }
    };
}}

#endif // MTDP_READ_WRITE_LOCK_WRITER_THREAD_H_INCLUDED

reader_thread.hpp

#ifndef MTDP_READ_WRITE_LOCK_READER_THREAD_H_INCLUDED
#define MTDP_READ_WRITE_LOCK_READER_THREAD_H_INCLUDED

#include <boost/shared_ptr.hpp>
#include <vector>
#include <string>
#include <iostream>
#include "data.h"
#include "../thread_helper.h"

// for ::GetCurrentThreadId
#include <windows.h>

namespace mtdp{ namespace read_write_lock
{
    class reader_thread
    {
    private:
        const boost::shared_ptr<data> data_;

    public:
        reader_thread(boost::shared_ptr<data> d) : data_(d)
        {
        }

        void run()
        {
            while (true)
            {
                std::vector<char> readbuf = data_->read();
                thread_helper::shared_cout(to_string(::GetCurrentThreadId()) + " reads " + std::string(readbuf.begin(), readbuf.end()) + "\n");
            }
        }
    };
}}

#endif // MTDP_READ_WRITE_LOCK_READER_THREAD_H_INCLUDED

read_write_mutex.hpp

#ifndef MTDP_READ_WRITE_LOCK_READ_WRITE_MUTEX_H_INCLUDED
#define MTDP_READ_WRITE_LOCK_READ_WRITE_MUTEX_H_INCLUDED

#include <boost/thread.hpp>

namespace mtdp{ namespace read_write_lock
{
    class read_write_mutex
    {
    private:
        volatile std::size_t reading_readers_;      // (A) 実際に読んでいる最中のスレッドの数
        volatile std::size_t waiting_writers_;      // (B) 書くのを待っているスレッドの数
        volatile std::size_t writing_writers_;      // (C) 実際に書いている最中のスレッドの数
        volatile bool prefer_writer_;               // 書くのを優先するなら true

        boost::mutex mutex_;
        boost::condition condition_;

    public:
        read_write_mutex()
            : reading_readers_(0), waiting_writers_(0), writing_writers_(0),
            prefer_writer_(true)
        {
        }

        void read_lock()
        {
            boost::mutex::scoped_lock lock(mutex_);

            while (writing_writers_ > 0 || (prefer_writer_ && waiting_writers_ > 0))
            {
                condition_.wait(lock);
            }
            reading_readers_++;     // (A) 実際に読んでいるスレッドの数を1増やす
        }
        void read_unlock()
        {
            boost::mutex::scoped_lock lock(mutex_);

            reading_readers_--;     // (A) 実際に読んでいるスレッドの数を1減らす
            prefer_writer_ = true;
            condition_.notify_all();
        }

        void write_lock()
        {
            boost::mutex::scoped_lock lock(mutex_);

            waiting_writers_++;     // (B) 書くのを待っているスレッドの数を1増やす

            try
            {
                while (reading_readers_ > 0 || writing_writers_ > 0)
                {
                    condition_.wait(lock);
                }
            }
            catch (...)
            {
                waiting_writers_--; // (B) 書くのを待っているスレッドの数を1減らす
                throw;
            }
            waiting_writers_--;     // (B) 書くのを待っているスレッドの数を1減らす
            writing_writers_++;     // (C) 実際に書いているスレッドの数を1増やす
        }

        void write_unlock()
        {
            boost::mutex::scoped_lock lock(mutex_);

            writing_writers_--;     // (C) 実際に書いているスレッドの数を1減らす
            prefer_writer_ = false;
            condition_.notify_all();
        }
    };

    struct scoped_read_lock
    {
        read_write_mutex& mutex_;
        scoped_read_lock(read_write_mutex& mutex)
            : mutex_(mutex) { mutex_.read_lock(); }
        ~scoped_read_lock() { mutex_.read_unlock(); }
    };
    struct scoped_write_lock
    {
        read_write_mutex& mutex_;
        scoped_write_lock(read_write_mutex& mutex)
            : mutex_(mutex) { mutex_.write_lock(); }
        ~scoped_write_lock() { mutex_.write_unlock(); }
    };
}}

#endif // MTDP_READ_WRITE_LOCK_READ_WRITE_MUTEX_H_INCLUDED