NCBI C++ ToolKit
grid_rw_impl.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: grid_rw_impl.cpp 100701 2023-08-31 19:21:12Z lavr $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Maxim Didenko, Dmitry Kazimirov
27  *
28  * File Description:
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include <corelib/rwstream.hpp>
35 
37 
40 
41 
42 #define NCBI_USE_ERRCODE_X ConnServ_ReadWrite
43 
45 
// Two-character tags for the two job-output forms this file distinguishes.
// Going by the identifier names, "D " tags the embedded form and "K " tags
// the NetCache form; x_GetDataType() strips a prefix of this length and
// reports eEmbedded or eNetCache.
46 static const char s_JobOutputPrefixEmbedded[] = "D ";
47 static const char s_JobOutputPrefixNetCache[] = "K ";
48 
// Character count shared by both prefixes above (letter + space).
49 #define JOB_OUTPUT_PREFIX_LEN 2
50 
// Constructs a writer that is backed by a caller-supplied string.
//   max_data_size  - stored in m_MaxDataSize; Write() compares the string
//                    size plus the incoming byte count against it.
//   data_ref       - used to initialize m_Data, the string Write() appends to.
//   writer_create  - stored in m_WriterCreate; Write() calls it to obtain the
//                    writer it switches to once the size limit is exceeded.
// NOTE(review): listing line 56 (the only statement of the constructor body)
// is absent from this view, so any initial content given to m_Data is not
// described here - confirm against the real file.
51 CStringOrWriter::CStringOrWriter(size_t max_data_size, string& data_ref, TWriterCreate writer_create) :
52  m_MaxDataSize(max_data_size),
53  m_Data(data_ref),
54  m_WriterCreate(writer_create)
55 {
57 }
58 
// Writes `count` bytes from `buf`, either into the backing string or into a
// separately created writer.
//   - If a writer already exists, the call is forwarded to it unchanged.
//   - Otherwise, while the string plus the new bytes stays within
//     m_MaxDataSize, the bytes are appended to m_Data.
//   - Otherwise a writer is obtained from m_WriterCreate, anything in m_Data
//     beyond the first JOB_OUTPUT_PREFIX_LEN characters is written to it, and
//     the current buffer is forwarded to it.
// Returns eRW_Success for an in-string append, eRW_Error when m_WriterCreate
// yields a null writer, the failing status if replaying m_Data fails, and
// otherwise whatever the writer's Write() returns.
// `bytes_written` may be null; on the in-string path it receives `count`.
// NOTE(review): listing lines 77 and 83 are absent from this view (the first
// argument of the replay Write() call, and a statement executed just before
// the new writer is adopted) - confirm against the real file.
59 ERW_Result CStringOrWriter::Write(const void* buf, size_t count, size_t* bytes_written)
60 {
 // Already switched to the external writer: everything goes there.
61  if (m_Writer) return m_Writer->Write(buf, count, bytes_written);
62 
 // Still fits in the string: append and report the full count as written.
63  if (m_Data.size() + count <= m_MaxDataSize) {
64  m_Data.append((const char*) buf, count);
65 
66  if (bytes_written) *bytes_written = count;
67  return eRW_Success;
68  }
69 
 // Limit exceeded: ask the callback for a writer; `key` is passed by reference
 // so the callback can fill it in.
70  string key;
71  unique_ptr<IEmbeddedStreamWriter> writer(m_WriterCreate(key));
72 
73  if (!writer) return eRW_Error;
74 
 // Replay what was accumulated so far, skipping the leading prefix characters.
75  if (m_Data.size() > JOB_OUTPUT_PREFIX_LEN) {
76  ERW_Result ret = writer->Write(
78  m_Data.size() - JOB_OUTPUT_PREFIX_LEN);
79 
80  if (ret != eRW_Success) return ret;
81  }
82 
84 
 // Adopt the writer only after the replay succeeded, then write the new data.
85  m_Writer = std::move(writer);
86  return m_Writer->Write(buf, count, bytes_written);
87 }
88 
90 {
 // NOTE(review): the signature line (listing line 89) is absent from this view.
 // Forwards the flush to the external writer when one exists; with no writer
 // there is nothing to forward, so success is reported.
91  return m_Writer ? m_Writer->Flush() : eRW_Success;
92 }
93 
95 {
 // NOTE(review): the signature line (listing line 94) is absent from this view.
 // Closes the external writer if one was created; otherwise does nothing.
96  if (m_Writer) m_Writer->Close();
97 }
98 
100 {
 // NOTE(review): the signature line (listing line 99) is absent from this view.
 // Aborts the external writer if one was created; otherwise does nothing.
101  if (m_Writer) m_Writer->Abort();
102 }
103 
105 {
 // NOTE(review): the signature line (listing line 104) is absent from this view.
 // Returns a callable that creates a writer through the captured API object.
 // `api` is captured by value, so the closure keeps its own copy; `mutable`
 // lets the lambda call non-const members on that copy. The caller's `key`
 // is handed to PutData() by address - presumably so PutData() can store the
 // new blob key in it; confirm against the CNetCacheAPI declaration.
106  return [api](string& key) mutable { return api.PutData(&key); };
107 }
108 
110  SNetCacheAPIImpl* storage, string& job_output_ref) :
111  CStringOrWriter(max_string_size, job_output_ref, s_NetCacheWriterCreate(storage))
112 {
 // NOTE(review): the first line of the signature (listing line 109) is absent
 // from this view.
 // Nothing to do here: the size limit, the output string and a
 // NetCache-backed writer factory built from `storage` are all passed to the
 // CStringOrWriter base.
113 }
114 
115 CNcbiOstream& SGridWrite::operator()(CNetCacheAPI nc_api, size_t embedded_max_size, string& data)
116 {
117  writer.reset(new CStringOrBlobStorageWriter(embedded_max_size, nc_api, data));
118 
119  stream.reset(new CWStream(writer.get(), 0, 0, CRWStreambuf::fLeakExceptions));
120  stream->exceptions(IOS_BASE::badbit | IOS_BASE::failbit);
121 
122  return *stream;
123 }
124 
125 void SGridWrite::Reset(bool flush)
126 {
127  if (flush && stream) stream->flush();
128  stream.reset();
129 
130  if (writer) {
131  writer->Close();
132  writer.reset();
133  }
134 }
135 
136 ////////////////////////////////////////////////////////////////////////////
137 //
138 
140  string& data)
141 {
 // Classifies `data` and, for the two prefixed forms, removes the leading
 // JOB_OUTPUT_PREFIX_LEN characters in place so the caller is left with the
 // bare content.
 // NOTE(review): the start of the signature (listing line 139) and the
 // conditions guarding both branches (listing lines 142-143 and 148-149) are
 // absent from this view - confirm which prefix selects which branch.
144  data.erase(0, JOB_OUTPUT_PREFIX_LEN);
145  return eNetCache;
146  }
147 
150  data.erase(0, JOB_OUTPUT_PREFIX_LEN);
151  return eEmbedded;
152  }
153 
 // Neither branch taken: an empty string is eEmpty, anything else is eRaw.
