NCBI C++ ToolKit
transmissionrw.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: transmissionrw.cpp 86060 2019-04-02 19:20:44Z sadyrovr $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Anatoliy Kuznetsov
27  *
28  * File Description: Transmission control.
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 #include <corelib/ncbidbg.hpp>
34 #include <corelib/ncbi_bswap.hpp>
35 #include <corelib/ncbistr.hpp>
36 #include <util/util_exception.hpp>
37 #include <util/transmissionrw.hpp>
38 
40 
// Marker word: the reader compares the first word it receives against this
// value (and against the byte-reversed form 0x04030201) to decide whether
// subsequent length words must be byte-swapped.
41 static const Uint4 sStartWord = 0x01020304;
// Sentinel length word: written by the writer when m_SendEof == eSendEofPacket,
// and reported as eRW_Eof by the reader when it appears as a packet length.
42 static const Uint4 sEndPacket = 0xFFFFFFFF;
// Size in bytes of one length/marker word on the wire.
43 const size_t kLengthSize = sizeof(Uint4);
44 
46  EOwnership own_writer,
47  ESendEofPacket send_eof)
48  : m_Wrt(wrt),
49  m_OwnWrt(own_writer),
50  m_SendEof(send_eof),
51  m_PacketBytesToWrite(0)
// Constructor body (the signature line is absent from this listing).
52 {
// Sanity check: a writer to forward to must be supplied.
53  _ASSERT(wrt);
54 
// `res` is assigned on a line missing from this listing -- presumably the
// result of writing the byte-order marker (TODO confirm). Any result other
// than eRW_Success is reported as a CIOException (eWrite).
56  if (res != eRW_Success) {
57  NCBI_THROW(CIOException, eWrite, "Cannot write the byte order");
58  }
59 }
60 
61 
namespace {

/// Scope guard that publishes a transferred-byte count to the caller.
///
/// I/O routines store the number of bytes they moved into the public
/// `count` member; when the guard goes out of scope (on any return path)
/// that value is copied to the caller-supplied destination, if there is one.
class CIOBytesCountGuard
{
public:
    /// Running byte count; starts at 0 and is written to directly by callers.
    size_t count;

    /// @param ret
    ///   Optional destination for the final count; may be null, in which
    ///   case nothing is published on destruction.
    explicit CIOBytesCountGuard(size_t* ret)
        : count(0), m_Ret(ret)
    {}

    // Non-copyable: a copy would publish its own (possibly stale) count to
    // the same destination when it is destroyed.
    CIOBytesCountGuard(const CIOBytesCountGuard&) = delete;
    CIOBytesCountGuard& operator=(const CIOBytesCountGuard&) = delete;

