From 79a8749249c61799ff894695431376f997c25781 Mon Sep 17 00:00:00 2001 From: Jan Sucan Date: Wed, 22 Jan 2025 15:01:36 +0100 Subject: Move multi-buffering to BufferedStream::Reader --- src/backup.cpp | 46 ++++------------------- src/buffered_stream.cpp | 98 ++++++++++++++++++++++++++++++++----------------- src/buffered_stream.h | 24 ++++++++---- src/format_v2.h | 20 ++++++---- src/restore.cpp | 9 ++--- 5 files changed, 104 insertions(+), 93 deletions(-) (limited to 'src') diff --git a/src/backup.cpp b/src/backup.cpp index b6b1471..98898b9 100644 --- a/src/backup.cpp +++ b/src/backup.cpp @@ -70,55 +70,23 @@ class PagedStreamReader { public: PagedStreamReader(std::istream &istr, size_t page_size_bytes) - : m_page_size_bytes(page_size_bytes), m_istream(istr), - m_stream_pos_bytes(0), m_buffer_index(0) - { - try { - m_buffers[0] = std::shared_ptr(new char[m_page_size_bytes]); - m_buffers[1] = std::shared_ptr(new char[m_page_size_bytes]); - } catch (const std::bad_alloc &e) { - throw BufferedStream::Error( - "cannot allocate pages for input stream data"); - } - }; + : m_page_size_bytes(page_size_bytes), + m_reader(istr, page_size_bytes, 2), m_stream_pos_bytes(0){}; Page getNextPage() { - m_buffer_index = (m_buffer_index + 1) % 2; - auto buf = m_buffers[m_buffer_index]; - // Buffer for new data must not in use - assert(buf.use_count() == 2); + const BufferedStream::DataPart dp{ + m_reader.readMultipart(m_page_size_bytes)}; - const size_t bytes_read{readFromStream(buf.get())}; - m_stream_pos_bytes += bytes_read; + m_stream_pos_bytes += dp.size; - if (bytes_read == 0) { - buf = std::shared_ptr(); - } - return Page{buf, m_stream_pos_bytes - bytes_read, m_stream_pos_bytes}; + return Page{dp.data, m_stream_pos_bytes - dp.size, m_stream_pos_bytes}; } private: const size_t m_page_size_bytes; - std::istream &m_istream; + BufferedStream::Reader m_reader; uint64_t m_stream_pos_bytes; - std::array, 2> m_buffers; - unsigned m_buffer_index; - - size_t readFromStream(char *const data) - { - if (m_istream.eof()) { - return 0; - } - - m_istream.read(data, m_page_size_bytes); - - if (!m_istream.good() && !m_istream.eof()) { - throw BufferedStream::Error("cannot read from stream"); - } - - return m_istream.gcount(); - } }; enum class MergeState { diff --git a/src/buffered_stream.cpp b/src/buffered_stream.cpp index 371e221..00e651f 100644 --- a/src/buffered_stream.cpp +++ b/src/buffered_stream.cpp @@ -36,77 +36,109 @@ namespace BufferedStream { -Reader::Reader(std::istream &istream, size_t buffer_capacity) - : m_istream(istream), m_buffer_offset(0), m_buffer_size(0), - m_buffer_capacity(buffer_capacity) +Reader::Reader(std::istream &istream, size_t buffer_capacity, + size_t buffer_count) + : m_buffer_count(buffer_count), m_buffer_capacity(buffer_capacity), + m_istream(istream), m_buffers(buffer_count), + m_buffer_index(buffer_count - 1), m_buffer_offset(buffer_capacity), + m_buffer_size(buffer_capacity) { - try { - m_buffer = std::make_unique(m_buffer_capacity); - } catch (const std::bad_alloc &e) { - throw Error("cannot allocate buffer for input stream data"); + for (size_t i = 0; i < m_buffer_count; ++i) { + try { + m_buffers[i] = std::shared_ptr(new char[m_buffer_capacity]); + } catch (const std::bad_alloc &e) { + throw Error("cannot allocate buffer for input stream data"); + } } + + refill_next_buffer(); }; size_t -Reader::read(char *data, size_t data_size) +Reader::read(size_t data_size, char *dest_buf) { size_t retry_count{0}; size_t offset{0}; + size_t to_read{data_size}; - while ((data_size > 0) && (retry_count < 2)) { - char *d; - const size_t r{tryRead(data_size, &d)}; - if (r == 0) { + while ((to_read > 0) && (retry_count < 2)) { + const DataPart dp{readMultipart(to_read)}; + if (dp.size == 0) { ++retry_count; continue; } - memcpy(data + offset, d, r); - offset += r; - data_size -= r; + retry_count = 0; + memcpy(dest_buf + offset, dp.data.get(), dp.size); + offset += dp.size; + to_read -= dp.size; } + assert(offset <= data_size); return offset; } -size_t -Reader::tryRead(size_t data_size, char **return_data) +DataPart +Reader::readMultipart(size_t data_size) { const size_t size_left{m_buffer_size - m_buffer_offset}; if (size_left == 0) { - refill_buffer(); + refill_next_buffer(); if (m_buffer_size == 0) { - return 0; + return DataPart{.size = 0, .data = std::shared_ptr()}; } } // There is at least one byte in the buffer - const size_t size_read{read_buffer(data_size, return_data)}; - assert(size_read > 0); - return size_read; -}; + const DataPart dp{read_current_buffer(data_size)}; + assert(dp.size > 0); + return dp; +} -size_t -Reader::read_buffer(size_t data_size, char **return_data) +DataPart +Reader::read_current_buffer(size_t data_size) { - *return_data = static_cast(m_buffer.get()) + m_buffer_offset; + DataPart dp; + + // Set data + if (m_buffer_offset == 0) { + dp.data = std::shared_ptr{m_buffers[m_buffer_index]}; + } else { + dp.data = std::shared_ptr{ + m_buffers[m_buffer_index], + static_cast(m_buffers[m_buffer_index].get()) + + m_buffer_offset}; + } const size_t size_left{m_buffer_size - m_buffer_offset}; - const size_t size_read{std::min(data_size, size_left)}; - m_buffer_offset += size_read; - return size_read; + dp.size = std::min(data_size, size_left); + m_buffer_offset += dp.size; + return dp; }; void -Reader::refill_buffer() +Reader::refill_next_buffer() { - m_buffer_size = read_stream(m_buffer.get(), m_buffer_capacity); + if (m_buffer_size == 0) { + // Current buffer is the last one. Don't fill the next one. + return; + } + + // Current buffer must be completely read before filling the next one + assert(m_buffer_offset == m_buffer_size); + + m_buffer_index = (m_buffer_index + 1) % m_buffer_count; + auto buf = m_buffers[m_buffer_index]; + // Buffer for new data must not in use + assert(buf.use_count() == 2); + + m_buffer_size = read_stream(buf, m_buffer_capacity); m_buffer_offset = 0; }; size_t -Reader::read_stream(char *data, size_t data_size) +Reader::read_stream(std::shared_ptr data, size_t data_size) { - m_istream.read(data, data_size); + m_istream.read(data.get(), data_size); if (!m_istream.good() && !m_istream.eof()) { throw Error("cannot read from stream"); diff --git a/src/buffered_stream.h b/src/buffered_stream.h index 4b05a64..360dd60 100644 --- a/src/buffered_stream.h +++ b/src/buffered_stream.h @@ -31,6 +31,7 @@ #include #include #include +#include namespace BufferedStream { @@ -41,25 +42,32 @@ class Error : public DiffddError explicit Error(const std::string &message) : DiffddError(message) {} }; +struct DataPart { + size_t size; + std::shared_ptr data; +}; + class Reader { public: - Reader(std::istream &istream, size_t buffer_capacity); + Reader(std::istream &istream, size_t buffer_capacity, size_t buffer_count); virtual ~Reader() = default; - size_t read(char *data, size_t data_size); - size_t tryRead(size_t data_size, char **return_data); + size_t read(size_t data_size, char *dest_buf); + DataPart readMultipart(size_t data_size); private: + const size_t m_buffer_count; + const size_t m_buffer_capacity; std::istream &m_istream; - std::unique_ptr m_buffer; + std::vector> m_buffers; + size_t m_buffer_index; size_t m_buffer_offset; size_t m_buffer_size; - const size_t m_buffer_capacity; - size_t read_buffer(size_t data_size, char **return_data); - void refill_buffer(); - size_t read_stream(char *data, size_t data_size); + DataPart read_current_buffer(size_t data_size); + void refill_next_buffer(); + size_t read_stream(std::shared_ptr data, size_t data_size); }; class Writer diff --git a/src/format_v2.h b/src/format_v2.h index 3a0fea2..e9b4edb 100644 --- a/src/format_v2.h +++ b/src/format_v2.h @@ -84,16 +84,16 @@ class Reader { public: Reader(std::istream &istream, size_t buffer_size) - : m_reader{BufferedStream::Reader{istream, buffer_size}}, m_eof{ - false} {}; + : m_reader{BufferedStream::Reader{istream, buffer_size, 1}}, + m_eof{false} {}; bool eof() { return m_eof; }; uint64_t readOffset() { uint64_t raw_offset; - const size_t r{m_reader.read(reinterpret_cast(&raw_offset), - sizeof(raw_offset))}; + const size_t r{m_reader.read(sizeof(raw_offset), + reinterpret_cast(&raw_offset))}; if (r != sizeof(raw_offset)) { m_eof = true; } @@ -103,17 +103,21 @@ class Reader size_t readSize() { uint32_t raw_size; - const size_t r{m_reader.read(reinterpret_cast(&raw_size), - sizeof(raw_size))}; + const size_t r{m_reader.read(sizeof(raw_size), + reinterpret_cast(&raw_size))}; if (r != sizeof(raw_size)) { m_eof = true; } return be32toh(raw_size); }; - size_t readData(size_t size, char **return_data) + RecordData readRecordData(size_t size) { - return m_reader.tryRead(size, return_data); + const BufferedStream::DataPart dp = m_reader.readMultipart(size); + return RecordData{ + .size = dp.size, + .data = dp.data, + }; }; private: diff --git a/src/restore.cpp b/src/restore.cpp index f0c4c9e..5b490d9 100644 --- a/src/restore.cpp +++ b/src/restore.cpp @@ -63,17 +63,16 @@ restore(const OptionsRestore &opts) uint64_t size{diff_reader.readSize()}; while (size > 0) { - char *data; - const size_t r{diff_reader.readData(size, &data)}; - if (r == 0) { + const FormatV2::RecordData rd{diff_reader.readRecordData(size)}; + if (rd.size == 0) { break; } - if (!out_file.write(data, r)) { + if (!out_file.write(rd.data.get(), rd.size)) { throw RestoreError("cannot write to output file"); } - size -= r; + size -= rd.size; } if (size > 0) { -- cgit v1.2.3