NCBI C++ ToolKit
processing.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: processing.cpp 101413 2023-12-11 18:11:30Z sadyrovr $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Rafael Sadyrov
27  *
28  */
29 
30 #include <ncbi_pch.hpp>
31 
32 #include <numeric>
33 #include <unordered_set>
34 #include <unordered_map>
35 
40 #include <serial/enumvalues.hpp>
41 #include <serial/objcopy.hpp>
42 #include <serial/objistr.hpp>
43 #include <serial/objostr.hpp>
44 #include <util/checksum.hpp>
45 #include <util/compress/zlib.hpp>
46 #include <util/compress/stream.hpp>
47 
48 #include "performance.hpp"
49 #include "processing.hpp"
50 
52 
55 bool SParams::verbose = false;
56 
61 };
62 
64 {
67  {
70  }
71 
73  {
75  }
76 
78 
79 private:
81  void operator=(const SNewRequestContext&) = delete;
82 
84 };
85 
87 {
88  SInteractiveNewRequestStart(const string& request, CJson_ConstObject params_obj);
89 
90 private:
92  {
93  SExtra() : CDiagContext_Extra(GetDiagContext().PrintRequestStart()) {}
94 
95  void Print(const string& prefix, CJson_ConstValue json);
96  void Print(const string& prefix, CJson_ConstArray json);
97  void Print(const string& prefix, CJson_ConstObject json);
98  void Print(const string& prefix, CJson_ConstNode json);
99 
101  };
102 };
103 
104 string s_GetId(const CJson_Document& req_doc);
106 
108 {
109  switch (t) {
110  case eStart: return "Start";
111  case eSubmit: return "Submit";
112  case eReply: return "Reply";
113  case eDone: return "Done";
114  case eSend: return "Send";
115  case eReceive: return "Receive";
116  case eClose: return "Close";
117  case eRetry: return "Retry";
118  case eFail: return "Fail";
119 
120  case eLastType:
121  case eError: break;
122  }
123 
124  _TROUBLE;
125  return "ERROR";
126 }
127 
128 const char* s_StrStatus(EPSG_Status status)
129 {
130  switch (status) {
131  case EPSG_Status::eSuccess: return "Success";
132  case EPSG_Status::eInProgress: return "InProgress";
133  case EPSG_Status::eNotFound: return "NotFound";
134  case EPSG_Status::eCanceled: return "Canceled";
135  case EPSG_Status::eForbidden: return "Forbidden";
136  case EPSG_Status::eError: break;
137  }
138 
139  return "Error";
140 }
141 
143 {
144  stringstream ss;
145  doc.Write(ss, m_Flags);
146 
147  unique_lock<mutex> lock(m_Mutex);
148 
149  if (m_Pipe) {
150  cout << ss.rdbuf() << endl;
151  } else {
152  cout << m_Separator << '\n' << ss.rdbuf() << flush;
153  m_Separator = ',';
154  }
155 
156  return *this;
157 }
158 
160 {
161  // If not in pipe mode and printed some JSON
162  if (!m_Pipe && (m_Separator == ',')) {
163  cout << "\n]" << endl;
164  }
165 }
166 
167 template <class TItem, class... TArgs>
168 CJsonResponse::CJsonResponse(EPSG_Status status, TItem item, TArgs&&... args) :
169  m_JsonObj(SetObject())
170 {
171  try {
172  AddRequestID(item, std::forward<TArgs>(args)...);
173  Fill(status, item);
174  AddMessage(std::forward<TArgs>(args)...);
175 
176  for (auto l = [&]() { return item->GetNextMessage(SParams::min_severity); }; auto message = l(); ) {
177  AddMessage(message);
178  }
179  }
180  catch (exception& e) {
181  CJson_Document new_doc;
182  new_doc.ResetObject()["data"].AssignCopy(*this);
183  AssignCopy(new_doc);
184  Set("code", eJsonRpc_ExceptionOnRead);
185  Set("message", e.what());
186  m_Error = true;
187  }
188 }
189 
191  CJsonResponse(id)
192 {
193  m_JsonObj[result.m_Error ? "error" : "result"].AssignCopy(result);
194 }
195 
196 CJsonResponse::CJsonResponse(const string& id, int code, const string& message) :
197  CJsonResponse(id)
198 {
199  CJson_Object error_obj = m_JsonObj.insert_object("error");
200  Set(error_obj, "code", code);
201  Set(error_obj, "message", message);
202 }
203 
204 CJsonResponse::CJsonResponse(const string& id) :
205  m_JsonObj(SetObject())
206 {
207  Set("jsonrpc", "2.0");
208 
209  auto id_value = m_JsonObj["id"].SetValue();
210 
211  if (id.empty()) {
212  id_value.SetNull();
213  } else {
214  id_value.SetString(id);
215  }
216 }
217 
218 template <class TItem, class... TArgs>
219 void CJsonResponse::AddRequestID(TItem item, TArgs&&...)
220 {
221  if (auto request_id = s_GetReply(item)->GetRequest()->template GetUserContext<string>()) {
222  Set("request_id", *request_id);
223  }
224 }
225 
226 const char* s_GetItemName(CPSG_ReplyItem::EType type, bool trouble = true)
227 {
228  switch (type) {
229  case CPSG_ReplyItem::eBlobData: return "BlobData";
230  case CPSG_ReplyItem::eBlobInfo: return "BlobInfo";
231  case CPSG_ReplyItem::eSkippedBlob: return "SkippedBlob";
232  case CPSG_ReplyItem::eBioseqInfo: return "BioseqInfo";
233  case CPSG_ReplyItem::eNamedAnnotInfo: return "NamedAnnotInfo";
234  case CPSG_ReplyItem::eNamedAnnotStatus: return "NamedAnnotStatus";
235  case CPSG_ReplyItem::ePublicComment: return "PublicComment";
236  case CPSG_ReplyItem::eProcessor: return "Processor";
237  case CPSG_ReplyItem::eIpgInfo: return "IpgInfo";
238  case CPSG_ReplyItem::eEndOfReply: if (!trouble) return "Reply"; _TROUBLE;
239  }
240 
241  return "UnknownItem";
242 }
243 
244 CJsonResponse CJsonResponse::NewItem(const shared_ptr<CPSG_ReplyItem>& reply_item)
245 {
246  CJsonResponse rv;
247  rv.Set("reply", "NewItem");
248  rv.Set("item", s_GetItemName(reply_item->GetType()));
249  rv.Set("status", s_StrStatus(reply_item->GetStatus(CDeadline::eNoWait)));
250  return rv;
251 }
252 
253 void CJsonResponse::Fill(EPSG_Status status, shared_ptr<CPSG_Reply>)
254 {
255  Set("status", s_StrStatus(status));
256 }
257 
258 void CJsonResponse::Fill(EPSG_Status reply_item_status, shared_ptr<CPSG_ReplyItem> reply_item)
259 {
260  auto reply_item_type = reply_item->GetType();
261 
262  if (sm_SetReplyType) {
263  Set("reply", s_GetItemName(reply_item_type));
264  }
265 
266  if (SParams::verbose) {
267  Set("processor_id", reply_item->GetProcessorId());
268  }
269 
270  if (reply_item_status != EPSG_Status::eSuccess) {
271  Set("status", s_StrStatus(reply_item_status));
272  return;
273  }
274 
275  switch (reply_item_type) {
277  return Fill(static_pointer_cast<CPSG_BlobData>(reply_item));
278 
280  return Fill(static_pointer_cast<CPSG_BlobInfo>(reply_item));
281 
283  return Fill(static_pointer_cast<CPSG_SkippedBlob>(reply_item));
284 
286  return Fill(static_pointer_cast<CPSG_BioseqInfo>(reply_item));
287 
289  return Fill(static_pointer_cast<CPSG_NamedAnnotInfo>(reply_item));
290 
292  return Fill(static_pointer_cast<CPSG_NamedAnnotStatus>(reply_item));
293 
295  return Fill(static_pointer_cast<CPSG_PublicComment>(reply_item));
296 
298  return Fill(static_pointer_cast<CPSG_Processor>(reply_item));
299 
301  return Fill(static_pointer_cast<CPSG_IpgInfo>(reply_item));
302 
304  _TROUBLE;
305  return;
306  }
307 
308  throw logic_error("Received unknown item: " + to_string(reply_item_type));
309 }
310 
311 auto s_IsRawRequest(shared_ptr<const CPSG_Request>& request)
312 {
313  return (request->GetType() == CPSG_Request::eBlob) && dynamic_pointer_cast<const CRawRequest>(request);
314 }
315 
316 auto s_IsRawResponse(const CPSG_BlobId* blob_id)
317 {
318  return blob_id && (blob_id->GetLastModified() == CPSG_BlobId::TLastModified(numeric_limits<Int8>::min()));
319 }
320 
321 void CJsonResponse::Fill(shared_ptr<CPSG_BlobData> blob_data)
322 {
323  if (auto request = blob_data->GetReply()->GetRequest(); s_IsRawRequest(request)) {
324  if (auto blob_id = blob_data->GetId<CPSG_BlobId>(); s_IsRawResponse(blob_id)) {
325  if (CJson_Document json_doc; json_doc.Read(blob_data->GetStream())) {
326  Set("reply", blob_id->GetId());
327 
328  for (const auto& p : json_doc.GetObject()) {
329  m_JsonObj.insert(p.name, p.value);
330  }
331 
332  return;
333  }
334  }
335  }
336 
337  Set("id", blob_data);
338  ostringstream os;
339  os << blob_data->GetStream().rdbuf();
340  auto data = os.str();
341 
342  if (const auto data_size = data.size(); data_size <= sm_DataLimit) {
343  Set("data", NStr::Base64Encode(data));
344  } else {
345  Set("length", static_cast<Uint8>(data_size)); // macOS requires static_cast, Set() would be ambiguous otherwise
346  CHash hash;
347  hash.Calculate(data.data(), data_size);
348  Set("hash", hash.GetResultHex());
349  data.erase(sm_PreviewSize);
350  Set("preview", NStr::Base64Encode(data));
351  }
352 }
353 
354 void CJsonResponse::Fill(shared_ptr<CPSG_BlobInfo> blob_info)
355 {
356  Set("id", blob_info);
357  Set("compression", blob_info->GetCompression());
358  Set("format", blob_info->GetFormat());
359  Set("storage_size", blob_info->GetStorageSize());
360  Set("size", blob_info->GetSize());
361  Set("is_dead", blob_info->IsDead());
362  Set("is_suppressed", blob_info->IsSuppressed());
363  Set("is_withdrawn", blob_info->IsWithdrawn());
364  Set("hup_release_date", blob_info->GetHupReleaseDate().AsString());
365  Set("owner", blob_info->GetOwner());
366  Set("original_load_date", blob_info->GetOriginalLoadDate().AsString());
367  Set("class", objects::CBioseq_set::ENUM_METHOD_NAME(EClass)()->FindName(blob_info->GetClass(), true));
368  Set("division", blob_info->GetDivision());
369  Set("username", blob_info->GetUsername());
370  Set("id2_info", blob_info->GetId2Info());
371  Set("n_chunks", blob_info->GetNChunks());
372 }
373 
375 {
376  switch (reason) {
377  case CPSG_SkippedBlob::eExcluded: return "Excluded";
378  case CPSG_SkippedBlob::eInProgress: return "InProgress";
379  case CPSG_SkippedBlob::eSent: return "Sent";
380  case CPSG_SkippedBlob::eUnknown: return "Unknown";
381  };
382 
383  _TROUBLE;
384  return "";
385 }
386 
387 void CJsonResponse::Fill(shared_ptr<CPSG_SkippedBlob> skipped_blob)
388 {
389  Set("id", skipped_blob);
390  Set("reason", s_ReasonToString(skipped_blob->GetReason()));
391  Set("sent_seconds_ago", skipped_blob->GetSentSecondsAgo());
392  Set("time_until_resend", skipped_blob->GetTimeUntilResend());
393 }
394 
395 void CJsonResponse::Fill(shared_ptr<CPSG_BioseqInfo> bioseq_info)
396 {
397  const auto included_info = bioseq_info->IncludedInfo();
398 
399  if (included_info & CPSG_Request_Resolve::fCanonicalId) Set("canonical_id", bioseq_info->GetCanonicalId());
400  if (included_info & CPSG_Request_Resolve::fOtherIds) Set("other_ids", bioseq_info->GetOtherIds());
401  if (included_info & CPSG_Request_Resolve::fMoleculeType) Set("molecule_type", objects::CSeq_inst::ENUM_METHOD_NAME(EMol)()->FindName(bioseq_info->GetMoleculeType(), true));
402  if (included_info & CPSG_Request_Resolve::fLength) Set("length", bioseq_info->GetLength());
403  if (included_info & CPSG_Request_Resolve::fChainState) Set("seq_state", bioseq_info->GetChainState());
404  if (included_info & CPSG_Request_Resolve::fState) Set("state", bioseq_info->GetState());
405  if (included_info & CPSG_Request_Resolve::fBlobId) Set("blob_id", bioseq_info->GetBlobId());
406  if (included_info & CPSG_Request_Resolve::fTaxId) Set("tax_id", TAX_ID_TO(Int8, bioseq_info->GetTaxId()));
407  if (included_info & CPSG_Request_Resolve::fHash) Set("hash", bioseq_info->GetHash());
408  if (included_info & CPSG_Request_Resolve::fDateChanged) Set("date_changed", bioseq_info->GetDateChanged().AsString());
409  if (included_info & CPSG_Request_Resolve::fGi) Set("gi", GI_TO(Int8, bioseq_info->GetGi()));
410 }
411 
412 void CJsonResponse::Fill(shared_ptr<CPSG_NamedAnnotInfo> named_annot_info)
413 {
414  Set("name", named_annot_info->GetName());
415  Set("blob_id", named_annot_info->GetBlobId());
416  Set("id2_annot_info", named_annot_info->GetId2AnnotInfo());
417 }
418 
419 void CJsonResponse::Fill(shared_ptr<CPSG_NamedAnnotStatus> named_annot_status)
420 {
421  auto ar = m_JsonObj["statuses"].ResetArray();
422 
423  for (const auto& status : named_annot_status->GetId2AnnotStatusList()) {
424  ar.push_back();
425  auto obj = ar.back().ResetObject();
426  Set(obj, "name", status.first);
427  Set(obj, "status", s_StrStatus(status.second));
428  }
429 }
430 
431 void CJsonResponse::Fill(shared_ptr<CPSG_PublicComment> public_comment)
432 {
433  Set("id", public_comment);
434  Set("text", public_comment->GetText());
435 }
436 
438 {
439  switch (progress_status) {
440  case CPSG_Processor::eStart: return "start";
441  case CPSG_Processor::eDone: return "done";
442  case CPSG_Processor::eNotFound: return "not_found";
443  case CPSG_Processor::eCanceled: return "canceled";
444  case CPSG_Processor::eTimeout: return "timeout";
445  case CPSG_Processor::eError: return "error";
446  case CPSG_Processor::eUnauthorized: return "unauthorized";
447  case CPSG_Processor::eInProgress: return "inprogress";
448  case CPSG_Processor::eUnknown: return "unknown";
449  }
450 
451  _TROUBLE;
452  return "";
453 }
454 
455 void CJsonResponse::Fill(shared_ptr<CPSG_Processor> processor)
456 {
457  const auto progress_status = processor->GetProgressStatus();
458 
459  // Progress status does not make sense without processor ID,
460  // the latter is reported in the verbose mode elsewhere
461  if (!SParams::verbose) {
462  Set("processor_id", processor->GetProcessorId());
463  }
464 
465  Set("progress_status", s_ProgressStatusToString(progress_status));
466 }
467 
468 void CJsonResponse::Fill(shared_ptr<CPSG_IpgInfo> ipg_info)
469 {
470  Set("protein", ipg_info->GetProtein());
471  Set("ipg", ipg_info->GetIpg());
472  Set("nucleotide", ipg_info->GetNucleotide());
473  Set("tax_id", ipg_info->GetTaxId());
474  Set("gb_state", ipg_info->GetGbState());
475 }
476 
478 {
479  auto messages = m_JsonObj["messages"];
480  auto ar = messages.IsNull() ? messages.ResetArray() : messages.SetArray();
481  ar.push_back();
482  auto obj = ar.back().ResetObject();
483  Set(obj, "severity", CNcbiDiag::SeverityName(message.severity));
484  Set(obj, "code", message.code);
485  Set(obj, "text", message);
486 }
487 
488 void CJsonResponse::Set(CJson_Node node, const CPSG_BioId& bio_id)
489 {
490  auto obj = node.ResetObject();
491  Set(obj, "id", bio_id.GetId());
492 
493  const auto& type = bio_id.GetType();
494 
496  Set(obj, "type", type);
497  }
498 }
499 
500 void CJsonResponse::Set(CJson_Node node, const vector<CPSG_BioId>& bio_ids)
501 {
502  auto ar = node.ResetArray();
503 
504  for (const auto& bio_id : bio_ids) {
505  ar.push_back();
506  Set(ar.back(), bio_id);
507  }
508 }
509 
510 void CJsonResponse::Set(CJson_Node node, const CPSG_BlobId& blob_id)
511 {
512  auto obj = node.ResetObject();
513  Set(obj, "id", blob_id.GetId());
514  Set(obj, "last_modified", blob_id.GetLastModified());
515 }
516 
517 void CJsonResponse::Set(CJson_Node node, const CPSG_ChunkId& chunk_id)
518 {
519  auto obj = node.ResetObject();
520  Set(obj, "id2_chunk", chunk_id.GetId2Chunk());
521  Set(obj, "id2_info", chunk_id.GetId2Info());
522 }
523 
524 string SInteractiveParams::GetService(string service, bool one_server)
525 {
526  if (one_server) {
527  auto servers = CServiceDiscovery(service.empty() ? TPSG_Service::GetDefault() : service)();
528 
529  if (!servers.empty()) {
530  sort(servers.begin(), servers.end(), [](const auto& l, const auto& r) { return l.second < r.second; });
531  return servers.back().first.AsString();
532  }
533  }
534 
535  return service;
536 }
537 
538 void SMetrics::OutputItems(ostream& os) const
539 {
540  for (const auto& item : m_Items) {
541  os << '\t' << s_GetItemName(item.first, false) << '=' << s_StrStatus(item.second);
542  }
543 }
544 
546 {
548 
549  void ItemComplete(EPSG_Status status, const shared_ptr<CPSG_ReplyItem>& item);
550  void ReplyComplete(EPSG_Status status, const shared_ptr<CPSG_Reply>& reply);
551 
552  operator int() const { return int(m_Status); }
553 
554 private:
555  void Process(shared_ptr<CPSG_BlobInfo> blob_info);
556  void Process(shared_ptr<CPSG_BlobData> blob_data);
557  void Process(shared_ptr<CPSG_NamedAnnotInfo> named_annot_info);
558  void Process(shared_ptr<CPSG_BlobInfo> blob_info, shared_ptr<CPSG_BlobData> blob_data);
559 
560  template <class TItem>
561  bool ReportErrors(EPSG_Status status, TItem item, const char* prefix);
562 
564  unordered_map<string, pair<shared_ptr<CPSG_BlobInfo>, shared_ptr<CPSG_BlobData>>> m_Data;
566 };
567 
568 template <class TItem>
569 bool SDataOnlyCopy::ReportErrors(EPSG_Status status, TItem item, const char* prefix)
570 {
571  if (status == EPSG_Status::eSuccess) {
572  return false;
573  }
574 
575  if (m_Status == EPSG_Status::eSuccess) m_Status = status;
576 
577  stringstream ss;
578 
579  if (m_Params.messages_only) {
580  auto delim = "";
581 
582  for (;;) {
583  auto message = item->GetNextMessage();
584 
585  if (message.empty()) break;
586 
587  ss << delim << message;
588  delim = ", ";
589  }
590  } else {
591  ss << prefix << s_StrStatus(status);
592 
593  for (;;) {
594  auto message = item->GetNextMessage();
595 
596  if (message.empty()) break;
597 
598  ss << "\n\t" << message;
599  }
600 
601  ss << '\n';
602  }
603 
604  cerr << ss.rdbuf();
605  return true;
606 }
607 
608 void SDataOnlyCopy::ItemComplete(EPSG_Status status, const shared_ptr<CPSG_ReplyItem>& item)
609 {
610  if (ReportErrors(status, item, "Item error: ")) {
611  return;
612 
613  } else if (item->GetType() == CPSG_ReplyItem::eBlobInfo) {
614  Process(static_pointer_cast<CPSG_BlobInfo>(item));
615 
616  } else if (item->GetType() == CPSG_ReplyItem::eBlobData) {
617  Process(static_pointer_cast<CPSG_BlobData>(item));
618 
619  } else if (item->GetType() == CPSG_ReplyItem::eNamedAnnotInfo) {
620  Process(static_pointer_cast<CPSG_NamedAnnotInfo>(item));
621  }
622 }
623 
624 void SDataOnlyCopy::ReplyComplete(EPSG_Status status, const shared_ptr<CPSG_Reply>& reply)
625 {
626  ReportErrors(status, reply, "Reply error: ");
627 }
628 
630 {
631  if (format == "asn1-text") return eSerial_AsnText;
632  if (format == "xml") return eSerial_Xml;
633  if (format == "json") return eSerial_Json;
634 
635  return eSerial_AsnBinary;
636 }
637 
638 TTypeInfo s_GetInputType(const shared_ptr<CPSG_BlobData>& blob_data)
639 {
640  using namespace objects;
641 
642  if (auto chunk_id = blob_data->GetId<CPSG_ChunkId>()) {
643  return chunk_id->GetId2Chunk() == 999999999 ? CID2S_Split_Info::GetTypeInfo() : CID2S_Chunk::GetTypeInfo();
644  }
645 
646  return CSeq_entry::GetTypeInfo();
647 }
648 
649 void SDataOnlyCopy::Process(shared_ptr<CPSG_BlobInfo> blob_info)
650 {
651  auto& p = m_Data[blob_info->GetId()->Repr()];
652 
653  if (p.second) {
654  Process(std::move(blob_info), std::move(p.second));
655  } else {
656  p.first = std::move(blob_info);
657  }
658 }
659 
660 void SDataOnlyCopy::Process(shared_ptr<CPSG_BlobData> blob_data)
661 {
662  auto& p = m_Data[blob_data->GetId()->Repr()];
663 
664  if (p.first) {
665  Process(std::move(p.first), std::move(blob_data));
666  } else {
667  p.second = std::move(blob_data);
668  }
669 }
670 
671 void SDataOnlyCopy::Process(shared_ptr<CPSG_BlobInfo> blob_info, shared_ptr<CPSG_BlobData> blob_data)
672 {
673  auto& is = blob_data->GetStream();
674 
676  cout << is.rdbuf();
677  return;
678  }
679 
680  auto input_format = s_GetInputFormat(blob_info->GetFormat());
681  unique_ptr<CObjectIStream> in;
682 
683  if (blob_info->GetCompression().find("zip") == string::npos) {
684  in.reset(CObjectIStream::Open(input_format, is));
685  } else {
686  unique_ptr<CZipStreamDecompressor> zip(new CZipStreamDecompressor);
687  unique_ptr<CCompressionIStream> compressed_is(new CCompressionIStream(is, zip.release(), CCompressionIStream::fOwnProcessor));
688  in.reset(CObjectIStream::Open(input_format, *compressed_is.release(), eTakeOwnership));
689  }
690 
691  _ASSERT(in);
692  in->UseMemoryPool();
693 
694  stringstream ss;
695  unique_ptr<CObjectOStream> out(CObjectOStream::Open(m_Params.output_format, ss));
696  CObjectStreamCopier copier(*in, *out);
697 
698  try {
699  copier.Copy(s_GetInputType(blob_data));
700  }
701  catch (CException& ex) {
702  cerr << "Failed to process blob '" << blob_data->GetId()->Repr() << "': " << ex.ReportThis() << endl;
703  return;
704  }
705 
706  cout << ss.rdbuf();
707 }
708 
709 void SDataOnlyCopy::Process(shared_ptr<CPSG_NamedAnnotInfo> named_annot_info)
710 {
712  cout << NStr::Base64Decode(named_annot_info->GetId2AnnotInfo());
713  return;
714  }
715 
716  for (const auto& info : named_annot_info->GetId2AnnotInfoList() ) {
718  }
719 }
720 
721 template <class... TArgs>
723 {
724  static void ItemComplete(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_ReplyItem>& item)
725  {
726  json_out << CJsonResponse(status, item);
727  }
728 
729  static void ReplyComplete(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_Reply>& reply)
730  {
731  json_out << CJsonResponse(status, reply);
732  }
733 };
734 
735 template <class... TArgs>
736 struct SNonVerbose : SNonVerboseBase<TArgs...>
737 {
738  using SNonVerboseBase<TArgs...>::SNonVerboseBase;
739 
740  void ItemComplete(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_ReplyItem>& item);
741  void ReplyComplete(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_Reply>& reply);
742 
743 private:
744  using TBase = SNonVerboseBase<TArgs...>;
745 
747 };
748 
749 template <class... TArgs>
750 void SNonVerbose<TArgs...>::ItemComplete(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_ReplyItem>& item)
751 {
752  const auto type = item->GetType();
753 
754  if (status == EPSG_Status::eNotFound) {
755  m_Items.try_emplace(type, item);
756  } else {
757  TBase::ItemComplete(json_out, status, item);
758 
759  if (auto [it, result] = m_Items.try_emplace(type, nullptr); !result && it->second) {
760  it->second.reset();
761  }
762  }
763 }
764 
765 template <class... TArgs>
766 void SNonVerbose<TArgs...>::ReplyComplete(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_Reply>& reply)
767 {
768  for (const auto& p : m_Items) {
769  if (auto& item = p.second) {
770  TBase::ItemComplete(json_out, EPSG_Status::eNotFound, item);
771  }
772  }
773 
774  if (status != EPSG_Status::eSuccess) {
775  TBase::ReplyComplete(json_out, status, reply);
776  }
777 }
778 
779 template <class... TArgs>
780 void s_ItemComplete(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_ReplyItem>& item)
781 {
782  json_out << CJsonResponse(status, item);
783 }
784 
785 template <class... TArgs>
786 void s_ReplyComplete(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_Reply>& reply)
787 {
788  json_out << CJsonResponse(status, reply);
789 }
790 
791 template <>
792 void s_ReplyComplete<SOneRequestParams>(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_Reply>& reply)
793 {
794  if (status != EPSG_Status::eSuccess) {
795  s_ReplyComplete<>(json_out, status, reply);
796  }
797 }
798 
799 template <class... TArgs>
800 void s_NewItem(SJsonOut&, const shared_ptr<CPSG_ReplyItem>&)
801 {
802 }
803 
804 template <>
805 void s_NewItem<SOneRequestParams>(SJsonOut& json_out, const shared_ptr<CPSG_ReplyItem>& reply_item)
806 {
807  json_out << CJsonResponse::NewItem(reply_item);
808 }
809 
810 int CProcessing::OneRequest(const SOneRequestParams& params, shared_ptr<CPSG_Request> request)
811 {
812  SDataOnlyCopy data_only_copy(params.data_only);
813  CLogLatencyReport latency_report{
814  "/objtools/pubseq_gateway/client/",
815  R"(\S+: (\S+:[0-9]+)/\S+?\S+&client_id=\S+)",
816  R"(\S+: Closed with status \S+)",
817  R"(\S+: \S+&item_type=reply&\S+&exec_time=([0-9]+)\\n)"
818  };
819 
820  if (params.latency.which) {
821  latency_report.Start(params.latency.which);
822  latency_report.SetDebug(params.latency.debug);
823  TPSG_DebugPrintout::SetDefault(TPSG_DebugPrintout::TValue::eSome);
824  }
825 
826  CPSG_EventLoop queue;
827  SJsonOut json_out;
828  SNonVerbose<> non_verbose;
829 
830  using namespace placeholders;
831 
832  if (params.data_only.enabled) {
833  auto item_complete = bind(&SDataOnlyCopy::ItemComplete, &data_only_copy, _1, _2);
834  auto reply_complete = bind(&SDataOnlyCopy::ReplyComplete, &data_only_copy, _1, _2);
835  queue = CPSG_EventLoop(params.service, item_complete, reply_complete);
836  } else if (!params.verbose) {
837  auto item_complete = bind(&SNonVerbose<>::ItemComplete, &non_verbose, ref(json_out), _1, _2);
838  auto reply_complete = bind(&SNonVerbose<>::ReplyComplete, &non_verbose, ref(json_out), _1, _2);
839  queue = CPSG_EventLoop(params.service, item_complete, reply_complete);
840  } else {
841  auto item_complete = bind(&s_ItemComplete<SOneRequestParams>, ref(json_out), _1, _2);
842  auto reply_complete = bind(&s_ReplyComplete<SOneRequestParams>, ref(json_out), _1, _2);
843  auto new_item = bind(&s_NewItem<SOneRequestParams>, ref(json_out), _1);
844  queue = CPSG_EventLoop(params.service, item_complete, reply_complete, new_item);
845  }
846 
848  _VERIFY(queue.SendRequest(request, CDeadline::eInfinite));
849  queue.Stop();
851  return data_only_copy;
852 }
853 
854 template <>
856 {
858  SNonVerboseBase(string id) : string(std::move(id)) {}
859 
860  void ItemComplete(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_ReplyItem>& item);
861  void ReplyComplete(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_Reply>& reply);
862 };
863 
864 template <class TParams>
866 {
868 }
869 
870 template <>
872 {
873  string id;
874 
875  while (m_InputQueue.Pop(id)) {
876  _ASSERT(!id.empty()); // ReadLine makes sure it's not empty
877  auto bio_id = CPSG_BioId(id, m_Params.type);
878  auto user_context = make_shared<SNonVerbose<SBatchResolveParams>>(std::move(id));
879  auto request = make_shared<CPSG_Request_Resolve>(std::move(bio_id), m_Params.bio_id_resolution, std::move(user_context));
880 
881  request->IncludeInfo(m_Params.include_info);
882  request->SetAccSubstitution(m_Params.acc_substitution);
883 
884  _VERIFY(output.SendRequest(std::move(request), CDeadline::eInfinite));
885  }
886 
887  output.Stop();
888 }
889 
890 using verbose = true_type;
891 using no_verbose = false_type;
892 
893 template <>
894 void s_ItemComplete<no_verbose>(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_ReplyItem>& item)
895 {
896  auto context = item->GetReply()->GetRequest()->GetUserContext<SNonVerbose<SBatchResolveParams>>();
897  _ASSERT(context);
898  context->ItemComplete(json_out, status, item);
899 }
900 
901 void SNonVerboseBase<SBatchResolveParams>::ItemComplete(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_ReplyItem>& item)
902 {
903  reported.emplace(status);
904  s_ItemComplete<verbose>(json_out, status, item);
905 }
906 
907 template <>
908 void s_ReplyComplete<no_verbose>(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_Reply>& reply)
909 {
910  auto context = reply->GetRequest()->GetUserContext<SNonVerbose<SBatchResolveParams>>();
911  _ASSERT(context);
912  context->ReplyComplete(json_out, status, reply);
913 }
914 
915 void SNonVerboseBase<SBatchResolveParams>::ReplyComplete(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_Reply>& reply)
916 {
917  if (auto first_message = reply->GetNextMessage(SParams::min_severity); first_message || (reported.find(status) == reported.end())) {
918  CJsonResponse result_doc(status, reply, first_message);
919  json_out << result_doc;
920  }
921 }
922 
923 template <class TParams>
925 {
926  // MS VS does not like the ternary operator here anymore
927  if (SParams::verbose) {
928  return &s_ItemComplete<verbose>;
929  } else {
931  }
932 }
933 
934 template <class TParams>
936 {
937  // MS VS does not like the ternary operator here anymore
938  if (SParams::verbose) {
939  return &s_ReplyComplete<verbose>;
940  } else {
942  }
943 }
944 
945 template <class TParams>
947 {
948  return &s_NewItem<TParams>;
949 }
950 
951 template <>
953 {
954  string line, protein, n;
955 
956  while (m_InputQueue.Pop(line)) {
957  _ASSERT(!line.empty()); // ReadLine makes sure it's not empty
958 
959  auto nucleotide = NStr::SplitInTwo(line, ",", protein, n) ? CPSG_Request_IpgResolve::TNucleotide(n) : null;
960  auto user_context = make_shared<SNonVerbose<SBatchResolveParams>>(std::move(line));
961  auto request = make_shared<CPSG_Request_IpgResolve>(std::move(protein), 0, std::move(nucleotide), std::move(user_context));
962 
963  _VERIFY(output.SendRequest(std::move(request), CDeadline::eInfinite));
964  }
965 
966  output.Stop();
967 }
968 
969 template <>
971 {
974 }
975 
976 template <>
978 {
979  CJson_Document json_schema_doc(CProcessing::RequestSchema());
980  CJson_Schema json_schema(json_schema_doc);
981  string line;
982 
983  while (m_InputQueue.Pop(line)) {
984  _ASSERT(!line.empty()); // ReadLine makes sure it's not empty
985 
986  CJson_Document json_doc;
987 
988  if (!json_doc.ParseString(line)) {
989  json_out << CJsonResponse(s_GetId(json_doc), eJsonRpc_ParseError, json_doc.GetReadError());
990  } else if (!json_schema.Validate(json_doc)) {
991  json_out << CJsonResponse(s_GetId(json_doc), eJsonRpc_InvalidRequest, json_schema.GetValidationError());
992  } else {
993  if (m_Params.echo) json_out << json_doc;
994 
995  CJson_ConstObject json_obj(json_doc.GetObject());
996  auto method = json_obj["method"].GetValue().GetString();
997  auto id = json_obj["id"].GetValue().GetString();
998  auto params_obj = json_obj["params"].GetObject();
999  auto user_context = make_shared<string>(id);
1000  auto request_context = m_Params.testing ? null : SInteractiveNewRequestStart(method, params_obj).Get();
1001 
1002  if (auto request = SRequestBuilder::Build(method, params_obj, std::move(user_context), std::move(request_context))) {
1003  _VERIFY(output.SendRequest(std::move(request), CDeadline::eInfinite));
1004  }
1005  }
1006  }
1007 
1008  output.Stop();
1009 }
1010 
1011 using testing = true_type;
1012 using no_testing = false_type;
1013 using server_mode = true_type;
1014 using no_server_mode = false_type;
1015 
1016 template <>
1017 void s_ReplyComplete<SInteractiveParams, testing, server_mode>(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_Reply>& reply)
1018 {
1019  const auto request = reply->GetRequest();
1020  const auto& request_id = *request->GetUserContext<string>();
1021 
1022  CJsonResponse result_doc(status, reply, CJsonResponse::eDoNotAddRequestID);
1023  json_out << CJsonResponse(request_id, result_doc);
1024 }
1025 
1026 template <>
1027 void s_ReplyComplete<SInteractiveParams, testing, no_server_mode>(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_Reply>& reply)
1028 {
1029  if (status != EPSG_Status::eSuccess) {
1031  }
1032 }
1033 
1034 template <>
1035 void s_ReplyComplete<SInteractiveParams, no_testing, server_mode>(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_Reply>& reply)
1036 {
1037  const auto request = reply->GetRequest();
1038  CRequestContextGuard_Base guard(request->GetRequestContext()->Clone());
1039  guard.SetStatus(s_PsgStatusToRequestStatus(status));
1040 
1042 }
1043 
1044 template <>
1045 void s_ReplyComplete<SInteractiveParams, no_testing, no_server_mode>(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_Reply>& reply)
1046 {
1047  const auto request = reply->GetRequest();
1048  CRequestContextGuard_Base guard(request->GetRequestContext()->Clone());
1049  guard.SetStatus(s_PsgStatusToRequestStatus(status));
1050 
1052 }
1053 
1054 template <>
1055 void s_ItemComplete<SInteractiveParams>(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_ReplyItem>& item)
1056 {
1057  const auto request = item->GetReply()->GetRequest();
1058  const auto& request_id = *request->GetUserContext<string>();
1059 
1060  CJsonResponse result_doc(status, item, CJsonResponse::eDoNotAddRequestID);
1061  json_out << CJsonResponse(request_id, result_doc);
1062 }
1063 
1064 template <>
1066 {
1068 }
1069 
1070 template <>
1072 {
1073  if (m_Params.testing) {
1074  // MS VS does not like the ternary operator here anymore
1075  if (m_Params.server) {
1077  } else {
1079  }
1080  } else {
1081  // MS VS does not like the ternary operator here anymore
1082  if (m_Params.server) {
1084  } else {
1086  }
1087  }
1088 }
1089 
1090 template <>
1091 void s_NewItem<SInteractiveParams, verbose>(SJsonOut& json_out, const shared_ptr<CPSG_ReplyItem>& item)
1092 {
1093  const auto request = item->GetReply()->GetRequest();
1094  const auto& request_id = *request->GetUserContext<string>();
1095 
1096  auto result_doc = CJsonResponse::NewItem(item);
1097  json_out << CJsonResponse(request_id, result_doc);
1098 }
1099 
1100 template <>
1102 {
1103  // MS VS does not like the ternary operator here anymore
1104  if (SParams::verbose) {
1106  } else {
1107  return &s_NewItem<SInteractiveParams, no_verbose>;
1108  }
1109 }
1110 
1111 template <class TParams>
1113  json_out(params.pipe, params.server),
1114  m_Params(params)
1115 {
1116  Init(params);
1117 }
1118 
1119 template <class TParams>
1121  m_Impl(params)
1122 {
1123  using namespace placeholders;
1124  auto item_complete = bind(m_Impl.GetItemComplete(), ref(m_Impl.json_out), _1, _2);
1125  auto reply_complete = bind(m_Impl.GetReplyComplete(), ref(m_Impl.json_out), _1, _2);
1126  auto new_item = bind(m_Impl.GetNewItem(), ref(m_Impl.json_out), _1);
1127 
1128  for (int n = params.worker_threads; n > 0; --n) {
1129  m_PsgQueues.emplace_back(params.service, item_complete, reply_complete, new_item);
1130  auto& queue = m_PsgQueues.back();
1131  queue.SetRequestFlags(params.request_flags);
1132  queue.SetUserArgs(params.user_args);
1133  m_Threads.emplace_back(&CPSG_EventLoop::Run, ref(queue), CDeadline::eInfinite);
1134  m_Threads.emplace_back(&SImpl::Submitter, &m_Impl, ref(queue));
1135  }
1136 }
1137 
1138 shared_ptr<CPSG_Reply> s_GetReply(shared_ptr<CPSG_ReplyItem>& item)
1139 {
1140  return item->GetReply();
1141 }
1142 
1143 shared_ptr<CPSG_Reply> s_GetReply(shared_ptr<CPSG_Reply>& reply)
1144 {
1145  return reply;
1146 }
1147 
1149 {
1150  switch (psg_status) {
1156  case EPSG_Status::eInProgress: _TROUBLE; break;
1157  }
1158 
1160 }
1161 
1162 string s_GetId(const CJson_Document& req_doc)
1163 {
1164  string id;
1165 
1166  if (req_doc.IsObject()) {
1167  auto req_obj = req_doc.GetObject();
1168 
1169  if (req_obj.has("id")) {
1170  auto id_node = req_obj["id"];
1171 
1172  if (id_node.IsValue()) {
1173  auto id_value = id_node.GetValue();
1174 
1175  if (id_value.IsString()) {
1176  id = id_value.GetString();
1177  }
1178  }
1179  }
1180  }
1181 
1182  return id;
1183 }
1184 
1185 template <class TCreateContext>
1186 vector<shared_ptr<CPSG_Request>> CProcessing::ReadCommands(TCreateContext create_context, size_t report_progress_after)
1187 {
1188  static CJson_Schema json_schema(RequestSchema());
1189  string line;
1190  vector<shared_ptr<CPSG_Request>> requests;
1191  unordered_set<string> ids;
1192 
1193  // Read requests from cin
1194  while (ReadLine(line)) {
1195  CJson_Document json_doc;
1196 
1197  if (!json_doc.ParseString(line)) {
1198  cerr << "Error in request '" << line << "': " << json_doc.GetReadError() << endl;
1199  return {};
1200  }
1201 
1202  auto id = s_GetId(json_doc);
1203 
1204  if (id.empty()) {
1205  cerr << "Error in request '" << line << "': no id or id is empty" << endl;
1206  return {};
1207  } else if (!json_schema.Validate(json_doc)) {
1208  cerr << "Error in request '" << id << "': " << json_schema.GetValidationError() << endl;
1209  return {};
1210  } else if (!ids.insert(id).second) {
1211  cerr << "Error in request '" << id << "': duplicate ID" << endl;
1212  return {};
1213  } else {
1214  CJson_ConstObject json_obj(json_doc.GetObject());
1215  auto method = json_obj["method"].GetValue().GetString();
1216  auto params_obj = json_obj["params"].GetObject();
1217  auto user_context = create_context(id, params_obj);
1218 
1219  if (!user_context) return {};
1220 
1221  if (auto request = SRequestBuilder::Build(method, params_obj, std::move(user_context))) {
1222  requests.emplace_back(std::move(request));
1223  if (report_progress_after && (requests.size() % report_progress_after == 0)) cerr << '.';
1224  }
1225  }
1226  }
1227 
1228  return requests;
1229 }
1230 
1232 {
1233  if (params.delay < 0.0) {
1234  cerr << "DELAY must be non-negative" << endl;
1235  return -1;
1236  }
1237 
1238  const size_t kReportProgressAfter = 2000;
1239 
1240  using TReplyStorage = deque<shared_ptr<CPSG_Reply>>;
1241 
1242  if (SParams::verbose) cerr << "Preparing requests: ";
1243  auto requests = ReadCommands([](string id, CJson_ConstNode&){ return make_shared<SMetrics>(std::move(id)); }, SParams::verbose ? kReportProgressAfter : 0);
1244 
1245  if (requests.empty()) return -1;
1246 
1247  atomic_size_t start(params.user_threads);
1248  atomic_int to_submit(static_cast<int>(requests.size()));
1249  auto wait = [&]() { while (start > 0) this_thread::sleep_for(chrono::microseconds(1)); };
1250 
1251  auto l = [&](CPSG_Queue& queue, TReplyStorage& replies) {
1252  start--;
1253  wait();
1254 
1255  for (;;) {
1256  auto i = to_submit--;
1257 
1258  if (i <= 0) break;
1259 
1260  // Submit and get response
1261  auto& request = requests[requests.size() - i];
1262  auto metrics = request->GetUserContext<SMetrics>();
1263 
1264  metrics->Set(SMetricType::eStart);
1265  auto reply = queue.SendRequestAndGetReply(request, CDeadline::eInfinite);
1266  metrics->Set(SMetricType::eSubmit);
1267 
1268  _ASSERT(reply);
1269 
1270  metrics->Set(SMetricType::eReply);
1271 
1272  for (;;) {
1273  auto reply_item = reply->GetNextItem(CDeadline::eInfinite);
1274  _ASSERT(reply_item);
1275 
1276  const auto type = reply_item->GetType();
1277 
1278  if (type == CPSG_ReplyItem::eEndOfReply) break;
1279 
1280  const auto status = reply_item->GetStatus(CDeadline::eInfinite);
1281  metrics->AddItem({type, status});
1282  }
1283 
1284  const auto status = reply->GetStatus(CDeadline::eInfinite);
1285  metrics->Set(SMetricType::eDone);
1286  metrics->AddItem({CPSG_ReplyItem::eEndOfReply, status});
1287 
1288  if (params.report_immediately) {
1289  // Metrics are reported on destruction
1290  metrics.reset();
1291  request.reset();
1292  reply.reset();
1293  } else {
1294  // Store the reply for now to prevent metrics from being written to cout (affects performance)
1295  replies.emplace_back(reply);
1296  }
1297 
1298  if (params.delay) {
1299  this_thread::sleep_for(chrono::duration<double>(params.delay));
1300  }
1301  }
1302  };
1303 
1304  vector<CPSG_Queue> queues;
1305  vector<TReplyStorage> replies(params.user_threads);
1306 
1307  for (size_t i = 0; i < (params.local_queue ? params.user_threads : 1); ++i) {
1308  queues.emplace_back(params.service);
1309  queues.back().SetRequestFlags(params.request_flags);
1310  queues.back().SetUserArgs(params.user_args);
1311  }
1312 
1313  vector<thread> threads;
1314  threads.reserve(params.user_threads);
1315 
1316  // Start threads in advance so it won't affect metrics
1317  for (size_t i = 0; i < params.user_threads; ++i) {
1318  threads.emplace_back(l, ref(queues[params.local_queue ? i : 0]), ref(replies[i]));
1319  }
1320 
1321  wait();
1322 
1323  // Start processing replies
1324  if (SParams::verbose) {
1325  cerr << "\nSubmitting requests: ";
1326  size_t previous = requests.size() / kReportProgressAfter;
1327 
1328  while (to_submit > 0) {
1329  size_t current = to_submit / kReportProgressAfter;
1330 
1331  for (auto i = current; i < previous; ++i) {
1332  cerr << '.';
1333  }
1334 
1335  previous = current;
1336  this_thread::sleep_for(chrono::milliseconds(100));
1337  }
1338 
1339  cerr << "\nWaiting for threads: " << params.user_threads << '\n';
1340  }
1341 
1342  for (auto& t : threads) {
1343  t.join();
1344  }
1345 
1346  // Release all replies held in queues, if any
1347  queues.clear();
1348 
1349  // Output metrics
1350  if (SParams::verbose) cerr << "Outputting metrics: ";
1351  requests.clear();
1352  size_t output = 0;
1353 
1354  for (auto& thread_replies : replies) {
1355  for (auto& reply : thread_replies) {
1356  reply.reset();
1357  if (SParams::verbose && (++output % kReportProgressAfter == 0)) cerr << '.';
1358  }
1359  }
1360 
1361  if (SParams::verbose) cerr << '\n';
1362  return 0;
1363 }
1364 
1365 bool CProcessing::ReadLine(string& line, istream& is)
1366 {
1367  for (;;) {
1368  if (!getline(is, line)) {
1369  return false;
1370  } else if (!line.empty()) {
1371  return true;
1372  }
1373  }
1374 }
1375 
1379 
1381 {
1382  // All JSON types have already been validated with the schema
1383 
1384  auto context = params_obj.find("context");
1386 
1387  if (context != params_obj.end()) {
1388  auto context_obj = context->value.GetObject();
1389 
1390  auto sid = context_obj.find("sid");
1391 
1392  if (sid != context_obj.end()) {
1393  ctx.SetSessionID(sid->value.GetValue().GetString());
1394  }
1395 
1396  auto phid = context_obj.find("phid");
1397 
1398  if (phid != context_obj.end()) {
1399  ctx.SetHitID(phid->value.GetValue().GetString());
1400  }
1401 
1402  auto auth_token = context_obj.find("auth_token");
1403 
1404  if (auth_token != context_obj.end()) {
1405  ctx.SetProperty("auth_token", auth_token->value.GetValue().GetString());
1406  }
1407 
1408  auto client_ip = context_obj.find("client_ip");
1409 
1410  if (client_ip != context_obj.end()) {
1411  ctx.SetClientIP(client_ip->value.GetValue().GetString());
1412  }
1413  }
1414 
1415  if (!ctx.IsSetSessionID()) ctx.SetSessionID();
1416  if (!ctx.IsSetHitID()) ctx.SetHitID();
1417 
1418  SExtra extra;
1419  extra.Print("request", request);
1420  extra.Print("params", params_obj);
1421 }
1422 
1424 {
1425  _ASSERT(json.IsNumber());
1426 
1427  if (json.IsInt4()) {
1428  Print(prefix, json.GetInt4());
1429  } else if (json.IsUint4()) {
1430  Print(prefix, json.GetUint4());
1431  } else if (json.IsInt8()) {
1432  Print(prefix, json.GetInt8());
1433  } else if (json.IsUint8()) {
1434  Print(prefix, json.GetUint8());
1435  } else if (json.IsDouble()) {
1436  Print(prefix, json.GetDouble());
1437  } else {
1438  _TROUBLE;
1439  }
1440 }
1441 
1443 {
1444  for (size_t i = 0; i < json.size(); ++i) {
1445  Print(prefix + '[' + to_string(i) + ']', json[i]);
1446  }
1447 }
1448 
1450 {
1451  for (const auto& pair : json) {
1452  Print(prefix + '.' + pair.name, pair.value);
1453  }
1454 }
1455 
1457 {
1458  switch (json.GetType()) {
1460  Print(prefix, "<null>");
1461  break;
1462 
1464  Print(prefix, json.GetValue().GetBool() ? "true" : "false");
1465  break;
1466 
1468  Print(prefix, json.GetValue().GetString());
1469  break;
1470 
1472  Print(prefix, json.GetValue());
1473  break;
1474 
1476  Print(prefix, json.GetArray());
1477  break;
1478 
1480  Print(prefix, json.GetObject());
1481  break;
1482  };
1483 }
1484 
1485 int CProcessing::JsonCheck(istream* schema_is)
1486 {
1487  CJson_Document schema_doc;
1488 
1489  if (!schema_is) {
1490  schema_doc = RequestSchema();
1491 
1492  } else if (!schema_doc.Read(*schema_is)) {
1493  cerr << "Error on reading JSON schema: " << schema_doc.GetReadError() << endl;
1494  return -1;
1495 
1496  } else if (CJson_MetaSchema meta_schema; !meta_schema.Validate(schema_doc)) {
1497  cerr << "Error on validating JSON schema: " << meta_schema.GetValidationError() << endl;
1498  return -1;
1499  }
1500 
1501  CJson_Schema schema(schema_doc);
1502  string line;
1503  size_t line_no = 0;
1504  int rv = 0;
1505 
1506  while (ReadLine(line)) {
1507  CJson_Document input_doc;
1508  ++line_no;
1509 
1510  if (!input_doc.ParseString(line)) {
1511  cout << "Error on reading JSON document (" << line_no << "): " << input_doc.GetReadError() << endl;
1512  if (rv == 0) rv = -2;
1513  } else if (schema.Validate(input_doc)) {
1514  cout << "JSON document (" << line_no << ") is valid" << endl;
1515  } else {
1516  cout << "Error on validating JSON document (" << line_no << "): " << schema.GetValidationError() << endl;
1517  if (rv == 0) rv = -3;
1518  }
1519  }
1520 
1521  cout << line_no << " JSON document(s) have been checked" << endl;
1522  return rv;
1523 }
1524 
1526 {
1527  CObjectTypeInfo info(objects::CSeq_id::GetTypeInfo());
1528 
1529  if (auto index = info.FindVariantIndex(type)) return static_cast<CPSG_BioId::TType>(index);
1530  if (auto value = objects::CSeq_id::WhichInverseSeqId(type)) return value;
1531 
1532  return static_cast<CPSG_BioId::TType>(atoi(type.c_str()));
1533 }
1534 
1536 {
1537  auto id = array[0].GetValue().GetString();
1538 
1539  if (array.size() == 1) return CPSG_BioId(id);
1540 
1541  auto value = array[1].GetValue();
1542  auto type = value.IsString() ? GetBioIdType(value.GetString()) : static_cast<CPSG_BioId::TType>(value.GetInt4());
1543  return CPSG_BioId(id, type);
1544 }
1545 
1547 {
1548  CPSG_BioIds rv;
1549 
1550  if (input.has("bio_ids")) {
1551  auto bio_ids = input["bio_ids"].GetArray();
1552 
1553  for (const auto& bio_id : bio_ids) {
1554  rv.push_back(GetBioId(bio_id.GetArray()));
1555  }
1556  } else {
1557  rv.push_back(GetBioId(input["bio_id"].GetArray()));
1558  }
1559 
1560  return rv;
1561 }
1562 
1564 {
1565  auto array = input["blob_id"].GetArray();
1566  auto id = array[0].GetValue().GetString();
1567  return array.size() > 1 ? CPSG_BlobId(std::move(id), array[1].GetValue().GetInt8()) : std::move(id);
1568 }
1569 
1571 {
1572  auto array = input["chunk_id"].GetArray();
1573  return { static_cast<int>(array[0].GetValue().GetInt4()), array[1].GetValue().GetString() };
1574 }
1575 
1577 {
1578  auto na_array = input["named_annots"].GetArray();
1580 
1581  for (const auto& na : na_array) {
1582  names.push_back(na.GetValue().GetString());
1583  }
1584 
1585  return names;
1586 }
1587 
1588 template <>
1590 {
1591  const auto& info_flags = GetInfoFlags();
1592 
1593  auto i = info_flags.begin();
1594  bool all_info_except = specified(i->name);
1595  CPSG_Request_Resolve::TIncludeInfo include_info = all_info_except ? CPSG_Request_Resolve::fAllInfo : CPSG_Request_Resolve::TIncludeInfo(0);
1596 
1597  for (++i; i != info_flags.end(); ++i) {
1598  if (specified(i->name)) {
1599  if (all_info_except) {
1600  include_info &= ~i->value;
1601  } else {
1602  include_info |= i->value;
1603  }
1604  }
1605  }
1606 
1607  // Provide all info if nothing is specified explicitly
1608  return include_info ? include_info : CPSG_Request_Resolve::fAllInfo;
1609 }
1610 
1612 {
1613  if (!input.has("exclude_blobs")) return;
1614 
1615  auto blob_ids = input["exclude_blobs"].GetArray();
1616 
1617  for (const auto& blob_id : blob_ids) {
1618  exclude(blob_id.GetValue().GetString());
1619  }
1620 }
1621 
1622 void SRequestBuilder::SReader<CJson_ConstObject>::SetRequestFlags(shared_ptr<CPSG_Request> request) const
1623 {
1624  if (!input.has("request_flags")) return;
1625 
1626  for (const auto& request_flag : input["request_flags"].GetArray()) {
1627  const auto value = request_flag.GetValue().GetString();
1628 
1629  if (value == "exclude-hup") {
1630  request->SetFlags(CPSG_Request::fExcludeHUP);
1631 
1632  } else if (value == "include-hup") {
1633  request->SetFlags(CPSG_Request::fIncludeHUP);
1634  }
1635  }
1636 }
1637 
1638 const initializer_list<SDataFlag> kDataFlags =
1639 {
1640  { "no-tse", "Return only the info", CPSG_Request_Biodata::eNoTSE },
1641  { "slim-tse", "Return split info blob if available, or nothing", CPSG_Request_Biodata::eSlimTSE },
1642  { "smart-tse", "Return split info blob if available, or original blob", CPSG_Request_Biodata::eSmartTSE },
1643  { "whole-tse", "Return all split blobs if available, or original blob", CPSG_Request_Biodata::eWholeTSE },
1644  { "orig-tse", "Return original blob", CPSG_Request_Biodata::eOrigTSE },
1645 };
1646 
1647 const initializer_list<SDataFlag>& SRequestBuilder::GetDataFlags()
1648 {
1649  return kDataFlags;
1650 }
1651 
1652 const initializer_list<SInfoFlag> kInfoFlags =
1653 {
1654  { "all-info-except", "Return all info except explicitly specified by other flags", CPSG_Request_Resolve::fAllInfo },
1655  { "canonical-id", "Return canonical ID info", CPSG_Request_Resolve::fCanonicalId },
1656  { "name", "Use name for canonical ID info, if returned", CPSG_Request_Resolve::fName },
1657  { "other-ids", "Return other IDs info", CPSG_Request_Resolve::fOtherIds },
1658  { "molecule-type", "Return molecule type info", CPSG_Request_Resolve::fMoleculeType },
1659  { "length", "Return length info", CPSG_Request_Resolve::fLength },
1660  { "chain-state", "Return chain state info (in seq_state pair)", CPSG_Request_Resolve::fChainState },
1661  { "state", "Return state info", CPSG_Request_Resolve::fState },
1662  { "blob-id", "Return blob ID info", CPSG_Request_Resolve::fBlobId },
1663  { "tax-id", "Return tax ID info", CPSG_Request_Resolve::fTaxId },
1664  { "hash", "Return hash info", CPSG_Request_Resolve::fHash },
1665  { "date-changed", "Return date changed info", CPSG_Request_Resolve::fDateChanged },
1666  { "gi", "Return GI", CPSG_Request_Resolve::fGi },
1667 };
1668 
1669 const initializer_list<SInfoFlag>& SRequestBuilder::GetInfoFlags()
1670 {
1671  return kInfoFlags;
1672 }
1673 
1675 {
1676  CJson_Document rv(R"REQUEST_SCHEMA(
1677 {
1678  "type": "object",
1679  "definitions": {
1680  "jsonrpc": {
1681  "$id": "#jsonrpc",
1682  "enum": [
1683  "2.0"
1684  ]
1685  },
1686  "bio_id": {
1687  "$id": "#bio_id",
1688  "type": "array",
1689  "items": [
1690  {
1691  "type": "string"
1692  },
1693  {
1694  "oneOf": [
1695  {
1696  "type": "string"
1697  },
1698  {
1699  "type": "number"
1700  }
1701  ]
1702  }
1703  ],
1704  "minItems": 1,
1705  "maxItems": 2
1706  },
1707  "bio_ids": {
1708  "$id": "#bio_ids",
1709  "type": "array",
1710  "items": {
1711  "$ref": "#/definitions/bio_id"
1712  },
1713  "minItems": 1
1714  },
1715  "blob_id": {
1716  "$id": "#blob_id",
1717  "type": "array",
1718  "items": [
1719  {
1720  "type": "string"
1721  },
1722  {
1723  "type": "number"
1724  }
1725  ],
1726  "minItems": 1,
1727  "maxItems": 2
1728  },
1729  "chunk_id": {
1730  "$id": "#chunk_id",
1731  "type": "array",
1732  "items": [
1733  {
1734  "type": "number"
1735  },
1736  {
1737  "type": "string"
1738  }
1739  ],
1740  "minItems": 2,
1741  "maxItems": 2
1742  },
1743  "include_data": {
1744  "$id": "#include_data",
1745  "enum": [
1746  "no-tse",
1747  "slim-tse",
1748  "smart-tse",
1749  "whole-tse",
1750  "orig-tse"
1751  ]
1752  },
1753  "include_info": {
1754  "$id": "#include_info",
1755  "type": "array",
1756  "items": {
1757  "type": "string",
1758  "enum": [
1759  "all-info-except",
1760  "canonical-id",
1761  "name",
1762  "other-ids",
1763  "molecule-type",
1764  "length",
1765  "chain-state",
1766  "state",
1767  "blob-id",
1768  "tax-id",
1769  "hash",
1770  "date-changed",
1771  "gi"
1772  ]
1773  },
1774  "uniqueItems": true
1775  },
1776  "named_annots": {
1777  "$id": "#named_annots",
1778  "type": "array",
1779  "items": {
1780  "type": "string"
1781  }
1782  },
1783  "exclude_blobs": {
1784  "$id": "#exclude_blobs",
1785  "type": "array",
1786  "items": {
1787  "type": "string"
1788  }
1789  },
1790  "acc_substitution": {
1791  "$id": "#acc_substitution",
1792  "enum": [
1793  "default",
1794  "limited",
1795  "never"
1796  ]
1797  },
1798  "snp_scale_limit": {
1799  "$id": "#snp_scale_limit",
1800  "enum": [
1801  "default",
1802  "unit",
1803  "contig",
1804  "supercontig",
1805  "chromosome"
1806  ]
1807  },
1808  "context": {
1809  "$id": "#context",
1810  "type": "object",
1811  "properties": {
1812  "sid": {
1813  "type": "string"
1814  },
1815  "phid": {
1816  "type": "string"
1817  },
1818  "auth_token": {
1819  "type": "string"
1820  },
1821  "client_ip": {
1822  "type": "string"
1823  }
1824  }
1825  },
1826  "request_flags": {
1827  "$id": "#request_flags",
1828  "type": "array",
1829  "items": {
1830  "type": "string",
1831  "enum": [
1832  "exclude-hup",
1833  "include-hup"
1834  ]
1835  },
1836  "uniqueItems": true
1837  },
1838  )REQUEST_SCHEMA"
1839  R"REQUEST_SCHEMA(
1840  "biodata": {
1841  "$id": "#biodata",
1842  "type": "object",
1843  "properties": {
1844  "jsonrpc": {
1845  "$ref": "#/definitions/jsonrpc"
1846  },
1847  "method": {
1848  "enum": [
1849  "biodata"
1850  ]
1851  },
1852  "params": {
1853  "type": "object",
1854  "properties": {
1855  "bio_id": {
1856  "$ref": "#/definitions/bio_id"
1857  },
1858  "include_data": {
1859  "$ref": "#/definitions/include_data"
1860  },
1861  "exclude_blobs": {
1862  "$ref": "#/definitions/exclude_blobs"
1863  },
1864  "acc_substitution": {
1865  "$ref": "#/definitions/acc_substitution"
1866  },
1867  "bio_id_resolution": {
1868  "type": "boolean"
1869  },
1870  "resend_timeout": {
1871  "type": "number"
1872  },
1873  "context": {
1874  "$ref": "#/definitions/context"
1875  },
1876  "request_flags": {
1877  "$ref": "#/definitions/request_flags"
1878  },
1879  "user_args": {
1880  "type": "string"
1881  }
1882  },
1883  "required": [
1884  "bio_id"
1885  ]
1886  },
1887  "id": {
1888  "type": "string"
1889  }
1890  },
1891  "required": [
1892  "jsonrpc",
1893  "method",
1894  "params",
1895  "id"
1896  ]
1897  },
1898  "blob": {
1899  "$id": "#blob",
1900  "type": "object",
1901  "properties": {
1902  "jsonrpc": {
1903  "$ref": "#/definitions/jsonrpc"
1904  },
1905  "method": {
1906  "enum": [
1907  "blob"
1908  ]
1909  },
1910  "params": {
1911  "type": "object",
1912  "properties": {
1913  "blob_id": {
1914  "$ref": "#/definitions/blob_id"
1915  },
1916  "include_data": {
1917  "$ref": "#/definitions/include_data"
1918  },
1919  "context": {
1920  "$ref": "#/definitions/context"
1921  },
1922  "request_flags": {
1923  "$ref": "#/definitions/request_flags"
1924  },
1925  "user_args": {
1926  "type": "string"
1927  }
1928  },
1929  "required": [
1930  "blob_id"
1931  ]
1932  },
1933  "id": {
1934  "type": "string"
1935  }
1936  },
1937  "required": [
1938  "jsonrpc",
1939  "method",
1940  "params",
1941  "id"
1942  ]
1943  },
1944  "resolve": {
1945  "$id": "#resolve",
1946  "type": "object",
1947  "properties": {
1948  "jsonrpc": {
1949  "$ref": "#/definitions/jsonrpc"
1950  },
1951  "method": {
1952  "enum": [
1953  "resolve"
1954  ]
1955  },
1956  "params": {
1957  "type": "object",
1958  "properties": {
1959  "bio_id": {
1960  "$ref": "#/definitions/bio_id"
1961  },
1962  "include_info": {
1963  "$ref": "#/definitions/include_info"
1964  },
1965  "acc_substitution": {
1966  "$ref": "#/definitions/acc_substitution"
1967  },
1968  "bio_id_resolution": {
1969  "type": "boolean"
1970  },
1971  "context": {
1972  "$ref": "#/definitions/context"
1973  },
1974  "request_flags": {
1975  "$ref": "#/definitions/request_flags"
1976  },
1977  "user_args": {
1978  "type": "string"
1979  }
1980  },
1981  "required": [
1982  "bio_id"
1983  ]
1984  },
1985  "id": {
1986  "type": "string"
1987  }
1988  },
1989  "required": [
1990  "jsonrpc",
1991  "method",
1992  "params",
1993  "id"
1994  ]
1995  },
1996  "named_annot": {
1997  "$id": "#named_annot",
1998  "type": "object",
1999  "properties": {
2000  "jsonrpc": {
2001  "$ref": "#/definitions/jsonrpc"
2002  },
2003  "method": {
2004  "enum": [
2005  "named_annot"
2006  ]
2007  },
2008  "params": {
2009  "type": "object",
2010  "properties": {
2011  "bio_id": {
2012  "$ref": "#/definitions/bio_id"
2013  },
2014  "bio_ids": {
2015  "$ref": "#/definitions/bio_ids"
2016  },
2017  "named_annots": {
2018  "$ref": "#/definitions/named_annots"
2019  },
2020  "acc_substitution": {
2021  "$ref": "#/definitions/acc_substitution"
2022  },
2023  "bio_id_resolution": {
2024  "type": "boolean"
2025  },
2026  "snp_scale_limit": {
2027  "$ref": "#/definitions/snp_scale_limit"
2028  },
2029  "context": {
2030  "$ref": "#/definitions/context"
2031  },
2032  "request_flags": {
2033  "$ref": "#/definitions/request_flags"
2034  },
2035  "user_args": {
2036  "type": "string"
2037  }
2038  },
2039  "required": [
2040  "named_annots"
2041  ],
2042  "oneOf": [
2043  {
2044  "required": [
2045  "bio_id"
2046  ]
2047  },
2048  {
2049  "required": [
2050  "bio_ids"
2051  ]
2052  }
2053  ]
2054  },
2055  "id": {
2056  "type": "string"
2057  }
2058  },
2059  "required": [
2060  "jsonrpc",
2061  "method",
2062  "params",
2063  "id"
2064  ]
2065  },
2066  "chunk": {
2067  "$id": "#chunk",
2068  "type": "object",
2069  "properties": {
2070  "jsonrpc": {
2071  "$ref": "#/definitions/jsonrpc"
2072  },
2073  "method": {
2074  "enum": [
2075  "chunk"
2076  ]
2077  },
2078  "params": {
2079  "type": "object",
2080  "properties": {
2081  "chunk_id": {
2082  "$ref": "#/definitions/chunk_id"
2083  },
2084  "context": {
2085  "$ref": "#/definitions/context"
2086  },
2087  "request_flags": {
2088  "$ref": "#/definitions/request_flags"
2089  },
2090  "user_args": {
2091  "type": "string"
2092  }
2093  },
2094  "required": [
2095  "chunk_id"
2096  ]
2097  },
2098  "id": {
2099  "type": "string"
2100  }
2101  },
2102  "required": [
2103  "jsonrpc",
2104  "method",
2105  "params",
2106  "id"
2107  ]
2108  },
2109  "ipg_resolve": {
2110  "$id": "#ipg_resolve",
2111  "type": "object",
2112  "properties": {
2113  "jsonrpc": {
2114  "$ref": "#/definitions/jsonrpc"
2115  },
2116  "method": {
2117  "enum": [
2118  "ipg_resolve"
2119  ]
2120  },
2121  "params": {
2122  "type": "object",
2123  "properties": {
2124  "protein": {
2125  "type": "string"
2126  },
2127  "ipg": {
2128  "type": "number"
2129  },
2130  "nucleotide": {
2131  "type": "string"
2132  },
2133  "context": {
2134  "$ref": "#/definitions/context"
2135  },
2136  "request_flags": {
2137  "$ref": "#/definitions/request_flags"
2138  },
2139  "user_args": {
2140  "type": "string"
2141  }
2142  },
2143  "dependencies": {
2144  "nucleotide": [
2145  "protein"
2146  ]
2147  },
2148  "anyOf": [
2149  {
2150  "required": [
2151  "protein"
2152  ]
2153  },
2154  {
2155  "required": [
2156  "ipg"
2157  ]
2158  }
2159  ]
2160  },
2161  "id": {
2162  "type": "string"
2163  }
2164  },
2165  "required": [
2166  "jsonrpc",
2167  "method",
2168  "params",
2169  "id"
2170  ]
2171  },
2172  "raw": {
2173  "$id": "#raw",
2174  "type": "object",
2175  "properties": {
2176  "jsonrpc": {
2177  "$ref": "#/definitions/jsonrpc"
2178  },
2179  "method": {
2180  "enum": [
2181  "raw"
2182  ]
2183  },
2184  "params": {
2185  "type": "object",
2186  "properties": {
2187  "abs_path_ref": {
2188  "type": "string"
2189  },
2190  "context": {
2191  "$ref": "#/definitions/context"
2192  },
2193  "request_flags": {
2194  "$ref": "#/definitions/request_flags"
2195  },
2196  "user_args": {
2197  "type": "string"
2198  }
2199  },
2200  "required": [
2201  "abs_path_ref"
2202  ]
2203  },
2204  "id": {
2205  "type": "string"
2206  }
2207  },
2208  "required": [
2209  "jsonrpc",
2210  "method",
2211  "params",
2212  "id"
2213  ]
2214  }
2215  },
2216  "oneOf": [
2217  {
2218  "$ref": "#/definitions/biodata"
2219  },
2220  {
2221  "$ref": "#/definitions/blob"
2222  },
2223  {
2224  "$ref": "#/definitions/resolve"
2225  },
2226  {
2227  "$ref": "#/definitions/named_annot"
2228  },
2229  {
2230  "$ref": "#/definitions/chunk"
2231  },
2232  {
2233  "$ref": "#/definitions/ipg_resolve"
2234  },
2235  {
2236  "$ref": "#/definitions/raw"
2237  }
2238  ]
2239 }
2240  )REQUEST_SCHEMA");
2241 
2242  _DEBUG_CODE(if (!rv.ReadSucceeded()) { auto error = rv.GetReadError(); NCBI_ALWAYS_TROUBLE(error.c_str()); });
2243  _DEBUG_CODE(if (CJson_MetaSchema m; !m.Validate(rv)) { auto error = m.GetValidationError(); NCBI_ALWAYS_TROUBLE(error.c_str()); });
2244  return rv;
2245 }
2246 
User-defined methods of the data storage class.
User-defined methods of the data storage class.
Checksum and hash calculation classes.
Temporary object for holding extra message arguments.
Definition: ncbidiag.hpp:1828
CHash – Hash calculator.
Definition: checksum.hpp:195
void AddMessage(const SPSG_Message &message)
Definition: processing.cpp:477
static void SetDataLimit(size_t value)
Definition: processing.hpp:91
CJson_Object m_JsonObj
Definition: processing.hpp:159
static bool sm_SetReplyType
Definition: processing.hpp:161
static auto sm_PreviewSize
Definition: processing.hpp:163
void Set(const char *name, shared_ptr< TReplyItem > &reply_item)
Definition: processing.hpp:123
static CJsonResponse NewItem(const shared_ptr< CPSG_ReplyItem > &reply_item)
Definition: processing.cpp:244
static void SetReplyType(bool value)
Definition: processing.hpp:90
void AddRequestID(TItem item, EDoNotAddRequestID)
Definition: processing.hpp:101
static auto sm_DataLimit
Definition: processing.hpp:162
void Fill(EPSG_Status status, shared_ptr< CPSG_Reply >)
Definition: processing.cpp:253
static void SetPreviewSize(size_t value)
Definition: processing.hpp:92
void push_back(void)
Add null element to the end of the array.
CJson_Array.
size_t size(void) const
Return the number of elements in the array.
CJson_Node.
bool IsObject(void) const
CJson_ConstObject GetObject(void) const
Get JSON object contents of the node.
CJson_ConstArray GetArray(void) const
Get JSON array contents of the node.
bool IsNull(void) const
CJson_ConstValue GetValue(void) const
Get JSON value contents of the node.
EJsonType GetType(void) const
Get value type.
CJson_Object.
const_iterator find(const CJson_Node::TKeyType &name) const
Return an iterator that points to the location of the element.
const_iterator end(void) const
Return an iterator that points to the location after the last element.
CJson_Value.
Uint4 GetUint4(void) const
Uint8 GetUint8(void) const
bool GetBool(void) const
Get primitive value data.
Int8 GetInt8(void) const
Int4 GetInt4(void) const
bool IsInt8(void) const
TStringType GetString(void) const
bool IsUint8(void) const
double GetDouble(void) const
bool IsUint4(void) const
bool IsInt4(void) const
bool IsDouble(void) const
bool IsNumber(void) const
bool Read(std::istream &in)
Read JSON data from a stream.
std::string GetReadError(void) const
Get most recent read error.
bool ParseString(const TStringType &v)
Read JSON data from a UTF8 string.
bool ReadSucceeded(void) const
Test if the most recent read was successful.
bool Write(std::ostream &out, TJson_Write_Flags flags=fJson_Write_IndentWithSpace, unsigned int indent_char_count=4) const
Write JSON data into a stream.
JSON meta-schema describes JSON schema itself, so other JSON schemas can be validated against it.
CJson_Object ResetObject(void)
Erase node data and convert it into JSON object.
CJson_Array ResetArray(void)
Erase node data and convert it into JSON array.
CJson_Node & AssignCopy(const CJson_ConstNode &n)
Copy Node contents data into this node.
CJson_Node & SetNull(void)
Erase node data and convert it into JSON NULL value.
CJson_Value SetValue(void)
Get JSON value contents of the node.
void insert(const CJson_Node::TKeyType &name)
Insert null element into the object.
CJson_Object insert_object(const CJson_Node::TKeyType &name)
Insert object type element into the object.
CJson_Schema.
bool Validate(const CJson_Document &v)
Validate JSON document against schema.
std::string GetValidationError() const
Return validation error.
void Start(EWhich which)
CObjectStreamCopier –.
Definition: objcopy.hpp:71
CObjectTypeInfo –.
Definition: objectinfo.hpp:94
Bio-id (such as accession)
Definition: psg_client.hpp:175
const string & GetId() const
Get ID.
Definition: psg_client.hpp:196
TType GetType() const
Get type.
Definition: psg_client.hpp:199
Blob unique ID.
Definition: psg_client.hpp:226
const string & GetId() const
Get ID.
Definition: psg_client.hpp:250
const TLastModified & GetLastModified() const
Get last modified.
Definition: psg_client.hpp:253
CNullable< Int8 > TLastModified
Definition: psg_client.hpp:228
Chunk unique ID.
Definition: psg_client.hpp:265
const string & GetId2Info() const
Get ID2 info.
Definition: psg_client.hpp:279
int GetId2Chunk() const
Get ID2 chunk number.
Definition: psg_client.hpp:276
A class derived from the queue class that additionally allows running an event loop.
bool Run(CDeadline deadline)
Process everything in the queue until it's empty or times out.
A queue to retrieve data (accession resolution info; bio-sequence; annotation blobs) from the storage...
bool SendRequest(shared_ptr< CPSG_Request > request, CDeadline deadline)
Push request into the queue.
shared_ptr< CPSG_Reply > SendRequestAndGetReply(shared_ptr< CPSG_Request > request, CDeadline deadline)
Push request into the queue and get corresponding reply.
void Stop()
Stop accepting new requests.
@ eEndOfReply
No more items expected in the (overall!) reply.
Definition: psg_client.hpp:675
@ eNoTSE
Only the info.
Definition: psg_client.hpp:340
@ eSmartTSE
If ID2 split is available, return split info blob only.
Definition: psg_client.hpp:348
@ eSlimTSE
If ID2 split is available, return split info blob only.
Definition: psg_client.hpp:344
@ eWholeTSE
If ID2 split is available, return all split blobs.
Definition: psg_client.hpp:352
@ eOrigTSE
Return all Cassandra data chunks of the blob itself.
Definition: psg_client.hpp:355
CNullable< string > TNucleotide
Definition: psg_client.hpp:600
vector< string > TAnnotNames
Names of the named annotations.
Definition: psg_client.hpp:498
@ fName
Requests name to use for canonical bio-id.
Definition: psg_client.hpp:423
CParallelProcessing(const TParams &params)
list< SThread > m_Threads
Definition: processing.hpp:323
list< CPSG_EventLoop > m_PsgQueues
Definition: processing.hpp:322
static vector< shared_ptr< CPSG_Request > > ReadCommands(TCreateContext create_context, size_t report_progress_after=0)
static int JsonCheck(istream *schema_is)
static int OneRequest(const SOneRequestParams &params, shared_ptr< CPSG_Request > request)
Definition: processing.cpp:810
static CParallelProcessing< SBatchResolveParams > CreateParallelProcessing(const SBatchResolveParams &params)
static CJson_Document RequestSchema()
static bool ReadLine(string &line, istream &is=cin)
static int Performance(const SPerformanceParams &params)
Take guard of the current CRequestContext, handle app-state, start/stop logging and request status in...
CTypeInfo class contains all information about C++ types (both basic and classes): members and layout...
Definition: typeinfo.hpp:76
CZipStreamDecompressor – zlib based decompression stream processor.
Definition: zlib.hpp:817
Definition: map.hpp:338
void Print(const CCompactSAMApplication::AlignInfo &ai)
std::ofstream out("events_result.xml")
main entry point for tests
CS_CONTEXT * ctx
Definition: t0006.c:12
static const struct name_t names[]
static int type
Definition: getdata.c:31
static SQLCHAR output[256]
Definition: print.c:5
static const char * schema
Definition: stats.c:20
char data[12]
Definition: iconv.c:80
#define TAX_ID_TO(T, tax_id)
Definition: ncbimisc.hpp:1110
#define GI_TO(T, gi)
Definition: ncbimisc.hpp:1085
@ null
Definition: ncbimisc.hpp:646
@ eTakeOwnership
An object can take ownership of another.
Definition: ncbi_types.h:136
string
Definition: cgiapp.hpp:687
#define NCBI_ALWAYS_TROUBLE(mess)
Definition: ncbidbg.hpp:85
#define _DEBUG_CODE(code)
Definition: ncbidbg.hpp:136
#define _VERIFY(expr)
Definition: ncbidbg.hpp:161
CDiagContext_Extra & Print(const string &name, const string &value)
The method does not print the argument, but adds it to the string.
Definition: ncbidiag.cpp:2622
CDiagContext & GetDiagContext(void)
Get diag context instance.
Definition: logging.cpp:818
static void SetRequestContext(CRequestContext *ctx)
Shortcut to CDiagContextThreadData::GetThreadData().SetRequestContext()
Definition: ncbidiag.cpp:1907
void SetRequestID(TCount rid)
Set request ID.
static CRequestContext & GetRequestContext(void)
Shortcut to CDiagContextThreadData::GetThreadData().GetRequestContext()
Definition: ncbidiag.cpp:1901
void SetStatus(int status)
Set request context status.
static const char * SeverityName(EDiagSev sev)
Get a common symbolic name for the severity levels.
EDiagSev
Severity level for the posted diagnostics.
Definition: ncbidiag.hpp:650
@ eDiag_Warning
Warning message.
Definition: ncbidiag.hpp:652
@ e499_BrokenConnection
Non-standard status code - used to indicate broken connection while serving normal request.
string ReportThis(TDiagPostFlags flags=eDPF_Exception) const
Report this exception only.
Definition: ncbiexpt.cpp:397
#define ENUM_METHOD_NAME(EnumName)
Definition: serialbase.hpp:994
ESerialDataFormat
Data file format.
Definition: serialdef.hpp:71
@ eSerial_AsnText
ASN.1 text.
Definition: serialdef.hpp:73
@ eSerial_Xml
XML.
Definition: serialdef.hpp:75
@ eSerial_Json
JSON.
Definition: serialdef.hpp:76
@ eSerial_None
Definition: serialdef.hpp:72
@ eSerial_AsnBinary
ASN.1 binary.
Definition: serialdef.hpp:74
static CObjectOStream * Open(ESerialDataFormat format, CNcbiOstream &outStream, bool deleteOutStream)
Create serial object writer and attach it to an output stream.
Definition: objostr.cpp:126
static CObjectIStream * Open(ESerialDataFormat format, CNcbiIstream &inStream, bool deleteInStream)
Create serial object reader and attach it to an input stream.
Definition: objistr.cpp:195
void Copy(const CObjectTypeInfo &type)
Copy data.
Definition: objcopy.cpp:74
int64_t Int8
8-byte (64-bit) signed integer
Definition: ncbitype.h:104
uint64_t Uint8
8-byte (64-bit) unsigned integer
Definition: ncbitype.h:105
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
static string Base64Encode(const CTempString str, size_t line_len=0)
Base64-encode string.
Definition: ncbistr.cpp:6270
static bool SplitInTwo(const CTempString str, const CTempString delim, string &str1, string &str2, TSplitFlags flags=0)
Split a string into two pieces using the specified delimiters.
Definition: ncbistr.cpp:3554
static string Base64Decode(const CTempString str)
Base64-decode string.
Definition: ncbistr.cpp:6286
@ eNoWait
No-wait, expires immediately.
Definition: ncbitime.hpp:1846
@ eInfinite
Infinite deadline.
Definition: ncbitime.hpp:1845
E_Choice
Choice variants.
Definition: Seq_id_.hpp:93
unsigned int
A callback function used to compare two keys in a database.
Definition: types.hpp:1210
@ e_not_set
static int input()
int i
yy_size_t n
static MDB_envinfo info
Definition: mdb_load.c:37
constexpr auto sort(_Init &&init)
constexpr bool empty(list< Ts... >) noexcept
const GenericPointer< typename T::ValueType > T2 value
Definition: pointer.h:1227
EIPRangeType t
Definition: ncbi_localip.c:101
T min(T x_, T y_)
static Format format
Definition: njn_ioutil.cpp:53
std::istream & in(std::istream &in_, double &x_)
double r(size_t dimension_, const Int4 *score_, const double *prob_, double theta_)
static const char * prefix[]
Definition: pcregrep.c:405
string ReadLine(CNcbiIstream &in)
Definition: phrap.cpp:74
void s_ItemComplete< SInteractiveParams >(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
void s_ItemComplete(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
Definition: processing.cpp:780
CRequestStatus::ECode s_PsgStatusToRequestStatus(EPSG_Status psg_status)
EJsonRpcErrors
Definition: processing.cpp:57
@ eJsonRpc_InvalidRequest
Definition: processing.cpp:59
@ eJsonRpc_ExceptionOnRead
Definition: processing.cpp:60
@ eJsonRpc_ParseError
Definition: processing.cpp:58
const char * s_StrStatus(EPSG_Status status)
Definition: processing.cpp:128
false_type no_server_mode
string s_ProgressStatusToString(CPSG_Processor::EProgressStatus progress_status)
Definition: processing.cpp:437
string s_GetId(const CJson_Document &req_doc)
auto s_IsRawResponse(const CPSG_BlobId *blob_id)
Definition: processing.cpp:316
void s_ReplyComplete< SOneRequestParams >(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
Definition: processing.cpp:792
shared_ptr< CPSG_Reply > s_GetReply(shared_ptr< CPSG_ReplyItem > &item)
void s_NewItem(SJsonOut &, const shared_ptr< CPSG_ReplyItem > &)
Definition: processing.cpp:800
void s_ReplyComplete< SInteractiveParams, testing, server_mode >(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
const initializer_list< SInfoFlag > kInfoFlags
true_type testing
void s_ReplyComplete(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
Definition: processing.cpp:786
void s_NewItem< SInteractiveParams, verbose >(SJsonOut &json_out, const shared_ptr< CPSG_ReplyItem > &item)
false_type no_testing
const initializer_list< SDataFlag > kDataFlags
true_type server_mode
void s_ReplyComplete< SInteractiveParams, testing, no_server_mode >(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
ESerialDataFormat s_GetInputFormat(const string &format)
Definition: processing.cpp:629
void s_ReplyComplete< no_verbose >(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
Definition: processing.cpp:908
false_type no_verbose
Definition: processing.cpp:891
void s_ReplyComplete< SInteractiveParams, no_testing, no_server_mode >(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
TTypeInfo s_GetInputType(const shared_ptr< CPSG_BlobData > &blob_data)
Definition: processing.cpp:638
auto s_IsRawRequest(shared_ptr< const CPSG_Request > &request)
Definition: processing.cpp:311
void s_ItemComplete< no_verbose >(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
Definition: processing.cpp:894
const char * s_GetItemName(CPSG_ReplyItem::EType type, bool trouble=true)
Definition: processing.cpp:226
true_type verbose
Definition: processing.cpp:890
string s_ReasonToString(CPSG_SkippedBlob::EReason reason)
Definition: processing.cpp:374
void s_ReplyComplete< SInteractiveParams, no_testing, server_mode >(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
void s_NewItem< SOneRequestParams >(SJsonOut &json_out, const shared_ptr< CPSG_ReplyItem > &reply_item)
Definition: processing.cpp:805
EPSG_Status
Retrieval result.
Definition: psg_client.hpp:626
@ eSuccess
Successfully retrieved.
@ eInProgress
Retrieval is not finalized yet, more info may come.
@ eForbidden
User is not authorized for the retrieval.
@ eCanceled
Request canceled.
@ eError
An error was encountered while trying to send the request or to read and process the reply.
@ eNotFound
Not found.
vector< CPSG_BioId > CPSG_BioIds
Definition: psg_client.hpp:206
SImpl(const TParams &params)
void(*)(SJsonOut &, EPSG_Status, const shared_ptr< CPSG_ReplyItem > &) TItemComplete
Definition: processing.hpp:296
void(*)(SJsonOut &, EPSG_Status, const shared_ptr< CPSG_Reply > &) TReplyComplete
Definition: processing.hpp:297
void Init(const TParams &params)
Definition: processing.cpp:865
TReplyComplete GetReplyComplete()
Definition: processing.cpp:935
TItemComplete GetItemComplete()
Definition: processing.cpp:924
void Submitter(CPSG_Queue &output)
void(*)(SJsonOut &, const shared_ptr< CPSG_ReplyItem > &) TNewItem
Definition: processing.hpp:298
bool ReportErrors(EPSG_Status status, TItem item, const char *prefix)
Definition: processing.cpp:569
unordered_map< string, pair< shared_ptr< CPSG_BlobInfo >, shared_ptr< CPSG_BlobData > > > m_Data
Definition: processing.cpp:564
SDataOnlyCopy(const SOneRequestParams::SDataOnly &params)
Definition: processing.cpp:547
const SOneRequestParams::SDataOnly & m_Params
Definition: processing.cpp:563
EPSG_Status m_Status
Definition: processing.cpp:565
void ItemComplete(EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
Definition: processing.cpp:608
void ReplyComplete(EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
Definition: processing.cpp:624
void Process(shared_ptr< CPSG_BlobInfo > blob_info)
Definition: processing.cpp:649
CDiagContext_Extra & Print(const string &name, const string &value)
The method does not print the argument, but adds it to the string.
Definition: ncbidiag.cpp:2622
void Print(const string &prefix, CJson_ConstValue json)
SInteractiveNewRequestStart(const string &request, CJson_ConstObject params_obj)
const size_t preview_size
Definition: processing.hpp:244
static string GetService(string service, bool one_server)
Definition: processing.cpp:524
const size_t data_limit
Definition: processing.hpp:243
char m_Separator
Definition: processing.hpp:75
const TJson_Write_Flags m_Flags
Definition: processing.hpp:77
mutex m_Mutex
Definition: processing.hpp:74
SJsonOut & operator<<(const CJson_Document &doc)
Definition: processing.cpp:142
const bool m_Pipe
Definition: processing.hpp:76
static const char * Name(EType t)
Definition: processing.cpp:107
void Set(EType t)
Definition: performance.hpp:84
void OutputItems(ostream &os) const
Definition: processing.cpp:538
vector< TItem > m_Items
Definition: performance.hpp:98
SNewRequestContext(const SNewRequestContext &)=delete
CRef< CRequestContext > Get() const
Definition: processing.cpp:77
void operator=(const SNewRequestContext &)=delete
CRef< CRequestContext > m_RequestContext
Definition: processing.cpp:83
static void ReplyComplete(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
Definition: processing.cpp:729
static void ItemComplete(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
Definition: processing.cpp:724
void ReplyComplete(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
Definition: processing.cpp:766
map< CPSG_ReplyItem::EType, shared_ptr< CPSG_ReplyItem > > m_Items
Definition: processing.cpp:746
void ItemComplete(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
Definition: processing.cpp:750
const ESerialDataFormat output_format
Definition: processing.hpp:194
const CLogLatencies::EWhich which
Definition: processing.hpp:186
SDataOnly data_only
Definition: processing.hpp:198
EDiagSev severity
Definition: psg_client.hpp:649
optional< int > code
Definition: psg_client.hpp:650
const SPSG_UserArgs user_args
Definition: processing.hpp:170
static EDiagSev min_severity
Definition: processing.hpp:172
const string service
Definition: processing.hpp:168
static bool verbose
Definition: processing.hpp:173
const CPSG_Request::TFlags request_flags
Definition: processing.hpp:169
const bool local_queue
Definition: processing.hpp:263
const double delay
Definition: processing.hpp:262
const size_t user_threads
Definition: processing.hpp:261
const bool report_immediately
Definition: processing.hpp:264
static CPSG_Request_Resolve::TIncludeInfo GetIncludeInfo(TSpecified specified)
static CPSG_BioId::TType GetBioIdType(const string &type)
function< void(string)> TExclude
Definition: processing.hpp:404
static shared_ptr< TRequest > Build(const TInput &input, TArgs &&... args)
Definition: processing.hpp:503
static const initializer_list< SInfoFlag > & GetInfoFlags()
static const initializer_list< SDataFlag > & GetDataFlags()
function< bool(const string &)> TSpecified
Definition: processing.hpp:403
Definition: inftrees.h:24
Definition: _hash_fun.h:40
Definition: type.c:6
#define _TROUBLE
#define _ASSERT
else result
Definition: token2.c:20
static CS_CONTEXT * context
Definition: will_convert.c:21
ZLib Compression API.
Modified on Wed Apr 24 14:16:59 2024 by modify_doxy.py rev. 669887