NCBI C++ ToolKit
psg_client_impl.hpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 #ifndef OBJTOOLS__PUBSEQ_GATEWAY__PSG_CLIENT_IMPL_HPP
2 #define OBJTOOLS__PUBSEQ_GATEWAY__PSG_CLIENT_IMPL_HPP
3 
4 /* $Id: psg_client_impl.hpp 101787 2024-02-12 18:26:20Z sadyrovr $
5  * ===========================================================================
6  *
7  * PUBLIC DOMAIN NOTICE
8  * National Center for Biotechnology Information
9  *
10  * This software/database is a "United States Government Work" under the
11  * terms of the United States Copyright Act. It was written as part of
12  * the author's official duties as a United States Government employee and
13  * thus cannot be copyrighted. This software/database is freely available
14  * to the public for use. The National Library of Medicine and the U.S.
15  * Government have not placed any restriction on its use or reproduction.
16  *
17  * Although all reasonable efforts have been taken to ensure the accuracy
18  * and reliability of the software and data, the NLM and the U.S.
19  * Government do not and cannot warrant the performance or results that
20  * may be obtained by using this software or data. The NLM and the U.S.
21  * Government disclaim all warranties, express or implied, including
22  * warranties of performance, merchantability or fitness for any particular
23  * purpose.
24  *
25  * Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Author: Rafael Sadyrov
30  *
31  */
32 
35 
36 #ifdef HAVE_PSG_CLIENT
37 
38 #include "psg_client_transport.hpp"
39 
41 #include <corelib/rwstream.hpp>
42 
43 #include <unordered_map>
44 #include <mutex>
45 
47 
48 // ICC does not like this array to be a direct base of SPSG_RStream
49 struct SPSG_BlobReader : IReader, protected array<char, 64 * 1024>
50 {
51  using TStats = pair<bool, weak_ptr<SPSG_Stats>>;
53 
54  ERW_Result Read(void* buf, size_t count, size_t* bytes_read = 0);
55  ERW_Result PendingCount(size_t* count);
56 
57 private:
58  void CheckForNewChunks();
59  ERW_Result x_Read(void* buf, size_t count, size_t* bytes_read);
60 
63  vector<SPSG_Chunk> m_Data;
64  size_t m_Chunk = 0;
65  size_t m_Index = 0;
66 };
67 
68 struct SPSG_RStream : private SPSG_BlobReader, public CRStream
69 {
70  template <class... TArgs>
71  SPSG_RStream(TArgs&&... args) :
72  SPSG_BlobReader(std::forward<TArgs>(args)...),
73  CRStream(this, size(), data())
74  {}
75 };
76 
78 {
81 };
82 
84 {
85  shared_ptr<SPSG_Reply> reply;
86  weak_ptr<CPSG_Reply> user_reply;
87 
88  shared_ptr<CPSG_ReplyItem> Create(SPSG_Reply::SItem::TTS& item_ts);
89 
90 private:
91  template <class TReplyItem>
92  CPSG_ReplyItem* CreateImpl(TReplyItem* item, const vector<SPSG_Chunk>& chunks);
93  CPSG_ReplyItem* CreateImpl(SPSG_Reply::SItem::TTS& item_ts, const SPSG_Args& args, shared_ptr<SPSG_Stats>& stats);
94  CPSG_ReplyItem* CreateImpl(CPSG_SkippedBlob::EReason reason, const SPSG_Args& args, shared_ptr<SPSG_Stats>& stats);
96 };
97 
99 {
101  void SetQueueArgs(SPSG_UserArgs queue_args) { m_QueueArgs = std::move(queue_args); x_UpdateCache(); }
102  void Build(ostream& os, const SPSG_UserArgs& request_args);
103  void BuildRaw(ostringstream& os, const SPSG_UserArgs& request_args);
104 
105 private:
106  void x_UpdateCache();
107 
108  struct MergeValues; // A function-like class
109  static bool Merge(SPSG_UserArgs& higher_priority, const SPSG_UserArgs& lower_priority);
110 
111  static const SPSG_UserArgs& s_GetIniArgs();
112 
114  string m_CachedArgs;
115 };
116 
118 {
119  shared_ptr<TPSG_Queue> queue;
120 
121  SImpl(const string& service);
122 
123  bool SendRequest(shared_ptr<CPSG_Request> request, CDeadline deadline);
124  shared_ptr<CPSG_Reply> SendRequestAndGetReply(shared_ptr<CPSG_Request> request, CDeadline deadline);
125  bool WaitForEvents(CDeadline deadline);
126 
127  bool RejectsRequests() const { return m_Service.ioc.RejectsRequests(); }
128  void SetRequestFlags(CPSG_Request::TFlags request_flags) { m_RequestFlags = request_flags; }
129  void SetUserArgs(SPSG_UserArgs user_args) { m_UserArgsBuilder.GetLock()->SetQueueArgs(std::move(user_args)); }
130 
131  static TApiLock GetApiLock() { return CService::GetMap(); }
132 
133 private:
134  class CService
135  {
136  // Have to use unique_ptr as some old compilers do not use move ctor of SPSG_IoCoordinator
137  using TMap = unordered_map<string, unique_ptr<SPSG_IoCoordinator>>;
138 
139  SPSG_IoCoordinator& GetIoC(const string& service);
140 
141  shared_ptr<TMap> m_Map;
142  static pair<mutex, weak_ptr<TMap>> sm_Instance;
143 
144  public:
146 
147  CService(const string& service) : m_Map(GetMap()), ioc(GetIoC(service)) {}
148 
149  static shared_ptr<TMap> GetMap();
150  };
151 
152  string x_GetAbsPathRef(shared_ptr<const CPSG_Request> user_request, const CPSG_Request::TFlags& flags, bool raw);
153 
157 };
158 
159 inline ostream& operator<<(ostream& os, const SPSG_UserArgs& request_args)
160 {
161  for (const auto& p : request_args) {
162  for (const auto& s : p.second) {
163  os << '&' << p.first << '=' << s;
164  }
165  }
166 
167  return os;
168 }
169 
171 
172 #endif
173 #endif
CDeadline.
Definition: ncbitime.hpp:1830
SPSG_IoCoordinator & GetIoC(const string &service)
Definition: psg_client.cpp:523
CService(const string &service)
static pair< mutex, weak_ptr< TMap > > sm_Instance
unordered_map< string, unique_ptr< SPSG_IoCoordinator > > TMap
static shared_ptr< TMap > GetMap()
Definition: psg_client.cpp:541
shared_ptr< void > TApiLock
Get an API lock.
A self-containing part of the reply, e.g. a meta-data or a data blob.
Definition: psg_client.hpp:663
Note about the "buf_size" parameter for streams in this API.
Definition: rwstream.hpp:122
A very basic data-read interface.
Abstract reader-writer interface classes.
static uch flags
char data[12]
Definition: iconv.c:80
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
ERW_Result
Result codes for I/O operations.
char * buf
int i
const struct ncbi::grid::netcache::search::fields::SIZE size
ostream & operator<<(ostream &os, const SPSG_UserArgs &request_args)
Reader-writer based streams.
SThreadSafe< SPSG_UserArgsBuilder > m_UserArgsBuilder
shared_ptr< TPSG_Queue > queue
CPSG_Request::TFlags m_RequestFlags
static TApiLock GetApiLock()
void SetUserArgs(SPSG_UserArgs user_args)
string x_GetAbsPathRef(shared_ptr< const CPSG_Request > user_request, const CPSG_Request::TFlags &flags, bool raw)
Definition: psg_client.cpp:746
shared_ptr< CPSG_Reply > SendRequestAndGetReply(shared_ptr< CPSG_Request > request, CDeadline deadline)
Definition: psg_client.cpp:950
bool WaitForEvents(CDeadline deadline)
SImpl(const string &service)
Definition: psg_client.cpp:710
void SetRequestFlags(CPSG_Request::TFlags request_flags)
bool RejectsRequests() const
bool SendRequest(shared_ptr< CPSG_Request > request, CDeadline deadline)
Definition: psg_client.cpp:988
SPSG_Reply::SItem::TTS & item
SImpl(SPSG_Reply::SItem::TTS &i)
CPSG_ReplyItem * CreateImpl(TReplyItem *item, const vector< SPSG_Chunk > &chunks)
Definition: psg_client.cpp:340
weak_ptr< CPSG_Reply > user_reply
shared_ptr< SPSG_Reply > reply
shared_ptr< CPSG_ReplyItem > Create(SPSG_Reply::SItem::TTS &item_ts)
Definition: psg_client.cpp:501
void CheckForNewChunks()
Definition: psg_client.cpp:154
ERW_Result Read(void *buf, size_t count, size_t *bytes_read=0)
Read as many as "count" bytes into a buffer pointed to by the "buf" argument.
Definition: psg_client.cpp:111
SPSG_BlobReader(SPSG_Reply::SItem::TTS &src, TStats stats=TStats())
Definition: psg_client.cpp:73
SPSG_Reply::SItem::TTS & m_Src
ERW_Result PendingCount(size_t *count)
Via parameter "count" (which is guaranteed to be supplied non-NULL) return the number of bytes that a...
Definition: psg_client.cpp:131
ERW_Result x_Read(void *buf, size_t count, size_t *bytes_read)
Definition: psg_client.cpp:79
vector< SPSG_Chunk > m_Data
pair< bool, weak_ptr< SPSG_Stats > > TStats
SPSG_RStream(TArgs &&... args)
static bool Merge(SPSG_UserArgs &higher_priority, const SPSG_UserArgs &lower_priority)
Definition: psg_client.cpp:643
static const SPSG_UserArgs & s_GetIniArgs()
Definition: psg_client.cpp:703
void BuildRaw(ostringstream &os, const SPSG_UserArgs &request_args)
Definition: psg_client.cpp:672
void Build(ostream &os, const SPSG_UserArgs &request_args)
Definition: psg_client.cpp:656
void SetQueueArgs(SPSG_UserArgs queue_args)
Arbitrary request URL arguments.
Definition: psg_client.hpp:79
SLock< TType > GetLock()
Definition: type.c:6
Modified on Sun Apr 21 03:41:25 2024 by modify_doxy.py rev. 669887