Boost C++ Libraries

...one of the most highly regarded and expertly designed C++ library projects in the world. Herb Sutter and Andrei Alexandrescu, C++ Coding Standards

boost/lockfree/spsc_value.hpp

//  lock-free single-producer/single-consumer value
//  implemented via a triple buffer
//
//  Copyright (C) 2023-2024 Tim Blechmann
//
//  Distributed under the Boost Software License, Version 1.0. (See
//  accompanying file LICENSE_1_0.txt or copy at
//  http://www.boost.org/LICENSE_1_0.txt)

#ifndef BOOST_LOCKFREE_SPSC_VALUE_HPP_INCLUDED
#define BOOST_LOCKFREE_SPSC_VALUE_HPP_INCLUDED

#include <boost/config.hpp>

#ifdef BOOST_HAS_PRAGMA_ONCE
#    pragma once
#endif

#include <boost/lockfree/detail/atomic.hpp>
#include <boost/lockfree/detail/parameter.hpp>
#include <boost/lockfree/detail/uses_optional.hpp>
#include <boost/lockfree/lockfree_forward.hpp>
#include <boost/lockfree/policies.hpp>

#include <boost/parameter/optional.hpp>
#include <boost/parameter/parameters.hpp>

#include <array>
#include <cstdint>

#ifndef BOOST_DOXYGEN_INVOKED

#    ifdef BOOST_NO_CXX17_IF_CONSTEXPR
#        define ifconstexpr
#    else
#        define ifconstexpr constexpr
#    endif

#endif

namespace boost { namespace lockfree {

/** The spcs_value provides a single-writer/single-reader value, implemented by a triple buffer
 *
 *  \b Policies:
 *  - \ref boost::lockfree::allow_multiple_reads, defaults to
 *    \ref boost::lockfree::allow_multiple_reads "boost::lockfree::allow_multiple_reads<false>" \n
 *    If multiple reads are allowed, a value written to the spsc_value can be read multiple times, but not moved out
 *    of the instance. If multiple reads are not allowed, the class works as single-element queue that overwrites on
 *    write
 *
 * */
template < typename T, typename... Options >
struct spsc_value
{
#ifndef BOOST_DOXYGEN_INVOKED
private:
    using spsc_value_signature = parameter::parameters< boost::parameter::optional< tag::allow_multiple_reads > >;
    using bound_args           = typename spsc_value_signature::bind< Options... >::type;

    static const bool allow_multiple_reads = detail::extract_allow_multiple_reads< bound_args >::value;

public:
#endif

    /** Construct a \ref boost::lockfree::spsc_value "spsc_value"
     *
     *  If configured with \ref boost::lockfree::allow_multiple_reads "boost::lockfree::allow_multiple_reads<true>" it
     *  is initialized to a default-constructed value
     *
     * */
    explicit spsc_value()
    {
        if ifconstexpr ( allow_multiple_reads ) {
            // populate initial reader
            m_write_index = tagged_index {
                1,
            };
            m_available_index.store(
                tagged_index {
                    0,
                    true,
                },
                std::memory_order_relaxed );
            m_buffer[ 0 ].value = {};
        }
    }

    /** Construct a \ref boost::lockfree::spsc_value "spsc_value", initialized to a value
     * */
    explicit spsc_value( T value ) :
        m_write_index {
            1,
        },
        m_available_index {
            tagged_index {
                0,
                true,
            },
        }
    {
        m_buffer[ 0 ].value = std::move( value );
    }

    /** Writes `value` to the \ref boost::lockfree::spsc_value "spsc_value"
     *
     * \pre  only one thread is allowed to write data to the \ref boost::lockfree::spsc_value "spsc_value"
     * \post object will be written to the \ref boost::lockfree::spsc_value "spsc_value"
     *
     * \note Thread-safe and wait-free
     * */
    void write( T&& value )
    {
        m_buffer[ m_write_index.index() ].value = std::forward< T >( value );
        swap_write_buffer();
    }

    /// \copydoc boost::lockfree::spsc_value::write(T&& value)
    void write( const T& value )
    {
        m_buffer[ m_write_index.index() ].value = value;
        swap_write_buffer();
    }

    /** Reads content of the \ref boost::lockfree::spsc_value "spsc_value"
     *
     * \pre     only one thread is allowed to write data to the \ref boost::lockfree::spsc_value "spsc_value"
     * \post    if read operation is successful, object will be copied to `ret`.
     * \returns `true`, if the read operation is successful, false if the \ref boost::lockfree::spsc_value "spsc_value" is
     *          configured with \ref boost::lockfree::allow_multiple_reads
     *          "boost::lockfree::allow_multiple_reads<false>" and no value is available for reading
     *
     * \note Thread-safe and wait-free
     * */
    bool read( T& ret )
    {
#ifndef BOOST_NO_CXX17_IF_CONSTEXPR
        bool read_index_updated = swap_read_buffer();

        if constexpr ( allow_multiple_reads ) {
            ret = m_buffer[ m_read_index.index() ].value;
        } else {
            if ( !read_index_updated )
                return false;
            ret = std::move( m_buffer[ m_read_index.index() ].value );
        }

        return true;
#else
        return read_helper( ret, std::integral_constant< bool, allow_multiple_reads > {} );
#endif
    }

#if !defined( BOOST_NO_CXX17_HDR_OPTIONAL ) || defined( BOOST_DOXYGEN_INVOKED )
    /** Reads content of the \ref boost::lockfree::spsc_value "spsc_value", returning an optional
     *
     * \pre     only one thread is allowed to write data to the \ref boost::lockfree::spsc_value "spsc_value"
     * \returns `std::optional` with value if successful, `std::nullopt` if spsc_value is configured with \ref
     *          boost::lockfree::allow_multiple_reads "boost::lockfree::allow_multiple_reads<false>" and no value is
     *          available for reading
     *
     * \note Thread-safe and wait-free
     * */
    std::optional< T > read( uses_optional_t )
    {
        T to_dequeue;
        if ( read( to_dequeue ) )
            return to_dequeue;
        else
            return std::nullopt;
    }
#endif