    /// Publishes `count` through the destination pointer when it is non-null.
    ~CIOBytesCountGuard() { if (m_Ret) *m_Ret = count; }

private:
    size_t* m_Ret;  ///< Where to store the final count; may be null.
};

} // namespace
76 
78  size_t count,
79  size_t* bytes_written)
// Body of the packetizing write (the signature's first line is absent from
// this listing). A length word is sent when no packet is in progress, then
// payload bytes are forwarded to the underlying writer.
80 {
// The guard copies wrt.count into *bytes_written (when non-null) on every
// return path; it stays 0 if the function returns before any payload is sent.
81  CIOBytesCountGuard wrt(bytes_written);
82 
// sEndPacket is reserved as the end-of-stream marker, so a length word can
// never carry that value or anything larger.
83  if (count >= sEndPacket) {
84  // The buffer cannot fit in one packet -- split it.
85  count = (size_t) 0x80008000LU;
86  }
87 
// No packet in progress: emit the length word for a new packet.
// NOTE(review): a count of 0 still emits a length word of 0 here.
88  if (!m_PacketBytesToWrite) {
89  Uint4 cnt = (Uint4) count;
90  ERW_Result res = WriteUint4(cnt);
91  if (res != eRW_Success)
92  return res;
// (A line is missing from this listing here -- presumably it records cnt in
// m_PacketBytesToWrite; TODO confirm against the real source.)
94  }
95 
// NOTE(review): the request size is m_PacketBytesToWrite, not
// min(count, m_PacketBytesToWrite). If an earlier call left a packet
// partially written and this call supplies a smaller buffer, the underlying
// writer would be asked to read past `buf` -- confirm callers always
// resubmit the full remainder.
96  ERW_Result res = m_Wrt->Write(buf, m_PacketBytesToWrite, &wrt.count);
// Track what is still owed for the current packet after a (possibly partial) write.
97  if (res == eRW_Success) m_PacketBytesToWrite -= wrt.count;
98  return res;
99 }
100 
// Flush body (signature line absent from this listing): forwards directly
// to the underlying writer and returns its result unchanged.
102 {
103  return m_Wrt->Flush();
104 }
105 
// Close body (signature line absent from this listing).
// Returns eRW_Error if a packet is still partially written, eRW_Success if
// no EOF packet is to be sent, otherwise the result of writing the
// end-of-stream marker.
107 {
// A packet whose payload has not been fully written cannot be terminated.
108  if (m_PacketBytesToWrite) {
109  return eRW_Error;
110  }
111 
112  if (m_SendEof != eSendEofPacket)
113  return eRW_Success;
114 
// (A line is missing from this listing here; its content is not visible.)
116 
117  return WriteUint4(sEndPacket);
118 }
119 
// Destructor body (signature line absent from this listing).
121 {
// The result of Close() is deliberately not inspected here.
122  Close();
123 
// Release the underlying writer only when ownership was transferred.
124  if (m_OwnWrt)
125  delete m_Wrt;
126 }
127 
// WriteUint4 body (signature line absent from this listing).
// Sends the kLengthSize bytes of `value` exactly as they are laid out in
// memory (no byte swapping), retrying until all bytes are written or the
// underlying writer reports a non-success result.
129 {
130  auto data = reinterpret_cast<const char*>(&value);
// NOTE(review): `written` is not initialized. If m_Wrt->Write() fails without
// storing to it, `data += written` and `to_write -= written` use an
// indeterminate value. Also, a successful write of 0 bytes would make this
// loop spin. Consider initializing to 0 and guarding against no progress.
131  size_t written;
132  ERW_Result rv = eRW_Success;
133 
// Partial writes are handled by advancing `data` and shrinking `to_write`.
134  for (auto to_write = kLengthSize; (rv == eRW_Success) && to_write; to_write -= written) {
135  rv = m_Wrt->Write(data, to_write, &written);
136  data += written;
137  }
138 
139  return rv;
140 }
141 
142 template <typename TType>
143 inline const TType* CTransmissionReader::SReadData::Data() const
144 {
145  return reinterpret_cast<const TType*>(data() + m_Start);
146 }
147 
// Presumably the body of SReadData::DataSize() (signature line absent from
// this listing): number of buffered bytes not yet consumed.
149 {
150  return m_End - m_Start;
151 }
152 
// Presumably the body of SReadData::Space() (signature line absent): pointer
// to the first free byte after the buffered data.
154 {
155  return data() + m_End;
156 }
157 
// Presumably the body of SReadData::SpaceSize() (signature line absent):
// number of free bytes between m_End and the end of the buffer. Bytes
// before m_Start are not counted as free.
159 {
160  return size() - m_End;
161 }
162 
163 inline void CTransmissionReader::SReadData::Add(size_t count)
164 {
165  m_End += count;
166  _ASSERT(m_End <= size());
167 }
168 
169 inline void CTransmissionReader::SReadData::Remove(size_t count)
170 {
171  m_Start += count;
172  if (DataSize() == 0) m_Start = m_End = 0;
173  _ASSERT(m_Start < size());
174 }
175 
// Constructor initializer list and body (the signature line and some
// initializers are absent from this listing).
177  : m_Rdr(rdr),
178  m_OwnRdr(own_reader),
// Byte swapping starts disabled; it is decided later from the start word.
180  m_ByteSwap(false),
// Nothing is read from the underlying reader at construction time.
182 {
183 }
184 
185 
// Destructor body (signature line absent from this listing): releases the
// underlying reader only when ownership was transferred.
187 {
188  if (m_OwnRdr) {
189  delete m_Rdr;
190  }
191 }
192 
193 
// ReadData body (signature line absent from this listing): performs one read
// from the underlying reader into the free tail of m_ReadData and, on
// success, records how many bytes arrived. Returns the reader's result.
195 {
// Only consumed after a successful read, so the missing initializer is
// harmless provided the reader sets it on success.
196  size_t read;
197 
// NOTE(review): when SpaceSize() is 0 this requests a zero-byte read.
198  auto res = m_Rdr->Read(m_ReadData.Space(), m_ReadData.SpaceSize(), &read);
199  if (res == eRW_Success) m_ReadData.Add(read);
200 
201  return res;
202 }
203 
204 
206  size_t count,
207  size_t* bytes_read)
// Body of the de-packetizing read (the signature's first line is absent from
// this listing). Delivers at most `count` payload bytes of the current
// packet into `buf`; returns eRW_Eof when the end-of-stream marker is seen.
208 {
// Payload requests at least this large bypass the internal buffer.
209  const size_t kMinReadSize = 32 * 1024;
210 
// The guard copies read.count into *bytes_read (when non-null) on every
// return path.
211  CIOBytesCountGuard read(bytes_read);
212 
// First call: consume the start word and settle the byte order.
213  if (!m_StartRead) {
214  auto res = x_ReadStart();
215  if (res != eRW_Success) return res;
216  }
217 
// A length word of 0 is skipped: keep reading until a non-zero one arrives.
218  // read packet header
219  while (m_PacketBytesToRead == 0) {
220  auto res = ReadLength(m_PacketBytesToRead);
221  if (res != eRW_Success) return res;
222  }
223 
224  if (m_PacketBytesToRead == sEndPacket) return eRW_Eof;
225 
// Never hand out more than what remains of the current packet.
226  size_t to_read = count < m_PacketBytesToRead ? count : m_PacketBytesToRead;
227  size_t already_read = m_ReadData.DataSize();
228 
// Internal buffer is empty: either read straight into the caller's buffer
// (large requests) or refill the internal buffer first.
229  if (!already_read) {
230  // If chunk to read is big enough, read directly in the buf
231  if (to_read >= kMinReadSize) {
232  auto res = m_Rdr->Read(buf, to_read, &read.count);
233  if (res == eRW_Success) m_PacketBytesToRead -= static_cast<Uint4>(read.count);
234  return res;
235  }
236 
237  auto res = ReadData();
238  if (res != eRW_Success) return res;
239 
240  already_read = m_ReadData.DataSize();
241  }
242 
// Serve the request from the internal buffer.
243  if (already_read) {
244  read.count = min(to_read, already_read);
245  copy_n(m_ReadData.Data(), read.count, static_cast<char*>(buf));
246  m_ReadData.Remove(read.count);
247  }
248 
// read.count <= to_read <= m_PacketBytesToRead, so the narrowing cast and
// the subtraction cannot wrap.
249  m_PacketBytesToRead -= static_cast<Uint4>(read.count);
250  return eRW_Success;
251 }
252 
253 
// PendingCount body (signature line absent from this listing): forwards to
// the underlying reader.
// NOTE(review): the reported value is the underlying reader's; it does not
// account for bytes already held in m_ReadData -- confirm this is intended.
255 {
256  return m_Rdr->PendingCount(count);
257 }
258 
259 
// x_ReadStart body (signature line absent from this listing): reads the
// start word and derives the byte-swap setting from it.
// Returns the ReadLength() result on failure, eRW_Success otherwise; throws
// CUtilException (eWrongData) when the word matches neither byte order.
261 {
// (A line is missing from this listing here; its content is not visible.)
263 
// Set before reading, so a failed attempt is not retried by Read().
264  m_StartRead = true;
265 
266  Uint4 start_word_coming;
267 
// m_ByteSwap is initialized to false by the constructor, so (assuming nothing
// else changes it first) the start word is obtained without swapping.
268  auto res = ReadLength(start_word_coming);
269  if (res != eRW_Success) return res;
270 
// Anything other than the native-order marker enables swapping; values that
// are not the reversed marker either are rejected just below.
271  m_ByteSwap = (start_word_coming != sStartWord);
272 
273  if (start_word_coming != sStartWord &&
274  start_word_coming != 0x04030201)
275  NCBI_THROW(CUtilException, eWrongData,
276  "Cannot determine the byte order. Got: "
277  + NStr::UIntToString(start_word_coming, 0, 16));
278 
279  return eRW_Success;
280 }
281 
282 
// ReadLength body (signature line absent from this listing): ensures at least
// kLengthSize bytes are buffered, then decodes one word into `length`,
// byte-swapping it when m_ByteSwap is set.
284 {
// NOTE(review): SpaceSize() counts only bytes after m_End, and Remove()
// rewinds the offsets only when the buffer becomes empty. If fewer than
// kLengthSize unread bytes sit at the very end of the buffer, ReadData()
// requests 0 bytes and this loop may not make progress -- confirm.
285  while (m_ReadData.DataSize() < kLengthSize) {
286  auto res = ReadData();
287  if (res != eRW_Success) return res;
288  }
289 
290  if (m_ByteSwap) {
291  length = CByteSwap::GetInt4(m_ReadData.Data<unsigned char>());
292  } else {
// NOTE(review): this dereferences a Uint4 pointer at buffer offset m_Start,
// which is not necessarily 4-byte aligned -- consider memcpy.
293  length = *m_ReadData.Data<Uint4>();
294  }
295 
// (A line is missing from this listing here -- presumably it consumes the
// kLengthSize bytes just decoded; TODO confirm.)
297  return eRW_Success;
298 }
299 
300 
#define false
Definition: bool.h:36
static Int4 GetInt4(const unsigned char *ptr)
Definition: ncbi_bswap.hpp:121
virtual ~CTransmissionReader()
ERW_Result x_ReadStart()
ERW_Result ReadLength(Uint4 &length)
virtual ERW_Result PendingCount(size_t *count)
Via parameter "count" (which is guaranteed to be supplied non-NULL) return the number of bytes that are ready to be read from the input device without blocking.
virtual ERW_Result Read(void *buf, size_t count, size_t *bytes_read=0)
Read as many as "count" bytes into a buffer pointed to by the "buf" argument.
CTransmissionReader(IReader *rdr, EOwnership own_reader=eNoOwnership)
Constructed on another IReader (supposed to implement the actual transmission)
CTransmissionWriter(IWriter *wrt, EOwnership own_writer=eNoOwnership, ESendEofPacket send_eof=eDontSendEofPacket)
Constructed on another IWriter (comm.
ERW_Result Close(void)
ERW_Result WriteUint4(const Uint4 &value)
virtual ERW_Result Flush(void)
Flush pending data (if any) down to the output device.
virtual ERW_Result Write(const void *buf, size_t count, size_t *bytes_written=0)
Write up to "count" bytes from the buffer pointed to by the "buf" argument onto the output device.
ESendEofPacket m_SendEof
virtual ~CTransmissionWriter()
@ eDontSendEofPacket
Writer will not send EOF packet in the destructor.
@ eSendEofPacket
Writer will send EOF packet in the destructor.
A very basic data-read interface.
A very basic data-write interface.
char value[7]
Definition: config.c:431
The NCBI C++ standard methods for dealing with std::string.
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
uint32_t Uint4
4-byte (32-bit) unsigned integer
Definition: ncbitype.h:103
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
ERW_Result
Result codes for I/O operations.
virtual ERW_Result Flush(void)=0
Flush pending data (if any) down to the output device.
virtual ERW_Result PendingCount(size_t *count)=0
Via parameter "count" (which is guaranteed to be supplied non-NULL) return the number of bytes that are ready to be read from the input device without blocking.
virtual ERW_Result Write(const void *buf, size_t count, size_t *bytes_written=0)=0
Write up to "count" bytes from the buffer pointed to by the "buf" argument onto the output device.
virtual ERW_Result Read(void *buf, size_t count, size_t *bytes_read=0)=0
Read as many as "count" bytes into a buffer pointed to by the "buf" argument.
@ eRW_Eof
End of data, should be considered permanent.
@ eRW_Error
Unrecoverable error, no retry possible.
@ eRW_Success
Everything is okay, I/O completed.
static string UIntToString(unsigned int value, TNumToStringFlags flags=0, int base=10)
Convert UInt to string.
Definition: ncbistr.hpp:5109
enum ENcbiOwnership EOwnership
Ownership relations between objects.
char * buf
const struct ncbi::grid::netcache::search::fields::SIZE size
NCBI C++ auxiliary debug macros.
T min(T x_, T y_)
static unsigned cnt[256]
#define _ASSERT
static const Uint4 sStartWord
const size_t kLengthSize
static const Uint4 sEndPacket
Reader writer with transmission checking.
Modified on Mon Mar 04 05:12:47 2024 by modify_doxy.py rev. 669887