From 79a8749249c61799ff894695431376f997c25781 Mon Sep 17 00:00:00 2001 From: Jan Sucan Date: Wed, 22 Jan 2025 15:01:36 +0100 Subject: Move multi-buffering to BufferedStream::Reader --- src/buffered_stream.cpp | 98 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 65 insertions(+), 33 deletions(-) (limited to 'src/buffered_stream.cpp') diff --git a/src/buffered_stream.cpp b/src/buffered_stream.cpp index 371e221..00e651f 100644 --- a/src/buffered_stream.cpp +++ b/src/buffered_stream.cpp @@ -36,77 +36,109 @@ namespace BufferedStream { -Reader::Reader(std::istream &istream, size_t buffer_capacity) - : m_istream(istream), m_buffer_offset(0), m_buffer_size(0), - m_buffer_capacity(buffer_capacity) +Reader::Reader(std::istream &istream, size_t buffer_capacity, + size_t buffer_count) + : m_buffer_count(buffer_count), m_buffer_capacity(buffer_capacity), + m_istream(istream), m_buffers(buffer_count), + m_buffer_index(buffer_count - 1), m_buffer_offset(buffer_capacity), + m_buffer_size(buffer_capacity) { - try { - m_buffer = std::make_unique(m_buffer_capacity); - } catch (const std::bad_alloc &e) { - throw Error("cannot allocate buffer for input stream data"); + for (size_t i = 0; i < m_buffer_count; ++i) { + try { + m_buffers[i] = std::shared_ptr(new char[m_buffer_capacity]); + } catch (const std::bad_alloc &e) { + throw Error("cannot allocate buffer for input stream data"); + } } + + refill_next_buffer(); }; size_t -Reader::read(char *data, size_t data_size) +Reader::read(size_t data_size, char *dest_buf) { size_t retry_count{0}; size_t offset{0}; + size_t to_read{data_size}; - while ((data_size > 0) && (retry_count < 2)) { - char *d; - const size_t r{tryRead(data_size, &d)}; - if (r == 0) { + while ((to_read > 0) && (retry_count < 2)) { + const DataPart dp{readMultipart(to_read)}; + if (dp.size == 0) { ++retry_count; continue; } - memcpy(data + offset, d, r); - offset += r; - data_size -= r; + retry_count = 0; + memcpy(dest_buf + offset, dp.data.get(), dp.size); + offset += dp.size; + to_read -= dp.size; } + assert(offset <= data_size); return offset; } -size_t -Reader::tryRead(size_t data_size, char **return_data) +DataPart +Reader::readMultipart(size_t data_size) { const size_t size_left{m_buffer_size - m_buffer_offset}; if (size_left == 0) { - refill_buffer(); + refill_next_buffer(); if (m_buffer_size == 0) { - return 0; + return DataPart{.size = 0, .data = std::shared_ptr()}; } } // There is at least one byte in the buffer - const size_t size_read{read_buffer(data_size, return_data)}; - assert(size_read > 0); - return size_read; -}; + const DataPart dp{read_current_buffer(data_size)}; + assert(dp.size > 0); + return dp; +} -size_t -Reader::read_buffer(size_t data_size, char **return_data) +DataPart +Reader::read_current_buffer(size_t data_size) { - *return_data = static_cast(m_buffer.get()) + m_buffer_offset; + DataPart dp; + + // Set data + if (m_buffer_offset == 0) { + dp.data = std::shared_ptr{m_buffers[m_buffer_index]}; + } else { + dp.data = std::shared_ptr{ + m_buffers[m_buffer_index], + static_cast(m_buffers[m_buffer_index].get()) + + m_buffer_offset}; + } const size_t size_left{m_buffer_size - m_buffer_offset}; - const size_t size_read{std::min(data_size, size_left)}; - m_buffer_offset += size_read; - return size_read; + dp.size = std::min(data_size, size_left); + m_buffer_offset += dp.size; + return dp; }; void -Reader::refill_buffer() +Reader::refill_next_buffer() { - m_buffer_size = read_stream(m_buffer.get(), m_buffer_capacity); + if (m_buffer_size == 0) { + // Current buffer is the last one. Don't fill the next one. + return; + } + + // Current buffer must be completely read before filling the next one + assert(m_buffer_offset == m_buffer_size); + + m_buffer_index = (m_buffer_index + 1) % m_buffer_count; + auto buf = m_buffers[m_buffer_index]; + // Buffer for new data must not in use + assert(buf.use_count() == 2); + + m_buffer_size = read_stream(buf, m_buffer_capacity); m_buffer_offset = 0; }; size_t -Reader::read_stream(char *data, size_t data_size) +Reader::read_stream(std::shared_ptr data, size_t data_size) { - m_istream.read(data, data_size); + m_istream.read(data.get(), data_size); if (!m_istream.good() && !m_istream.eof()) { throw Error("cannot read from stream"); -- cgit v1.2.3