    /** consumes value via a functor
     *
     *  reads element from the spsc_value and applies the functor on this object
     *
     * \returns `true`, if element was consumed
     *
     * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
     * */

    template < typename Functor >
    bool consume( Functor&& f )
    {
#ifndef BOOST_NO_CXX17_IF_CONSTEXPR
        bool read_index_updated = swap_read_buffer();

        if constexpr ( allow_multiple_reads ) {
            f( m_buffer[ m_read_index.index() ].value );
        } else {
            if ( !read_index_updated )
                return false;
            f( std::move( m_buffer[ m_read_index.index() ].value ) );
        }

        return true;
#else
        return consume_helper( f, std::integral_constant< bool, allow_multiple_reads > {} );
#endif
    }

private:
#ifndef BOOST_DOXYGEN_INVOKED
    using allow_multiple_reads_true  = std::true_type;
    using allow_multiple_reads_false = std::false_type;

#    ifdef BOOST_NO_CXX17_IF_CONSTEXPR
    template < typename Functor >
    bool consume_helper( Functor&& f, allow_multiple_reads_true = {} )
    {
        swap_read_buffer();
        f( m_buffer[ m_read_index.index() ].value );
        return true;
    }

    template < typename Functor >
    bool consume_helper( Functor&& f, allow_multiple_reads_false = {} )
    {
        bool read_index_updated = swap_read_buffer();
        if ( !read_index_updated )
            return false;
        f( std::move( m_buffer[ m_read_index.index() ].value ) );
        return true;
    }

    template < typename TT >
    bool read_helper( TT& ret, allow_multiple_reads_true = {} )
    {
        swap_read_buffer();
        ret = m_buffer[ m_read_index.index() ].value;
        return true;
    }

    template < typename TT >
    bool read_helper( TT& ret, allow_multiple_reads_false = {} )
    {
        bool read_index_updated = swap_read_buffer();
        if ( !read_index_updated )
            return false;
        ret = std::move( m_buffer[ m_read_index.index() ].value );
        return true;
    }
#    endif

    void swap_write_buffer()
    {
        tagged_index old_avail_index = m_available_index.exchange(
            tagged_index {
                m_write_index.index(),
                true,
            },
            std::memory_order_release );
        m_write_index.set_tag_and_index( old_avail_index.index(), false );
    }

    bool swap_read_buffer()
    {
        constexpr bool use_compare_exchange = false; // exchange is most likely faster

        if ifconstexpr ( use_compare_exchange ) {
            tagged_index new_avail_index = m_read_index;

            tagged_index current_avail_index_with_tag = tagged_index {
                m_available_index.load( std::memory_order_acquire ).index(),
                true,
            };

            if ( m_available_index.compare_exchange_strong( current_avail_index_with_tag,
                                                            new_avail_index,
                                                            std::memory_order_acquire ) ) {
                m_read_index = tagged_index( current_avail_index_with_tag.index(), false );
                return true;
            } else
                return false;
        } else {
            tagged_index new_avail_index = m_read_index;

            tagged_index current_avail_index = m_available_index.load( std::memory_order_acquire );
            if ( !current_avail_index.is_consumable() )
                return false;

            current_avail_index = m_available_index.exchange( new_avail_index, std::memory_order_acquire );
            m_read_index        = tagged_index {
                current_avail_index.index(),
                false,
            };
            return true;
        }
    }

    struct tagged_index
    {
        tagged_index( uint8_t index, bool tag = false )
        {
            set_tag_and_index( index, tag );
        }

        uint8_t index() const
        {
            return byte & 0x07;
        }

        bool is_consumable() const
        {
            return byte & 0x08;
        }

        void set_tag_and_index( uint8_t index, bool tag )
        {
            byte = index | ( tag ? 0x08 : 0x00 );
        }

        uint8_t byte;
    };

    static constexpr size_t cacheline_bytes = detail::cacheline_bytes;

    struct alignas( cacheline_bytes ) cache_aligned_value
    {
        T value;
    };

    std::array< cache_aligned_value, 3 > m_buffer;

    alignas( cacheline_bytes ) tagged_index m_write_index { 0 };
    alignas( cacheline_bytes ) detail::atomic< tagged_index > m_available_index { 1 };
    alignas( cacheline_bytes ) tagged_index m_read_index { 2 };
#endif
};

}} // namespace boost::lockfree

#undef ifconstexpr

#endif /* BOOST_LOCKFREE_SPSC_VALUE_HPP_INCLUDED */