aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backup.cpp46
-rw-r--r--src/buffered_stream.cpp98
-rw-r--r--src/buffered_stream.h24
-rw-r--r--src/format_v2.h20
-rw-r--r--src/restore.cpp9
5 files changed, 104 insertions, 93 deletions
diff --git a/src/backup.cpp b/src/backup.cpp
index b6b1471..98898b9 100644
--- a/src/backup.cpp
+++ b/src/backup.cpp
@@ -70,55 +70,23 @@ class PagedStreamReader
{
public:
PagedStreamReader(std::istream &istr, size_t page_size_bytes)
- : m_page_size_bytes(page_size_bytes), m_istream(istr),
- m_stream_pos_bytes(0), m_buffer_index(0)
- {
- try {
- m_buffers[0] = std::shared_ptr<char[]>(new char[m_page_size_bytes]);
- m_buffers[1] = std::shared_ptr<char[]>(new char[m_page_size_bytes]);
- } catch (const std::bad_alloc &e) {
- throw BufferedStream::Error(
- "cannot allocate pages for input stream data");
- }
- };
+ : m_page_size_bytes(page_size_bytes),
+ m_reader(istr, page_size_bytes, 2), m_stream_pos_bytes(0){};
Page getNextPage()
{
- m_buffer_index = (m_buffer_index + 1) % 2;
- auto buf = m_buffers[m_buffer_index];
- // Buffer for new data must not in use
- assert(buf.use_count() == 2);
+ const BufferedStream::DataPart dp{
+ m_reader.readMultipart(m_page_size_bytes)};
- const size_t bytes_read{readFromStream(buf.get())};
- m_stream_pos_bytes += bytes_read;
+ m_stream_pos_bytes += dp.size;
- if (bytes_read == 0) {
- buf = std::shared_ptr<char[]>();
- }
- return Page{buf, m_stream_pos_bytes - bytes_read, m_stream_pos_bytes};
+ return Page{dp.data, m_stream_pos_bytes - dp.size, m_stream_pos_bytes};
}
private:
const size_t m_page_size_bytes;
- std::istream &m_istream;
+ BufferedStream::Reader m_reader;
uint64_t m_stream_pos_bytes;
- std::array<std::shared_ptr<char[]>, 2> m_buffers;
- unsigned m_buffer_index;
-
- size_t readFromStream(char *const data)
- {
- if (m_istream.eof()) {
- return 0;
- }
-
- m_istream.read(data, m_page_size_bytes);
-
- if (!m_istream.good() && !m_istream.eof()) {
- throw BufferedStream::Error("cannot read from stream");
- }
-
- return m_istream.gcount();
- }
};
enum class MergeState {
diff --git a/src/buffered_stream.cpp b/src/buffered_stream.cpp
index 371e221..00e651f 100644
--- a/src/buffered_stream.cpp
+++ b/src/buffered_stream.cpp
@@ -36,77 +36,109 @@
namespace BufferedStream
{
-Reader::Reader(std::istream &istream, size_t buffer_capacity)
- : m_istream(istream), m_buffer_offset(0), m_buffer_size(0),
- m_buffer_capacity(buffer_capacity)
+Reader::Reader(std::istream &istream, size_t buffer_capacity,
+ size_t buffer_count)
+ : m_buffer_count(buffer_count), m_buffer_capacity(buffer_capacity),
+ m_istream(istream), m_buffers(buffer_count),
+ m_buffer_index(buffer_count - 1), m_buffer_offset(buffer_capacity),
+ m_buffer_size(buffer_capacity)
{
- try {
- m_buffer = std::make_unique<char[]>(m_buffer_capacity);
- } catch (const std::bad_alloc &e) {
- throw Error("cannot allocate buffer for input stream data");
+ for (size_t i = 0; i < m_buffer_count; ++i) {
+ try {
+ m_buffers[i] = std::shared_ptr<char[]>(new char[m_buffer_capacity]);
+ } catch (const std::bad_alloc &e) {
+ throw Error("cannot allocate buffer for input stream data");
+ }
}
+
+ refill_next_buffer();
};
size_t
-Reader::read(char *data, size_t data_size)
+Reader::read(size_t data_size, char *dest_buf)
{
size_t retry_count{0};
size_t offset{0};
+ size_t to_read{data_size};
- while ((data_size > 0) && (retry_count < 2)) {
- char *d;
- const size_t r{tryRead(data_size, &d)};
- if (r == 0) {
+ while ((to_read > 0) && (retry_count < 2)) {
+ const DataPart dp{readMultipart(to_read)};
+ if (dp.size == 0) {
++retry_count;
continue;
}
- memcpy(data + offset, d, r);
- offset += r;
- data_size -= r;
+ retry_count = 0;
+ memcpy(dest_buf + offset, dp.data.get(), dp.size);
+ offset += dp.size;
+ to_read -= dp.size;
}
+ assert(offset <= data_size);
return offset;
}
-size_t
-Reader::tryRead(size_t data_size, char **return_data)
+DataPart
+Reader::readMultipart(size_t data_size)
{
const size_t size_left{m_buffer_size - m_buffer_offset};
if (size_left == 0) {
- refill_buffer();
+ refill_next_buffer();
if (m_buffer_size == 0) {
- return 0;
+ return DataPart{.size = 0, .data = std::shared_ptr<char[]>()};
}
}
// There is at least one byte in the buffer
- const size_t size_read{read_buffer(data_size, return_data)};
- assert(size_read > 0);
- return size_read;
-};
+ const DataPart dp{read_current_buffer(data_size)};
+ assert(dp.size > 0);
+ return dp;
+}
-size_t
-Reader::read_buffer(size_t data_size, char **return_data)
+DataPart
+Reader::read_current_buffer(size_t data_size)
{
- *return_data = static_cast<char *>(m_buffer.get()) + m_buffer_offset;
+ DataPart dp;
+
+ // Set data
+ if (m_buffer_offset == 0) {
+ dp.data = std::shared_ptr<char[]>{m_buffers[m_buffer_index]};
+ } else {
+ dp.data = std::shared_ptr<char[]>{
+ m_buffers[m_buffer_index],
+ static_cast<char *>(m_buffers[m_buffer_index].get()) +
+ m_buffer_offset};
+ }
const size_t size_left{m_buffer_size - m_buffer_offset};
- const size_t size_read{std::min(data_size, size_left)};
- m_buffer_offset += size_read;
- return size_read;
+ dp.size = std::min(data_size, size_left);
+ m_buffer_offset += dp.size;
+ return dp;
};
void
-Reader::refill_buffer()
+Reader::refill_next_buffer()
{
- m_buffer_size = read_stream(m_buffer.get(), m_buffer_capacity);
+ if (m_buffer_size == 0) {
+ // Current buffer is the last one. Don't fill the next one.
+ return;
+ }
+
+ // Current buffer must be completely read before filling the next one
+ assert(m_buffer_offset == m_buffer_size);
+
+ m_buffer_index = (m_buffer_index + 1) % m_buffer_count;
+ auto buf = m_buffers[m_buffer_index];
+ // Buffer for new data must not in use
+ assert(buf.use_count() == 2);
+
+ m_buffer_size = read_stream(buf, m_buffer_capacity);
m_buffer_offset = 0;
};
size_t
-Reader::read_stream(char *data, size_t data_size)
+Reader::read_stream(std::shared_ptr<char[]> data, size_t data_size)
{
- m_istream.read(data, data_size);
+ m_istream.read(data.get(), data_size);
if (!m_istream.good() && !m_istream.eof()) {
throw Error("cannot read from stream");
diff --git a/src/buffered_stream.h b/src/buffered_stream.h
index 4b05a64..360dd60 100644
--- a/src/buffered_stream.h
+++ b/src/buffered_stream.h
@@ -31,6 +31,7 @@
#include <cstring>
#include <filesystem>
#include <fstream>
+#include <vector>
namespace BufferedStream
{
@@ -41,25 +42,32 @@ class Error : public DiffddError
explicit Error(const std::string &message) : DiffddError(message) {}
};
+struct DataPart {
+ size_t size;
+ std::shared_ptr<char[]> data;
+};
+
class Reader
{
public:
- Reader(std::istream &istream, size_t buffer_capacity);
+ Reader(std::istream &istream, size_t buffer_capacity, size_t buffer_count);
virtual ~Reader() = default;
- size_t read(char *data, size_t data_size);
- size_t tryRead(size_t data_size, char **return_data);
+ size_t read(size_t data_size, char *dest_buf);
+ DataPart readMultipart(size_t data_size);
private:
+ const size_t m_buffer_count;
+ const size_t m_buffer_capacity;
std::istream &m_istream;
- std::unique_ptr<char[]> m_buffer;
+ std::vector<std::shared_ptr<char[]>> m_buffers;
+ size_t m_buffer_index;
size_t m_buffer_offset;
size_t m_buffer_size;
- const size_t m_buffer_capacity;
- size_t read_buffer(size_t data_size, char **return_data);
- void refill_buffer();
- size_t read_stream(char *data, size_t data_size);
+ DataPart read_current_buffer(size_t data_size);
+ void refill_next_buffer();
+ size_t read_stream(std::shared_ptr<char[]> data, size_t data_size);
};
class Writer
diff --git a/src/format_v2.h b/src/format_v2.h
index 3a0fea2..e9b4edb 100644
--- a/src/format_v2.h
+++ b/src/format_v2.h
@@ -84,16 +84,16 @@ class Reader
{
public:
Reader(std::istream &istream, size_t buffer_size)
- : m_reader{BufferedStream::Reader{istream, buffer_size}}, m_eof{
- false} {};
+ : m_reader{BufferedStream::Reader{istream, buffer_size, 1}},
+ m_eof{false} {};
bool eof() { return m_eof; };
uint64_t readOffset()
{
uint64_t raw_offset;
- const size_t r{m_reader.read(reinterpret_cast<char *>(&raw_offset),
- sizeof(raw_offset))};
+ const size_t r{m_reader.read(sizeof(raw_offset),
+ reinterpret_cast<char *>(&raw_offset))};
if (r != sizeof(raw_offset)) {
m_eof = true;
}
@@ -103,17 +103,21 @@ class Reader
size_t readSize()
{
uint32_t raw_size;
- const size_t r{m_reader.read(reinterpret_cast<char *>(&raw_size),
- sizeof(raw_size))};
+ const size_t r{m_reader.read(sizeof(raw_size),
+ reinterpret_cast<char *>(&raw_size))};
if (r != sizeof(raw_size)) {
m_eof = true;
}
return be32toh(raw_size);
};
- size_t readData(size_t size, char **return_data)
+ RecordData readRecordData(size_t size)
{
- return m_reader.tryRead(size, return_data);
+ const BufferedStream::DataPart dp = m_reader.readMultipart(size);
+ return RecordData{
+ .size = dp.size,
+ .data = dp.data,
+ };
};
private:
diff --git a/src/restore.cpp b/src/restore.cpp
index f0c4c9e..5b490d9 100644
--- a/src/restore.cpp
+++ b/src/restore.cpp
@@ -63,17 +63,16 @@ restore(const OptionsRestore &opts)
uint64_t size{diff_reader.readSize()};
while (size > 0) {
- char *data;
- const size_t r{diff_reader.readData(size, &data)};
- if (r == 0) {
+ const FormatV2::RecordData rd{diff_reader.readRecordData(size)};
+ if (rd.size == 0) {
break;
}
- if (!out_file.write(data, r)) {
+ if (!out_file.write(rd.data.get(), rd.size)) {
throw RestoreError("cannot write to output file");
}
- size -= r;
+ size -= rd.size;
}
if (size > 0) {