154  return data.empty() ? eEmpty : eRaw;
155 }
156 
158  SNetCacheAPIImpl* storage, size_t* data_size) :
159  m_Storage(storage), m_Data(data_or_key)
160 {
 // NOTE(review): the first line of the signature (listing line 157) and
 // listing lines 165, 173 and 188 are absent from this view.
 // x_GetDataType() takes m_Data by non-const reference and strips a
 // recognized prefix, so below m_Data holds the content without its tag.
161  switch (x_GetDataType(m_Data)) {
162  case eNetCache:
163  // If NetCache API is not provided, initialize it using info from key
164  if (!m_Storage) {
 // `key` is declared on the missing line 165; presumably it is parsed from
 // m_Data - confirm.
166  string service(key.GetServiceName());
167 
 // No service name in the key: fall back to a "host:port" string.
168  if (service.empty()) {
169  service = key.GetHost() + ":" + NStr::UIntToString(key.GetPort());
170  }
171 
172  m_Storage = CNetCacheAPI(service, kEmptyStr);
174  key.GetHost(), key.GetPort());
175  }
176 
 // Reading is delegated to a NetCache reader; GetReader() also receives
 // `data_size`.
177  m_NetCacheReader.reset(m_Storage.GetReader(m_Data, data_size));
178  return;
179 
180  case eEmbedded:
181  case eEmpty:
 // Data is served directly from m_Data: all of it is still unread.
182  m_BytesToRead = m_Data.size();
183  if (data_size != NULL)
184  *data_size = m_BytesToRead;
185  return;
186 
187  default:
 // Any other type is reported as an error; the message quotes the first
 // JOB_OUTPUT_PREFIX_LEN characters of m_Data. The statement opening this
 // expression is on the missing line 188.
189  "Unknown data type \"" <<
190  m_Data.substr(0, JOB_OUTPUT_PREFIX_LEN) << '"');
191  }
192 }
193 
195  size_t count,
196  size_t* bytes_read)
197 {
 // NOTE(review): the first line of the signature (listing line 194) is absent
 // from this view.
 // When a NetCache reader exists, the read is delegated to it entirely.
198  if (m_NetCacheReader.get())
199  return m_NetCacheReader->Read(buf, count, bytes_read);
200 
 // Nothing left in the in-memory data: report zero bytes and end-of-data.
201  if (m_BytesToRead == 0) {
202  if (bytes_read != NULL)
203  *bytes_read = 0;
204  return eRW_Eof;
205  }
206 
 // NOTE(review): listing line 208 is absent from this view, so the statement
 // controlled by this `if` is not visible here; presumably it limits `count`
 // to m_BytesToRead - confirm against the real file before relying on it.
207  if (count > m_BytesToRead)
 // The source address is m_BytesToRead characters before the end of m_Data,
 // i.e. the first character not yet handed out.
209  memcpy(buf, &*(m_Data.end() - m_BytesToRead), count);
210  m_BytesToRead -= count;
211  if (bytes_read != NULL)
212  *bytes_read = count;
213  return eRW_Success;
214 }
215 
217 {
 // NOTE(review): the signature line (listing line 216) is absent from this view.
 // Delegates to the NetCache reader when one exists; otherwise reports the
 // number of in-memory bytes not yet read. `count` is dereferenced without a
 // null check.
218  if (m_NetCacheReader.get())
219  return m_NetCacheReader->PendingCount(count);
220 
221  *count = m_BytesToRead;
222  return eRW_Success;
223 }
224 
// Creates a CStringOrBlobStorageReader over `data` (passing along `nc_api`
// and `data_size`) and returns the member input stream with badbit/failbit
// set to raise exceptions.
// NOTE(review): listing line 229 is absent from this view; it is presumably
// where `stream` is rebuilt around `reader`. Who owns the raw `new` result
// cannot be confirmed from the visible lines.
225 CNcbiIstream& SGridRead::operator()(CNetCacheAPI nc_api, const string& data, size_t* data_size)
226 {
227  auto reader = new CStringOrBlobStorageReader(data, nc_api, data_size);
228 
230  stream->exceptions(IOS_BASE::badbit | IOS_BASE::failbit);
231 
232  return *stream;
233 }
234 
236 {
 // NOTE(review): the signature line (listing line 235) is absent from this view.
 // Destroys the current input stream, if any.
237  stream.reset();
238 }
239 
Client API for NetCache server.
void StickToServer(SSocketAddress address)
CNetServerPool GetServerPool()
Note about the "buf_size" parameter for streams in this API.
Definition: rwstream.hpp:122
@ fLeakExceptions
Exceptions leaked out.
Definition: rwstreambuf.hpp:72
@ fOwnReader
Own the underlying reader.
Definition: rwstreambuf.hpp:66
String or Blob Storage Reader.
CStringOrBlobStorageReader(const string &data_or_key, SNetCacheAPIImpl *storage, size_t *data_size=NULL)
virtual ERW_Result Read(void *buf, size_t count, size_t *bytes_read=0)
Read as many as "count" bytes into a buffer pointed to by the "buf" argument.
static EType x_GetDataType(string &data)
virtual ERW_Result PendingCount(size_t *count)
Via parameter "count" (which is guaranteed to be supplied non-NULL) return the number of bytes that a...
unique_ptr< IReader > m_NetCacheReader
String or Blob Storage Writer.
CStringOrBlobStorageWriter(size_t max_string_size, SNetCacheAPIImpl *storage, string &job_output_ref)
void Abort() override
ERW_Result Write(const void *buf, size_t count, size_t *bytes_written=0) override
Write up to "count" bytes from the buffer pointed to by the "buf" argument onto the output device.
function< IEmbeddedStreamWriter *(string &)> TWriterCreate
ERW_Result Flush() override
Flush pending data (if any) down to the output device.
CStringOrWriter(size_t max_data_size, string &data_ref, TWriterCreate writer_create)
TWriterCreate m_WriterCreate
unique_ptr< IEmbeddedStreamWriter > m_Writer
void Close() override
Writer-based output stream.
Definition: rwstream.hpp:171
char data[12]
Definition: iconv.c:80
#define JOB_OUTPUT_PREFIX_LEN
static const char s_JobOutputPrefixEmbedded[]
static const char s_JobOutputPrefixNetCache[]
CStringOrWriter::TWriterCreate s_NetCacheWriterCreate(CNetCacheAPI api)
#define NULL
Definition: ncbistd.hpp:225
#define NCBI_THROW_FMT(exception_class, err_code, message)
The same as NCBI_THROW but with message processed as output to ostream.
Definition: ncbiexpt.hpp:719
string PutData(const void *buf, size_t size, const CNamedParameterList *optional=NULL)
Put BLOB to server.
IReader * GetReader(const string &key, size_t *blob_size=NULL, const CNamedParameterList *optional=NULL)
Get a pointer to the IReader interface to read blob contents.
CNetService GetService()
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
ERW_Result
Result codes for I/O operations.
IO_PREFIX::ostream CNcbiOstream
Portable alias for ostream.
Definition: ncbistre.hpp:149
IO_PREFIX::istream CNcbiIstream
Portable alias for istream.
Definition: ncbistre.hpp:146
@ eRW_Eof
End of data, should be considered permanent.
@ eRW_Error
Unrecoverable error, no retry possible.
@ eRW_Success
Everything is okay, I/O completed.
#define kEmptyStr
Definition: ncbistr.hpp:123
static string UIntToString(unsigned int value, TNumToStringFlags flags=0, int base=10)
Convert UInt to string.
Definition: ncbistr.hpp:5103
static int CompareCase(const CTempString s1, SIZE_TYPE pos, SIZE_TYPE n, const char *s2)
Case-sensitive compare of a substring with another string.
Definition: ncbistr.cpp:135
Definition of all error codes used in connect services library (xconnserv.lib and others).
char * buf
const struct ncbi::grid::netcache::search::fields::KEY key
#define count
Reader-writer based streams.
Meaningful information encoded in the NetCache key.
CNcbiIstream & operator()(CNetCacheAPI nc_api, const string &data, size_t *data_size)
void Reset()
unique_ptr< CNcbiIstream > stream
unique_ptr< IEmbeddedStreamWriter > writer
CNcbiOstream & operator()(CNetCacheAPI nc_api, size_t embedded_max_size, string &data)
unique_ptr< CNcbiOstream > stream
void Reset(bool flush=false)
Modified on Fri Sep 20 14:57:53 2024 by modify_doxy.py rev. 669887