From 598d8f872fd248de9f9045d2a2a448b9cfd913d3 Mon Sep 17 00:00:00 2001 From: Mathew Jacob Date: Thu, 28 Mar 2024 18:29:05 -0500 Subject: [PATCH 01/15] Implemented basic FileReader api + in the tests. Will try to add buffering into the FileReader so as to avoid unncessary network calls. Implemented an intial version of the FileReaderBuffer in order to try and reduce the number of network calls someone has to make when reading using FileReader --- src/libs/ck-libs/io/ckio.C | 63 +++++++++ src/libs/ck-libs/io/ckio.h | 242 +++++++++++++++++++++++++++----- tests/charm++/io_read/iotest.C | 29 ++++ tests/charm++/io_read/iotest.ci | 1 + 4 files changed, 298 insertions(+), 37 deletions(-) diff --git a/src/libs/ck-libs/io/ckio.C b/src/libs/ck-libs/io/ckio.C index e967a2f9ae..af58bfb795 100644 --- a/src/libs/ck-libs/io/ckio.C +++ b/src/libs/ck-libs/io/ckio.C @@ -988,7 +988,70 @@ public: } int registerArray(CkArrayIndex& numElements, CkArrayID aid) { return 0; } + }; +FileReader::FileReader(Ck::IO::Session session) : _session_token(session), _offset(session.offset), _num_bytes(session.bytes) {} + +FileReader& FileReader::read(char* buffer, size_t num_bytes_to_read){ + if(_eofbit){ // no more bytes to read + _gcount = 0; + return *this; + } + // TODO: incorporate checking the buffer into this code + size_t bytes_to_read = std::min(num_bytes_to_read, (_offset + _num_bytes - _curr_pos)); + ReadCompleteMsg* read_msg; + Ck::IO::read(_session_token, bytes_to_read, _curr_pos, buffer, CkCallbackResumeThread((void*&) read_msg)); + // below will not get executed until the read is done + _curr_pos += (read_msg -> bytes); + if(_curr_pos >= (_offset + _num_bytes)) { + _eofbit = true; // ran out of data to read + _curr_pos = _offset + _num_bytes; + } + _gcount = read_msg -> bytes; + return *this; + } + +size_t FileReader::tellg() { return _curr_pos;} + +FileReader& FileReader::seekg(size_t pos){ + _curr_pos = pos; + if(_curr_pos < _offset){ + _curr_pos = _offset; + _eofbit = false; + } else 
if (_curr_pos >= (_offset + _num_bytes)) { + _curr_pos = _offset + _num_bytes; + _eofbit = true; + } + _eofbit = false; + return *this; + } + +bool FileReader::eof() { return _eofbit;} + +size_t FileReader::gcount() { return _gcount; } + +FileReaderBuffer::FileReaderBuffer(){ + _buffer = new char[_buff_capacity]; +} + +FileReaderBuffer::FileReaderBuffer(size_t buff_size){ + _buff_capacity = buff_capacity; + _buffer = new char[_buff_capacity]; +} + +void FileReaderBuffer::setBuffer(size_t offset, size_t num_bytes, char* data){ + _offset = offset; + _buffer_size = std::min(_buff_capacity, num_bytes); + std::memcpy(_buffer, data, _buffer_size); // copy the first section of bytes +} + +size_t FileReaderBuffer::getFromBuffer(size_t offset, size_t num_bytes, char* buffer){ + if(offset < _offset || offset >= (_offset + _buffer_size)) return 0; // the buffer has nothing of relevance + size_t cached_len = std::min(offset + num_bytes, _offset + _buff_size) - offset; + std::memcpy(buffer, _buffer, cached_len); + return cached_len; +} + } // namespace IO } // namespace Ck diff --git a/src/libs/ck-libs/io/ckio.h b/src/libs/ck-libs/io/ckio.h index f3281427be..9ec58f2fd4 100644 --- a/src/libs/ck-libs/io/ckio.h +++ b/src/libs/ck-libs/io/ckio.h @@ -17,31 +17,170 @@ class Session; } } // namespace Ck -namespace Ck -{ -namespace IO -{ -/// Note: The values in options are not currently a stable or working interface. -/// Users should not set anything in them. -struct Options -{ - Options() - : peStripe(0), writeStripe(0), activePEs(-1), basePE(-1), skipPEs(-1), numReaders(0) - { +namespace Ck { namespace IO { class Session; }} + +namespace Ck { namespace IO { + /// Note: The values in options are not currently a stable or working interface. + /// Users should not set anything in them. 
+ struct Options { + Options() + : peStripe(0), writeStripe(0), activePEs(-1), basePE(-1), skipPEs(-1), read_stride(0), numReaders(0) + { } + + /// How much contiguous data (in bytes) should be assigned to each active PE + size_t peStripe; + /// How much contiguous data (in bytes) should a PE gather before writing it out + size_t writeStripe; + /// How many PEs should participate in this activity + int activePEs; + /// Which PE should be the first to participate in this activity + int basePE; + /// How should active PEs be spaced out? + int skipPEs; + // How many bytes each Read Session should hold + size_t read_stride; + // How many IO buffers should there be + size_t numReaders; + + void pup(PUP::er &p) { + p|peStripe; + p|writeStripe; + p|activePEs; + p|basePE; + p|skipPEs; + p|read_stride; + p | numReaders; + } + }; + class FileReader; + class FileReaderBuffer; + + class File; + // class ReadAssembler; + /// Open the named file on the selected subset of PEs, and send a + /// FileReadyMsg to the opened callback when the system is ready to accept + /// session requests on that file. + /// Note: The values in options are not currently a stable or working interface. + /// Users should not set anything in them. + void open(std::string name, CkCallback opened, Options opts); + + /// Prepare to write data into the file described by token, in the window + /// defined by the offset and byte length. When the session is set up, a + /// SessionReadyMsg will be sent to the ready callback. When all of the data + /// has been written and synced, a message will be sent to the complete + /// callback. + void startSession(File file, size_t bytes, size_t offset, + CkCallback ready, CkCallback complete); + + /// Prepare to write data into @arg file, in the window defined by the @arg + /// offset and length in @arg bytes. When the session is set up, a + /// SessionReadyMsg will be sent to the @arg ready callback. 
When all of the + /// data has been written and synced, an additional write will be made to the + /// file to `commit' the session's work. When that write has completed, a + /// message will be sent to the @arg complete callback. + void startSession(File file, size_t bytes, size_t offset, CkCallback ready, + const char *commitData, size_t commitBytes, size_t commitOffset, + CkCallback complete); + + /// Write the given data into the file to which session is attached. The + /// offset is relative to the file as a whole, not to the session's offset. + void write(Session session, const char *data, size_t bytes, size_t offset); + + /// Close a previously-opened file. All sessions on that file must have + /// already signalled that they are complete. + void close(File file, CkCallback closed); + + /** + * Prepare to read data from @arg file section specified by @arg bytes and @arg offset. + * This method will proceed to eagerly read all of the data in that window into memory + * for future read calls. After all the data is read in, the ready callback will be invoked. + * The ready callback will take in a SessionReadyMessage* that will contain the offset, the amount of bytes + * , and the buffer in the form of a vector. + */ + void startReadSession(File file, size_t bytes, size_t offset, CkCallback ready); + + /** + * Same as the above start session in function. However, there is an extra @arg pes_to_map. pes_to_map will contain a sequence + * of numbers representing pes. CkIO will map the IO Buffer chares to those pes specified in pes_to_map in a round_robin fashion. 
+ */ + void startReadSession(File file, size_t bytes, size_t offset, CkCallback ready, std::vector pes_to_map); + + /** + * Used to end the current read session and will then invoke the after_end callback that takes a CkReductionMsg* with nothing in it + * Will effectively call ckDestroy() on the CProxy_Reader of the associated FileInfo + */ + + void closeReadSession(Session read_session, CkCallback after_end); + /** + * Is a method that reads data from the @arg session of length @arg bytes at offset + * @arg offset (in file). After this read finishes, the @arg after_read callback is invoked, taking + * a ReadCompleteMsg* which points to a vector buffer, the offset, and the number of + * bytes of the read. + * */ + void read(Session session, size_t bytes, size_t offset, char* data, CkCallback after_read); + void read(Session session, size_t bytes, size_t offset, CkCallback after_read, size_t tag); + +// ZERO COPY READ; + void read(Session session, size_t bytes, size_t offset, CkCallback after_read, size_t tag, char* user_buffer); + + + class File { + int token; + friend void startSession(File file, size_t bytes, size_t offset, + CkCallback ready, CkCallback complete); + + friend void startReadSession(File file, size_t bytes, size_t offset, CkCallback ready); + friend void startReadSession(File file, size_t bytes, size_t offset, CkCallback ready, std::vector pes_to_map); + + friend void startSession(File file, size_t bytes, size_t offset, CkCallback ready, + const char *commitData, size_t commitBytes, size_t commitOffset, + CkCallback complete); + friend void close(File file, CkCallback closed); + friend class FileReadyMsg; + + public: + File(int token_) : token(token_) { } + File() : token(-1) { } + void pup(PUP::er &p) { p|token; } + }; + + class FileReadyMsg : public CMessage_FileReadyMsg { + public: + File file; + FileReadyMsg(const File &tok) : file(tok) {} + }; + + namespace impl { + class Manager; + int getRDMATag(); + class Director; // forward declare 
Director class as impl + class ReadAssembler; } - /// How much contiguous data (in bytes) should be assigned to each active PE - size_t peStripe; - /// How much contiguous data (in bytes) should a PE gather before writing it out - size_t writeStripe; - /// How many PEs should participate in this activity - int activePEs; - /// Which PE should be the first to participate in this activity - int basePE; - /// How should active PEs be spaced out? - int skipPEs; - // How many IO buffers should there be - size_t numReaders; + class Session { + int file; + size_t bytes, offset; + CkArrayID sessionID; + friend class Ck::IO::impl::Manager; + friend class Ck::IO::impl::Director; + friend class Ck::IO::impl::ReadAssembler; + friend void read(Session session, size_t bytes, size_t offset, char* data, CkCallback after_read); + friend struct std::hash; + friend class FileReader; + public: + Session(int file_, size_t bytes_, size_t offset_, + CkArrayID sessionID_) + : file(file_), bytes(bytes_), offset(offset_), sessionID(sessionID_) + { } + Session() { } + void pup(PUP::er &p) { + p|file; + p|bytes; + p|offset; + p|sessionID; + } + + int getFile() const { return file;} void pup(PUP::er& p) { @@ -81,22 +220,51 @@ void startSession(File file, size_t bytes, size_t offset, CkCallback ready, const char* commitData, size_t commitBytes, size_t commitOffset, CkCallback complete); -/// Write the given data into the file to which session is attached. The -/// offset is relative to the file as a whole, not to the session's offset. -void write(Session session, const char* data, size_t bytes, size_t offset); + class ReadCompleteMsg : public CMessage_ReadCompleteMsg { + public: + size_t read_tag; + size_t offset; + size_t bytes; + ReadCompleteMsg(){} + ReadCompleteMsg(size_t in_tag, size_t in_offset, size_t in_bytes) : read_tag(in_tag), offset(in_offset), bytes(in_bytes){ + } + -/// Close a previously-opened file. 
All sessions on that file must have -/// already signalled that they are complete. -void close(File file, CkCallback closed); + }; + + + class FileReader { + Session _session_token; + size_t _curr_pos = 0; + size_t _offset, _num_bytes; + bool _eofbit = false; + size_t _gcount = 0; + public: + FileReader(Ck::IO::Session session); + /* In order to use the read functionality, make sure + * that this function is being called in a threaded entry method + * because it utilizes CkCallbackResumeThread() in order to overlay work + * correctly and block execution + * */ + FileReader& read(char* buffer, size_t num_bytes_to_read); + size_t tellg(); + FileReader& seekg(size_t pos); + bool eof(); + size_t gcount(); + }; -/** - * Prepare to read data from @arg file section specified by @arg bytes and @arg offset. - * On starting the session, the buffer chares begin eagerly reading all requested data - * into memory. The ready callback is invoked once all buffer chares have been created and - * their reads have been initiated (but the reads are not guaranteed to be complete at - * this point). - */ -void startReadSession(File file, size_t bytes, size_t offset, CkCallback ready); + class FileReaderBuffer { + size_t _buff_capacity = 4096; // the size of the buffer array + size_t _buff_size = 0; // the number of valid elements in the array + size_t _offset = -1; // the offset byte + char* _buffer; + +public: + FileReaderBuffer(); + FileReader(size_t buff_capacity) + void setBuffer(size_t offset, size_t num_bytes, char* data); // writes the data to the buffer + size_t getFromBuffer(size_t offset, size_t num_bytes, char* buffer); + }; /** * Same as the above start session in function. 
However, there is an extra @arg diff --git a/tests/charm++/io_read/iotest.C b/tests/charm++/io_read/iotest.C index 4f02616ced..c34e3c3ac8 100644 --- a/tests/charm++/io_read/iotest.C +++ b/tests/charm++/io_read/iotest.C @@ -47,6 +47,7 @@ public: class Test : public CBase_Test { char* dataBuffer; + char* file_reader_buffer; int size; std::string _fname; @@ -55,10 +56,32 @@ public: { CkPrintf("Inside the constructor of tester %d\n", thisIndex); _fname = filename; + thisProxy[thisIndex].testMethod(token, bytesToRead); + //CkCallback sessionEnd(CkIndex_Test::readDone(0), thisProxy[thisIndex]); + //try + //{ + // dataBuffer = new char[bytesToRead]; + // file_reader_buffer = new char[bytesToRead]; + //} + //catch (const std::bad_alloc& e) + //{ + // CkPrintf("ERROR: Data buffer malloc of %zu bytes in Test chare %d failed.\n", + // bytesToRead, thisIndex); + // CkExit(); + //} + //size = bytesToRead; + //Ck::IO::FileReader fr(token); + //fr.seekg(bytesToRead * thisIndex); // seek to the correct place in the file + //fr.read(file_reader_buffer, size); // hopefully this will return the same data as Ck::IO::read + //Ck::IO::read(token, bytesToRead, bytesToRead * thisIndex, dataBuffer, sessionEnd); + } + + void testMethod(Ck::IO::Session token, size_t bytesToRead){ CkCallback sessionEnd(CkIndex_Test::readDone(0), thisProxy[thisIndex]); try { dataBuffer = new char[bytesToRead]; + file_reader_buffer = new char[bytesToRead]; } catch (const std::bad_alloc& e) { @@ -67,6 +90,10 @@ public: CkExit(); } size = bytesToRead; + Ck::IO::FileReader fr(token); + fr.seekg(bytesToRead * thisIndex); // seek to the correct place in the file + fr.read(file_reader_buffer, size); // hopefully this will return the same data as Ck::IO::read + CkPrintf("the FileReader::read function on tester[%d] is done with first character=%c\n", thisIndex, file_reader_buffer[0]); Ck::IO::read(token, bytesToRead, bytesToRead * thisIndex, dataBuffer, sessionEnd); } @@ -101,9 +128,11 @@ public: thisIndex, 
(m->offset), (m->bytes), i, verify_buffer[i], i, dataBuffer[i]); } assert(verify_buffer[i] == dataBuffer[i]); + assert(verify_buffer[i] == file_reader_buffer[i]); } delete[] verify_buffer; delete[] dataBuffer; + delete[] file_reader_buffer; CkPrintf("Index %d is now done with the reads...\n", thisIndex); contribute(done); } diff --git a/tests/charm++/io_read/iotest.ci b/tests/charm++/io_read/iotest.ci index 7ed66f071d..c7ed55ded0 100644 --- a/tests/charm++/io_read/iotest.ci +++ b/tests/charm++/io_read/iotest.ci @@ -79,6 +79,7 @@ entry void iterDone(); array[1D] Test { entry Test(Ck::IO::Session token, size_t bytesToRead, std::string filename); + entry [threaded] void testMethod(Ck::IO::Session token, size_t bytesToRead); entry void readDone(Ck::IO::ReadCompleteMsg * m); } } From ad27fb56e32443d7f0585326777786f0738f1be0 Mon Sep 17 00:00:00 2001 From: Mathew Jacob Date: Fri, 29 Mar 2024 03:56:30 -0500 Subject: [PATCH 02/15] implemented the FileReadBuffer to support the caching of larger data blocks. should hopefully reduce the number of network calls when reading small chunks of data using FileReader api. 
--- src/libs/ck-libs/io/ckio.C | 48 ++++++++++++++++++++++++++++++-------- src/libs/ck-libs/io/ckio.h | 33 +++++++++++++------------- 2 files changed, 55 insertions(+), 26 deletions(-) diff --git a/src/libs/ck-libs/io/ckio.C b/src/libs/ck-libs/io/ckio.C index af58bfb795..1ecf5a9da1 100644 --- a/src/libs/ck-libs/io/ckio.C +++ b/src/libs/ck-libs/io/ckio.C @@ -997,17 +997,37 @@ FileReader& FileReader::read(char* buffer, size_t num_bytes_to_read){ _gcount = 0; return *this; } - // TODO: incorporate checking the buffer into this code - size_t bytes_to_read = std::min(num_bytes_to_read, (_offset + _num_bytes - _curr_pos)); + size_t amt_from_cache = _data_cache.getFromBuffer(_curr_pos, num_bytes_to_read, buffer); // get whatever data the cache has for us + _curr_pos += amt_from_cache; + if(amt_from_cache == num_bytes_to_read){ + return *this; + } + size_t bytes_to_read_left = num_bytes_to_read - amt_from_cache; + size_t bytes_to_read = std::min(std::max(bytes_to_read_left, size_t(4096)), (_offset + _num_bytes - _curr_pos)); // if the read is too small, get more data to store in the buffer + if(!bytes_to_read){ + return *this; + } + char* tmp_data_buff = new char[bytes_to_read]; // temporary buffer that will hold all the data ReadCompleteMsg* read_msg; - Ck::IO::read(_session_token, bytes_to_read, _curr_pos, buffer, CkCallbackResumeThread((void*&) read_msg)); + Ck::IO::read(_session_token, bytes_to_read, _curr_pos, tmp_data_buff, CkCallbackResumeThread((void*&) read_msg)); // below will not get executed until the read is done - _curr_pos += (read_msg -> bytes); + size_t bytes_read = read_msg -> bytes; + if(bytes_read > bytes_to_read_left){ + // if I read more bytes than what was left to read, that means I have extra bytes that the buffer can use + _data_cache.setBuffer(_curr_pos, bytes_read, tmp_data_buff); + _curr_pos += bytes_to_read_left; + std::memcpy(buffer + amt_from_cache, tmp_data_buff, bytes_to_read_left); + } else { + // too many bytes, nothing to actually cache 
+ _curr_pos += bytes_read; + std::memcpy(buffer + amt_from_cache, tmp_data_buff, bytes_read); + } + delete[] tmp_data_buff; if(_curr_pos >= (_offset + _num_bytes)) { _eofbit = true; // ran out of data to read _curr_pos = _offset + _num_bytes; } - _gcount = read_msg -> bytes; + _gcount = std::min(bytes_read, bytes_to_read_left); return *this; } @@ -1034,24 +1054,32 @@ FileReaderBuffer::FileReaderBuffer(){ _buffer = new char[_buff_capacity]; } -FileReaderBuffer::FileReaderBuffer(size_t buff_size){ +FileReaderBuffer::FileReaderBuffer(size_t buff_capacity){ _buff_capacity = buff_capacity; _buffer = new char[_buff_capacity]; } void FileReaderBuffer::setBuffer(size_t offset, size_t num_bytes, char* data){ + is_dirty = false; _offset = offset; - _buffer_size = std::min(_buff_capacity, num_bytes); - std::memcpy(_buffer, data, _buffer_size); // copy the first section of bytes + _buff_size = std::min(_buff_capacity, num_bytes); + std::memcpy(_buffer, data, _buff_size); // copy the first section of bytes } size_t FileReaderBuffer::getFromBuffer(size_t offset, size_t num_bytes, char* buffer){ - if(offset < _offset || offset >= (_offset + _buffer_size)) return 0; // the buffer has nothing of relevance + + if(is_dirty || offset < _offset || offset >= (_offset + _buff_size)) { + return 0; // the buffer has nothing of relevance + } size_t cached_len = std::min(offset + num_bytes, _offset + _buff_size) - offset; - std::memcpy(buffer, _buffer, cached_len); + std::memcpy(buffer, _buffer + (offset - _offset), cached_len); return cached_len; } +FileReaderBuffer::~FileReaderBuffer(){ + delete[] _buffer; +} + } // namespace IO } // namespace Ck diff --git a/src/libs/ck-libs/io/ckio.h b/src/libs/ck-libs/io/ckio.h index 9ec58f2fd4..1f0647c0c4 100644 --- a/src/libs/ck-libs/io/ckio.h +++ b/src/libs/ck-libs/io/ckio.h @@ -3,9 +3,7 @@ #include #include -#include -#include -#include +#include #include "CkIO.decl.h" @@ -52,8 +50,8 @@ namespace Ck { namespace IO { p | numReaders; } }; + class 
FileReader; - class FileReaderBuffer; class File; // class ReadAssembler; @@ -232,6 +230,20 @@ void startSession(File file, size_t bytes, size_t offset, CkCallback ready, }; + class FileReaderBuffer { + size_t _buff_capacity = 4096; // the size of the buffer array + size_t _buff_size = 0; // the number of valid elements in the array + ssize_t _offset = 0; // the offset byte + char* _buffer; + bool is_dirty = true; + +public: + FileReaderBuffer(); + FileReaderBuffer(size_t buff_capacity); + ~FileReaderBuffer(); + void setBuffer(size_t offset, size_t num_bytes, char* data); // writes the data to the buffer + size_t getFromBuffer(size_t offset, size_t num_bytes, char* buffer); + }; class FileReader { Session _session_token; @@ -239,6 +251,7 @@ void startSession(File file, size_t bytes, size_t offset, CkCallback ready, size_t _offset, _num_bytes; bool _eofbit = false; size_t _gcount = 0; + FileReaderBuffer _data_cache; public: FileReader(Ck::IO::Session session); /* In order to use the read functionality, make sure @@ -253,18 +266,6 @@ void startSession(File file, size_t bytes, size_t offset, CkCallback ready, size_t gcount(); }; - class FileReaderBuffer { - size_t _buff_capacity = 4096; // the size of the buffer array - size_t _buff_size = 0; // the number of valid elements in the array - size_t _offset = -1; // the offset byte - char* _buffer; - -public: - FileReaderBuffer(); - FileReader(size_t buff_capacity) - void setBuffer(size_t offset, size_t num_bytes, char* data); // writes the data to the buffer - size_t getFromBuffer(size_t offset, size_t num_bytes, char* buffer); - }; /** * Same as the above start session in function. 
However, there is an extra @arg From 64e58ef8c15b1fa1d02f70d808fcfdcbff87365a Mon Sep 17 00:00:00 2001 From: Maya Taylor Date: Wed, 3 Apr 2024 17:35:20 -0400 Subject: [PATCH 03/15] reformatting --- src/libs/ck-libs/io/ckio.h | 308 ++++++++++++------------------------- 1 file changed, 99 insertions(+), 209 deletions(-) diff --git a/src/libs/ck-libs/io/ckio.h b/src/libs/ck-libs/io/ckio.h index 1f0647c0c4..6d44530e93 100644 --- a/src/libs/ck-libs/io/ckio.h +++ b/src/libs/ck-libs/io/ckio.h @@ -2,8 +2,11 @@ #define CK_IO_H #include -#include #include +#include +#include +#include +#include #include "CkIO.decl.h" @@ -15,170 +18,39 @@ class Session; } } // namespace Ck -namespace Ck { namespace IO { class Session; }} - -namespace Ck { namespace IO { - /// Note: The values in options are not currently a stable or working interface. - /// Users should not set anything in them. - struct Options { - Options() - : peStripe(0), writeStripe(0), activePEs(-1), basePE(-1), skipPEs(-1), read_stride(0), numReaders(0) - { } - - /// How much contiguous data (in bytes) should be assigned to each active PE - size_t peStripe; - /// How much contiguous data (in bytes) should a PE gather before writing it out - size_t writeStripe; - /// How many PEs should participate in this activity - int activePEs; - /// Which PE should be the first to participate in this activity - int basePE; - /// How should active PEs be spaced out? - int skipPEs; - // How many bytes each Read Session should hold - size_t read_stride; - // How many IO buffers should there be - size_t numReaders; - - void pup(PUP::er &p) { - p|peStripe; - p|writeStripe; - p|activePEs; - p|basePE; - p|skipPEs; - p|read_stride; - p | numReaders; - } - }; - - class FileReader; - - class File; - // class ReadAssembler; - /// Open the named file on the selected subset of PEs, and send a - /// FileReadyMsg to the opened callback when the system is ready to accept - /// session requests on that file. 
- /// Note: The values in options are not currently a stable or working interface. - /// Users should not set anything in them. - void open(std::string name, CkCallback opened, Options opts); - - /// Prepare to write data into the file described by token, in the window - /// defined by the offset and byte length. When the session is set up, a - /// SessionReadyMsg will be sent to the ready callback. When all of the data - /// has been written and synced, a message will be sent to the complete - /// callback. - void startSession(File file, size_t bytes, size_t offset, - CkCallback ready, CkCallback complete); - - /// Prepare to write data into @arg file, in the window defined by the @arg - /// offset and length in @arg bytes. When the session is set up, a - /// SessionReadyMsg will be sent to the @arg ready callback. When all of the - /// data has been written and synced, an additional write will be made to the - /// file to `commit' the session's work. When that write has completed, a - /// message will be sent to the @arg complete callback. - void startSession(File file, size_t bytes, size_t offset, CkCallback ready, - const char *commitData, size_t commitBytes, size_t commitOffset, - CkCallback complete); - - /// Write the given data into the file to which session is attached. The - /// offset is relative to the file as a whole, not to the session's offset. - void write(Session session, const char *data, size_t bytes, size_t offset); - - /// Close a previously-opened file. All sessions on that file must have - /// already signalled that they are complete. - void close(File file, CkCallback closed); - - /** - * Prepare to read data from @arg file section specified by @arg bytes and @arg offset. - * This method will proceed to eagerly read all of the data in that window into memory - * for future read calls. After all the data is read in, the ready callback will be invoked. 
- * The ready callback will take in a SessionReadyMessage* that will contain the offset, the amount of bytes - * , and the buffer in the form of a vector. - */ - void startReadSession(File file, size_t bytes, size_t offset, CkCallback ready); - - /** - * Same as the above start session in function. However, there is an extra @arg pes_to_map. pes_to_map will contain a sequence - * of numbers representing pes. CkIO will map the IO Buffer chares to those pes specified in pes_to_map in a round_robin fashion. - */ - void startReadSession(File file, size_t bytes, size_t offset, CkCallback ready, std::vector pes_to_map); - - /** - * Used to end the current read session and will then invoke the after_end callback that takes a CkReductionMsg* with nothing in it - * Will effectively call ckDestroy() on the CProxy_Reader of the associated FileInfo - */ - - void closeReadSession(Session read_session, CkCallback after_end); - /** - * Is a method that reads data from the @arg session of length @arg bytes at offset - * @arg offset (in file). After this read finishes, the @arg after_read callback is invoked, taking - * a ReadCompleteMsg* which points to a vector buffer, the offset, and the number of - * bytes of the read. 
- * */ - void read(Session session, size_t bytes, size_t offset, char* data, CkCallback after_read); - void read(Session session, size_t bytes, size_t offset, CkCallback after_read, size_t tag); - -// ZERO COPY READ; - void read(Session session, size_t bytes, size_t offset, CkCallback after_read, size_t tag, char* user_buffer); - - - class File { - int token; - friend void startSession(File file, size_t bytes, size_t offset, - CkCallback ready, CkCallback complete); - - friend void startReadSession(File file, size_t bytes, size_t offset, CkCallback ready); - friend void startReadSession(File file, size_t bytes, size_t offset, CkCallback ready, std::vector pes_to_map); - - friend void startSession(File file, size_t bytes, size_t offset, CkCallback ready, - const char *commitData, size_t commitBytes, size_t commitOffset, - CkCallback complete); - friend void close(File file, CkCallback closed); - friend class FileReadyMsg; - - public: - File(int token_) : token(token_) { } - File() : token(-1) { } - void pup(PUP::er &p) { p|token; } - }; - - class FileReadyMsg : public CMessage_FileReadyMsg { - public: - File file; - FileReadyMsg(const File &tok) : file(tok) {} - }; - - namespace impl { - class Manager; - int getRDMATag(); - class Director; // forward declare Director class as impl - class ReadAssembler; +namespace Ck +{ +namespace IO +{ +/// Note: The values in options are not currently a stable or working interface. +/// Users should not set anything in them. 
+struct Options +{ + Options() + : peStripe(0), + writeStripe(0), + activePEs(-1), + basePE(-1), + skipPEs(-1), + read_stride(0), + numReaders(0) + { } - class Session { - int file; - size_t bytes, offset; - CkArrayID sessionID; - friend class Ck::IO::impl::Manager; - friend class Ck::IO::impl::Director; - friend class Ck::IO::impl::ReadAssembler; - friend void read(Session session, size_t bytes, size_t offset, char* data, CkCallback after_read); - friend struct std::hash; - friend class FileReader; - public: - Session(int file_, size_t bytes_, size_t offset_, - CkArrayID sessionID_) - : file(file_), bytes(bytes_), offset(offset_), sessionID(sessionID_) - { } - Session() { } - void pup(PUP::er &p) { - p|file; - p|bytes; - p|offset; - p|sessionID; - } - - int getFile() const { return file;} + /// How much contiguous data (in bytes) should be assigned to each active PE + size_t peStripe; + /// How much contiguous data (in bytes) should a PE gather before writing it out + size_t writeStripe; + /// How many PEs should participate in this activity + int activePEs; + /// Which PE should be the first to participate in this activity + int basePE; + /// How should active PEs be spaced out? 
+ int skipPEs; + // How many bytes each Read Session should hold + size_t read_stride; + // How many IO buffers should there be + size_t numReaders; void pup(PUP::er& p) { @@ -187,10 +59,13 @@ namespace Ck { namespace IO { p | activePEs; p | basePE; p | skipPEs; + p | read_stride; p | numReaders; } }; +class FileReader; + class File; // class ReadAssembler; /// Open the named file on the selected subset of PEs, and send a @@ -218,54 +93,22 @@ void startSession(File file, size_t bytes, size_t offset, CkCallback ready, const char* commitData, size_t commitBytes, size_t commitOffset, CkCallback complete); - class ReadCompleteMsg : public CMessage_ReadCompleteMsg { - public: - size_t read_tag; - size_t offset; - size_t bytes; - ReadCompleteMsg(){} - ReadCompleteMsg(size_t in_tag, size_t in_offset, size_t in_bytes) : read_tag(in_tag), offset(in_offset), bytes(in_bytes){ - } - - - }; - - class FileReaderBuffer { - size_t _buff_capacity = 4096; // the size of the buffer array - size_t _buff_size = 0; // the number of valid elements in the array - ssize_t _offset = 0; // the offset byte - char* _buffer; - bool is_dirty = true; +/// Write the given data into the file to which session is attached. The +/// offset is relative to the file as a whole, not to the session's offset. 
+void write(Session session, const char* data, size_t bytes, size_t offset); -public: - FileReaderBuffer(); - FileReaderBuffer(size_t buff_capacity); - ~FileReaderBuffer(); - void setBuffer(size_t offset, size_t num_bytes, char* data); // writes the data to the buffer - size_t getFromBuffer(size_t offset, size_t num_bytes, char* buffer); - }; - - class FileReader { - Session _session_token; - size_t _curr_pos = 0; - size_t _offset, _num_bytes; - bool _eofbit = false; - size_t _gcount = 0; - FileReaderBuffer _data_cache; - public: - FileReader(Ck::IO::Session session); - /* In order to use the read functionality, make sure - * that this function is being called in a threaded entry method - * because it utilizes CkCallbackResumeThread() in order to overlay work - * correctly and block execution - * */ - FileReader& read(char* buffer, size_t num_bytes_to_read); - size_t tellg(); - FileReader& seekg(size_t pos); - bool eof(); - size_t gcount(); - }; +/// Close a previously-opened file. All sessions on that file must have +/// already signalled that they are complete. +void close(File file, CkCallback closed); +/** + * Prepare to read data from @arg file section specified by @arg bytes and @arg offset. + * This method will proceed to eagerly read all of the data in that window into memory + * for future read calls. After all the data is read in, the ready callback will be + * invoked. The ready callback will take in a SessionReadyMessage* that will contain the + * offset, the amount of bytes , and the buffer in the form of a vector. + */ +void startReadSession(File file, size_t bytes, size_t offset, CkCallback ready); /** * Same as the above start session in function. 
However, there is an extra @arg @@ -290,6 +133,12 @@ void closeReadSession(Session read_session, CkCallback after_end); * */ void read(Session session, size_t bytes, size_t offset, char* data, CkCallback after_read); +void read(Session session, size_t bytes, size_t offset, CkCallback after_read, + size_t tag); + +// ZERO COPY READ; +void read(Session session, size_t bytes, size_t offset, CkCallback after_read, size_t tag, + char* user_buffer); class File { @@ -339,6 +188,7 @@ class Session friend void read(Session session, size_t bytes, size_t offset, char* data, CkCallback after_read); friend struct std::hash; + friend class FileReader; public: Session(int file_, size_t bytes_, size_t offset_, CkArrayID sessionID_) @@ -386,6 +236,46 @@ class ReadCompleteMsg : public CMessage_ReadCompleteMsg } }; +class FileReaderBuffer +{ + size_t _buff_capacity = 4096; // the size of the buffer array + size_t _buff_size = 0; // the number of valid elements in the array + ssize_t _offset = 0; // the offset byte + char* _buffer; + bool is_dirty = true; + +public: + FileReaderBuffer(); + FileReaderBuffer(size_t buff_capacity); + ~FileReaderBuffer(); + void setBuffer(size_t offset, size_t num_bytes, + char* data); // writes the data to the buffer + size_t getFromBuffer(size_t offset, size_t num_bytes, char* buffer); +}; + +class FileReader +{ + Session _session_token; + size_t _curr_pos = 0; + size_t _offset, _num_bytes; + bool _eofbit = false; + size_t _gcount = 0; + FileReaderBuffer _data_cache; + +public: + FileReader(Ck::IO::Session session); + /* In order to use the read functionality, make sure + * that this function is being called in a threaded entry method + * because it utilizes CkCallbackResumeThread() in order to overlay work + * correctly and block execution + * */ + FileReader& read(char* buffer, size_t num_bytes_to_read); + size_t tellg(); + FileReader& seekg(size_t pos); + bool eof(); + size_t gcount(); +}; + } // namespace IO } // namespace Ck From 
5496d885f19ea490f2eebb89a24f3698fb261ce0 Mon Sep 17 00:00:00 2001 From: Maya Taylor Date: Wed, 3 Apr 2024 17:46:26 -0400 Subject: [PATCH 04/15] adding seekg and ! support to filereader --- src/libs/ck-libs/io/ckio.C | 147 ++++++++++++++++++++++++------------ src/libs/ck-libs/io/ckio.ci | 2 +- src/libs/ck-libs/io/ckio.h | 9 +++ 3 files changed, 107 insertions(+), 51 deletions(-) diff --git a/src/libs/ck-libs/io/ckio.C b/src/libs/ck-libs/io/ckio.C index 1ecf5a9da1..4d781d5c9f 100644 --- a/src/libs/ck-libs/io/ckio.C +++ b/src/libs/ck-libs/io/ckio.C @@ -280,6 +280,8 @@ private: public: ReadAssembler(Session session) { _session = session; } + ReadAssembler(CkMigrateMessage* m) : CBase_ReadAssembler(m) {} + /* * This function adds the read request to the _read_info_buffer table * which maps a tag to a ReadInfo struct @@ -308,6 +310,11 @@ public: return _curr_read_tag - 1; } + void pup(PUP::er& p) + { + // TODO: All files must be closed across checkpoint/restart + } + void removeEntryFromReadTable(int tag) { _read_info_buffer.erase(tag); } /** @@ -988,63 +995,103 @@ public: } int registerArray(CkArrayIndex& numElements, CkArrayID aid) { return 0; } - }; FileReader::FileReader(Ck::IO::Session session) : _session_token(session), _offset(session.offset), _num_bytes(session.bytes) {} FileReader& FileReader::read(char* buffer, size_t num_bytes_to_read){ if(_eofbit){ // no more bytes to read - _gcount = 0; - return *this; - } + _gcount = 0; + return *this; + } size_t amt_from_cache = _data_cache.getFromBuffer(_curr_pos, num_bytes_to_read, buffer); // get whatever data the cache has for us - _curr_pos += amt_from_cache; + _curr_pos += amt_from_cache; if(amt_from_cache == num_bytes_to_read){ - return *this; - } - size_t bytes_to_read_left = num_bytes_to_read - amt_from_cache; + return *this; + } + size_t bytes_to_read_left = num_bytes_to_read - amt_from_cache; size_t bytes_to_read = std::min(std::max(bytes_to_read_left, size_t(4096)), (_offset + _num_bytes - _curr_pos)); // if 
the read is too small, get more data to store in the buffer if(!bytes_to_read){ - return *this; - } + return *this; + } char* tmp_data_buff = new char[bytes_to_read]; // temporary buffer that will hold all the data - ReadCompleteMsg* read_msg; + ReadCompleteMsg* read_msg; Ck::IO::read(_session_token, bytes_to_read, _curr_pos, tmp_data_buff, CkCallbackResumeThread((void*&) read_msg)); - // below will not get executed until the read is done + // below will not get executed until the read is done size_t bytes_read = read_msg -> bytes; if(bytes_read > bytes_to_read_left){ // if I read more bytes than what was left to read, that means I have extra bytes that the buffer can use - _data_cache.setBuffer(_curr_pos, bytes_read, tmp_data_buff); - _curr_pos += bytes_to_read_left; - std::memcpy(buffer + amt_from_cache, tmp_data_buff, bytes_to_read_left); + _data_cache.setBuffer(_curr_pos, bytes_read, tmp_data_buff); + _curr_pos += bytes_to_read_left; + std::memcpy(buffer + amt_from_cache, tmp_data_buff, bytes_to_read_left); } else { - // too many bytes, nothing to actually cache - _curr_pos += bytes_read; - std::memcpy(buffer + amt_from_cache, tmp_data_buff, bytes_read); - } - delete[] tmp_data_buff; - if(_curr_pos >= (_offset + _num_bytes)) { - _eofbit = true; // ran out of data to read - _curr_pos = _offset + _num_bytes; - } - _gcount = std::min(bytes_read, bytes_to_read_left); - return *this; - } - -size_t FileReader::tellg() { return _curr_pos;} - -FileReader& FileReader::seekg(size_t pos){ - _curr_pos = pos; - if(_curr_pos < _offset){ - _curr_pos = _offset; - _eofbit = false; - } else if (_curr_pos >= (_offset + _num_bytes)) { - _curr_pos = _offset + _num_bytes; - _eofbit = true; - } - _eofbit = false; - return *this; + // too many bytes, nothing to actually cache + _curr_pos += bytes_read; + std::memcpy(buffer + amt_from_cache, tmp_data_buff, bytes_read); + } + delete[] tmp_data_buff; + if (_curr_pos >= (_offset + _num_bytes)) + { + _eofbit = true; // ran out of data to 
read + _curr_pos = _offset + _num_bytes; + } + _gcount = std::min(bytes_read, bytes_to_read_left); + return *this; +} + +// overload ! operator on filereader object +bool FileReader::operator!() const +{ + CkPrintf("In overwritten operator\n"); + return false; +} + +size_t FileReader::tellg() { return _curr_pos; } + +FileReader& FileReader::seekg(size_t pos, std::ios_base::seekdir dir) +{ + if (dir == std::ios_base::beg) + { + _curr_pos = pos + _offset; + } + else if (dir == std::ios_base::cur) + { + _curr_pos += pos; + } + else if (dir == std::ios_base::end) + { + _curr_pos = _offset + _num_bytes - pos; + } + + _eofbit = false; + if (_curr_pos < _offset) + { + _curr_pos = _offset; + _eofbit = false; + } + else if (_curr_pos >= (_offset + _num_bytes)) + { + _curr_pos = _offset + _num_bytes; + _eofbit = true; } + return *this; +} + +FileReader& FileReader::seekg(size_t pos) +{ + _curr_pos = pos; + if (_curr_pos < _offset) + { + _curr_pos = _offset; + _eofbit = false; + } + else if (_curr_pos >= (_offset + _num_bytes)) + { + _curr_pos = _offset + _num_bytes; + _eofbit = true; + } + _eofbit = false; + return *this; +} bool FileReader::eof() { return _eofbit;} @@ -1055,14 +1102,14 @@ FileReaderBuffer::FileReaderBuffer(){ } FileReaderBuffer::FileReaderBuffer(size_t buff_capacity){ - _buff_capacity = buff_capacity; - _buffer = new char[_buff_capacity]; + _buff_capacity = buff_capacity; + _buffer = new char[_buff_capacity]; } void FileReaderBuffer::setBuffer(size_t offset, size_t num_bytes, char* data){ - is_dirty = false; - _offset = offset; - _buff_size = std::min(_buff_capacity, num_bytes); + is_dirty = false; + _offset = offset; + _buff_size = std::min(_buff_capacity, num_bytes); std::memcpy(_buffer, data, _buff_size); // copy the first section of bytes } @@ -1070,10 +1117,10 @@ size_t FileReaderBuffer::getFromBuffer(size_t offset, size_t num_bytes, char* bu if(is_dirty || offset < _offset || offset >= (_offset + _buff_size)) { return 0; // the buffer has nothing of 
relevance - } - size_t cached_len = std::min(offset + num_bytes, _offset + _buff_size) - offset; - std::memcpy(buffer, _buffer + (offset - _offset), cached_len); - return cached_len; + } + size_t cached_len = std::min(offset + num_bytes, _offset + _buff_size) - offset; + std::memcpy(buffer, _buffer + (offset - _offset), cached_len); + return cached_len; } FileReaderBuffer::~FileReaderBuffer(){ diff --git a/src/libs/ck-libs/io/ckio.ci b/src/libs/ck-libs/io/ckio.ci index 3df960e375..1218c9a8c4 100644 --- a/src/libs/ck-libs/io/ckio.ci +++ b/src/libs/ck-libs/io/ckio.ci @@ -128,7 +128,7 @@ entry void addSessionReadAssemblerFinished(CkReductionMsg* msg); } // class tht will be used to assemble a specific read call -group ReadAssembler +group[migratable] ReadAssembler { // stores the parameters of the read call it is tasked with building entry ReadAssembler(Session session); diff --git a/src/libs/ck-libs/io/ckio.h b/src/libs/ck-libs/io/ckio.h index 6d44530e93..86c848bf68 100644 --- a/src/libs/ck-libs/io/ckio.h +++ b/src/libs/ck-libs/io/ckio.h @@ -261,8 +261,13 @@ class FileReader bool _eofbit = false; size_t _gcount = 0; FileReaderBuffer _data_cache; + bool _status = true; public: + std::ios_base::seekdir end = std::ios_base::end; + std::ios_base::seekdir cur = std::ios_base::cur; + std::ios_base::seekdir beg = std::ios_base::beg; + FileReader(Ck::IO::Session session); /* In order to use the read functionality, make sure * that this function is being called in a threaded entry method @@ -272,8 +277,12 @@ class FileReader FileReader& read(char* buffer, size_t num_bytes_to_read); size_t tellg(); FileReader& seekg(size_t pos); + FileReader& seekg(size_t pos, std::ios_base::seekdir dir); + bool eof(); size_t gcount(); + + bool operator!() const; }; } // namespace IO From e6ca7e117125b3e1cd7df4a5e8b4f196847eec6c Mon Sep 17 00:00:00 2001 From: mayantaylor Date: Wed, 3 Apr 2024 17:48:36 -0400 Subject: [PATCH 05/15] formatting --- src/libs/ck-libs/io/ckio.C | 71 
+++++++++++++++++++++++--------------- 1 file changed, 43 insertions(+), 28 deletions(-) diff --git a/src/libs/ck-libs/io/ckio.C b/src/libs/ck-libs/io/ckio.C index 4d781d5c9f..e96484d728 100644 --- a/src/libs/ck-libs/io/ckio.C +++ b/src/libs/ck-libs/io/ckio.C @@ -996,34 +996,51 @@ public: int registerArray(CkArrayIndex& numElements, CkArrayID aid) { return 0; } }; -FileReader::FileReader(Ck::IO::Session session) : _session_token(session), _offset(session.offset), _num_bytes(session.bytes) {} +FileReader::FileReader(Ck::IO::Session session) + : _session_token(session), _offset(session.offset), _num_bytes(session.bytes) +{ +} -FileReader& FileReader::read(char* buffer, size_t num_bytes_to_read){ - if(_eofbit){ // no more bytes to read +FileReader& FileReader::read(char* buffer, size_t num_bytes_to_read) +{ + if (_eofbit) + { // no more bytes to read _gcount = 0; return *this; } - size_t amt_from_cache = _data_cache.getFromBuffer(_curr_pos, num_bytes_to_read, buffer); // get whatever data the cache has for us + size_t amt_from_cache = _data_cache.getFromBuffer( + _curr_pos, num_bytes_to_read, buffer); // get whatever data the cache has for us _curr_pos += amt_from_cache; - if(amt_from_cache == num_bytes_to_read){ + if (amt_from_cache == num_bytes_to_read) + { return *this; } size_t bytes_to_read_left = num_bytes_to_read - amt_from_cache; - size_t bytes_to_read = std::min(std::max(bytes_to_read_left, size_t(4096)), (_offset + _num_bytes - _curr_pos)); // if the read is too small, get more data to store in the buffer - if(!bytes_to_read){ + size_t bytes_to_read = std::min( + std::max(bytes_to_read_left, size_t(4096)), + (_offset + _num_bytes - + _curr_pos)); // if the read is too small, get more data to store in the buffer + if (!bytes_to_read) + { return *this; } - char* tmp_data_buff = new char[bytes_to_read]; // temporary buffer that will hold all the data + char* tmp_data_buff = + new char[bytes_to_read]; // temporary buffer that will hold all the data 
ReadCompleteMsg* read_msg; - Ck::IO::read(_session_token, bytes_to_read, _curr_pos, tmp_data_buff, CkCallbackResumeThread((void*&) read_msg)); + Ck::IO::read(_session_token, bytes_to_read, _curr_pos, tmp_data_buff, + CkCallbackResumeThread((void*&)read_msg)); // below will not get executed until the read is done - size_t bytes_read = read_msg -> bytes; - if(bytes_read > bytes_to_read_left){ - // if I read more bytes than what was left to read, that means I have extra bytes that the buffer can use + size_t bytes_read = read_msg->bytes; + if (bytes_read > bytes_to_read_left) + { + // if I read more bytes than what was left to read, that means I have extra bytes that + // the buffer can use _data_cache.setBuffer(_curr_pos, bytes_read, tmp_data_buff); _curr_pos += bytes_to_read_left; std::memcpy(buffer + amt_from_cache, tmp_data_buff, bytes_to_read_left); - } else { + } + else + { // too many bytes, nothing to actually cache _curr_pos += bytes_read; std::memcpy(buffer + amt_from_cache, tmp_data_buff, bytes_read); @@ -1093,40 +1110,38 @@ FileReader& FileReader::seekg(size_t pos) return *this; } -bool FileReader::eof() { return _eofbit;} +bool FileReader::eof() { return _eofbit; } size_t FileReader::gcount() { return _gcount; } -FileReaderBuffer::FileReaderBuffer(){ - _buffer = new char[_buff_capacity]; -} +FileReaderBuffer::FileReaderBuffer() { _buffer = new char[_buff_capacity]; } -FileReaderBuffer::FileReaderBuffer(size_t buff_capacity){ +FileReaderBuffer::FileReaderBuffer(size_t buff_capacity) +{ _buff_capacity = buff_capacity; _buffer = new char[_buff_capacity]; } -void FileReaderBuffer::setBuffer(size_t offset, size_t num_bytes, char* data){ +void FileReaderBuffer::setBuffer(size_t offset, size_t num_bytes, char* data) +{ is_dirty = false; _offset = offset; _buff_size = std::min(_buff_capacity, num_bytes); - std::memcpy(_buffer, data, _buff_size); // copy the first section of bytes + std::memcpy(_buffer, data, _buff_size); // copy the first section of bytes } 
-size_t FileReaderBuffer::getFromBuffer(size_t offset, size_t num_bytes, char* buffer){ - - if(is_dirty || offset < _offset || offset >= (_offset + _buff_size)) { - return 0; // the buffer has nothing of relevance +size_t FileReaderBuffer::getFromBuffer(size_t offset, size_t num_bytes, char* buffer) +{ + if (is_dirty || offset < _offset || offset >= (_offset + _buff_size)) + { + return 0; // the buffer has nothing of relevance } size_t cached_len = std::min(offset + num_bytes, _offset + _buff_size) - offset; std::memcpy(buffer, _buffer + (offset - _offset), cached_len); return cached_len; } -FileReaderBuffer::~FileReaderBuffer(){ - delete[] _buffer; -} - +FileReaderBuffer::~FileReaderBuffer() { delete[] _buffer; } } // namespace IO } // namespace Ck From 23be2c95185697e82726b2f912718ebd9a0d5b99 Mon Sep 17 00:00:00 2001 From: Mathew Jacob Date: Thu, 25 Apr 2024 02:37:29 -0500 Subject: [PATCH 06/15] added some more thorough documentation + removed the hardcoded 4096 in the FileReader when trying to select the optimal size for the Ck::IO::read call --- src/libs/ck-libs/io/ckio.C | 9 ++- src/libs/ck-libs/io/ckio.h | 112 +++++++++++++++++++++++++++++++++---- 2 files changed, 109 insertions(+), 12 deletions(-) diff --git a/src/libs/ck-libs/io/ckio.C b/src/libs/ck-libs/io/ckio.C index e96484d728..b7f38b26cc 100644 --- a/src/libs/ck-libs/io/ckio.C +++ b/src/libs/ck-libs/io/ckio.C @@ -1017,7 +1017,7 @@ FileReader& FileReader::read(char* buffer, size_t num_bytes_to_read) } size_t bytes_to_read_left = num_bytes_to_read - amt_from_cache; size_t bytes_to_read = std::min( - std::max(bytes_to_read_left, size_t(4096)), + std::max(bytes_to_read_left, _data_cache.capacity()), (_offset + _num_bytes - _curr_pos)); // if the read is too small, get more data to store in the buffer if (!bytes_to_read) @@ -1058,7 +1058,7 @@ FileReader& FileReader::read(char* buffer, size_t num_bytes_to_read) // overload ! 
operator on filereader object bool FileReader::operator!() const { - CkPrintf("In overwritten operator\n"); + // CkPrintf("In overwritten operator\n"); return false; } @@ -1105,6 +1105,7 @@ FileReader& FileReader::seekg(size_t pos) { _curr_pos = _offset + _num_bytes; _eofbit = true; + return *this; } _eofbit = false; return *this; @@ -1141,6 +1142,10 @@ size_t FileReaderBuffer::getFromBuffer(size_t offset, size_t num_bytes, char* bu return cached_len; } +size_t FileReaderBuffer::capacity() { + return _buff_capacity; +} + FileReaderBuffer::~FileReaderBuffer() { delete[] _buffer; } } // namespace IO diff --git a/src/libs/ck-libs/io/ckio.h b/src/libs/ck-libs/io/ckio.h index 86c848bf68..c30ba3df15 100644 --- a/src/libs/ck-libs/io/ckio.h +++ b/src/libs/ck-libs/io/ckio.h @@ -235,7 +235,17 @@ class ReadCompleteMsg : public CMessage_ReadCompleteMsg { } }; - +/** + * This is used by the FileReader in order to try to minimize + * the number of networks calls made during a read. Instead + * of calling Ck::IO::read repeatedly and each call has only a + * small amount of data, the FileReader will make a Ck::IO::read + * call with a larger amount of data and store that data in the + * FileReaderBuffer. This way, if the FileReader is making small + * read calls, the data will hopefully already be in the buffer, + * which prevents superfluous messages. This is NOT a user facing + * class and should not be used by the user. + */ class FileReaderBuffer { size_t _buff_capacity = 4096; // the size of the buffer array @@ -248,11 +258,44 @@ class FileReaderBuffer FileReaderBuffer(); FileReaderBuffer(size_t buff_capacity); ~FileReaderBuffer(); + /** + * Copies the @arg data into the head of _buffer + * until the _buffer is full or data has been fully copied. + * Will also set _buff_size to the number of bytes that was + * copied. + * + * @arg offset: the offset in the file the @arg data arguments are from. 
+ * @arg num_bytes: the length of @arg data + * @arg data: the array with the data to be put into the FileReaderBuffer + */ void setBuffer(size_t offset, size_t num_bytes, char* data); // writes the data to the buffer + /** + * This data checks whether, given a request specified by @arg offset + * and @arg num_bytes, can use some of its cached data to fulfill the + * request. This method changes the @arg buffer. + * + * @arg offset: the offset in the session the read request is. + * @arg num_bytes: the number of bytes of the read request. + * @arg buffer: the address of the buffer the read will go; + * this method will write to that address. + */ size_t getFromBuffer(size_t offset, size_t num_bytes, char* buffer); + /** + * Returns the capacity of the internal buffer. + * @return size_t: the total capacity of _buffer. + */ + size_t capacity(); }; - +/** + * The Ck:IO equivalent to std::ifstream. If the user + * doesn't want to write callbacks after a lot of reads, + * or the user is making a series of very small sequential + * reads, this abstraction will make it very easy. FileReader + * uses caching in order to try and minimize the number of + * extraneous network calls made during a series of read requests. + * This class should be used in threaded entry methods. + */ class FileReader { Session _session_token; @@ -267,21 +310,70 @@ class FileReader std::ios_base::seekdir end = std::ios_base::end; std::ios_base::seekdir cur = std::ios_base::cur; std::ios_base::seekdir beg = std::ios_base::beg; - + /** + * @arg Session: the session token the FileReader will use + */ FileReader(Ck::IO::Session session); - /* In order to use the read functionality, make sure - * that this function is being called in a threaded entry method - * because it utilizes CkCallbackResumeThread() in order to overlay work - * correctly and block execution - * */ + /** + * Perform a request of size @arg num_bytes_to_read, with + * an offset of wherever the FileReader is in the stream. 
+ * It will write the result to @arg buffer. + * + * @arg buffer: the location where the read will be written to + * @arg num_bytes_to_read: the number of bytes to read + */ FileReader& read(char* buffer, size_t num_bytes_to_read); + /** + * Returns the current position in the file the FileReader + * is i.e te next byte the read will start. + * + * @return size_t: the position the FileReader is at in the + * file + */ size_t tellg(); + /** + * Seeks to a position in the file for the FileReader from the + * beginning of the read session. If the seek goes beyond the end of + * the read session, it will set the internal position to be one byte + * further than the end of session and the eof flag will be set. + * + * @arg pos: the position in the session wrt the beginning of + * the session to seek to. + */ FileReader& seekg(size_t pos); + /** + * Seeks to a position in the file for the FileReader wrt the + * @arg dir specifies. If the seek goes beyond the end of the + * read session, it will set the internal position to be one byte + * further than the end of session and the eof flag will be set. + * + * @arg pos: the position in the session wrt what @arg dir + * the session to seek to. + * @arg dir: Where to seek with respect to. If dir=std::ios_base::beg, + * then it is with respect to the beginning of the file. If + * dir=std::ios_base::cur, it is with respect to the current + * position of the FileReader. If dir=std::ios_base_end, then + * it is with respect to the end of the stream. + */ FileReader& seekg(size_t pos, std::ios_base::seekdir dir); - + /** + * Returns whether the FileReader is at the end of the session. + * + * @return bool: whether the FileReader is at end of session. + */ bool eof(); + /** + * Returns the number of bytes the last read did. + * @return size_t: the number of bytes the last read call did. + */ size_t gcount(); - + /** + * Will return true if the FileReader is on a bad file. 
+ * Currently this always returns false because we assume + * that the Session points to a good file. + * + * @return bool: false + */ bool operator!() const; }; From 9f1bba253ae979cbd7f402aea1b0002e71693cc4 Mon Sep 17 00:00:00 2001 From: Mathew Jacob Date: Thu, 25 Apr 2024 02:51:00 -0500 Subject: [PATCH 07/15] Added more tests for the FileReader functionality. --- tests/charm++/io_read/iotest.C | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/charm++/io_read/iotest.C b/tests/charm++/io_read/iotest.C index c34e3c3ac8..d5f762d6c8 100644 --- a/tests/charm++/io_read/iotest.C +++ b/tests/charm++/io_read/iotest.C @@ -93,8 +93,23 @@ public: Ck::IO::FileReader fr(token); fr.seekg(bytesToRead * thisIndex); // seek to the correct place in the file fr.read(file_reader_buffer, size); // hopefully this will return the same data as Ck::IO::read + CkAssert(fr.gcount() == size); // makes sure that the gcount is correct + CkAssert(fr.tellg() == (size + bytesToRead * thisIndex)); // make sure that the tellg points to the correct place in the stream CkPrintf("the FileReader::read function on tester[%d] is done with first character=%c\n", thisIndex, file_reader_buffer[0]); Ck::IO::read(token, bytesToRead, bytesToRead * thisIndex, dataBuffer, sessionEnd); + // test that the eof works + size_t og_pos= fr.tellg(); + fr.seekg(100000000000000); + CkAssert(fr.eof()); + fr.seekg(og_pos); + CkAssert(fr.eof() == false); + fr.seekg(1, std::ios_base::cur); + CkAssert(fr.tellg() == og_pos + 1); // test that the seekg with different offset worked + fr.seekg(0, std::ios_base::end); + CkAssert(fr.eof()); // seeked to the end of file, make sure that the flag is on + fr.seekg(og_pos, std::ios_base::beg); + CkAssert(fr.tellg() == og_pos); + CkAssert(fr.eof() == false); } Test(CkMigrateMessage* m) {} From 261449beb541ed928276b7b63fb0e95375495254 Mon Sep 17 00:00:00 2001 From: Mathew Jacob Date: Thu, 25 Apr 2024 16:53:15 -0500 Subject: [PATCH 08/15] updated git ignore --- 
.gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 88faa0e2b8..d3a9ddb12a 100644 --- a/.gitignore +++ b/.gitignore @@ -50,3 +50,6 @@ charmrun ampirun pgm *.swp + +#Ignore the generated headers dir +src/libs/ck-libs/io/headers# From 86329c6fc57e63a2d23c7c27e2074bb98d77e876 Mon Sep 17 00:00:00 2001 From: Mathew Jacob Date: Thu, 25 Apr 2024 17:50:25 -0500 Subject: [PATCH 09/15] tried to fix a test... --- tests/charm++/io_read/iotest.C | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/charm++/io_read/iotest.C b/tests/charm++/io_read/iotest.C index d5f762d6c8..9577389f92 100644 --- a/tests/charm++/io_read/iotest.C +++ b/tests/charm++/io_read/iotest.C @@ -101,8 +101,9 @@ public: size_t og_pos= fr.tellg(); fr.seekg(100000000000000); CkAssert(fr.eof()); - fr.seekg(og_pos); + fr.seekg(10); CkAssert(fr.eof() == false); + fr.seekg(og_pos); fr.seekg(1, std::ios_base::cur); CkAssert(fr.tellg() == og_pos + 1); // test that the seekg with different offset worked fr.seekg(0, std::ios_base::end); From 671a1c958ef5ab369d50247ed97cbf77606d0569 Mon Sep 17 00:00:00 2001 From: Mathew Jacob Date: Thu, 25 Apr 2024 19:06:26 -0500 Subject: [PATCH 10/15] made a new function to test the individual functions of FileReader. 
--- tests/charm++/io_read/iotest.C | 38 ++++++++++++---------------------- 1 file changed, 13 insertions(+), 25 deletions(-) diff --git a/tests/charm++/io_read/iotest.C b/tests/charm++/io_read/iotest.C index 9577389f92..0bfb427e77 100644 --- a/tests/charm++/io_read/iotest.C +++ b/tests/charm++/io_read/iotest.C @@ -57,23 +57,6 @@ public: CkPrintf("Inside the constructor of tester %d\n", thisIndex); _fname = filename; thisProxy[thisIndex].testMethod(token, bytesToRead); - //CkCallback sessionEnd(CkIndex_Test::readDone(0), thisProxy[thisIndex]); - //try - //{ - // dataBuffer = new char[bytesToRead]; - // file_reader_buffer = new char[bytesToRead]; - //} - //catch (const std::bad_alloc& e) - //{ - // CkPrintf("ERROR: Data buffer malloc of %zu bytes in Test chare %d failed.\n", - // bytesToRead, thisIndex); - // CkExit(); - //} - //size = bytesToRead; - //Ck::IO::FileReader fr(token); - //fr.seekg(bytesToRead * thisIndex); // seek to the correct place in the file - //fr.read(file_reader_buffer, size); // hopefully this will return the same data as Ck::IO::read - //Ck::IO::read(token, bytesToRead, bytesToRead * thisIndex, dataBuffer, sessionEnd); } void testMethod(Ck::IO::Session token, size_t bytesToRead){ @@ -98,21 +81,26 @@ public: CkPrintf("the FileReader::read function on tester[%d] is done with first character=%c\n", thisIndex, file_reader_buffer[0]); Ck::IO::read(token, bytesToRead, bytesToRead * thisIndex, dataBuffer, sessionEnd); // test that the eof works - size_t og_pos= fr.tellg(); - fr.seekg(100000000000000); + testFileReader(fr); + } + + void testFileReader(Ck::IO::FileReader& fr){ + size_t og_pos = fr.tellg(); + fr.seekg(100000000000000); // way beyond the bounds of read session, should trigger eof CkAssert(fr.eof()); - fr.seekg(10); + fr.seekg(5); CkAssert(fr.eof() == false); - fr.seekg(og_pos); fr.seekg(1, std::ios_base::cur); - CkAssert(fr.tellg() == og_pos + 1); // test that the seekg with different offset worked + CkAssert(fr.tellg() == 6); // test 
that the seekg with different offset worked fr.seekg(0, std::ios_base::end); CkAssert(fr.eof()); // seeked to the end of file, make sure that the flag is on - fr.seekg(og_pos, std::ios_base::beg); - CkAssert(fr.tellg() == og_pos); - CkAssert(fr.eof() == false); + fr.seekg(6, std::ios_base::beg); + CkAssert(fr.tellg() == 6); + fr.seekg(og_pos); } + + Test(CkMigrateMessage* m) {} void readDone(Ck::IO::ReadCompleteMsg* m) From cf972b5a9094b2f7766cda098adfa2ff763fb8ca Mon Sep 17 00:00:00 2001 From: Mathew Jacob Date: Thu, 25 Apr 2024 20:56:23 -0500 Subject: [PATCH 11/15] changed FileReaderBuffer::_offset to be size_t instead of ssize_t because ssize_t may not be defined on windows. May want to explore the possibility of switching to signed types in the future but not right now. --- src/libs/ck-libs/io/ckio.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libs/ck-libs/io/ckio.h b/src/libs/ck-libs/io/ckio.h index c30ba3df15..3891952f08 100644 --- a/src/libs/ck-libs/io/ckio.h +++ b/src/libs/ck-libs/io/ckio.h @@ -250,7 +250,7 @@ class FileReaderBuffer { size_t _buff_capacity = 4096; // the size of the buffer array size_t _buff_size = 0; // the number of valid elements in the array - ssize_t _offset = 0; // the offset byte + size_t _offset = 0; // the offset byte char* _buffer; bool is_dirty = true; From 1f1361a7d5c0b092336e40821569779b4aa37738 Mon Sep 17 00:00:00 2001 From: mayantaylor Date: Tue, 30 Apr 2024 08:12:23 -0500 Subject: [PATCH 12/15] formatting and fixing print types --- tests/charm++/io_read/iotest.C | 54 +++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/tests/charm++/io_read/iotest.C b/tests/charm++/io_read/iotest.C index 0bfb427e77..3eb6f91213 100644 --- a/tests/charm++/io_read/iotest.C +++ b/tests/charm++/io_read/iotest.C @@ -59,7 +59,8 @@ public: thisProxy[thisIndex].testMethod(token, bytesToRead); } - void testMethod(Ck::IO::Session token, size_t bytesToRead){ + void 
testMethod(Ck::IO::Session token, size_t bytesToRead) + { CkCallback sessionEnd(CkIndex_Test::readDone(0), thisProxy[thisIndex]); try { @@ -74,33 +75,38 @@ public: } size = bytesToRead; Ck::IO::FileReader fr(token); - fr.seekg(bytesToRead * thisIndex); // seek to the correct place in the file - fr.read(file_reader_buffer, size); // hopefully this will return the same data as Ck::IO::read - CkAssert(fr.gcount() == size); // makes sure that the gcount is correct - CkAssert(fr.tellg() == (size + bytesToRead * thisIndex)); // make sure that the tellg points to the correct place in the stream - CkPrintf("the FileReader::read function on tester[%d] is done with first character=%c\n", thisIndex, file_reader_buffer[0]); + fr.seekg(bytesToRead * thisIndex); // seek to the correct place in the file + fr.read(file_reader_buffer, + size); // hopefully this will return the same data as Ck::IO::read + CkAssert(fr.gcount() == size); // makes sure that the gcount is correct + CkAssert(fr.tellg() == + (size + bytesToRead * thisIndex)); // make sure that the tellg points to the + // correct place in the stream + CkPrintf( + "the FileReader::read function on tester[%d] is done with first character=%c\n", + thisIndex, file_reader_buffer[0]); Ck::IO::read(token, bytesToRead, bytesToRead * thisIndex, dataBuffer, sessionEnd); - // test that the eof works - testFileReader(fr); + // test that the eof works + testFileReader(fr); } - void testFileReader(Ck::IO::FileReader& fr){ - size_t og_pos = fr.tellg(); - fr.seekg(100000000000000); // way beyond the bounds of read session, should trigger eof - CkAssert(fr.eof()); - fr.seekg(5); - CkAssert(fr.eof() == false); - fr.seekg(1, std::ios_base::cur); - CkAssert(fr.tellg() == 6); // test that the seekg with different offset worked - fr.seekg(0, std::ios_base::end); - CkAssert(fr.eof()); // seeked to the end of file, make sure that the flag is on - fr.seekg(6, std::ios_base::beg); - CkAssert(fr.tellg() == 6); - fr.seekg(og_pos); + void 
testFileReader(Ck::IO::FileReader& fr) + { + size_t og_pos = fr.tellg(); + fr.seekg( + 100000000000000); // way beyond the bounds of read session, should trigger eof + CkAssert(fr.eof()); + fr.seekg(5); + CkAssert(fr.eof() == false); + fr.seekg(1, std::ios_base::cur); + CkAssert(fr.tellg() == 6); // test that the seekg with different offset worked + fr.seekg(0, std::ios_base::end); + CkAssert(fr.eof()); // seeked to the end of file, make sure that the flag is on + fr.seekg(6, std::ios_base::beg); + CkAssert(fr.tellg() == 6); + fr.seekg(og_pos); } - - Test(CkMigrateMessage* m) {} void readDone(Ck::IO::ReadCompleteMsg* m) @@ -127,7 +133,7 @@ public: if (verify_buffer[i] != dataBuffer[i]) { CkPrintf( - "From reader %d, offset=%d, bytes=%d, verify_buuffer[%d]=%c, " + "From reader %d, offset=%zu, bytes=%zu, verify_buuffer[%d]=%c, " "dataBuffer[%d]=%c\n", thisIndex, (m->offset), (m->bytes), i, verify_buffer[i], i, dataBuffer[i]); } From d8364aa77a7e158803616fd990fc2792eb2685d4 Mon Sep 17 00:00:00 2001 From: mayantaylor Date: Tue, 30 Apr 2024 08:15:50 -0500 Subject: [PATCH 13/15] adding comment --- tests/charm++/io_read/iotest.C | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/charm++/io_read/iotest.C b/tests/charm++/io_read/iotest.C index 3eb6f91213..7aa60adf7f 100644 --- a/tests/charm++/io_read/iotest.C +++ b/tests/charm++/io_read/iotest.C @@ -73,6 +73,8 @@ public: bytesToRead, thisIndex); CkExit(); } + + // setup and read using Ck::IO::FileReader size = bytesToRead; Ck::IO::FileReader fr(token); fr.seekg(bytesToRead * thisIndex); // seek to the correct place in the file @@ -85,9 +87,11 @@ public: CkPrintf( "the FileReader::read function on tester[%d] is done with first character=%c\n", thisIndex, file_reader_buffer[0]); - Ck::IO::read(token, bytesToRead, bytesToRead * thisIndex, dataBuffer, sessionEnd); - // test that the eof works + testFileReader(fr); + + // read using plain Ck::IO::Read + Ck::IO::read(token, bytesToRead, bytesToRead * 
thisIndex, dataBuffer, sessionEnd); } void testFileReader(Ck::IO::FileReader& fr) From bcbc757b6b638494c0b210924bdecc8609cc6c9c Mon Sep 17 00:00:00 2001 From: mayantaylor Date: Tue, 30 Apr 2024 08:34:08 -0500 Subject: [PATCH 14/15] adding FileReader to the docs --- doc/libraries/manual.rst | 66 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 65 insertions(+), 1 deletion(-) diff --git a/doc/libraries/manual.rst b/doc/libraries/manual.rst index 7908b9c0bc..c60680ce36 100644 --- a/doc/libraries/manual.rst +++ b/doc/libraries/manual.rst @@ -989,10 +989,74 @@ The following functions comprise the interface to the library for parallel file the ``FileReadyMsg`` sent to the ``opened`` callback after a file has been opened. This method should only be called from a single PE, once per file. +FileReader API +-------------- + +The FileReader API is an additional abstraction layer built on top of Ck::IO to support +streaming reads from a file and implement the callback internally. This API is designed to +match that of the C++ std::ifstream. Under the hood, when an application reads a small +number of bytes via a FileReader object, +the Buffer Chare will send a large chunk of data to the FileReader which can be buffered there +until the application requests more. + +- Creating a FileReader object: + + .. code-block:: c++ + + FileReader::FileReader(Ck::IO::Session session) + + Before creating a FileReader, the Ck::IO::Session must be created (see above). This session is + passed in to the FileReader constructor. + +- Reading data: + + .. code-block:: c++ + + FileReader& FileReader::read(char* buffer, size_t num_bytes_to_read) + + Read the specified number of bytes from the file opened in the session. The data is read into the buffer. + This method is blocking and returns a reference to the FileReader object. 
+ +- Seeking: + + There are two functions for seeking in the file: one that seeks to the given position, and one that seeks by an offset relative to a reference point (the beginning, the current position, or the end). + + .. code-block:: c++ + + FileReader& FileReader::seekg(size_t pos) + + .. code-block:: c++ + + FileReader& FileReader::seekg(size_t pos, std::ios_base::seekdir dir) + + The options for std::ios_base::seekdir are std::ios_base::beg, std::ios_base::cur, and std::ios_base::end. + +- Tell functionality: + + .. code-block:: c++ + + size_t FileReader::tellg() + + This function returns the current position in the file. + +- End of file and gcount: + + .. code-block:: c++ + + bool FileReader::eof() + + This function returns true if the end of the file has been reached and false otherwise. + + .. code-block:: c++ + + size_t FileReader::gcount() + + This function returns the number of bytes read by the last read operation. + Examples -------- For example code showing how to use CkIO for output, see ``tests/charm++/io/``. -For example code showing how to use CkIO for input, see ``tests/charm++/io_read/``. +For example code showing how to use CkIO for input (including FileReader), see ``tests/charm++/io_read/``. From cc4cf1aa0b0405d94d3ca1c516bfa5abadba10c0 Mon Sep 17 00:00:00 2001 From: mayantaylor Date: Tue, 30 Apr 2024 09:41:55 -0500 Subject: [PATCH 15/15] fixing docs typo for ckio read --- doc/libraries/manual.rst | 2 +- src/libs/ck-libs/io/ckio.h | 38 +++++++++++++++++++------------------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/doc/libraries/manual.rst b/doc/libraries/manual.rst index c60680ce36..507ed9a17e 100644 --- a/doc/libraries/manual.rst +++ b/doc/libraries/manual.rst @@ -974,7 +974,7 @@ The following functions comprise the interface to the library for parallel file This method is invoked to read data asynchronously from the read session. 
This method returns immediately to the caller, but the read is only guaranteed complete once the callback ``after_read`` is called. Internally, the read request is buffered until the Buffer Chares can respond with the requested data. After the read finishes, the - after_read callback is invoked taking a ReadCompleteMsg* which points to a vector buffer, the offset, + after_read callback is invoked taking a ReadCompleteMsg* which points to a char* buffer, the offset, and the number of bytes of the read. diff --git a/src/libs/ck-libs/io/ckio.h b/src/libs/ck-libs/io/ckio.h index 3891952f08..f301f7a4b0 100644 --- a/src/libs/ck-libs/io/ckio.h +++ b/src/libs/ck-libs/io/ckio.h @@ -128,7 +128,7 @@ void closeReadSession(Session read_session, CkCallback after_end); /** * Is a method that reads data from the @arg session of length @arg bytes at offset * @arg offset (in file). After this read finishes, the @arg after_read callback is - * invoked, taking a ReadCompleteMsg* which points to a vector buffer, the offset, + * invoked, taking a ReadCompleteMsg* which points to a char* buffer, the offset, * and the number of bytes of the read. * */ void read(Session session, size_t bytes, size_t offset, char* data, @@ -238,19 +238,19 @@ class ReadCompleteMsg : public CMessage_ReadCompleteMsg /** * This is used by the FileReader in order to try to minimize * the number of networks calls made during a read. Instead - * of calling Ck::IO::read repeatedly and each call has only a + * of calling Ck::IO::read repeatedly and each call has only a * small amount of data, the FileReader will make a Ck::IO::read - * call with a larger amount of data and store that data in the + * call with a larger amount of data and store that data in the * FileReaderBuffer. This way, if the FileReader is making small * read calls, the data will hopefully already be in the buffer, * which prevents superfluous messages. This is NOT a user facing - * class and should not be used by the user. 
+ * class and should not be used by the user. */ class FileReaderBuffer { size_t _buff_capacity = 4096; // the size of the buffer array size_t _buff_size = 0; // the number of valid elements in the array - size_t _offset = 0; // the offset byte + size_t _offset = 0; // the offset byte char* _buffer; bool is_dirty = true; @@ -270,9 +270,9 @@ class FileReaderBuffer */ void setBuffer(size_t offset, size_t num_bytes, char* data); // writes the data to the buffer - /** + /** * This data checks whether, given a request specified by @arg offset - * and @arg num_bytes, can use some of its cached data to fulfill the + * and @arg num_bytes, can use some of its cached data to fulfill the * request. This method changes the @arg buffer. * * @arg offset: the offset in the session the read request is. @@ -291,10 +291,10 @@ class FileReaderBuffer * The Ck:IO equivalent to std::ifstream. If the user * doesn't want to write callbacks after a lot of reads, * or the user is making a series of very small sequential - * reads, this abstraction will make it very easy. FileReader - * uses caching in order to try and minimize the number of + * reads, this abstraction will make it very easy. FileReader + * uses caching in order to try and minimize the number of * extraneous network calls made during a series of read requests. - * This class should be used in threaded entry methods. + * This class should be used in threaded entry methods. */ class FileReader { @@ -316,8 +316,8 @@ class FileReader FileReader(Ck::IO::Session session); /** * Perform a request of size @arg num_bytes_to_read, with - * an offset of wherever the FileReader is in the stream. - * It will write the result to @arg buffer. + * an offset of wherever the FileReader is in the stream. + * It will write the result to @arg buffer. 
* * @arg buffer: the location where the read will be written to * @arg num_bytes_to_read: the number of bytes to read @@ -327,26 +327,26 @@ class FileReader * Returns the current position in the file the FileReader * is i.e te next byte the read will start. * - * @return size_t: the position the FileReader is at in the + * @return size_t: the position the FileReader is at in the * file */ size_t tellg(); /** - * Seeks to a position in the file for the FileReader from the + * Seeks to a position in the file for the FileReader from the * beginning of the read session. If the seek goes beyond the end of * the read session, it will set the internal position to be one byte * further than the end of session and the eof flag will be set. - * + * * @arg pos: the position in the session wrt the beginning of * the session to seek to. */ FileReader& seekg(size_t pos); /** - * Seeks to a position in the file for the FileReader wrt the - * @arg dir specifies. If the seek goes beyond the end of the + * Seeks to a position in the file for the FileReader wrt the + * @arg dir specifies. If the seek goes beyond the end of the * read session, it will set the internal position to be one byte * further than the end of session and the eof flag will be set. - * + * * @arg pos: the position in the session wrt what @arg dir * the session to seek to. * @arg dir: Where to seek with respect to. If dir=std::ios_base::beg, @@ -368,7 +368,7 @@ class FileReader */ size_t gcount(); /** - * Will return true if the FileReader is on a bad file. + * Will return true if the FileReader is on a bad file. * Currently this always returns false because we assume * that the Session points to a good file. *